]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Further tcp/udp refactoring
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 4 May 2024 13:57:45 +0000 (14:57 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 25 Jun 2024 13:27:55 +0000 (14:27 +0100)
src/fuzzy_storage.c

index ce8b205a7fd73eeac46597528afee2671ce6be50..0ecac635b104bda03b7c40ac99abf4a3d893b1c4 100644 (file)
@@ -237,12 +237,24 @@ struct fuzzy_tcp_reply {
        struct fuzzy_tcp_reply *prev, *next; /* Link */
 };
 
-struct fuzzy_tcp_session {
-       struct rspamd_worker *worker;
-       rspamd_inet_addr_t *addr;
+struct fuzzy_common_session {
        struct rspamd_fuzzy_storage_ctx *ctx;
        int fd;
        struct ev_io io;
+       ev_tstamp timestamp;
+       struct rspamd_worker *worker;
+       rspamd_inet_addr_t *addr;
+
+       enum rspamd_fuzzy_epoch epoch;
+       enum fuzzy_cmd_type cmd_type;
+       struct rspamd_fuzzy_shingle_cmd cmd;
+       struct fuzzy_key *key;
+       struct rspamd_fuzzy_cmd_extension *extensions;
+       struct fuzzy_key_stat *ip_stat;
+       unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
+};
+
+struct fuzzy_tcp_session {
        struct ev_timer tm;
 
        /*
@@ -256,39 +268,17 @@ struct fuzzy_tcp_session {
        uint16_t bytes_unprocessed;
 
        /* Common with UDP session */
-       enum rspamd_fuzzy_epoch epoch;
-       enum fuzzy_cmd_type cmd_type;
-       struct rspamd_fuzzy_shingle_cmd cmd;
-       struct fuzzy_key *key;
-       struct rspamd_fuzzy_cmd_extension *extensions;
-       struct fuzzy_key_stat *ip_stat;
-       unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
-
+       struct fuzzy_common_session common;
        ref_entry_t ref;
 
        struct fuzzy_tcp_reply *replies_queue;
        unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH];
 };
 
-struct fuzzy_session {
-       struct rspamd_worker *worker;
-       rspamd_inet_addr_t *addr;
-       struct rspamd_fuzzy_storage_ctx *ctx;
-
-       struct rspamd_fuzzy_shingle_cmd cmd;       /* Can handle both shingles and non-shingles */
+struct fuzzy_udp_session {
+       /* Common fields with TCP session */
+       struct fuzzy_common_session common;
        struct rspamd_fuzzy_encrypted_reply reply; /* Again: contains everything */
-       struct fuzzy_key_stat *ip_stat;
-
-       enum rspamd_fuzzy_epoch epoch;
-       enum fuzzy_cmd_type cmd_type;
-       int fd;
-       ev_tstamp timestamp;
-
-       struct fuzzy_key *key;
-       struct rspamd_fuzzy_cmd_extension *extensions;
-       unsigned char nm[rspamd_cryptobox_MAX_NMBYTES];
-
-       struct ev_io io;
        ref_entry_t ref;
 };
 
@@ -305,7 +295,7 @@ struct rspamd_updates_cbdata {
 };
 
 
-static void rspamd_fuzzy_write_reply(struct fuzzy_session *session);
+static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session);
 static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx,
                                                                                                   const char *source, gboolean final);
 static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
@@ -887,28 +877,28 @@ rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx,
 }
 
 static void
-rspamd_fuzzy_reply_io(EV_P_ ev_io *w, int revents)
+rspamd_fuzzy_udp_reply_io(EV_P_ ev_io *w, int revents)
 {
-       struct fuzzy_session *session = (struct fuzzy_session *) w->data;
+       struct fuzzy_udp_session *session = (struct fuzzy_udp_session *) w->data;
 
        ev_io_stop(EV_A_ w);
-       rspamd_fuzzy_write_reply(session);
+       rspamd_fuzzy_udp_write_reply(session);
        REF_RELEASE(session);
 }
 
 static void
