diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2017-01-27 15:19:09 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2017-01-27 15:19:09 +0000 |
commit | 66243984d6fc252e1a4b19bcb050c09005761eed (patch) | |
tree | 4f4cc1d2db0a87559712992083307a2d5ef8c22b | |
parent | ef7e7cbfe393e392eccfc025c7ac00ec0ed0c020 (diff) | |
download | rspamd-66243984d6fc252e1a4b19bcb050c09005761eed.tar.gz rspamd-66243984d6fc252e1a4b19bcb050c09005761eed.zip |
[Fix] Various collection mode fixes
-rw-r--r-- | src/fuzzy_storage.c | 62 |
1 files changed, 34 insertions, 28 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 95bef0e1e..b0606eb97 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -327,11 +327,11 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) io_cmd = cur->data; if (io_cmd->is_shingle) { - len += sizeof (guint32) + sizeof (gboolean) + + len += sizeof (guint32) + sizeof (guint32) + sizeof (struct rspamd_fuzzy_shingle_cmd); } else { - len += sizeof (guint32) + sizeof (gboolean) + + len += sizeof (guint32) + sizeof (guint32) + sizeof (struct rspamd_fuzzy_cmd); } } @@ -344,11 +344,11 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) io_cmd = cur->data; if (io_cmd->is_shingle) { - len = sizeof (gboolean) + + len = sizeof (guint32) + sizeof (struct rspamd_fuzzy_shingle_cmd); } else { - len = sizeof (gboolean) + + len = sizeof (guint32) + sizeof (struct rspamd_fuzzy_cmd); } @@ -459,7 +459,7 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx, struct rspamd_updates_cbdata { struct rspamd_fuzzy_storage_ctx *ctx; - const gchar *source; + gchar *source; }; static void @@ -467,6 +467,7 @@ fuzzy_update_version_callback (guint64 ver, void *ud) { msg_info ("updated fuzzy storage from %s: version: %d", (const char *)ud, (gint)ver); + g_free (ud); } static void @@ -523,7 +524,7 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud) g_queue_clear (ctx->updates_pending); rspamd_fuzzy_backend_version (ctx->backend, source, - fuzzy_update_version_callback, (void *)source); + fuzzy_update_version_callback, g_strdup (source)); ctx->updates_failed = 0; } else { @@ -561,21 +562,22 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud) event_base_loopexit (ctx->ev_base, &tv); } + g_free (cbdata->source); g_slice_free1 (sizeof (*cbdata), cbdata); } static void rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx, - const gchar *source) + const gchar *source, gboolean forced) { struct rspamd_updates_cbdata *cbdata; if (ctx->updates_pending && - g_queue_get_length (ctx->updates_pending) > 0) { + (forced || g_queue_get_length (ctx->updates_pending) > 0)) { cbdata = g_slice_alloc (sizeof (*cbdata)); cbdata->ctx = ctx; - cbdata->source = source; + cbdata->source = g_strdup (source); rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending, source, rspamd_fuzzy_updates_cb, cbdata); } @@ -1114,7 +1116,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, */ p = rspamd_http_message_get_body (msg, &remain); - if (p && remain > sizeof (gint32) * 2) { + if (p && remain >= sizeof (gint32) * 2) { memcpy (&revision, p, sizeof (gint32)); revision = GINT32_TO_LE (revision); @@ -1169,17 +1171,17 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, return; } - if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean) || + if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32) || len > sizeof (cmd)) { /* Bad size command */ msg_err_fuzzy_update ("incorrect element size: %d, at least " "%d expected", len, - (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean))); + (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32))); goto err; } - memcpy (&cmd, p, sizeof (gboolean)); - if (cmd.is_shingle && len < sizeof (cmd)) { + memcpy (&cmd, p, len); + if (cmd.is_shingle && len != sizeof (cmd)) { /* Short command */ msg_err_fuzzy_update ("incorrect element size: %d, at least " "%d expected", len, @@ -1188,7 +1190,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, } pcmd = g_slice_alloc (sizeof (cmd)); - memcpy (pcmd, p, len); + memcpy (pcmd, &cmd, len); updates = g_list_prepend (updates, pcmd); p += len; @@ -1226,7 +1228,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, cur->data = NULL; } - rspamd_fuzzy_process_updates_queue (session->ctx, session->src); + rspamd_fuzzy_process_updates_queue (session->ctx, session->src, TRUE); msg_info_fuzzy_update ("processed updates from the master %s, " "%ud operations processed," " revision: %d (local revision: %d)", @@ -1242,7 +1244,6 @@ err: } } - /* This also update our version id */ g_list_free (updates); } } @@ -1477,7 +1478,7 @@ rspamd_fuzzy_collection_send_fstring (struct rspamd_http_connection_entry *entry } static int -rspamd_fuzzy_collection_cookie_handler (struct rspamd_http_connection_entry *conn_ent, +rspamd_fuzzy_collection_cookie (struct rspamd_http_connection_entry *conn_ent, struct rspamd_http_message *msg) { struct rspamd_fuzzy_collection_session *session = conn_ent->ud; @@ -1491,7 +1492,7 @@ rspamd_fuzzy_collection_cookie_handler (struct rspamd_http_connection_entry *con } static int -rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_ent, +rspamd_fuzzy_collection_data (struct rspamd_http_connection_entry *conn_ent, struct rspamd_http_message *msg) { struct rspamd_fuzzy_collection_session *session = conn_ent->ud; @@ -1503,7 +1504,7 @@ rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_ GError *err = NULL; guchar *decoded_signature; gsize dec_len; - guint32 cmdlen; + guint32 cmdlen, nupdates = 0; sign_header = rspamd_http_message_find_header (msg, "Signature"); @@ -1561,26 +1562,31 @@ rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_ */ reply = rspamd_fstring_append (reply, (const gchar *)&ctx->collection_id, sizeof (ctx->collection_id)); + cur = ctx->updates_pending->head; + while (cur) { io_cmd = cur->data; if (io_cmd->is_shingle) { - cmdlen = sizeof (io_cmd->cmd.shingle); + cmdlen = sizeof (io_cmd->cmd.shingle) + sizeof (guint32); } else { - cmdlen = sizeof (io_cmd->cmd.normal); + cmdlen = sizeof (io_cmd->cmd.normal) + sizeof (guint32); } cmdlen = GUINT32_TO_LE (cmdlen); reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen, sizeof (cmdlen)); - reply = rspamd_fstring_append (reply, (const gchar *)&io_cmd->cmd, + reply = rspamd_fstring_append (reply, (const gchar *)io_cmd, cmdlen); g_slice_free1 (sizeof (*io_cmd), io_cmd); + nupdates ++; cur = g_list_next (cur); } + msg_info_fuzzy_collection ("collection %d done, send %d updates", + ctx->collection_id, nupdates); /* Last command */ cmdlen = 0; reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen, @@ -1839,7 +1845,7 @@ rspamd_fuzzy_storage_periodic_callback (void *ud) struct rspamd_fuzzy_storage_ctx *ctx = ud; if (g_queue_get_length (ctx->updates_pending) > 0) { - rspamd_fuzzy_process_updates_queue (ctx, local_db_name); + rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE); return TRUE; } @@ -1860,7 +1866,7 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main, rep.reply.fuzzy_sync.status = 0; if (ctx->backend && worker->index == 0) { - rspamd_fuzzy_process_updates_queue (ctx, local_db_name); + rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE); rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout, rspamd_fuzzy_storage_periodic_callback, ctx); } @@ -2788,10 +2794,10 @@ start_fuzzy (struct rspamd_worker *worker) /* Register paths */ rspamd_http_router_add_path (ctx->collection_rt, "/cookie", - rspamd_fuzzy_collection_cookie_handler); + rspamd_fuzzy_collection_cookie); rspamd_http_router_add_path (ctx->collection_rt, "/data", - rspamd_fuzzy_collection_data_handler); + rspamd_fuzzy_collection_data); } } @@ -2844,7 +2850,7 @@ start_fuzzy (struct rspamd_worker *worker) if (worker->index == 0 && g_queue_get_length (ctx->updates_pending) > 0) { if (!ctx->collection_mode) { - rspamd_fuzzy_process_updates_queue (ctx, local_db_name); + rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE); event_base_loop (ctx->ev_base, 0); } } |