]> source.dussan.org Git - rspamd.git/commitdiff
* Add client part for fuzzy checksums storage
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 28 Jul 2009 16:09:50 +0000 (20:09 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 28 Jul 2009 16:09:50 +0000 (20:09 +0400)
* Add fuzzy storage syncing to file

CMakeLists.txt
src/cfg_utils.c
src/fuzzy_storage.c
src/plugins/fuzzy_check.c [new file with mode: 0644]

index ce1026afa998c06408c6fdb0f58ed2556cc968f6..2fe6e13505fc9c9d9fdf5c3649d6cca1574e164d 100644 (file)
@@ -341,7 +341,8 @@ SET(CLASSIFIERSSRC src/classifiers/classifiers.c
 SET(PLUGINSSRC src/plugins/surbl.c
                                src/plugins/regexp.c
                                src/plugins/chartable.c
-                               src/plugins/emails.c)
+                               src/plugins/emails.c
+                               src/plugins/fuzzy_check.c)
 
 SET(TESTSRC            test/rspamd_expression_test.c
                                test/rspamd_memcached_test.c
index 6b095e6d89385c1ed11d48d46d8b70444eda10de..21092356f42def393909e50674bb89b8c9127f56 100644 (file)
@@ -278,6 +278,21 @@ parse_seconds (const char *t)
                if (*err_str == 's' || *err_str == 'S') {
                        result *= 1000;
                }
+        /* Minutes */
+        else if (*err_str == 'm' || *err_str == 'M') {
+            /* Handle ms correctly */
+            if (*(err_str + 1) == 's' || *(err_str + 1) == 'S') {
+                result *= 60 * 1000;
+            }
+        }
+        /* Hours */
+        else if (*err_str == 'h' || *err_str == 'H') {
+            result *= 60 * 60 * 1000;
+        }
+        /* Days */
+        else if (*err_str == 'd' || *err_str == 'D') {
+            result *= 24 * 60 * 60 * 1000;
+        }
        }
 
        return result;
index fa4acce096f6f7916374388da8851c4cdca2b53d..a0aa00e500e1f73e6cfe12a48ab3825b5036b6fb 100644 (file)
@@ -26,7 +26,6 @@
  * Rspamd fuzzy storage server
  */
 
-
 #include "config.h"
 #include "util.h"
 #include "main.h"
 
 /* This number is used as limit while comparing two fuzzy hashes, this value can vary from 0 to 100 */
 #define LEV_LIMIT 99
+/* This number is used as limit while we are making decision to write new hash file or not */
+#define MOD_LIMIT 10
+/* This number is used as expire time in seconds for cache items  (2 days) */
+#define DEFAULT_EXPIRE 172800L
+/* Resync value in seconds */
+#define SYNC_TIMEOUT 60
 
 static GQueue *hashes;
 
+/* Number of cache modifications */
+static uint32_t mods = 0;
+/* For evtimer */
+static struct timeval tmv;
+static struct event tev;
+
 struct rspamd_fuzzy_node {
        fuzzy_hash_t h;
        uint64_t time;
 };
 
-static 
-void sig_handler (int signo)
-{
+static void 
+sig_handler (int signo)
+{      
        switch (signo) {
                case SIGINT:
+                       /* Ignore SIGINT as we should got SIGTERM after it anyway */
+                       return;
                case SIGTERM:
 #ifdef WITH_PROFILER
                        exit (0);
@@ -64,6 +77,73 @@ void sig_handler (int signo)
        }
 }
 
+static void
+sync_cache (struct rspamd_worker *wrk)
+{
+       int fd;
+       char *filename, *exp_str;
+       GList *cur, *tmp;
+       struct rspamd_fuzzy_node *node;
+       uint64_t expire, now;
+       
+       /* Check for modifications */
+       if (mods < MOD_LIMIT) {
+               return;
+       }
+       
+       msg_info ("sync_cache: syncing fuzzy hash storage");
+       filename = g_hash_table_lookup (wrk->cf->params, "hashfile");
+       if (filename == NULL) {
+               return;
+       }
+       exp_str = g_hash_table_lookup (wrk->cf->params, "expire");
+       if (exp_str != NULL) {
+               expire = parse_seconds (exp_str) / 1000;
+       }
+       else {
+               expire = DEFAULT_EXPIRE;
+       }
+
+       if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
+               msg_err ("sync_cache: cannot create hash file %s: %s", filename, strerror (errno));
+               return;
+       }
+       
+       now = (uint64_t)time (NULL);
+       cur = hashes->head;
+       while (cur) {
+               node = cur->data;
+               if (now - node->time > expire) {
+                       /* Remove expired item */
+                       cur = g_list_next (cur);
+                       hashes->head = g_list_remove_link (hashes->head, cur);
+                       g_free (node);
+                       g_list_free1 (tmp);
+                       continue;
+               }
+               if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
+                       msg_err ("sync_cache: cannot write file %s: %s", filename, strerror (errno));
+               }
+               cur = g_list_next (cur);
+       }
+
+       close (fd);
+}
+
+static void 
+sigterm_handler (int fd, short what, void *arg)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+       static struct timeval tv = {
+               .tv_sec = 0,
+               .tv_usec = 0,
+       };
+       
+       mods = MOD_LIMIT + 1;
+       sync_cache (worker);
+       (void)event_loopexit (&tv);
+}
+
 /*
  * Config reload is designed by sending sigusr to active workers and pending shutdown of them
  */