-rspamd_fuzzy_write_reply(struct fuzzy_session *session)
+rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session)
 {
        gssize r;
        gsize len;
        gconstpointer data;
 
-       if (session->cmd_type == CMD_ENCRYPTED_NORMAL ||
-               session->cmd_type == CMD_ENCRYPTED_SHINGLE) {
+       if (session->common.cmd_type == CMD_ENCRYPTED_NORMAL ||
+               session->common.cmd_type == CMD_ENCRYPTED_SHINGLE) {
                /* Encrypted reply */
                data = &session->reply;
 
-               if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+               if (session->common.epoch > RSPAMD_FUZZY_EPOCH10) {
                        len = sizeof(session->reply);
                }
                else {
@@ -918,7 +908,7 @@ rspamd_fuzzy_write_reply(struct fuzzy_session *session)
        else {
                data = &session->reply.rep;
 
-               if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+               if (session->common.epoch > RSPAMD_FUZZY_EPOCH10) {
                        len = sizeof(session->reply.rep);
                }
                else {
@@ -926,17 +916,17 @@ rspamd_fuzzy_write_reply(struct fuzzy_session *session)
                }
        }
 
-       r = rspamd_inet_address_sendto(session->fd, data, len, 0,
-                                                                  session->addr);
+       r = rspamd_inet_address_sendto(session->common.fd, data, len, 0,
+                                                                  session->common.addr);
 
        if (r == -1) {
                if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
                        /* Grab reference to avoid early destruction */
                        REF_RETAIN(session);
-                       session->io.data = session;
-                       ev_io_init(&session->io,
-                                          rspamd_fuzzy_reply_io, session->fd, EV_WRITE);
-                       ev_io_start(session->ctx->event_loop, &session->io);
+                       session->common.io.data = session;
+                       ev_io_init(&session->common.io,
+                                          rspamd_fuzzy_udp_reply_io, session->common.fd, EV_WRITE);
+                       ev_io_start(session->common.ctx->event_loop, &session->common.io);
                }
                else {
                        msg_err("error while writing reply: %s", strerror(errno));
@@ -1060,11 +1050,43 @@ enum rspamd_fuzzy_reply_flags {
        RSPAMD_FUZZY_REPLY_DELAY = 0x1u << 2u,
 };
 
+static bool
+rspamd_fuzzy_can_reply(struct fuzzy_common_session *session, bool encrypted, int flag, float prob)
+{
+       bool default_disabled = false;
+
+       {
+               khiter_t k;
+
+               k = kh_get(fuzzy_key_ids_set, session->ctx->default_forbidden_ids, flag);
+
+               if (k != kh_end(session->ctx->default_forbidden_ids)) {
+                       /* Hash is from a forbidden flag by default */
+                       default_disabled = true;
+               }
+       }
+
+       if (encrypted) {
+               if (prob > 0 && session->key && session->key->forbidden_ids) {
+                       khiter_t k;
+
+                       k = kh_get(fuzzy_key_ids_set, session->key->forbidden_ids, flag);
+
+                       if (k != kh_end(session->key->forbidden_ids)) {
+                               /* Hash is from a forbidden flag for this key */
+                               default_disabled = true;
+                       }
+               }
+       }
+
+       return !default_disabled;
+}
+
 static void
-rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd *cmd,
-                                               struct rspamd_fuzzy_reply *result,
-                                               struct fuzzy_session *session,
-                                               int flags)
+rspamd_fuzzy_make_udp_reply(struct rspamd_fuzzy_cmd *cmd,
+                                                       struct rspamd_fuzzy_reply *result,
+                                                       struct fuzzy_udp_session *session,
+                                                       int flags)
 {
        gsize len;
 
@@ -1078,43 +1100,31 @@ rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd *cmd,
                        session->reply.rep.v1.prob = 0.0f;
                        session->reply.rep.v1.value = 0;
                }
-
-               bool default_disabled = false;
-
-               {
-                       khiter_t k;
-
-                       k = kh_get(fuzzy_key_ids_set, session->ctx->default_forbidden_ids, session->reply.rep.v1.flag);
-
-                       if (k != kh_end(session->ctx->default_forbidden_ids)) {
-                               /* Hash is from a forbidden flag by default */
-                               default_disabled = true;
+               else if (!rspamd_fuzzy_can_reply(&session->common, flags & RSPAMD_FUZZY_REPLY_ENCRYPTED,
+                                                                                session->reply.rep.v1.flag, session->reply.rep.v1.prob)) {
+                       /* Hash is from a forbidden flag */
+                       session->reply.rep.ts = 0;
+                       session->reply.rep.v1.prob = 0.0f;
+                       session->reply.rep.v1.value = 0;
+                       session->reply.rep.v1.flag = 0;
+               }
+               else {
+                       /* Update stats before encryption */
+                       if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
+                               rspamd_fuzzy_update_stats(session->common.ctx,
+                                                                                 session->common.epoch,
+                                                                                 session->reply.rep.v1.prob > 0.5f,
+                                                                                 flags & RSPAMD_FUZZY_REPLY_SHINGLE,
+                                                                                 flags & RSPAMD_FUZZY_REPLY_DELAY,
+                                                                                 session->common.key,
+                                                                                 session->common.ip_stat,
+                                                                                 cmd->cmd,
+                                                                                 &session->reply.rep,
+                                                                                 session->common.timestamp);
                        }
                }
 
                if (flags & RSPAMD_FUZZY_REPLY_ENCRYPTED) {
-
-                       if (session->reply.rep.v1.prob > 0 && session->key && session->key->forbidden_ids) {
-                               khiter_t k;
-
-                               k = kh_get(fuzzy_key_ids_set, session->key->forbidden_ids, session->reply.rep.v1.flag);
-
-                               if (k != kh_end(session->key->forbidden_ids)) {
-                                       /* Hash is from a forbidden flag for this key */
-                                       session->reply.rep.ts = 0;
-                                       session->reply.rep.v1.prob = 0.0f;
-                                       session->reply.rep.v1.value = 0;
-                                       session->reply.rep.v1.flag = 0;
-                               }
-                       }
-                       else if (default_disabled) {
-                               /* Hash is from a forbidden flag by default */
-                               session->reply.rep.ts = 0;
-                               session->reply.rep.v1.prob = 0.0f;
-                               session->reply.rep.v1.value = 0;
-                               session->reply.rep.v1.flag = 0;
-                       }
-
                        /* We need also to encrypt reply */
                        ottery_rand_bytes(session->reply.hdr.nonce,
                                                          sizeof(session->reply.hdr.nonce));
@@ -1124,58 +1134,23 @@ rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd *cmd,
                         * decryption would fail due to mac verification mistake
                         */
 
-                       if (session->epoch > RSPAMD_FUZZY_EPOCH10) {
+                       if (session->common.epoch > RSPAMD_FUZZY_EPOCH10) {
                                len = sizeof(session->reply.rep);
                        }
                        else {
                                len = sizeof(session->reply.rep.v1);
                        }
 
-                       /* Update stats before encryption */
-                       if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
-                               rspamd_fuzzy_update_stats(session->ctx,
-                                                                                 session->epoch,
-                                                                                 session->reply.rep.v1.prob > 0.5f,
-                                                                                 flags & RSPAMD_FUZZY_REPLY_SHINGLE,
-                                                                                 flags & RSPAMD_FUZZY_REPLY_DELAY,
-                                                                                 session->key,
-                                                                                 session->ip_stat,
-                                                                                 cmd->cmd,
-                                                                                 &session->reply.rep,
-                                                                                 session->timestamp);
-                       }
-
                        rspamd_cryptobox_encrypt_nm_inplace((unsigned char *) &session->reply.rep,
                                                                                                len,
                                                                                                session->reply.hdr.nonce,
-                                                                                               session->nm,
+                                                                                               session->common.nm,
                                                                                                session->reply.hdr.mac,
                                                                                                RSPAMD_CRYPTOBOX_MODE_25519);
                }
-               else if (default_disabled) {
-                       /* Hash is from a forbidden flag by default, and there is no encryption override */
-                       session->reply.rep.ts = 0;
-                       session->reply.rep.v1.prob = 0.0f;
-                       session->reply.rep.v1.value = 0;
-                       session->reply.rep.v1.flag = 0;
-               }
-               if (!(flags & RSPAMD_FUZZY_REPLY_ENCRYPTED)) {
-                       if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
-                               rspamd_fuzzy_update_stats(session->ctx,
-                                                                                 session->epoch,
-                                                                                 session->reply.rep.v1.prob > 0.5f,
-                                                                                 flags & RSPAMD_FUZZY_REPLY_SHINGLE,
-                                                                                 flags & RSPAMD_FUZZY_REPLY_DELAY,
-                                                                                 session->key,
-                                                                                 session->ip_stat,
-                                                                                 cmd->cmd,
-                                                                                 &session->reply.rep,
-                                                                                 session->timestamp);
-                       }
-               }
        }
 
-       rspamd_fuzzy_write_reply(session);
+       rspamd_fuzzy_udp_write_reply(session);
 }
 
 static gboolean
@@ -1237,35 +1212,33 @@ rspamd_fuzzy_extensions_tolua(lua_State *L,
        }
 }
 
-static void
-rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
+static struct rspamd_fuzzy_cmd *
+rspamd_fuzzy_fill_reply(struct rspamd_fuzzy_reply *result, struct fuzzy_common_session *session, int *send_flags)
 {
-       struct fuzzy_session *session = ud;
-       gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE;
+       bool encrypted = false, is_shingle = false;
        struct rspamd_fuzzy_cmd *cmd = NULL;
        const struct rspamd_shingle *shingle = NULL;
        struct rspamd_shingle sgl_cpy;
-       int send_flags = 0;
 
        switch (session->cmd_type) {
        case CMD_ENCRYPTED_NORMAL:
-               encrypted = TRUE;
-               send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
+               encrypted = true;
+               *send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
                /* Fallthrough */
        case CMD_NORMAL:
                cmd = &session->cmd.basic;
                break;
 
        case CMD_ENCRYPTED_SHINGLE:
-               encrypted = TRUE;
-               send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
+               encrypted = true;
+               *send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
                /* Fallthrough */
        case CMD_SHINGLE:
                cmd = &session->cmd.basic;
                memcpy(&sgl_cpy, &session->cmd.sgl, sizeof(sgl_cpy));
                shingle = &sgl_cpy;
-               is_shingle = TRUE;
-               send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
+               is_shingle = true;
+               *send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
                break;
        }
 
@@ -1335,10 +1308,7 @@ rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
                                }
 
                                lua_settop(L, 0);
-                               rspamd_fuzzy_make_reply(cmd, result, session, send_flags);
-                               REF_RELEASE(session);
-
-                               return;
+                               return cmd;
                        }
                }
 
