From 8dc18662846211219ba869e498c7da9ccaf87e65 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 27 Apr 2024 15:42:20 +0100 Subject: [PATCH] [Project] Add server side IO state machine --- src/fuzzy_storage.c | 91 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 2 deletions(-) diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 32de3e1a8..dceadf731 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -2035,6 +2035,63 @@ tcp_session_dtor(struct fuzzy_tcp_session *tcp_session) g_free(tcp_session); } +static bool +fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read) +{ + if (bytes_read <= 0) { + /* Apparent garbage */ + return false; + } + + static const unsigned state_mask = 0xC000; + + size_t buf_avail = tcp_session->bytes_unprocessed + bytes_read; + uint8_t *p = (uint8_t *) tcp_session->input_buf; + + while (buf_avail > 0) { + /* Run our state machine */ + unsigned int state = (tcp_session->cur_frame_state & state_mask) >> 14; + + if (state == 0) { + /* We have nothing to be read, process the next frame */ + tcp_session->cur_frame_state = (uint16_t) (*p) | 0x4000u; + p++; + buf_avail--; + } + else if (state == 1) { + /* We have read one byte of size and need to advance another one */ + tcp_session->cur_frame_state |= ((uint16_t) (*p << 8u)) | 0x8000u; + p++; + buf_avail--; + } + else { + size_t bytes_required = tcp_session->cur_frame_state & ~state_mask; + + if (buf_avail >= bytes_required) { + /* We can proces the next command */ + msg_debug_fuzzy_storage("can process next command: %d bytes available, %d bytes required", + (int) buf_avail, bytes_required); + + /* TODO: add this */ + buf_avail -= bytes_required; + p += bytes_required; + tcp_session->cur_frame_state = 0; + } + else { + /* Partial read, need more */ + msg_debug_fuzzy_storage("can not process next command: %d bytes available, %d bytes required (partial read)", + (int) buf_avail, bytes_required); + tcp_session->bytes_unprocessed = buf_avail; + memmove(tcp_session->input_buf, p, buf_avail); + + break; + } + } + } + + return true; +} + static void tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) { @@ -2050,11 +2107,37 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) if (r == -1) { /* Cannot read anything */ + msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: %s", + rspamd_inet_address_to_string(tcp_session->addr), + strerror(errno)); + + tcp_session_dtor(tcp_session); } else if (r == 0) { /* Got EOF */ + msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: EOF", + rspamd_inet_address_to_string(tcp_session->addr)); + + tcp_session_dtor(tcp_session); } else { + if (!fuzzy_tcp_process_input(tcp_session, r)) { + tcp_session_dtor(tcp_session); + } + else { + if (tcp_session->replies_queue != NULL) { + /* No more replies */ + ev_io_set(w, tcp_session->fd, EV_READ); + } + else { + /* Wait for another write readiness */ + ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ); + } + + ev_io_start(loop, w); + /* Reset timer */ + ev_timer_again(loop, &tcp_session->tm); + } } } else if (revents & EV_WRITE) { @@ -2068,8 +2151,8 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) n++; } - /* Allocate iovec */ - struct iovec *iov = g_malloc(sizeof(struct iovec) * n); + /* Allocate iovec (use stack for small number to avoid malloc calls for the trivial cases) */ + struct iovec *iov = n > 32 ? g_malloc(sizeof(struct iovec) * n) : g_alloca(sizeof(struct iovec) * n); n = 0; DL_FOREACH(tcp_session->replies_queue, rep) { @@ -2081,6 +2164,10 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) /* Try to write everything */ ssize_t r = writev(tcp_session->fd, iov, n); + if (n > 32) { + g_free(iov); + } + if (r == -1) { /* Cannot write anything */ msg_debug_fuzzy_storage("failed TCP connection from %s; cannot write: %s", -- 2.39.5