mirror of
https://github.com/rspamd/rspamd.git
synced 2024-07-30 08:18:31 +02:00
[Fix] Stat_redis_backend: Fix memory leak and simplify learn path
This commit is contained in:
parent
d623ed249b
commit
f514841c75
@ -483,6 +483,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
|
||||
* abort with an error, but simply ignore it because the client
|
||||
* doesn't know what the server will spit out over the wire. */
|
||||
c->reader->fn->freeObject(reply);
|
||||
/* Proceed with free'ing when redisAsyncFree() was called. */
|
||||
if (c->flags & REDIS_FREEING) {
|
||||
__redisAsyncFree(ac);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -637,7 +637,6 @@ rspamd_redis_store_stat_signature (struct rspamd_task *task,
|
||||
guint i, blen, klen;
|
||||
rspamd_fstring_t *out;
|
||||
|
||||
out = rspamd_fstring_sized_new (1024);
|
||||
sig = rspamd_mempool_get_variable (task->task_pool,
|
||||
RSPAMD_MEMPOOL_STAT_SIGNATURE);
|
||||
|
||||
@ -646,11 +645,10 @@ rspamd_redis_store_stat_signature (struct rspamd_task *task,
|
||||
return;
|
||||
}
|
||||
|
||||
out = rspamd_fstring_sized_new (1024);
|
||||
klen = rspamd_snprintf (keybuf, sizeof (keybuf), "%s_%s_%s",
|
||||
prefix, sig, rt->stcf->is_spam ? "S" : "H");
|
||||
|
||||
out->len = 0;
|
||||
|
||||
/* Cleanup key */
|
||||
rspamd_printf_fstring (&out, ""
|
||||
"*2\r\n"
|
||||
@ -1069,7 +1067,12 @@ rspamd_redis_fin (gpointer data)
|
||||
struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
|
||||
redisAsyncContext *redis;
|
||||
|
||||
rt->has_event = FALSE;
|
||||
if (rt->has_event) {
|
||||
/* Should not happen ! */
|
||||
msg_err ("FIXME: this code path should not be reached!");
|
||||
rspamd_session_remove_event (rt->task->s, NULL, rt);
|
||||
rt->has_event = FALSE;
|
||||
}
|
||||
/* Stop timeout */
|
||||
if (ev_can_stop (&rt->timeout_event)) {
|
||||
ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
|
||||
@ -1086,29 +1089,9 @@ rspamd_redis_fin (gpointer data)
|
||||
/* This calls for all callbacks pending */
|
||||
redisAsyncFree (redis);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
rspamd_redis_fin_learn (gpointer data)
|
||||
{
|
||||
struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
|
||||
redisAsyncContext *redis;
|
||||
|
||||
rt->has_event = FALSE;
|
||||
|
||||
if (rt->tokens) {
|
||||
g_ptr_array_unref (rt->tokens);
|
||||
rt->tokens = NULL;
|
||||
}
|
||||
|
||||
/* Stop timeout */
|
||||
ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
|
||||
|
||||
if (rt->redis) {
|
||||
redis = rt->redis;
|
||||
rt->redis = NULL;
|
||||
/* This calls for all callbacks pending */
|
||||
redisAsyncFree (redis);
|
||||
if (rt->err) {
|
||||
g_error_free (rt->err);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1145,7 +1128,7 @@ rspamd_redis_timeout (EV_P_ ev_timer *w, int revents)
|
||||
}
|
||||
if (rt->has_event) {
|
||||
rt->has_event = FALSE;
|
||||
rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
|
||||
rspamd_session_remove_event (task->s, NULL, rt);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1236,7 +1219,8 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
|
||||
}
|
||||
|
||||
if (rt->has_event) {
|
||||
rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
|
||||
rt->has_event = FALSE;
|
||||
rspamd_session_remove_event (task->s, NULL, rt);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1363,7 +1347,8 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
|
||||
}
|
||||
|
||||
if (final && rt->has_event) {
|
||||
rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
|
||||
rt->has_event = FALSE;
|
||||
rspamd_session_remove_event (task->s, NULL, rt);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1395,7 +1380,8 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv)
|
||||
}
|
||||
|
||||
if (rt->has_event) {
|
||||
rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt);
|
||||
rt->has_event = FALSE;
|
||||
rspamd_session_remove_event (task->s, NULL, rt);
|
||||
}
|
||||
}
|
||||
static void
|
||||
@ -1647,15 +1633,14 @@ rspamd_redis_runtime (struct rspamd_task *task,
|
||||
|
||||
if (rspamd_redis_expand_object (ctx->redis_object, ctx, task,
|
||||
&object_expanded) == 0) {
|
||||
msg_err_task ("expansion for learning failed for symbol %s "
|
||||
msg_err_task ("expansion for %s failed for symbol %s "
|
||||
"(maybe learning per user classifier with no user or recipient)",
|
||||
learn ? "learning" : "classifying",
|
||||
stcf->symbol);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
|
||||
rspamd_mempool_add_destructor (task->task_pool,
|
||||
rspamd_gerror_free_maybe, &rt->err);
|
||||
rt->selected = up;
|
||||
rt->task = task;
|
||||
rt->ctx = ctx;
|
||||
@ -1692,6 +1677,8 @@ rspamd_redis_runtime (struct rspamd_task *task,
|
||||
redisLibevAttach (task->event_loop, rt->redis);
|
||||
rspamd_redis_maybe_auth (ctx, rt->redis);
|
||||
|
||||
rspamd_mempool_add_destructor (task->task_pool, rspamd_redis_fin, rt);
|
||||
|
||||
return rt;
|
||||
}
|
||||
|
||||
@ -1738,7 +1725,7 @@ rspamd_redis_process_tokens (struct rspamd_task *task,
|
||||
if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
|
||||
rt->redis_object_expanded, learned_key) == REDIS_OK) {
|
||||
|
||||
rspamd_session_add_event (task->s, rspamd_redis_fin, rt, M);
|
||||
rspamd_session_add_event (task->s, NULL, rt, M);
|
||||
rt->has_event = TRUE;
|
||||
rt->tokens = g_ptr_array_ref (tokens);
|
||||
|
||||
@ -1762,32 +1749,18 @@ rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime,
|
||||
gpointer ctx)
|
||||
{
|
||||
struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
|
||||
redisAsyncContext *redis;
|
||||
|
||||
if (ev_can_stop (&rt->timeout_event)) {
|
||||
ev_timer_stop (task->event_loop, &rt->timeout_event);
|
||||
}
|
||||
|
||||
if (rt->redis) {
|
||||
redis = rt->redis;
|
||||
rt->redis = NULL;
|
||||
redisAsyncFree (redis);
|
||||
}
|
||||
|
||||
if (rt->tokens) {
|
||||
g_ptr_array_unref (rt->tokens);
|
||||
rt->tokens = NULL;
|
||||
}
|
||||
|
||||
if (rt->err) {
|
||||
msg_info_task ("cannot retrieve stat tokens from Redis: %e", rt->err);
|
||||
g_error_free (rt->err);
|
||||
rt->err = NULL;
|
||||
|
||||
rspamd_redis_fin (rt);
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
rspamd_redis_fin (rt);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
@ -1796,9 +1769,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
|
||||
gint id, gpointer p)
|
||||
{
|
||||
struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
|
||||
struct upstream *up;
|
||||
struct upstream_list *ups;
|
||||
rspamd_inet_addr_t *addr;
|
||||
rspamd_fstring_t *query;
|
||||
const gchar *redis_cmd;
|
||||
rspamd_token_t *tok;
|
||||
@ -1810,23 +1780,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
ups = rspamd_redis_get_servers (rt->ctx, "write_servers");
|
||||
|
||||
if (!ups) {
|
||||
return FALSE;
|
||||
}
|
||||
up = rspamd_upstream_get (ups,
|
||||
RSPAMD_UPSTREAM_MASTER_SLAVE,
|
||||
NULL,
|
||||
0);
|
||||
|
||||
if (up == NULL) {
|
||||
msg_err_task ("no upstreams reachable");
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
rt->selected = up;
|
||||
|
||||
if (rt->ctx->new_schema) {
|
||||
if (rt->ctx->stcf->is_spam) {
|
||||
learned_key = "learns_spam";
|
||||
@ -1836,37 +1789,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
|
||||
}
|
||||
}
|
||||
|
||||
addr = rspamd_upstream_addr_next (up);
|
||||
g_assert (addr != NULL);
|
||||
|
||||
if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
|
||||
rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
|
||||
}
|
||||
else {
|
||||
rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
|
||||
rspamd_inet_address_get_port (addr));
|
||||
}
|
||||
|
||||
if (rt->redis == NULL) {
|
||||
msg_warn_task ("cannot connect to redis server %s: %s",
|
||||
rspamd_inet_address_to_string_pretty (addr),
|
||||
strerror (errno));
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
else if (rt->redis->err != REDIS_OK) {
|
||||
msg_warn_task ("cannot connect to redis server %s: %s",
|
||||
rspamd_inet_address_to_string_pretty (addr),
|
||||
rt->redis->errstr);
|
||||
redisAsyncFree (rt->redis);
|
||||
rt->redis = NULL;
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
redisLibevAttach (task->event_loop, rt->redis);
|
||||
rspamd_redis_maybe_auth (rt->ctx, rt->redis);
|
||||
|
||||
/*
|
||||
* Add the current key to the set of learned keys
|
||||
*/
|
||||
@ -1958,7 +1880,7 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
|
||||
"RSIG");
|
||||
}
|
||||
|
||||
rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt, M);
|
||||
rspamd_session_add_event (task->s, NULL, rt, M);
|
||||
rt->has_event = TRUE;
|
||||
|
||||
/* Set timeout */
|
||||
@ -1988,25 +1910,17 @@ rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime,
|
||||
gpointer ctx, GError **err)
|
||||
{
|
||||
struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
|
||||
redisAsyncContext *redis;
|
||||
|
||||
if (ev_can_stop (&rt->timeout_event)) {
|
||||
ev_timer_stop (task->event_loop, &rt->timeout_event);
|
||||
}
|
||||
|
||||
if (rt->redis) {
|
||||
redis = rt->redis;
|
||||
rt->redis = NULL;
|
||||
redisAsyncFree (redis);
|
||||
}
|
||||
|
||||
if (rt->err) {
|
||||
g_propagate_error (err, rt->err);
|
||||
rt->err = NULL;
|
||||
rspamd_redis_fin (rt);
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
rspamd_redis_fin (rt);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
|
@ -818,6 +818,8 @@ rspamd_stat_learn (struct rspamd_task *task,
|
||||
|
||||
if (stage == RSPAMD_TASK_STAGE_LEARN_PRE) {
|
||||
/* Process classifiers */
|
||||
rspamd_stat_preprocess (st_ctx, task, TRUE);
|
||||
|
||||
if (!rspamd_stat_cache_check (st_ctx, task, classifier, spam, err)) {
|
||||
return RSPAMD_STAT_PROCESS_ERROR;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user