@@ -1353,7 +1323,7 @@ rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
                                                                                                 session->ctx->delay / 2.0);
 
                if (hash_age < jittered_age) {
-                       send_flags |= RSPAMD_FUZZY_REPLY_DELAY;
+                       *send_flags |= RSPAMD_FUZZY_REPLY_DELAY;
                }
        }
 
@@ -1407,57 +1377,73 @@ rspamd_fuzzy_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
                }
        }
 
-       rspamd_fuzzy_make_reply(cmd, result, session, send_flags);
+       return cmd;
+}
+
+static void
+rspamd_fuzzy_udp_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
+{
+       struct fuzzy_udp_session *session = (struct fuzzy_udp_session *) ud;
+       int send_flags = 0;
 
+       struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_fill_reply(result, &session->common, &send_flags);
+       rspamd_fuzzy_make_udp_reply(cmd, result, session, send_flags);
        REF_RELEASE(session);
 }
 
 static void
-rspamd_fuzzy_process_udp_session(struct fuzzy_session *session)
+rspamd_fuzzy_tcp_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
 {
-       gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE;
-       struct rspamd_fuzzy_cmd *cmd = NULL;
-       struct rspamd_fuzzy_reply result;
-       struct fuzzy_peer_cmd up_cmd;
-       struct fuzzy_peer_request *up_req;
-       struct fuzzy_key_stat *ip_stat = NULL;
-       char hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
-       rspamd_inet_addr_t *naddr;
-       gpointer ptr;
-       gsize up_len = 0;
+       struct fuzzy_tcp_session *session = (struct fuzzy_tcp_session *) ud;
        int send_flags = 0;
 
+       struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_fill_reply(result, &session->common, &send_flags);
+       /* TODO write reply impl */
+       REF_RELEASE(session);
+}
+
+static struct rspamd_fuzzy_cmd *
+rspamd_fuzzy_prepare_cmd(struct fuzzy_common_session *session,
+                                                struct rspamd_fuzzy_reply *result,
+                                                int *send_flags,
+                                                size_t *up_len,
+                                                bool *final)
+{
+       struct fuzzy_key_stat *ip_stat = NULL;
+       struct rspamd_fuzzy_cmd *cmd = NULL;
+       bool is_encrypted = false, is_shingle = false;
+
        cmd = &session->cmd.basic;
 
        switch (session->cmd_type) {
        case CMD_NORMAL:
-               up_len = sizeof(session->cmd.basic);
+               *up_len = sizeof(session->cmd.basic);
                break;
        case CMD_SHINGLE:
-               up_len = sizeof(session->cmd);
-               is_shingle = TRUE;
-               send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
+               *up_len = sizeof(session->cmd);
+               is_shingle = true;
+               *send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
                break;
        case CMD_ENCRYPTED_NORMAL:
-               up_len = sizeof(session->cmd.basic);
-               encrypted = TRUE;
-               send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
+               *up_len = sizeof(session->cmd.basic);
+               is_encrypted = true;
+               *send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
                break;
        case CMD_ENCRYPTED_SHINGLE:
-               up_len = sizeof(session->cmd);
-               encrypted = TRUE;
-               is_shingle = TRUE;
-               send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED;
+               *up_len = sizeof(session->cmd);
+               is_encrypted = true;
+               is_shingle = true;
+               *send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED;
                break;
        default:
                msg_err("invalid command type: %d", session->cmd_type);
-               return;
+               return NULL;
        }
 
-       memset(&result, 0, sizeof(result));
-       memcpy(result.digest, cmd->digest, sizeof(result.digest));
-       result.v1.flag = cmd->flag;
-       result.v1.tag = cmd->tag;
+       memset(result, 0, sizeof(*result));
+       memcpy(result->digest, cmd->digest, sizeof(result->digest));
+       result->v1.flag = cmd->flag;
+       result->v1.tag = cmd->tag;
 
        if (session->ctx->lua_pre_handler_cbref != -1) {
                /* Start lua pre handler */
@@ -1497,19 +1483,18 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session)
 
                        if (ret) {
                                /* Artificial reply */
-                               result.v1.value = lua_tointeger(L, err_idx + 2);
+                               result->v1.value = lua_tointeger(L, err_idx + 2);
 
                                if (lua_isnumber(L, err_idx + 3)) {
-                                       result.v1.prob = lua_tonumber(L, err_idx + 3);
+                                       result->v1.prob = lua_tonumber(L, err_idx + 3);
                                }
                                else {
-                                       result.v1.prob = 0.0f;
+                                       result->v1.prob = 0.0f;
                                }
 
                                lua_settop(L, 0);
-                               rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
 
-                               return;
+                               return cmd;
                        }
                }
 
