diff options
-rw-r--r-- | src/libstat/backends/redis_backend.c | 60 | ||||
-rw-r--r-- | src/libstat/stat_config.c | 3 |
2 files changed, 43 insertions, 20 deletions
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index fd31d287a..9118e3fc4 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -751,12 +751,15 @@ rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata) static void rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv) { - struct rspamd_redis_stat_cbdata *cbdata = priv; + struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv; + struct rspamd_redis_stat_cbdata *cbdata; redisReply *reply = r; ucl_object_t *obj; gulong num = 0; - if (cbdata->wanna_die) { + cbdata = redis_elt->cbdata; + + if (cbdata == NULL || cbdata->wanna_die) { return; } @@ -778,6 +781,7 @@ rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv) if (cbdata->inflight == 0) { rspamd_redis_async_cbdata_cleanup (cbdata); + redis_elt->cbdata = NULL; } } @@ -785,12 +789,15 @@ rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv) static void rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv) { - struct rspamd_redis_stat_cbdata *cbdata = priv; + struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv; + struct rspamd_redis_stat_cbdata *cbdata; redisReply *reply = r; ucl_object_t *obj; glong num = 0; - if (cbdata->wanna_die) { + cbdata = redis_elt->cbdata; + + if (cbdata == NULL || cbdata->wanna_die) { return; } @@ -829,6 +836,7 @@ rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv) if (cbdata->inflight == 0) { rspamd_redis_async_cbdata_cleanup (cbdata); + redis_elt->cbdata = NULL; } } @@ -836,13 +844,15 @@ rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv) static void rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv) { - struct rspamd_redis_stat_cbdata *cbdata = priv; + struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv; + struct rspamd_redis_stat_cbdata *cbdata; redisReply *reply = r, *elt; gchar **pk, *k; guint i, processed = 0; + cbdata = redis_elt->cbdata; - if (cbdata->wanna_die) { + if (cbdata == NULL || cbdata->wanna_die) { return; } @@ -879,7 +889,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv) } redisAsyncCommand (cbdata->redis, rspamd_redis_stat_learns, - cbdata, + redis_elt, "HGET %s %s", k, learned_key); cbdata->inflight += 1; @@ -887,12 +897,12 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv) else { redisAsyncCommand (cbdata->redis, rspamd_redis_stat_key, - cbdata, + redis_elt, "HLEN %s", k); redisAsyncCommand (cbdata->redis, rspamd_redis_stat_learns, - cbdata, + redis_elt, "HGET %s %s", k, learned_key); cbdata->inflight += 2; @@ -925,6 +935,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv) if (cbdata->inflight == 0) { rspamd_redis_async_cbdata_cleanup (cbdata); + redis_elt->cbdata = NULL; } } else { @@ -937,6 +948,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv) rspamd_upstream_fail (cbdata->selected, FALSE); rspamd_redis_async_cbdata_cleanup (cbdata); + redis_elt->cbdata = NULL; } } @@ -948,6 +960,8 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d) struct rspamd_redis_stat_cbdata *cbdata; rspamd_inet_addr_t *addr; struct upstream_list *ups; + redisAsyncContext *redis_ctx; + struct upstream *selected; g_assert (redis_elt != NULL); @@ -956,6 +970,7 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d) if (redis_elt->cbdata) { /* We have some other process pending */ rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata); + redis_elt->cbdata = NULL; } /* Disable further events unless needed */ @@ -967,29 +982,35 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d) return; } - cbdata = g_malloc0 (sizeof (*cbdata)); - - cbdata->selected = rspamd_upstream_get (ups, + selected = rspamd_upstream_get (ups, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); - g_assert (cbdata->selected != NULL); - addr = rspamd_upstream_addr_next (cbdata->selected); + g_assert (selected != NULL); + addr = rspamd_upstream_addr_next (selected); g_assert (addr != NULL); if (rspamd_inet_address_get_af (addr) == AF_UNIX) { - cbdata->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr)); + redis_ctx = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr)); } else { - cbdata->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr), + redis_ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr), rspamd_inet_address_get_port (addr)); } - g_assert (cbdata->redis != NULL); + if (redis_ctx == NULL) { + msg_warn ("cannot connect to redis server %s: %s", + rspamd_inet_address_to_string_pretty (addr), + strerror (errno)); - redisLibevAttach (redis_elt->event_loop, cbdata->redis); + return; + } + redisLibevAttach (redis_elt->event_loop, redis_ctx); + cbdata = g_malloc0 (sizeof (*cbdata)); + cbdata->redis = redis_ctx; + cbdata->selected = selected; cbdata->inflight = 1; cbdata->cur = ucl_object_typed_new (UCL_OBJECT); cbdata->elt = redis_elt; @@ -999,7 +1020,7 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d) /* XXX: deal with timeouts maybe */ /* Get keys in redis that match our symbol */ rspamd_redis_maybe_auth (ctx, cbdata->redis); - redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, cbdata, + redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, redis_elt, "SMEMBERS %s_keys", ctx->stcf->symbol); } @@ -1010,6 +1031,7 @@ rspamd_redis_async_stat_fin (struct rspamd_stat_async_elt *elt, gpointer d) struct rspamd_redis_stat_elt *redis_elt = elt->ud; rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata); + redis_elt->cbdata = NULL; } /* Called on connection termination */ diff --git a/src/libstat/stat_config.c b/src/libstat/stat_config.c index bc4c28b5d..17d0fdcc7 100644 --- a/src/libstat/stat_config.c +++ b/src/libstat/stat_config.c @@ -559,7 +559,8 @@ rspamd_stat_ctx_register_async (rspamd_stat_async_handler handler, * fast as possible */ elt->timer_ev.data = elt; - ev_timer_init (&elt->timer_ev, rspamd_async_elt_on_timer, 0.0, 0.0); + ev_timer_init (&elt->timer_ev, rspamd_async_elt_on_timer, + 0.1, 0.0); ev_timer_start (st_ctx->event_loop, &elt->timer_ev); } else { |