]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Freeze updates queue when do actual storage update
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 30 Jul 2018 10:07:04 +0000 (11:07 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 30 Jul 2018 10:07:04 +0000 (11:07 +0100)
src/fuzzy_storage.c

index a0fa9d182e5429db5dbaab3476c126ceb9c34c38..f248d928fdbe4c72e0298b420a283d54f24c377c 100644 (file)
@@ -319,6 +319,7 @@ struct rspamd_fuzzy_updates_cbdata {
        struct rspamd_http_message *msg;
        struct fuzzy_slave_connection *conn;
        struct rspamd_fuzzy_mirror *m;
+       GArray *updates_pending;
 };
 
 static void
@@ -340,12 +341,13 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
        ctx = cbdata->ctx;
        msg = cbdata->msg;
        m = cbdata->m;
-       g_free (cbdata);
+
        rev32 = GUINT32_TO_LE (rev32);
        len = sizeof (guint32) * 2; /* revision + last chunk */
 
-       for (i = 0; i < ctx->updates_pending->len; i ++) {
-               io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
+       for (i = 0; i < cbdata->updates_pending->len; i ++) {
+               io_cmd = &g_array_index (cbdata->updates_pending,
+                               struct fuzzy_peer_cmd, i);
 
                if (io_cmd->is_shingle) {
                        len += sizeof (guint32) + sizeof (guint32) +
@@ -361,8 +363,8 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
        reply = rspamd_fstring_append (reply, (const char *)&rev32,
                        sizeof (rev32));
 
-       for (i = 0; i < ctx->updates_pending->len; i ++) {
-               io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
+       for (i = 0; i < cbdata->updates_pending->len; i ++) {
+               io_cmd = &g_array_index (cbdata->updates_pending, struct fuzzy_peer_cmd, i);
 
                if (io_cmd->is_shingle) {
                        len = sizeof (guint32) +
@@ -389,13 +391,17 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
                        conn->sock,
                        &tv, ctx->ev_base);
        msg_info ("send update request to %s", m->name);
+
+       g_array_free (cbdata->updates_pending, TRUE);
+       g_free (cbdata);
 }
 
 static void
 fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
-               struct fuzzy_slave_connection *conn,
-               struct rspamd_fuzzy_storage_ctx *ctx,
-               struct rspamd_http_message *msg)
+                                                         struct fuzzy_slave_connection *conn,
+                                                         struct rspamd_fuzzy_storage_ctx *ctx,
+                                                         struct rspamd_http_message *msg,
+                                                         GArray *updates)
 {
 
        struct rspamd_fuzzy_updates_cbdata *cbdata;
@@ -405,6 +411,10 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
        cbdata->msg = msg;
        cbdata->conn = conn;
        cbdata->m = m;
+       /* Copy queue */
+       cbdata->updates_pending = g_array_sized_new (FALSE, FALSE,
+                       sizeof (struct fuzzy_peer_cmd), updates->len);
+       g_array_append_vals (cbdata->updates_pending, updates->data, updates->len);
        rspamd_fuzzy_backend_version (ctx->backend, local_db_name,
                        fuzzy_mirror_updates_version_cb, cbdata);
 }
@@ -436,7 +446,7 @@ fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
 
 static void
 rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
-               struct rspamd_fuzzy_mirror *m)
+               struct rspamd_fuzzy_mirror *m, GArray *updates)
 {
        struct fuzzy_slave_connection *conn;
        struct rspamd_http_message *msg;
@@ -475,10 +485,11 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
        rspamd_http_connection_set_key (conn->http_conn,
                        ctx->sync_keypair);
        msg->peer_key = rspamd_pubkey_ref (m->key);
-       fuzzy_mirror_updates_to_http (m, conn, ctx, msg);
+       fuzzy_mirror_updates_to_http (m, conn, ctx, msg, updates);
 }
 
 struct rspamd_updates_cbdata {
+       GArray *updates_pending;
        struct rspamd_fuzzy_storage_ctx *ctx;
        gchar *source;
 };
@@ -533,16 +544,17 @@ rspamd_fuzzy_updates_cb (gboolean success,
                        for (i = 0; i < ctx->mirrors->len; i ++) {
                                m = g_ptr_array_index (ctx->mirrors, i);
 
-                               rspamd_fuzzy_send_update_mirror (ctx, m);
+                               rspamd_fuzzy_send_update_mirror (ctx, m,
+                                               cbdata->updates_pending);
                        }
                }
 
                msg_info ("successfully updated fuzzy storage: %d updates in queue; "
+                       "%d pending currently; "
                        "%d added, %d deleted, %d extended, %d duplicates",
+                               cbdata->updates_pending->len,
                                ctx->updates_pending->len,
                                nadded, ndeleted, nextended, nignored);
-               /* Clear updates */
-               ctx->updates_pending->len = 0;
                rspamd_fuzzy_backend_version (ctx->backend, source,
                                fuzzy_update_version_callback, g_strdup (source));
                ctx->updates_failed = 0;
@@ -551,16 +563,21 @@ rspamd_fuzzy_updates_cb (gboolean success,
                if (++ctx->updates_failed > ctx->updates_maxfail) {
                        msg_err ("cannot commit update transaction to fuzzy backend, discard "
                                        "%ud updates after %d retries",
-                                       ctx->updates_pending->len,
+                                       cbdata->updates_pending->len,
                                        ctx->updates_maxfail);
                        ctx->updates_failed = 0;
-                       ctx->updates_pending->len = 0;
                }
                else {
                        msg_err ("cannot commit update transaction to fuzzy backend, "
-                                       "%ud updates are still pending, %d updates left",
+                                        "%ud updates are still left; %ud currently pending;"
+                                        " %d updates left",
+                                       cbdata->updates_pending->len,
                                        ctx->updates_pending->len,
                                        ctx->updates_maxfail - ctx->updates_failed);
+                       /* Move the remaining updates to ctx queue */
+                       g_array_append_vals (ctx->updates_pending,
+                                       cbdata->updates_pending->data,
+                                       cbdata->updates_pending->len);
                }
        }
 
@@ -574,6 +591,7 @@ rspamd_fuzzy_updates_cb (gboolean success,
                event_base_loopexit (ctx->ev_base, &tv);
        }
 
+       g_array_free (cbdata->updates_pending, TRUE);
        g_free (cbdata->source);
        g_free (cbdata);
 }
@@ -588,8 +606,13 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
        if ((forced ||ctx->updates_pending->len > 0)) {
                cbdata = g_malloc (sizeof (*cbdata));
                cbdata->ctx = ctx;
+               cbdata->updates_pending = ctx->updates_pending;
+               ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
+                               sizeof (struct fuzzy_peer_cmd),
+                               MAX (ctx->updates_pending->len, 1024));
                cbdata->source = g_strdup (source);
-               rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
+               rspamd_fuzzy_backend_process_updates (ctx->backend,
+                               cbdata->updates_pending,
                                source, rspamd_fuzzy_updates_cb, cbdata);
        }
 }