From f66f0e8f23aa1005f57461ddcbf2bce93a8e3281 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 25 Mar 2024 18:18:18 +0000 Subject: [PATCH] [Project] Add some basic bits --- src/fuzzy_storage.c | 103 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 102 insertions(+), 1 deletion(-) diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index ae4ce249c..529bc530f 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -57,6 +57,12 @@ static const char *local_db_name = "local"; gpointer init_fuzzy(struct rspamd_config *cfg); void start_fuzzy(struct rspamd_worker *worker); +#define msg_debug_fuzzy_storage(...) rspamd_conditional_debug_fast(NULL, NULL, \ + rspamd_fuzzy_storage_log_id, "fuzzy_storage", NULL, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +INIT_LOG_MODULE(fuzzy_storage); + worker_t fuzzy_worker = { "fuzzy", /* Name */ init_fuzzy, /* Init function */ @@ -153,6 +159,7 @@ struct rspamd_fuzzy_storage_ctx { double expire; double sync_timeout; double delay; + double tcp_timeout; struct rspamd_radix_map_helper *update_ips; struct rspamd_hash_map_helper *update_keys; struct rspamd_radix_map_helper *blocked_ips; @@ -217,6 +224,36 @@ enum fuzzy_cmd_type { CMD_ENCRYPTED_SHINGLE }; +#define FUZZY_TCP_BUFFER_LENGTH 8192 + +struct fuzzy_tcp_reply { + struct rspamd_fuzzy_encrypted_reply rep; /* Serialized reply */ + unsigned int written; /* How many have we already written */ + struct fuzzy_tcp_reply *prev, *next; /* Link */ +}; + +struct fuzzy_tcp_session { + struct rspamd_worker *worker; + rspamd_inet_addr_t *addr; + struct rspamd_fuzzy_storage_ctx *ctx; + int fd; + struct ev_io io; + struct ev_timer tm; + + /* + * We store the state in the current frame + * 0 0 x x x x x x x x x x x x x x x - initial + * 1 0 x x x x x x x x x x x x x x x - read 1 byte of lenght + * 1 1 x x x x x x x x x x x x x x x - read 2 bytes of length + * So the length is always cur_frame & 0x3fff + */ + uint16_t cur_frame_state; + unsigned int bytes_unprocessed; + + struct fuzzy_tcp_reply *replies_queue; + unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH]; +}; + struct fuzzy_session { struct rspamd_worker *worker; rspamd_inet_addr_t *addr; @@ -1971,12 +2008,52 @@ union sa_union { struct sockaddr_storage ss; }; +static void +tcp_session_dtor(struct fuzzy_tcp_session *tcp_session) +{ + struct fuzzy_tcp_reply *rep; + if (tcp_session->addr) { + rspamd_inet_address_free(tcp_session->addr); + } + + if (tcp_session->ctx->event_loop) { + ev_timer_stop(tcp_session->ctx->event_loop, &tcp_session->tm); + ev_io_stop(tcp_session->ctx->event_loop, &tcp_session->io); + } + + DL_FOREACH(tcp_session->replies_queue, rep) + { + g_free(rep); + } + + close(tcp_session->fd); + g_free(tcp_session); +} + +static void +tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents) +{ + struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data; + + msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->addr), revents); +} + +static void +tcp_fuzzy_socket_timeout(EV_P_ ev_timer *w, int revents) +{ + struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data; + + msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->addr)); + + tcp_session_dtor(tcp_session); +} + static void accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents) { struct rspamd_worker *worker = (struct rspamd_worker *) w->data; struct rspamd_fuzzy_storage_ctx *ctx; - struct fuzzy_session *session; + struct fuzzy_tcp_session *tcp_session; ctx = (struct rspamd_fuzzy_storage_ctx *) worker->ctx; @@ -1997,6 +2074,19 @@ accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents) return; } + + tcp_session = g_malloc0(sizeof(*tcp_session)); + tcp_session->addr = addr; + tcp_session->ctx = ctx; + tcp_session->fd = nfd; + ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ); + ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, 0.0); + tcp_session->tm.data = tcp_session; + tcp_session->io.data = tcp_session; + ev_timer_start(ctx->event_loop, &tcp_session->tm); + ev_io_start(ctx->event_loop, &tcp_session->io); + + msg_debug_fuzzy_storage("accepted TCP connection from %s", rspamd_inet_address_to_string(addr)); } @@ -2953,6 +3043,7 @@ init_fuzzy(struct rspamd_config *cfg) ctx->leaky_bucket_burst = NAN; ctx->leaky_bucket_rate = NAN; ctx->delay = NAN; + ctx->tcp_timeout = 5.0; ctx->default_forbidden_ids = kh_init(fuzzy_key_ids_set); ctx->weak_ids = kh_init(fuzzy_key_ids_set); @@ -2976,6 +3067,16 @@ init_fuzzy(struct rspamd_config *cfg) RSPAMD_CL_FLAG_TIME_FLOAT, "Default expire time for hashes, default: " G_STRINGIFY(DEFAULT_EXPIRE) " seconds"); + rspamd_rcl_register_worker_option(cfg, + type, + "tcp_timeout", + rspamd_rcl_parse_struct_time, + ctx, + G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx, + tcp_timeout), + RSPAMD_CL_FLAG_TIME_FLOAT, + "Default tcp timeout: " G_STRINGIFY(5.0) " seconds"); + rspamd_rcl_register_worker_option(cfg, type, "delay", -- 2.39.5