aboutsummaryrefslogtreecommitdiffstats
path: root/src/fuzzy_storage.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-07-30 11:07:04 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-07-30 11:07:04 +0100
commitf0823abc0076d78ee492e0b2c8d1a1b7921adaec (patch)
tree789bf5d68eac4331aa809c9c47041502a2c89e95 /src/fuzzy_storage.c
parenta072aa6fb1b0c12303e4492b0a9cb350d45ff5cb (diff)
downloadrspamd-f0823abc0076d78ee492e0b2c8d1a1b7921adaec.tar.gz
rspamd-f0823abc0076d78ee492e0b2c8d1a1b7921adaec.zip
[Fix] Freeze updates queue when do actual storage update
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r--src/fuzzy_storage.c57
1 files changed, 40 insertions, 17 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index a0fa9d182..f248d928f 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -319,6 +319,7 @@ struct rspamd_fuzzy_updates_cbdata {
struct rspamd_http_message *msg;
struct fuzzy_slave_connection *conn;
struct rspamd_fuzzy_mirror *m;
+ GArray *updates_pending;
};
static void
@@ -340,12 +341,13 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
ctx = cbdata->ctx;
msg = cbdata->msg;
m = cbdata->m;
- g_free (cbdata);
+
rev32 = GUINT32_TO_LE (rev32);
len = sizeof (guint32) * 2; /* revision + last chunk */
- for (i = 0; i < ctx->updates_pending->len; i ++) {
- io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
+ for (i = 0; i < cbdata->updates_pending->len; i ++) {
+ io_cmd = &g_array_index (cbdata->updates_pending,
+ struct fuzzy_peer_cmd, i);
if (io_cmd->is_shingle) {
len += sizeof (guint32) + sizeof (guint32) +
@@ -361,8 +363,8 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
reply = rspamd_fstring_append (reply, (const char *)&rev32,
sizeof (rev32));
- for (i = 0; i < ctx->updates_pending->len; i ++) {
- io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
+ for (i = 0; i < cbdata->updates_pending->len; i ++) {
+ io_cmd = &g_array_index (cbdata->updates_pending, struct fuzzy_peer_cmd, i);
if (io_cmd->is_shingle) {
len = sizeof (guint32) +
@@ -389,13 +391,17 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
conn->sock,
&tv, ctx->ev_base);
msg_info ("send update request to %s", m->name);
+
+ g_array_free (cbdata->updates_pending, TRUE);
+ g_free (cbdata);
}
static void
fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
- struct fuzzy_slave_connection *conn,
- struct rspamd_fuzzy_storage_ctx *ctx,
- struct rspamd_http_message *msg)
+ struct fuzzy_slave_connection *conn,
+ struct rspamd_fuzzy_storage_ctx *ctx,
+ struct rspamd_http_message *msg,
+ GArray *updates)
{
struct rspamd_fuzzy_updates_cbdata *cbdata;
@@ -405,6 +411,10 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
cbdata->msg = msg;
cbdata->conn = conn;
cbdata->m = m;
+ /* Copy queue */
+ cbdata->updates_pending = g_array_sized_new (FALSE, FALSE,
+ sizeof (struct fuzzy_peer_cmd), updates->len);
+ g_array_append_vals (cbdata->updates_pending, updates->data, updates->len);
rspamd_fuzzy_backend_version (ctx->backend, local_db_name,
fuzzy_mirror_updates_version_cb, cbdata);
}
@@ -436,7 +446,7 @@ fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,
static void
rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
- struct rspamd_fuzzy_mirror *m)
+ struct rspamd_fuzzy_mirror *m, GArray *updates)
{
struct fuzzy_slave_connection *conn;
struct rspamd_http_message *msg;
@@ -475,10 +485,11 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
rspamd_http_connection_set_key (conn->http_conn,
ctx->sync_keypair);
msg->peer_key = rspamd_pubkey_ref (m->key);
- fuzzy_mirror_updates_to_http (m, conn, ctx, msg);
+ fuzzy_mirror_updates_to_http (m, conn, ctx, msg, updates);
}
struct rspamd_updates_cbdata {
+ GArray *updates_pending;
struct rspamd_fuzzy_storage_ctx *ctx;
gchar *source;
};
@@ -533,16 +544,17 @@ rspamd_fuzzy_updates_cb (gboolean success,
for (i = 0; i < ctx->mirrors->len; i ++) {
m = g_ptr_array_index (ctx->mirrors, i);
- rspamd_fuzzy_send_update_mirror (ctx, m);
+ rspamd_fuzzy_send_update_mirror (ctx, m,
+ cbdata->updates_pending);
}
}
msg_info ("successfully updated fuzzy storage: %d updates in queue; "
+ "%d pending currently; "
"%d added, %d deleted, %d extended, %d duplicates",
+ cbdata->updates_pending->len,
ctx->updates_pending->len,
nadded, ndeleted, nextended, nignored);
- /* Clear updates */
- ctx->updates_pending->len = 0;
rspamd_fuzzy_backend_version (ctx->backend, source,
fuzzy_update_version_callback, g_strdup (source));
ctx->updates_failed = 0;
@@ -551,16 +563,21 @@ rspamd_fuzzy_updates_cb (gboolean success,
if (++ctx->updates_failed > ctx->updates_maxfail) {
msg_err ("cannot commit update transaction to fuzzy backend, discard "
"%ud updates after %d retries",
- ctx->updates_pending->len,
+ cbdata->updates_pending->len,
ctx->updates_maxfail);
ctx->updates_failed = 0;
- ctx->updates_pending->len = 0;
}
else {
msg_err ("cannot commit update transaction to fuzzy backend, "
- "%ud updates are still pending, %d updates left",
+ "%ud updates are still left; %ud currently pending;"
+ " %d updates left",
+ cbdata->updates_pending->len,
ctx->updates_pending->len,
ctx->updates_maxfail - ctx->updates_failed);
+ /* Move the remaining updates to ctx queue */
+ g_array_append_vals (ctx->updates_pending,
+ cbdata->updates_pending->data,
+ cbdata->updates_pending->len);
}
}
@@ -574,6 +591,7 @@ rspamd_fuzzy_updates_cb (gboolean success,
event_base_loopexit (ctx->ev_base, &tv);
}
+ g_array_free (cbdata->updates_pending, TRUE);
g_free (cbdata->source);
g_free (cbdata);
}
@@ -588,8 +606,13 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
if ((forced ||ctx->updates_pending->len > 0)) {
cbdata = g_malloc (sizeof (*cbdata));
cbdata->ctx = ctx;
+ cbdata->updates_pending = ctx->updates_pending;
+ ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
+ sizeof (struct fuzzy_peer_cmd),
+ MAX (ctx->updates_pending->len, 1024));
cbdata->source = g_strdup (source);
- rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
+ rspamd_fuzzy_backend_process_updates (ctx->backend,
+ cbdata->updates_pending,
source, rspamd_fuzzy_updates_cb, cbdata);
}
}