]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Preliminary implementation of fuzzy master/slave updates
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 14 May 2016 12:38:10 +0000 (13:38 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 14 May 2016 12:38:10 +0000 (13:38 +0100)
src/fuzzy_storage.c
src/worker.c

index a1881aff32054ffebeeeb126773577a18e486e94..d8f82104fc447ba0c3877c6641a7ac0b03f5cd9b 100644 (file)
@@ -40,7 +40,7 @@
 /* Resync value in seconds */
 #define DEFAULT_SYNC_TIMEOUT 60.0
 #define DEFAULT_KEYPAIR_CACHE_SIZE 512
-
+#define DEFAULT_MASTER_TIMEOUT 10.0
 
 #define INVALID_NODE_TIME (guint64) - 1
 
@@ -53,7 +53,7 @@ worker_t fuzzy_worker = {
                init_fuzzy,                 /* Init function */
                start_fuzzy,                /* Start function */
                RSPAMD_WORKER_HAS_SOCKET,
-               RSPAMD_WORKER_SOCKET_UDP,   /* UDP socket */
+               RSPAMD_WORKER_SOCKET_UDP|RSPAMD_WORKER_SOCKET_TCP,   /* Both socket */
                RSPAMD_WORKER_VER           /* Version info */
 };
 
@@ -93,7 +93,12 @@ struct rspamd_fuzzy_storage_ctx {
        gdouble expire;
        gdouble sync_timeout;
        radix_compressed_t *update_ips;
+       radix_compressed_t *master_ips;
+       struct rspamd_cryptobox_keypair *master_key;
+       struct timeval master_io_tv;
+       gdouble master_timeout;
        gchar *update_map;
+       gchar *masters_map;
        guint keypair_cache_size;
        struct event_base *ev_base;
        gint peer_fd;
@@ -160,6 +165,12 @@ struct fuzzy_key {
        struct fuzzy_key_stat *stat;
 };
 
+struct fuzzy_master_update_session {
+       struct rspamd_http_connection *conn;
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       rspamd_inet_addr_t *addr;
+};
+
 static void rspamd_fuzzy_write_reply (struct fuzzy_session *session);
 
 static gboolean
@@ -703,6 +714,102 @@ fuzzy_session_destroy (gpointer d)
        g_slice_free1 (sizeof (*session), session);
 }
 
+static void
+rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session)
+{
+       if (session) {
+               rspamd_http_connection_unref (session->conn);
+               rspamd_inet_address_destroy (session->addr);
+               g_slice_free1 (sizeof (*session), session);
+       }
+}
+
+static void
+rspamd_fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+       struct fuzzy_master_update_session *session = conn->ud;
+
+       msg_err ("abnormally closing connection from: %s, error: %e",
+               rspamd_inet_address_to_string (session->addr), err);
+       /* Terminate session immediately */
+       rspamd_fuzzy_mirror_session_destroy (session);
+}
+
+static gint
+rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
+       struct rspamd_http_message *msg)
+{
+       struct fuzzy_master_update_session *session = conn->ud;
+
+       /* TODO: implement updates */
+       rspamd_fuzzy_mirror_session_destroy (session);
+
+       return 0;
+}
+
+static void
+accept_fuzzy_mirror_socket (gint fd, short what, void *arg)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+       rspamd_inet_addr_t *addr;
+       gint nfd;
+       struct rspamd_http_connection *http_conn;
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       struct fuzzy_master_update_session *session;
+
+       if ((nfd =
+                       rspamd_accept_from_socket (fd, &addr)) == -1) {
+               msg_warn ("accept failed: %s", strerror (errno));
+               return;
+       }
+       /* Check for EAGAIN */
+       if (nfd == 0) {
+               return;
+       }
+
+       ctx = worker->ctx;
+
+       if (!ctx->master_ips) {
+               msg_err ("deny update request from %s as no masters defined",
+                               rspamd_inet_address_to_string (addr));
+               rspamd_inet_address_destroy (addr);
+               close (nfd);
+
+               return;
+       }
+       else if (radix_find_compressed_addr (ctx->master_ips, addr) == RADIX_NO_VALUE) {
+               msg_err ("deny update request from %s",
+                               rspamd_inet_address_to_string (addr));
+               rspamd_inet_address_destroy (addr);
+               close (nfd);
+
+               return;
+       }
+
+       session = g_slice_alloc0 (sizeof (*session));
+       http_conn = rspamd_http_connection_new (
+                       NULL,
+                       rspamd_fuzzy_mirror_error_handler,
+                       rspamd_fuzzy_mirror_finish_handler,
+                       0,
+                       RSPAMD_HTTP_SERVER,
+                       ctx->keypair_cache);
+
+       if (ctx->master_key) {
+               rspamd_http_connection_set_key (http_conn, ctx->master_key);
+       }
+
+       session->ctx = ctx;
+       session->conn = http_conn;
+       session->addr = addr;
+
+       rspamd_http_connection_read_message (http_conn,
+                       session,
+                       nfd,
+                       &ctx->master_io_tv,
+                       ctx->ev_base);
+}
+
 /*
  * Accept new connection and construct task
  */
