]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Further tcp logic implementation
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 4 May 2024 14:58:36 +0000 (15:58 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 25 Jun 2024 13:27:55 +0000 (14:27 +0100)
src/fuzzy_storage.c

index 0ecac635b104bda03b7c40ac99abf4a3d893b1c4..bf0687bfa38ce0617476b2e7ab31c9e00a76edd8 100644 (file)
@@ -226,15 +226,15 @@ enum fuzzy_cmd_type {
 
 #define FUZZY_TCP_BUFFER_LENGTH 8192
 
-struct rspamd_fuzzy_tcp_reply {
-       uint16_t size_hdr;                       /* We have to write this as well */
-       struct rspamd_fuzzy_encrypted_reply rep; /* Payload */
+struct rspamd_fuzzy_tcp_frame {
+       uint16_t size_hdr;                           /* We have to write this as well */
+       struct rspamd_fuzzy_encrypted_reply payload; /* Payload */
 };
 
-struct fuzzy_tcp_reply {
-       struct rspamd_fuzzy_tcp_reply rep;   /* Serialized reply */
-       unsigned int written;                /* How many bytes have we already written */
-       struct fuzzy_tcp_reply *prev, *next; /* Link */
+struct fuzzy_tcp_reply_queue_elt {
+       struct rspamd_fuzzy_tcp_frame rep;             /* Serialized reply */
+       unsigned int written;                          /* How many bytes have we already written */
+       struct fuzzy_tcp_reply_queue_elt *prev, *next; /* Link */
 };
 
 struct fuzzy_common_session {
@@ -271,7 +271,7 @@ struct fuzzy_tcp_session {
        struct fuzzy_common_session common;
        ref_entry_t ref;
 
-       struct fuzzy_tcp_reply *replies_queue;
+       struct fuzzy_tcp_reply_queue_elt *replies_queue;
        unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH];
 };
 
@@ -296,6 +296,7 @@ struct rspamd_updates_cbdata {
 
 
 static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session);
+static bool rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session, struct fuzzy_tcp_reply_queue_elt *reply);
 static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx,
                                                                                                   const char *source, gboolean final);
 static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx,
@@ -305,6 +306,7 @@ static void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx
                                                                                                const char *reason);
 static struct fuzzy_key *fuzzy_add_keypair_from_ucl(const ucl_object_t *obj,
                                                                                                        khash_t(rspamd_fuzzy_keys_hash) * target);
+static void rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents);
 
 struct fuzzy_keymap_ucl_buf {
        rspamd_fstring_t *buf;
@@ -934,6 +936,48 @@ rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session)
        }
 }
 
