diff --git a/contrib/hiredis/async.c b/contrib/hiredis/async.c index a508036e6..851676263 100644 --- a/contrib/hiredis/async.c +++ b/contrib/hiredis/async.c @@ -483,6 +483,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) { * abort with an error, but simply ignore it because the client * doesn't know what the server will spit out over the wire. */ c->reader->fn->freeObject(reply); + /* Proceed with free'ing when redisAsyncFree() was called. */ + if (c->flags & REDIS_FREEING) { + __redisAsyncFree(ac); + return; + } } } diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index 5774b46a6..84bc0ba77 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -637,7 +637,6 @@ rspamd_redis_store_stat_signature (struct rspamd_task *task, guint i, blen, klen; rspamd_fstring_t *out; - out = rspamd_fstring_sized_new (1024); sig = rspamd_mempool_get_variable (task->task_pool, RSPAMD_MEMPOOL_STAT_SIGNATURE); @@ -646,11 +645,10 @@ rspamd_redis_store_stat_signature (struct rspamd_task *task, return; } + out = rspamd_fstring_sized_new (1024); klen = rspamd_snprintf (keybuf, sizeof (keybuf), "%s_%s_%s", prefix, sig, rt->stcf->is_spam ? "S" : "H"); - out->len = 0; - /* Cleanup key */ rspamd_printf_fstring (&out, "" "*2\r\n" @@ -1069,7 +1067,12 @@ rspamd_redis_fin (gpointer data) struct redis_stat_runtime *rt = REDIS_RUNTIME (data); redisAsyncContext *redis; - rt->has_event = FALSE; + if (rt->has_event) { + /* Should not happen ! */ + msg_err ("FIXME: this code path should not be reached!"); + rspamd_session_remove_event (rt->task->s, NULL, rt); + rt->has_event = FALSE; + } /* Stop timeout */ if (ev_can_stop (&rt->timeout_event)) { ev_timer_stop (rt->task->event_loop, &rt->timeout_event); @@ -1086,29 +1089,9 @@ rspamd_redis_fin (gpointer data) /* This calls for all callbacks pending */ redisAsyncFree (redis); } -} -static void -rspamd_redis_fin_learn (gpointer data) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (data); - redisAsyncContext *redis; - - rt->has_event = FALSE; - - if (rt->tokens) { - g_ptr_array_unref (rt->tokens); - rt->tokens = NULL; - } - - /* Stop timeout */ - ev_timer_stop (rt->task->event_loop, &rt->timeout_event); - - if (rt->redis) { - redis = rt->redis; - rt->redis = NULL; - /* This calls for all callbacks pending */ - redisAsyncFree (redis); + if (rt->err) { + g_error_free (rt->err); } } @@ -1145,7 +1128,7 @@ rspamd_redis_timeout (EV_P_ ev_timer *w, int revents) } if (rt->has_event) { rt->has_event = FALSE; - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); + rspamd_session_remove_event (task->s, NULL, rt); } } @@ -1236,7 +1219,8 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv) } if (rt->has_event) { - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); + rt->has_event = FALSE; + rspamd_session_remove_event (task->s, NULL, rt); } } @@ -1363,7 +1347,8 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv) } if (final && rt->has_event) { - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); + rt->has_event = FALSE; + rspamd_session_remove_event (task->s, NULL, rt); } } @@ -1395,7 +1380,8 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv) } if (rt->has_event) { - rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt); + rt->has_event = FALSE; + rspamd_session_remove_event (task->s, NULL, rt); } } static void @@ -1647,15 +1633,14 @@ rspamd_redis_runtime (struct rspamd_task *task, if (rspamd_redis_expand_object (ctx->redis_object, ctx, task, &object_expanded) == 0) { - msg_err_task ("expansion for learning failed for symbol %s " + msg_err_task ("expansion for %s failed for symbol %s " "(maybe learning per user classifier with no user or recipient)", + learn ? "learning" : "classifying", stcf->symbol); return NULL; } rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt)); - rspamd_mempool_add_destructor (task->task_pool, - rspamd_gerror_free_maybe, &rt->err); rt->selected = up; rt->task = task; rt->ctx = ctx; @@ -1692,6 +1677,8 @@ rspamd_redis_runtime (struct rspamd_task *task, redisLibevAttach (task->event_loop, rt->redis); rspamd_redis_maybe_auth (ctx, rt->redis); + rspamd_mempool_add_destructor (task->task_pool, rspamd_redis_fin, rt); + return rt; } @@ -1738,7 +1725,7 @@ rspamd_redis_process_tokens (struct rspamd_task *task, if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s", rt->redis_object_expanded, learned_key) == REDIS_OK) { - rspamd_session_add_event (task->s, rspamd_redis_fin, rt, M); + rspamd_session_add_event (task->s, NULL, rt, M); rt->has_event = TRUE; rt->tokens = g_ptr_array_ref (tokens); @@ -1762,32 +1749,18 @@ rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime, gpointer ctx) { struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); - redisAsyncContext *redis; - - if (ev_can_stop (&rt->timeout_event)) { - ev_timer_stop (task->event_loop, &rt->timeout_event); - } - - if (rt->redis) { - redis = rt->redis; - rt->redis = NULL; - redisAsyncFree (redis); - } - - if (rt->tokens) { - g_ptr_array_unref (rt->tokens); - rt->tokens = NULL; - } if (rt->err) { msg_info_task ("cannot retrieve stat tokens from Redis: %e", rt->err); g_error_free (rt->err); rt->err = NULL; - + rspamd_redis_fin (rt); return FALSE; } + rspamd_redis_fin (rt); + return TRUE; } @@ -1796,9 +1769,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, gint id, gpointer p) { struct redis_stat_runtime *rt = REDIS_RUNTIME (p); - struct upstream *up; - struct upstream_list *ups; - rspamd_inet_addr_t *addr; rspamd_fstring_t *query; const gchar *redis_cmd; rspamd_token_t *tok; @@ -1810,23 +1780,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, return FALSE; } - ups = rspamd_redis_get_servers (rt->ctx, "write_servers"); - - if (!ups) { - return FALSE; - } - up = rspamd_upstream_get (ups, - RSPAMD_UPSTREAM_MASTER_SLAVE, - NULL, - 0); - - if (up == NULL) { - msg_err_task ("no upstreams reachable"); - return FALSE; - } - - rt->selected = up; - if (rt->ctx->new_schema) { if (rt->ctx->stcf->is_spam) { learned_key = "learns_spam"; @@ -1836,37 +1789,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, } } - addr = rspamd_upstream_addr_next (up); - g_assert (addr != NULL); - - if (rspamd_inet_address_get_af (addr) == AF_UNIX) { - rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr)); - } - else { - rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr), - rspamd_inet_address_get_port (addr)); - } - - if (rt->redis == NULL) { - msg_warn_task ("cannot connect to redis server %s: %s", - rspamd_inet_address_to_string_pretty (addr), - strerror (errno)); - - return FALSE; - } - else if (rt->redis->err != REDIS_OK) { - msg_warn_task ("cannot connect to redis server %s: %s", - rspamd_inet_address_to_string_pretty (addr), - rt->redis->errstr); - redisAsyncFree (rt->redis); - rt->redis = NULL; - - return FALSE; - } - - redisLibevAttach (task->event_loop, rt->redis); - rspamd_redis_maybe_auth (rt->ctx, rt->redis); - /* * Add the current key to the set of learned keys */ @@ -1958,7 +1880,7 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, "RSIG"); } - rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt, M); + rspamd_session_add_event (task->s, NULL, rt, M); rt->has_event = TRUE; /* Set timeout */ @@ -1988,25 +1910,17 @@ rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime, gpointer ctx, GError **err) { struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); - redisAsyncContext *redis; - - if (ev_can_stop (&rt->timeout_event)) { - ev_timer_stop (task->event_loop, &rt->timeout_event); - } - - if (rt->redis) { - redis = rt->redis; - rt->redis = NULL; - redisAsyncFree (redis); - } if (rt->err) { g_propagate_error (err, rt->err); rt->err = NULL; + rspamd_redis_fin (rt); return FALSE; } + rspamd_redis_fin (rt); + return TRUE; } diff --git a/src/libstat/stat_process.c b/src/libstat/stat_process.c index 6bd17f38f..fc42cd875 100644 --- a/src/libstat/stat_process.c +++ b/src/libstat/stat_process.c @@ -818,6 +818,8 @@ rspamd_stat_learn (struct rspamd_task *task, if (stage == RSPAMD_TASK_STAGE_LEARN_PRE) { /* Process classifiers */ + rspamd_stat_preprocess (st_ctx, task, TRUE); + if (!rspamd_stat_cache_check (st_ctx, task, classifier, spam, err)) { return RSPAMD_STAT_PROCESS_ERROR; }