diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2015-03-02 16:32:22 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2015-03-02 16:32:22 +0000 |
commit | ac1748b066ce20567a83de6352376963e7563af1 (patch) | |
tree | 9ce582fb6410b8d2b2296e8f692ac6e5e7c0cc71 /src/libstat | |
parent | 132da5d4b35335eb490c63f5d1ac889b1a71bfd0 (diff) | |
download | rspamd-ac1748b066ce20567a83de6352376963e7563af1.tar.gz rspamd-ac1748b066ce20567a83de6352376963e7563af1.zip |
Implement runtime creation for redis.
Diffstat (limited to 'src/libstat')
-rw-r--r-- | src/libstat/backends/redis.c | 93 |
1 files changed, 82 insertions, 11 deletions
diff --git a/src/libstat/backends/redis.c b/src/libstat/backends/redis.c index 1b35802a1..070924689 100644 --- a/src/libstat/backends/redis.c +++ b/src/libstat/backends/redis.c @@ -25,6 +25,7 @@ #include "main.h" #include "stat_internal.h" #include "hiredis.h" +#include "adapters/libevent.h" #include "upstream.h" #define REDIS_CTX(p) (struct redis_stat_ctx *)(p) @@ -33,7 +34,7 @@ #define REDIS_DEFAULT_PORT 6379 #define REDIS_DEFAULT_OBJECT "%s%l" -struct redis_stat_ctx { +struct redis_stat_ctx_elt { struct upstream_list *read_servers; struct upstream_list *write_servers; @@ -41,15 +42,34 @@ struct redis_stat_ctx { gdouble timeout; }; +struct redis_stat_ctx { + GHashTable *redis_elts; +}; + struct redis_stat_runtime { struct rspamd_task *task; struct upstream *selected; GArray *results; gchar *redis_object_expanded; + redisAsyncContext *redis; }; #define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt) +static GQuark +rspamd_redis_stat_quark (void) +{ + return g_quark_from_static_string ("redis-statistics"); +} + +static void +rspamd_redis_fin (gpointer data) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (data); + + redisAsyncFree (rt->redis); +} + /* * Non-static for lua unit testing */ @@ -256,12 +276,14 @@ gpointer rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg) { struct redis_stat_ctx *new; + struct redis_stat_ctx_elt *backend; struct rspamd_classifier_config *clf; struct rspamd_statfile_config *stf; GList *cur, *curst; const ucl_object_t *elt; new = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*new)); + new->redis_elts = g_hash_table_new (g_direct_hash, g_direct_equal); /* Iterate over all classifiers and load matching statfiles */ cur = cfg->classifiers; @@ -280,6 +302,8 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg) /* * Check configuration sanity */ + backend = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*backend)); + elt = ucl_object_find_key (stf->opts, "read_servers"); if (elt == NULL) { elt = ucl_object_find_key (stf->opts, "servers"); @@ -290,8 +314,8 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg) continue; } else { - new->read_servers = rspamd_upstreams_create (); - if (!rspamd_upstreams_from_ucl (new->read_servers, elt, + backend->read_servers = rspamd_upstreams_create (); + if (!rspamd_upstreams_from_ucl (backend->read_servers, elt, REDIS_DEFAULT_PORT, NULL)) { msg_err ("statfile %s cannot read servers configuration", stf->symbol); @@ -308,30 +332,32 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg) continue; } else { - new->write_servers = rspamd_upstreams_create (); - if (!rspamd_upstreams_from_ucl (new->read_servers, elt, + backend->write_servers = rspamd_upstreams_create (); + if (!rspamd_upstreams_from_ucl (backend->write_servers, elt, REDIS_DEFAULT_PORT, NULL)) { msg_err ("statfile %s cannot write servers configuration", stf->symbol); - rspamd_upstreams_destroy (new->write_servers); - new->write_servers = NULL; + rspamd_upstreams_destroy (backend->write_servers); + backend->write_servers = NULL; } } elt = ucl_object_find_key (stf->opts, "prefix"); if (elt == NULL || ucl_object_type (elt) != UCL_STRING) { - new->redis_object = REDIS_DEFAULT_OBJECT; + backend->redis_object = REDIS_DEFAULT_OBJECT; } else { /* XXX: sanity check */ - new->redis_object = ucl_object_tostring (elt); - if (rspamd_redis_expand_object (new->redis_object, stf, + backend->redis_object = ucl_object_tostring (elt); + if (rspamd_redis_expand_object (backend->redis_object, stf, NULL, NULL) == 0) { msg_err ("statfile %s cannot write servers configuration", stf->symbol); } } + g_hash_table_insert (new->redis_elts, stf, backend); + ctx->statfiles ++; } @@ -347,9 +373,54 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg) gpointer rspamd_redis_runtime (struct rspamd_task *task, struct rspamd_statfile_config *stcf, - gboolean learn, gpointer ctx) + gboolean learn, gpointer c) { + struct redis_stat_ctx *ctx = REDIS_CTX (c); + struct redis_stat_ctx_elt *elt; + struct redis_stat_runtime *rt; + struct upstream *up; + rspamd_inet_addr_t *addr; + + g_assert (ctx != NULL); + g_assert (stcf != NULL); + + elt = g_hash_table_lookup (ctx->redis_elts, stcf); + g_assert (elt != NULL); + + if (learn && elt->write_servers == NULL) { + msg_err ("no write servers defined for %s, cannot learn", stcf->symbol); + return NULL; + } + + if (learn) { + up = rspamd_upstream_get (elt->write_servers, RSPAMD_UPSTREAM_MASTER_SLAVE); + } + else { + up = rspamd_upstream_get (elt->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN); + } + + if (up == NULL) { + msg_err ("no upstreams reachable"); + return NULL; + } + + rt = rspamd_mempool_alloc (task->task_pool, sizeof (*rt)); + rspamd_redis_expand_object (elt->redis_object, stcf, task, + &rt->redis_object_expanded); + rt->selected = up; + rt->task = task; + + addr = rspamd_upstream_addr (up); + g_assert (addr != NULL); + rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + g_assert (rt->redis != NULL); + + redisLibeventAttach (rt->redis, task->ev_base); + register_async_event (task->s, rspamd_redis_fin, rt, + rspamd_redis_stat_quark ()); + return rt; } gboolean rspamd_redis_process_token (struct token_node_s *tok, |