]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Various collection mode fixes
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 27 Jan 2017 15:19:09 +0000 (15:19 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 27 Jan 2017 15:19:09 +0000 (15:19 +0000)
src/fuzzy_storage.c

index 95bef0e1e50651513303ca3f7dd0b5b0b68fc33f..b0606eb97c133b3046c1e5aac8d735e82fc0d5f7 100644 (file)
@@ -327,11 +327,11 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
                io_cmd = cur->data;
 
                if (io_cmd->is_shingle) {
-                       len += sizeof (guint32) + sizeof (gboolean) +
+                       len += sizeof (guint32) + sizeof (guint32) +
                                        sizeof (struct rspamd_fuzzy_shingle_cmd);
                }
                else {
-                       len += sizeof (guint32) + sizeof (gboolean) +
+                       len += sizeof (guint32) + sizeof (guint32) +
                                        sizeof (struct rspamd_fuzzy_cmd);
                }
        }
@@ -344,11 +344,11 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
                io_cmd = cur->data;
 
                if (io_cmd->is_shingle) {
-                       len = sizeof (gboolean) +
+                       len = sizeof (guint32) +
                                        sizeof (struct rspamd_fuzzy_shingle_cmd);
                }
                else {
-                       len = sizeof (gboolean) +
+                       len = sizeof (guint32) +
                                        sizeof (struct rspamd_fuzzy_cmd);
                }
 
@@ -459,7 +459,7 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
 
 struct rspamd_updates_cbdata {
        struct rspamd_fuzzy_storage_ctx *ctx;
-       const gchar *source;
+       gchar *source;
 };
 
 static void
@@ -467,6 +467,7 @@ fuzzy_update_version_callback (guint64 ver, void *ud)
 {
        msg_info ("updated fuzzy storage from %s: version: %d",
                (const char *)ud, (gint)ver);
+       g_free (ud);
 }
 
 static void
@@ -523,7 +524,7 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud)
 
                g_queue_clear (ctx->updates_pending);
                rspamd_fuzzy_backend_version (ctx->backend, source,
-                               fuzzy_update_version_callback, (void *)source);
+                               fuzzy_update_version_callback, g_strdup (source));
                ctx->updates_failed = 0;
        }
        else {
@@ -561,21 +562,22 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud)
                event_base_loopexit (ctx->ev_base, &tv);
        }
 
+       g_free (cbdata->source);
        g_slice_free1 (sizeof (*cbdata), cbdata);
 }
 
 static void
 rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
-               const gchar *source)
+               const gchar *source, gboolean forced)
 {
 
        struct rspamd_updates_cbdata *cbdata;
 
        if (ctx->updates_pending &&
-                       g_queue_get_length (ctx->updates_pending) > 0) {
+                       (forced || g_queue_get_length (ctx->updates_pending) > 0)) {
                cbdata = g_slice_alloc (sizeof (*cbdata));
                cbdata->ctx = ctx;
-               cbdata->source = source;
+               cbdata->source = g_strdup (source);
                rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
                                source, rspamd_fuzzy_updates_cb, cbdata);
        }
@@ -1114,7 +1116,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
         */
        p = rspamd_http_message_get_body (msg, &remain);
 
-       if (p && remain > sizeof (gint32) * 2) {
+       if (p && remain >= sizeof (gint32) * 2) {
                memcpy (&revision, p, sizeof (gint32));
                revision = GINT32_TO_LE (revision);
 
@@ -1169,17 +1171,17 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
                                return;
                        }
 
-                       if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean) ||
+                       if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32) ||
                                        len > sizeof (cmd)) {
                                /* Bad size command */
                                msg_err_fuzzy_update ("incorrect element size: %d, at least "
                                                "%d expected", len,
-                                               (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean)));
+                                               (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32)));
                                goto err;
                        }
 
-                       memcpy (&cmd, p, sizeof (gboolean));
-                       if (cmd.is_shingle && len < sizeof (cmd)) {
+                       memcpy (&cmd, p, len);
+                       if (cmd.is_shingle && len != sizeof (cmd)) {
                                /* Short command */
                                msg_err_fuzzy_update ("incorrect element size: %d, at least "
                                                "%d expected", len,
@@ -1188,7 +1190,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
                        }
 
                        pcmd = g_slice_alloc (sizeof (cmd));
-                       memcpy (pcmd, p, len);
+                       memcpy (pcmd, &cmd, len);
                        updates = g_list_prepend (updates, pcmd);
 
                        p += len;
@@ -1226,7 +1228,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
                cur->data = NULL;
        }
 
-       rspamd_fuzzy_process_updates_queue (session->ctx, session->src);
+       rspamd_fuzzy_process_updates_queue (session->ctx, session->src, TRUE);
        msg_info_fuzzy_update ("processed updates from the master %s, "
                        "%ud operations processed,"
                        " revision: %d (local revision: %d)",
@@ -1242,7 +1244,6 @@ err:
                        }
                }
 