+static bool
+rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session, struct fuzzy_tcp_reply_queue_elt *reply)
+{
+       ssize_t r;
+       gconstpointer data = &reply->rep;
+       size_t len = reply->rep.size_hdr + sizeof(reply->rep.size_hdr);
+
+       r = write(session->common.fd, data, len);
+
+       if (r == -1) {
+               if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+                       /* Attach this reply to the list of the replies pending */
+                       DL_APPEND(session->replies_queue, reply);
+                       /* Grab reference to avoid early destruction */
+                       REF_RETAIN(session);
+                       ev_io_init(&session->common.io,
+                                          rspamd_fuzzy_tcp_io, session->common.fd, EV_WRITE | EV_READ);
+                       ev_io_start(session->common.ctx->event_loop, &session->common.io);
+               }
+               else {
+                       msg_err("error while writing reply: %s", strerror(errno));
+                       return false;
+               }
+       }
+       else if ((size_t) r < len) {
+               reply->written = r;
+               DL_APPEND(session->replies_queue, reply);
+               /* Grab reference to avoid early destruction */
+               REF_RETAIN(session);
+               /* Partial write */
+               ev_io_init(&session->common.io,
+                                  rspamd_fuzzy_tcp_io, session->common.fd, EV_WRITE);
+               ev_io_start(session->common.ctx->event_loop, &session->common.io);
+       }
+       else {
+               /* Full write, no need to preserve queue elt */
+               g_free(reply);
+       }
+
+       return true;
+}
+
 static void
 rspamd_fuzzy_update_key_stat(gboolean matched,
                                                         struct fuzzy_key_stat *key_stat,
@@ -1153,8 +1197,86 @@ rspamd_fuzzy_make_udp_reply(struct rspamd_fuzzy_cmd *cmd,
        rspamd_fuzzy_udp_write_reply(session);
 }
 
+static struct fuzzy_tcp_reply_queue_elt *
+rspamd_fuzzy_make_tcp_reply(struct rspamd_fuzzy_cmd *cmd,
+                                                       struct rspamd_fuzzy_reply *result,
+                                                       struct fuzzy_tcp_session *session,
+                                                       int flags)
+{
+       gsize len;
+       struct fuzzy_tcp_reply_queue_elt *queue_elt;
+
+       if (cmd) {
+               queue_elt = g_malloc0(sizeof(*queue_elt));
+
+               struct rspamd_fuzzy_tcp_frame *reply = &queue_elt->rep;
+
+               result->v1.tag = cmd->tag;
+               memcpy(&reply->payload.rep, result, sizeof(*result));
+
+               if (flags & RSPAMD_FUZZY_REPLY_DELAY) {
+                       /* Hash is too fresh, need to delay it */
+                       reply->payload.rep.ts = 0;
+                       reply->payload.rep.v1.prob = 0.0f;
+                       reply->payload.rep.v1.value = 0;
+               }
+               else if (!rspamd_fuzzy_can_reply(&session->common, flags & RSPAMD_FUZZY_REPLY_ENCRYPTED,
+                                                                                reply->payload.rep.v1.flag, reply->payload.rep.v1.prob)) {
+                       /* Hash is from a forbidden flag */
+                       reply->payload.rep.ts = 0;
+                       reply->payload.rep.v1.prob = 0.0f;
+                       reply->payload.rep.v1.value = 0;
+                       reply->payload.rep.v1.flag = 0;
+               }
+               else {
+                       /* Update stats before encryption */
+                       if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) {
+                               rspamd_fuzzy_update_stats(session->common.ctx,
+                                                                                 session->common.epoch,
+                                                                                 reply->payload.rep.v1.prob > 0.5f,
+                                                                                 flags & RSPAMD_FUZZY_REPLY_SHINGLE,
+                                                                                 flags & RSPAMD_FUZZY_REPLY_DELAY,
+                                                                                 session->common.key,
+                                                                                 session->common.ip_stat,
+                                                                                 cmd->cmd,
+                                                                                 &reply->payload.rep,
+                                                                                 session->common.timestamp);
+                       }
+               }
+
+               if (flags & RSPAMD_FUZZY_REPLY_ENCRYPTED) {
+                       /* We need also to encrypt reply */
+                       ottery_rand_bytes(reply->payload.hdr.nonce,
+                                                         sizeof(reply->payload.hdr.nonce));
+
+                       /*
+                        * For old replies we need to encrypt just old part, otherwise
+                        * decryption would fail due to mac verification mistake
+                        */
+
+                       len = sizeof(reply->payload);
+                       rspamd_cryptobox_encrypt_nm_inplace((unsigned char *) &reply->payload,
+                                                                                               len,
+                                                                                               reply->payload.hdr.nonce,
+                                                                                               session->common.nm,
+                                                                                               reply->payload.hdr.mac,
+                                                                                               RSPAMD_CRYPTOBOX_MODE_25519);
+               }
+               else {
+                       len = sizeof(reply->payload.rep);
+               }
+
+               reply->size_hdr = len;
+
+               return queue_elt;
+       }
+       else {
+               return NULL;
+       }
+}
+
 static gboolean
-fuzzy_peer_try_send(int fd, struct fuzzy_peer_request *up_req)
+rspamd_fuzzy_peer_try_send(int fd, struct fuzzy_peer_request *up_req)
 {
        gssize r;
 
@@ -1168,11 +1290,11 @@ fuzzy_peer_try_send(int fd, struct fuzzy_peer_request *up_req)
 }
 
 static void