@@ -1211,6 +1318,7 @@ init_fuzzy (struct rspamd_config *cfg)
 
        ctx->magic = rspamd_fuzzy_storage_magic;
        ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT;
+       ctx->master_timeout = DEFAULT_MASTER_TIMEOUT;
        ctx->expire = DEFAULT_EXPIRE;
        ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE;
        ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal,
@@ -1315,6 +1423,32 @@ init_fuzzy (struct rspamd_config *cfg)
                        0,
                        "Allow encrypted requests only (and forbid all unknown keys or plaintext requests)");
 
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "master_timeout",
+                       rspamd_rcl_parse_struct_time,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_timeout),
+                       RSPAMD_CL_FLAG_TIME_FLOAT,
+                       "Master protocol IO timeout");
+
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "master_key",
+                       rspamd_rcl_parse_struct_keypair,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key),
+                       0,
+                       "Encryption key for master/slave updates");
+
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "masters",
+                       rspamd_rcl_parse_struct_string,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, masters_map),
+                       0,
+                       "Allow master/slave updates from the following IP addresses");
 
        return ctx;
 }
@@ -1371,13 +1505,25 @@ fuzzy_peer_rep (struct rspamd_worker *worker,
                ls = cur->data;
 
                if (ls->fd != -1) {
-                       accept_event = g_slice_alloc0 (sizeof (struct event));
-                       event_set (accept_event, ls->fd, EV_READ | EV_PERSIST,
-                                       accept_fuzzy_socket, worker);
-                       event_base_set (ctx->ev_base, accept_event);
-                       event_add (accept_event, NULL);
-                       worker->accept_events = g_list_prepend (worker->accept_events,
-                                       accept_event);
+                       if (ls->type == RSPAMD_WORKER_SOCKET_UDP) {
+                               accept_event = g_slice_alloc0 (sizeof (struct event));
+                               event_set (accept_event, ls->fd, EV_READ | EV_PERSIST,
+                                               accept_fuzzy_socket, worker);
+                               event_base_set (ctx->ev_base, accept_event);
+                               event_add (accept_event, NULL);
+                               worker->accept_events = g_list_prepend (worker->accept_events,
+                                               accept_event);
+                       }
+                       else if (worker->index == 0) {
+                               /* We allow TCP listeners only for a update worker */
+                               accept_event = g_slice_alloc0 (sizeof (struct event));
+                               event_set (accept_event, ls->fd, EV_READ | EV_PERSIST,
+                                               accept_fuzzy_mirror_socket, worker);
+                               event_base_set (ctx->ev_base, accept_event);
+                               event_add (accept_event, NULL);
+                               worker->accept_events = g_list_prepend (worker->accept_events,
+                                               accept_event);
+                       }
                }
 
                cur = g_list_next (cur);
@@ -1415,6 +1561,7 @@ start_fuzzy (struct rspamd_worker *worker)
                        "fuzzy",
                        NULL);
        ctx->peer_fd = -1;
+       double_to_tv (ctx->master_timeout, &ctx->master_io_tv);
 
        /*
         * Open DB and perform VACUUM
@@ -1443,7 +1590,7 @@ start_fuzzy (struct rspamd_worker *worker)
                        rspamd_fuzzy_storage_stat, ctx);
        rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
                                rspamd_fuzzy_storage_sync, ctx);
-       /* Create radix tree */
+       /* Create radix trees */
        if (ctx->update_map != NULL) {
                if (!rspamd_map_is_map (ctx->update_map)) {
                        if (!radix_add_generic_iplist (ctx->update_map,
@@ -1460,6 +1607,22 @@ start_fuzzy (struct rspamd_worker *worker)
 
                }
        }
+       if (ctx->masters_map != NULL) {
+               if (!rspamd_map_is_map (ctx->masters_map)) {
+                       if (!radix_add_generic_iplist (ctx->masters_map,
+                                       &ctx->master_ips)) {
+                               msg_warn ("cannot load or parse ip list from '%s'",
+                                               ctx->masters_map);
+                       }
+               }
+               else {
+                       rspamd_map_add (worker->srv->cfg, ctx->masters_map,
+                                       "Allow fuzzy master/slave updates from specified addresses",
+                                       rspamd_radix_read, rspamd_radix_fin,
+                                       (void **)&ctx->master_ips);
+
+               }
+       }
 
        /* Maps events */
        ctx->resolver = dns_resolver_init (worker->srv->logger,
index d3e3fc4c87d7e4b8f0bb1a0eeca4e46ebaf784cd..ac104f7f0f8ba659841c99290b67b80de59fcafb 100644 (file)
@@ -320,9 +320,9 @@ accept_socket (gint fd, short what, void *arg)
 
        rspamd_http_connection_read_message (task->http_conn,
                        task,
-               nfd,
-               &ctx->io_tv,
-               ctx->ev_base);
+                       nfd,
+                       &ctx->io_tv,
+                       ctx->ev_base);
 }
 
 #ifdef WITH_HYPERSCAN