From f0823abc0076d78ee492e0b2c8d1a1b7921adaec Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 30 Jul 2018 11:07:04 +0100 Subject: [PATCH] [Fix] Freeze updates queue when do actual storage update --- src/fuzzy_storage.c | 57 +++++++++++++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index a0fa9d182..f248d928f 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -319,6 +319,7 @@ struct rspamd_fuzzy_updates_cbdata { struct rspamd_http_message *msg; struct fuzzy_slave_connection *conn; struct rspamd_fuzzy_mirror *m; + GArray *updates_pending; }; static void @@ -340,12 +341,13 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) ctx = cbdata->ctx; msg = cbdata->msg; m = cbdata->m; - g_free (cbdata); + rev32 = GUINT32_TO_LE (rev32); len = sizeof (guint32) * 2; /* revision + last chunk */ - for (i = 0; i < ctx->updates_pending->len; i ++) { - io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i); + for (i = 0; i < cbdata->updates_pending->len; i ++) { + io_cmd = &g_array_index (cbdata->updates_pending, + struct fuzzy_peer_cmd, i); if (io_cmd->is_shingle) { len += sizeof (guint32) + sizeof (guint32) + @@ -361,8 +363,8 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) reply = rspamd_fstring_append (reply, (const char *)&rev32, sizeof (rev32)); - for (i = 0; i < ctx->updates_pending->len; i ++) { - io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i); + for (i = 0; i < cbdata->updates_pending->len; i ++) { + io_cmd = &g_array_index (cbdata->updates_pending, struct fuzzy_peer_cmd, i); if (io_cmd->is_shingle) { len = sizeof (guint32) + @@ -389,13 +391,17 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) conn->sock, &tv, ctx->ev_base); msg_info ("send update request to %s", m->name); + + g_array_free (cbdata->updates_pending, TRUE); + g_free (cbdata); } static void fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m, - struct fuzzy_slave_connection *conn, - struct rspamd_fuzzy_storage_ctx *ctx, - struct rspamd_http_message *msg) + struct fuzzy_slave_connection *conn, + struct rspamd_fuzzy_storage_ctx *ctx, + struct rspamd_http_message *msg, + GArray *updates) { struct rspamd_fuzzy_updates_cbdata *cbdata; @@ -405,6 +411,10 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m, cbdata->msg = msg; cbdata->conn = conn; cbdata->m = m; + /* Copy queue */ + cbdata->updates_pending = g_array_sized_new (FALSE, FALSE, + sizeof (struct fuzzy_peer_cmd), updates->len); + g_array_append_vals (cbdata->updates_pending, updates->data, updates->len); rspamd_fuzzy_backend_version (ctx->backend, local_db_name, fuzzy_mirror_updates_version_cb, cbdata); } @@ -436,7 +446,7 @@ fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, static void rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx, - struct rspamd_fuzzy_mirror *m) + struct rspamd_fuzzy_mirror *m, GArray *updates) { struct fuzzy_slave_connection *conn; struct rspamd_http_message *msg; @@ -475,10 +485,11 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx, rspamd_http_connection_set_key (conn->http_conn, ctx->sync_keypair); msg->peer_key = rspamd_pubkey_ref (m->key); - fuzzy_mirror_updates_to_http (m, conn, ctx, msg); + fuzzy_mirror_updates_to_http (m, conn, ctx, msg, updates); } struct rspamd_updates_cbdata { + GArray *updates_pending; struct rspamd_fuzzy_storage_ctx *ctx; gchar *source; }; @@ -533,16 +544,17 @@ rspamd_fuzzy_updates_cb (gboolean success, for (i = 0; i < ctx->mirrors->len; i ++) { m = g_ptr_array_index (ctx->mirrors, i); - rspamd_fuzzy_send_update_mirror (ctx, m); + rspamd_fuzzy_send_update_mirror (ctx, m, + cbdata->updates_pending); } } msg_info ("successfully updated fuzzy storage: %d updates in queue; " + "%d pending currently; " "%d added, %d deleted, %d extended, %d duplicates", + cbdata->updates_pending->len, ctx->updates_pending->len, nadded, ndeleted, nextended, nignored); - /* Clear updates */ - ctx->updates_pending->len = 0; rspamd_fuzzy_backend_version (ctx->backend, source, fuzzy_update_version_callback, g_strdup (source)); ctx->updates_failed = 0; @@ -551,16 +563,21 @@ rspamd_fuzzy_updates_cb (gboolean success, if (++ctx->updates_failed > ctx->updates_maxfail) { msg_err ("cannot commit update transaction to fuzzy backend, discard " "%ud updates after %d retries", - ctx->updates_pending->len, + cbdata->updates_pending->len, ctx->updates_maxfail); ctx->updates_failed = 0; - ctx->updates_pending->len = 0; } else { msg_err ("cannot commit update transaction to fuzzy backend, " - "%ud updates are still pending, %d updates left", + "%ud updates are still left; %ud currently pending;" + " %d updates left", + cbdata->updates_pending->len, ctx->updates_pending->len, ctx->updates_maxfail - ctx->updates_failed); + /* Move the remaining updates to ctx queue */ + g_array_append_vals (ctx->updates_pending, + cbdata->updates_pending->data, + cbdata->updates_pending->len); } } @@ -574,6 +591,7 @@ rspamd_fuzzy_updates_cb (gboolean success, event_base_loopexit (ctx->ev_base, &tv); } + g_array_free (cbdata->updates_pending, TRUE); g_free (cbdata->source); g_free (cbdata); } @@ -588,8 +606,13 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx, if ((forced ||ctx->updates_pending->len > 0)) { cbdata = g_malloc (sizeof (*cbdata)); cbdata->ctx = ctx; + cbdata->updates_pending = ctx->updates_pending; + ctx->updates_pending = g_array_sized_new (FALSE, FALSE, + sizeof (struct fuzzy_peer_cmd), + MAX (ctx->updates_pending->len, 1024)); cbdata->source = g_strdup (source); - rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending, + rspamd_fuzzy_backend_process_updates (ctx->backend, + cbdata->updates_pending, source, rspamd_fuzzy_updates_cb, cbdata); } } -- 2.39.5