From 8e8bddff44da6fc4fa9eef6405d7f51adb77d467 Mon Sep 17 00:00:00 2001
From: Vsevolod Stakhov
Date: Mon, 11 Jan 2016 09:54:59 +0000
Subject: [PATCH] Write initialization for redis cache

---
 src/libstat/learn_cache/learn_cache.h   |   4 +-
 src/libstat/learn_cache/redis_cache.c   | 242 +++++++++++++++++++++++-
 src/libstat/learn_cache/sqlite3_cache.c |   2 +-
 src/libstat/stat_process.c              |   4 +-
 4 files changed, 244 insertions(+), 8 deletions(-)

diff --git a/src/libstat/learn_cache/learn_cache.h b/src/libstat/learn_cache/learn_cache.h
index 1ebe2864a..263252695 100644
--- a/src/libstat/learn_cache/learn_cache.h
+++ b/src/libstat/learn_cache/learn_cache.h
@@ -42,7 +42,7 @@ struct rspamd_stat_cache {
 			struct rspamd_statfile *st,
 			const ucl_object_t *cf);
 	gpointer (*runtime)(struct rspamd_task *task,
-			gpointer ctx);
+			gpointer ctx, gboolean learn);
 	gint (*check)(struct rspamd_task *task,
 			gboolean is_spam,
 			gpointer runtime,
@@ -61,7 +61,7 @@ struct rspamd_stat_cache {
 				struct rspamd_statfile *st, \
 				const ucl_object_t *cf); \
 		gpointer rspamd_stat_cache_##name##_runtime (struct rspamd_task *task, \
-				gpointer ctx); \
+				gpointer ctx, gboolean learn); \
 		gint rspamd_stat_cache_##name##_check (struct rspamd_task *task, \
 				gboolean is_spam, \
 				gpointer runtime, \
diff --git a/src/libstat/learn_cache/redis_cache.c b/src/libstat/learn_cache/redis_cache.c
index de88936be..56c651fc9 100644
--- a/src/libstat/learn_cache/redis_cache.c
+++ b/src/libstat/learn_cache/redis_cache.c
@@ -32,20 +32,243 @@
 #include "hiredis/hiredis.h"
 #include "hiredis/adapters/libevent.h"
 
