struct rspamd_http_message *msg;
struct fuzzy_slave_connection *conn;
struct rspamd_fuzzy_mirror *m;
+ GArray *updates_pending;
};
static void
ctx = cbdata->ctx;
msg = cbdata->msg;
m = cbdata->m;
- g_free (cbdata);
+
rev32 = GUINT32_TO_LE (rev32);
len = sizeof (guint32) * 2; /* revision + last chunk */
- for (i = 0; i < ctx->updates_pending->len; i ++) {
- io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
+ for (i = 0; i < cbdata->updates_pending->len; i ++) {
+ io_cmd = &g_array_index (cbdata->updates_pending,
+ struct fuzzy_peer_cmd, i);
if (io_cmd->is_shingle) {
len += sizeof (guint32) + sizeof (guint32) +
reply = rspamd_fstring_append (reply, (const char *)&rev32,
sizeof (rev32));
- for (i = 0; i < ctx->updates_pending->len; i ++) {
- io_cmd = &g_array_index (ctx->updates_pending, struct fuzzy_peer_cmd, i);
+ for (i = 0; i < cbdata->updates_pending->len; i ++) {
+ io_cmd = &g_array_index (cbdata->updates_pending, struct fuzzy_peer_cmd, i);
if (io_cmd->is_shingle) {
len = sizeof (guint32) +
conn->sock,
&tv, ctx->ev_base);
msg_info ("send update request to %s", m->name);
+
+ g_array_free (cbdata->updates_pending, TRUE);
+ g_free (cbdata);
}
static void
fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m,
- struct fuzzy_slave_connection *conn,
- struct rspamd_fuzzy_storage_ctx *ctx,
- struct rspamd_http_message *msg)
+ struct fuzzy_slave_connection *conn,
+ struct rspamd_fuzzy_storage_ctx *ctx,
+ struct rspamd_http_message *msg,
+ GArray *updates)
{
struct rspamd_fuzzy_updates_cbdata *cbdata;
cbdata->msg = msg;
cbdata->conn = conn;
cbdata->m = m;
+ /* Copy queue */
+ cbdata->updates_pending = g_array_sized_new (FALSE, FALSE,
+ sizeof (struct fuzzy_peer_cmd), updates->len);
+ g_array_append_vals (cbdata->updates_pending, updates->data, updates->len);
rspamd_fuzzy_backend_version (ctx->backend, local_db_name,
fuzzy_mirror_updates_version_cb, cbdata);
}
static void
rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
- struct rspamd_fuzzy_mirror *m)
+ struct rspamd_fuzzy_mirror *m, GArray *updates)
{
struct fuzzy_slave_connection *conn;
struct rspamd_http_message *msg;
rspamd_http_connection_set_key (conn->http_conn,
ctx->sync_keypair);
msg->peer_key = rspamd_pubkey_ref (m->key);
- fuzzy_mirror_updates_to_http (m, conn, ctx, msg);
+ fuzzy_mirror_updates_to_http (m, conn, ctx, msg, updates);
}
struct rspamd_updates_cbdata {
+ GArray *updates_pending;
struct rspamd_fuzzy_storage_ctx *ctx;
gchar *source;
};
for (i = 0; i < ctx->mirrors->len; i ++) {
m = g_ptr_array_index (ctx->mirrors, i);
- rspamd_fuzzy_send_update_mirror (ctx, m);
+ rspamd_fuzzy_send_update_mirror (ctx, m,
+ cbdata->updates_pending);
}
}
msg_info ("successfully updated fuzzy storage: %d updates in queue; "
+ "%d pending currently; "
"%d added, %d deleted, %d extended, %d duplicates",
+ cbdata->updates_pending->len,
ctx->updates_pending->len,
nadded, ndeleted, nextended, nignored);
- /* Clear updates */
- ctx->updates_pending->len = 0;
rspamd_fuzzy_backend_version (ctx->backend, source,
fuzzy_update_version_callback, g_strdup (source));
ctx->updates_failed = 0;
if (++ctx->updates_failed > ctx->updates_maxfail) {
msg_err ("cannot commit update transaction to fuzzy backend, discard "
"%ud updates after %d retries",
- ctx->updates_pending->len,
+ cbdata->updates_pending->len,
ctx->updates_maxfail);
ctx->updates_failed = 0;
- ctx->updates_pending->len = 0;
}
else {
msg_err ("cannot commit update transaction to fuzzy backend, "
- "%ud updates are still pending, %d updates left",
+ "%ud updates are still left; %ud currently pending;"
+ " %d updates left",
+ cbdata->updates_pending->len,
ctx->updates_pending->len,
ctx->updates_maxfail - ctx->updates_failed);
+ /* Move the remaining updates to ctx queue */
+ g_array_append_vals (ctx->updates_pending,
+ cbdata->updates_pending->data,
+ cbdata->updates_pending->len);
}
}
event_base_loopexit (ctx->ev_base, &tv);
}
+ g_array_free (cbdata->updates_pending, TRUE);
g_free (cbdata->source);
g_free (cbdata);
}
if ((forced ||ctx->updates_pending->len > 0)) {
cbdata = g_malloc (sizeof (*cbdata));
cbdata->ctx = ctx;
+ cbdata->updates_pending = ctx->updates_pending;
+ ctx->updates_pending = g_array_sized_new (FALSE, FALSE,
+ sizeof (struct fuzzy_peer_cmd),
+ MAX (ctx->updates_pending->len, 1024));
cbdata->source = g_strdup (source);
- rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
+ rspamd_fuzzy_backend_process_updates (ctx->backend,
+ cbdata->updates_pending,
source, rspamd_fuzzy_updates_cb, cbdata);
}
}