]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Stat_redis_backend: Fix memory leak and simplify learn path
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 9 Mar 2020 16:59:54 +0000 (16:59 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 9 Mar 2020 16:59:54 +0000 (16:59 +0000)
contrib/hiredis/async.c
src/libstat/backends/redis_backend.c
src/libstat/stat_process.c

index a508036e6ec87cdab99de44e3d12f0ef3665cc4a..8516762636cc6236730121c4059ff1c920e9794a 100644 (file)
@@ -483,6 +483,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
              * abort with an error, but simply ignore it because the client
              * doesn't know what the server will spit out over the wire. */
             c->reader->fn->freeObject(reply);
+                       /* Proceed with free'ing when redisAsyncFree() was called. */
+                       if (c->flags & REDIS_FREEING) {
+                               __redisAsyncFree(ac);
+                               return;
+                       }
         }
     }
 
index 5774b46a6b67eda07a79043e33212c5ebfa06d70..84bc0ba77965f91a61ee9018b9e0a894d8fa9022 100644 (file)
@@ -637,7 +637,6 @@ rspamd_redis_store_stat_signature (struct rspamd_task *task,
        guint i, blen, klen;
        rspamd_fstring_t *out;
 
-       out = rspamd_fstring_sized_new (1024);
        sig = rspamd_mempool_get_variable (task->task_pool,
                        RSPAMD_MEMPOOL_STAT_SIGNATURE);
 
@@ -646,11 +645,10 @@ rspamd_redis_store_stat_signature (struct rspamd_task *task,
                return;
        }
 
+       out = rspamd_fstring_sized_new (1024);
        klen = rspamd_snprintf (keybuf, sizeof (keybuf), "%s_%s_%s",
                        prefix, sig, rt->stcf->is_spam ? "S" : "H");
 
-       out->len = 0;
-
        /* Cleanup key */
        rspamd_printf_fstring (&out, ""
                                        "*2\r\n"
@@ -1069,7 +1067,12 @@ rspamd_redis_fin (gpointer data)
        struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
        redisAsyncContext *redis;
 
-       rt->has_event = FALSE;
+       if (rt->has_event) {
+               /* Should not happen ! */
+               msg_err ("FIXME: this code path should not be reached!");
+               rspamd_session_remove_event (rt->task->s, NULL, rt);
+               rt->has_event = FALSE;
+       }
        /* Stop timeout */
        if (ev_can_stop (&rt->timeout_event)) {
                ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
@@ -1086,29 +1089,9 @@ rspamd_redis_fin (gpointer data)
                /* This calls for all callbacks pending */
                redisAsyncFree (redis);
        }
-}
-
-static void
-rspamd_redis_fin_learn (gpointer data)
-{
-       struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
-       redisAsyncContext *redis;
-
-       rt->has_event = FALSE;
 
-       if (rt->tokens) {
-               g_ptr_array_unref (rt->tokens);
-               rt->tokens = NULL;
-       }
-
-       /* Stop timeout */
-       ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
-
-       if (rt->redis) {
-               redis = rt->redis;
-               rt->redis = NULL;
-               /* This calls for all callbacks pending */
-               redisAsyncFree (redis);
+       if (rt->err) {
+               g_error_free (rt->err);
        }
 }
 
@@ -1145,7 +1128,7 @@ rspamd_redis_timeout (EV_P_ ev_timer *w, int revents)
        }
        if (rt->has_event) {
                rt->has_event = FALSE;
-               rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+               rspamd_session_remove_event (task->s, NULL, rt);
        }
 }
 
@@ -1236,7 +1219,8 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
        }
 
        if (rt->has_event) {
-               rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+               rt->has_event = FALSE;
+               rspamd_session_remove_event (task->s, NULL, rt);
        }
 }
 
@@ -1363,7 +1347,8 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
        }
 
        if (final && rt->has_event) {
-               rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+               rt->has_event = FALSE;
+               rspamd_session_remove_event (task->s, NULL, rt);
        }
 }
 
@@ -1395,7 +1380,8 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv)
        }
 
        if (rt->has_event) {
-               rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt);
+               rt->has_event = FALSE;
+               rspamd_session_remove_event (task->s, NULL, rt);
        }
 }
 static void
