summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-12-29 16:59:53 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-12-29 16:59:53 +0000
commite365dd860c0e666c11d15bc57a5d18912e8c0115 (patch)
tree1ad83624040d40e3be6ef9b8bbed6aa584b28063
parent596154946ff89da59ef55f36353162749e4111d3 (diff)
downloadrspamd-e365dd860c0e666c11d15bc57a5d18912e8c0115.tar.gz
rspamd-e365dd860c0e666c11d15bc57a5d18912e8c0115.zip
Start redis plugin rejig
-rw-r--r--src/libstat/backends/redis.c91
1 files changed, 82 insertions, 9 deletions
diff --git a/src/libstat/backends/redis.c b/src/libstat/backends/redis.c
index b5194065a..5f9b5d126 100644
--- a/src/libstat/backends/redis.c
+++ b/src/libstat/backends/redis.c
@@ -36,6 +36,7 @@
#define REDIS_BACKEND_TYPE "redis"
#define REDIS_DEFAULT_PORT 6379
#define REDIS_DEFAULT_OBJECT "%s%l"
+#define REDIS_DEFAULT_TIMEOUT 0.5
struct redis_stat_ctx_elt {
struct upstream_list *read_servers;
@@ -52,9 +53,12 @@ struct redis_stat_ctx {
struct redis_stat_runtime {
struct rspamd_task *task;
struct upstream *selected;
+ struct event timeout_event;
GArray *results;
gchar *redis_object_expanded;
redisAsyncContext *redis;
+ guint64 learned;
+ gboolean connected;
};
#define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt)
@@ -65,13 +69,6 @@ rspamd_redis_stat_quark (void)
return g_quark_from_static_string ("redis-statistics");
}
-static void
-rspamd_redis_fin (gpointer data)
-{
- struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
-
- redisAsyncFree (rt->redis);
-}
/*
* Non-static for lua unit testing
@@ -275,6 +272,66 @@ rspamd_redis_expand_object (const gchar *pattern,
return tlen;
}
+/* Called on connection termination */
+static void
+rspamd_redis_fin (gpointer data)
+{
+ struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
+
+ redisAsyncFree (rt->redis);
+ event_del (&rt->timeout_event);
+}
+
+static void
+rspamd_redis_timeout (gint fd, short what, gpointer d)
+{
+ struct redis_stat_runtime *rt = REDIS_RUNTIME (d);
+ struct rspamd_task *task;
+
+ task = rt->task;
+
+ msg_err_task ("connection to redis server %s timed out",
+ rspamd_upstream_name (rt->selected));
+ rspamd_upstream_fail (rt->selected);
+ rspamd_session_remove_event (task->s, rspamd_redis_fin, d);
+}
+
+/* Called when we have connected to the redis server and got stats */
+static void
+rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+ struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
+ redisReply *reply = r;
+ struct rspamd_task *task;
+
+ task = rt->task;
+
+ if (c->err == 0) {
+ if (r != NULL) {
+ if (reply->type == REDIS_REPLY_INTEGER) {
+ rt->learned = reply->integer;
+ }
+ else {
+ rt->learned = 0;
+ }
+
+ rt->connected = TRUE;
+ }
+ else {
+ msg_err_task ("error getting reply from redis server %s: %s",
+ rspamd_upstream_name (rt->selected), c->errstr);
+ rspamd_upstream_fail (rt->selected);
+ rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+ }
+ }
+ else {
+ msg_err_task ("error getting reply from redis server %s: %s",
+ rspamd_upstream_name (rt->selected), c->errstr);
+ rspamd_upstream_fail (rt->selected);
+ rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+ }
+}
+
gpointer
rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
{
@@ -334,7 +391,6 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
msg_err ("statfile %s has no write redis servers, "
"so learning is impossible", stf->symbol);
curst = curst->next;
- continue;
}
else {
backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx);
@@ -361,6 +417,14 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
}
}
+ elt = ucl_object_find_key (stf->opts, "timeout");
+ if (elt) {
+ backend->timeout = ucl_object_todouble (elt);
+ }
+ else {
+ backend->timeout = REDIS_DEFAULT_TIMEOUT;
+ }
+
g_hash_table_insert (new->redis_elts, stf, backend);
ctx->statfiles ++;
@@ -385,6 +449,7 @@ rspamd_redis_runtime (struct rspamd_task *task,
struct redis_stat_runtime *rt;
struct upstream *up;
rspamd_inet_addr_t *addr;
+ struct timeval tv;
g_assert (ctx != NULL);
g_assert (stcf != NULL);
@@ -415,7 +480,7 @@ rspamd_redis_runtime (struct rspamd_task *task,
return NULL;
}
- rt = rspamd_mempool_alloc (task->task_pool, sizeof (*rt));
+ rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
rspamd_redis_expand_object (elt->redis_object, stcf, task,
&rt->redis_object_expanded);
rt->selected = up;
@@ -431,5 +496,13 @@ rspamd_redis_runtime (struct rspamd_task *task,
rspamd_session_add_event (task->s, rspamd_redis_fin, rt,
rspamd_redis_stat_quark ());
+ /* Now check stats */
+ event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
+ event_base_set (task->ev_base, &rt->timeout_event);
+ double_to_tv (elt->timeout, &tv);
+ event_add (&rt->timeout_event, &tv);
+ redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
+ rt->redis_object_expanded, "learned");
+
return rt;
}