aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-09-05 16:57:26 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-09-05 16:57:26 +0100
commitaaf486034f4c95b63d7cd8f8dc02af66d515b5cb (patch)
tree810a3a11ff45f688896e0c2bec79cc9faa137483
parent0b46aefebed805c202b9c44accd6b457744c4c18 (diff)
downloadrspamd-aaf486034f4c95b63d7cd8f8dc02af66d515b5cb.tar.gz
rspamd-aaf486034f4c95b63d7cd8f8dc02af66d515b5cb.zip
[Fix] Adopt fuzzy storage for flexible backends
-rw-r--r--src/fuzzy_storage.c33
1 files changed, 25 insertions, 8 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index c49a9f277..bdb5f4aa4 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -135,6 +135,7 @@ struct rspamd_fuzzy_storage_ctx {
GQueue *updates_pending;
struct rspamd_dns_resolver *resolver;
struct rspamd_config *cfg;
+ struct rspamd_worker *worker;
};
enum fuzzy_cmd_type {
@@ -491,6 +492,16 @@ rspamd_fuzzy_updates_cb (gboolean success, void *ud)
g_queue_get_length (ctx->updates_pending));
}
+ if (ctx->worker->wanna_die) {
+ /* Plan exit */
+ struct timeval tv;
+
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+
+ event_base_loopexit (ctx->ev_base, &tv);
+ }
+
g_slice_free1 (sizeof (*cbdata), cbdata);
}
@@ -508,14 +519,6 @@ rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx,
cbdata->source = source;
rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending,
source, rspamd_fuzzy_updates_cb, cbdata);
-
-
- }
- else if (ctx->updates_pending &&
- g_queue_get_length (ctx->updates_pending) > 0) {
- msg_err ("cannot start transaction in fuzzy backend, "
- "%ud updates are still pending",
- g_queue_get_length (ctx->updates_pending));
}
}
@@ -2252,8 +2255,17 @@ start_fuzzy (struct rspamd_worker *worker)
"fuzzy",
NULL);
ctx->peer_fd = -1;
+ ctx->worker = worker;
double_to_tv (ctx->master_timeout, &ctx->master_io_tv);
+ ctx->resolver = dns_resolver_init (worker->srv->logger,
+ ctx->ev_base,
+ worker->srv->cfg);
+ rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
+ ctx->ev_base, ctx->resolver->r);
+ rspamd_redis_pool_config (worker->srv->cfg->redis_pool,
+ worker->srv->cfg, ctx->ev_base);
+
/*
* Open DB and perform VACUUM
*/
@@ -2331,6 +2343,11 @@ start_fuzzy (struct rspamd_worker *worker)
event_base_loop (ctx->ev_base, 0);
rspamd_worker_block_signals ();
+ if (worker->index == 0 && g_queue_get_length (ctx->updates_pending) > 0) {
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+ event_base_loop (ctx->ev_base, 0);
+ }
+
rspamd_fuzzy_backend_close (ctx->backend);
rspamd_log_close (worker->srv->logger);