diff options
Diffstat (limited to 'src/libstat/backends/redis_backend.c')
-rw-r--r-- | src/libstat/backends/redis_backend.c | 56 |
1 files changed, 50 insertions, 6 deletions
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index 4f65a673c..a2924d054 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -22,6 +22,7 @@ #ifdef WITH_HIREDIS #include "hiredis.h" #include "adapters/libevent.h" +#include "ref.h" #define REDIS_CTX(p) (struct redis_stat_ctx *)(p) @@ -64,6 +65,7 @@ struct redis_stat_runtime { guint64 learned; gint id; enum rspamd_redis_connection_state conn_state; + ref_entry_t ref; }; /* Used to get statistics from redis */ @@ -678,9 +680,9 @@ rspamd_redis_fin (gpointer data) if (rt->conn_state != RSPAMD_REDIS_CONNECTED) { rt->conn_state = RSPAMD_REDIS_DISCONNECTED; + event_del (&rt->timeout_event); + REF_RELEASE (rt); } - - event_del (&rt->timeout_event); } static void @@ -690,9 +692,9 @@ rspamd_redis_fin_learn (gpointer data) if (rt->conn_state != RSPAMD_REDIS_CONNECTED) { rt->conn_state = RSPAMD_REDIS_DISCONNECTED; + event_del (&rt->timeout_event); + REF_RELEASE (rt); } - - event_del (&rt->timeout_event); } static void @@ -703,9 +705,14 @@ rspamd_redis_timeout (gint fd, short what, gpointer d) task = rt->task; - msg_err_task ("connection to redis server %s timed out", + msg_err_task_check ("connection to redis server %s timed out", rspamd_upstream_name (rt->selected)); rspamd_upstream_fail (rt->selected); + + if (rt->conn_state != RSPAMD_REDIS_CONNECTED) { + rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); + } + rt->conn_state = RSPAMD_REDIS_TIMEDOUT; redisAsyncFree (rt->redis); rt->redis = NULL; @@ -722,6 +729,12 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv) task = rt->task; + if (rt->conn_state != RSPAMD_REDIS_CONNECTED) { + /* Task has disappeared already */ + REF_RELEASE (rt); + return; + } + if (c->err == 0) { if (r != NULL) { if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) { @@ -748,6 +761,7 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv) rt->learned = val; rt->conn_state = RSPAMD_REDIS_CONNECTED; + REF_RETAIN (rt); msg_debug_task ("connected to redis server, tokens learned for %s: %uL", rt->redis_object_expanded, rt->learned); @@ -765,6 +779,8 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv) rspamd_upstream_fail (rt->selected); rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); } + + REF_RELEASE (rt); } /* Called when we have received tokens values from redis */ @@ -781,6 +797,12 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv) task = rt->task; + if (rt->conn_state != RSPAMD_REDIS_CONNECTED) { + /* Task has disappeared already */ + REF_RELEASE (rt); + return; + } + if (c->err == 0) { if (r != NULL) { if (reply->type == REDIS_REPLY_ARRAY) { @@ -848,6 +870,8 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv) rspamd_upstream_fail (rt->selected); rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); } + + REF_RELEASE (rt); } /* Called when we have set tokens during learning */ @@ -859,6 +883,12 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv) task = rt->task; + if (rt->conn_state != RSPAMD_REDIS_CONNECTED) { + /* Task has disappeared already */ + REF_RELEASE (rt); + return; + } + if (c->err == 0) { rspamd_upstream_ok (rt->selected); rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt); @@ -874,6 +904,8 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv) redisAsyncFree (rt->redis); rt->conn_state = RSPAMD_REDIS_DISCONNECTED; } + + REF_RELEASE (rt); } static gboolean @@ -1053,6 +1085,12 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, return (gpointer)backend; } +static void +rspamd_redis_runtime_dtor (struct redis_stat_runtime *rt) +{ + g_slice_free1 (sizeof (*rt), rt); +} + gpointer rspamd_redis_runtime (struct rspamd_task *task, struct rspamd_statfile_config *stcf, @@ -1090,7 +1128,8 @@ rspamd_redis_runtime (struct rspamd_task *task, return NULL; } - rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt)); + rt = g_slice_alloc0 (sizeof (*rt)); + REF_INIT_RETAIN (rt, rspamd_redis_runtime_dtor); rspamd_redis_expand_object (ctx->redis_object, ctx, task, &rt->redis_object_expanded); rt->selected = up; @@ -1114,6 +1153,8 @@ rspamd_redis_runtime (struct rspamd_task *task, event_base_set (task->ev_base, &rt->timeout_event); double_to_tv (ctx->timeout, &tv); event_add (&rt->timeout_event, &tv); + /* Cleared by timeout */ + REF_RETAIN (rt); rspamd_redis_maybe_auth (ctx, rt->redis); redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s", @@ -1192,6 +1233,8 @@ rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime, rt->redis = NULL; rt->conn_state = RSPAMD_REDIS_DISCONNECTED; + + REF_RELEASE (rt); } } @@ -1329,6 +1372,7 @@ rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime, rt->redis = NULL; rt->conn_state = RSPAMD_REDIS_DISCONNECTED; + REF_RELEASE (rt); } } |