]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Implement write logic for TCP
authorVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 25 Apr 2024 15:22:38 +0000 (16:22 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 25 Jun 2024 13:27:54 +0000 (14:27 +0100)
src/fuzzy_storage.c

index 529bc530f2b2f0f2adec038c68369401a56dc948..32de3e1a8ad699c0511f80fbcc1b6b42c3e81ec9 100644 (file)
@@ -226,10 +226,15 @@ enum fuzzy_cmd_type {
 
 #define FUZZY_TCP_BUFFER_LENGTH 8192
 
+struct rspamd_fuzzy_tcp_reply {
+       uint16_t size_hdr;                       /* We have to write this as well */
+       struct rspamd_fuzzy_encrypted_reply rep; /* Payload */
+};
+
 struct fuzzy_tcp_reply {
-       struct rspamd_fuzzy_encrypted_reply rep; /* Serialized reply */
-       unsigned int written;                    /* How many have we already written */
-       struct fuzzy_tcp_reply *prev, *next;     /* Link */
+       struct rspamd_fuzzy_tcp_reply rep;   /* Serialized reply */
+       unsigned int written;                /* How many bytes have we already written */
+       struct fuzzy_tcp_reply *prev, *next; /* Link */
 };
 
 struct fuzzy_tcp_session {
@@ -2036,6 +2041,90 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
        struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
 
        msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->addr), revents);
+
+       if (revents & EV_READ) {
+               ssize_t r;
+
+               r = read(tcp_session->fd, tcp_session->input_buf + tcp_session->bytes_unprocessed,
+                                sizeof(tcp_session->input_buf) - tcp_session->bytes_unprocessed);
+
+               if (r == -1) {
+                       /* Cannot read anything */
+               }
+               else if (r == 0) {
+                       /* Got EOF */
+               }
+               else {
+               }
+       }
+       else if (revents & EV_WRITE) {
+               if (tcp_session->replies_queue) {
+                       /* Try to write as many replies, as possible */
+                       struct fuzzy_tcp_reply *rep, *tmp;
+                       int n = 0;
+
+                       DL_FOREACH(tcp_session->replies_queue, rep)
+                       {
+                               n++;
+                       }
+
+                       /* Allocate iovec */
+                       struct iovec *iov = g_malloc(sizeof(struct iovec) * n);
+                       n = 0;
+                       DL_FOREACH(tcp_session->replies_queue, rep)
+                       {
+                               iov[n].iov_base = ((unsigned char *) &rep->rep) + rep->written;
+                               iov[n].iov_len = sizeof(rep->rep) - rep->written;
+                               n++;
+                       }
+
+                       /* Try to write everything */
+                       ssize_t r = writev(tcp_session->fd, iov, n);
+
+                       if (r == -1) {
+                               /* Cannot write anything */
+                               msg_debug_fuzzy_storage("failed TCP connection from %s; cannot write: %s",
+                                                                               rspamd_inet_address_to_string(tcp_session->addr),
+                                                                               strerror(errno));
+
+                               tcp_session_dtor(tcp_session);
+                       }
+                       else if (r == 0) {
+                               /* Fake EV_WRITE? */
+                               ev_io_start(loop, w);
+                       }
+                       else {
+                               /* Now we have to process all replies and check if we need to free them */
+                               DL_FOREACH_SAFE(tcp_session->replies_queue, rep, tmp)
+                               {
+                                       if (r >= (ssize_t) (sizeof(rep->rep) - rep->written)) {
+                                               /* We have written it completely */
+                                               r -= sizeof(rep->rep) - rep->written;
+                                               DL_DELETE(tcp_session->replies_queue, rep);
+                                               g_free(rep);
+                                       }
+                                       else {
+                                               /* Found incomplete buffer */
+                                               rep->written += r;
+                                               break;
+                                       }
+                               }
+
+                               if (tcp_session->replies_queue != NULL) {
+                                       /* No more replies */
+                                       ev_io_set(w, tcp_session->fd, EV_READ);
+                               }
+                               else {
+                                       /* Wait for another write readiness */
+                                       ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ);
+                               }
+
+                               ev_io_start(loop, w);
+                               /* Reset timer */
+                               ev_timer_again(loop, &tcp_session->tm);
+                       }
+               }
+       }
 }
 
 static void
@@ -2080,7 +2169,7 @@ accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents)
        tcp_session->ctx = ctx;
        tcp_session->fd = nfd;
        ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ);
-       ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, 0.0);
+       ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout);
        tcp_session->tm.data = tcp_session;
        tcp_session->io.data = tcp_session;
        ev_timer_start(ctx->event_loop, &tcp_session->tm);