From: Vsevolod Stakhov Date: Sat, 4 May 2024 13:57:45 +0000 (+0100) Subject: [Project] Further tcp/udp refactoring X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=937128e11d29db2c2843eae00a520a579a2c0ddc;p=rspamd.git [Project] Further tcp/udp refactoring --- diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index ce8b205a7..0ecac635b 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -237,12 +237,24 @@ struct fuzzy_tcp_reply { struct fuzzy_tcp_reply *prev, *next; /* Link */ }; -struct fuzzy_tcp_session { - struct rspamd_worker *worker; - rspamd_inet_addr_t *addr; +struct fuzzy_common_session { struct rspamd_fuzzy_storage_ctx *ctx; int fd; struct ev_io io; + ev_tstamp timestamp; + struct rspamd_worker *worker; + rspamd_inet_addr_t *addr; + + enum rspamd_fuzzy_epoch epoch; + enum fuzzy_cmd_type cmd_type; + struct rspamd_fuzzy_shingle_cmd cmd; + struct fuzzy_key *key; + struct rspamd_fuzzy_cmd_extension *extensions; + struct fuzzy_key_stat *ip_stat; + unsigned char nm[rspamd_cryptobox_MAX_NMBYTES]; +}; + +struct fuzzy_tcp_session { struct ev_timer tm; /* @@ -256,39 +268,17 @@ struct fuzzy_tcp_session { uint16_t bytes_unprocessed; /* Common with UDP session */ - enum rspamd_fuzzy_epoch epoch; - enum fuzzy_cmd_type cmd_type; - struct rspamd_fuzzy_shingle_cmd cmd; - struct fuzzy_key *key; - struct rspamd_fuzzy_cmd_extension *extensions; - struct fuzzy_key_stat *ip_stat; - unsigned char nm[rspamd_cryptobox_MAX_NMBYTES]; - + struct fuzzy_common_session common; ref_entry_t ref; struct fuzzy_tcp_reply *replies_queue; unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH]; }; -struct fuzzy_session { - struct rspamd_worker *worker; - rspamd_inet_addr_t *addr; - struct rspamd_fuzzy_storage_ctx *ctx; - - struct rspamd_fuzzy_shingle_cmd cmd; /* Can handle both shingles and non-shingles */ +struct fuzzy_udp_session { + /* Common fields with TCP session */ + struct fuzzy_common_session common; struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */ - struct fuzzy_key_stat *ip_stat; - - enum rspamd_fuzzy_epoch epoch; - enum fuzzy_cmd_type cmd_type; - int fd; - ev_tstamp timestamp; - - struct fuzzy_key *key; - struct rspamd_fuzzy_cmd_extension *extensions; - unsigned char nm[rspamd_cryptobox_MAX_NMBYTES]; - - struct ev_io io; ref_entry_t ref; }; @@ -305,7 +295,7 @@ struct rspamd_updates_cbdata { }; -static void rspamd_fuzzy_write_reply(struct fuzzy_session *session); +static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session); static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx, const char *source, gboolean final); static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx, @@ -887,28 +877,28 @@ rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx, } static void -rspamd_fuzzy_reply_io(EV_P_ ev_io *w, int revents) +rspamd_fuzzy_udp_reply_io(EV_P_ ev_io *w, int revents) { - struct fuzzy_session *session = (struct fuzzy_session *) w->data; + struct fuzzy_udp_session *session = (struct fuzzy_udp_session *) w->data; ev_io_stop(EV_A_ w); - rspamd_fuzzy_write_reply(session); + rspamd_fuzzy_udp_write_reply(session); REF_RELEASE(session); } static void -rspamd_fuzzy_write_reply(struct fuzzy_session *session) +rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session) { gssize r; gsize len; gconstpointer data; - if (session->cmd_type == CMD_ENCRYPTED_NORMAL || - session->cmd_type == CMD_ENCRYPTED_SHINGLE) { + if (session->common.cmd_type == CMD_ENCRYPTED_NORMAL || + session->common.cmd_type == CMD_ENCRYPTED_SHINGLE) { /* Encrypted reply */ data = &session->reply; - if (session->epoch > RSPAMD_FUZZY_EPOCH10) { + if (session->common.epoch > RSPAMD_FUZZY_EPOCH10) { len = sizeof(session->reply); } else { @@ -918,7 +908,7 @@ rspamd_fuzzy_write_reply(struct fuzzy_session *session) else { data = &session->reply.rep; - if (session->epoch > RSPAMD_FUZZY_EPOCH10) { + if (session->common.epoch > RSPAMD_FUZZY_EPOCH10) { len = sizeof(session->reply.rep); } else { @@ -926,17 +916,17 @@ rspamd_fuzzy_write_reply(struct fuzzy_session *session) } } - r = rspamd_inet_address_sendto(session->fd, data, len, 0, - session->addr); + r = rspamd_inet_address_sendto(session->common.fd, data, len, 0, + session->common.addr); if (r == -1) { if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { /* Grab reference to avoid early destruction */ REF_RETAIN(session); - session->io.data = session; - ev_io_init(&session->io, - rspamd_fuzzy_reply_io, session->fd, EV_WRITE); - ev_io_start(session->ctx->event_loop, &session->io); + session->common.io.data = session; + ev_io_init(&session->common.io, + rspamd_fuzzy_udp_reply_io, session->common.fd, EV_WRITE); + ev_io_start(session->common.ctx->event_loop, &session->common.io); } else { msg_err("error while writing reply: %s", strerror(errno)); @@ -1060,11 +1050,43 @@ enum rspamd_fuzzy_reply_flags { RSPAMD_FUZZY_REPLY_DELAY = 0x1u << 2u, }; +static bool +rspamd_fuzzy_can_reply(struct fuzzy_common_session *session, bool encrypted, int flag, float prob) +{ + bool default_disabled = false; + + { + khiter_t k; + + k = kh_get(fuzzy_key_ids_set, session->ctx->default_forbidden_ids, flag); + + if (k != kh_end(session->ctx->default_forbidden_ids)) { + /* Hash is from a forbidden flag by default */ + default_disabled = true; + } + } + + if (encrypted) { + if (prob > 0 && session->key && session->key->forbidden_ids) { + khiter_t k; + + k = kh_get(fuzzy_key_ids_set, session->key->forbidden_ids, flag); + + if (k != kh_end(session->key->forbidden_ids)) { + /* Hash is from a forbidden flag for this key */ + default_disabled = true; + } + } + } + + return !default_disabled; +} + static void -rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd *cmd, - struct rspamd_fuzzy_reply *result, - struct fuzzy_session *session, - int flags) +rspamd_fuzzy_make_udp_reply(struct rspamd_fuzzy_cmd *cmd, + struct rspamd_fuzzy_reply *result, + struct fuzzy_udp_session *session, + int flags) { gsize len; @@ -1078,43 +1100,31 @@ rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd *cmd, session->reply.rep.v1.prob = 0.0f; session->reply.rep.v1.value = 0; } - - bool default_disabled = false; - - { - khiter_t k; - - k = kh_get(fuzzy_key_ids_set, session->ctx->default_forbidden_ids, session->reply.rep.v1.flag); - - if (k != kh_end(session->ctx->default_forbidden_ids)) { - /* Hash is from a forbidden flag by default */ - default_disabled = true; + else if (!rspamd_fuzzy_can_reply(&session->common, flags & RSPAMD_FUZZY_REPLY_ENCRYPTED, + session->reply.rep.v1.flag, session->reply.rep.v1.prob)) { + /* Hash is from a forbidden flag */ + session->reply.rep.ts = 0; + session->reply.rep.v1.prob = 0.0f; + session->reply.rep.v1.value = 0; + session->reply.rep.v1.flag = 0; + } + else { + /* Update stats before encryption */ + if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) { + rspamd_fuzzy_update_stats(session->common.ctx, + session->common.epoch, + session->reply.rep.v1.prob > 0.5f, + flags & RSPAMD_FUZZY_REPLY_SHINGLE, + flags & RSPAMD_FUZZY_REPLY_DELAY, + session->common.key, + session->common.ip_stat, + cmd->cmd, + &session->reply.rep, + session->common.timestamp); } } if (flags & RSPAMD_FUZZY_REPLY_ENCRYPTED) { - - if (session->reply.rep.v1.prob > 0 && session->key && session->key->forbidden_ids) { - khiter_t k; - - k = kh_get(fuzzy_key_ids_set, session->key->forbidden_ids, session->reply.rep.v1.flag); - - if (k != kh_end(session->key->forbidden_ids)) { - /* Hash is from a forbidden flag for this key */ - session->reply.rep.ts = 0; - session->reply.rep.v1.prob = 0.0f; - session->reply.rep.v1.value = 0; - session->reply.rep.v1.flag = 0; - } - } - else if (default_disabled) { - /* Hash is from a forbidden flag by default */ - session->reply.rep.ts = 0; - session->reply.rep.v1.prob = 0.0f; - session->reply.rep.v1.value = 0; - session->reply.rep.v1.flag = 0; - } - /* We need also to encrypt reply */ ottery_rand_bytes(session->reply.hdr.nonce, sizeof(session->reply.hdr.nonce)); @@ -1124,58 +1134,23 @@ rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd *cmd, * decryption would fail due to mac verification mistake */ - if (session->epoch > RSPAMD_FUZZY_EPOCH10) { + if (session->common.epoch > RSPAMD_FUZZY_EPOCH10) { len = sizeof(session->reply.rep); } else { len = sizeof(session->reply.rep.v1); } - /* Update stats before encryption */ - if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) { - rspamd_fuzzy_update_stats(session->ctx, - session->epoch, - session->reply.rep.v1.prob > 0.5f, - flags & RSPAMD_FUZZY_REPLY_SHINGLE, - flags & RSPAMD_FUZZY_REPLY_DELAY, - session->key, - session->ip_stat, - cmd->cmd, - &session->reply.rep, - session->timestamp); - } - rspamd_cryptobox_encrypt_nm_inplace((unsigned char *) &session->reply.rep, len, session->reply.hdr.nonce, - session->nm, + session->common.nm, session->reply.hdr.mac, RSPAMD_CRYPTOBOX_MODE_25519); } - else if (default_disabled) { - /* Hash is from a forbidden flag by default, and there is no encryption override */ - session->reply.rep.ts = 0; - session->reply.rep.v1.prob = 0.0f; - session->reply.rep.v1.value = 0; - session->reply.rep.v1.flag = 0; - } - if (!(flags & RSPAMD_FUZZY_REPLY_ENCRYPTED)) { - if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) { - rspamd_fuzzy_update_stats(session->ctx, - session->epoch, - session->reply.rep.v1.prob > 0.5f, - flags & RSPAMD_FUZZY_REPLY_SHINGLE, - flags & RSPAMD_FUZZY_REPLY_DELAY, - session->key, - session->ip_stat, - cmd->cmd, - &session->reply.rep, - session->timestamp); - } - } } - rspamd_fuzzy_write_reply(session); + rspamd_fuzzy_udp_write_reply(session); } static gboolean @@ -1237,35 +1212,33 @@ rspamd_fuzzy_extensions_tolua(lua_State *L, } } -static void -rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud) +static struct rspamd_fuzzy_cmd * +rspamd_fuzzy_fill_reply(struct rspamd_fuzzy_reply *result, struct fuzzy_common_session *session, int *send_flags) { - struct fuzzy_session *session = ud; - gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE; + bool encrypted = false, is_shingle = false; struct rspamd_fuzzy_cmd *cmd = NULL; const struct rspamd_shingle *shingle = NULL; struct rspamd_shingle sgl_cpy; - int send_flags = 0; switch (session->cmd_type) { case CMD_ENCRYPTED_NORMAL: - encrypted = TRUE; - send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED; + encrypted = true; + *send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED; /* Fallthrough */ case CMD_NORMAL: cmd = &session->cmd.basic; break; case CMD_ENCRYPTED_SHINGLE: - encrypted = TRUE; - send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED; + encrypted = true; + *send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED; /* Fallthrough */ case CMD_SHINGLE: cmd = &session->cmd.basic; memcpy(&sgl_cpy, &session->cmd.sgl, sizeof(sgl_cpy)); shingle = &sgl_cpy; - is_shingle = TRUE; - send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE; + is_shingle = true; + *send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE; break; } @@ -1335,10 +1308,7 @@ rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud) } lua_settop(L, 0); - rspamd_fuzzy_make_reply(cmd, result, session, send_flags); - REF_RELEASE(session); - - return; + return cmd; } } @@ -1353,7 +1323,7 @@ rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud) session->ctx->delay / 2.0); if (hash_age < jittered_age) { - send_flags |= RSPAMD_FUZZY_REPLY_DELAY; + *send_flags |= RSPAMD_FUZZY_REPLY_DELAY; } } @@ -1407,57 +1377,73 @@ rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud) } } - rspamd_fuzzy_make_reply(cmd, result, session, send_flags); + return cmd; +} + +static void +rspamd_fuzzy_udp_check_callback(struct rspamd_fuzzy_reply *result, void *ud) +{ + struct fuzzy_udp_session *session = (struct fuzzy_udp_session *) ud; + int send_flags = 0; + struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_fill_reply(result, &session->common, &send_flags); + rspamd_fuzzy_make_udp_reply(cmd, result, session, send_flags); REF_RELEASE(session); } static void -rspamd_fuzzy_process_udp_session(struct fuzzy_session *session) +rspamd_fuzzy_tcp_check_callback(struct rspamd_fuzzy_reply *result, void *ud) { - gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE; - struct rspamd_fuzzy_cmd *cmd = NULL; - struct rspamd_fuzzy_reply result; - struct fuzzy_peer_cmd up_cmd; - struct fuzzy_peer_request *up_req; - struct fuzzy_key_stat *ip_stat = NULL; - char hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1]; - rspamd_inet_addr_t *naddr; - gpointer ptr; - gsize up_len = 0; + struct fuzzy_tcp_session *session = (struct fuzzy_tcp_session *) ud; int send_flags = 0; + struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_fill_reply(result, &session->common, &send_flags); + /* TODO write reply impl */ + REF_RELEASE(session); +} + +static struct rspamd_fuzzy_cmd * +rspamd_fuzzy_prepare_cmd(struct fuzzy_common_session *session, + struct rspamd_fuzzy_reply *result, + int *send_flags, + size_t *up_len, + bool *final) +{ + struct fuzzy_key_stat *ip_stat = NULL; + struct rspamd_fuzzy_cmd *cmd = NULL; + bool is_encrypted = false, is_shingle = false; + cmd = &session->cmd.basic; switch (session->cmd_type) { case CMD_NORMAL: - up_len = sizeof(session->cmd.basic); + *up_len = sizeof(session->cmd.basic); break; case CMD_SHINGLE: - up_len = sizeof(session->cmd); - is_shingle = TRUE; - send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE; + *up_len = sizeof(session->cmd); + is_shingle = true; + *send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE; break; case CMD_ENCRYPTED_NORMAL: - up_len = sizeof(session->cmd.basic); - encrypted = TRUE; - send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED; + *up_len = sizeof(session->cmd.basic); + is_encrypted = true; + *send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED; break; case CMD_ENCRYPTED_SHINGLE: - up_len = sizeof(session->cmd); - encrypted = TRUE; - is_shingle = TRUE; - send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED; + *up_len = sizeof(session->cmd); + is_encrypted = true; + is_shingle = true; + *send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED; break; default: msg_err("invalid command type: %d", session->cmd_type); - return; + return NULL; } - memset(&result, 0, sizeof(result)); - memcpy(result.digest, cmd->digest, sizeof(result.digest)); - result.v1.flag = cmd->flag; - result.v1.tag = cmd->tag; + memset(result, 0, sizeof(*result)); + memcpy(result->digest, cmd->digest, sizeof(result->digest)); + result->v1.flag = cmd->flag; + result->v1.tag = cmd->tag; if (session->ctx->lua_pre_handler_cbref != -1) { /* Start lua pre handler */ @@ -1497,19 +1483,18 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session) if (ret) { /* Artificial reply */ - result.v1.value = lua_tointeger(L, err_idx + 2); + result->v1.value = lua_tointeger(L, err_idx + 2); if (lua_isnumber(L, err_idx + 3)) { - result.v1.prob = lua_tonumber(L, err_idx + 3); + result->v1.prob = lua_tonumber(L, err_idx + 3); } else { - result.v1.prob = 0.0f; + result->v1.prob = 0.0f; } lua_settop(L, 0); - rspamd_fuzzy_make_reply(cmd, &result, session, send_flags); - return; + return cmd; } } @@ -1518,18 +1503,20 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session) if (G_UNLIKELY(cmd == NULL || up_len == 0)) { - result.v1.value = 500; - result.v1.prob = 0.0f; - rspamd_fuzzy_make_reply(cmd, &result, session, send_flags); - return; + result->v1.value = 500; + result->v1.prob = 0.0f; + *final = true; + + return cmd; } - if (session->ctx->encrypted_only && !encrypted) { + if (session->ctx->encrypted_only && !is_encrypted) { /* Do not accept unencrypted commands */ - result.v1.value = 403; - result.v1.prob = 0.0f; - rspamd_fuzzy_make_reply(cmd, &result, session, send_flags); - return; + result->v1.value = 403; + result->v1.prob = 0.0f; + *final = true; + + return cmd; } if (session->key && session->addr) { @@ -1537,7 +1524,7 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session) session->addr, -1); if (ip_stat == NULL) { - naddr = rspamd_inet_address_copy(session->addr, NULL); + rspamd_inet_addr_t *naddr = rspamd_inet_address_copy(session->addr, NULL); ip_stat = g_malloc0(sizeof(*ip_stat)); REF_INIT_RETAIN(ip_stat, fuzzy_key_stat_dtor); rspamd_lru_hash_insert(session->key->stat->last_ips, @@ -1548,77 +1535,108 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session) session->ip_stat = ip_stat; } + /* Unset final flag */ + *final = false; + + return cmd; +} + +static void +rspamd_fuzzy_process_udp_session(struct fuzzy_udp_session *session) +{ + gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE; + char hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1]; + rspamd_inet_addr_t *naddr; + gpointer ptr; + int send_flags = 0; + bool final = false; + size_t up_len; + + struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_prepare_cmd(&session->common, &session->reply.rep, + &send_flags, &up_len, &final); + + + if (final) { + rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags); + REF_RELEASE(session); + return; + } + if (cmd->cmd == FUZZY_CHECK) { bool can_continue = true; - - if (session->ctx->ratelimit_buckets) { - if (session->ctx->ratelimit_log_only) { - (void) rspamd_fuzzy_check_ratelimit(session->ctx, - session->addr, - session->worker, - session->timestamp); /* Check but ignore */ + if (session->common.ctx->ratelimit_buckets) { + if (session->common.ctx->ratelimit_log_only) { + (void) rspamd_fuzzy_check_ratelimit(session->common.ctx, + session->common.addr, + session->common.worker, + session->common.timestamp); /* Check but ignore */ } else { - can_continue = rspamd_fuzzy_check_ratelimit(session->ctx, - session->addr, - session->worker, - session->timestamp); + can_continue = rspamd_fuzzy_check_ratelimit(session->common.ctx, + session->common.addr, + session->common.worker, + session->common.timestamp); } } if (can_continue) { REF_RETAIN(session); - rspamd_fuzzy_backend_check(session->ctx->backend, cmd, - rspamd_fuzzy_check_callback, session); + rspamd_fuzzy_backend_check(session->common.ctx->backend, cmd, + rspamd_fuzzy_udp_check_callback, session); } else { - result.v1.value = 403; - result.v1.prob = 0.0f; - result.v1.flag = 0; - rspamd_fuzzy_make_reply(cmd, &result, session, send_flags); + session->reply.rep.v1.value = 403; + session->reply.rep.v1.prob = 0.0f; + session->reply.rep.v1.flag = 0; + rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags); } } else if (cmd->cmd == FUZZY_STAT) { /* Store approximation (if needed) */ - result.v1.prob = session->ctx->stat.fuzzy_hashes; + session->reply.rep.v1.prob = session->common.ctx->stat.fuzzy_hashes; /* Store high qword in value and low qword in flag */ - result.v1.value = (int32_t) ((uint64_t) session->ctx->stat.fuzzy_hashes >> 32); - result.v1.flag = (uint32_t) (session->ctx->stat.fuzzy_hashes & G_MAXUINT32); - rspamd_fuzzy_make_reply(cmd, &result, session, send_flags); + session->reply.rep.v1.value = (int32_t) ((uint64_t) session->common.ctx->stat.fuzzy_hashes >> 32); + session->reply.rep.v1.flag = (uint32_t) (session->common.ctx->stat.fuzzy_hashes & G_MAXUINT32); + rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags); } else if (cmd->cmd == FUZZY_PING) { - result.v1.prob = 1.0f; - result.v1.value = cmd->value; - rspamd_fuzzy_make_reply(cmd, &result, session, send_flags); + session->reply.rep.v1.prob = 1.0f; + session->reply.rep.v1.value = cmd->value; + rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags); } else { - if (rspamd_fuzzy_check_write(session->ctx, session->addr, session->key)) { + if (rspamd_fuzzy_check_write(session->common.ctx, session->common.addr, session->common.key)) { /* Check whitelist */ - if (session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) { + if (session->common.ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) { rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest), hexbuf, sizeof(hexbuf) - 1); hexbuf[sizeof(hexbuf) - 1] = '\0'; - if (rspamd_match_hash_map(session->ctx->skip_hashes, + if (rspamd_match_hash_map(session->common.ctx->skip_hashes, hexbuf, sizeof(hexbuf) - 1)) { - result.v1.value = 401; - result.v1.prob = 0.0f; + session->reply.rep.v1.value = 401; + session->reply.rep.v1.prob = 0.0f; goto reply; } } - if (session->ctx->weak_ids && kh_get(fuzzy_key_ids_set, session->ctx->weak_ids, cmd->flag) != kh_end(session->ctx->weak_ids)) { + if (session->common.ctx->weak_ids && + kh_get(fuzzy_key_ids_set, session->common.ctx->weak_ids, cmd->flag) != kh_end(session->common.ctx->weak_ids)) { /* Flag command as weak */ cmd->version |= RSPAMD_FUZZY_FLAG_WEAK; } - if (session->worker->index == 0 || session->ctx->peer_fd == -1) { + struct fuzzy_peer_cmd up_cmd; + struct fuzzy_peer_request *up_req; + + /* Decide if we can process add request by this worker */ + if (session->common.worker->index == 0 || session->common.ctx->peer_fd == -1) { /* Just add to the queue */ up_cmd.is_shingle = is_shingle; ptr = is_shingle ? (gpointer) &up_cmd.cmd.shingle : (gpointer) &up_cmd.cmd.normal; memcpy(ptr, cmd, up_len); - g_array_append_val(session->ctx->updates_pending, up_cmd); + g_array_append_val(session->common.ctx->updates_pending, up_cmd); } else { /* We need to send request to the peer */ @@ -1627,26 +1645,26 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session) ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal; memcpy(ptr, cmd, up_len); - if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) { + if (!fuzzy_peer_try_send(session->common.ctx->peer_fd, up_req)) { up_req->io_ev.data = up_req; ev_io_init(&up_req->io_ev, fuzzy_peer_send_io, - session->ctx->peer_fd, EV_WRITE); - ev_io_start(session->ctx->event_loop, &up_req->io_ev); + session->common.ctx->peer_fd, EV_WRITE); + ev_io_start(session->common.ctx->event_loop, &up_req->io_ev); } else { g_free(up_req); } } - result.v1.value = 0; - result.v1.prob = 1.0f; + session->reply.rep.v1.value = 0; + session->reply.rep.v1.prob = 1.0f; } else { - result.v1.value = 403; - result.v1.prob = 0.0f; + session->reply.rep.v1.value = 403; + session->reply.rep.v1.prob = 0.0f; } reply: - rspamd_fuzzy_make_reply(cmd, &result, session, send_flags); + rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags); } } @@ -2013,12 +2031,9 @@ rspamd_fuzzy_cmd_from_wire(struct rspamd_fuzzy_storage_ctx *ctx, return TRUE; } - static void -fuzzy_session_destroy(gpointer d) +fuzzy_common_session_dtor(struct fuzzy_common_session *session) { - struct fuzzy_session *session = d; - rspamd_inet_address_free(session->addr); rspamd_explicit_memzero(session->nm, sizeof(session->nm)); session->worker->nconns--; @@ -2034,7 +2049,14 @@ fuzzy_session_destroy(gpointer d) if (session->key) { REF_RELEASE(session->key); } +} + +static void +fuzzy_udp_session_dtor(gpointer d) +{ + struct fuzzy_udp_session *session = d; + fuzzy_common_session_dtor(&session->common); g_free(session); } @@ -2057,19 +2079,12 @@ static void tcp_session_dtor(struct fuzzy_tcp_session *tcp_session) { struct fuzzy_tcp_reply *rep; - if (tcp_session->addr) { - rspamd_inet_address_free(tcp_session->addr); - } - - tcp_session->worker->nconns--; - if (tcp_session->ip_stat) { - REF_RELEASE(tcp_session->ip_stat); - } + fuzzy_common_session_dtor(&tcp_session->common); - if (tcp_session->ctx->event_loop) { - ev_timer_stop(tcp_session->ctx->event_loop, &tcp_session->tm); - ev_io_stop(tcp_session->ctx->event_loop, &tcp_session->io); + if (tcp_session->common.ctx->event_loop) { + ev_timer_stop(tcp_session->common.ctx->event_loop, &tcp_session->tm); + ev_io_stop(tcp_session->common.ctx->event_loop, &tcp_session->common.io); } DL_FOREACH(tcp_session->replies_queue, rep) @@ -2077,7 +2092,8 @@ tcp_session_dtor(struct fuzzy_tcp_session *tcp_session) g_free(rep); } - close(tcp_session->fd); + /* For TCP session we also close a socket as it is owned by a session unlike UDP socket that is shared */ + close(tcp_session->common.fd); g_free(tcp_session); } @@ -2094,27 +2110,27 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c gpointer ptr; int send_flags = 0; - if (!rspamd_fuzzy_cmd_from_wire(tcp_session->ctx, tcp_session->addr, buf, + if (!rspamd_fuzzy_cmd_from_wire(tcp_session->common.ctx, tcp_session->common.addr, buf, buflen, - &tcp_session->key, - tcp_session->nm, - &tcp_session->cmd, - &tcp_session->epoch, - &tcp_session->cmd_type, - &tcp_session->extensions)) { + &tcp_session->common.key, + tcp_session->common.nm, + &tcp_session->common.cmd, + &tcp_session->common.epoch, + &tcp_session->common.cmd_type, + &tcp_session->common.extensions)) { /* Discard input */ - tcp_session->ctx->stat.invalid_requests++; + tcp_session->common.ctx->stat.invalid_requests++; msg_debug("invalid fuzzy command of size %z received", buflen); - if (tcp_session->addr) { - uint64_t *nerrors = rspamd_lru_hash_lookup(tcp_session->ctx->errors_ips, - tcp_session->addr, -1); + if (tcp_session->common.addr) { + uint64_t *nerrors = rspamd_lru_hash_lookup(tcp_session->common.ctx->errors_ips, + tcp_session->common.addr, -1); if (nerrors == NULL) { nerrors = g_malloc(sizeof(*nerrors)); *nerrors = 1; - rspamd_lru_hash_insert(tcp_session->ctx->errors_ips, - rspamd_inet_address_copy(tcp_session->addr, NULL), + rspamd_lru_hash_insert(tcp_session->common.ctx->errors_ips, + rspamd_inet_address_copy(tcp_session->common.addr, NULL), nerrors, -1, -1); } else { @@ -2125,153 +2141,54 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c return false; } - struct rspamd_fuzzy_cmd *cmd = &tcp_session->cmd.basic; + bool final = false; size_t up_len = 0; + struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_prepare_cmd(&tcp_session->common, &result, &send_flags, &up_len, &final); - switch (tcp_session->cmd_type) { - case CMD_NORMAL: - up_len = sizeof(tcp_session->cmd.basic); - break; - case CMD_SHINGLE: - up_len = sizeof(tcp_session->cmd); - is_shingle = TRUE; - send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE; - break; - case CMD_ENCRYPTED_NORMAL: - up_len = sizeof(tcp_session->cmd.basic); - encrypted = TRUE; - send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED; - break; - case CMD_ENCRYPTED_SHINGLE: - up_len = sizeof(tcp_session->cmd); - encrypted = TRUE; - is_shingle = TRUE; - send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED; - break; - default: - msg_err("invalid command type: %d", tcp_session->cmd_type); - return false; - } - - memset(&result, 0, sizeof(result)); - memcpy(result.digest, cmd->digest, sizeof(result.digest)); - result.v1.flag = cmd->flag; - result.v1.tag = cmd->tag; + if (G_UNLIKELY(cmd == NULL || final)) { - if (tcp_session->ctx->lua_pre_handler_cbref != -1) { - /* Start lua pre handler */ - lua_State *L = tcp_session->ctx->cfg->lua_state; - int err_idx, ret; - - lua_pushcfunction(L, &rspamd_lua_traceback); - err_idx = lua_gettop(L); - /* Preallocate stack (small opt) */ - lua_checkstack(L, err_idx + 5); - /* function */ - lua_rawgeti(L, LUA_REGISTRYINDEX, tcp_session->ctx->lua_pre_handler_cbref); - /* client IP */ - rspamd_lua_ip_push(L, tcp_session->addr); - /* client command */ - lua_pushinteger(L, cmd->cmd); - /* command value (push as rspamd_text) */ - (void) lua_new_text(L, cmd->digest, sizeof(cmd->digest), FALSE); - /* is shingle */ - lua_pushboolean(L, is_shingle); - /* TODO: add additional data maybe (encryption, pubkey, etc) */ - rspamd_fuzzy_extensions_tolua(L, tcp_session->extensions); - - if ((ret = lua_pcall(L, 5, LUA_MULTRET, err_idx)) != 0) { - msg_err("call to lua_pre_handler lua " - "script failed (%d): %s", - ret, lua_tostring(L, -1)); - - return false; - } - else { - /* Return values order: - * the first reply will be on err_idx + 1 - * if it is true, then we need to read the former ones: - * 2-nd will be reply code - * 3-rd will be probability (or 0.0 if missing) - */ - ret = lua_toboolean(L, err_idx + 1); - - if (ret) { - /* Artificial reply */ - result.v1.value = lua_tointeger(L, err_idx + 2); - - if (lua_isnumber(L, err_idx + 3)) { - result.v1.prob = lua_tonumber(L, err_idx + 3); - } - else { - result.v1.prob = 0.0f; - } - - lua_settop(L, 0); - /* TODO: write reply */ - - return true; - } - } - - lua_settop(L, 0); - } - - - if (G_UNLIKELY(cmd == NULL || up_len == 0)) { - result.v1.value = 500; - result.v1.prob = 0.0f; - /* TODO: write reply */ - - return true; - } - - if (tcp_session->ctx->encrypted_only && !encrypted) { - /* Do not accept unencrypted commands */ - result.v1.value = 403; - result.v1.prob = 0.0f; /* TODO: write reply */ return true; } - if (tcp_session->key && tcp_session->addr) { - ip_stat = rspamd_lru_hash_lookup(tcp_session->key->stat->last_ips, - tcp_session->addr, -1); + if (tcp_session->common.key && tcp_session->common.addr) { + ip_stat = rspamd_lru_hash_lookup(tcp_session->common.key->stat->last_ips, + tcp_session->common.addr, -1); if (ip_stat == NULL) { - naddr = rspamd_inet_address_copy(tcp_session->addr, NULL); + naddr = rspamd_inet_address_copy(tcp_session->common.addr, NULL); ip_stat = g_malloc0(sizeof(*ip_stat)); REF_INIT_RETAIN(ip_stat, fuzzy_key_stat_dtor); - rspamd_lru_hash_insert(tcp_session->key->stat->last_ips, + rspamd_lru_hash_insert(tcp_session->common.key->stat->last_ips, naddr, ip_stat, -1, 0); } REF_RETAIN(ip_stat); - tcp_session->ip_stat = ip_stat; + tcp_session->common.ip_stat = ip_stat; } if (cmd->cmd == FUZZY_CHECK) { bool can_continue = true; - if (tcp_session->ctx->ratelimit_buckets) { - if (tcp_session->ctx->ratelimit_log_only) { - (void) rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr, - tcp_session->worker, - ev_now(tcp_session->ctx->event_loop)); /* Check but ignore */ + if (tcp_session->common.ctx->ratelimit_buckets) { + if (tcp_session->common.ctx->ratelimit_log_only) { + (void) rspamd_fuzzy_check_ratelimit(tcp_session->common.ctx, tcp_session->common.addr, + tcp_session->common.worker, + ev_now(tcp_session->common.ctx->event_loop)); /* Check but ignore */ } else { - can_continue = rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr, - tcp_session->worker, - ev_now(tcp_session->ctx->event_loop)); + can_continue = rspamd_fuzzy_check_ratelimit(tcp_session->common.ctx, tcp_session->common.addr, + tcp_session->common.worker, + ev_now(tcp_session->common.ctx->event_loop)); } } if (can_continue) { REF_RETAIN(tcp_session); /* TODO: use a different callback */ - rspamd_fuzzy_backend_check(tcp_session->ctx->backend, cmd, - rspamd_fuzzy_check_callback, tcp_session); + rspamd_fuzzy_backend_check(tcp_session->common.ctx->backend, cmd, + rspamd_fuzzy_tcp_check_callback, tcp_session); } else { result.v1.value = 403; @@ -2284,10 +2201,10 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c } else if (cmd->cmd == FUZZY_STAT) { /* Store approximation (if needed) */ - result.v1.prob = tcp_session->ctx->stat.fuzzy_hashes; + result.v1.prob = tcp_session->common.ctx->stat.fuzzy_hashes; /* Store high qword in value and low qword in flag */ - result.v1.value = (int32_t) ((uint64_t) tcp_session->ctx->stat.fuzzy_hashes >> 32); - result.v1.flag = (uint32_t) (tcp_session->ctx->stat.fuzzy_hashes & G_MAXUINT32); + result.v1.value = (int32_t) ((uint64_t) tcp_session->common.ctx->stat.fuzzy_hashes >> 32); + result.v1.flag = (uint32_t) (tcp_session->common.ctx->stat.fuzzy_hashes & G_MAXUINT32); /* TODO: write reply */ } else if (cmd->cmd == FUZZY_PING) { @@ -2296,14 +2213,14 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c /* TODO: write reply */ } else { - if (rspamd_fuzzy_check_write(tcp_session->ctx, tcp_session->addr, tcp_session->key)) { + if (rspamd_fuzzy_check_write(tcp_session->common.ctx, tcp_session->common.addr, tcp_session->common.key)) { /* Check whitelist */ - if (tcp_session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) { + if (tcp_session->common.ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) { rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest), hexbuf, sizeof(hexbuf) - 1); hexbuf[sizeof(hexbuf) - 1] = '\0'; - if (rspamd_match_hash_map(tcp_session->ctx->skip_hashes, + if (rspamd_match_hash_map(tcp_session->common.ctx->skip_hashes, hexbuf, sizeof(hexbuf) - 1)) { result.v1.value = 401; result.v1.prob = 0.0f; @@ -2312,18 +2229,18 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c } } - if (tcp_session->ctx->weak_ids && - kh_get(fuzzy_key_ids_set, tcp_session->ctx->weak_ids, cmd->flag) != kh_end(tcp_session->ctx->weak_ids)) { + if (tcp_session->common.ctx->weak_ids && + kh_get(fuzzy_key_ids_set, tcp_session->common.ctx->weak_ids, cmd->flag) != kh_end(tcp_session->common.ctx->weak_ids)) { /* Flag command as weak */ cmd->version |= RSPAMD_FUZZY_FLAG_WEAK; } - if (tcp_session->worker->index == 0 || tcp_session->ctx->peer_fd == -1) { + if (tcp_session->common.worker->index == 0 || tcp_session->common.ctx->peer_fd == -1) { /* Just add to the queue */ up_cmd.is_shingle = is_shingle; ptr = is_shingle ? (gpointer) &up_cmd.cmd.shingle : (gpointer) &up_cmd.cmd.normal; memcpy(ptr, cmd, up_len); - g_array_append_val(tcp_session->ctx->updates_pending, up_cmd); + g_array_append_val(tcp_session->common.ctx->updates_pending, up_cmd); } else { /* We need to send request to the peer */ @@ -2332,11 +2249,11 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal; memcpy(ptr, cmd, up_len); - if (!fuzzy_peer_try_send(tcp_session->ctx->peer_fd, up_req)) { + if (!fuzzy_peer_try_send(tcp_session->common.ctx->peer_fd, up_req)) { up_req->io_ev.data = up_req; ev_io_init(&up_req->io_ev, fuzzy_peer_send_io, - tcp_session->ctx->peer_fd, EV_WRITE); - ev_io_start(tcp_session->ctx->event_loop, &up_req->io_ev); + tcp_session->common.ctx->peer_fd, EV_WRITE); + ev_io_start(tcp_session->common.ctx->event_loop, &up_req->io_ev); } else { g_free(up_req); @@ -2419,18 +2336,18 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) { struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data; - msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->addr), revents); + msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->common.addr), revents); if (revents & EV_READ) { ssize_t r; - r = read(tcp_session->fd, tcp_session->input_buf + tcp_session->bytes_unprocessed, + r = read(tcp_session->common.fd, tcp_session->input_buf + tcp_session->bytes_unprocessed, sizeof(tcp_session->input_buf) - tcp_session->bytes_unprocessed); if (r == -1) { /* Cannot read anything */ msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: %s", - rspamd_inet_address_to_string(tcp_session->addr), + rspamd_inet_address_to_string(tcp_session->common.addr), strerror(errno)); REF_RELEASE(tcp_session); @@ -2438,7 +2355,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) else if (r == 0) { /* Got EOF */ msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: EOF", - rspamd_inet_address_to_string(tcp_session->addr)); + rspamd_inet_address_to_string(tcp_session->common.addr)); REF_RELEASE(tcp_session); } @@ -2449,11 +2366,11 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) else { if (tcp_session->replies_queue != NULL) { /* No more replies */ - ev_io_set(w, tcp_session->fd, EV_READ); + ev_io_set(w, tcp_session->common.fd, EV_READ); } else { /* Wait for another write readiness */ - ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ); + ev_io_set(w, tcp_session->common.fd, EV_WRITE | EV_READ); } ev_io_start(loop, w); @@ -2484,7 +2401,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) } /* Try to write everything */ - ssize_t r = writev(tcp_session->fd, iov, n); + ssize_t r = writev(tcp_session->common.fd, iov, n); if (n > 32) { g_free(iov); @@ -2493,7 +2410,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) if (r == -1) { /* Cannot write anything */ msg_debug_fuzzy_storage("failed TCP connection from %s; cannot write: %s", - rspamd_inet_address_to_string(tcp_session->addr), + rspamd_inet_address_to_string(tcp_session->common.addr), strerror(errno)); REF_RELEASE(tcp_session); @@ -2521,11 +2438,11 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) if (tcp_session->replies_queue != NULL) { /* No more replies */ - ev_io_set(w, tcp_session->fd, EV_READ); + ev_io_set(w, tcp_session->common.fd, EV_READ); } else { /* Wait for another write readiness */ - ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ); + ev_io_set(w, tcp_session->common.fd, EV_WRITE | EV_READ); } ev_io_start(loop, w); @@ -2541,7 +2458,7 @@ tcp_fuzzy_socket_timeout(EV_P_ ev_timer *w, int revents) { struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data; - msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->addr)); + msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->common.addr)); REF_RELEASE(tcp_session); } @@ -2574,16 +2491,18 @@ accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents) } tcp_session = g_malloc0(sizeof(*tcp_session)); - tcp_session->addr = addr; - tcp_session->ctx = ctx; - tcp_session->fd = nfd; + tcp_session->common.addr = addr; + tcp_session->common.ctx = ctx; + tcp_session->common.fd = nfd; + tcp_session->common.worker = worker; + tcp_session->common.timestamp = ev_now(ctx->event_loop); REF_INIT_RETAIN(tcp_session, tcp_session_dtor); - ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ); + ev_io_init(&tcp_session->common.io, tcp_fuzzy_socket_io, nfd, EV_READ); ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout); tcp_session->tm.data = tcp_session; - tcp_session->io.data = tcp_session; + tcp_session->common.io.data = tcp_session; ev_timer_start(ctx->event_loop, &tcp_session->tm); - ev_io_start(ctx->event_loop, &tcp_session->io); + ev_io_start(ctx->event_loop, &tcp_session->common.io); msg_debug_fuzzy_storage("accepted TCP connection from %s", rspamd_inet_address_to_string(addr)); } @@ -2597,7 +2516,7 @@ accept_udp_fuzzy_socket(EV_P_ ev_io *w, int revents) { struct rspamd_worker *worker = (struct rspamd_worker *) w->data; struct rspamd_fuzzy_storage_ctx *ctx; - struct fuzzy_session *session; + struct fuzzy_udp_session *session; gssize r, msg_len; uint64_t *nerrors; struct iovec iovs[MSGVEC_LEN]; @@ -2673,12 +2592,12 @@ accept_udp_fuzzy_socket(EV_P_ ev_io *w, int revents) } session = g_malloc0(sizeof(*session)); - REF_INIT_RETAIN(session, fuzzy_session_destroy); - session->worker = worker; - session->fd = w->fd; - session->ctx = ctx; - session->timestamp = ev_now(ctx->event_loop); - session->addr = client_addr; + REF_INIT_RETAIN(session, fuzzy_udp_session_dtor); + session->common.worker = worker; + session->common.fd = w->fd; + session->common.ctx = ctx; + session->common.timestamp = ev_now(ctx->event_loop); + session->common.addr = client_addr; worker->nconns++; /* Each message can have its length in case of recvmmsg */ @@ -2688,12 +2607,12 @@ accept_udp_fuzzy_socket(EV_P_ ev_io *w, int revents) if (rspamd_fuzzy_cmd_from_wire(ctx, client_addr, iovs[i].iov_base, msg_len, - &session->key, - session->nm, - &session->cmd, - &session->epoch, - &session->cmd_type, - &session->extensions)) { + &session->common.key, + session->common.nm, + &session->common.cmd, + &session->common.epoch, + &session->common.cmd_type, + &session->common.extensions)) { /* Check shingles count sanity */ rspamd_fuzzy_process_udp_session(session); } @@ -2702,15 +2621,15 @@ accept_udp_fuzzy_socket(EV_P_ ev_io *w, int revents) ctx->stat.invalid_requests++; msg_debug("invalid fuzzy command of size %z received", r); - if (session->addr) { - nerrors = rspamd_lru_hash_lookup(session->ctx->errors_ips, - session->addr, -1); + if (session->common.addr) { + nerrors = rspamd_lru_hash_lookup(session->common.ctx->errors_ips, + session->common.addr, -1); if (nerrors == NULL) { nerrors = g_malloc(sizeof(*nerrors)); *nerrors = 1; - rspamd_lru_hash_insert(session->ctx->errors_ips, - rspamd_inet_address_copy(session->addr, NULL), + rspamd_lru_hash_insert(session->common.ctx->errors_ips, + rspamd_inet_address_copy(session->common.addr, NULL), nerrors, -1, -1); } else {