diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2015-12-29 16:59:53 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2015-12-29 16:59:53 +0000 |
commit | e365dd860c0e666c11d15bc57a5d18912e8c0115 (patch) | |
tree | 1ad83624040d40e3be6ef9b8bbed6aa584b28063 | |
parent | 596154946ff89da59ef55f36353162749e4111d3 (diff) | |
download | rspamd-e365dd860c0e666c11d15bc57a5d18912e8c0115.tar.gz rspamd-e365dd860c0e666c11d15bc57a5d18912e8c0115.zip |
Start redis plugin rejig
-rw-r--r-- | src/libstat/backends/redis.c | 91 |
1 files changed, 82 insertions, 9 deletions
diff --git a/src/libstat/backends/redis.c b/src/libstat/backends/redis.c index b5194065a..5f9b5d126 100644 --- a/src/libstat/backends/redis.c +++ b/src/libstat/backends/redis.c @@ -36,6 +36,7 @@ #define REDIS_BACKEND_TYPE "redis" #define REDIS_DEFAULT_PORT 6379 #define REDIS_DEFAULT_OBJECT "%s%l" +#define REDIS_DEFAULT_TIMEOUT 0.5 struct redis_stat_ctx_elt { struct upstream_list *read_servers; @@ -52,9 +53,12 @@ struct redis_stat_ctx { struct redis_stat_runtime { struct rspamd_task *task; struct upstream *selected; + struct event timeout_event; GArray *results; gchar *redis_object_expanded; redisAsyncContext *redis; + guint64 learned; + gboolean connected; }; #define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt) @@ -65,13 +69,6 @@ rspamd_redis_stat_quark (void) return g_quark_from_static_string ("redis-statistics"); } -static void -rspamd_redis_fin (gpointer data) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (data); - - redisAsyncFree (rt->redis); -} /* * Non-static for lua unit testing @@ -275,6 +272,66 @@ rspamd_redis_expand_object (const gchar *pattern, return tlen; } +/* Called on connection termination */ +static void +rspamd_redis_fin (gpointer data) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (data); + + redisAsyncFree (rt->redis); + event_del (&rt->timeout_event); +} + +static void +rspamd_redis_timeout (gint fd, short what, gpointer d) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (d); + struct rspamd_task *task; + + task = rt->task; + + msg_err_task ("connection to redis server %s timed out", + rspamd_upstream_name (rt->selected)); + rspamd_upstream_fail (rt->selected); + rspamd_session_remove_event (task->s, rspamd_redis_fin, d); +} + +/* Called when we have connected to the redis server and got stats */ +static void +rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (priv); + redisReply *reply = r; + struct rspamd_task *task; + + task = rt->task; + + if (c->err == 0) { + if (r != NULL) { + if (reply->type == REDIS_REPLY_INTEGER) { + rt->learned = reply->integer; + } + else { + rt->learned = 0; + } + + rt->connected = TRUE; + } + else { + msg_err_task ("error getting reply from redis server %s: %s", + rspamd_upstream_name (rt->selected), c->errstr); + rspamd_upstream_fail (rt->selected); + rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); + } + } + else { + msg_err_task ("error getting reply from redis server %s: %s", + rspamd_upstream_name (rt->selected), c->errstr); + rspamd_upstream_fail (rt->selected); + rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); + } +} + gpointer rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg) { @@ -334,7 +391,6 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg) msg_err ("statfile %s has no write redis servers, " "so learning is impossible", stf->symbol); curst = curst->next; - continue; } else { backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx); @@ -361,6 +417,14 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg) } } + elt = ucl_object_find_key (stf->opts, "timeout"); + if (elt) { + backend->timeout = ucl_object_todouble (elt); + } + else { + backend->timeout = REDIS_DEFAULT_TIMEOUT; + } + g_hash_table_insert (new->redis_elts, stf, backend); ctx->statfiles ++; @@ -385,6 +449,7 @@ rspamd_redis_runtime (struct rspamd_task *task, struct redis_stat_runtime *rt; struct upstream *up; rspamd_inet_addr_t *addr; + struct timeval tv; g_assert (ctx != NULL); g_assert (stcf != NULL); @@ -415,7 +480,7 @@ rspamd_redis_runtime (struct rspamd_task *task, return NULL; } - rt = rspamd_mempool_alloc (task->task_pool, sizeof (*rt)); + rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt)); rspamd_redis_expand_object (elt->redis_object, stcf, task, &rt->redis_object_expanded); rt->selected = up; @@ -431,5 +496,13 @@ rspamd_redis_runtime (struct rspamd_task *task, rspamd_session_add_event (task->s, rspamd_redis_fin, rt, rspamd_redis_stat_quark ()); + /* Now check stats */ + event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt); + event_base_set (task->ev_base, &rt->timeout_event); + double_to_tv (elt->timeout, &tv); + event_add (&rt->timeout_event, &tv); + redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s", + rt->redis_object_expanded, "learned"); + return rt; } |