]> source.dussan.org Git - rspamd.git/commitdiff
Implement runtime creation for redis.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 2 Mar 2015 16:32:22 +0000 (16:32 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 2 Mar 2015 16:32:22 +0000 (16:32 +0000)
src/libstat/backends/redis.c

index 1b35802a178b5da77343f9d3250cf779bac9c610..0709246897aebc79695758d6650ea35bed8bb37d 100644 (file)
@@ -25,6 +25,7 @@
 #include "main.h"
 #include "stat_internal.h"
 #include "hiredis.h"
+#include "adapters/libevent.h"
 #include "upstream.h"
 
 #define REDIS_CTX(p) (struct redis_stat_ctx *)(p)
@@ -33,7 +34,7 @@
 #define REDIS_DEFAULT_PORT 6379
 #define REDIS_DEFAULT_OBJECT "%s%l"
 
-struct redis_stat_ctx {
+struct redis_stat_ctx_elt {
        struct upstream_list *read_servers;
        struct upstream_list *write_servers;
 
@@ -41,15 +42,34 @@ struct redis_stat_ctx {
        gdouble timeout;
 };
 
+struct redis_stat_ctx {
+       GHashTable *redis_elts;
+};
+
 struct redis_stat_runtime {
        struct rspamd_task *task;
        struct upstream *selected;
        GArray *results;
        gchar *redis_object_expanded;
+       redisAsyncContext *redis;
 };
 
 #define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt)
 
+static GQuark
+rspamd_redis_stat_quark (void)
+{
+       return g_quark_from_static_string ("redis-statistics");
+}
+
+static void
+rspamd_redis_fin (gpointer data)
+{
+       struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
+
+       redisAsyncFree (rt->redis);
+}
+
 /*
  * Non-static for lua unit testing
  */
@@ -256,12 +276,14 @@ gpointer
 rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
 {
        struct redis_stat_ctx *new;
+       struct redis_stat_ctx_elt *backend;
        struct rspamd_classifier_config *clf;
        struct rspamd_statfile_config *stf;
        GList *cur, *curst;
        const ucl_object_t *elt;
 
        new = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*new));
+       new->redis_elts = g_hash_table_new (g_direct_hash, g_direct_equal);
 
        /* Iterate over all classifiers and load matching statfiles */
        cur = cfg->classifiers;
@@ -280,6 +302,8 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
                                /*
                                 * Check configuration sanity
                                 */
+                               backend = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*backend));
+
                                elt = ucl_object_find_key (stf->opts, "read_servers");
                                if (elt == NULL) {
                                        elt = ucl_object_find_key (stf->opts, "servers");
@@ -290,8 +314,8 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
                                        continue;
                                }
                                else {
-                                       new->read_servers = rspamd_upstreams_create ();
-                                       if (!rspamd_upstreams_from_ucl (new->read_servers, elt,
+                                       backend->read_servers = rspamd_upstreams_create ();
+                                       if (!rspamd_upstreams_from_ucl (backend->read_servers, elt,
                                                        REDIS_DEFAULT_PORT, NULL)) {
                                                msg_err ("statfile %s cannot read servers configuration",
                                                                stf->symbol);
@@ -308,30 +332,32 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
                                        continue;
                                }
                                else {
-                                       new->write_servers = rspamd_upstreams_create ();
-                                       if (!rspamd_upstreams_from_ucl (new->read_servers, elt,
+                                       backend->write_servers = rspamd_upstreams_create ();
+                                       if (!rspamd_upstreams_from_ucl (backend->write_servers, elt,
                                                        REDIS_DEFAULT_PORT, NULL)) {
                                                msg_err ("statfile %s cannot write servers configuration",
                                                                stf->symbol);
-                                               rspamd_upstreams_destroy (new->write_servers);
-                                               new->write_servers = NULL;
+                                               rspamd_upstreams_destroy (backend->write_servers);
+                                               backend->write_servers = NULL;
                                        }
                                }
 
                                elt = ucl_object_find_key (stf->opts, "prefix");
                                if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
-                                       new->redis_object = REDIS_DEFAULT_OBJECT;
+                                       backend->redis_object = REDIS_DEFAULT_OBJECT;
                                }
                                else {
                                        /* XXX: sanity check */
-                                       new->redis_object = ucl_object_tostring (elt);
-                                       if (rspamd_redis_expand_object (new->redis_object, stf,
+                                       backend->redis_object = ucl_object_tostring (elt);
+                                       if (rspamd_redis_expand_object (backend->redis_object, stf,
                                                        NULL, NULL) == 0) {
                                                msg_err ("statfile %s cannot write servers configuration",
                                                        stf->symbol);
                                        }
                                }
 
+                               g_hash_table_insert (new->redis_elts, stf, backend);
+
                                ctx->statfiles ++;
                        }
 
@@ -347,9 +373,54 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
 gpointer
 rspamd_redis_runtime (struct rspamd_task *task,
                struct rspamd_statfile_config *stcf,
-               gboolean learn, gpointer ctx)
+               gboolean learn, gpointer c)
 {
+       struct redis_stat_ctx *ctx = REDIS_CTX (c);
+       struct redis_stat_ctx_elt *elt;
+       struct redis_stat_runtime *rt;
+       struct upstream *up;
+       rspamd_inet_addr_t *addr;
+
+       g_assert (ctx != NULL);
+       g_assert (stcf != NULL);
+
+       elt = g_hash_table_lookup (ctx->redis_elts, stcf);
+       g_assert (elt != NULL);
+
+       if (learn && elt->write_servers == NULL) {
+               msg_err ("no write servers defined for %s, cannot learn", stcf->symbol);
+               return NULL;
+       }
+
+       if (learn) {
+               up = rspamd_upstream_get (elt->write_servers, RSPAMD_UPSTREAM_MASTER_SLAVE);
+       }
+       else {
+               up = rspamd_upstream_get (elt->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN);
+       }
+
+       if (up == NULL) {
+               msg_err ("no upstreams reachable");
+               return NULL;
+       }
+
+       rt = rspamd_mempool_alloc (task->task_pool, sizeof (*rt));
+       rspamd_redis_expand_object (elt->redis_object, stcf, task,
+                       &rt->redis_object_expanded);
+       rt->selected = up;
+       rt->task = task;
+
+       addr = rspamd_upstream_addr (up);
+       g_assert (addr != NULL);
+       rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
+                       rspamd_inet_address_get_port (addr));
+       g_assert (rt->redis != NULL);
+
+       redisLibeventAttach (rt->redis, task->ev_base);
+       register_async_event (task->s, rspamd_redis_fin, rt,
+                       rspamd_redis_stat_quark ());
 
+       return rt;
 }
 
 gboolean rspamd_redis_process_token (struct token_node_s *tok,