]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Implement fuzzy storage updates
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 23 May 2016 11:18:00 +0000 (12:18 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 23 May 2016 11:18:00 +0000 (12:18 +0100)
src/fuzzy_storage.c

index d8f82104fc447ba0c3877c6641a7ac0b03f5cd9b..db1557e470ce8fcf44eef4be454aab5020876f8a 100644 (file)
@@ -94,7 +94,8 @@ struct rspamd_fuzzy_storage_ctx {
        gdouble sync_timeout;
        radix_compressed_t *update_ips;
        radix_compressed_t *master_ips;
-       struct rspamd_cryptobox_keypair *master_key;
+       struct rspamd_cryptobox_keypair *sync_keypair;
+       struct rspamd_cryptobox_pubkey *master_key;
        struct timeval master_io_tv;
        gdouble master_timeout;
        gchar *update_map;
@@ -703,6 +704,144 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s)
        return TRUE;
 }
 
+static void
+rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
+               struct rspamd_http_message *msg)
+{
+       const guchar *p;
+       gsize remain;
+       guint32 revision, our_rev, len;
+       struct fuzzy_peer_cmd cmd, *pcmd;
+       enum {
+               read_len = 0,
+               read_data,
+               finish_processing
+       } state = read_len;
+       GList *updates = NULL, *cur;
+
+       if (!msg->body || msg->body->len == 0) {
+               msg_err ("empty update message, not processing");
+
+               return;
+       }
+
+       /*
+        * Message format:
+        * <uint32_le> - revision
+        * <uint32_le> - size of the next element
+        * <data> - command data
+        * ...
+        * <0> - end of data
+        * ... - ignored
+        */
+       p = (const guchar *)msg->body->str;
+       remain = msg->body->len;
+
+       if (remain > sizeof (guint32) * 2) {
+               memcpy (&revision, p, sizeof (guint32));
+               revision = GUINT32_TO_LE (revision);
+               our_rev = rspamd_fuzzy_backend_version (session->ctx->backend);
+
+               if (revision <= our_rev) {
+                       msg_err ("remote revision:d %d is older than ours: %d, refusing update",
+                                       revision, our_rev);
+
+                       return;
+               }
+               else if (revision - our_rev > 1) {
+                       msg_warn ("remote revision:d %d is newer more than 1 revision "
+                                       "than ours: %d, cold sync is recommended",
+                                                               revision, our_rev);
+               }
+
+               remain -= sizeof (guint32);
+               p += sizeof (guint32);
+       }
+       else {
+               msg_err ("short update message, not processing");
+               goto err;
+       }
+
+       while (remain > 0) {
+               switch (state) {
+               case read_len:
+                       if (remain < sizeof (guint32)) {
+                               msg_err ("short update message while reading length, not processing");
+                               goto err;
+                       }
+
+                       memcpy (&len, p, sizeof (guint32));
+                       len = GUINT32_TO_LE (len);
+                       remain -= sizeof (guint32);
+                       p += sizeof (guint32);
+
+                       if (len == 0) {
+                               remain = 0;
+                               state = finish_processing;
+                       }
+                       else {
+                               state = read_data;
+                       }
+                       break;
+               case read_data:
+                       if (remain < len) {
+                               msg_err ("short update message while reading data, not processing");
+                               return;
+                       }
+
+                       if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean) ||
+                                       len > sizeof (cmd)) {
+                               /* Bad size command */
+                               msg_err ("incorrect element size: %d, at least %d expected", len,
+                                               (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean)));
+                               goto err;
+                       }
+
+                       memcpy (&cmd, p, sizeof (gboolean));
+                       if (cmd.is_shingle && len < sizeof (cmd)) {
+                               /* Short command */
+                               msg_err ("incorrect element size: %d, at least %d expected", len,
+                                               (gint)(sizeof (cmd)));
+                               goto err;
+                       }
+
+                       pcmd = g_slice_alloc (sizeof (cmd));
+                       memcpy (pcmd, p, len);
+                       updates = g_list_prepend (updates, pcmd);
+
+                       p += len;
+                       remain -= len;
+                       len = 0;
+                       state = read_len;
+                       break;
+               case finish_processing:
+                       /* Do nothing */
+                       remain = 0;
+                       break;
+               }
+       }
+
+       /* Insert elements to the updates from head */
+       for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
+               g_queue_push_head (session->ctx->updates_pending, cur->data);
+               cur->data = NULL;
+       }
+
+       rspamd_fuzzy_process_updates_queue (session->ctx);
+
+err:
+       if (updates) {
+               for (cur = updates; cur != NULL; cur = g_list_next (cur)) {
+                       if (cur->data) {
+                               g_slice_free1 (sizeof (cmd), cur->data);
+                       }
+               }
+
+               g_list_free (updates);
+       }
+}
+
+
 static void
 fuzzy_session_destroy (gpointer d)
 {
@@ -740,8 +879,35 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
        struct rspamd_http_message *msg)
 {
        struct fuzzy_master_update_session *session = conn->ud;
+       const struct rspamd_cryptobox_pubkey *rk;
+
+       /* Check key */
+       if (!rspamd_http_connection_is_encrypted (conn)) {
+               msg_err ("refuse unencrypted update from: %s",
+                               rspamd_inet_address_to_string (session->addr));
+               goto end;
+       }
+       else {
+
+               if (session->ctx->master_key) {
+                       rk = rspamd_http_connection_get_peer_key (conn);
+                       g_assert (rk != NULL);
+
+                       if (!rspamd_pubkey_equal (rk, session->ctx->master_key)) {
+                               msg_err ("refuse unknown pubkey update from: %s",
+                                               rspamd_inet_address_to_string (session->addr));
+                               goto end;
+                       }
+               }
+               else {
+                       msg_warn ("no trusted key specified, accept any update from %s",
+                                       rspamd_inet_address_to_string (session->addr));
+               }
+
+               rspamd_fuzzy_mirror_process_update (session, msg);
+       }
 
-       /* TODO: implement updates */
+end:
        rspamd_fuzzy_mirror_session_destroy (session);
 
        return 0;
@@ -786,6 +952,15 @@ accept_fuzzy_mirror_socket (gint fd, short what, void *arg)
                return;
        }
 