@@ -1647,15 +1633,14 @@ rspamd_redis_runtime (struct rspamd_task *task,
 
        if (rspamd_redis_expand_object (ctx->redis_object, ctx, task,
                        &object_expanded) == 0) {
-               msg_err_task ("expansion for learning failed for symbol %s "
+               msg_err_task ("expansion for %s failed for symbol %s "
                                 "(maybe learning per user classifier with no user or recipient)",
+                                learn ? "learning" : "classifying",
                                 stcf->symbol);
                return NULL;
        }
 
        rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
-       rspamd_mempool_add_destructor (task->task_pool,
-                       rspamd_gerror_free_maybe, &rt->err);
        rt->selected = up;
        rt->task = task;
        rt->ctx = ctx;
@@ -1692,6 +1677,8 @@ rspamd_redis_runtime (struct rspamd_task *task,
        redisLibevAttach (task->event_loop, rt->redis);
        rspamd_redis_maybe_auth (ctx, rt->redis);
 
+       rspamd_mempool_add_destructor (task->task_pool, rspamd_redis_fin, rt);
+
        return rt;
 }
 
@@ -1738,7 +1725,7 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
        if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
                        rt->redis_object_expanded, learned_key) == REDIS_OK) {
 
-               rspamd_session_add_event (task->s, rspamd_redis_fin, rt, M);
+               rspamd_session_add_event (task->s, NULL, rt, M);
                rt->has_event = TRUE;
                rt->tokens = g_ptr_array_ref (tokens);
 
@@ -1762,32 +1749,18 @@ rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime,
                gpointer ctx)
 {
        struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
-       redisAsyncContext *redis;
-
-       if (ev_can_stop (&rt->timeout_event)) {
-               ev_timer_stop (task->event_loop, &rt->timeout_event);
-       }
-
-       if (rt->redis) {
-               redis = rt->redis;
-               rt->redis = NULL;
-               redisAsyncFree (redis);
-       }
-
-       if (rt->tokens) {
-               g_ptr_array_unref (rt->tokens);
-               rt->tokens = NULL;
-       }
 
        if (rt->err) {
                msg_info_task ("cannot retrieve stat tokens from Redis: %e", rt->err);
                g_error_free (rt->err);
                rt->err = NULL;
-
+               rspamd_redis_fin (rt);
 
                return FALSE;
        }
 
+       rspamd_redis_fin (rt);
+
        return TRUE;
 }
 
@@ -1796,9 +1769,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
                gint id, gpointer p)
 {
        struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
-       struct upstream *up;
-       struct upstream_list *ups;
-       rspamd_inet_addr_t *addr;
        rspamd_fstring_t *query;
        const gchar *redis_cmd;
        rspamd_token_t *tok;
@@ -1810,23 +1780,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
                return FALSE;
        }
 
-       ups = rspamd_redis_get_servers (rt->ctx, "write_servers");
-
-       if (!ups) {
-               return FALSE;
-       }
-       up = rspamd_upstream_get (ups,
-                       RSPAMD_UPSTREAM_MASTER_SLAVE,
-                       NULL,
-                       0);
-
-       if (up == NULL) {
-               msg_err_task ("no upstreams reachable");
-               return FALSE;
-       }
-
-       rt->selected = up;
-
        if (rt->ctx->new_schema) {
                if (rt->ctx->stcf->is_spam) {
                        learned_key = "learns_spam";
@@ -1836,37 +1789,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
                }
        }
 
-       addr = rspamd_upstream_addr_next (up);
-       g_assert (addr != NULL);
-
-       if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
-               rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
-       }
-       else {
-               rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
-                               rspamd_inet_address_get_port (addr));
-       }
-
-       if (rt->redis == NULL) {
-               msg_warn_task ("cannot connect to redis server %s: %s",
-                               rspamd_inet_address_to_string_pretty (addr),
-                               strerror (errno));
-
-               return FALSE;
-       }
-       else if (rt->redis->err != REDIS_OK) {
-               msg_warn_task ("cannot connect to redis server %s: %s",
-                               rspamd_inet_address_to_string_pretty (addr),
-                               rt->redis->errstr);
-               redisAsyncFree (rt->redis);
-               rt->redis = NULL;
-
-               return FALSE;
-       }
-
-       redisLibevAttach (task->event_loop, rt->redis);
-       rspamd_redis_maybe_auth (rt->ctx, rt->redis);
-
        /*
         * Add the current key to the set of learned keys
         */
@@ -1958,7 +1880,7 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
                                        "RSIG");
                }
 
-               rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt, M);
+               rspamd_session_add_event (task->s, NULL, rt, M);
                rt->has_event = TRUE;
 
                /* Set timeout */
@@ -1988,25 +1910,17 @@ rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime,
                gpointer ctx, GError **err)
 {
        struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
-       redisAsyncContext *redis;
-
-       if (ev_can_stop (&rt->timeout_event)) {
-               ev_timer_stop (task->event_loop, &rt->timeout_event);
-       }
-
-       if (rt->redis) {
-               redis = rt->redis;
-               rt->redis = NULL;
-               redisAsyncFree (redis);
-       }
 
        if (rt->err) {
                g_propagate_error (err, rt->err);
                rt->err = NULL;
+               rspamd_redis_fin (rt);
 
                return FALSE;
        }
 
+       rspamd_redis_fin (rt);
+
        return TRUE;
 }
 
index 6bd17f38f0269224de38406b5e41610d08e30ef3..fc42cd8757873135cce3362640611267fb9a59a2 100644 (file)
@@ -818,6 +818,8 @@ rspamd_stat_learn (struct rspamd_task *task,
 
        if (stage == RSPAMD_TASK_STAGE_LEARN_PRE) {
                /* Process classifiers */
+               rspamd_stat_preprocess (st_ctx, task, TRUE);
+
                if (!rspamd_stat_cache_check (st_ctx, task, classifier, spam, err)) {
                        return RSPAMD_STAT_PROCESS_ERROR;
                }