aboutsummaryrefslogtreecommitdiffstats
path: root/src/libstat
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-03-02 16:32:22 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-03-02 16:32:22 +0000
commitac1748b066ce20567a83de6352376963e7563af1 (patch)
tree9ce582fb6410b8d2b2296e8f692ac6e5e7c0cc71 /src/libstat
parent132da5d4b35335eb490c63f5d1ac889b1a71bfd0 (diff)
downloadrspamd-ac1748b066ce20567a83de6352376963e7563af1.tar.gz
rspamd-ac1748b066ce20567a83de6352376963e7563af1.zip
Implement runtime creation for redis.
Diffstat (limited to 'src/libstat')
-rw-r--r--src/libstat/backends/redis.c93
1 files changed, 82 insertions, 11 deletions
diff --git a/src/libstat/backends/redis.c b/src/libstat/backends/redis.c
index 1b35802a1..070924689 100644
--- a/src/libstat/backends/redis.c
+++ b/src/libstat/backends/redis.c
@@ -25,6 +25,7 @@
#include "main.h"
#include "stat_internal.h"
#include "hiredis.h"
+#include "adapters/libevent.h"
#include "upstream.h"
#define REDIS_CTX(p) (struct redis_stat_ctx *)(p)
@@ -33,7 +34,7 @@
#define REDIS_DEFAULT_PORT 6379
#define REDIS_DEFAULT_OBJECT "%s%l"
-struct redis_stat_ctx {
+struct redis_stat_ctx_elt {
struct upstream_list *read_servers;
struct upstream_list *write_servers;
@@ -41,15 +42,34 @@ struct redis_stat_ctx {
gdouble timeout;
};
+struct redis_stat_ctx {
+ GHashTable *redis_elts;
+};
+
struct redis_stat_runtime {
struct rspamd_task *task;
struct upstream *selected;
GArray *results;
gchar *redis_object_expanded;
+ redisAsyncContext *redis;
};
#define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt)
+static GQuark
+rspamd_redis_stat_quark (void)
+{
+ return g_quark_from_static_string ("redis-statistics");
+}
+
+static void
+rspamd_redis_fin (gpointer data)
+{
+ struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
+
+ redisAsyncFree (rt->redis);
+}
+
/*
* Non-static for lua unit testing
*/
@@ -256,12 +276,14 @@ gpointer
rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
{
struct redis_stat_ctx *new;
+ struct redis_stat_ctx_elt *backend;
struct rspamd_classifier_config *clf;
struct rspamd_statfile_config *stf;
GList *cur, *curst;
const ucl_object_t *elt;
new = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*new));
+ new->redis_elts = g_hash_table_new (g_direct_hash, g_direct_equal);
/* Iterate over all classifiers and load matching statfiles */
cur = cfg->classifiers;
@@ -280,6 +302,8 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
/*
* Check configuration sanity
*/
+ backend = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*backend));
+
elt = ucl_object_find_key (stf->opts, "read_servers");
if (elt == NULL) {
elt = ucl_object_find_key (stf->opts, "servers");
@@ -290,8 +314,8 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
continue;
}
else {
- new->read_servers = rspamd_upstreams_create ();
- if (!rspamd_upstreams_from_ucl (new->read_servers, elt,
+ backend->read_servers = rspamd_upstreams_create ();
+ if (!rspamd_upstreams_from_ucl (backend->read_servers, elt,
REDIS_DEFAULT_PORT, NULL)) {
msg_err ("statfile %s cannot read servers configuration",
stf->symbol);
@@ -308,30 +332,32 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
continue;
}
else {
- new->write_servers = rspamd_upstreams_create ();
- if (!rspamd_upstreams_from_ucl (new->read_servers, elt,
+ backend->write_servers = rspamd_upstreams_create ();
+ if (!rspamd_upstreams_from_ucl (backend->write_servers, elt,
REDIS_DEFAULT_PORT, NULL)) {
msg_err ("statfile %s cannot write servers configuration",
stf->symbol);
- rspamd_upstreams_destroy (new->write_servers);
- new->write_servers = NULL;
+ rspamd_upstreams_destroy (backend->write_servers);
+ backend->write_servers = NULL;
}
}
elt = ucl_object_find_key (stf->opts, "prefix");
if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
- new->redis_object = REDIS_DEFAULT_OBJECT;
+ backend->redis_object = REDIS_DEFAULT_OBJECT;
}
else {
/* XXX: sanity check */
- new->redis_object = ucl_object_tostring (elt);
- if (rspamd_redis_expand_object (new->redis_object, stf,
+ backend->redis_object = ucl_object_tostring (elt);
+ if (rspamd_redis_expand_object (backend->redis_object, stf,
NULL, NULL) == 0) {
msg_err ("statfile %s cannot write servers configuration",
stf->symbol);
}
}
+ g_hash_table_insert (new->redis_elts, stf, backend);
+
ctx->statfiles ++;
}
@@ -347,9 +373,54 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
gpointer
rspamd_redis_runtime (struct rspamd_task *task,
struct rspamd_statfile_config *stcf,
- gboolean learn, gpointer ctx)
+ gboolean learn, gpointer c)
{
+ struct redis_stat_ctx *ctx = REDIS_CTX (c);
+ struct redis_stat_ctx_elt *elt;
+ struct redis_stat_runtime *rt;
+ struct upstream *up;
+ rspamd_inet_addr_t *addr;
+
+ g_assert (ctx != NULL);
+ g_assert (stcf != NULL);
+
+ elt = g_hash_table_lookup (ctx->redis_elts, stcf);
+ g_assert (elt != NULL);
+
+ if (learn && elt->write_servers == NULL) {
+ msg_err ("no write servers defined for %s, cannot learn", stcf->symbol);
+ return NULL;
+ }
+
+ if (learn) {
+ up = rspamd_upstream_get (elt->write_servers, RSPAMD_UPSTREAM_MASTER_SLAVE);
+ }
+ else {
+ up = rspamd_upstream_get (elt->read_servers, RSPAMD_UPSTREAM_ROUND_ROBIN);
+ }
+
+ if (up == NULL) {
+ msg_err ("no upstreams reachable");
+ return NULL;
+ }
+
+ rt = rspamd_mempool_alloc (task->task_pool, sizeof (*rt));
+ rspamd_redis_expand_object (elt->redis_object, stcf, task,
+ &rt->redis_object_expanded);
+ rt->selected = up;
+ rt->task = task;
+
+ addr = rspamd_upstream_addr (up);
+ g_assert (addr != NULL);
+ rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
+ rspamd_inet_address_get_port (addr));
+ g_assert (rt->redis != NULL);
+
+ redisLibeventAttach (rt->redis, task->ev_base);
+ register_async_event (task->s, rspamd_redis_fin, rt,
+ rspamd_redis_stat_quark ());
+ return rt;
}
gboolean rspamd_redis_process_token (struct token_node_s *tok,