@@ -165,6 +245,7 @@ process_write_command (struct fuzzy_cmd *cmd)
        h->h.block_size = cmd->blocksize;
        h->time = (uint64_t)time (NULL);
        g_queue_push_head (hashes, h);
+       mods ++;
        
        return TRUE;
 }
@@ -187,6 +268,7 @@ process_delete_command (struct fuzzy_cmd *cmd)
                        hashes->head = g_list_remove_link (hashes->head, cur);
                        g_free (h);
                        g_list_free1 (cur);
+                       mods ++;
                        return TRUE;
                }
                cur = g_list_next (cur);
@@ -304,6 +386,20 @@ accept_socket (int fd, short what, void *arg)
 
 }
 
+static void
+sync_callback (int fd, short what, void *arg)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+       /* Timer event */ 
+       evtimer_set (&tev, sync_callback, worker);
+       /* Plan event with jitter */
+       tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
+       tmv.tv_usec = 0;
+       evtimer_add (&tev, &tmv);
+
+       sync_cache (worker);
+}
+
 /*
  * Start worker process
  */
@@ -311,6 +407,7 @@ void
 start_fuzzy_storage (struct rspamd_worker *worker)
 {
        struct sigaction signals;
+       struct event sev;
 
        worker->srv->pid = getpid ();
        worker->srv->type = TYPE_FUZZY;
@@ -323,18 +420,28 @@ start_fuzzy_storage (struct rspamd_worker *worker)
        /* SIGUSR2 handler */
        signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
        signal_add (&worker->sig_ev, NULL);
-
-       /* Accept event */
-       event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
-       event_add(&worker->bind_ev, NULL);
+       signal_set (&sev, SIGTERM, sigterm_handler, (void *) worker);
+       signal_add (&sev, NULL);
 
        /* Send SIGUSR2 to parent */
        kill (getppid (), SIGUSR2);
+
        
        /* Try to read hashes from file */
        if (!read_hashes_file (worker)) {
                msg_err ("read_hashes_file: cannot read hashes file, it can be created after save procedure");
        }
+       /* Timer event */ 
+       evtimer_set (&tev, sync_callback, worker);
+       /* Plan event with jitter */
+       tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
+       tmv.tv_usec = 0;
+       evtimer_add (&tev, &tmv);
+
+       /* Accept event */
+       event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+       event_add(&worker->bind_ev, NULL);
+
 
        event_loop (0);
        exit (EXIT_SUCCESS);
diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c
new file mode 100644 (file)
index 0000000..085ee51
--- /dev/null
@@ -0,0 +1,290 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/***MODULE:fuzzy
+ * rspamd module that checks fuzzy checksums for messages
+ */
+
+#include "../config.h"
+#include "../main.h"
+#include "../message.h"
+#include "../modules.h"
+#include "../cfg_file.h"
+#include "../expressions.h"
+#include "../util.h"
+#include "../view.h"
+#include "../map.h"
+#include "../fuzzy_storage.h"
+
+#define DEFAULT_SYMBOL "R_FUZZY_HASH"
+#define DEFAULT_UPSTREAM_ERROR_TIME 10
+#define DEFAULT_UPSTREAM_DEAD_TIME 300
+#define DEFAULT_UPSTREAM_MAXERRORS 10
+
+#define IO_TIMEOUT 5
+#define DEFAULT_PORT 11335
+
+struct storage_server {
+       char *name;
+       struct in_addr addr;
+       uint16_t port;
+};
+
+struct fuzzy_ctx {
+       int (*filter)(struct worker_task *task);
+       char *metric;
+       char *symbol;
+       struct storage_server *servers;
+       int servers_num;
+       memory_pool_t *fuzzy_pool;
+};
+
+struct fuzzy_client_session {
+       int state;
+       fuzzy_hash_t *h;
+       struct event ev;
+       struct timeval tv;
+       struct worker_task *task;
+};
+
+static struct fuzzy_ctx *fuzzy_module_ctx = NULL;
+
+static int fuzzy_mime_filter (struct worker_task *task);
+static void fuzzy_symbol_callback (struct worker_task *task, void *unused);
+
+static void
+parse_servers_string (char *str)
+{
+       char **strvec, *p, portbuf[5], *name;
+       int num, i, j, port;
+       struct hostent *hent;
+       struct in_addr addr;
+
+       strvec = g_strsplit (str, ",", 0);
+       num = g_strv_length (strvec);
+
+       fuzzy_module_ctx->servers = memory_pool_alloc0 (fuzzy_module_ctx->fuzzy_pool, sizeof (struct storage_server) * num);
+
+       for (i = 0; i <= num; i ++) {
+               g_strstrip (strvec[i]);
+
+               if ((p = strchr (strvec[i], ':')) != NULL) {
+                       j = 0;
+                       while (g_ascii_isdigit (*p) && j < sizeof (portbuf) - 1) {
+                               portbuf[j ++] = *p ++;
+                       }
+                       portbuf[j] = '\0';
+                       port = atoi (portbuf);
+               }
+               else {
+                       /* Default http port */
+                       port = DEFAULT_PORT;
+               }
+               name = memory_pool_alloc (fuzzy_module_ctx->fuzzy_pool, p - strvec[i] + 1);
+               g_strlcpy (name, strvec[i], p - strvec[i] + 1);
+               if (!inet_aton (name, &addr)) {
+                       /* Resolve using dns */
+                       hent = gethostbyname (name);
+                       if (hent == NULL) {
+                               msg_info ("parse_servers_string: cannot resolve: %s", name);
+                               continue;
+                       }
+                       else {
+                               fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].port = port;   
+                               fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].name = name;   
+                               memcpy (&fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].addr, hent->h_addr, sizeof(struct in_addr));
+                               fuzzy_module_ctx->servers_num ++;       
+                       }
+               }
+               else {
+                       fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].port = port;   
+                       fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].name = name;   
+                       memcpy (&fuzzy_module_ctx->servers[fuzzy_module_ctx->servers_num].addr, hent->h_addr, sizeof(struct in_addr));
+                       fuzzy_module_ctx->servers_num ++;       
+               }
+
+       }
+
+       g_strfreev (strvec);
+
+}
+
+int
+fuzzy_check_module_init (struct config_file *cfg, struct module_ctx **ctx)
+{
+       fuzzy_module_ctx = g_malloc (sizeof (struct fuzzy_ctx));
+
+       fuzzy_module_ctx->filter = fuzzy_mime_filter;
+       fuzzy_module_ctx->fuzzy_pool = memory_pool_new (memory_pool_get_size ());
+       fuzzy_module_ctx->servers = NULL;
+       fuzzy_module_ctx->servers_num = 0;
+       
+       *ctx = (struct module_ctx *)fuzzy_module_ctx;
+
+       return 0;
+}
+
+int
+fuzzy_check_module_config (struct config_file *cfg)
+{
+       char *value;
+       int res = TRUE;
+       struct metric *metric;
+       double *w;
+
+       if ((value = get_module_opt (cfg, "fuzzy", "metric")) != NULL) {
+               fuzzy_module_ctx->metric = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value);
+               g_free (value);
+       }
+       else {
+               fuzzy_module_ctx->metric = DEFAULT_METRIC;
+       }
+       if ((value = get_module_opt (cfg, "fuzzy", "symbol")) != NULL) {
+               fuzzy_module_ctx->symbol = memory_pool_strdup (fuzzy_module_ctx->fuzzy_pool, value);
+               g_free (value);
+       }
+       else {
+               fuzzy_module_ctx->symbol = DEFAULT_SYMBOL;
+       }
+       if ((value = get_module_opt (cfg, "fuzzy", "servers")) != NULL) {
+               parse_servers_string (value);
+       }       
+
+       metric = g_hash_table_lookup (cfg->metrics, fuzzy_module_ctx->metric);
+       if (metric == NULL) {
+               msg_err ("fuzzy_module_config: cannot find metric definition %s", fuzzy_module_ctx->metric);
+               return FALSE;
+       }
+
+       /* Search in factors hash table */
+       w = g_hash_table_lookup (cfg->factors, fuzzy_module_ctx->symbol);
+       if (w == NULL) {
+               register_symbol (&metric->cache, fuzzy_module_ctx->symbol, 1, fuzzy_symbol_callback, NULL);
+       }
+       else {
+               register_symbol (&metric->cache, fuzzy_module_ctx->symbol, *w, fuzzy_symbol_callback, NULL);
+       }
+
+       return res;
+}
+
+int
+fuzzy_check_module_reconfig (struct config_file *cfg)
+{
+       memory_pool_delete (fuzzy_module_ctx->fuzzy_pool);
+       fuzzy_module_ctx->fuzzy_pool = memory_pool_new (memory_pool_get_size ());
+
+       return fuzzy_check_module_config (cfg);
+}
+
+static void
+fuzzy_io_callback (int fd, short what, void *arg)
+{
+       struct fuzzy_client_session *session = arg;
+       struct fuzzy_cmd cmd;
+       char buf[sizeof ("ERR")];
+
+       if (what == EV_WRITE) {
+               /* Send command to storage */
+               cmd.blocksize = session->h->block_size;
+               memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
+               cmd.cmd = FUZZY_CHECK;
+               if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) {
+                       goto err;
+               }
+               else {
+                       event_set (&session->ev, fd, EV_READ, fuzzy_io_callback, session);
+                       event_add (&session->ev, &session->tv);
+               }
+       }
+       else if (what == EV_READ) {
+               if (read (fd, buf, sizeof (buf)) == -1) {
+                       goto err;
+               }
+               else if (buf[0] == 'O' && buf[1] == 'K') {
+                       insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL);
+               }
+               goto ok;
+       }
+       
+       return;
+
+       err:
+               msg_err ("fuzzy_io_callback: got error on IO, %d, %s", errno, strerror (errno));
+       ok:
+               event_del (&session->ev);
+               close (fd);
+               session->task->save.saved --;
+               if (session->task->save.saved == 0) {
+                       /* Call other filters */
+                       session->task->save.saved = 1;
+                       process_filters (session->task);
+               }
+
+}
+
+static void 
+fuzzy_symbol_callback (struct worker_task *task, void *unused)
+{
+       struct mime_text_part *part;
+       struct fuzzy_client_session *session;
+       struct storage_server *selected;
+       GList *cur;
+       int sock;
+
+       cur = task->text_parts;
+
+       while (cur) {
+               part = cur->data;
+               selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
+                                                                                sizeof (struct storage_server), task->ts.tv_sec, 
+                                                                                DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, 
+                                                                                DEFAULT_UPSTREAM_MAXERRORS,
+                                                                                part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
+               if (selected) {
+                       if ((sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
+                               msg_warn ("fuzzy_symbol_callback: cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
+                       }       
+                       else {
+                               session = memory_pool_alloc (task->task_pool, sizeof (struct fuzzy_client_session));
+                               event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback, session);
+                               session->tv.tv_sec = IO_TIMEOUT;
+                               session->tv.tv_usec = 0;
+                               session->state = 0;
+                               session->h = part->fuzzy;
+                               session->task = task;
+                               event_add (&session->ev, &session->tv);
+                               task->save.saved ++;
+                       }
+               }
+               cur = g_list_next (cur);
+       }
+}
+
+static int 
+fuzzy_mime_filter (struct worker_task *task)
+{
+       /* XXX: remove this */
+       return 0;
+}