aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/libstat/backends/redis_backend.c60
-rw-r--r--src/libstat/stat_config.c3
2 files changed, 43 insertions, 20 deletions
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index fd31d287a..9118e3fc4 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -751,12 +751,15 @@ rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
static void
rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
{
- struct rspamd_redis_stat_cbdata *cbdata = priv;
+ struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+ struct rspamd_redis_stat_cbdata *cbdata;
redisReply *reply = r;
ucl_object_t *obj;
gulong num = 0;
- if (cbdata->wanna_die) {
+ cbdata = redis_elt->cbdata;
+
+ if (cbdata == NULL || cbdata->wanna_die) {
return;
}
@@ -778,6 +781,7 @@ rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
if (cbdata->inflight == 0) {
rspamd_redis_async_cbdata_cleanup (cbdata);
+ redis_elt->cbdata = NULL;
}
}
@@ -785,12 +789,15 @@ rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
static void
rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
{
- struct rspamd_redis_stat_cbdata *cbdata = priv;
+ struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+ struct rspamd_redis_stat_cbdata *cbdata;
redisReply *reply = r;
ucl_object_t *obj;
glong num = 0;
- if (cbdata->wanna_die) {
+ cbdata = redis_elt->cbdata;
+
+ if (cbdata == NULL || cbdata->wanna_die) {
return;
}
@@ -829,6 +836,7 @@ rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
if (cbdata->inflight == 0) {
rspamd_redis_async_cbdata_cleanup (cbdata);
+ redis_elt->cbdata = NULL;
}
}
@@ -836,13 +844,15 @@ rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
static void
rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
{
- struct rspamd_redis_stat_cbdata *cbdata = priv;
+ struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+ struct rspamd_redis_stat_cbdata *cbdata;
redisReply *reply = r, *elt;
gchar **pk, *k;
guint i, processed = 0;
+ cbdata = redis_elt->cbdata;
- if (cbdata->wanna_die) {
+ if (cbdata == NULL || cbdata->wanna_die) {
return;
}
@@ -879,7 +889,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
}
redisAsyncCommand (cbdata->redis,
rspamd_redis_stat_learns,
- cbdata,
+ redis_elt,
"HGET %s %s",
k, learned_key);
cbdata->inflight += 1;
@@ -887,12 +897,12 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
else {
redisAsyncCommand (cbdata->redis,
rspamd_redis_stat_key,
- cbdata,
+ redis_elt,
"HLEN %s",
k);
redisAsyncCommand (cbdata->redis,
rspamd_redis_stat_learns,
- cbdata,
+ redis_elt,
"HGET %s %s",
k, learned_key);
cbdata->inflight += 2;
@@ -925,6 +935,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
if (cbdata->inflight == 0) {
rspamd_redis_async_cbdata_cleanup (cbdata);
+ redis_elt->cbdata = NULL;
}
}
else {
@@ -937,6 +948,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
rspamd_upstream_fail (cbdata->selected, FALSE);
rspamd_redis_async_cbdata_cleanup (cbdata);
+ redis_elt->cbdata = NULL;
}
}
@@ -948,6 +960,8 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
struct rspamd_redis_stat_cbdata *cbdata;
rspamd_inet_addr_t *addr;
struct upstream_list *ups;
+ redisAsyncContext *redis_ctx;
+ struct upstream *selected;
g_assert (redis_elt != NULL);
@@ -956,6 +970,7 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
if (redis_elt->cbdata) {
/* We have some other process pending */
rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+ redis_elt->cbdata = NULL;
}
/* Disable further events unless needed */
@@ -967,29 +982,35 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
return;
}
- cbdata = g_malloc0 (sizeof (*cbdata));
-
- cbdata->selected = rspamd_upstream_get (ups,
+ selected = rspamd_upstream_get (ups,
RSPAMD_UPSTREAM_ROUND_ROBIN,
NULL,
0);
- g_assert (cbdata->selected != NULL);
- addr = rspamd_upstream_addr_next (cbdata->selected);
+ g_assert (selected != NULL);
+ addr = rspamd_upstream_addr_next (selected);
g_assert (addr != NULL);
if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
- cbdata->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
+ redis_ctx = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
}
else {
- cbdata->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
+ redis_ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr),
rspamd_inet_address_get_port (addr));
}
- g_assert (cbdata->redis != NULL);
+ if (redis_ctx == NULL) {
+ msg_warn ("cannot connect to redis server %s: %s",
+ rspamd_inet_address_to_string_pretty (addr),
+ strerror (errno));
- redisLibevAttach (redis_elt->event_loop, cbdata->redis);
+ return;
+ }
+ redisLibevAttach (redis_elt->event_loop, redis_ctx);
+ cbdata = g_malloc0 (sizeof (*cbdata));
+ cbdata->redis = redis_ctx;
+ cbdata->selected = selected;
cbdata->inflight = 1;
cbdata->cur = ucl_object_typed_new (UCL_OBJECT);
cbdata->elt = redis_elt;
@@ -999,7 +1020,7 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
/* XXX: deal with timeouts maybe */
/* Get keys in redis that match our symbol */
rspamd_redis_maybe_auth (ctx, cbdata->redis);
- redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, cbdata,
+ redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, redis_elt,
"SMEMBERS %s_keys",
ctx->stcf->symbol);
}
@@ -1010,6 +1031,7 @@ rspamd_redis_async_stat_fin (struct rspamd_stat_async_elt *elt, gpointer d)
struct rspamd_redis_stat_elt *redis_elt = elt->ud;
rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+ redis_elt->cbdata = NULL;
}
/* Called on connection termination */
diff --git a/src/libstat/stat_config.c b/src/libstat/stat_config.c
index bc4c28b5d..17d0fdcc7 100644
--- a/src/libstat/stat_config.c
+++ b/src/libstat/stat_config.c
@@ -559,7 +559,8 @@ rspamd_stat_ctx_register_async (rspamd_stat_async_handler handler,
* fast as possible
*/
elt->timer_ev.data = elt;
- ev_timer_init (&elt->timer_ev, rspamd_async_elt_on_timer, 0.0, 0.0);
+ ev_timer_init (&elt->timer_ev, rspamd_async_elt_on_timer,
+ 0.1, 0.0);
ev_timer_start (st_ctx->event_loop, &elt->timer_ev);
}
else {