From 40d5f05721ac5d96a99659407eac91eab2420f86 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 23 May 2016 12:18:00 +0100 Subject: [PATCH] [Feature] Implement fuzzy storage updates --- src/fuzzy_storage.c | 197 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 189 insertions(+), 8 deletions(-) diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index d8f82104f..db1557e47 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -94,7 +94,8 @@ struct rspamd_fuzzy_storage_ctx { gdouble sync_timeout; radix_compressed_t *update_ips; radix_compressed_t *master_ips; - struct rspamd_cryptobox_keypair *master_key; + struct rspamd_cryptobox_keypair *sync_keypair; + struct rspamd_cryptobox_pubkey *master_key; struct timeval master_io_tv; gdouble master_timeout; gchar *update_map; @@ -703,6 +704,144 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s) return TRUE; } +static void +rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, + struct rspamd_http_message *msg) +{ + const guchar *p; + gsize remain; + guint32 revision, our_rev, len; + struct fuzzy_peer_cmd cmd, *pcmd; + enum { + read_len = 0, + read_data, + finish_processing + } state = read_len; + GList *updates = NULL, *cur; + + if (!msg->body || msg->body->len == 0) { + msg_err ("empty update message, not processing"); + + return; + } + + /* + * Message format: + * - revision + * - size of the next element + * - command data + * ... + * <0> - end of data + * ... - ignored + */ + p = (const guchar *)msg->body->str; + remain = msg->body->len; + + if (remain > sizeof (guint32) * 2) { + memcpy (&revision, p, sizeof (guint32)); + revision = GUINT32_TO_LE (revision); + our_rev = rspamd_fuzzy_backend_version (session->ctx->backend); + + if (revision <= our_rev) { + msg_err ("remote revision:d %d is older than ours: %d, refusing update", + revision, our_rev); + + return; + } + else if (revision - our_rev > 1) { + msg_warn ("remote revision:d %d is newer more than 1 revision " + "than ours: %d, cold sync is recommended", + revision, our_rev); + } + + remain -= sizeof (guint32); + p += sizeof (guint32); + } + else { + msg_err ("short update message, not processing"); + goto err; + } + + while (remain > 0) { + switch (state) { + case read_len: + if (remain < sizeof (guint32)) { + msg_err ("short update message while reading length, not processing"); + goto err; + } + + memcpy (&len, p, sizeof (guint32)); + len = GUINT32_TO_LE (len); + remain -= sizeof (guint32); + p += sizeof (guint32); + + if (len == 0) { + remain = 0; + state = finish_processing; + } + else { + state = read_data; + } + break; + case read_data: + if (remain < len) { + msg_err ("short update message while reading data, not processing"); + return; + } + + if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean) || + len > sizeof (cmd)) { + /* Bad size command */ + msg_err ("incorrect element size: %d, at least %d expected", len, + (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean))); + goto err; + } + + memcpy (&cmd, p, sizeof (gboolean)); + if (cmd.is_shingle && len < sizeof (cmd)) { + /* Short command */ + msg_err ("incorrect element size: %d, at least %d expected", len, + (gint)(sizeof (cmd))); + goto err; + } + + pcmd = g_slice_alloc (sizeof (cmd)); + memcpy (pcmd, p, len); + updates = g_list_prepend (updates, pcmd); + + p += len; + remain -= len; + len = 0; + state = read_len; + break; + case finish_processing: + /* Do nothing */ + remain = 0; + break; + } + } + + /* Insert elements to the updates from head */ + for (cur = updates; cur != NULL; cur = g_list_next (cur)) { + g_queue_push_head (session->ctx->updates_pending, cur->data); + cur->data = NULL; + } + + rspamd_fuzzy_process_updates_queue (session->ctx); + +err: + if (updates) { + for (cur = updates; cur != NULL; cur = g_list_next (cur)) { + if (cur->data) { + g_slice_free1 (sizeof (cmd), cur->data); + } + } + + g_list_free (updates); + } +} + + static void fuzzy_session_destroy (gpointer d) { @@ -740,8 +879,35 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg) { struct fuzzy_master_update_session *session = conn->ud; + const struct rspamd_cryptobox_pubkey *rk; + + /* Check key */ + if (!rspamd_http_connection_is_encrypted (conn)) { + msg_err ("refuse unencrypted update from: %s", + rspamd_inet_address_to_string (session->addr)); + goto end; + } + else { + + if (session->ctx->master_key) { + rk = rspamd_http_connection_get_peer_key (conn); + g_assert (rk != NULL); + + if (!rspamd_pubkey_equal (rk, session->ctx->master_key)) { + msg_err ("refuse unknown pubkey update from: %s", + rspamd_inet_address_to_string (session->addr)); + goto end; + } + } + else { + msg_warn ("no trusted key specified, accept any update from %s", + rspamd_inet_address_to_string (session->addr)); + } + + rspamd_fuzzy_mirror_process_update (session, msg); + } - /* TODO: implement updates */ +end: rspamd_fuzzy_mirror_session_destroy (session); return 0; @@ -786,6 +952,15 @@ accept_fuzzy_mirror_socket (gint fd, short what, void *arg) return; } + if (!ctx->sync_keypair) { + msg_err ("deny update request from %s, as no local keypair is specified", + rspamd_inet_address_to_string (addr)); + rspamd_inet_address_destroy (addr); + close (nfd); + + return; + } + session = g_slice_alloc0 (sizeof (*session)); http_conn = rspamd_http_connection_new ( NULL, @@ -795,10 +970,7 @@ accept_fuzzy_mirror_socket (gint fd, short what, void *arg) RSPAMD_HTTP_SERVER, ctx->keypair_cache); - if (ctx->master_key) { - rspamd_http_connection_set_key (http_conn, ctx->master_key); - } - + rspamd_http_connection_set_key (http_conn, ctx->sync_keypair); session->ctx = ctx; session->conn = http_conn; session->addr = addr; @@ -1434,10 +1606,10 @@ init_fuzzy (struct rspamd_config *cfg) rspamd_rcl_register_worker_option (cfg, type, - "master_key", + "sync_keypair", rspamd_rcl_parse_struct_keypair, ctx, - G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key), + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, sync_keypair), 0, "Encryption key for master/slave updates"); @@ -1450,6 +1622,15 @@ init_fuzzy (struct rspamd_config *cfg) 0, "Allow master/slave updates from the following IP addresses"); + rspamd_rcl_register_worker_option (cfg, + type, + "master_key", + rspamd_rcl_parse_struct_pubkey, + ctx, + G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, master_key), + 0, + "Allow master/slave updates merely using the specified key"); + return ctx; } -- 2.39.5