From: Vsevolod Stakhov Date: Thu, 7 Jan 2016 16:58:40 +0000 (+0000) Subject: Move redis.c to redis_backend.c X-Git-Tag: 1.1.0~105 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=622717cee90166393b5ade4ec2636dc1d16cc042;p=rspamd.git Move redis.c to redis_backend.c --- diff --git a/src/libstat/CMakeLists.txt b/src/libstat/CMakeLists.txt index b6dfeb3c8..4a1a848bb 100644 --- a/src/libstat/CMakeLists.txt +++ b/src/libstat/CMakeLists.txt @@ -11,7 +11,7 @@ SET(BACKENDSSRC ${CMAKE_CURRENT_SOURCE_DIR}/backends/mmaped_file.c ${CMAKE_CURRENT_SOURCE_DIR}/backends/sqlite3_backend.c) IF(ENABLE_HIREDIS MATCHES "ON") SET(BACKENDSSRC ${BACKENDSSRC} - ${CMAKE_CURRENT_SOURCE_DIR}/backends/redis.c) + ${CMAKE_CURRENT_SOURCE_DIR}/backends/redis_backend.c) ENDIF(ENABLE_HIREDIS MATCHES "ON") SET(CACHESSRC ${CMAKE_CURRENT_SOURCE_DIR}/learn_cache/sqlite3_cache.c) diff --git a/src/libstat/backends/redis.c b/src/libstat/backends/redis.c deleted file mode 100644 index e60ed3826..000000000 --- a/src/libstat/backends/redis.c +++ /dev/null @@ -1,573 +0,0 @@ -/* Copyright (c) 2015, Vsevolod Stakhov - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#include "config.h" -#include "rspamd.h" -#include "stat_internal.h" -#include "upstream.h" - -#ifdef WITH_HIREDIS -#include "hiredis/hiredis.h" -#include "hiredis/adapters/libevent.h" -#endif - -#define REDIS_CTX(p) (struct redis_stat_ctx *)(p) -#define REDIS_RUNTIME(p) (struct redis_stat_runtime *)(p) -#define REDIS_BACKEND_TYPE "redis" -#define REDIS_DEFAULT_PORT 6379 -#define REDIS_DEFAULT_OBJECT "%s%l" -#define REDIS_DEFAULT_TIMEOUT 0.5 - -struct redis_stat_ctx { - struct upstream_list *read_servers; - struct upstream_list *write_servers; - - const gchar *redis_object; - gdouble timeout; -}; - -struct redis_stat_runtime { - struct redis_stat_ctx *ctx; - struct rspamd_task *task; - struct upstream *selected; - struct event timeout_event; - GArray *results; - gchar *redis_object_expanded; - redisAsyncContext *redis; - guint64 learned; - gboolean connected; -}; - -#define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt) - -static GQuark -rspamd_redis_stat_quark (void) -{ - return g_quark_from_static_string ("redis-statistics"); -} - - -/* - * Non-static for lua unit testing - */ -gsize -rspamd_redis_expand_object (const gchar *pattern, - struct rspamd_statfile_config *stcf, - struct rspamd_task *task, - gchar **target) -{ - gsize tlen = 0; - const gchar *p = pattern, *elt; - InternetAddressList *ia; - InternetAddress *iaelt; - InternetAddressMailbox *imb; - gchar *d, *end; - enum { - just_char, - percent_char, - mod_char - } state = just_char; - - g_assert (stcf != NULL); - - /* Length calculation */ - while (*p) { - switch (state) { - case just_char: - if (*p == '%') { - state = percent_char; - } - else { - tlen ++; - } - p ++; - break; - case percent_char: - switch (*p) { - case '%': - tlen ++; - state = just_char; - break; - case 'f': - if (task) { - elt = rspamd_task_get_sender (task); - if (elt) { - tlen += strlen (elt); - } - } - break; - case 'u': - elt = GET_TASK_ELT (task, user); - if (elt) { - tlen += strlen (elt); - } - break; - case 'r': - ia = GET_TASK_ELT (task, rcpt_envelope); - if (ia != NULL) { - iaelt = internet_address_list_get_address (ia, 0); - imb = INTERNET_ADDRESS_IS_MAILBOX (iaelt) ? - INTERNET_ADDRESS_MAILBOX (iaelt) : NULL; - - elt = (imb ? internet_address_mailbox_get_addr (imb) : NULL); - - if (elt) { - tlen += strlen (elt); - } - } - break; - case 'l': - if (stcf->label) { - tlen += strlen (stcf->label); - } - break; - case 's': - if (stcf->symbol) { - tlen += strlen (stcf->symbol); - } - break; - default: - state = just_char; - tlen ++; - break; - } - - if (state == percent_char) { - state = mod_char; - } - p ++; - break; - - case mod_char: - switch (*p) { - case 'd': - p ++; - state = just_char; - break; - default: - state = just_char; - break; - } - break; - } - } - - if (target == NULL) { - return tlen; - } - - *target = rspamd_mempool_alloc (task->task_pool, tlen + 1); - d = *target; - end = d + tlen + 1; - d[tlen] = '\0'; - p = pattern; - state = just_char; - - /* Expand string */ - while (*p && d < end) { - switch (state) { - case just_char: - if (*p == '%') { - state = percent_char; - } - else { - *d++ = *p; - } - p ++; - break; - case percent_char: - switch (*p) { - case '%': - *d++ = *p; - state = just_char; - break; - case 'f': - if (task) { - elt = rspamd_task_get_sender (task); - if (elt) { - d += rspamd_strlcpy (d, elt, end - d); - } - } - break; - case 'u': - elt = GET_TASK_ELT (task, user); - if (elt) { - d += rspamd_strlcpy (d, elt, end - d); - } - break; - case 'r': - ia = GET_TASK_ELT (task, rcpt_envelope); - if (ia != NULL) { - iaelt = internet_address_list_get_address (ia, 0); - imb = INTERNET_ADDRESS_IS_MAILBOX (iaelt) ? - INTERNET_ADDRESS_MAILBOX (iaelt) : NULL; - - elt = (imb ? internet_address_mailbox_get_addr (imb) : NULL); - - if (elt) { - d += rspamd_strlcpy (d, elt, end - d); - } - } - break; - case 'l': - if (stcf->label) { - d += rspamd_strlcpy (d, stcf->label, end - d); - } - break; - case 's': - if (stcf->symbol) { - d += rspamd_strlcpy (d, stcf->symbol, end - d); - } - break; - default: - state = just_char; - *d++ = *p; - break; - } - - if (state == percent_char) { - state = mod_char; - } - p ++; - break; - - case mod_char: - switch (*p) { - case 'd': - /* TODO: not supported yet */ - p ++; - state = just_char; - break; - default: - state = just_char; - break; - } - break; - } - } - - return tlen; -} - -/* Called on connection termination */ -static void -rspamd_redis_fin (gpointer data) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (data); - - redisAsyncFree (rt->redis); - event_del (&rt->timeout_event); -} - -static void -rspamd_redis_timeout (gint fd, short what, gpointer d) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (d); - struct rspamd_task *task; - - task = rt->task; - - msg_err_task ("connection to redis server %s timed out", - rspamd_upstream_name (rt->selected)); - rspamd_upstream_fail (rt->selected); - rspamd_session_remove_event (task->s, rspamd_redis_fin, d); -} - -/* Called when we have connected to the redis server and got stats */ -static void -rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (priv); - redisReply *reply = r; - struct rspamd_task *task; - - task = rt->task; - - if (c->err == 0) { - if (r != NULL) { - if (reply->type == REDIS_REPLY_INTEGER) { - rt->learned = reply->integer; - } - else { - rt->learned = 0; - } - - rt->connected = TRUE; - - msg_debug_task ("connected to redis server, tokens learned for %s: %d", - rt->redis_object_expanded, rt->learned); - } - else { - msg_err_task ("error getting reply from redis server %s: %s", - rspamd_upstream_name (rt->selected), c->errstr); - rspamd_upstream_fail (rt->selected); - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); - } - } - else { - msg_err_task ("error getting reply from redis server %s: %s", - rspamd_upstream_name (rt->selected), c->errstr); - rspamd_upstream_fail (rt->selected); - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); - } -} - -gpointer -rspamd_redis_init (struct rspamd_stat_ctx *ctx, - struct rspamd_config *cfg, struct rspamd_statfile *st) -{ - struct redis_stat_ctx *backend; - struct rspamd_statfile_config *stf = st->stcf; - const ucl_object_t *elt; - - backend = g_slice_alloc0 (sizeof (*backend)); - - elt = ucl_object_find_key (stf->opts, "read_servers"); - if (elt == NULL) { - elt = ucl_object_find_key (stf->opts, "servers"); - } - if (elt == NULL) { - msg_err ("statfile %s has no redis servers", stf->symbol); - - return NULL; - } - else { - backend->read_servers = rspamd_upstreams_create (cfg->ups_ctx); - if (!rspamd_upstreams_from_ucl (backend->read_servers, elt, - REDIS_DEFAULT_PORT, NULL)) { - msg_err ("statfile %s cannot read servers configuration", - stf->symbol); - return NULL; - } - } - - elt = ucl_object_find_key (stf->opts, "write_servers"); - if (elt == NULL) { - msg_err ("statfile %s has no write redis servers, " - "so learning is impossible", stf->symbol); - backend->write_servers = NULL; - } - else { - backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx); - if (!rspamd_upstreams_from_ucl (backend->write_servers, elt, - REDIS_DEFAULT_PORT, NULL)) { - msg_err ("statfile %s cannot write servers configuration", - stf->symbol); - rspamd_upstreams_destroy (backend->write_servers); - backend->write_servers = NULL; - } - } - - elt = ucl_object_find_key (stf->opts, "prefix"); - if (elt == NULL || ucl_object_type (elt) != UCL_STRING) { - backend->redis_object = REDIS_DEFAULT_OBJECT; - } - else { - /* XXX: sanity check */ - backend->redis_object = ucl_object_tostring (elt); - if (rspamd_redis_expand_object (backend->redis_object, stf, - NULL, NULL) == 0) { - msg_err ("statfile %s cannot write servers configuration", - stf->symbol); - } - } - - elt = ucl_object_find_key (stf->opts, "timeout"); - if (elt) { - backend->timeout = ucl_object_todouble (elt); - } - else { - backend->timeout = REDIS_DEFAULT_TIMEOUT; - } - - - return (gpointer)backend; -} - -gpointer -rspamd_redis_runtime (struct rspamd_task *task, - struct rspamd_statfile_config *stcf, - gboolean learn, gpointer c) -{ - struct redis_stat_ctx *ctx = REDIS_CTX (c); - struct redis_stat_runtime *rt; - struct upstream *up; - rspamd_inet_addr_t *addr; - struct timeval tv; - - g_assert (ctx != NULL); - g_assert (stcf != NULL); - - if (learn && ctx->write_servers == NULL) { - msg_err ("no write servers defined for %s, cannot learn", stcf->symbol); - return NULL; - } - - if (learn) { - up = rspamd_upstream_get (ctx->write_servers, - RSPAMD_UPSTREAM_MASTER_SLAVE, - NULL, - 0); - } - else { - up = rspamd_upstream_get (ctx->read_servers, - RSPAMD_UPSTREAM_ROUND_ROBIN, - NULL, - 0); - } - - if (up == NULL) { - msg_err ("no upstreams reachable"); - return NULL; - } - - rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt)); - rspamd_redis_expand_object (ctx->redis_object, stcf, task, - &rt->redis_object_expanded); - rt->selected = up; - rt->task = task; - - addr = rspamd_upstream_addr (up); - g_assert (addr != NULL); - rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr), - rspamd_inet_address_get_port (addr)); - g_assert (rt->redis != NULL); - - redisLibeventAttach (rt->redis, task->ev_base); - rspamd_session_add_event (task->s, rspamd_redis_fin, rt, - rspamd_redis_stat_quark ()); - - /* Now check stats */ - event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt); - event_base_set (task->ev_base, &rt->timeout_event); - double_to_tv (ctx->timeout, &tv); - event_add (&rt->timeout_event, &tv); - redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s", - rt->redis_object_expanded, "learned"); - - return rt; -} - -void -rspamd_redis_close (gpointer p) -{ - struct redis_stat_ctx *ctx = REDIS_CTX (p); - - if (ctx->read_servers) { - rspamd_upstreams_destroy (ctx->read_servers); - } - - if (ctx->write_servers) { - rspamd_upstreams_destroy (ctx->write_servers); - } - - g_slice_free1 (sizeof (*ctx), ctx); -} - -gboolean -rspamd_redis_process_tokens (struct rspamd_task *task, - GPtrArray *tokens, - gint id, gpointer p) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (p); - - return FALSE; -} - -void -rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime, - gpointer ctx) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); -} - -gboolean -rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, - gint id, gpointer p) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (p); - - return FALSE; -} - - -void -rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime, - gpointer ctx) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); -} - -gulong -rspamd_redis_total_learns (struct rspamd_task *task, gpointer runtime, - gpointer ctx) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); - - return 0; -} - -gulong -rspamd_redis_inc_learns (struct rspamd_task *task, gpointer runtime, - gpointer ctx) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); - - return 0; -} - -gulong -rspamd_redis_dec_learns (struct rspamd_task *task, gpointer runtime, - gpointer ctx) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); - - return 0; -} - -gulong -rspamd_redis_learns (struct rspamd_task *task, gpointer runtime, - gpointer ctx) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); - - return 0; -} - -ucl_object_t * -rspamd_redis_get_stat (gpointer runtime, - gpointer ctx) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); - - return NULL; -} - -gpointer -rspamd_redis_load_tokenizer_config (gpointer runtime, - gsize *len) -{ - struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); - - return NULL; -} diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c new file mode 100644 index 000000000..e60ed3826 --- /dev/null +++ b/src/libstat/backends/redis_backend.c @@ -0,0 +1,573 @@ +/* Copyright (c) 2015, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "rspamd.h" +#include "stat_internal.h" +#include "upstream.h" + +#ifdef WITH_HIREDIS +#include "hiredis/hiredis.h" +#include "hiredis/adapters/libevent.h" +#endif + +#define REDIS_CTX(p) (struct redis_stat_ctx *)(p) +#define REDIS_RUNTIME(p) (struct redis_stat_runtime *)(p) +#define REDIS_BACKEND_TYPE "redis" +#define REDIS_DEFAULT_PORT 6379 +#define REDIS_DEFAULT_OBJECT "%s%l" +#define REDIS_DEFAULT_TIMEOUT 0.5 + +struct redis_stat_ctx { + struct upstream_list *read_servers; + struct upstream_list *write_servers; + + const gchar *redis_object; + gdouble timeout; +}; + +struct redis_stat_runtime { + struct redis_stat_ctx *ctx; + struct rspamd_task *task; + struct upstream *selected; + struct event timeout_event; + GArray *results; + gchar *redis_object_expanded; + redisAsyncContext *redis; + guint64 learned; + gboolean connected; +}; + +#define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt) + +static GQuark +rspamd_redis_stat_quark (void) +{ + return g_quark_from_static_string ("redis-statistics"); +} + + +/* + * Non-static for lua unit testing + */ +gsize +rspamd_redis_expand_object (const gchar *pattern, + struct rspamd_statfile_config *stcf, + struct rspamd_task *task, + gchar **target) +{ + gsize tlen = 0; + const gchar *p = pattern, *elt; + InternetAddressList *ia; + InternetAddress *iaelt; + InternetAddressMailbox *imb; + gchar *d, *end; + enum { + just_char, + percent_char, + mod_char + } state = just_char; + + g_assert (stcf != NULL); + + /* Length calculation */ + while (*p) { + switch (state) { + case just_char: + if (*p == '%') { + state = percent_char; + } + else { + tlen ++; + } + p ++; + break; + case percent_char: + switch (*p) { + case '%': + tlen ++; + state = just_char; + break; + case 'f': + if (task) { + elt = rspamd_task_get_sender (task); + if (elt) { + tlen += strlen (elt); + } + } + break; + case 'u': + elt = GET_TASK_ELT (task, user); + if (elt) { + tlen += strlen (elt); + } + break; + case 'r': + ia = GET_TASK_ELT (task, rcpt_envelope); + if (ia != NULL) { + iaelt = internet_address_list_get_address (ia, 0); + imb = INTERNET_ADDRESS_IS_MAILBOX (iaelt) ? + INTERNET_ADDRESS_MAILBOX (iaelt) : NULL; + + elt = (imb ? internet_address_mailbox_get_addr (imb) : NULL); + + if (elt) { + tlen += strlen (elt); + } + } + break; + case 'l': + if (stcf->label) { + tlen += strlen (stcf->label); + } + break; + case 's': + if (stcf->symbol) { + tlen += strlen (stcf->symbol); + } + break; + default: + state = just_char; + tlen ++; + break; + } + + if (state == percent_char) { + state = mod_char; + } + p ++; + break; + + case mod_char: + switch (*p) { + case 'd': + p ++; + state = just_char; + break; + default: + state = just_char; + break; + } + break; + } + } + + if (target == NULL) { + return tlen; + } + + *target = rspamd_mempool_alloc (task->task_pool, tlen + 1); + d = *target; + end = d + tlen + 1; + d[tlen] = '\0'; + p = pattern; + state = just_char; + + /* Expand string */ + while (*p && d < end) { + switch (state) { + case just_char: + if (*p == '%') { + state = percent_char; + } + else { + *d++ = *p; + } + p ++; + break; + case percent_char: + switch (*p) { + case '%': + *d++ = *p; + state = just_char; + break; + case 'f': + if (task) { + elt = rspamd_task_get_sender (task); + if (elt) { + d += rspamd_strlcpy (d, elt, end - d); + } + } + break; + case 'u': + elt = GET_TASK_ELT (task, user); + if (elt) { + d += rspamd_strlcpy (d, elt, end - d); + } + break; + case 'r': + ia = GET_TASK_ELT (task, rcpt_envelope); + if (ia != NULL) { + iaelt = internet_address_list_get_address (ia, 0); + imb = INTERNET_ADDRESS_IS_MAILBOX (iaelt) ? + INTERNET_ADDRESS_MAILBOX (iaelt) : NULL; + + elt = (imb ? internet_address_mailbox_get_addr (imb) : NULL); + + if (elt) { + d += rspamd_strlcpy (d, elt, end - d); + } + } + break; + case 'l': + if (stcf->label) { + d += rspamd_strlcpy (d, stcf->label, end - d); + } + break; + case 's': + if (stcf->symbol) { + d += rspamd_strlcpy (d, stcf->symbol, end - d); + } + break; + default: + state = just_char; + *d++ = *p; + break; + } + + if (state == percent_char) { + state = mod_char; + } + p ++; + break; + + case mod_char: + switch (*p) { + case 'd': + /* TODO: not supported yet */ + p ++; + state = just_char; + break; + default: + state = just_char; + break; + } + break; + } + } + + return tlen; +} + +/* Called on connection termination */ +static void +rspamd_redis_fin (gpointer data) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (data); + + redisAsyncFree (rt->redis); + event_del (&rt->timeout_event); +} + +static void +rspamd_redis_timeout (gint fd, short what, gpointer d) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (d); + struct rspamd_task *task; + + task = rt->task; + + msg_err_task ("connection to redis server %s timed out", + rspamd_upstream_name (rt->selected)); + rspamd_upstream_fail (rt->selected); + rspamd_session_remove_event (task->s, rspamd_redis_fin, d); +} + +/* Called when we have connected to the redis server and got stats */ +static void +rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (priv); + redisReply *reply = r; + struct rspamd_task *task; + + task = rt->task; + + if (c->err == 0) { + if (r != NULL) { + if (reply->type == REDIS_REPLY_INTEGER) { + rt->learned = reply->integer; + } + else { + rt->learned = 0; + } + + rt->connected = TRUE; + + msg_debug_task ("connected to redis server, tokens learned for %s: %d", + rt->redis_object_expanded, rt->learned); + } + else { + msg_err_task ("error getting reply from redis server %s: %s", + rspamd_upstream_name (rt->selected), c->errstr); + rspamd_upstream_fail (rt->selected); + rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); + } + } + else { + msg_err_task ("error getting reply from redis server %s: %s", + rspamd_upstream_name (rt->selected), c->errstr); + rspamd_upstream_fail (rt->selected); + rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); + } +} + +gpointer +rspamd_redis_init (struct rspamd_stat_ctx *ctx, + struct rspamd_config *cfg, struct rspamd_statfile *st) +{ + struct redis_stat_ctx *backend; + struct rspamd_statfile_config *stf = st->stcf; + const ucl_object_t *elt; + + backend = g_slice_alloc0 (sizeof (*backend)); + + elt = ucl_object_find_key (stf->opts, "read_servers"); + if (elt == NULL) { + elt = ucl_object_find_key (stf->opts, "servers"); + } + if (elt == NULL) { + msg_err ("statfile %s has no redis servers", stf->symbol); + + return NULL; + } + else { + backend->read_servers = rspamd_upstreams_create (cfg->ups_ctx); + if (!rspamd_upstreams_from_ucl (backend->read_servers, elt, + REDIS_DEFAULT_PORT, NULL)) { + msg_err ("statfile %s cannot read servers configuration", + stf->symbol); + return NULL; + } + } + + elt = ucl_object_find_key (stf->opts, "write_servers"); + if (elt == NULL) { + msg_err ("statfile %s has no write redis servers, " + "so learning is impossible", stf->symbol); + backend->write_servers = NULL; + } + else { + backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx); + if (!rspamd_upstreams_from_ucl (backend->write_servers, elt, + REDIS_DEFAULT_PORT, NULL)) { + msg_err ("statfile %s cannot write servers configuration", + stf->symbol); + rspamd_upstreams_destroy (backend->write_servers); + backend->write_servers = NULL; + } + } + + elt = ucl_object_find_key (stf->opts, "prefix"); + if (elt == NULL || ucl_object_type (elt) != UCL_STRING) { + backend->redis_object = REDIS_DEFAULT_OBJECT; + } + else { + /* XXX: sanity check */ + backend->redis_object = ucl_object_tostring (elt); + if (rspamd_redis_expand_object (backend->redis_object, stf, + NULL, NULL) == 0) { + msg_err ("statfile %s cannot write servers configuration", + stf->symbol); + } + } + + elt = ucl_object_find_key (stf->opts, "timeout"); + if (elt) { + backend->timeout = ucl_object_todouble (elt); + } + else { + backend->timeout = REDIS_DEFAULT_TIMEOUT; + } + + + return (gpointer)backend; +} + +gpointer +rspamd_redis_runtime (struct rspamd_task *task, + struct rspamd_statfile_config *stcf, + gboolean learn, gpointer c) +{ + struct redis_stat_ctx *ctx = REDIS_CTX (c); + struct redis_stat_runtime *rt; + struct upstream *up; + rspamd_inet_addr_t *addr; + struct timeval tv; + + g_assert (ctx != NULL); + g_assert (stcf != NULL); + + if (learn && ctx->write_servers == NULL) { + msg_err ("no write servers defined for %s, cannot learn", stcf->symbol); + return NULL; + } + + if (learn) { + up = rspamd_upstream_get (ctx->write_servers, + RSPAMD_UPSTREAM_MASTER_SLAVE, + NULL, + 0); + } + else { + up = rspamd_upstream_get (ctx->read_servers, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + } + + if (up == NULL) { + msg_err ("no upstreams reachable"); + return NULL; + } + + rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt)); + rspamd_redis_expand_object (ctx->redis_object, stcf, task, + &rt->redis_object_expanded); + rt->selected = up; + rt->task = task; + + addr = rspamd_upstream_addr (up); + g_assert (addr != NULL); + rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + g_assert (rt->redis != NULL); + + redisLibeventAttach (rt->redis, task->ev_base); + rspamd_session_add_event (task->s, rspamd_redis_fin, rt, + rspamd_redis_stat_quark ()); + + /* Now check stats */ + event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt); + event_base_set (task->ev_base, &rt->timeout_event); + double_to_tv (ctx->timeout, &tv); + event_add (&rt->timeout_event, &tv); + redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s", + rt->redis_object_expanded, "learned"); + + return rt; +} + +void +rspamd_redis_close (gpointer p) +{ + struct redis_stat_ctx *ctx = REDIS_CTX (p); + + if (ctx->read_servers) { + rspamd_upstreams_destroy (ctx->read_servers); + } + + if (ctx->write_servers) { + rspamd_upstreams_destroy (ctx->write_servers); + } + + g_slice_free1 (sizeof (*ctx), ctx); +} + +gboolean +rspamd_redis_process_tokens (struct rspamd_task *task, + GPtrArray *tokens, + gint id, gpointer p) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (p); + + return FALSE; +} + +void +rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime, + gpointer ctx) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); +} + +gboolean +rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, + gint id, gpointer p) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (p); + + return FALSE; +} + + +void +rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime, + gpointer ctx) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); +} + +gulong +rspamd_redis_total_learns (struct rspamd_task *task, gpointer runtime, + gpointer ctx) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); + + return 0; +} + +gulong +rspamd_redis_inc_learns (struct rspamd_task *task, gpointer runtime, + gpointer ctx) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); + + return 0; +} + +gulong +rspamd_redis_dec_learns (struct rspamd_task *task, gpointer runtime, + gpointer ctx) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); + + return 0; +} + +gulong +rspamd_redis_learns (struct rspamd_task *task, gpointer runtime, + gpointer ctx) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); + + return 0; +} + +ucl_object_t * +rspamd_redis_get_stat (gpointer runtime, + gpointer ctx) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); + + return NULL; +} + +gpointer +rspamd_redis_load_tokenizer_config (gpointer runtime, + gsize *len) +{ + struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); + + return NULL; +}