Browse Source

[Project] Implement write logic for TCP

undefined
Vsevolod Stakhov 1 month ago
parent
commit
aeeec77290
No account linked to committer's email address
1 changed files with 93 additions and 4 deletions
  1. 93
    4
      src/fuzzy_storage.c

+ 93
- 4
src/fuzzy_storage.c View File

@@ -226,10 +226,15 @@ enum fuzzy_cmd_type {

#define FUZZY_TCP_BUFFER_LENGTH 8192

struct rspamd_fuzzy_tcp_reply {
uint16_t size_hdr; /* We have to write this as well */
struct rspamd_fuzzy_encrypted_reply rep; /* Payload */
};

struct fuzzy_tcp_reply {
struct rspamd_fuzzy_encrypted_reply rep; /* Serialized reply */
unsigned int written; /* How many have we already written */
struct fuzzy_tcp_reply *prev, *next; /* Link */
struct rspamd_fuzzy_tcp_reply rep; /* Serialized reply */
unsigned int written; /* How many bytes have we already written */
struct fuzzy_tcp_reply *prev, *next; /* Link */
};

struct fuzzy_tcp_session {
@@ -2036,6 +2041,90 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;

msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->addr), revents);

if (revents & EV_READ) {
ssize_t r;

r = read(tcp_session->fd, tcp_session->input_buf + tcp_session->bytes_unprocessed,
sizeof(tcp_session->input_buf) - tcp_session->bytes_unprocessed);

if (r == -1) {
/* Cannot read anything */
}
else if (r == 0) {
/* Got EOF */
}
else {
}
}
else if (revents & EV_WRITE) {
if (tcp_session->replies_queue) {
/* Try to write as many replies, as possible */
struct fuzzy_tcp_reply *rep, *tmp;
int n = 0;

DL_FOREACH(tcp_session->replies_queue, rep)
{
n++;
}

/* Allocate iovec */
struct iovec *iov = g_malloc(sizeof(struct iovec) * n);
n = 0;
DL_FOREACH(tcp_session->replies_queue, rep)
{
iov[n].iov_base = ((unsigned char *) &rep->rep) + rep->written;
iov[n].iov_len = sizeof(rep->rep) - rep->written;
n++;
}

/* Try to write everything */
ssize_t r = writev(tcp_session->fd, iov, n);

if (r == -1) {
/* Cannot write anything */
msg_debug_fuzzy_storage("failed TCP connection from %s; cannot write: %s",
rspamd_inet_address_to_string(tcp_session->addr),
strerror(errno));

tcp_session_dtor(tcp_session);
}
else if (r == 0) {
/* Fake EV_WRITE? */
ev_io_start(loop, w);
}
else {
/* Now we have to process all replies and check if we need to free them */
DL_FOREACH_SAFE(tcp_session->replies_queue, rep, tmp)
{
if (r >= (ssize_t) (sizeof(rep->rep) - rep->written)) {
/* We have written it completely */
r -= sizeof(rep->rep) - rep->written;
DL_DELETE(tcp_session->replies_queue, rep);
g_free(rep);
}
else {
/* Found incomplete buffer */
rep->written += r;
break;
}
}

if (tcp_session->replies_queue != NULL) {
/* No more replies */
ev_io_set(w, tcp_session->fd, EV_READ);
}
else {
/* Wait for another write readiness */
ev_io_set(w, tcp_session->fd, EV_WRITE | EV_READ);
}

ev_io_start(loop, w);
/* Reset timer */
ev_timer_again(loop, &tcp_session->tm);
}
}
}
}

static void
@@ -2080,7 +2169,7 @@ accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents)
tcp_session->ctx = ctx;
tcp_session->fd = nfd;
ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ);
ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, 0.0);
ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout);
tcp_session->tm.data = tcp_session;
tcp_session->io.data = tcp_session;
ev_timer_start(ctx->event_loop, &tcp_session->tm);

Loading…
Cancel
Save