@@ -1518,18 +1503,20 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session)
 
 
        if (G_UNLIKELY(cmd == NULL || up_len == 0)) {
-               result.v1.value = 500;
-               result.v1.prob = 0.0f;
-               rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
-               return;
+               result->v1.value = 500;
+               result->v1.prob = 0.0f;
+               *final = true;
+
+               return cmd;
        }
 
-       if (session->ctx->encrypted_only && !encrypted) {
+       if (session->ctx->encrypted_only && !is_encrypted) {
                /* Do not accept unencrypted commands */
-               result.v1.value = 403;
-               result.v1.prob = 0.0f;
-               rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
-               return;
+               result->v1.value = 403;
+               result->v1.prob = 0.0f;
+               *final = true;
+
+               return cmd;
        }
 
        if (session->key && session->addr) {
@@ -1537,7 +1524,7 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session)
                                                                                 session->addr, -1);
 
                if (ip_stat == NULL) {
-                       naddr = rspamd_inet_address_copy(session->addr, NULL);
+                       rspamd_inet_addr_t *naddr = rspamd_inet_address_copy(session->addr, NULL);
                        ip_stat = g_malloc0(sizeof(*ip_stat));
                        REF_INIT_RETAIN(ip_stat, fuzzy_key_stat_dtor);
                        rspamd_lru_hash_insert(session->key->stat->last_ips,
@@ -1548,77 +1535,108 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session)
                session->ip_stat = ip_stat;
        }
 
+       /* Unset final flag */
+       *final = false;
+
+       return cmd;
+}
+
+static void
+rspamd_fuzzy_process_udp_session(struct fuzzy_udp_session *session)
+{
+       gboolean is_shingle = FALSE, __attribute__((unused)) encrypted = FALSE;
+       char hexbuf[rspamd_cryptobox_HASHBYTES * 2 + 1];
+       rspamd_inet_addr_t *naddr;
+       gpointer ptr;
+       int send_flags = 0;
+       bool final = false;
+       size_t up_len;
+
+       struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_prepare_cmd(&session->common, &session->reply.rep,
+                                                                                                                       &send_flags, &up_len, &final);
+
+
+       if (final) {
+               rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags);
+               REF_RELEASE(session);
+               return;
+       }
+
        if (cmd->cmd == FUZZY_CHECK) {
                bool can_continue = true;
-
-               if (session->ctx->ratelimit_buckets) {
-                       if (session->ctx->ratelimit_log_only) {
-                               (void) rspamd_fuzzy_check_ratelimit(session->ctx,
-                                                                                                       session->addr,
-                                                                                                       session->worker,
-                                                                                                       session->timestamp); /* Check but ignore */
+               if (session->common.ctx->ratelimit_buckets) {
+                       if (session->common.ctx->ratelimit_log_only) {
+                               (void) rspamd_fuzzy_check_ratelimit(session->common.ctx,
+                                                                                                       session->common.addr,
+                                                                                                       session->common.worker,
+                                                                                                       session->common.timestamp); /* Check but ignore */
                        }
                        else {
-                               can_continue = rspamd_fuzzy_check_ratelimit(session->ctx,
-                                                                                                                       session->addr,
-                                                                                                                       session->worker,
-                                                                                                                       session->timestamp);
+                               can_continue = rspamd_fuzzy_check_ratelimit(session->common.ctx,
+                                                                                                                       session->common.addr,
+                                                                                                                       session->common.worker,
+                                                                                                                       session->common.timestamp);
                        }
                }
 
                if (can_continue) {
                        REF_RETAIN(session);
-                       rspamd_fuzzy_backend_check(session->ctx->backend, cmd,
-                                                                          rspamd_fuzzy_check_callback, session);
+                       rspamd_fuzzy_backend_check(session->common.ctx->backend, cmd,
+                                                                          rspamd_fuzzy_udp_check_callback, session);
                }
                else {
-                       result.v1.value = 403;
-                       result.v1.prob = 0.0f;
-                       result.v1.flag = 0;
-                       rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+                       session->reply.rep.v1.value = 403;
+                       session->reply.rep.v1.prob = 0.0f;
+                       session->reply.rep.v1.flag = 0;
+                       rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags);
                }
        }
        else if (cmd->cmd == FUZZY_STAT) {
                /* Store approximation (if needed) */
-               result.v1.prob = session->ctx->stat.fuzzy_hashes;
+               session->reply.rep.v1.prob = session->common.ctx->stat.fuzzy_hashes;
                /* Store high qword in value and low qword in flag */
-               result.v1.value = (int32_t) ((uint64_t) session->ctx->stat.fuzzy_hashes >> 32);
-               result.v1.flag = (uint32_t) (session->ctx->stat.fuzzy_hashes & G_MAXUINT32);
-               rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+               session->reply.rep.v1.value = (int32_t) ((uint64_t) session->common.ctx->stat.fuzzy_hashes >> 32);
+               session->reply.rep.v1.flag = (uint32_t) (session->common.ctx->stat.fuzzy_hashes & G_MAXUINT32);
+               rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags);
        }
        else if (cmd->cmd == FUZZY_PING) {
-               result.v1.prob = 1.0f;
-               result.v1.value = cmd->value;
-               rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+               session->reply.rep.v1.prob = 1.0f;
+               session->reply.rep.v1.value = cmd->value;
+               rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags);
        }
        else {
-               if (rspamd_fuzzy_check_write(session->ctx, session->addr, session->key)) {
+               if (rspamd_fuzzy_check_write(session->common.ctx, session->common.addr, session->common.key)) {
                        /* Check whitelist */
-                       if (session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
+                       if (session->common.ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
                                rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest),
                                                                          hexbuf, sizeof(hexbuf) - 1);
                                hexbuf[sizeof(hexbuf) - 1] = '\0';
 
-                               if (rspamd_match_hash_map(session->ctx->skip_hashes,
+                               if (rspamd_match_hash_map(session->common.ctx->skip_hashes,
                                                                                  hexbuf, sizeof(hexbuf) - 1)) {
-                                       result.v1.value = 401;
-                                       result.v1.prob = 0.0f;
+                                       session->reply.rep.v1.value = 401;
+                                       session->reply.rep.v1.prob = 0.0f;
 
                                        goto reply;
                                }
                        }
 
-                       if (session->ctx->weak_ids && kh_get(fuzzy_key_ids_set, session->ctx->weak_ids, cmd->flag) != kh_end(session->ctx->weak_ids)) {
+                       if (session->common.ctx->weak_ids &&
+                               kh_get(fuzzy_key_ids_set, session->common.ctx->weak_ids, cmd->flag) != kh_end(session->common.ctx->weak_ids)) {
                                /* Flag command as weak */
                                cmd->version |= RSPAMD_FUZZY_FLAG_WEAK;
                        }
 
-                       if (session->worker->index == 0 || session->ctx->peer_fd == -1) {
+                       struct fuzzy_peer_cmd up_cmd;
+                       struct fuzzy_peer_request *up_req;
+
+                       /* Decide if we can process add request by this worker */
+                       if (session->common.worker->index == 0 || session->common.ctx->peer_fd == -1) {
                                /* Just add to the queue */
                                up_cmd.is_shingle = is_shingle;
                                ptr = is_shingle ? (gpointer) &up_cmd.cmd.shingle : (gpointer) &up_cmd.cmd.normal;
                                memcpy(ptr, cmd, up_len);
-                               g_array_append_val(session->ctx->updates_pending, up_cmd);
+                               g_array_append_val(session->common.ctx->updates_pending, up_cmd);
                        }
                        else {
                                /* We need to send request to the peer */
@@ -1627,26 +1645,26 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_session *session)
                                ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
                                memcpy(ptr, cmd, up_len);
 
-                               if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) {
+                               if (!fuzzy_peer_try_send(session->common.ctx->peer_fd, up_req)) {
                                        up_req->io_ev.data = up_req;
                                        ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
-                                                          session->ctx->peer_fd, EV_WRITE);
-                                       ev_io_start(session->ctx->event_loop, &up_req->io_ev);
+                                                          session->common.ctx->peer_fd, EV_WRITE);
+                                       ev_io_start(session->common.ctx->event_loop, &up_req->io_ev);
                                }
                                else {
                                        g_free(up_req);
                                }
                        }
 
-                       result.v1.value = 0;
-                       result.v1.prob = 1.0f;
+                       session->reply.rep.v1.value = 0;
+                       session->reply.rep.v1.prob = 1.0f;
                }
                else {
-                       result.v1.value = 403;
-                       result.v1.prob = 0.0f;
+                       session->reply.rep.v1.value = 403;
+                       session->reply.rep.v1.prob = 0.0f;
                }
        reply:
-               rspamd_fuzzy_make_reply(cmd, &result, session, send_flags);
+               rspamd_fuzzy_make_udp_reply(cmd, &session->reply.rep, session, send_flags);
        }
 }
 
