From: Vsevolod Stakhov
Date: Tue, 28 Jul 2009 16:09:50 +0000 (+0400)
Subject: * Add client part for fuzzy checksums storage
X-Git-Tag: 0.2.7~65
X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=b92225677f444858c81b9bd4900d5ddcf9eb801a;p=rspamd.git

* Add client part for fuzzy checksums storage
* Add fuzzy storage syncing to file
---
Review notes for this revision (text between "---" and the first "diff" line
is not part of the commit message):

 * src/cfg_utils.c, parse_seconds: the "m" suffix was multiplied by a minute
   only when it was followed by "s", so "10ms" became ten minutes and "10m"
   stayed ten milliseconds. The test is inverted now.
 * src/fuzzy_storage.c, sync_cache: expiring a node advanced the cursor first
   and then unlinked the *next* link, and released the uninitialized `tmp`.
   The link is now saved before advancing and removed through the GQueue API.
   The modification counter is reset after a sync, otherwise every timer tick
   rewrote the file once MOD_LIMIT had been reached.
 * src/plugins/fuzzy_check.c, parse_servers_string: the loop ran onto the NULL
   terminator of the split vector; port digits were read starting at ':' so
   the port was always 0; the port buffer could not hold five digits; a
   missing ':' left `p` NULL while it was still used for the name length; the
   literal address branch copied from the uninitialized `hent`; servers_num
   was not restarted when the array was reallocated.
 * src/plugins/fuzzy_check.c, module config/reconfig: the "servers" option
   value is released like the other option values, and the servers array is
   forgotten together with the pool it was allocated from.
 * src/plugins/fuzzy_check.c, fuzzy_io_callback: a timeout event matched
   neither branch, leaking the socket and leaving the task waiting forever;
   the reply buffer was inspected without checking how much had been read.
 * Hunk line counts and offsets are adjusted for the added lines. Indentation
   and blank context lines are reconstructed, and the "index" blob ids are
   kept from the original commit, so they do not describe the revised content.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index ce1026afa..2fe6e1350 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -341,7 +341,8 @@ SET(CLASSIFIERSSRC src/classifiers/classifiers.c
 SET(PLUGINSSRC src/plugins/surbl.c
 				src/plugins/regexp.c
 				src/plugins/chartable.c
-				src/plugins/emails.c)
+				src/plugins/emails.c
+				src/plugins/fuzzy_check.c)
 
 SET(TESTSRC test/rspamd_expression_test.c
 			test/rspamd_memcached_test.c
diff --git a/src/cfg_utils.c b/src/cfg_utils.c
index 6b095e6d8..21092356f 100644
--- a/src/cfg_utils.c
+++ b/src/cfg_utils.c
@@ -278,6 +278,21 @@ parse_seconds (const char *t)
 		if (*err_str == 's' || *err_str == 'S') {
 			result *= 1000;
 		}
+		/* Minutes */
+		else if (*err_str == 'm' || *err_str == 'M') {
+			/* Handle ms correctly: a value with "ms" suffix is already in milliseconds */
+			if (*(err_str + 1) != 's' && *(err_str + 1) != 'S') {
+				result *= 60 * 1000;
+			}
+		}
+		/* Hours */
+		else if (*err_str == 'h' || *err_str == 'H') {
+			result *= 60 * 60 * 1000;
+		}
+		/* Days */
+		else if (*err_str == 'd' || *err_str == 'D') {
+			result *= 24 * 60 * 60 * 1000;
+		}
 	}
 
 	return result;
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index fa4acce09..a0aa00e50 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -26,7 +26,6 @@
  * Rspamd fuzzy storage server
  */
 
-
 #include "config.h"
 #include "util.h"
 #include "main.h"
@@ -41,19 +40,33 @@
 
 /* This number is used as limit while comparing two fuzzy hashes, this value can vary from 0 to 100 */
 #define LEV_LIMIT 99