+#define REDIS_DEFAULT_TIMEOUT 0.5
+#define REDIS_STAT_TIMEOUT 30
+#define REDIS_DEFAULT_PORT 6379
+#define DEFAULT_REDIS_KEY "learned_ids"
+
+/* Per-statfile cache configuration filled in by rspamd_stat_cache_redis_init */
+struct rspamd_redis_cache_ctx {
+	struct rspamd_statfile_config *stcf;
+	struct upstream_list *read_servers;
+	struct upstream_list *write_servers; /* NULL if no usable write servers */
+	const gchar *redis_object;
+	gdouble timeout;
+};
+
+/* Per-task connection state filled in by rspamd_stat_cache_redis_runtime */
+struct rspamd_redis_cache_runtime {
+	struct rspamd_redis_cache_ctx *ctx;
+	struct rspamd_task *task;
+	struct upstream *selected;
+	struct event timeout_event;
+	redisAsyncContext *redis;
+};
+
+static GQuark
+rspamd_stat_cache_redis_quark (void)
+{
+	return g_quark_from_static_string ("redis-statistics");
+}
+
+/* Called on connection termination */
+static void
+rspamd_redis_cache_fin (gpointer data)
+{
+	struct rspamd_redis_cache_runtime *rt = data;
+
+	event_del (&rt->timeout_event);
+	redisAsyncFree (rt->redis);
+}
+
+/* Timer callback: fails the selected upstream and removes the session event */
+static void
+rspamd_redis_cache_timeout (gint fd, short what, gpointer d)
+{
+	struct rspamd_redis_cache_runtime *rt = d;
+	struct rspamd_task *task;
+
+	task = rt->task;
+
+	msg_err_task ("connection to redis server %s timed out",
+			rspamd_upstream_name (rt->selected));
+	rspamd_upstream_fail (rt->selected);
+	rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, d);
+}
+
+/*
+ * Hashes the data of every task token and stores the base32 encoded digest
+ * in the task pool under the "words_hash" name.
+ */
+static void
+rspamd_stat_cache_redis_generate_id (struct rspamd_task *task)
+{
+	rspamd_cryptobox_hash_state_t st;
+	rspamd_token_t *tok;
+	guint i;
+	guchar out[rspamd_cryptobox_HASHBYTES];
+	gchar *b32out;
+
+	rspamd_cryptobox_hash_init (&st, NULL, 0);
+
+	for (i = 0; i < task->tokens->len; i ++) {
+		tok = g_ptr_array_index (task->tokens, i);
+		rspamd_cryptobox_hash_update (&st, tok->data, tok->datalen);
+	}
+
+	rspamd_cryptobox_hash_final (&st, out);
+
+	b32out = rspamd_encode_base32 (out, sizeof (out));
+	g_assert (b32out != NULL);
+	rspamd_mempool_set_variable (task->task_pool, "words_hash", b32out, g_free);
+}
+
+/*
+ * Builds the cache context from the statfile options: "read_servers" (or
+ * "servers"), optional "write_servers", "key" and "timeout".
+ * Returns NULL when no usable read servers are configured.
+ */
 gpointer
 rspamd_stat_cache_redis_init (struct rspamd_stat_ctx *ctx,
 		struct rspamd_config *cfg,
 		struct rspamd_statfile *st,
 		const ucl_object_t *cf)
 {
-	return NULL;
+	struct rspamd_redis_cache_ctx *cache_ctx;
+	struct rspamd_statfile_config *stf = st->stcf;
+	const ucl_object_t *elt;
+
+	elt = ucl_object_find_key (stf->opts, "read_servers");
+	if (elt == NULL) {
+		elt = ucl_object_find_key (stf->opts, "servers");
+	}
+	if (elt == NULL) {
+		msg_err ("statfile %s has no redis servers", stf->symbol);
+
+		return NULL;
+	}
+
+	/* Allocate only after the mandatory servers option has been found */
+	cache_ctx = g_slice_alloc0 (sizeof (*cache_ctx));
+	cache_ctx->read_servers = rspamd_upstreams_create (cfg->ups_ctx);
+	if (!rspamd_upstreams_from_ucl (cache_ctx->read_servers, elt,
+			REDIS_DEFAULT_PORT, NULL)) {
+		msg_err ("statfile %s cannot read servers configuration",
+				stf->symbol);
+		/* Release the half-built context instead of leaking it */
+		rspamd_upstreams_destroy (cache_ctx->read_servers);
+		g_slice_free1 (sizeof (*cache_ctx), cache_ctx);
+
+		return NULL;
+	}
+
+	elt = ucl_object_find_key (stf->opts, "write_servers");
+	if (elt == NULL) {
+		msg_err ("statfile %s has no write redis servers, "
+				"so learning is impossible", stf->symbol);
+		cache_ctx->write_servers = NULL;
+	}
+	else {
+		cache_ctx->write_servers = rspamd_upstreams_create (cfg->ups_ctx);
+		if (!rspamd_upstreams_from_ucl (cache_ctx->write_servers, elt,
+				REDIS_DEFAULT_PORT, NULL)) {
+			msg_err ("statfile %s cannot read write servers configuration",
+					stf->symbol);
+			rspamd_upstreams_destroy (cache_ctx->write_servers);
+			cache_ctx->write_servers = NULL;
+		}
+	}
+
+	elt = ucl_object_find_key (stf->opts, "key");
+	if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
+		cache_ctx->redis_object = DEFAULT_REDIS_KEY;
+	}
+	else {
+		cache_ctx->redis_object = ucl_object_tostring (elt);
+	}
+
+	elt = ucl_object_find_key (stf->opts, "timeout");
+	if (elt) {
+		cache_ctx->timeout = ucl_object_todouble (elt);
+	}
+	else {
+		cache_ctx->timeout = REDIS_DEFAULT_TIMEOUT;
+	}
+
+	cache_ctx->stcf = stf;
+
+	return (gpointer)cache_ctx;
 }
 
