summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-07-28 20:09:50 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-07-28 20:09:50 +0400
commitb92225677f444858c81b9bd4900d5ddcf9eb801a (patch)
tree833cb69b4a62948636e20ddc5808a2913fd0bd19 /src
parent188018fe88416dbe918402a650da4153066d1658 (diff)
downloadrspamd-b92225677f444858c81b9bd4900d5ddcf9eb801a.tar.gz
rspamd-b92225677f444858c81b9bd4900d5ddcf9eb801a.zip
* Add client part for fuzzy checksums storage
* Add fuzzy storage syncing to file
Diffstat (limited to 'src')
-rw-r--r--src/cfg_utils.c15
-rw-r--r--src/fuzzy_storage.c123
-rw-r--r--src/plugins/fuzzy_check.c290
3 files changed, 420 insertions, 8 deletions
diff --git a/src/cfg_utils.c b/src/cfg_utils.c
index 6b095e6d8..21092356f 100644
--- a/src/cfg_utils.c
+++ b/src/cfg_utils.c
@@ -278,6 +278,21 @@ parse_seconds (const char *t)
if (*err_str == 's' || *err_str == 'S') {
result *= 1000;
}
+ /* Minutes */
+ else if (*err_str == 'm' || *err_str == 'M') {
+ /* Handle ms correctly */
+ if (*(err_str + 1) == 's' || *(err_str + 1) == 'S') {
+ result *= 60 * 1000;
+ }
+ }
+ /* Hours */
+ else if (*err_str == 'h' || *err_str == 'H') {
+ result *= 60 * 60 * 1000;
+ }
+ /* Days */
+ else if (*err_str == 'd' || *err_str == 'D') {
+ result *= 24 * 60 * 60 * 1000;
+ }
}
return result;
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index fa4acce09..a0aa00e50 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -26,7 +26,6 @@
* Rspamd fuzzy storage server
*/
-
#include "config.h"
#include "util.h"
#include "main.h"
@@ -41,19 +40,33 @@
/* This number is used as limit while comparing two fuzzy hashes, this value can vary from 0 to 100 */
#define LEV_LIMIT 99
+/* This number is used as limit while we are making decision to write new hash file or not */
+#define MOD_LIMIT 10
+/* This number is used as expire time in seconds for cache items (2 days) */
+#define DEFAULT_EXPIRE 172800L
+/* Resync value in seconds */
+#define SYNC_TIMEOUT 60
static GQueue *hashes;
+/* Number of cache modifications */
+static uint32_t mods = 0;
+/* For evtimer */
+static struct timeval tmv;
+static struct event tev;
+
struct rspamd_fuzzy_node {
fuzzy_hash_t h;
uint64_t time;
};
-static
-void sig_handler (int signo)
-{
+static void
+sig_handler (int signo)
+{
switch (signo) {
case SIGINT:
+ /* Ignore SIGINT as we should got SIGTERM after it anyway */
+ return;
case SIGTERM:
#ifdef WITH_PROFILER
exit (0);
@@ -64,6 +77,73 @@ void sig_handler (int signo)
}
}
+static void
+sync_cache (struct rspamd_worker *wrk)
+{
+ int fd;
+ char *filename, *exp_str;
+ GList *cur, *tmp;
+ struct rspamd_fuzzy_node *node;
+ uint64_t expire, now;
+
+ /* Check for modifications */
+ if (mods < MOD_LIMIT) {
+ return;
+ }
+
+ msg_info ("sync_cache: syncing fuzzy hash storage");
+ filename = g_hash_table_lookup (wrk->cf->params, "hashfile");
+ if (filename == NULL) {
+ return;
+ }
+ exp_str = g_hash_table_lookup (wrk->cf->params, "expire");
+ if (exp_str != NULL) {
+ expire = parse_seconds (exp_str) / 1000;
+ }
+ else {
+ expire = DEFAULT_EXPIRE;
+ }
+
+ if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
+ msg_err ("sync_cache: cannot create hash file %s: %s", filename, strerror (errno));
+ return;
+ }
+
+ now = (uint64_t)time (NULL);
+ cur = hashes->head;
+ while (cur) {
+ node = cur->data;
+ if (now - node->time > expire) {
+ /* Remove expired item */
+ cur = g_list_next (cur);
+ hashes->head = g_list_remove_link (hashes->head, cur);
+ g_free (node);
+ g_list_free1 (tmp);
+ continue;
+ }
+ if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
+ msg_err ("sync_cache: cannot write file %s: %s", filename, strerror (errno));
+ }
+ cur = g_list_next (cur);
+ }
+
+ close (fd);
+}
+
+static void
+sigterm_handler (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ static struct timeval tv = {
+ .tv_sec = 0,
+ .tv_usec = 0,
+ };
+
+ mods = MOD_LIMIT + 1;
+ sync_cache (worker);
+ (void)event_loopexit (&tv);
+}
+
/*
* Config reload is designed by sending sigusr to active workers and pending shutdown of them
*/
@@ -165,6 +245,7 @@ process_write_command (struct fuzzy_cmd *cmd)
h->h.block_size = cmd->blocksize;
h->time = (uint64_t)time (NULL);
g_queue_push_head (hashes, h);
+ mods ++;
return TRUE;
}
@@ -187,6 +268,7 @@ process_delete_command (struct fuzzy_cmd *cmd)
hashes->head = g_list_remove_link (hashes->head, cur);
g_free (h);
g_list_free1 (cur);
+ mods ++;
return TRUE;
}
cur = g_list_next (cur);
@@ -304,6 +386,20 @@ accept_socket (int fd, short what, void *arg)
}
+static void
+sync_callback (int fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ /* Timer event */
+ evtimer_set (&tev, sync_callback, worker);
+ /* Plan event with jitter */
+ tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
+ tmv.tv_usec = 0;
+ evtimer_add (&tev, &tmv);
+
+ sync_cache (worker);
+}
+
/*
* Start worker process
*/
@@ -311,6 +407,7 @@ void
start_fuzzy_storage (struct rspamd_worker *worker)
{
struct sigaction signals;
+ struct event sev;
worker->srv->pid = getpid ();
worker->srv->type = TYPE_FUZZY;
@@ -323,18 +420,28 @@ start_fuzzy_storage (struct rspamd_worker *worker)
/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
signal_add (&worker->sig_ev, NULL);
-
- /* Accept event */
- event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
- event_add(&worker->bind_ev, NULL);
+ signal_set (&sev, SIGTERM, sigterm_handler, (void *) worker);
+ signal_add (&sev, NULL);
/* Send SIGUSR2 to parent */
kill (getppid (), SIGUSR2);
+
/* Try to read hashes from file */
if (!read_hashes_file (worker)) {
msg_err ("read_hashes_file: cannot read hashes file, it can be created after save procedure");
}
+ /* Timer event */
+ evtimer_set (&tev, sync_callback, worker);
+ /* Plan event with jitter */
+ tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
+ tmv.tv_usec = 0;
+ evtimer_add (&tev, &tmv);
+
+ /* Accept event */
+ event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_add(&worker->bind_ev, NULL);
+
event_loop (0);
exit (EXIT_SUCCESS);
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
new file mode 100644
index 000000000..085ee5164
--- /dev/null
+++ b/src/plugins/fuzzy_check.c
@@ -0,0 +1,290 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/***MODULE:fuzzy
+ * rspamd module that checks fuzzy checksums for messages
+ */
+
+#include "../config.h"
+#include "../main.h"
+#include "../message.h"
+#include "../modules.h"
+#include "../cfg_file.h"
+#include "../expressions.h"
+#include "../util.h"
+#include "../view.h"
+#include "../map.h"
+#include "../fuzzy_storage.h"
+
+#define DEFAULT_SYMBOL "R_FUZZY_HASH"
+#define DEFAULT_UPSTREAM_ERROR_TIME 10
+#define DEFAULT_UPSTREAM_DEAD_TIME 300
+#define DEFAULT_UPSTREAM_MAXERRORS 10
+
+#define IO_TIMEOUT 5
+#define DEFAULT_PORT 11335
+
+struct storage_server {
+ char *name;
+ struct in_addr addr;
+ uint16_t port;
+};
+
+struct fuzzy_ctx {
+ int (*filter)(struct worker_task *task);
+ char *metric;
+ char *symbol;
+ struct storage_server *servers;
+ int servers_num;
+ memory_pool_t *fuzzy_pool;
+};
+
+struct fuzzy_client_session {
+ int state;
+ fuzzy_hash_t *h;
+ struct event ev;
+ struct timeval tv;
+ struct worker_task *task;
+};
+
+static struct fuzzy_ctx *fuzzy_module_ctx = NULL;
+
+static int fuzzy_mime_filter (struct worker_task *task);
+static void fuzzy_symbol_callback (struct worker_task *task, void *unused);
+
+static void
+parse_servers_string (char *str)
+{
+ char **strvec, *p, portbuf[5], *name;
+ int num, i, j, port;
+ struct hostent *hent;
+ struct in_addr addr;
+
+ strvec = g_strsplit (str, ",", 0);
+ num = g_strv_length (strvec);
+
+ fuzzy_module_ctx->servers = memory_pool_alloc0 (fuzzy_module_ctx->fuzzy_pool, sizeof (struct storage_server) * num);
+
+ for (i = 0; i <= num; i ++) {
+ g_strstrip (strvec[i]);
+
+ if ((p = strchr (strvec[i], ':')) != NULL) {
+ j = 0;
+ while (g_ascii_isdigit (*p) && j < sizeof (portbuf) - 1) {
+ portbuf[j ++] = *p ++;
+ }
+ portbuf[j] = '\0';
+ port = atoi (portbuf);
+ }
+ else {
+ /* Default http port */
+ port = DEFAULT_PORT;
+ }
+ name = memory_pool_alloc (fuzzy_module_ctx->fuzzy_pool, p - strvec[i] + 1);
+ g_strlcpy (name, strvec[i], p - strvec[i] + 1);
+ if (!inet_aton (name, &addr)) {
+ /* Resolve using dns */
+ hent = gethostbyname (name);
+ if (hent == NULL) {
+ msg_info ("parse_servers_string: cannot resolve: %s", name);
+ continue;
+ }
+ else {
+ fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].port = port;
+ fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].name = name;
+ memcpy (&fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].addr, hent->h_addr, sizeof(struct in_addr));
+ fuzzy_module_ctx->servers_num ++;
+ }
+ }
+ else {
+ fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].port = port;
+ fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].name = name;
+ memcpy (&fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].addr, hent->h_addr, sizeof(struct in_addr));
+ fuzzy_module_ctx->servers_num ++;
+ }
+
+ }
+
+ g_strfreev (strvec);
+
+}
+
+int
+fuzzy_check_module_init (struct config_file *cfg, struct module_ctx **ctx)
+{
+ fuzzy_module_ctx = g_malloc (sizeof (struct fuzzy_ctx));
+
+ fuzzy_module_ctx->filter = fuzzy_mime_filter;
+ fuzzy_module_ctx->fuzzy_pool = memory_pool_new (memory_pool_get_size ());
+ fuzzy_module_ctx->servers = NULL;
+ fuzzy_module_ctx->servers_num = 0;
+
+ *ctx = (struct module_ctx *)fuzzy_module_ctx;
+
+ return 0;
+}
+
+int
+fuzzy_check_module_config (struct config_file *cfg)
+{
+ char *value;
+ int res = TRUE;
+ struct metric *metric;
+ double *w;
+
+ if ((value = get_module_opt (cfg, "fuzzy", "metric")) != NULL) {
+ fuzzy_module_ctx->metric = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value);
+ g_free (value);
+ }
+ else {
+ fuzzy_module_ctx->metric = DEFAULT_METRIC;
+ }
+ if ((value = get_module_opt (cfg, "fuzzy", "symbol")) != NULL) {
+ fuzzy_module_ctx->symbol = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value);
+ g_free (value);
+ }
+ else {
+ fuzzy_module_ctx->symbol = DEFAULT_SYMBOL;
+ }
+ if ((value = get_module_opt (cfg, "fuzzy", "servers")) != NULL) {
+ parse_servers_string (value);
+ }
+
+ metric = g_hash_table_lookup (cfg->metrics, fuzzy_module_ctx->metric);
+ if (metric == NULL) {
+ msg_err ("fuzzy_module_config: cannot find metric definition %s", fuzzy_module_ctx->metric);
+ return FALSE;
+ }
+
+ /* Search in factors hash table */
+ w = g_hash_table_lookup (cfg->factors, fuzzy_module_ctx->symbol);
+ if (w == NULL) {
+ register_symbol (&metric->cache, fuzzy_module_ctx->symbol, 1, fuzzy_symbol_callback, NULL);
+ }
+ else {
+ register_symbol (&metric->cache, fuzzy_module_ctx->symbol, *w, fuzzy_symbol_callback, NULL);
+ }
+
+ return res;
+}
+
+int
+fuzzy_check_module_reconfig (struct config_file *cfg)
+{
+ memory_pool_delete (fuzzy_module_ctx->fuzzy_pool);
+ fuzzy_module_ctx->fuzzy_pool = memory_pool_new (memory_pool_get_size ());
+
+ return fuzzy_check_module_config (cfg);
+}
+
+static void
+fuzzy_io_callback (int fd, short what, void *arg)
+{
+ struct fuzzy_client_session *session = arg;
+ struct fuzzy_cmd cmd;
+ char buf[sizeof ("ERR")];
+
+ if (what == EV_WRITE) {
+ /* Send command to storage */
+ cmd.blocksize = session->h->block_size;
+ memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
+ cmd.cmd = FUZZY_CHECK;
+ if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) {
+ goto err;
+ }
+ else {
+ event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session);
+ event_add (&session->ev, &session->tv);
+ }
+ }
+ else if (what == EV_READ) {
+ if (read (fd, buf, sizeof (buf)) == -1) {
+ goto err;
+ }
+ else if (buf[0] == 'O' && buf[1] == 'K') {
+ insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL);
+ }
+ goto ok;
+ }
+
+ return;
+
+ err:
+ msg_err ("fuzzy_io_callback: got error on IO, %d, %s", errno, strerror (errno));
+ ok:
+ event_del (&session->ev);
+ close (fd);
+ session->task->save.saved --;
+ if (session->task->save.saved == 0) {
+ /* Call other filters */
+ session->task->save.saved = 1;
+ process_filters (session->task);
+ }
+
+}
+
+static void
+fuzzy_symbol_callback (struct worker_task *task, void *unused)
+{
+ struct mime_text_part *part;
+ struct fuzzy_client_session *session;
+ struct storage_server *selected;
+ GList *cur;
+ int sock;
+
+ cur = task->text_parts;
+
+ while (cur) {
+ part = cur->data;
+ selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
+ sizeof (struct storage_server), task->ts.tv_sec,
+ DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME,
+ DEFAULT_UPSTREAM_MAXERRORS,
+ part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
+ if (selected) {
+ if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
+ msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
+ }
+ else {
+ session = memory_pool_alloc (task->task_pool, sizeof (struct fuzzy_client_session));
+ event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback, session);
+ session->tv.tv_sec = IO_TIMEOUT;
+ session->tv.tv_usec = 0;
+ session->state = 0;
+ session->h = part->fuzzy;
+ session->task = task;
+ event_add (&session->ev, &session->tv);
+ task->save.saved ++;
+ }
+ }
+ cur = g_list_next (cur);
+ }
+}
+
+static int
+fuzzy_mime_filter (struct worker_task *task)
+{
+ /* XXX: remove this */
+ return 0;
+}