]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Add server side IO state machine
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 27 Apr 2024 14:42:20 +0000 (15:42 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 25 Jun 2024 13:27:54 +0000 (14:27 +0100)
src/fuzzy_storage.c

index 32de3e1a8ad699c0511f80fbcc1b6b42c3e81ec9..dceadf731404345d71911546a8c50bac30bc4b24 100644 (file)
@@ -2035,6 +2035,63 @@ tcp_session_dtor(struct fuzzy_tcp_session *tcp_session)
        g_free(tcp_session);
 }
 
+static bool
+fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read)
+{
+       if (bytes_read <= 0) {
+               /* Apparent garbage */
+               return false;
+       }
+
+       static const unsigned state_mask = 0xC000;
+
+       size_t buf_avail = tcp_session->bytes_unprocessed + bytes_read;
+       uint8_t *p = (uint8_t *) tcp_session->input_buf;
+
+       while (buf_avail > 0) {
+               /* Run our state machine */
+               unsigned int state = (tcp_session->cur_frame_state & state_mask) >> 14;
+
+               if (state == 0) {
+                       /* We have nothing to be read, process the next frame */
+                       tcp_session->cur_frame_state = (uint16_t) (*p) | 0x4000u;
+                       p++;
+                       buf_avail--;
+               }
+               else if (state == 1) {
+                       /* We have read one byte of size and need to advance another one */
+                       tcp_session->cur_frame_state |= ((uint16_t) (*p << 8u)) | 0x8000u;
+                       p++;
+                       buf_avail--;
+               }
+               else {
+                       size_t bytes_required = tcp_session->cur_frame_state & ~state_mask;
+
+                       if (buf_avail >= bytes_required) {
+                               /* We can proces the next command */
+                               msg_debug_fuzzy_storage("can process next command: %d bytes available, %d bytes required",
+                                                                               (int) buf_avail, bytes_required);
+
+                               /* TODO: add this */
+                               buf_avail -= bytes_required;
+                               p += bytes_required;
+                               tcp_session->cur_frame_state = 0;
+                       }
+                       else {
+                               /* Partial read, need more */
+                               msg_debug_fuzzy_storage("can not process next command: %d bytes available, %d bytes required (partial read)",
+                                                                               (int) buf_avail, bytes_required);
+                               tcp_session->bytes_unprocessed = buf_avail;
+                               memmove(tcp_session->input_buf, p, buf_avail);
+
+                               break;
+                       }
+               }
+       }
+
+       return true;
+}
+
 static void
 tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
 {
@@ -2050,11 +2107,37 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
 
                if (r == -1) {
                        /* Cannot read anything */
+                       msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: %s",
+                                                                       rspamd_inet_address_to_string(tcp_session->addr),
+                                                                       strerror(errno));
+
+                       tcp_session_dtor(tcp_session);
                }
                else if (r == 0) {
                        /* Got EOF */
+                       msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: EOF",
+                                                                       rspamd_inet_address_to_string(tcp_session->addr));
+
+                       tcp_session_dtor(tcp_session);
                }
                else {
+                       if (!fuzzy_tcp_process_input(tcp_session, r)) {
+                               tcp_session_dtor(tcp_session);
+                       }
+                       else {
+                               if (tcp_session->replies_queue != NULL) {
+                                       /* No more replies */
+                                       ev_io_set(w, tcp_session->fd, EV_READ);
+                               }
+                               else {
+                                       /* Wait for another write readiness */
+                                       ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ);
+                               }
+
+                               ev_io_start(loop, w);
+                               /* Reset timer */
+                               ev_timer_again(loop, &tcp_session->tm);
+                       }
                }
        }
        else if (revents & EV_WRITE) {
@@ -2068,8 +2151,8 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
                                n++;
                        }
 
-                       /* Allocate iovec */
-                       struct iovec *iov = g_malloc(sizeof(struct iovec) * n);
+                       /* Allocate iovec (use stack for small number to avoid malloc calls for the trivial cases) */
+                       struct iovec *iov = n > 32 ? g_malloc(sizeof(struct iovec) * n) : g_alloca(sizeof(struct iovec) * n);
                        n = 0;
                        DL_FOREACH(tcp_session->replies_queue, rep)
                        {
@@ -2081,6 +2164,10 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
                        /* Try to write everything */
                        ssize_t r = writev(tcp_session->fd, iov, n);
 
+                       if (n > 32) {
+                               g_free(iov);
+                       }
+
                        if (r == -1) {
                                /* Cannot write anything */
                                msg_debug_fuzzy_storage("failed TCP connection from %s; cannot write: %s",