+/* This number is used as limit while we are making decision to write new hash file or not */
+#define MOD_LIMIT 10
+/* This number is used as expire time in seconds for cache items (2 days) */
+#define DEFAULT_EXPIRE 172800L
+/* Resync value in seconds */
+#define SYNC_TIMEOUT 60
 
 static GQueue *hashes;
 
+/* Number of cache modifications */
+static uint32_t mods = 0;
+/* For evtimer */
+static struct timeval tmv;
+static struct event tev;
+
 struct rspamd_fuzzy_node {
 	fuzzy_hash_t h;
 	uint64_t time;
 };
 
-static
-void sig_handler (int signo)
-{
+static void
+sig_handler (int signo)
+{
 	switch (signo) {
 		case SIGINT:
+			/* Ignore SIGINT as we should get SIGTERM after it anyway */
+			return;
 		case SIGTERM:
 #ifdef WITH_PROFILER
 			exit (0);
@@ -64,6 +77,83 @@ void sig_handler (int signo)
 	}
 }
 
+/*
+ * Dump the hash queue to the file named by the "hashfile" worker parameter,
+ * dropping nodes older than the "expire" parameter (DEFAULT_EXPIRE seconds
+ * when unset). Does nothing while fewer than MOD_LIMIT modifications are pending.
+ */
+static void
+sync_cache (struct rspamd_worker *wrk)
+{
+	int fd;
+	char *filename, *exp_str;
+	GList *cur, *tmp;
+	struct rspamd_fuzzy_node *node;
+	uint64_t expire, now;
+
+	/* Check for modifications */
+	if (mods < MOD_LIMIT) {
+		return;
+	}
+
+	msg_info ("sync_cache: syncing fuzzy hash storage");
+	filename = g_hash_table_lookup (wrk->cf->params, "hashfile");
+	if (filename == NULL) {
+		return;
+	}
+	exp_str = g_hash_table_lookup (wrk->cf->params, "expire");
+	if (exp_str != NULL) {
+		expire = parse_seconds (exp_str) / 1000;
+	}
+	else {
+		expire = DEFAULT_EXPIRE;
+	}
+
+	if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
+		msg_err ("sync_cache: cannot create hash file %s: %s", filename, strerror (errno));
+		return;
+	}
+
+	now = (uint64_t)time (NULL);
+	cur = hashes->head;
+	while (cur) {
+		node = cur->data;
+		if (now - node->time > expire) {
+			/* Remove expired item: remember the link, advance, then unlink what was remembered */
+			tmp = cur;
+			cur = g_list_next (cur);
+			g_queue_delete_link (hashes, tmp);
+			g_free (node);
+			continue;
+		}
+		if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
+			msg_err ("sync_cache: cannot write file %s: %s", filename, strerror (errno));
+		}
+		cur = g_list_next (cur);
+	}
+
+	close (fd);
+	/* The queue has been dumped, start counting modifications afresh */
+	mods = 0;
+}
+
+/*
+ * SIGTERM event handler: force a final sync and leave the event loop
+ */
+static void
+sigterm_handler (int fd, short what, void *arg)
+{
+	struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+	static struct timeval tv = {
+		.tv_sec = 0,
+		.tv_usec = 0,
+	};
+
+	mods = MOD_LIMIT + 1;
+	sync_cache (worker);
+	(void)event_loopexit (&tv);
+}
+
 /*
  * Config reload is designed by sending sigusr to active workers and pending shutdown of them
  */
@@ -165,6 +255,7 @@ process_write_command (struct fuzzy_cmd *cmd)
 	h->h.block_size = cmd->blocksize;
 	h->time = (uint64_t)time (NULL);
 	g_queue_push_head (hashes, h);
+	mods ++;
 
 	return TRUE;
 }
@@ -187,6 +278,7 @@ process_delete_command (struct fuzzy_cmd *cmd)
 			hashes->head = g_list_remove_link (hashes->head, cur);
 			g_free (h);
 			g_list_free1 (cur);
+			mods ++;
 			return TRUE;
 		}
 		cur = g_list_next (cur);