-fuzzy_peer_send_io(EV_P_ ev_io *w, int revents)
+rspamd_fuzzy_peer_send_io(EV_P_ ev_io *w, int revents)
 {
        struct fuzzy_peer_request *up_req = (struct fuzzy_peer_request *) w->data;
 
-       if (!fuzzy_peer_try_send(w->fd, up_req)) {
+       if (!rspamd_fuzzy_peer_try_send(w->fd, up_req)) {
                msg_err("cannot send update request to the peer: %s", strerror(errno));
        }
 
@@ -1365,9 +1487,9 @@ rspamd_fuzzy_fill_reply(struct rspamd_fuzzy_reply *result, struct fuzzy_common_s
                                           sizeof(up_req->cmd.cmd.shingle.sgl));
                        }
 
-                       if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) {
+                       if (!rspamd_fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) {
                                up_req->io_ev.data = up_req;
-                               ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
+                               ev_io_init(&up_req->io_ev, rspamd_fuzzy_peer_send_io,
                                                   session->ctx->peer_fd, EV_WRITE);
                                ev_io_start(session->ctx->event_loop, &up_req->io_ev);
                        }
@@ -1398,7 +1520,24 @@ rspamd_fuzzy_tcp_check_callback(struct rspamd_fuzzy_reply *result, void *ud)
        int send_flags = 0;
 
        struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_fill_reply(result, &session->common, &send_flags);
-       /* TODO write reply impl */
+
+       if (cmd) {
+
+               struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, result, session, send_flags);
+
+               if (reply) {
+                       if (!rspamd_fuzzy_tcp_write_reply(session, reply)) {
+                               /* Enforced deletion */
+                               REF_RELEASE(session);
+                       }
+               }
+       }
+       else {
+               /* Enforced deletion */
+               REF_RELEASE(session);
+       }
+
+       /* Free our refcount */
        REF_RELEASE(session);
 }
 
@@ -1645,9 +1784,9 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_udp_session *session)
                                ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
                                memcpy(ptr, cmd, up_len);
 
-                               if (!fuzzy_peer_try_send(session->common.ctx->peer_fd, up_req)) {
+                               if (!rspamd_fuzzy_peer_try_send(session->common.ctx->peer_fd, up_req)) {
                                        up_req->io_ev.data = up_req;
-                                       ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
+                                       ev_io_init(&up_req->io_ev, rspamd_fuzzy_peer_send_io,
                                                           session->common.ctx->peer_fd, EV_WRITE);
                                        ev_io_start(session->common.ctx->event_loop, &up_req->io_ev);
                                }
@@ -2078,7 +2217,7 @@ union sa_union {
 static void
 tcp_session_dtor(struct fuzzy_tcp_session *tcp_session)
 {
-       struct fuzzy_tcp_reply *rep;
+       struct fuzzy_tcp_reply_queue_elt *rep;
 
        fuzzy_common_session_dtor(&tcp_session->common);
 
@@ -2146,8 +2285,13 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c
        struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_prepare_cmd(&tcp_session->common, &result, &send_flags, &up_len, &final);
 
        if (G_UNLIKELY(cmd == NULL || final)) {
+               struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags);
 
-               /* TODO: write reply */
+               if (reply) {
+                       rspamd_fuzzy_tcp_write_reply(tcp_session, reply);
+               }
+
+               REF_RELEASE(tcp_session);
 
                return true;
        }
@@ -2194,7 +2338,14 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c
                        result.v1.value = 403;
                        result.v1.prob = 0.0f;
                        result.v1.flag = 0;
-                       /* TODO: write reply */
+
+                       struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags);
+
+                       if (reply) {
+                               rspamd_fuzzy_tcp_write_reply(tcp_session, reply);
+                       }
+
+                       REF_RELEASE(tcp_session);
 
                        return false;
                }
@@ -2205,14 +2356,35 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c
                /* Store high qword in value and low qword in flag */
                result.v1.value = (int32_t) ((uint64_t) tcp_session->common.ctx->stat.fuzzy_hashes >> 32);
                result.v1.flag = (uint32_t) (tcp_session->common.ctx->stat.fuzzy_hashes & G_MAXUINT32);
