aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-02-27 17:15:36 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-02-27 17:15:36 +0000
commitf1752bac0b0c00717478bc6633e4afb527bd46da (patch)
treed3b35139e6d7737eec8b379d120429476f3c5f4f
parent3696893b558e90e80044ee4f86074b82f25fc838 (diff)
parent60f838ed65c4a64fb9acadb5d19f325cda5fae72 (diff)
downloadrspamd-f1752bac0b0c00717478bc6633e4afb527bd46da.tar.gz
rspamd-f1752bac0b0c00717478bc6633e4afb527bd46da.zip
Merge branch 'redis-stat'
-rw-r--r--src/libstat/CMakeLists.txt3
-rw-r--r--src/libstat/backends/backends.h30
-rw-r--r--src/libstat/backends/mmaped_file.c4
-rw-r--r--src/libstat/backends/redis.c368
-rw-r--r--src/libstat/stat_process.c4
5 files changed, 402 insertions, 7 deletions
diff --git a/src/libstat/CMakeLists.txt b/src/libstat/CMakeLists.txt
index 5d7184316..bf9aa94e4 100644
--- a/src/libstat/CMakeLists.txt
+++ b/src/libstat/CMakeLists.txt
@@ -7,7 +7,8 @@ SET(TOKENIZERSSRC ${CMAKE_CURRENT_SOURCE_DIR}/tokenizers/tokenizers.c
SET(CLASSIFIERSSRC ${CMAKE_CURRENT_SOURCE_DIR}/classifiers/bayes.c)
-SET(BACKENDSSRC ${CMAKE_CURRENT_SOURCE_DIR}/backends/mmaped_file.c)
+SET(BACKENDSSRC ${CMAKE_CURRENT_SOURCE_DIR}/backends/mmaped_file.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/backends/redis.c)
SET(CACHESSRC ${CMAKE_CURRENT_SOURCE_DIR}/learn_cache/sqlite3_cache.c)
diff --git a/src/libstat/backends/backends.h b/src/libstat/backends/backends.h
index e775faf6e..b6cc49f9e 100644
--- a/src/libstat/backends/backends.h
+++ b/src/libstat/backends/backends.h
@@ -38,11 +38,13 @@ struct rspamd_stat_ctx;
struct rspamd_token_result;
struct rspamd_statfile_runtime;
struct token_node_s;
+struct rspamd_task;
struct rspamd_stat_backend {
const char *name;
gpointer (*init)(struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg);
- gpointer (*runtime)(struct rspamd_statfile_config *stcf, gboolean learn, gpointer ctx);
+ gpointer (*runtime)(struct rspamd_task *task,
+ struct rspamd_statfile_config *stcf, gboolean learn, gpointer ctx);
gboolean (*process_token)(struct token_node_s *tok,
struct rspamd_token_result *res, gpointer ctx);
gboolean (*learn_token)(struct token_node_s *tok,
@@ -55,8 +57,9 @@ struct rspamd_stat_backend {
gpointer ctx;
};
-gpointer rspamd_mmaped_file_init(struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg);
-gpointer rspamd_mmaped_file_runtime (struct rspamd_statfile_config *stcf,
+gpointer rspamd_mmaped_file_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg);
+gpointer rspamd_mmaped_file_runtime (struct rspamd_task *task,
+ struct rspamd_statfile_config *stcf,
gboolean learn, gpointer ctx);
gboolean rspamd_mmaped_file_process_token (struct token_node_s *tok,
struct rspamd_token_result *res,
@@ -75,4 +78,25 @@ gulong rspamd_mmaped_file_dec_learns (struct rspamd_statfile_runtime *runtime,
ucl_object_t * rspamd_mmaped_file_get_stat (struct rspamd_statfile_runtime *runtime,
gpointer ctx);
+gpointer rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg);
+gpointer rspamd_redis_runtime (struct rspamd_task *task,
+ struct rspamd_statfile_config *stcf,
+ gboolean learn, gpointer ctx);
+gboolean rspamd_redis_process_token (struct token_node_s *tok,
+ struct rspamd_token_result *res,
+ gpointer ctx);
+gboolean rspamd_redis_learn_token (struct token_node_s *tok,
+ struct rspamd_token_result *res,
+ gpointer ctx);
+void rspamd_redis_finalize_learn (struct rspamd_statfile_runtime *runtime,
+ gpointer ctx);
+gulong rspamd_redis_total_learns (struct rspamd_statfile_runtime *runtime,
+ gpointer ctx);
+gulong rspamd_redis_inc_learns (struct rspamd_statfile_runtime *runtime,
+ gpointer ctx);
+gulong rspamd_redis_learns (struct rspamd_statfile_runtime *runtime,
+ gpointer ctx);
+ucl_object_t * rspamd_redis_get_stat (struct rspamd_statfile_runtime *runtime,
+ gpointer ctx);
+
#endif /* BACKENDS_H_ */
diff --git a/src/libstat/backends/mmaped_file.c b/src/libstat/backends/mmaped_file.c
index 2eaf6c638..5bb38c521 100644
--- a/src/libstat/backends/mmaped_file.c
+++ b/src/libstat/backends/mmaped_file.c
@@ -852,7 +852,9 @@ rspamd_mmaped_file_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
}
gpointer
-rspamd_mmaped_file_runtime (struct rspamd_statfile_config *stcf, gboolean learn,
+rspamd_mmaped_file_runtime (struct rspamd_task *task,
+ struct rspamd_statfile_config *stcf,
+ gboolean learn,
gpointer p)
{
rspamd_mmaped_file_ctx *ctx = (rspamd_mmaped_file_ctx *)p;
diff --git a/src/libstat/backends/redis.c b/src/libstat/backends/redis.c
new file mode 100644
index 000000000..9d05bc591
--- /dev/null
+++ b/src/libstat/backends/redis.c
@@ -0,0 +1,368 @@
+/* Copyright (c) 2015, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "main.h"
+#include "stat_internal.h"
+#include "hiredis.h"
+#include "upstream.h"
+
+#define REDIS_CTX(p) (struct redis_stat_ctx *)(p)
+#define REDIS_RUNTIME(p) (struct redis_stat_runtime *)(p)
+#define REDIS_BACKEND_TYPE "redis"
+#define REDIS_DEFAULT_PORT 6379
+#define REDIS_DEFAULT_OBJECT "%s%l"
+
+struct redis_stat_ctx {
+ struct upstream_list *read_servers;
+ struct upstream_list *write_servers;
+
+ const gchar *redis_object;
+ gdouble timeout;
+};
+
+struct redis_stat_runtime {
+ struct rspamd_task *task;
+ struct upstream *selected;
+ GArray *results;
+ gchar *redis_object_expanded;
+};
+
+#define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt)
+
+static gsize
+rspamd_redis_expand_object (const gchar *pattern,
+ struct rspamd_statfile_config *stcf,
+ struct rspamd_task *task,
+ gchar **target)
+{
+ gsize tlen = 0;
+ const gchar *p = pattern, *elt;
+ InternetAddressList *ia;
+ InternetAddress *iaelt;
+ InternetAddressMailbox *imb;
+ gchar *d, *end;
+ enum {
+ just_char,
+ percent_char,
+ mod_char
+ } state = just_char;
+
+ g_assert (stcf != NULL);
+
+ /* Length calculation */
+ while (*p) {
+ switch (state) {
+ case just_char:
+ if (*p == '%') {
+ state = percent_char;
+ }
+ else {
+ tlen ++;
+ }
+ p ++;
+ break;
+ case percent_char:
+ switch (*p) {
+ case '%':
+ tlen ++;
+ state = just_char;
+ break;
+ case 'f':
+ if (task) {
+ elt = rspamd_task_get_sender (task);
+ if (elt) {
+ tlen += strlen (elt);
+ }
+ }
+ break;
+ case 'u':
+ elt = GET_TASK_ELT (task, user);
+ if (elt) {
+ tlen += strlen (elt);
+ }
+ break;
+ case 'r':
+ ia = GET_TASK_ELT (task, rcpt_envelope);
+ if (ia != NULL) {
+ iaelt = internet_address_list_get_address (ia, 0);
+ imb = INTERNET_ADDRESS_IS_MAILBOX (iaelt) ?
+ INTERNET_ADDRESS_MAILBOX (iaelt) : NULL;
+
+ elt = (imb ? internet_address_mailbox_get_addr (imb) : NULL);
+
+ if (elt) {
+ tlen += strlen (elt);
+ }
+ }
+ break;
+ case 'l':
+ if (stcf->label) {
+ tlen += strlen (stcf->label);
+ }
+ break;
+ case 's':
+ if (stcf->symbol) {
+ tlen += strlen (stcf->symbol);
+ }
+ break;
+ default:
+ state = just_char;
+ tlen ++;
+ break;
+ }
+
+ if (state == percent_char) {
+ state = mod_char;
+ }
+ p ++;
+ break;
+
+ case mod_char:
+ switch (*p) {
+ case 'd':
+ p ++;
+ state = just_char;
+ break;
+ default:
+ state = just_char;
+ break;
+ }
+ break;
+ }
+ }
+
+ if (target == NULL) {
+ return tlen;
+ }
+
+ *target = rspamd_mempool_alloc (task->task_pool, tlen + 1);
+ d = *target;
+ end = d + tlen;
+ p = pattern;
+ state = just_char;
+
+ /* Expand string */
+ while (*p && d < end) {
+ switch (state) {
+ case just_char:
+ if (*p == '%') {
+ state = percent_char;
+ }
+ else {
+ *d++ = *p;
+ }
+ p ++;
+ break;
+ case percent_char:
+ switch (*p) {
+ case '%':
+ *d++ = *p;
+ state = just_char;
+ break;
+ case 'f':
+ if (task) {
+ elt = rspamd_task_get_sender (task);
+ if (elt) {
+ d += rspamd_strlcpy (d, elt, end - d);
+ }
+ }
+ break;
+ case 'u':
+ elt = GET_TASK_ELT (task, user);
+ if (elt) {
+ d += rspamd_strlcpy (d, elt, end - d);
+ }
+ break;
+ case 'r':
+ ia = GET_TASK_ELT (task, rcpt_envelope);
+ if (ia != NULL) {
+ iaelt = internet_address_list_get_address (ia, 0);
+ imb = INTERNET_ADDRESS_IS_MAILBOX (iaelt) ?
+ INTERNET_ADDRESS_MAILBOX (iaelt) : NULL;
+
+ elt = (imb ? internet_address_mailbox_get_addr (imb) : NULL);
+
+ if (elt) {
+ d += rspamd_strlcpy (d, elt, end - d);
+ }
+ }
+ break;
+ case 'l':
+ if (stcf->label) {
+ d += rspamd_strlcpy (d, stcf->label, end - d);
+ }
+ break;
+ case 's':
+ if (stcf->symbol) {
+ d += rspamd_strlcpy (d, stcf->symbol, end - d);
+ }
+ break;
+ default:
+ state = just_char;
+ *d++ = *p;
+ break;
+ }
+
+ if (state == percent_char) {
+ state = mod_char;
+ }
+ p ++;
+ break;
+
+ case mod_char:
+ switch (*p) {
+ case 'd':
+ /* TODO: not supported yet */
+ p ++;
+ state = just_char;
+ break;
+ default:
+ state = just_char;
+ break;
+ }
+ break;
+ }
+ }
+
+ *d = '\0';
+
+ return tlen;
+}
+
+gpointer
+rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
+{
+ struct redis_stat_ctx *new;
+ struct rspamd_classifier_config *clf;
+ struct rspamd_statfile_config *stf;
+ GList *cur, *curst;
+ const ucl_object_t *elt;
+
+ new = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*new));
+
+ /* Iterate over all classifiers and load matching statfiles */
+ cur = cfg->classifiers;
+
+ while (cur) {
+ clf = cur->data;
+
+ curst = clf->statfiles;
+ while (curst) {
+ stf = curst->data;
+
+ /*
+ * By default, all statfiles are treated as mmaped files
+ */
+ if (stf->backend != NULL && strcmp (stf->backend, REDIS_BACKEND_TYPE)) {
+ /*
+ * Check configuration sanity
+ */
+ elt = ucl_object_find_key (stf->opts, "read_servers");
+ if (elt == NULL) {
+ elt = ucl_object_find_key (stf->opts, "servers");
+ }
+ if (elt == NULL) {
+ msg_err ("statfile %s has no redis servers", stf->symbol);
+ curst = curst->next;
+ continue;
+ }
+ else {
+ new->read_servers = rspamd_upstreams_create ();
+ if (!rspamd_upstreams_from_ucl (new->read_servers, elt,
+ REDIS_DEFAULT_PORT, NULL)) {
+ msg_err ("statfile %s cannot read servers configuration",
+ stf->symbol);
+ curst = curst->next;
+ continue;
+ }
+ }
+
+ elt = ucl_object_find_key (stf->opts, "write_servers");
+ if (elt == NULL) {
+ msg_err ("statfile %s has no write redis servers, "
+ "so learning is impossible", stf->symbol);
+ curst = curst->next;
+ continue;
+ }
+ else {
+ new->write_servers = rspamd_upstreams_create ();
+ if (!rspamd_upstreams_from_ucl (new->read_servers, elt,
+ REDIS_DEFAULT_PORT, NULL)) {
+ msg_err ("statfile %s cannot write servers configuration",
+ stf->symbol);
+ rspamd_upstreams_destroy (new->write_servers);
+ new->write_servers = NULL;
+ }
+ }
+
+ elt = ucl_object_find_key (stf->opts, "prefix");
+ if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
+ new->redis_object = REDIS_DEFAULT_OBJECT;
+ }
+ else {
+ /* XXX: sanity check */
+ new->redis_object = ucl_object_tostring (elt);
+ if (rspamd_redis_expand_object (new->redis_object, stf,
+ NULL, NULL) == 0) {
+ msg_err ("statfile %s cannot write servers configuration",
+ stf->symbol);
+ }
+ }
+
+ ctx->statfiles ++;
+ }
+
+ curst = curst->next;
+ }
+
+ cur = g_list_next (cur);
+ }
+
+ return (gpointer)new;
+}
+
+gpointer
+rspamd_redis_runtime (struct rspamd_task *task,
+ struct rspamd_statfile_config *stcf,
+ gboolean learn, gpointer ctx)
+{
+
+}
+
+gboolean rspamd_redis_process_token (struct token_node_s *tok,
+ struct rspamd_token_result *res,
+ gpointer ctx);
+gboolean rspamd_redis_learn_token (struct token_node_s *tok,
+ struct rspamd_token_result *res,
+ gpointer ctx);
+void rspamd_redis_finalize_learn (struct rspamd_statfile_runtime *runtime,
+ gpointer ctx);
+gulong rspamd_redis_total_learns (struct rspamd_statfile_runtime *runtime,
+ gpointer ctx);
+gulong rspamd_redis_inc_learns (struct rspamd_statfile_runtime *runtime,
+ gpointer ctx);
+gulong rspamd_redis_learns (struct rspamd_statfile_runtime *runtime,
+ gpointer ctx);
+ucl_object_t * rspamd_redis_get_stat (struct rspamd_statfile_runtime *runtime,
+ gpointer ctx);
diff --git a/src/libstat/stat_process.c b/src/libstat/stat_process.c
index f5a4b9398..311eaa0ea 100644
--- a/src/libstat/stat_process.c
+++ b/src/libstat/stat_process.c
@@ -213,7 +213,7 @@ rspamd_stat_preprocess (struct rspamd_stat_ctx *st_ctx,
continue;
}
- backend_runtime = bk->runtime (stcf, op != RSPAMD_CLASSIFY_OP,
+ backend_runtime = bk->runtime (task, stcf, op != RSPAMD_CLASSIFY_OP,
bk->ctx);
st_runtime = rspamd_mempool_alloc0 (task->task_pool,
@@ -645,7 +645,7 @@ rspamd_stat_statistics (struct rspamd_config *cfg, guint64 *total_learns)
continue;
}
- backend_runtime = bk->runtime (stcf, FALSE, bk->ctx);
+ backend_runtime = bk->runtime (NULL, stcf, FALSE, bk->ctx);
learns += bk->total_learns (backend_runtime, bk->ctx);
elt = bk->get_stat (backend_runtime, bk->ctx);