From c61a3c1ccf6957cf20bfca9d36ec81366b0922b0 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 4 May 2024 15:58:36 +0100 Subject: [PATCH] [Project] Further tcp logic implementation --- src/fuzzy_storage.c | 249 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 217 insertions(+), 32 deletions(-) diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 0ecac635b..bf0687bfa 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -226,15 +226,15 @@ enum fuzzy_cmd_type { #define FUZZY_TCP_BUFFER_LENGTH 8192 -struct rspamd_fuzzy_tcp_reply { - uint16_t size_hdr; /* We have to write this as well */ - struct rspamd_fuzzy_encrypted_reply rep; /* Payload */ +struct rspamd_fuzzy_tcp_frame { + uint16_t size_hdr; /* We have to write this as well */ + struct rspamd_fuzzy_encrypted_reply payload; /* Payload */ }; -struct fuzzy_tcp_reply { - struct rspamd_fuzzy_tcp_reply rep; /* Serialized reply */ - unsigned int written; /* How many bytes have we already written */ - struct fuzzy_tcp_reply *prev, *next; /* Link */ +struct fuzzy_tcp_reply_queue_elt { + struct rspamd_fuzzy_tcp_frame rep; /* Serialized reply */ + unsigned int written; /* How many bytes have we already written */ + struct fuzzy_tcp_reply_queue_elt *prev, *next; /* Link */ }; struct fuzzy_common_session { @@ -271,7 +271,7 @@ struct fuzzy_tcp_session { struct fuzzy_common_session common; ref_entry_t ref; - struct fuzzy_tcp_reply *replies_queue; + struct fuzzy_tcp_reply_queue_elt *replies_queue; unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH]; }; @@ -296,6 +296,7 @@ struct rspamd_updates_cbdata { static void rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session); +static bool rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session, struct fuzzy_tcp_reply_queue_elt *reply); static gboolean rspamd_fuzzy_process_updates_queue(struct rspamd_fuzzy_storage_ctx *ctx, const char *source, gboolean final); static gboolean rspamd_fuzzy_check_client(struct rspamd_fuzzy_storage_ctx *ctx, @@ -305,6 +306,7 @@ static void rspamd_fuzzy_maybe_call_blacklisted(struct rspamd_fuzzy_storage_ctx const char *reason); static struct fuzzy_key *fuzzy_add_keypair_from_ucl(const ucl_object_t *obj, khash_t(rspamd_fuzzy_keys_hash) * target); +static void rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents); struct fuzzy_keymap_ucl_buf { rspamd_fstring_t *buf; @@ -934,6 +936,48 @@ rspamd_fuzzy_udp_write_reply(struct fuzzy_udp_session *session) } } +static bool +rspamd_fuzzy_tcp_write_reply(struct fuzzy_tcp_session *session, struct fuzzy_tcp_reply_queue_elt *reply) +{ + ssize_t r; + gconstpointer data = &reply->rep; + size_t len = reply->rep.size_hdr + sizeof(reply->rep.size_hdr); + + r = write(session->common.fd, data, len); + + if (r == -1) { + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { + /* Attach this reply to the list of the replies pending */ + DL_APPEND(session->replies_queue, reply); + /* Grab reference to avoid early destruction */ + REF_RETAIN(session); + ev_io_init(&session->common.io, + rspamd_fuzzy_tcp_io, session->common.fd, EV_WRITE | EV_READ); + ev_io_start(session->common.ctx->event_loop, &session->common.io); + } + else { + msg_err("error while writing reply: %s", strerror(errno)); + return false; + } + } + else if ((size_t) r < len) { + reply->written = r; + DL_APPEND(session->replies_queue, reply); + /* Grab reference to avoid early destruction */ + REF_RETAIN(session); + /* Partial write */ + ev_io_init(&session->common.io, + rspamd_fuzzy_tcp_io, session->common.fd, EV_WRITE); + ev_io_start(session->common.ctx->event_loop, &session->common.io); + } + else { + /* Full write, no need to preserve queue elt */ + g_free(reply); + } + + return true; +} + static void rspamd_fuzzy_update_key_stat(gboolean matched, struct fuzzy_key_stat *key_stat, @@ -1153,8 +1197,86 @@ rspamd_fuzzy_make_udp_reply(struct rspamd_fuzzy_cmd *cmd, rspamd_fuzzy_udp_write_reply(session); } +static struct fuzzy_tcp_reply_queue_elt * +rspamd_fuzzy_make_tcp_reply(struct rspamd_fuzzy_cmd *cmd, + struct rspamd_fuzzy_reply *result, + struct fuzzy_tcp_session *session, + int flags) +{ + gsize len; + struct fuzzy_tcp_reply_queue_elt *queue_elt; + + if (cmd) { + queue_elt = g_malloc0(sizeof(*queue_elt)); + + struct rspamd_fuzzy_tcp_frame *reply = &queue_elt->rep; + + result->v1.tag = cmd->tag; + memcpy(&reply->payload.rep, result, sizeof(*result)); + + if (flags & RSPAMD_FUZZY_REPLY_DELAY) { + /* Hash is too fresh, need to delay it */ + reply->payload.rep.ts = 0; + reply->payload.rep.v1.prob = 0.0f; + reply->payload.rep.v1.value = 0; + } + else if (!rspamd_fuzzy_can_reply(&session->common, flags & RSPAMD_FUZZY_REPLY_ENCRYPTED, + reply->payload.rep.v1.flag, reply->payload.rep.v1.prob)) { + /* Hash is from a forbidden flag */ + reply->payload.rep.ts = 0; + reply->payload.rep.v1.prob = 0.0f; + reply->payload.rep.v1.value = 0; + reply->payload.rep.v1.flag = 0; + } + else { + /* Update stats before encryption */ + if (cmd->cmd != FUZZY_STAT && cmd->cmd <= FUZZY_CLIENT_MAX) { + rspamd_fuzzy_update_stats(session->common.ctx, + session->common.epoch, + reply->payload.rep.v1.prob > 0.5f, + flags & RSPAMD_FUZZY_REPLY_SHINGLE, + flags & RSPAMD_FUZZY_REPLY_DELAY, + session->common.key, + session->common.ip_stat, + cmd->cmd, + &reply->payload.rep, + session->common.timestamp); + } + } + + if (flags & RSPAMD_FUZZY_REPLY_ENCRYPTED) { + /* We need also to encrypt reply */ + ottery_rand_bytes(reply->payload.hdr.nonce, + sizeof(reply->payload.hdr.nonce)); + + /* + * For old replies we need to encrypt just old part, otherwise + * decryption would fail due to mac verification mistake + */ + + len = sizeof(reply->payload); + rspamd_cryptobox_encrypt_nm_inplace((unsigned char *) &reply->payload, + len, + reply->payload.hdr.nonce, + session->common.nm, + reply->payload.hdr.mac, + RSPAMD_CRYPTOBOX_MODE_25519); + } + else { + len = sizeof(reply->payload.rep); + } + + reply->size_hdr = len; + + return queue_elt; + } + else { + return NULL; + } +} + static gboolean -fuzzy_peer_try_send(int fd, struct fuzzy_peer_request *up_req) +rspamd_fuzzy_peer_try_send(int fd, struct fuzzy_peer_request *up_req) { gssize r; @@ -1168,11 +1290,11 @@ fuzzy_peer_try_send(int fd, struct fuzzy_peer_request *up_req) } static void -fuzzy_peer_send_io(EV_P_ ev_io *w, int revents) +rspamd_fuzzy_peer_send_io(EV_P_ ev_io *w, int revents) { struct fuzzy_peer_request *up_req = (struct fuzzy_peer_request *) w->data; - if (!fuzzy_peer_try_send(w->fd, up_req)) { + if (!rspamd_fuzzy_peer_try_send(w->fd, up_req)) { msg_err("cannot send update request to the peer: %s", strerror(errno)); } @@ -1365,9 +1487,9 @@ rspamd_fuzzy_fill_reply(struct rspamd_fuzzy_reply *result, struct fuzzy_common_s sizeof(up_req->cmd.cmd.shingle.sgl)); } - if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) { + if (!rspamd_fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) { up_req->io_ev.data = up_req; - ev_io_init(&up_req->io_ev, fuzzy_peer_send_io, + ev_io_init(&up_req->io_ev, rspamd_fuzzy_peer_send_io, session->ctx->peer_fd, EV_WRITE); ev_io_start(session->ctx->event_loop, &up_req->io_ev); } @@ -1398,7 +1520,24 @@ rspamd_fuzzy_tcp_check_callback(struct rspamd_fuzzy_reply *result, void *ud) int send_flags = 0; struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_fill_reply(result, &session->common, &send_flags); - /* TODO write reply impl */ + + if (cmd) { + + struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, result, session, send_flags); + + if (reply) { + if (!rspamd_fuzzy_tcp_write_reply(session, reply)) { + /* Enforced deletion */ + REF_RELEASE(session); + } + } + } + else { + /* Enforced deletion */ + REF_RELEASE(session); + } + + /* Free our refcount */ REF_RELEASE(session); } @@ -1645,9 +1784,9 @@ rspamd_fuzzy_process_udp_session(struct fuzzy_udp_session *session) ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal; memcpy(ptr, cmd, up_len); - if (!fuzzy_peer_try_send(session->common.ctx->peer_fd, up_req)) { + if (!rspamd_fuzzy_peer_try_send(session->common.ctx->peer_fd, up_req)) { up_req->io_ev.data = up_req; - ev_io_init(&up_req->io_ev, fuzzy_peer_send_io, + ev_io_init(&up_req->io_ev, rspamd_fuzzy_peer_send_io, session->common.ctx->peer_fd, EV_WRITE); ev_io_start(session->common.ctx->event_loop, &up_req->io_ev); } @@ -2078,7 +2217,7 @@ union sa_union { static void tcp_session_dtor(struct fuzzy_tcp_session *tcp_session) { - struct fuzzy_tcp_reply *rep; + struct fuzzy_tcp_reply_queue_elt *rep; fuzzy_common_session_dtor(&tcp_session->common); @@ -2146,8 +2285,13 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c struct rspamd_fuzzy_cmd *cmd = rspamd_fuzzy_prepare_cmd(&tcp_session->common, &result, &send_flags, &up_len, &final); if (G_UNLIKELY(cmd == NULL || final)) { + struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags); - /* TODO: write reply */ + if (reply) { + rspamd_fuzzy_tcp_write_reply(tcp_session, reply); + } + + REF_RELEASE(tcp_session); return true; } @@ -2194,7 +2338,14 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c result.v1.value = 403; result.v1.prob = 0.0f; result.v1.flag = 0; - /* TODO: write reply */ + + struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags); + + if (reply) { + rspamd_fuzzy_tcp_write_reply(tcp_session, reply); + } + + REF_RELEASE(tcp_session); return false; } @@ -2205,14 +2356,35 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c /* Store high qword in value and low qword in flag */ result.v1.value = (int32_t) ((uint64_t) tcp_session->common.ctx->stat.fuzzy_hashes >> 32); result.v1.flag = (uint32_t) (tcp_session->common.ctx->stat.fuzzy_hashes & G_MAXUINT32); - /* TODO: write reply */ + + struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags); + + if (reply) { + if (!rspamd_fuzzy_tcp_write_reply(tcp_session, reply)) { + REF_RELEASE(tcp_session); + } + } + else { + REF_RELEASE(tcp_session); + } } else if (cmd->cmd == FUZZY_PING) { result.v1.prob = 1.0f; result.v1.value = cmd->value; - /* TODO: write reply */ + struct fuzzy_tcp_reply_queue_elt *reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags); + + if (reply) { + if (!rspamd_fuzzy_tcp_write_reply(tcp_session, reply)) { + REF_RELEASE(tcp_session); + } + } + else { + REF_RELEASE(tcp_session); + } } else { + struct fuzzy_tcp_reply_queue_elt *reply; + if (rspamd_fuzzy_check_write(tcp_session->common.ctx, tcp_session->common.addr, tcp_session->common.key)) { /* Check whitelist */ if (tcp_session->common.ctx->skip_hashes && cmd->cmd == FUZZY_WRITE) { @@ -2249,9 +2421,9 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal; memcpy(ptr, cmd, up_len); - if (!fuzzy_peer_try_send(tcp_session->common.ctx->peer_fd, up_req)) { + if (!rspamd_fuzzy_peer_try_send(tcp_session->common.ctx->peer_fd, up_req)) { up_req->io_ev.data = up_req; - ev_io_init(&up_req->io_ev, fuzzy_peer_send_io, + ev_io_init(&up_req->io_ev, rspamd_fuzzy_peer_send_io, tcp_session->common.ctx->peer_fd, EV_WRITE); ev_io_start(tcp_session->common.ctx->event_loop, &up_req->io_ev); } @@ -2268,14 +2440,23 @@ rspamd_fuzzy_process_tcp_frame(struct fuzzy_tcp_session *tcp_session, unsigned c result.v1.prob = 0.0f; } reply: - /* TODO: write reply */ + reply = rspamd_fuzzy_make_tcp_reply(cmd, &result, tcp_session, send_flags); + + if (reply) { + if (!rspamd_fuzzy_tcp_write_reply(tcp_session, reply)) { + REF_RELEASE(tcp_session); + } + } + else { + REF_RELEASE(tcp_session); + } } return true; } static bool -fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read) +rspamd_fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_read) { if (bytes_read <= 0) { /* Apparent garbage */ @@ -2311,7 +2492,11 @@ fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_rea msg_debug_fuzzy_storage("can process next command: %d bytes available, %d bytes required", (int) buf_avail, bytes_required); - /* TODO: add this */ + if (!rspamd_fuzzy_process_tcp_frame(tcp_session, p, bytes_required)) { + /* Error processing, give up */ + return false; + } + buf_avail -= bytes_required; p += bytes_required; tcp_session->cur_frame_state = 0; @@ -2332,7 +2517,7 @@ fuzzy_tcp_process_input(struct fuzzy_tcp_session *tcp_session, ssize_t bytes_rea } static void -tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) +rspamd_fuzzy_tcp_io(EV_P_ ev_io *w, int revents) { struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data; @@ -2360,7 +2545,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) REF_RELEASE(tcp_session); } else { - if (!fuzzy_tcp_process_input(tcp_session, r)) { + if (!rspamd_fuzzy_tcp_process_input(tcp_session, r)) { REF_RELEASE(tcp_session); } else { @@ -2382,7 +2567,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) else if (revents & EV_WRITE) { if (tcp_session->replies_queue) { /* Try to write as many replies, as possible */ - struct fuzzy_tcp_reply *rep, *tmp; + struct fuzzy_tcp_reply_queue_elt *rep, *tmp; int n = 0; DL_FOREACH(tcp_session->replies_queue, rep) @@ -2454,7 +2639,7 @@ tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) } static void -tcp_fuzzy_socket_timeout(EV_P_ ev_timer *w, int revents) +rspamd_fuzzy_tcp_timeout(EV_P_ ev_timer *w, int revents) { struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data; @@ -2497,8 +2682,8 @@ accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents) tcp_session->common.worker = worker; tcp_session->common.timestamp = ev_now(ctx->event_loop); REF_INIT_RETAIN(tcp_session, tcp_session_dtor); - ev_io_init(&tcp_session->common.io, tcp_fuzzy_socket_io, nfd, EV_READ); - ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, ctx->tcp_timeout); + ev_io_init(&tcp_session->common.io, rspamd_fuzzy_tcp_io, nfd, EV_READ); + ev_timer_init(&tcp_session->tm, rspamd_fuzzy_tcp_timeout, ctx->tcp_timeout, ctx->tcp_timeout); tcp_session->tm.data = tcp_session; tcp_session->common.io.data = tcp_session; ev_timer_start(ctx->event_loop, &tcp_session->tm); -- 2.39.5