+/*
+ * Selects an upstream (write servers when learn is TRUE, read servers
+ * otherwise), starts an async redis connection on the task event base and
+ * arms the timeout timer. Returns NULL when no connection can be set up.
+ */
 gpointer
 rspamd_stat_cache_redis_runtime (struct rspamd_task *task,
-		gpointer ctx)
+		gpointer c, gboolean learn)
 {
-	return NULL;
+	struct rspamd_redis_cache_ctx *ctx = c;
+	struct rspamd_redis_cache_runtime *rt;
+	struct upstream *up;
+	rspamd_inet_addr_t *addr;
+	struct timeval tv;
+
+	g_assert (ctx != NULL);
+
+	if (learn && ctx->write_servers == NULL) {
+		msg_err_task ("no write servers defined for %s, cannot learn",
+				ctx->stcf->symbol);
+		return NULL;
+	}
+
+	if (learn) {
+		up = rspamd_upstream_get (ctx->write_servers,
+				RSPAMD_UPSTREAM_MASTER_SLAVE,
+				NULL,
+				0);
+	}
+	else {
+		up = rspamd_upstream_get (ctx->read_servers,
+				RSPAMD_UPSTREAM_ROUND_ROBIN,
+				NULL,
+				0);
+	}
+
+	if (up == NULL) {
+		msg_err_task ("no upstreams reachable");
+		return NULL;
+	}
+
+	rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
+	rt->selected = up;
+	rt->task = task;
+	rt->ctx = ctx;
+
+	addr = rspamd_upstream_addr (up);
+	g_assert (addr != NULL);
+	rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
+			rspamd_inet_address_get_port (addr));
+
+	/* hiredis reports connect failures via NULL or a non-zero err field */
+	if (rt->redis == NULL || rt->redis->err) {
+		msg_err_task ("cannot connect to redis server %s",
+				rspamd_upstream_name (up));
+		rspamd_upstream_fail (up);
+
+		if (rt->redis != NULL) {
+			redisAsyncFree (rt->redis);
+		}
+
+		return NULL;
+	}
+
+	redisLibeventAttach (rt->redis, task->ev_base);
+	rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
+			rspamd_stat_cache_redis_quark ());
+
+	/* Now check stats */
+	event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_cache_timeout, rt);
+	event_base_set (task->ev_base, &rt->timeout_event);
+	double_to_tv (ctx->timeout, &tv);
+	event_add (&rt->timeout_event, &tv);
+
+	if (!learn) {
+		rspamd_stat_cache_redis_generate_id (task);
+	}
+
+	return rt;
 }
 
 gint
@@ -54,6 +277,19 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
 		gpointer runtime,
 		gpointer c)
 {
+	struct rspamd_redis_cache_runtime *rt = runtime;
+	gchar *h;
+
+	h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
+
+	/*
+	 * Runtime creation may have failed before the hash was generated,
+	 * so a missing runtime or hash must not abort the process.
+	 */
+	if (rt == NULL || h == NULL) {
+		return RSPAMD_LEARN_OK;
+	}
+
 	return RSPAMD_LEARN_OK;
 }
 
diff --git a/src/libstat/learn_cache/sqlite3_cache.c b/src/libstat/learn_cache/sqlite3_cache.c
index 4d97e3084..9594941ba 100644
--- a/src/libstat/learn_cache/sqlite3_cache.c
+++ b/src/libstat/learn_cache/sqlite3_cache.c
@@ -171,7 +171,7 @@ rspamd_stat_cache_sqlite3_init (struct rspamd_stat_ctx *ctx,
 
 gpointer
 rspamd_stat_cache_sqlite3_runtime (struct rspamd_task *task,
-		gpointer ctx)
+		gpointer ctx, gboolean learn)
 {
 	/* No need of runtime for this type of classifier */
 	return NULL;
diff --git a/src/libstat/stat_process.c b/src/libstat/stat_process.c
index 864336a61..74b226407 100644
--- a/src/libstat/stat_process.c
+++ b/src/libstat/stat_process.c
@@ -390,7 +390,7 @@ rspamd_stat_cache_check (struct rspamd_stat_ctx *st_ctx,
 		}
 
 		if (cl->cache && cl->cachecf) {
-			rt = cl->cache->runtime (task, cl->cachecf);
+			rt = cl->cache->runtime (task, cl->cachecf, FALSE);
 			learn_res = cl->cache->check (task, spam, cl->cachecf, rt);
 		}
 
@@ -575,7 +575,7 @@ rspamd_stat_backends_post_learn (struct rspamd_stat_ctx *st_ctx,
 		}
 
 		if (cl->cache) {
-			cache_run = cl->cache->runtime (task, cl->cachecf);
+			cache_run = cl->cache->runtime (task, cl->cachecf, TRUE);
 			cl->cache->learn (task, spam, cache_run, cl->cachecf);
 		}
 
-- 
2.39.5