-               /* TODO: write reply */
+
+               struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags);
+
+               if (reply) {
+                       if (!rspamd_fuzzy_tcp_write_reply(tcp_session, reply)) {
+                               REF_RELEASE(tcp_session);
+                       }
+               }
+               else {
+                       REF_RELEASE(tcp_session);
+               }
        }
        else if (cmd->cmd == FUZZY_PING) {
                result.v1.prob = 1.0f;
                result.v1.value = cmd->value;
-               /* TODO: write reply */
+               struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags);
+
+               if (reply) {
+                       if (!rspamd_fuzzy_tcp_write_reply(tcp_session, reply)) {
+                               REF_RELEASE(tcp_session);
+                       }
+               }
+               else {
+                       REF_RELEASE(tcp_session);
+               }
        }
        else {
+               struct fuzzy_tcp_reply_queue_elt *reply;
+
                if (rspamd_fuzzy_check_write(tcp_session->common.ctx, tcp_session->common.addr, tcp_session->common.key)) {
                        /* Check whitelist */
                        if (tcp_session->common.ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) {
@@ -2249,9 +2421,9 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c
                                ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
                                memcpy(ptr, cmd, up_len);
 
-                               if (!fuzzy_peer_try_send(tcp_session->common.ctx->peer_fd, up_req)) {
+                               if (!rspamd_fuzzy_peer_try_send(tcp_session->common.ctx->peer_fd, up_req)) {
                                        up_req->io_ev.data = up_req;
-                                       ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
+                                       ev_io_init(&up_req->io_ev, rspamd_fuzzy_peer_send_io,
                                                           tcp_session->common.ctx->peer_fd, EV_WRITE);
                                        ev_io_start(tcp_session->common.ctx->event_loop, &up_req->io_ev);
                                }
@@ -2268,14 +2440,23 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c
                        result.v1.prob = 0.0f;
                }
        reply:
-               /* TODO: write reply */
+               reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags);
+
+               if (reply) {
+                       if (!rspamd_fuzzy_tcp_write_reply(tcp_session, reply)) {
+                               REF_RELEASE(tcp_session);
+                       }
+               }
+               else {
+                       REF_RELEASE(tcp_session);
+               }
        }
 
        return true;
 }
 
 static bool
-fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read)
+rspamd_fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read)
 {
        if (bytes_read <= 0) {
                /* Apparent garbage */
@@ -2311,7 +2492,11 @@ fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_rea
                                msg_debug_fuzzy_storage("can process next command: %d bytes available, %d bytes required",
                                                                                (int) buf_avail, bytes_required);
 
-                               /* TODO: add this */
+                               if (!rspamd_fuzzy_process_tcp_frame(tcp_session, p, bytes_required)) {
+                                       /* Error processing, give up */
+                                       return false;
+                               }
+
                                buf_avail -= bytes_required;
                                p += bytes_required;
                                tcp_session->cur_frame_state = 0;
@@ -2332,7 +2517,7 @@ fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_rea
 }
 
 static void
-tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
+rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents)
 {
        struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
 
@@ -2360,7 +2545,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
                        REF_RELEASE(tcp_session);
                }
                else {
-                       if (!fuzzy_tcp_process_input(tcp_session, r)) {
+                       if (!rspamd_fuzzy_tcp_process_input(tcp_session, r)) {
                                REF_RELEASE(tcp_session);
                        }
                        else {
@@ -2382,7 +2567,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
        else if (revents & EV_WRITE) {
                if (tcp_session->replies_queue) {
                        /* Try to write as many replies, as possible */
-                       struct fuzzy_tcp_reply *rep, *tmp;
+                       struct fuzzy_tcp_reply_queue_elt *rep, *tmp;
                        int n = 0;
 
                        DL_FOREACH(tcp_session->replies_queue, rep)
@@ -2454,7 +2639,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
 }
 
 static void
-tcp_fuzzy_socket_timeout(EV_P_ ev_timer *w, int revents)
+rspamd_fuzzy_tcp_timeout(EV_P_ ev_timer *w, int revents)
 {
        struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
 
@@ -2497,8 +2682,8 @@ accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents)
        tcp_session->common.worker = worker;
        tcp_session->common.timestamp = ev_now(ctx->event_loop);
        REF_INIT_RETAIN(tcp_session, tcp_session_dtor);
-       ev_io_init(&tcp_session->common.io, tcp_fuzzy_socket_io, nfd, EV_READ);
-       ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout);
+       ev_io_init(&tcp_session->common.io, rspamd_fuzzy_tcp_io, nfd, EV_READ);
+       ev_timer_init(&tcp_session->tm, rspamd_fuzzy_tcp_timeout, ctx->tcp_timeout, ctx->tcp_timeout);
        tcp_session->tm.data = tcp_session;
        tcp_session->common.io.data = tcp_session;
        ev_timer_start(ctx->event_loop, &tcp_session->tm);