]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Add some basic bits
authorVsevolod Stakhov <vsevolod@rspamd.com>
Mon, 25 Mar 2024 18:18:18 +0000 (18:18 +0000)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 25 Jun 2024 13:27:53 +0000 (14:27 +0100)
src/fuzzy_storage.c

index ae4ce249c0efdc881cac671f89d553218d885bd2..529bc530f2b2f0f2adec038c68369401a56dc948 100644 (file)
@@ -57,6 +57,12 @@ static const char *local_db_name = "local";
 gpointer init_fuzzy(struct rspamd_config *cfg);
 void start_fuzzy(struct rspamd_worker *worker);
 
+#define msg_debug_fuzzy_storage(...) rspamd_conditional_debug_fast(NULL, NULL,                                         \
+                                                                                                                                  rspamd_fuzzy_storage_log_id, "fuzzy_storage", NULL, \
+                                                                                                                                  RSPAMD_LOG_FUNC,                                    \
+                                                                                                                                  __VA_ARGS__)
+INIT_LOG_MODULE(fuzzy_storage);
+
 worker_t fuzzy_worker = {
        "fuzzy",     /* Name */
        init_fuzzy,  /* Init function */
@@ -153,6 +159,7 @@ struct rspamd_fuzzy_storage_ctx {
        double expire;
        double sync_timeout;
        double delay;
+       double tcp_timeout;
        struct rspamd_radix_map_helper *update_ips;
        struct rspamd_hash_map_helper *update_keys;
        struct rspamd_radix_map_helper *blocked_ips;
@@ -217,6 +224,36 @@ enum fuzzy_cmd_type {
        CMD_ENCRYPTED_SHINGLE
 };
 
+#define FUZZY_TCP_BUFFER_LENGTH 8192
+
+struct fuzzy_tcp_reply {
+       struct rspamd_fuzzy_encrypted_reply rep; /* Serialized reply */
+       unsigned int written;                    /* How many have we already written */
+       struct fuzzy_tcp_reply *prev, *next;     /* Link */
+};
+
+struct fuzzy_tcp_session {
+       struct rspamd_worker *worker;
+       rspamd_inet_addr_t *addr;
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       int fd;
+       struct ev_io io;
+       struct ev_timer tm;
+
+       /*
+        * We store the state in the current frame
+        * 0 0 x x x x x x x x x x x x x x x - initial
+        * 1 0 x x x x x x x x x x x x x x x - read 1 byte of lenght
+        * 1 1 x x x x x x x x x x x x x x x - read 2 bytes of length
+        * So the length is always cur_frame & 0x3fff
+        */
+       uint16_t cur_frame_state;
+       unsigned int bytes_unprocessed;
+
+       struct fuzzy_tcp_reply *replies_queue;
+       unsigned char input_buf[FUZZY_TCP_BUFFER_LENGTH];
+};
+
 struct fuzzy_session {
        struct rspamd_worker *worker;
        rspamd_inet_addr_t *addr;
@@ -1971,12 +2008,52 @@ union sa_union {
        struct sockaddr_storage ss;
 };
 
+static void
+tcp_session_dtor(struct fuzzy_tcp_session *tcp_session)
+{
+       struct fuzzy_tcp_reply *rep;
+       if (tcp_session->addr) {
+               rspamd_inet_address_free(tcp_session->addr);
+       }
+
+       if (tcp_session->ctx->event_loop) {
+               ev_timer_stop(tcp_session->ctx->event_loop, &tcp_session->tm);
+               ev_io_stop(tcp_session->ctx->event_loop, &tcp_session->io);
+       }
+
+       DL_FOREACH(tcp_session->replies_queue, rep)
+       {
+               g_free(rep);
+       }
+
+       close(tcp_session->fd);
+       g_free(tcp_session);
+}
+
+static void
+tcp_fuzzy_socket_io(EV_P_ ev_io *w, int revents)
+{
+       struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
+
+       msg_debug_fuzzy_storage("got io for %s: %d", rspamd_inet_address_to_string(tcp_session->addr), revents);
+}
+
+static void
+tcp_fuzzy_socket_timeout(EV_P_ ev_timer *w, int revents)
+{
+       struct fuzzy_tcp_session *tcp_session = (struct fuzzy_tcp_session *) w->data;
+
+       msg_debug_fuzzy_storage("timed out TCP connection from %s", rspamd_inet_address_to_string(tcp_session->addr));
+
+       tcp_session_dtor(tcp_session);
+}
+
 static void
 accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents)
 {
        struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
        struct rspamd_fuzzy_storage_ctx *ctx;
-       struct fuzzy_session *session;
+       struct fuzzy_tcp_session *tcp_session;
 
        ctx = (struct rspamd_fuzzy_storage_ctx *) worker->ctx;
 
@@ -1997,6 +2074,19 @@ accept_tcp_fuzzy_socket(EV_P_ ev_io *w, int revents)
 
                return;
        }
+
+       tcp_session = g_malloc0(sizeof(*tcp_session));
+       tcp_session->addr = addr;
+       tcp_session->ctx = ctx;
+       tcp_session->fd = nfd;
+       ev_io_init(&tcp_session->io, tcp_fuzzy_socket_io, nfd, EV_READ);
+       ev_timer_init(&tcp_session->tm, tcp_fuzzy_socket_timeout, ctx->tcp_timeout, 0.0);
+       tcp_session->tm.data = tcp_session;
+       tcp_session->io.data = tcp_session;
+       ev_timer_start(ctx->event_loop, &tcp_session->tm);
+       ev_io_start(ctx->event_loop, &tcp_session->io);
+
+       msg_debug_fuzzy_storage("accepted TCP connection from %s", rspamd_inet_address_to_string(addr));
 }
 
 
@@ -2953,6 +3043,7 @@ init_fuzzy(struct rspamd_config *cfg)
        ctx->leaky_bucket_burst = NAN;
        ctx->leaky_bucket_rate = NAN;
        ctx->delay = NAN;
+       ctx->tcp_timeout = 5.0;
        ctx->default_forbidden_ids = kh_init(fuzzy_key_ids_set);
        ctx->weak_ids = kh_init(fuzzy_key_ids_set);
 
@@ -2976,6 +3067,16 @@ init_fuzzy(struct rspamd_config *cfg)
                                                                          RSPAMD_CL_FLAG_TIME_FLOAT,
                                                                          "Default expire time for hashes, default: " G_STRINGIFY(DEFAULT_EXPIRE) " seconds");
 
+       rspamd_rcl_register_worker_option(cfg,
+                                                                         type,
+                                                                         "tcp_timeout",
+                                                                         rspamd_rcl_parse_struct_time,
+                                                                         ctx,
+                                                                         G_STRUCT_OFFSET(struct rspamd_fuzzy_storage_ctx,
+                                                                                                         tcp_timeout),
+                                                                         RSPAMD_CL_FLAG_TIME_FLOAT,
+                                                                         "Default tcp timeout: " G_STRINGIFY(5.0) " seconds");
+
        rspamd_rcl_register_worker_option(cfg,
                                                                          type,
                                                                          "delay",