@@ -2013,12 +2031,9 @@ rspamd_fuzzy_cmd_from_wire(struct rspamd_fuzzy_storage_ctx *ctx,
        return TRUE;
 }
 
-
 static void
-fuzzy_session_destroy(gpointer d)
+fuzzy_common_session_dtor(struct fuzzy_common_session *session)
 {
-       struct fuzzy_session *session = d;
-
        rspamd_inet_address_free(session->addr);
        rspamd_explicit_memzero(session->nm, sizeof(session->nm));
        session->worker->nconns--;
@@ -2034,7 +2049,14 @@ fuzzy_session_destroy(gpointer d)
        if (session->key) {
                REF_RELEASE(session->key);
        }
+}
+
+static void
+fuzzy_udp_session_dtor(gpointer d)
+{
+       struct fuzzy_udp_session *session = d;
 
+       fuzzy_common_session_dtor(&session->common);
        g_free(session);
 }
 
@@ -2057,19 +2079,12 @@ static void
 tcp_session_dtor(struct fuzzy_tcp_session *tcp_session)
 {
        struct fuzzy_tcp_reply *rep;
-       if (tcp_session->addr) {
-               rspamd_inet_address_free(tcp_session->addr);
-       }
-
-       tcp_session->worker->nconns--;
 
-       if (tcp_session->ip_stat) {
-               REF_RELEASE(tcp_session->ip_stat);
-       }
+       fuzzy_common_session_dtor(&tcp_session->common);
 
-       if (tcp_session->ctx->event_loop) {
-               ev_timer_stop(tcp_session->ctx->event_loop, &tcp_session->tm);
-               ev_io_stop(tcp_session->ctx->event_loop, &tcp_session->io);
+       if (tcp_session->common.ctx->event_loop) {
+               ev_timer_stop(tcp_session->common.ctx->event_loop, &tcp_session->tm);
+               ev_io_stop(tcp_session->common.ctx->event_loop, &tcp_session->common.io);
        }
 
        DL_FOREACH(tcp_session->replies_queue, rep)
@@ -2077,7 +2092,8 @@ tcp_session_dtor(struct fuzzy_tcp_session *tcp_session)
                g_free(rep);
        }
 
-       close(tcp_session->fd);
+       /* For TCP session we also close a socket as it is owned by a session unlike UDP socket that is shared */
+       close(tcp_session->common.fd);
        g_free(tcp_session);
 }
 
@@ -2094,27 +2110,27 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c
        gpointer ptr;
        int send_flags = 0;
 
