]> source.dussan.org Git - rspamd.git/commitdiff
[Minor] Another try to fix races in redis stats
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 19 Oct 2019 08:56:12 +0000 (09:56 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 19 Oct 2019 08:56:12 +0000 (09:56 +0100)
Issue: #3088

src/libstat/backends/redis_backend.c
src/libstat/stat_config.c

index fd31d287a8a8ef0181f9180230c5c422ec425fe6..9118e3fc4193a674bc95c3090f8aaeb7fc8d3602 100644 (file)
@@ -751,12 +751,15 @@ rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
 static void
 rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
 {
-       struct rspamd_redis_stat_cbdata *cbdata = priv;
+       struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+       struct rspamd_redis_stat_cbdata *cbdata;
        redisReply *reply = r;
        ucl_object_t *obj;
        gulong num = 0;
 
-       if (cbdata->wanna_die) {
+       cbdata = redis_elt->cbdata;
+
+       if (cbdata == NULL || cbdata->wanna_die) {
                return;
        }
 
@@ -778,6 +781,7 @@ rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
 
        if (cbdata->inflight == 0) {
                rspamd_redis_async_cbdata_cleanup (cbdata);
+               redis_elt->cbdata = NULL;
        }
 }
 
@@ -785,12 +789,15 @@ rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
 static void
 rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
 {
-       struct rspamd_redis_stat_cbdata *cbdata = priv;
+       struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+       struct rspamd_redis_stat_cbdata *cbdata;
        redisReply *reply = r;
        ucl_object_t *obj;
        glong num = 0;
 
-       if (cbdata->wanna_die) {
+       cbdata = redis_elt->cbdata;
+
+       if (cbdata == NULL || cbdata->wanna_die) {
                return;
        }
 
@@ -829,6 +836,7 @@ rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
 
        if (cbdata->inflight == 0) {
                rspamd_redis_async_cbdata_cleanup (cbdata);
+               redis_elt->cbdata = NULL;
        }
 }
 
@@ -836,13 +844,15 @@ rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
 static void
 rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
 {
-       struct rspamd_redis_stat_cbdata *cbdata = priv;
+       struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+       struct rspamd_redis_stat_cbdata *cbdata;
        redisReply *reply = r, *elt;
        gchar **pk, *k;
        guint i, processed = 0;
 
+       cbdata = redis_elt->cbdata;
 
-       if (cbdata->wanna_die) {
+       if (cbdata == NULL || cbdata->wanna_die) {
                return;
        }
 
@@ -879,7 +889,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
                                                        }
                                                        redisAsyncCommand (cbdata->redis,
                                                                        rspamd_redis_stat_learns,
-                                                                       cbdata,
+                                                                       redis_elt,
                                                                        "HGET %s %s",
                                                                        k, learned_key);
                                                        cbdata->inflight += 1;
@@ -887,12 +897,12 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
                                                else {
                                                        redisAsyncCommand (cbdata->redis,
                                                                        rspamd_redis_stat_key,
-                                                                       cbdata,
+                                                                       redis_elt,
                                                                        "HLEN %s",
                                                                        k);
                                                        redisAsyncCommand (cbdata->redis,
                                                                        rspamd_redis_stat_learns,
-                                                                       cbdata,
+                                                                       redis_elt,
                                                                        "HGET %s %s",
                                                                        k, learned_key);
                                                        cbdata->inflight += 2;
@@ -925,6 +935,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
 
                if (cbdata->inflight == 0) {
                        rspamd_redis_async_cbdata_cleanup (cbdata);
+                       redis_elt->cbdata = NULL;
                }
        }
        else {
@@ -937,6 +948,7 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
 
                rspamd_upstream_fail (cbdata->selected, FALSE);
                rspamd_redis_async_cbdata_cleanup (cbdata);
+               redis_elt->cbdata = NULL;
        }
 }
 
@@ -948,6 +960,8 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
        struct rspamd_redis_stat_cbdata *cbdata;
        rspamd_inet_addr_t *addr;
        struct upstream_list *ups;
+       redisAsyncContext *redis_ctx;
+       struct upstream *selected;
 
        g_assert (redis_elt != NULL);
 
@@ -956,6 +970,7 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
        if (redis_elt->cbdata) {
                /* We have some other process pending */
                rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+               redis_elt->cbdata = NULL;
        }
 
        /* Disable further events unless needed */
@@ -967,29 +982,35 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
                return;
        }
 
-       cbdata = g_malloc0 (sizeof (*cbdata));
-
-       cbdata->selected = rspamd_upstream_get (ups,
+       selected = rspamd_upstream_get (ups,
                                        RSPAMD_UPSTREAM_ROUND_ROBIN,
                                        NULL,
                                        0);
 
-       g_assert (cbdata->selected != NULL);
-       addr = rspamd_upstream_addr_next (cbdata->selected);
+       g_assert (selected != NULL);
+       addr = rspamd_upstream_addr_next (selected);
        g_assert (addr != NULL);
 
        if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
-               cbdata->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
+               redis_ctx = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
        }
        else {
-               cbdata->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
+               redis_ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr),
                                rspamd_inet_address_get_port (addr));
        }
 
-       g_assert (cbdata->redis != NULL);
+       if (redis_ctx == NULL) {
+               msg_warn ("cannot connect to redis server %s: %s",
+                               rspamd_inet_address_to_string_pretty (addr),
+                               strerror (errno));
 
-       redisLibevAttach (redis_elt->event_loop, cbdata->redis);
+               return;
+       }
 
+       redisLibevAttach (redis_elt->event_loop, redis_ctx);
+       cbdata = g_malloc0 (sizeof (*cbdata));
+       cbdata->redis = redis_ctx;
+       cbdata->selected = selected;
        cbdata->inflight = 1;
        cbdata->cur = ucl_object_typed_new (UCL_OBJECT);
        cbdata->elt = redis_elt;
@@ -999,7 +1020,7 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
        /* XXX: deal with timeouts maybe */
        /* Get keys in redis that match our symbol */
        rspamd_redis_maybe_auth (ctx, cbdata->redis);
-       redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, cbdata,
+       redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, redis_elt,
                        "SMEMBERS %s_keys",
                        ctx->stcf->symbol);
 }
@@ -1010,6 +1031,7 @@ rspamd_redis_async_stat_fin (struct rspamd_stat_async_elt *elt, gpointer d)
        struct rspamd_redis_stat_elt *redis_elt = elt->ud;
 
        rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+       redis_elt->cbdata = NULL;
 }
 
 /* Called on connection termination */
index bc4c28b5dd52a7d255f76cbbcfd1502a24fb5d26..17d0fdcc70d204beaf915209280c37e759eb588b 100644 (file)
@@ -559,7 +559,8 @@ rspamd_stat_ctx_register_async (rspamd_stat_async_handler handler,
                 * fast as possible
                 */
                elt->timer_ev.data = elt;
-               ev_timer_init (&elt->timer_ev, rspamd_async_elt_on_timer, 0.0, 0.0);
+               ev_timer_init (&elt->timer_ev, rspamd_async_elt_on_timer,
+                               0.1, 0.0);
                ev_timer_start (st_ctx->event_loop, &elt->timer_ev);
        }
        else {