+       if (!ctx->sync_keypair) {
+               msg_err ("deny update request from %s, as no local keypair is specified",
+                               rspamd_inet_address_to_string (addr));
+               rspamd_inet_address_destroy (addr);
+               close (nfd);
+
+               return;
+       }
+
        session = g_slice_alloc0 (sizeof (*session));
        http_conn = rspamd_http_connection_new (
                        NULL,
@@ -795,10 +970,7 @@ accept_fuzzy_mirror_socket (gint fd, short what, void *arg)
                        RSPAMD_HTTP_SERVER,
                        ctx->keypair_cache);
 
-       if (ctx->master_key) {
-               rspamd_http_connection_set_key (http_conn, ctx->master_key);
-       }
-
+       rspamd_http_connection_set_key (http_conn, ctx->sync_keypair);
        session->ctx = ctx;
        session->conn = http_conn;
        session->addr = addr;
@@ -1434,10 +1606,10 @@ init_fuzzy (struct rspamd_config *cfg)
 
        rspamd_rcl_register_worker_option (cfg,
                        type,
-                       "master_key",
+                       "sync_keypair",
                        rspamd_rcl_parse_struct_keypair,
                        ctx,
-                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key),
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, sync_keypair),
                        0,
                        "Encryption key for master/slave updates");
 
@@ -1450,6 +1622,15 @@ init_fuzzy (struct rspamd_config *cfg)
                        0,
                        "Allow master/slave updates from the following IP addresses");
 
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "master_key",
+                       rspamd_rcl_parse_struct_pubkey,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key),
+                       0,
+                       "Allow master/slave updates merely using the specified key");
+
        return ctx;
 }