From: Vsevolod Stakhov Date: Thu, 25 Apr 2024 15:22:38 +0000 (+0100) Subject: [Project] Implement write logic for TCP X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=21fcdefda7dfae799e222a5fce4f788101b2a307;p=rspamd.git [Project] Implement write logic for TCP --- diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 529bc530f..32de3e1a8 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -226,10 +226,15 @@ enum fuzzy_cmd_type { #define FUZZY_TCP_BUFFER_LENGTH 8192 +struct rspamd_fuzzy_tcp_reply { + uint16_t size_hdr; /* We have to write this as well */ + struct rspamd_fuzzy_encrypted_reply rep; /* Payload */ +}; + struct fuzzy_tcp_reply { - struct rspamd_fuzzy_encrypted_reply rep; /* Serialized reply */ - unsigned int written; /* How many have we already written */ - struct fuzzy_tcp_reply *prev, *next; /* Link */ + struct rspamd_fuzzy_tcp_reply rep; /* Serialized reply */ + unsigned int written; /* How many bytes have we already written */ + struct fuzzy_tcp_reply *prev, *next; /* Link */ }; struct fuzzy_tcp_session { @@ -2036,6 +2041,90 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data; msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->addr), revents); + + if (revents & EV_READ) { + ssize_t r; + + r = read(tcp_session->fd, tcp_session->input_buf + tcp_session->bytes_unprocessed, + sizeof(tcp_session->input_buf) - tcp_session->bytes_unprocessed); + + if (r == -1) { + /* Cannot read anything */ + } + else if (r == 0) { + /* Got EOF */ + } + else { + } + } + else if (revents & EV_WRITE) { + if (tcp_session->replies_queue) { + /* Try to write as many replies, as possible */ + struct fuzzy_tcp_reply *rep, *tmp; + int n = 0; + + DL_FOREACH(tcp_session->replies_queue, rep) + { + n++; + } + + /* Allocate iovec */ + struct iovec *iov = g_malloc(sizeof(struct iovec) * n); + n = 0; + DL_FOREACH(tcp_session->replies_queue, rep) + { + iov[n].iov_base = ((unsigned char *) &rep->rep) + rep->written; + iov[n].iov_len = sizeof(rep->rep) - rep->written; + n++; + } + + /* Try to write everything */ + ssize_t r = writev(tcp_session->fd, iov, n); + + if (r == -1) { + /* Cannot write anything */ + msg_debug_fuzzy_storage("failed TCP connection from %s; cannot write: %s", + rspamd_inet_address_to_string(tcp_session->addr), + strerror(errno)); + + tcp_session_dtor(tcp_session); + } + else if (r == 0) { + /* Fake EV_WRITE? */ + ev_io_start(loop, w); + } + else { + /* Now we have to process all replies and check if we need to free them */ + DL_FOREACH_SAFE(tcp_session->replies_queue, rep, tmp) + { + if (r >= (ssize_t) (sizeof(rep->rep) - rep->written)) { + /* We have written it completely */ + r -= sizeof(rep->rep) - rep->written; + DL_DELETE(tcp_session->replies_queue, rep); + g_free(rep); + } + else { + /* Found incomplete buffer */ + rep->written += r; + break; + } + } + + if (tcp_session->replies_queue != NULL) { + /* No more replies */ + ev_io_set(w, tcp_session->fd, EV_READ); + } + else { + /* Wait for another write readiness */ + ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ); + } + + ev_io_start(loop, w); + /* Reset timer */ + ev_timer_again(loop, &tcp_session->tm); + } + } + } } static void @@ -2080,7 +2169,7 @@ accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents) tcp_session->ctx = ctx; tcp_session->fd = nfd; ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ); - ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, 0.0); + ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout); tcp_session->tm.data = tcp_session; tcp_session->io.data = tcp_session; ev_timer_start(ctx->event_loop, &tcp_session->tm);