aboutsummaryrefslogtreecommitdiffstats
path: root/src/libstat/backends/redis_backend.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstat/backends/redis_backend.c')
-rw-r--r--src/libstat/backends/redis_backend.c56
1 files changed, 50 insertions, 6 deletions
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index 4f65a673c..a2924d054 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -22,6 +22,7 @@
#ifdef WITH_HIREDIS
#include "hiredis.h"
#include "adapters/libevent.h"
+#include "ref.h"
#define REDIS_CTX(p) (struct redis_stat_ctx *)(p)
@@ -64,6 +65,7 @@ struct redis_stat_runtime {
guint64 learned;
gint id;
enum rspamd_redis_connection_state conn_state;
+ ref_entry_t ref;
};
/* Used to get statistics from redis */
@@ -678,9 +680,9 @@ rspamd_redis_fin (gpointer data)
if (rt->conn_state != RSPAMD_REDIS_CONNECTED) {
rt->conn_state = RSPAMD_REDIS_DISCONNECTED;
+ event_del (&rt->timeout_event);
+ REF_RELEASE (rt);
}
-
- event_del (&rt->timeout_event);
}
static void
@@ -690,9 +692,9 @@ rspamd_redis_fin_learn (gpointer data)
if (rt->conn_state != RSPAMD_REDIS_CONNECTED) {
rt->conn_state = RSPAMD_REDIS_DISCONNECTED;
+ event_del (&rt->timeout_event);
+ REF_RELEASE (rt);
}
-
- event_del (&rt->timeout_event);
}
static void
@@ -703,9 +705,14 @@ rspamd_redis_timeout (gint fd, short what, gpointer d)
task = rt->task;
- msg_err_task ("connection to redis server %s timed out",
+ msg_err_task_check ("connection to redis server %s timed out",
rspamd_upstream_name (rt->selected));
rspamd_upstream_fail (rt->selected);
+
+ if (rt->conn_state != RSPAMD_REDIS_CONNECTED) {
+ rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+ }
+
rt->conn_state = RSPAMD_REDIS_TIMEDOUT;
redisAsyncFree (rt->redis);
rt->redis = NULL;
@@ -722,6 +729,12 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
task = rt->task;
+ if (rt->conn_state != RSPAMD_REDIS_CONNECTED) {
+ /* Task has disappeared already */
+ REF_RELEASE (rt);
+ return;
+ }
+
if (c->err == 0) {
if (r != NULL) {
if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
@@ -748,6 +761,7 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
rt->learned = val;
rt->conn_state = RSPAMD_REDIS_CONNECTED;
+ REF_RETAIN (rt);
msg_debug_task ("connected to redis server, tokens learned for %s: %uL",
rt->redis_object_expanded, rt->learned);
@@ -765,6 +779,8 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
rspamd_upstream_fail (rt->selected);
rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
}
+
+ REF_RELEASE (rt);
}
/* Called when we have received tokens values from redis */
@@ -781,6 +797,12 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
task = rt->task;
+ if (rt->conn_state != RSPAMD_REDIS_CONNECTED) {
+ /* Task has disappeared already */
+ REF_RELEASE (rt);
+ return;
+ }
+
if (c->err == 0) {
if (r != NULL) {
if (reply->type == REDIS_REPLY_ARRAY) {
@@ -848,6 +870,8 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv)
rspamd_upstream_fail (rt->selected);
rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
}
+
+ REF_RELEASE (rt);
}
/* Called when we have set tokens during learning */
@@ -859,6 +883,12 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv)
task = rt->task;
+ if (rt->conn_state != RSPAMD_REDIS_CONNECTED) {
+ /* Task has disappeared already */
+ REF_RELEASE (rt);
+ return;
+ }
+
if (c->err == 0) {
rspamd_upstream_ok (rt->selected);
rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt);
@@ -874,6 +904,8 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv)
redisAsyncFree (rt->redis);
rt->conn_state = RSPAMD_REDIS_DISCONNECTED;
}
+
+ REF_RELEASE (rt);
}
static gboolean
@@ -1053,6 +1085,12 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx,
return (gpointer)backend;
}
+static void
+rspamd_redis_runtime_dtor (struct redis_stat_runtime *rt)
+{
+ g_slice_free1 (sizeof (*rt), rt);
+}
+
gpointer
rspamd_redis_runtime (struct rspamd_task *task,
struct rspamd_statfile_config *stcf,
@@ -1090,7 +1128,8 @@ rspamd_redis_runtime (struct rspamd_task *task,
return NULL;
}
- rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
+ rt = g_slice_alloc0 (sizeof (*rt));
+ REF_INIT_RETAIN (rt, rspamd_redis_runtime_dtor);
rspamd_redis_expand_object (ctx->redis_object, ctx, task,
&rt->redis_object_expanded);
rt->selected = up;
@@ -1114,6 +1153,8 @@ rspamd_redis_runtime (struct rspamd_task *task,
event_base_set (task->ev_base, &rt->timeout_event);
double_to_tv (ctx->timeout, &tv);
event_add (&rt->timeout_event, &tv);
+ /* Cleared by timeout */
+ REF_RETAIN (rt);
rspamd_redis_maybe_auth (ctx, rt->redis);
redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
@@ -1192,6 +1233,8 @@ rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime,
rt->redis = NULL;
rt->conn_state = RSPAMD_REDIS_DISCONNECTED;
+
+ REF_RELEASE (rt);
}
}
@@ -1329,6 +1372,7 @@ rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime,
rt->redis = NULL;
rt->conn_state = RSPAMD_REDIS_DISCONNECTED;
+ REF_RELEASE (rt);
}
}