Browse Source

[Fix] Freeze updates queue when do actual storage update

tags/1.7.9
Vsevolod Stakhov 5 years ago
parent
commit
f0823abc00
1 changed files with 40 additions and 17 deletions
  1. 40
    17
      src/fuzzy_storage.c

+ 40
- 17
src/fuzzy_storage.c View File

@@ -319,6 +319,7 @@ struct rspamd_fuzzy_updates_cbdata {
struct rspamd_http_message *msg;
struct fuzzy_slave_connection *conn;
struct rspamd_fuzzy_mirror *m;
GArray *updates_pending;
};

static void
@@ -340,12 +341,13 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
ctx = cbdata->ctx;
msg = cbdata->msg;
m = cbdata->m;
g_free (cbdata);
rev32 = GUINT32_TO_LE (rev32);
len = sizeof (guint32) * 2; /* revision + last chunk */

for (i = 0; i < ctx->updates_pending->len; i ++) {
io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
for (i = 0; i < cbdata->updates_pending->len; i ++) {
io_cmd = &g_array_index (cbdata->updates_pending,
struct fuzzy_peer_cmd, i);

if (io_cmd->is_shingle) {
len += sizeof (guint32) + sizeof (guint32) +
@@ -361,8 +363,8 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
reply = rspamd_fstring_append (reply, (const char *)&rev32,
sizeof (rev32));

for (i = 0; i < ctx->updates_pending->len; i ++) {
io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
for (i = 0; i < cbdata->updates_pending->len; i ++) {
io_cmd = &g_array_index (cbdata->updates_pending, struct fuzzy_peer_cmd, i);

if (io_cmd->is_shingle) {
len = sizeof (guint32) +
@@ -389,13 +391,17 @@ fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud)
conn->sock,
&tv, ctx->ev_base);
msg_info ("send update request to %s", m->name);

g_array_free (cbdata->updates_pending, TRUE);
g_free (cbdata);
}

static void
fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
struct fuzzy_slave_connection *conn,
struct rspamd_fuzzy_storage_ctx *ctx,
struct rspamd_http_message *msg)
struct fuzzy_slave_connection *conn,
struct rspamd_fuzzy_storage_ctx *ctx,
struct rspamd_http_message *msg,
GArray *updates)
{

struct rspamd_fuzzy_updates_cbdata *cbdata;
@@ -405,6 +411,10 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
cbdata->msg = msg;
cbdata->conn = conn;
cbdata->m = m;
/* Copy queue */
cbdata->updates_pending = g_array_sized_new (FALSE, FALSE,
sizeof (struct fuzzy_peer_cmd), updates->len);
g_array_append_vals (cbdata->updates_pending, updates->data, updates->len);
rspamd_fuzzy_backend_version (ctx->backend, local_db_name,
fuzzy_mirror_updates_version_cb, cbdata);
}
@@ -436,7 +446,7 @@ fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn,

static void
rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
struct rspamd_fuzzy_mirror *m)
struct rspamd_fuzzy_mirror *m, GArray *updates)
{
struct fuzzy_slave_connection *conn;
struct rspamd_http_message *msg;
@@ -475,10 +485,11 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
rspamd_http_connection_set_key (conn->http_conn,
ctx->sync_keypair);
msg->peer_key = rspamd_pubkey_ref (m->key);
fuzzy_mirror_updates_to_http (m, conn, ctx, msg);
fuzzy_mirror_updates_to_http (m, conn, ctx, msg, updates);
}

struct rspamd_updates_cbdata {
GArray *updates_pending;
struct rspamd_fuzzy_storage_ctx *ctx;
gchar *source;
};
@@ -533,16 +544,17 @@ rspamd_fuzzy_updates_cb (gboolean success,
for (i = 0; i < ctx->mirrors->len; i ++) {
m = g_ptr_array_index (ctx->mirrors, i);

rspamd_fuzzy_send_update_mirror (ctx, m);
rspamd_fuzzy_send_update_mirror (ctx, m,
cbdata->updates_pending);
}
}

msg_info ("successfully updated fuzzy storage: %d updates in queue; "
"%d pending currently; "
"%d added, %d deleted, %d extended, %d duplicates",
cbdata->updates_pending->len,
ctx->updates_pending->len,
nadded, ndeleted, nextended, nignored);
/* Clear updates */
ctx->updates_pending->len = 0;
rspamd_fuzzy_backend_version (ctx->backend, source,
fuzzy_update_version_callback, g_strdup (source));
ctx->updates_failed = 0;
@@ -551,16 +563,21 @@ rspamd_fuzzy_updates_cb (gboolean success,
if (++ctx->updates_failed > ctx->updates_maxfail) {
msg_err ("cannot commit update transaction to fuzzy backend, discard "
"%ud updates after %d retries",
ctx->updates_pending->len,
cbdata->updates_pending->len,
ctx->updates_maxfail);
ctx->updates_failed = 0;
ctx->updates_pending->len = 0;
}
else {
msg_err ("cannot commit update transaction to fuzzy backend, "
"%ud updates are still pending, %d updates left",
"%ud updates are still left; %ud currently pending;"
" %d updates left",
cbdata->updates_pending->len,
ctx->updates_pending->len,
ctx->updates_maxfail - ctx->updates_failed);
/* Move the remaining updates to ctx queue */
g_array_append_vals (ctx->updates_pending,
cbdata->updates_pending->data,
cbdata->updates_pending->len);
}
}

@@ -574,6 +591,7 @@ rspamd_fuzzy_updates_cb (gboolean success,
event_base_loopexit (ctx->ev_base, &tv);
}

g_array_free (cbdata->updates_pending, TRUE);
g_free (cbdata->source);
g_free (cbdata);
}
@@ -588,8 +606,13 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
if ((forced ||ctx->updates_pending->len > 0)) {
cbdata = g_malloc (sizeof (*cbdata));
cbdata->ctx = ctx;
cbdata->updates_pending = ctx->updates_pending;
ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
sizeof (struct fuzzy_peer_cmd),
MAX (ctx->updates_pending->len, 1024));
cbdata->source = g_strdup (source);
rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
rspamd_fuzzy_backend_process_updates (ctx->backend,
cbdata->updates_pending,
source, rspamd_fuzzy_updates_cb, cbdata);
}
}

Loading…
Cancel
Save