-               /* This also update our version id */
                g_list_free (updates);
        }
 }
@@ -1477,7 +1478,7 @@ rspamd_fuzzy_collection_send_fstring (struct rspamd_http_connection_entry *entry
 }
 
 static int
-rspamd_fuzzy_collection_cookie_handler (struct rspamd_http_connection_entry *conn_ent,
+rspamd_fuzzy_collection_cookie (struct rspamd_http_connection_entry *conn_ent,
        struct rspamd_http_message *msg)
 {
        struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
@@ -1491,7 +1492,7 @@ rspamd_fuzzy_collection_cookie_handler (struct rspamd_http_connection_entry *con
 }
 
 static int
-rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_ent,
+rspamd_fuzzy_collection_data (struct rspamd_http_connection_entry *conn_ent,
        struct rspamd_http_message *msg)
 {
        struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
@@ -1503,7 +1504,7 @@ rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_
        GError *err = NULL;
        guchar *decoded_signature;
        gsize dec_len;
-       guint32 cmdlen;
+       guint32 cmdlen, nupdates = 0;
 
        sign_header = rspamd_http_message_find_header (msg, "Signature");
 
@@ -1561,26 +1562,31 @@ rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_
         */
        reply = rspamd_fstring_append (reply, (const gchar *)&ctx->collection_id,
                                        sizeof (ctx->collection_id));
+       cur = ctx->updates_pending->head;
+
        while (cur) {
                io_cmd = cur->data;
 
                if (io_cmd->is_shingle) {
-                       cmdlen = sizeof (io_cmd->cmd.shingle);
+                       cmdlen = sizeof (io_cmd->cmd.shingle) + sizeof (guint32);
 
                }
                else {
-                       cmdlen = sizeof (io_cmd->cmd.normal);
+                       cmdlen = sizeof (io_cmd->cmd.normal) + sizeof (guint32);
                }
 
                cmdlen = GUINT32_TO_LE (cmdlen);
                reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen,
                                sizeof (cmdlen));
-               reply = rspamd_fstring_append (reply, (const gchar *)&io_cmd->cmd,
+               reply = rspamd_fstring_append (reply, (const gchar *)io_cmd,
                                cmdlen);
                g_slice_free1 (sizeof (*io_cmd), io_cmd);
+               nupdates ++;
                cur = g_list_next (cur);
        }
 
+       msg_info_fuzzy_collection ("collection %d done, send %d updates",
+                       ctx->collection_id, nupdates);
        /* Last command */
        cmdlen = 0;
        reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen,
@@ -1839,7 +1845,7 @@ rspamd_fuzzy_storage_periodic_callback (void *ud)
        struct rspamd_fuzzy_storage_ctx *ctx = ud;
 
        if (g_queue_get_length (ctx->updates_pending) > 0) {
-               rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+               rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
 
                return TRUE;
        }
@@ -1860,7 +1866,7 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
        rep.reply.fuzzy_sync.status = 0;
 
        if (ctx->backend && worker->index == 0) {
-               rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+               rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
                rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
                                rspamd_fuzzy_storage_periodic_callback, ctx);
        }
@@ -2788,10 +2794,10 @@ start_fuzzy (struct rspamd_worker *worker)
                        /* Register paths */
                        rspamd_http_router_add_path (ctx->collection_rt,
                                        "/cookie",
-                                       rspamd_fuzzy_collection_cookie_handler);
+                                       rspamd_fuzzy_collection_cookie);
                        rspamd_http_router_add_path (ctx->collection_rt,
                                        "/data",
-                                       rspamd_fuzzy_collection_data_handler);
+                                       rspamd_fuzzy_collection_data);
                }
        }
 
@@ -2844,7 +2850,7 @@ start_fuzzy (struct rspamd_worker *worker)
 
        if (worker->index == 0 && g_queue_get_length (ctx->updates_pending) > 0) {
                if (!ctx->collection_mode) {
-                       rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+                       rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
                        event_base_loop (ctx->ev_base, 0);
                }
        }