@@ -304,6 +396,23 @@ accept_socket (int fd, short what, void *arg)
 
 }
 
+/*
+ * Periodic timer callback: re-arm the timer with jitter, then sync the cache
+ */
+static void
+sync_callback (int fd, short what, void *arg)
+{
+	struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+	/* Timer event */
+	evtimer_set (&tev, sync_callback, worker);
+	/* Plan event with jitter */
+	tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
+	tmv.tv_usec = 0;
+	evtimer_add (&tev, &tmv);
+
+	sync_cache (worker);
+}
+
 /*
  * Start worker process
  */
@@ -311,6 +420,7 @@ void
 start_fuzzy_storage (struct rspamd_worker *worker)
 {
 	struct sigaction signals;
+	struct event sev;
 
 	worker->srv->pid = getpid ();
 	worker->srv->type = TYPE_FUZZY;
@@ -323,18 +433,28 @@ start_fuzzy_storage (struct rspamd_worker *worker)
 	/* SIGUSR2 handler */
 	signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
 	signal_add (&worker->sig_ev, NULL);
-
-	/* Accept event */
-	event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
-	event_add(&worker->bind_ev, NULL);
+	signal_set (&sev, SIGTERM, sigterm_handler, (void *) worker);
+	signal_add (&sev, NULL);
 
 	/* Send SIGUSR2 to parent */
 	kill (getppid (), SIGUSR2);
 
+	/* Try to read hashes from file */
 	if (!read_hashes_file (worker)) {
 		msg_err ("read_hashes_file: cannot read hashes file, it can be created after save procedure");
 	}
 
+	/* Timer event */
+	evtimer_set (&tev, sync_callback, worker);
+	/* Plan event with jitter */
+	tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
+	tmv.tv_usec = 0;
+	evtimer_add (&tev, &tmv);
+
+	/* Accept event */
+	event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+	event_add(&worker->bind_ev, NULL);
+
 	event_loop (0);
 
 	exit (EXIT_SUCCESS);
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
new file mode 100644
index 000000000..085ee5164
--- /dev/null
+++ b/src/plugins/fuzzy_check.c
@@ -0,0 +1,315 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/***MODULE:fuzzy
+ * rspamd module that checks fuzzy checksums for messages
+ */
+
+#include "../config.h"
+#include "../main.h"
+#include "../message.h"
+#include "../modules.h"
+#include "../cfg_file.h"
+#include "../expressions.h"
+#include "../util.h"
+#include "../view.h"
+#include "../map.h"
+#include "../fuzzy_storage.h"
+
+#define DEFAULT_SYMBOL "R_FUZZY_HASH"
+#define DEFAULT_UPSTREAM_ERROR_TIME 10
+#define DEFAULT_UPSTREAM_DEAD_TIME 300
+#define DEFAULT_UPSTREAM_MAXERRORS 10
+
+#define IO_TIMEOUT 5
+#define DEFAULT_PORT 11335
+
+struct storage_server {
+	char *name;
+	struct in_addr addr;
+	uint16_t port;
+};
+
+struct fuzzy_ctx {
+	int (*filter)(struct worker_task *task);
+	char *metric;
+	char *symbol;
+	struct storage_server *servers;
+	int servers_num;
+	memory_pool_t *fuzzy_pool;
+};
+
+struct fuzzy_client_session {
+	int state;
+	fuzzy_hash_t *h;
+	struct event ev;
+	struct timeval tv;
+	struct worker_task *task;
+};
+
+static struct fuzzy_ctx *fuzzy_module_ctx = NULL;
+
+static int fuzzy_mime_filter (struct worker_task *task);
+static void fuzzy_symbol_callback (struct worker_task *task, void *unused);
+
+/*
+ * Parse a comma separated "host[:port]" list into the servers array of the
+ * module context; entries that are empty or cannot be resolved are skipped
+ */
+static void
+parse_servers_string (char *str)
+{
+	char **strvec, *p, *q, portbuf[6], *name;
+	int num, i, j, port, parsed;
+	size_t namelen;
+	struct hostent *hent;
+	struct in_addr addr;
+	struct storage_server *srv;
+
+	strvec = g_strsplit (str, ",", 0);
+	num = g_strv_length (strvec);
+
+	/* The array is allocated from scratch, so restart the counter as well */
+	fuzzy_module_ctx->servers = memory_pool_alloc0 (fuzzy_module_ctx->fuzzy_pool, sizeof (struct storage_server) * num);
+	fuzzy_module_ctx->servers_num = 0;
+
+	/* strvec[num] is the NULL terminator, stop before it */
+	for (i = 0; i < num; i ++) {
+		g_strstrip (strvec[i]);
+
+		port = DEFAULT_PORT;
+		namelen = strlen (strvec[i]);
+		if ((p = strchr (strvec[i], ':')) != NULL) {
+			namelen = p - strvec[i];
+			/* Port digits start right after the colon */
+			q = p + 1;
+			j = 0;
+			while (g_ascii_isdigit (*q) && j < (int)sizeof (portbuf) - 1) {
+				portbuf[j ++] = *q ++;
+			}
+			portbuf[j] = '\0';
+			parsed = atoi (portbuf);
+			/* Keep the default port for an empty or out of range value */
+			if (parsed > 0 && parsed <= 65535) {
+				port = parsed;
+			}
+		}
+		if (namelen == 0) {
+			/* Nothing to resolve, e.g. an empty entry after a trailing comma */
+			continue;
+		}
+		name = memory_pool_alloc (fuzzy_module_ctx->fuzzy_pool, namelen + 1);
+		g_strlcpy (name, strvec[i], namelen + 1);
+		if (!inet_aton (name, &addr)) {
+			/* Resolve using dns */
+			hent = gethostbyname (name);
+			if (hent == NULL) {
+				msg_info ("parse_servers_string: cannot resolve: %s", name);
+				continue;
+			}
+			memcpy (&addr, hent->h_addr, sizeof (struct in_addr));
+		}
+		/* addr now holds either the literal or the resolved address */
+		srv = &fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num ++];
+		srv->port = port;
+		srv->name = name;
+		memcpy (&srv->addr, &addr, sizeof (struct in_addr));
+	}
+
+	g_strfreev (strvec);
+}
+
+int
+fuzzy_check_module_init (struct config_file *cfg, struct module_ctx **ctx)
+{
+	fuzzy_module_ctx = g_malloc (sizeof (struct fuzzy_ctx));
+
+	fuzzy_module_ctx->filter = fuzzy_mime_filter;
+	fuzzy_module_ctx->fuzzy_pool = memory_pool_new (memory_pool_get_size ());
+	fuzzy_module_ctx->servers = NULL;
+	fuzzy_module_ctx->servers_num = 0;
+
+	*ctx = (struct module_ctx *)fuzzy_module_ctx;
+
+	return 0;
+}
+
+int
+fuzzy_check_module_config (struct config_file *cfg)
+{
+	char *value;
+	int res = TRUE;
+	struct metric *metric;
+	double *w;
+
+	if ((value = get_module_opt (cfg, "fuzzy", "metric")) != NULL) {
+		fuzzy_module_ctx->metric = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value);
+		g_free (value);
+	}
+	else {
+		fuzzy_module_ctx->metric = DEFAULT_METRIC;
+	}
+	if ((value = get_module_opt (cfg, "fuzzy", "symbol")) != NULL) {
+		fuzzy_module_ctx->symbol = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value);
+		g_free (value);
+	}
+	else {
+		fuzzy_module_ctx->symbol = DEFAULT_SYMBOL;
+	}
+	if ((value = get_module_opt (cfg, "fuzzy", "servers")) != NULL) {
+		parse_servers_string (value);
+		/* Server names were copied to the pool, release the value as for the options above */
+		g_free (value);
+	}
+
+	metric = g_hash_table_lookup (cfg->metrics, fuzzy_module_ctx->metric);
+	if (metric == NULL) {
+		msg_err ("fuzzy_module_config: cannot find metric definition %s", fuzzy_module_ctx->metric);
+		return FALSE;
+	}
+
+	/* Search in factors hash table */
+	w = g_hash_table_lookup (cfg->factors, fuzzy_module_ctx->symbol);
+	if (w == NULL) {
+		register_symbol (&metric->cache, fuzzy_module_ctx->symbol, 1, fuzzy_symbol_callback, NULL);
+	}
+	else {
+		register_symbol (&metric->cache, fuzzy_module_ctx->symbol, *w, fuzzy_symbol_callback, NULL);
+	}
+
+	return res;
+}
+
+int
+fuzzy_check_module_reconfig (struct config_file *cfg)
+{
+	memory_pool_delete (fuzzy_module_ctx->fuzzy_pool);
+	fuzzy_module_ctx->fuzzy_pool = memory_pool_new (memory_pool_get_size ());
+	/* The servers array lived in the pool that has just been deleted */
+	fuzzy_module_ctx->servers = NULL;
+	fuzzy_module_ctx->servers_num = 0;
+
+	return fuzzy_check_module_config (cfg);
+}
+
+/*
+ * IO handler of a check session: sends the FUZZY_CHECK command once the socket
+ * is writable, then reads the reply; any other event finishes the session
+ */
+static void
+fuzzy_io_callback (int fd, short what, void *arg)
+{
+	struct fuzzy_client_session *session = arg;
+	struct fuzzy_cmd cmd;
+	char buf[sizeof ("ERR")];
+	ssize_t r;
+
+	if (what == EV_WRITE) {
+		/* Send command to storage */
+		cmd.blocksize = session->h->block_size;
+		memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
+		cmd.cmd = FUZZY_CHECK;
+		if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) {
+			goto err;
+		}
+		else {
+			event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session);
+			event_add (&session->ev, &session->tv);
+		}
+	}
+	else if (what == EV_READ) {
+		if ((r = read (fd, buf, sizeof (buf))) == -1) {
+			goto err;
+		}
+		else if (r >= 2 && buf[0] == 'O' && buf[1] == 'K') {
+			insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL);
+		}
+		goto ok;
+	}
+	else {
+		/* Timeout: finish the session, otherwise the task would wait forever */
+		errno = ETIMEDOUT;
+		goto err;
+	}
+
+	return;
+
+	err:
+		msg_err ("fuzzy_io_callback: got error on IO, %d, %s", errno, strerror (errno));
+	ok:
+		event_del (&session->ev);
+		close (fd);
+		session->task->save.saved --;
+		if (session->task->save.saved == 0) {
+			/* Call other filters */
+			session->task->save.saved = 1;
+			process_filters (session->task);
+		}
+
+}
+
+static void
+fuzzy_symbol_callback (struct worker_task *task, void *unused)
+{
+	struct mime_text_part *part;
+	struct fuzzy_client_session *session;
+	struct storage_server *selected;
+	GList *cur;
+	int sock;
+
+	cur = task->text_parts;
+
+	while (cur) {
+		part = cur->data;
+		selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
+										 sizeof (struct storage_server), task->ts.tv_sec,
+										 DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME,
+										 DEFAULT_UPSTREAM_MAXERRORS,
+										 part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
+		if (selected) {
+			if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
+				msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
+			}
+			else {
+				session = memory_pool_alloc (task->task_pool, sizeof (struct fuzzy_client_session));
+				event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback, session);
+				session->tv.tv_sec = IO_TIMEOUT;
+				session->tv.tv_usec = 0;
+				session->state = 0;
+				session->h = part->fuzzy;
+				session->task = task;
+				event_add (&session->ev, &session->tv);
+				task->save.saved ++;
+			}
+		}
+		cur = g_list_next (cur);
+	}
+}
+
+static int
+fuzzy_mime_filter (struct worker_task *task)
+{
+	/* XXX: remove this */
+	return 0;
+}