aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2017-01-27 15:19:09 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2017-01-27 15:19:09 +0000
commit66243984d6fc252e1a4b19bcb050c09005761eed (patch)
tree4f4cc1d2db0a87559712992083307a2d5ef8c22b
parentef7e7cbfe393e392eccfc025c7ac00ec0ed0c020 (diff)
downloadrspamd-66243984d6fc252e1a4b19bcb050c09005761eed.tar.gz
rspamd-66243984d6fc252e1a4b19bcb050c09005761eed.zip
[Fix] Various collection mode fixes
-rw-r--r--src/fuzzy_storage.c62
1 files changed, 34 insertions, 28 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 95bef0e1e..b0606eb97 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -327,11 +327,11 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
io_cmd = cur->data;
if (io_cmd->is_shingle) {
- len += sizeof (guint32) + sizeof (gboolean) +
+ len += sizeof (guint32) + sizeof (guint32) +
sizeof (struct rspamd_fuzzy_shingle_cmd);
}
else {
- len += sizeof (guint32) + sizeof (gboolean) +
+ len += sizeof (guint32) + sizeof (guint32) +
sizeof (struct rspamd_fuzzy_cmd);
}
}
@@ -344,11 +344,11 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
io_cmd = cur->data;
if (io_cmd->is_shingle) {
- len = sizeof (gboolean) +
+ len = sizeof (guint32) +
sizeof (struct rspamd_fuzzy_shingle_cmd);
}
else {
- len = sizeof (gboolean) +
+ len = sizeof (guint32) +
sizeof (struct rspamd_fuzzy_cmd);
}
@@ -459,7 +459,7 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
struct rspamd_updates_cbdata {
struct rspamd_fuzzy_storage_ctx *ctx;
- const gchar *source;
+ gchar *source;
};
static void
@@ -467,6 +467,7 @@ fuzzy_update_version_callback (guint64 ver, void *ud)
{
msg_info ("updated fuzzy storage from %s: version: %d",
(const char *)ud, (gint)ver);
+ g_free (ud);
}
static void
@@ -523,7 +524,7 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud)
g_queue_clear (ctx->updates_pending);
rspamd_fuzzy_backend_version (ctx->backend, source,
- fuzzy_update_version_callback, (void *)source);
+ fuzzy_update_version_callback, g_strdup (source));
ctx->updates_failed = 0;
}
else {
@@ -561,21 +562,22 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud)
event_base_loopexit (ctx->ev_base, &tv);
}
+ g_free (cbdata->source);
g_slice_free1 (sizeof (*cbdata), cbdata);
}
static void
rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
- const gchar *source)
+ const gchar *source, gboolean forced)
{
struct rspamd_updates_cbdata *cbdata;
if (ctx->updates_pending &&
- g_queue_get_length (ctx->updates_pending) > 0) {
+ (forced || g_queue_get_length (ctx->updates_pending) > 0)) {
cbdata = g_slice_alloc (sizeof (*cbdata));
cbdata->ctx = ctx;
- cbdata->source = source;
+ cbdata->source = g_strdup (source);
rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
source, rspamd_fuzzy_updates_cb, cbdata);
}
@@ -1114,7 +1116,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
*/
p = rspamd_http_message_get_body (msg, &remain);
- if (p && remain > sizeof (gint32) * 2) {
+ if (p && remain >= sizeof (gint32) * 2) {
memcpy (&revision, p, sizeof (gint32));
revision = GINT32_TO_LE (revision);
@@ -1169,17 +1171,17 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
return;
}
- if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean) ||
+ if (len < sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32) ||
len > sizeof (cmd)) {
/* Bad size command */
msg_err_fuzzy_update ("incorrect element size: %d, at least "
"%d expected", len,
- (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (gboolean)));
+ (gint)(sizeof (struct rspamd_fuzzy_cmd) + sizeof (guint32)));
goto err;
}
- memcpy (&cmd, p, sizeof (gboolean));
- if (cmd.is_shingle && len < sizeof (cmd)) {
+ memcpy (&cmd, p, len);
+ if (cmd.is_shingle && len != sizeof (cmd)) {
/* Short command */
msg_err_fuzzy_update ("incorrect element size: %d, at least "
"%d expected", len,
@@ -1188,7 +1190,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
}
pcmd = g_slice_alloc (sizeof (cmd));
- memcpy (pcmd, p, len);
+ memcpy (pcmd, &cmd, len);
updates = g_list_prepend (updates, pcmd);
p += len;
@@ -1226,7 +1228,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session,
cur->data = NULL;
}
- rspamd_fuzzy_process_updates_queue (session->ctx, session->src);
+ rspamd_fuzzy_process_updates_queue (session->ctx, session->src, TRUE);
msg_info_fuzzy_update ("processed updates from the master %s, "
"%ud operations processed,"
" revision: %d (local revision: %d)",
@@ -1242,7 +1244,6 @@ err:
}
}
- /* This also update our version id */
g_list_free (updates);
}
}
@@ -1477,7 +1478,7 @@ rspamd_fuzzy_collection_send_fstring (struct rspamd_http_connection_entry *entry
}
static int
-rspamd_fuzzy_collection_cookie_handler (struct rspamd_http_connection_entry *conn_ent,
+rspamd_fuzzy_collection_cookie (struct rspamd_http_connection_entry *conn_ent,
struct rspamd_http_message *msg)
{
struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
@@ -1491,7 +1492,7 @@ rspamd_fuzzy_collection_cookie_handler (struct rspamd_http_connection_entry *con
}
static int
-rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_ent,
+rspamd_fuzzy_collection_data (struct rspamd_http_connection_entry *conn_ent,
struct rspamd_http_message *msg)
{
struct rspamd_fuzzy_collection_session *session = conn_ent->ud;
@@ -1503,7 +1504,7 @@ rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_
GError *err = NULL;
guchar *decoded_signature;
gsize dec_len;
- guint32 cmdlen;
+ guint32 cmdlen, nupdates = 0;
sign_header = rspamd_http_message_find_header (msg, "Signature");
@@ -1561,26 +1562,31 @@ rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_
*/
reply = rspamd_fstring_append (reply, (const gchar *)&ctx->collection_id,
sizeof (ctx->collection_id));
+ cur = ctx->updates_pending->head;
+
while (cur) {
io_cmd = cur->data;
if (io_cmd->is_shingle) {
- cmdlen = sizeof (io_cmd->cmd.shingle);
+ cmdlen = sizeof (io_cmd->cmd.shingle) + sizeof (guint32);
}
else {
- cmdlen = sizeof (io_cmd->cmd.normal);
+ cmdlen = sizeof (io_cmd->cmd.normal) + sizeof (guint32);
}
cmdlen = GUINT32_TO_LE (cmdlen);
reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen,
sizeof (cmdlen));
- reply = rspamd_fstring_append (reply, (const gchar *)&io_cmd->cmd,
+ reply = rspamd_fstring_append (reply, (const gchar *)io_cmd,
cmdlen);
g_slice_free1 (sizeof (*io_cmd), io_cmd);
+ nupdates ++;
cur = g_list_next (cur);
}
+ msg_info_fuzzy_collection ("collection %d done, send %d updates",
+ ctx->collection_id, nupdates);
/* Last command */
cmdlen = 0;
reply = rspamd_fstring_append (reply, (const gchar *)&cmdlen,
@@ -1839,7 +1845,7 @@ rspamd_fuzzy_storage_periodic_callback (void *ud)
struct rspamd_fuzzy_storage_ctx *ctx = ud;
if (g_queue_get_length (ctx->updates_pending) > 0) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
return TRUE;
}
@@ -1860,7 +1866,7 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main,
rep.reply.fuzzy_sync.status = 0;
if (ctx->backend && worker->index == 0) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
rspamd_fuzzy_backend_start_update (ctx->backend, ctx->sync_timeout,
rspamd_fuzzy_storage_periodic_callback, ctx);
}
@@ -2788,10 +2794,10 @@ start_fuzzy (struct rspamd_worker *worker)
/* Register paths */
rspamd_http_router_add_path (ctx->collection_rt,
"/cookie",
- rspamd_fuzzy_collection_cookie_handler);
+ rspamd_fuzzy_collection_cookie);
rspamd_http_router_add_path (ctx->collection_rt,
"/data",
- rspamd_fuzzy_collection_data_handler);
+ rspamd_fuzzy_collection_data);
}
}
@@ -2844,7 +2850,7 @@ start_fuzzy (struct rspamd_worker *worker)
if (worker->index == 0 && g_queue_get_length (ctx->updates_pending) > 0) {
if (!ctx->collection_mode) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name, FALSE);
event_base_loop (ctx->ev_base, 0);
}
}