-       if (!rspamd_fuzzy_cmd_from_wire(tcp_session->ctx, tcp_session->addr, buf,
+       if (!rspamd_fuzzy_cmd_from_wire(tcp_session->common.ctx, tcp_session->common.addr, buf,
                                                                        buflen,
-                                                                       &tcp_session->key,
-                                                                       tcp_session->nm,
-                                                                       &tcp_session->cmd,
-                                                                       &tcp_session->epoch,
-                                                                       &tcp_session->cmd_type,
-                                                                       &tcp_session->extensions)) {
+                                                                       &tcp_session->common.key,
+                                                                       tcp_session->common.nm,
+                                                                       &tcp_session->common.cmd,
+                                                                       &tcp_session->common.epoch,
+                                                                       &tcp_session->common.cmd_type,
+                                                                       &tcp_session->common.extensions)) {
                /* Discard input */
-               tcp_session->ctx->stat.invalid_requests++;
+               tcp_session->common.ctx->stat.invalid_requests++;
                msg_debug("invalid fuzzy command of size %z received", buflen);
 
-               if (tcp_session->addr) {
-                       uint64_t *nerrors = rspamd_lru_hash_lookup(tcp_session->ctx->errors_ips,
-                                                                                                          tcp_session->addr, -1);
+               if (tcp_session->common.addr) {
+                       uint64_t *nerrors = rspamd_lru_hash_lookup(tcp_session->common.ctx->errors_ips,
+                                                                                                          tcp_session->common.addr, -1);
 
                        if (nerrors == NULL) {
                                nerrors = g_malloc(sizeof(*nerrors));
                                *nerrors = 1;
-                               rspamd_lru_hash_insert(tcp_session->ctx->errors_ips,
-                                                                          rspamd_inet_address_copy(tcp_session->addr, NULL),
+                               rspamd_lru_hash_insert(tcp_session->common.ctx->errors_ips,
+                                                                          rspamd_inet_address_copy(tcp_session->common.addr, NULL),
                                                                           nerrors, -1, -1);
                        }
                        else {
@@ -2125,153 +2141,54 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c
                return false;
        }
 
-       struct rspamd_fuzzy_cmd *cmd = &tcp_session->cmd.basic;
+       bool final = false;
        size_t up_len = 0;
+       struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_prepare_cmd(&tcp_session->common, &result, &send_flags, &up_len, &final);
 
-       switch (tcp_session->cmd_type) {
-       case CMD_NORMAL:
-               up_len = sizeof(tcp_session->cmd.basic);
-               break;
-       case CMD_SHINGLE:
-               up_len = sizeof(tcp_session->cmd);
-               is_shingle = TRUE;
-               send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE;
-               break;
-       case CMD_ENCRYPTED_NORMAL:
-               up_len = sizeof(tcp_session->cmd.basic);
-               encrypted = TRUE;
-               send_flags |= RSPAMD_FUZZY_REPLY_ENCRYPTED;
-               break;
-       case CMD_ENCRYPTED_SHINGLE:
-               up_len = sizeof(tcp_session->cmd);
-               encrypted = TRUE;
-               is_shingle = TRUE;
-               send_flags |= RSPAMD_FUZZY_REPLY_SHINGLE | RSPAMD_FUZZY_REPLY_ENCRYPTED;
-               break;
-       default:
-               msg_err("invalid command type: %d", tcp_session->cmd_type);
-               return false;
-       }
-
-       memset(&result, 0, sizeof(result));
-       memcpy(result.digest, cmd->digest, sizeof(result.digest));
-       result.v1.flag = cmd->flag;
-       result.v1.tag = cmd->tag;
+       if (G_UNLIKELY(cmd == NULL || final)) {
 
-       if (tcp_session->ctx->lua_pre_handler_cbref != -1) {
-               /* Start lua pre handler */
-               lua_State *L = tcp_session->ctx->cfg->lua_state;
-               int err_idx, ret;
-
-               lua_pushcfunction(L, &rspamd_lua_traceback);
-               err_idx = lua_gettop(L);
-               /* Preallocate stack (small opt) */
-               lua_checkstack(L, err_idx + 5);
-               /* function */
-               lua_rawgeti(L, LUA_REGISTRYINDEX, tcp_session->ctx->lua_pre_handler_cbref);
-               /* client IP */
-               rspamd_lua_ip_push(L, tcp_session->addr);
-               /* client command */
-               lua_pushinteger(L, cmd->cmd);
-               /* command value (push as rspamd_text) */
-               (void) lua_new_text(L, cmd->digest, sizeof(cmd->digest), FALSE);
-               /* is shingle */
-               lua_pushboolean(L, is_shingle);
-               /* TODO: add additional data maybe (encryption, pubkey, etc) */
-               rspamd_fuzzy_extensions_tolua(L, tcp_session->extensions);
-
-               if ((ret = lua_pcall(L, 5, LUA_MULTRET, err_idx)) != 0) {
-                       msg_err("call to lua_pre_handler lua "
-                                       "script failed (%d): %s",
-                                       ret, lua_tostring(L, -1));
-
-                       return false;
-               }
-               else {
-                       /* Return values order:
-                        * the first reply will be on err_idx + 1
-                        * if it is true, then we need to read the former ones:
-                        * 2-nd will be reply code
-                        * 3-rd will be probability (or 0.0 if missing)
-                        */
-                       ret = lua_toboolean(L, err_idx + 1);
-
-                       if (ret) {
-                               /* Artificial reply */
-                               result.v1.value = lua_tointeger(L, err_idx + 2);
-
-                               if (lua_isnumber(L, err_idx + 3)) {
-                                       result.v1.prob = lua_tonumber(L, err_idx + 3);
-                               }
-                               else {
-                                       result.v1.prob = 0.0f;
-                               }
-
-                               lua_settop(L, 0);
-                               /* TODO: write reply */
-
-                               return true;
-                       }
-               }
-
-               lua_settop(L, 0);
-       }
-
-
-       if (G_UNLIKELY(cmd == NULL || up_len == 0)) {
-               result.v1.value = 500;
-               result.v1.prob = 0.0f;
-               /* TODO: write reply */
-
-               return true;
-       }
-
-       if (tcp_session->ctx->encrypted_only && !encrypted) {
-               /* Do not accept unencrypted commands */
-               result.v1.value = 403;
-               result.v1.prob = 0.0f;
                /* TODO: write reply */
 
                return true;
        }
 
-       if (tcp_session->key && tcp_session->addr) {
-               ip_stat = rspamd_lru_hash_lookup(tcp_session->key->stat->last_ips,
-                                                                                tcp_session->addr, -1);
+       if (tcp_session->common.key && tcp_session->common.addr) {
+               ip_stat = rspamd_lru_hash_lookup(tcp_session->common.key->stat->last_ips,
+                                                                                tcp_session->common.addr, -1);
 
                if (ip_stat == NULL) {
-                       naddr = rspamd_inet_address_copy(tcp_session->addr, NULL);
+                       naddr = rspamd_inet_address_copy(tcp_session->common.addr, NULL);
                        ip_stat = g_malloc0(sizeof(*ip_stat));
                        REF_INIT_RETAIN(ip_stat, fuzzy_key_stat_dtor);
-                       rspamd_lru_hash_insert(tcp_session->key->stat->last_ips,
+                       rspamd_lru_hash_insert(tcp_session->common.key->stat->last_ips,
                                                                   naddr, ip_stat, -1, 0);
                }
 
                REF_RETAIN(ip_stat);
-               tcp_session->ip_stat = ip_stat;
+               tcp_session->common.ip_stat = ip_stat;
        }
 
        if (cmd->cmd == FUZZY_CHECK) {
                bool can_continue = true;
 
-               if (tcp_session->ctx->ratelimit_buckets) {
-                       if (tcp_session->ctx->ratelimit_log_only) {
-                               (void) rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr,
-                                                                                                       tcp_session->worker,
-                                                                                                       ev_now(tcp_session->ctx->event_loop)); /* Check but ignore */
+               if (tcp_session->common.ctx->ratelimit_buckets) {
+                       if (tcp_session->common.ctx->ratelimit_log_only) {
+                               (void) rspamd_fuzzy_check_ratelimit(tcp_session->common.ctx, tcp_session->common.addr,
+                                                                                                       tcp_session->common.worker,
+                                                                                                       ev_now(tcp_session->common.ctx->event_loop)); /* Check but ignore */
                        }
                        else {
-                               can_continue = rspamd_fuzzy_check_ratelimit(tcp_session->ctx, tcp_session->addr,
-                                                                                                                       tcp_session->worker,
-                                                                                                                       ev_now(tcp_session->ctx->event_loop));
+                               can_continue = rspamd_fuzzy_check_ratelimit(tcp_session->common.ctx, tcp_session->common.addr,
+                                                                                                                       tcp_session->common.worker,
+                                                                                                                       ev_now(tcp_session->common.ctx->event_loop));
                        }
                }
 
                if (can_continue) {
                        REF_RETAIN(tcp_session);
                        /* TODO: use a different callback */
-                       rspamd_fuzzy_backend_check(tcp_session->ctx->backend, cmd,
-                                                                          rspamd_fuzzy_check_callback, tcp_session);
+                       rspamd_fuzzy_backend_check(tcp_session->common.ctx->backend, cmd,
+                                                                          rspamd_fuzzy_tcp_check_callback, tcp_session);
                }
                else {
                        result.v1.value = 403;
@@ -2284,10 +2201,10 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c
        }
        else if (cmd->cmd == FUZZY_STAT) {
                /* Store approximation (if needed) */
-               result.v1.prob = tcp_session->ctx->stat.fuzzy_hashes;
+               result.v1.prob = tcp_session->common.ctx->stat.fuzzy_hashes;
                /* Store high qword in value and low qword in flag */
-               result.v1.value = (int32_t) ((uint64_t) tcp_session->ctx->stat.fuzzy_hashes >> 32);
-               result.v1.flag = (uint32_t) (tcp_session->ctx->stat.fuzzy_hashes & G_MAXUINT32);
+               result.v1.value = (int32_t) ((uint64_t) tcp_session->common.ctx->stat.fuzzy_hashes >> 32);
+               result.v1.flag = (uint32_t) (tcp_session->common.ctx->stat.fuzzy_hashes & G_MAXUINT32);
                /* TODO: write reply */
        }
        else if (cmd->cmd == FUZZY_PING) {
@@ -2296,14 +2213,14 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c
                /* TODO: write reply */
        }
        else {
-               if (rspamd_fuzzy_check_write(tcp_session->ctx, tcp_session->addr, tcp_session->key)) {
+               if (rspamd_fuzzy_check_write(tcp_session->common.ctx, tcp_session->common.addr, tcp_session->common.key)) {
                        /* Check whitelist */
-                       if (tcp_session->ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
+                       if (tcp_session->common.ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
                                rspamd_encode_hex_buf(cmd->digest, sizeof(cmd->digest),
                                                                          hexbuf, sizeof(hexbuf) - 1);
                                hexbuf[sizeof(hexbuf) - 1] = '\0';
 
-                               if (rspamd_match_hash_map(tcp_session->ctx->skip_hashes,
+                               if (rspamd_match_hash_map(tcp_session->common.ctx->skip_hashes,
                                                                                  hexbuf, sizeof(hexbuf) - 1)) {
                                        result.v1.value = 401;
                                        result.v1.prob = 0.0f;
@@ -2312,18 +2229,18 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c
                                }
                        }
 
-                       if (tcp_session->ctx->weak_ids &&
-                               kh_get(fuzzy_key_ids_set, tcp_session->ctx->weak_ids, cmd->flag) != kh_end(tcp_session->ctx->weak_ids)) {
+                       if (tcp_session->common.ctx->weak_ids &&
+                               kh_get(fuzzy_key_ids_set, tcp_session->common.ctx->weak_ids, cmd->flag) != kh_end(tcp_session->common.ctx->weak_ids)) {
                                /* Flag command as weak */
                                cmd->version |= RSPAMD_FUZZY_FLAG_WEAK;
                        }
 
-                       if (tcp_session->worker->index == 0 || tcp_session->ctx->peer_fd == -1) {
+                       if (tcp_session->common.worker->index == 0 || tcp_session->common.ctx->peer_fd == -1) {
                                /* Just add to the queue */
                                up_cmd.is_shingle = is_shingle;
                                ptr = is_shingle ? (gpointer) &up_cmd.cmd.shingle : (gpointer) &up_cmd.cmd.normal;
                                memcpy(ptr, cmd, up_len);
-                               g_array_append_val(tcp_session->ctx->updates_pending, up_cmd);
+                               g_array_append_val(tcp_session->common.ctx->updates_pending, up_cmd);
                        }
                        else {
                                /* We need to send request to the peer */
@@ -2332,11 +2249,11 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c
                                ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
                                memcpy(ptr, cmd, up_len);
 
-                               if (!fuzzy_peer_try_send(tcp_session->ctx->peer_fd, up_req)) {
+                               if (!fuzzy_peer_try_send(tcp_session->common.ctx->peer_fd, up_req)) {
                                        up_req->io_ev.data = up_req;
                                        ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
-                                                          tcp_session->ctx->peer_fd, EV_WRITE);
-                                       ev_io_start(tcp_session->ctx->event_loop, &up_req->io_ev);
+                                                          tcp_session->common.ctx->peer_fd, EV_WRITE);
+                                       ev_io_start(tcp_session->common.ctx->event_loop, &up_req->io_ev);
                                }
                                else {
                                        g_free(up_req);
@@ -2419,18 +2336,18 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
 {
        struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
 
-       msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->addr), revents);
+       msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->common.addr), revents);
 
        if (revents & EV_READ) {
                ssize_t r;
 
-               r = read(tcp_session->fd, tcp_session->input_buf + tcp_session->bytes_unprocessed,
+               r = read(tcp_session->common.fd, tcp_session->input_buf + tcp_session->bytes_unprocessed,
                                 sizeof(tcp_session->input_buf) - tcp_session->bytes_unprocessed);
 
                if (r == -1) {
                        /* Cannot read anything */
                        msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: %s",
-                                                                       rspamd_inet_address_to_string(tcp_session->addr),
+                                                                       rspamd_inet_address_to_string(tcp_session->common.addr),
                                                                        strerror(errno));
 
                        REF_RELEASE(tcp_session);
@@ -2438,7 +2355,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
                else if (r == 0) {
                        /* Got EOF */
                        msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: EOF",
-                                                                       rspamd_inet_address_to_string(tcp_session->addr));
+                                                                       rspamd_inet_address_to_string(tcp_session->common.addr));
 
                        REF_RELEASE(tcp_session);
                }
@@ -2449,11 +2366,11 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
                        else {
                                if (tcp_session->replies_queue != NULL) {
                                        /* No more replies */
-                                       ev_io_set(w, tcp_session->fd, EV_READ);
+                                       ev_io_set(w, tcp_session->common.fd, EV_READ);
                                }
                                else {
                                        /* Wait for another write readiness */
-                                       ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ);
+                                       ev_io_set(w, tcp_session->common.fd, EV_WRITE | EV_READ);
                                }
 
                                ev_io_start(loop, w);
@@ -2484,7 +2401,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
                        }
 
                        /* Try to write everything */
-                       ssize_t r = writev(tcp_session->fd, iov, n);
+                       ssize_t r = writev(tcp_session->common.fd, iov, n);
 
                        if (n > 32) {
                                g_free(iov);
@@ -2493,7 +2410,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
                        if (r == -1) {
                                /* Cannot write anything */
                                msg_debug_fuzzy_storage("failed TCP connection from %s; cannot write: %s",
-                                                                               rspamd_inet_address_to_string(tcp_session->addr),
+                                                                               rspamd_inet_address_to_string(tcp_session->common.addr),
                                                                                strerror(errno));
 
                                REF_RELEASE(tcp_session);
@@ -2521,11 +2438,11 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
 
                                if (tcp_session->replies_queue != NULL) {
                                        /* No more replies */
-                                       ev_io_set(w, tcp_session->fd, EV_READ);
+                                       ev_io_set(w, tcp_session->common.fd, EV_READ);
                                }
                                else {
                                        /* Wait for another write readiness */
-                                       ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ);
+                                       ev_io_set(w, tcp_session->common.fd, EV_WRITE | EV_READ);
                                }
 
                                ev_io_start(loop, w);
@@ -2541,7 +2458,7 @@ tcp_fuzzy_socket_timeout(EV_P_ ev_timer *w, int revents)
 {
        struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
 
-       msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->addr));
+       msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->common.addr));
 
        REF_RELEASE(tcp_session);
 }
@@ -2574,16 +2491,18 @@ accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents)
        }
 
        tcp_session = g_malloc0(sizeof(*tcp_session));
-       tcp_session->addr = addr;
-       tcp_session->ctx = ctx;
-       tcp_session->fd = nfd;
+       tcp_session->common.addr = addr;
+       tcp_session->common.ctx = ctx;
+       tcp_session->common.fd = nfd;
+       tcp_session->common.worker = worker;
+       tcp_session->common.timestamp = ev_now(ctx->event_loop);
        REF_INIT_RETAIN(tcp_session, tcp_session_dtor);
-       ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ);
+       ev_io_init(&tcp_session->common.io, tcp_fuzzy_socket_io, nfd, EV_READ);
        ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout);
        tcp_session->tm.data = tcp_session;
-       tcp_session->io.data = tcp_session;
+       tcp_session->common.io.data = tcp_session;
        ev_timer_start(ctx->event_loop, &tcp_session->tm);
-       ev_io_start(ctx->event_loop, &tcp_session->io);
+       ev_io_start(ctx->event_loop, &tcp_session->common.io);
 
        msg_debug_fuzzy_storage("accepted TCP connection from %s", rspamd_inet_address_to_string(addr));
 }
@@ -2597,7 +2516,7 @@ accept_udp_fuzzy_socket(EV_P_ ev_io *w, int revents)
 {
        struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
        struct rspamd_fuzzy_storage_ctx *ctx;
-       struct fuzzy_session *session;
+       struct fuzzy_udp_session *session;
        gssize r, msg_len;
        uint64_t *nerrors;
        struct iovec iovs[MSGVEC_LEN];
@@ -2673,12 +2592,12 @@ accept_udp_fuzzy_socket(EV_P_ ev_io *w, int revents)
                                }
 
                                session = g_malloc0(sizeof(*session));
-                               REF_INIT_RETAIN(session, fuzzy_session_destroy);
-                               session->worker = worker;
-                               session->fd = w->fd;
-                               session->ctx = ctx;
-                               session->timestamp = ev_now(ctx->event_loop);
-                               session->addr = client_addr;
+                               REF_INIT_RETAIN(session, fuzzy_udp_session_dtor);
+                               session->common.worker = worker;
+                               session->common.fd = w->fd;
+                               session->common.ctx = ctx;
+                               session->common.timestamp = ev_now(ctx->event_loop);
+                               session->common.addr = client_addr;
                                worker->nconns++;
 
                                /* Each message can have its length in case of recvmmsg */
@@ -2688,12 +2607,12 @@ accept_udp_fuzzy_socket(EV_P_ ev_io *w, int revents)
 
                                if (rspamd_fuzzy_cmd_from_wire(ctx, client_addr, iovs[i].iov_base,
                                                                                           msg_len,
-                                                                                          &session->key,
-                                                                                          session->nm,
-                                                                                          &session->cmd,
-                                                                                          &session->epoch,
-                                                                                          &session->cmd_type,
-                                                                                          &session->extensions)) {
+                                                                                          &session->common.key,
+                                                                                          session->common.nm,
+                                                                                          &session->common.cmd,
+                                                                                          &session->common.epoch,
+                                                                                          &session->common.cmd_type,
+                                                                                          &session->common.extensions)) {
                                        /* Check shingles count sanity */
                                        rspamd_fuzzy_process_udp_session(session);
                                }
@@ -2702,15 +2621,15 @@ accept_udp_fuzzy_socket(EV_P_ ev_io *w, int revents)
                                        ctx->stat.invalid_requests++;
                                        msg_debug("invalid fuzzy command of size %z received", r);
 
-                                       if (session->addr) {
-                                               nerrors = rspamd_lru_hash_lookup(session->ctx->errors_ips,
-                                                                                                                session->addr, -1);
+                                       if (session->common.addr) {
+                                               nerrors = rspamd_lru_hash_lookup(session->common.ctx->errors_ips,
+                                                                                                                session->common.addr, -1);
 
                                                if (nerrors == NULL) {
                                                        nerrors = g_malloc(sizeof(*nerrors));
                                                        *nerrors = 1;
-                                                       rspamd_lru_hash_insert(session->ctx->errors_ips,
-                                                                                                  rspamd_inet_address_copy(session->addr, NULL),
+                                                       rspamd_lru_hash_insert(session->common.ctx->errors_ips,
+                                                                                                  rspamd_inet_address_copy(session->common.addr, NULL),
                                                                                                   nerrors, -1, -1);
                                                }
                                                else {