]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Implement fuzzy updates push protocol
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 23 May 2016 13:25:07 +0000 (14:25 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 23 May 2016 13:25:07 +0000 (14:25 +0100)
src/fuzzy_storage.c

index db30fc6f863b1a7e472e5ccc0953a7f467860ea1..2f7ba06968f523e4ba6f2065117f39d087129ce1 100644 (file)
@@ -225,6 +225,162 @@ fuzzy_key_dtor (gpointer p)
        g_slice_free1 (sizeof (*key), key);
 }
 
+struct fuzzy_slave_connection {
+       struct rspamd_cryptobox_keypair *local_key;
+       struct rspamd_cryptobox_pubkey *remote_key;
+       struct upstream *up;
+       struct rspamd_http_connection *http_conn;
+       struct rspamd_fuzzy_mirror *mirror;
+       gint sock;
+};
+
+static void
+fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn)
+{
+       if (conn) {
+               if (conn->http_conn) {
+                       rspamd_http_connection_reset (conn->http_conn);
+                       rspamd_http_connection_unref (conn->http_conn);
+               }
+
+               close (conn->sock);
+
+               g_slice_free1 (sizeof (*conn), conn);
+       }
+}
+
+static void
+fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx,
+               struct rspamd_http_message *msg)
+{
+       GList *cur;
+       struct fuzzy_peer_cmd *io_cmd;
+       gsize len;
+       guint32 rev;
+       const gchar *p;
+
+       rev = rspamd_fuzzy_backend_version (ctx->backend);
+       rev = GUINT32_TO_LE (rev);
+       len = sizeof (guint32) * 2; /* revision + last chunk */
+
+       for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
+               io_cmd = cur->data;
+
+               if (io_cmd->is_shingle) {
+                       len += sizeof (guint32) + sizeof (gboolean) +
+                                       sizeof (struct rspamd_fuzzy_shingle_cmd);
+               }
+               else {
+                       len += sizeof (guint32) + sizeof (gboolean) +
+                                       sizeof (struct rspamd_fuzzy_cmd);
+               }
+       }
+
+       msg->body = rspamd_fstring_sized_new (len);
+       msg->body = rspamd_fstring_append (msg->body, (const char *)&rev,
+                       sizeof (rev));
+
+       for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) {
+               io_cmd = cur->data;
+
+               if (io_cmd->is_shingle) {
+                       len = sizeof (gboolean) +
+                                       sizeof (struct rspamd_fuzzy_shingle_cmd);
+               }
+               else {
+                       len = sizeof (gboolean) +
+                                       sizeof (struct rspamd_fuzzy_cmd);
+               }
+
+               p = (const char *)io_cmd;
+               msg->body = rspamd_fstring_append (msg->body, (const char *)&len,
+                                       sizeof (len));
+               msg->body = rspamd_fstring_append (msg->body, p, len);
+       }
+
+       /* Last chunk */
+       len = 0;
+       msg->body = rspamd_fstring_append (msg->body, (const char *)&len,
+                       sizeof (len));
+}
+
+static void
+fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+       struct fuzzy_slave_connection *bk_conn = conn->ud;
+       msg_info ("abnormally closing connection from backend: %s:%s, "
+                       "error: %e",
+                       bk_conn->mirror->name,
+                       rspamd_inet_address_to_string (rspamd_upstream_addr (bk_conn->up)),
+                       err);
+
+       fuzzy_mirror_close_connection (bk_conn);
+}
+
+static gint
+fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
+       struct rspamd_http_message *msg)
+{
+       struct fuzzy_slave_connection *bk_conn = conn->ud;
+
+       msg_info ("finished mirror connection to %s", bk_conn->mirror->name);
+       fuzzy_mirror_close_connection (bk_conn);
+
+       return 0;
+}
+
+static void
+rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
+               struct rspamd_fuzzy_mirror *m)
+{
+       struct fuzzy_slave_connection *conn;
+       struct rspamd_http_message *msg;
+       struct timeval tv;
+
+       conn = g_slice_alloc0 (sizeof (*conn));
+       conn->up = rspamd_upstream_get (m->u,
+                       RSPAMD_UPSTREAM_MASTER_SLAVE, NULL, 0);
+       conn->mirror = m;
+
+       if (conn->up == NULL) {
+               msg_err ("cannot select upstream for %s", m->name);
+               return;
+       }
+
+       conn->sock = rspamd_inet_address_connect (
+                       rspamd_upstream_addr (conn->up),
+                       SOCK_STREAM, TRUE);
+
+       if (conn->sock == -1) {
+               msg_err ("cannot connect upstream for %s", m->name);
+               rspamd_upstream_fail (conn->up);
+               return;
+       }
+
+       msg = rspamd_http_new_message (HTTP_REQUEST);
+       rspamd_printf_fstring (&msg->url, "/update_v1");
+
+       conn->http_conn = rspamd_http_connection_new (
+                       NULL,
+                       fuzzy_mirror_error_handler,
+                       fuzzy_mirror_finish_handler,
+                       RSPAMD_HTTP_CLIENT_SIMPLE,
+                       RSPAMD_HTTP_CLIENT,
+                       ctx->keypair_cache);
+
+       rspamd_http_connection_set_key (conn->http_conn,
+                       ctx->sync_keypair);
+       msg->peer_key = rspamd_pubkey_ref (m->key);
+       double_to_tv (ctx->sync_timeout, &tv);
+       fuzzy_mirror_updates_to_http (ctx, msg);
+
+       rspamd_http_connection_write_message (conn->http_conn,
+                       msg, NULL, NULL, conn,
+                       conn->sock,
+                       &tv, ctx->ev_base);
+       msg_info ("send update request to %s", m->name);
+}
+
 static void
 rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
 {
@@ -232,13 +388,15 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
        struct fuzzy_peer_cmd *io_cmd;
        struct rspamd_fuzzy_cmd *cmd;
        gpointer ptr;
-       guint nupdates = 0;
+       struct rspamd_fuzzy_mirror *m;
+       guint nupdates = 0, i;
        time_t now = time (NULL);
 
        if (ctx->updates_pending &&
                        g_queue_get_length (ctx->updates_pending) > 0 &&
                        rspamd_fuzzy_backend_prepare_update (ctx->backend)) {
                cur = ctx->updates_pending->head;
+
                while (cur) {
                        io_cmd = cur->data;
 
@@ -264,6 +422,14 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx)
 
                if (rspamd_fuzzy_backend_finish_update (ctx->backend)) {
                        ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_count (ctx->backend);
+
+                       for (i = 0; i < ctx->mirrors->len; i ++) {
+                               m = g_ptr_array_index (ctx->mirrors, i);
+
+                               rspamd_fuzzy_send_update_mirror (ctx, m);
+                       }
+
+                       /* Clear updates */
                        cur = ctx->updates_pending->head;
 
                        while (cur) {
@@ -722,7 +888,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
 {
        const guchar *p;
        gsize remain;
-       guint32 revision, our_rev, len;
+       guint32 revision, our_rev, len, cnt = 0;
        struct fuzzy_peer_cmd cmd, *pcmd;
        enum {
                read_len = 0,
@@ -825,6 +991,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
                        remain -= len;
                        len = 0;
                        state = read_len;
+                       cnt ++;
                        break;
                case finish_processing:
                        /* Do nothing */
@@ -840,6 +1007,8 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
        }
 
        rspamd_fuzzy_process_updates_queue (session->ctx);
+       msg_info ("processed updates from the master, %ud operations processed,"
+                       " revision: %ud", cnt, revision);
 
 err:
        if (updates) {
@@ -1595,6 +1764,7 @@ init_fuzzy (struct rspamd_config *cfg)
                        (GDestroyNotify) rspamd_inet_address_destroy, g_free,
                        rspamd_inet_address_hash, rspamd_inet_address_equal);
        ctx->cfg = cfg;
+       ctx->mirrors = g_ptr_array_new ();
 
        rspamd_rcl_register_worker_option (cfg,
                        type,