Browse Source

[Project] Add server side IO state machine

undefined
Vsevolod Stakhov 1 month ago
parent
commit
5404163efa
No account linked to committer's email address
1 changed files with 89 additions and 2 deletions
  1. 89
    2
      src/fuzzy_storage.c

+ 89
- 2
src/fuzzy_storage.c View File

@@ -2035,6 +2035,63 @@ tcp_session_dtor(struct fuzzy_tcp_session *tcp_session)
g_free(tcp_session);
}

static bool
fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read)
{
if (bytes_read <= 0) {
/* Apparent garbage */
return false;
}

static const unsigned state_mask = 0xC000;

size_t buf_avail = tcp_session->bytes_unprocessed + bytes_read;
uint8_t *p = (uint8_t *) tcp_session->input_buf;

while (buf_avail > 0) {
/* Run our state machine */
unsigned int state = (tcp_session->cur_frame_state & state_mask) >> 14;

if (state == 0) {
/* We have nothing to be read, process the next frame */
tcp_session->cur_frame_state = (uint16_t) (*p) | 0x4000u;
p++;
buf_avail--;
}
else if (state == 1) {
/* We have read one byte of size and need to advance another one */
tcp_session->cur_frame_state |= ((uint16_t) (*p << 8u)) | 0x8000u;
p++;
buf_avail--;
}
else {
size_t bytes_required = tcp_session->cur_frame_state & ~state_mask;

if (buf_avail >= bytes_required) {
/* We can proces the next command */
msg_debug_fuzzy_storage("can process next command: %d bytes available, %d bytes required",
(int) buf_avail, bytes_required);

/* TODO: add this */
buf_avail -= bytes_required;
p += bytes_required;
tcp_session->cur_frame_state = 0;
}
else {
/* Partial read, need more */
msg_debug_fuzzy_storage("can not process next command: %d bytes available, %d bytes required (partial read)",
(int) buf_avail, bytes_required);
tcp_session->bytes_unprocessed = buf_avail;
memmove(tcp_session->input_buf, p, buf_avail);

break;
}
}
}

return true;
}

static void
tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
{
@@ -2050,11 +2107,37 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)

if (r == -1) {
/* Cannot read anything */
msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: %s",
rspamd_inet_address_to_string(tcp_session->addr),
strerror(errno));

tcp_session_dtor(tcp_session);
}
else if (r == 0) {
/* Got EOF */
msg_debug_fuzzy_storage("failed TCP connection from %s; cannot read: EOF",
rspamd_inet_address_to_string(tcp_session->addr));

tcp_session_dtor(tcp_session);
}
else {
if (!fuzzy_tcp_process_input(tcp_session, r)) {
tcp_session_dtor(tcp_session);
}
else {
if (tcp_session->replies_queue != NULL) {
/* No more replies */
ev_io_set(w, tcp_session->fd, EV_READ);
}
else {
/* Wait for another write readiness */
ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ);
}

ev_io_start(loop, w);
/* Reset timer */
ev_timer_again(loop, &tcp_session->tm);
}
}
}
else if (revents & EV_WRITE) {
@@ -2068,8 +2151,8 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
n++;
}

/* Allocate iovec */
struct iovec *iov = g_malloc(sizeof(struct iovec) * n);
/* Allocate iovec (use stack for small number to avoid malloc calls for the trivial cases) */
struct iovec *iov = n > 32 ? g_malloc(sizeof(struct iovec) * n) : g_alloca(sizeof(struct iovec) * n);
n = 0;
DL_FOREACH(tcp_session->replies_queue, rep)
{
@@ -2081,6 +2164,10 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
/* Try to write everything */
ssize_t r = writev(tcp_session->fd, iov, n);

if (n > 32) {
g_free(iov);
}

if (r == -1) {
/* Cannot write anything */
msg_debug_fuzzy_storage("failed TCP connection from %s; cannot write: %s",

Loading…
Cancel
Save