]> source.dussan.org Git - rspamd.git/commitdiff
Start redis plugin rejig
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 29 Dec 2015 16:59:53 +0000 (16:59 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 29 Dec 2015 16:59:53 +0000 (16:59 +0000)
src/libstat/backends/redis.c

index b5194065aacdac740ab7348b13252cb53ced2ec2..5f9b5d126fb25cd7af96e2952262a011805028b2 100644 (file)
@@ -36,6 +36,7 @@
 #define REDIS_BACKEND_TYPE "redis"
 #define REDIS_DEFAULT_PORT 6379
 #define REDIS_DEFAULT_OBJECT "%s%l"
+#define REDIS_DEFAULT_TIMEOUT 0.5
 
 struct redis_stat_ctx_elt {
        struct upstream_list *read_servers;
@@ -52,9 +53,12 @@ struct redis_stat_ctx {
 struct redis_stat_runtime {
        struct rspamd_task *task;
        struct upstream *selected;
+       struct event timeout_event;
        GArray *results;
        gchar *redis_object_expanded;
        redisAsyncContext *redis;
+       guint64 learned;
+       gboolean connected;
 };
 
 #define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt)
@@ -65,13 +69,6 @@ rspamd_redis_stat_quark (void)
        return g_quark_from_static_string ("redis-statistics");
 }
 
-static void
-rspamd_redis_fin (gpointer data)
-{
-       struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
-
-       redisAsyncFree (rt->redis);
-}
 
 /*
  * Non-static for lua unit testing
@@ -275,6 +272,66 @@ rspamd_redis_expand_object (const gchar *pattern,
        return tlen;
 }
 
+/* Called on connection termination */
+static void
+rspamd_redis_fin (gpointer data)
+{
+       struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
+
+       redisAsyncFree (rt->redis);
+       event_del (&rt->timeout_event);
+}
+
+static void
+rspamd_redis_timeout (gint fd, short what, gpointer d)
+{
+       struct redis_stat_runtime *rt = REDIS_RUNTIME (d);
+       struct rspamd_task *task;
+
+       task = rt->task;
+
+       msg_err_task ("connection to redis server %s timed out",
+                       rspamd_upstream_name (rt->selected));
+       rspamd_upstream_fail (rt->selected);
+       rspamd_session_remove_event (task->s, rspamd_redis_fin, d);
+}
+
+/* Called when we have connected to the redis server and got stats */
+static void
+rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+       struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
+       redisReply *reply = r;
+       struct rspamd_task *task;
+
+       task = rt->task;
+
+       if (c->err == 0) {
+               if (r != NULL) {
+                       if (reply->type == REDIS_REPLY_INTEGER) {
+                               rt->learned = reply->integer;
+                       }
+                       else {
+                               rt->learned = 0;
+                       }
+
+                       rt->connected = TRUE;
+               }
+               else {
+                       msg_err_task ("error getting reply from redis server %s: %s",
+                                       rspamd_upstream_name (rt->selected), c->errstr);
+                       rspamd_upstream_fail (rt->selected);
+                       rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+               }
+       }
+       else {
+               msg_err_task ("error getting reply from redis server %s: %s",
+                               rspamd_upstream_name (rt->selected), c->errstr);
+               rspamd_upstream_fail (rt->selected);
+               rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+       }
+}
+
 gpointer
 rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
 {
@@ -334,7 +391,6 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
                                        msg_err ("statfile %s has no write redis servers, "
                                                        "so learning is impossible", stf->symbol);
                                        curst = curst->next;
-                                       continue;
                                }
                                else {
                                        backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx);
@@ -361,6 +417,14 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
                                        }
                                }
 
+                               elt = ucl_object_find_key (stf->opts, "timeout");
+                               if (elt) {
+                                       backend->timeout = ucl_object_todouble (elt);
+                               }
+                               else {
+                                       backend->timeout = REDIS_DEFAULT_TIMEOUT;
+                               }
+
                                g_hash_table_insert (new->redis_elts, stf, backend);
 
                                ctx->statfiles ++;
@@ -385,6 +449,7 @@ rspamd_redis_runtime (struct rspamd_task *task,
        struct redis_stat_runtime *rt;
        struct upstream *up;
        rspamd_inet_addr_t *addr;
+       struct timeval tv;
 
        g_assert (ctx != NULL);
        g_assert (stcf != NULL);
@@ -415,7 +480,7 @@ rspamd_redis_runtime (struct rspamd_task *task,
                return NULL;
        }
 
-       rt = rspamd_mempool_alloc (task->task_pool, sizeof (*rt));
+       rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
        rspamd_redis_expand_object (elt->redis_object, stcf, task,
                        &rt->redis_object_expanded);
        rt->selected = up;
@@ -431,5 +496,13 @@ rspamd_redis_runtime (struct rspamd_task *task,
        rspamd_session_add_event (task->s, rspamd_redis_fin, rt,
                        rspamd_redis_stat_quark ());
 
+       /* Now check stats */
+       event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
+       event_base_set (task->ev_base, &rt->timeout_event);
+       double_to_tv (elt->timeout, &tv);
+       event_add (&rt->timeout_event, &tv);
+       redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
+                       rt->redis_object_expanded, "learned");
+
        return rt;
 }