]> source.dussan.org Git - rspamd.git/commitdiff
* Implement fuzzy hashes storage worker
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 27 Jul 2009 16:05:33 +0000 (20:05 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 27 Jul 2009 16:05:33 +0000 (20:05 +0400)
TODO:
- implement client as rspamd plugin
- add support to controller and rspamc
- improve performance by avoiding usage of linked lists

CMakeLists.txt
src/fuzzy_storage.c [new file with mode: 0644]
src/fuzzy_storage.h [new file with mode: 0644]
src/main.c

index 0ed4fbbd0824c01fc0d244fe8a35c190bd58350f..ce1026afa998c06408c6fdb0f58ed2556cc968f6 100644 (file)
@@ -7,7 +7,7 @@ PROJECT(rspamd C)
 
 SET(RSPAMD_VERSION_MAJOR 0)
 SET(RSPAMD_VERSION_MINOR 2)
-SET(RSPAMD_VERSION_PATCH 5)
+SET(RSPAMD_VERSION_PATCH 6)
 
 SET(RSPAMD_VERSION         "${RSPAMD_VERSION_MAJOR}.${RSPAMD_VERSION_MINOR}.${RSPAMD_VERSION_PATCH}")
 SET(RSPAMD_MASTER_SITE_URL "http://cebka.pp.ru/hg/rspamd")
@@ -268,7 +268,7 @@ ENDIF (MD5_INCLUDE)
 IF(ENABLE_OPTIMIZATION MATCHES "ON")
        SET(CMAKE_C_OPT_FLAGS "-O3")
 ELSE(ENABLE_OPTIMIZATION MATCHES "ON")
-       SET(CMAKE_C_OPT_FLAGS "-O")
+       SET(CMAKE_C_OPT_FLAGS "-O0")
 ENDIF(ENABLE_OPTIMIZATION MATCHES "ON")
 
 SET(CMAKE_C_WARN_FLAGS "-Wall -W -Wpointer-arith -Wno-unused-parameter -Wno-unused-function -Wunused-variable -Wno-sign-compare -Wunused-value -Wno-declaration-after-statement -Wno-pointer-sign")
@@ -322,7 +322,8 @@ SET(RSPAMDSRC       src/modules.c
                                src/radix.c
                                src/view.c
                                src/map.c
-                               src/symbols_cache.c)
+                               src/symbols_cache.c
+                               src/fuzzy_storage.c)
 
 IF(ENABLE_PERL MATCHES "ON")
        LIST(APPEND RSPAMDSRC src/perl.c)
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
new file mode 100644 (file)
index 0000000..fa4acce
--- /dev/null
@@ -0,0 +1,342 @@
+/*
+ * Copyright (c) 2009, Rambler media
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Rspamd fuzzy storage server
+ */
+
+
+#include "config.h"
+#include "util.h"
+#include "main.h"
+#include "protocol.h"
+#include "upstream.h"
+#include "cfg_file.h"
+#include "url.h"
+#include "modules.h"
+#include "message.h"
+#include "fuzzy.h"
+#include "fuzzy_storage.h"
+
+/* This number is used as limit while comparing two fuzzy hashes, this value can vary from 0 to 100 */
+#define LEV_LIMIT 99
+
+static GQueue *hashes;
+
+struct rspamd_fuzzy_node {
+       fuzzy_hash_t h;
+       uint64_t time;
+};
+
+static 
+void sig_handler (int signo)
+{
+       switch (signo) {
+               case SIGINT:
+               case SIGTERM:
+#ifdef WITH_PROFILER
+                       exit (0);
+#else
+                       _exit (1);
+#endif
+                       break;
+       }
+}
+
+/*
+ * Config reload is designed by sending sigusr to active workers and pending shutdown of them
+ */
+static void
+sigusr_handler (int fd, short what, void *arg)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+       /* Do not accept new connections, preparing to end worker's process */
+       struct timeval tv;
+       tv.tv_sec = SOFT_SHUTDOWN_TIME;
+       tv.tv_usec = 0;
+       event_del (&worker->sig_ev);
+       event_del (&worker->bind_ev);
+       do_reopen_log = 1;
+       msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+       event_loopexit (&tv);
+       return;
+}
+
+static gboolean
+read_hashes_file (struct rspamd_worker *wrk)
+{
+       int r, fd;
+       struct stat st;
+       char *filename;
+       struct rspamd_fuzzy_node *node;
+
+       hashes = g_queue_new ();
+
+       filename = g_hash_table_lookup (wrk->cf->params, "hashfile");
+       if (filename == NULL) {
+               return FALSE;
+       }
+
+       if ((fd = open (filename, O_RDONLY)) == -1) {
+               msg_err ("read_hashes_file: cannot open hash file %s: %s", filename, strerror (errno));
+               return FALSE;
+       }
+       
+       fstat (fd, &st);
+       
+       do {
+               node = g_malloc (sizeof (struct rspamd_fuzzy_node));
+               g_queue_push_head (hashes, node);
+       }
+       while ((r = read (fd, node, sizeof (struct rspamd_fuzzy_node))) == sizeof (struct rspamd_fuzzy_node));
+
+       if (r > 0) {
+               msg_warn ("read_hashes_file: ignore garbadge at the end of file, length of garbadge: %d", r);
+       }
+       else if (r == -1) {
+               msg_err ("read_hashes_file: cannot open read file %s: %s", filename, strerror (errno));
+               return FALSE;
+       }
+
+       return TRUE;
+}
+
+static void
+free_session (struct fuzzy_session *session)
+{
+       /* Delete IO event */
+       event_del (&session->ev);
+       /* Close socket */
+       close (session->fd);
+       g_free (session);
+}
+
+static gboolean
+process_check_command (struct fuzzy_cmd *cmd)
+{
+       GList *cur;
+       struct rspamd_fuzzy_node *h;
+       fuzzy_hash_t s;
+       
+       memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
+       s.block_size = cmd->blocksize;
+       cur = hashes->head;
+
+       /* XXX: too slow way */
+       while (cur) {
+               h = cur->data;
+               if (fuzzy_compare_hashes (&h->h, &s) > LEV_LIMIT) {
+                       return TRUE;
+               }
+               cur = g_list_next (cur);
+       }
+
+       return FALSE;
+}
+
+static gboolean
+process_write_command (struct fuzzy_cmd *cmd)
+{
+       struct rspamd_fuzzy_node *h;
+
+       h = g_malloc (sizeof (struct rspamd_fuzzy_node));
+       memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash));
+       h->h.block_size = cmd->blocksize;
+       h->time = (uint64_t)time (NULL);
+       g_queue_push_head (hashes, h);
+       
+       return TRUE;
+}
+
+static gboolean
+process_delete_command (struct fuzzy_cmd *cmd)
+{
+       GList *cur;
+       struct rspamd_fuzzy_node *h;
+       fuzzy_hash_t s;
+       
+       memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
+       s.block_size = cmd->blocksize;
+       cur = hashes->head;
+
+       /* XXX: too slow way */
+       while (cur) {
+               h = cur->data;
+               if (fuzzy_compare_hashes (&h->h, &s) > LEV_LIMIT) {
+                       hashes->head = g_list_remove_link (hashes->head, cur);
+                       g_free (h);
+                       g_list_free1 (cur);
+                       return TRUE;
+               }
+               cur = g_list_next (cur);
+       }
+
+       return FALSE;
+}
+
+#define CMD_PROCESS(x)                                                                                                                                                 \
+do {                                                                                                                                                                                   \
+if (process_##x##_command (&session->cmd)) {                                                                                                   \
+       if (write (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1) == -1) {                                                     \
+               msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno));             \
+       }                                                                                                                                                                                       \
+}                                                                                                                                                                                              \
+else {                                                                                                                                                                                 \
+       if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) {                                           \
+               msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno));             \
+       }                                                                                                                                                                                       \
+}                                                                                                                                                                                              \
+} while(0)
+
+static void
+process_fuzzy_command (struct fuzzy_session *session)
+{
+       switch (session->cmd.cmd) {
+               case FUZZY_CHECK:
+                       CMD_PROCESS(check);
+                       break;
+               case FUZZY_WRITE:
+                       CMD_PROCESS(write);
+                       break;  
+               case FUZZY_DEL:
+                       CMD_PROCESS(delete);
+                       break;  
+               default:
+                       if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) {
+                               msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno));
+                       }
+                       break;
+       }
+}
+
+#undef CMD_PROCESS
+
+/* Callback for network IO */
+static void
+fuzzy_io_callback (int fd, short what, void *arg)
+{
+       struct fuzzy_session *session = arg;
+       ssize_t r;
+       
+       /* Got some data */
+       if (what == EV_READ) {
+               if ((r = read (fd, session->pos, (u_char *)&session->cmd + sizeof (struct fuzzy_cmd) - session->pos)) == -1) {
+                       msg_err ("fuzzy_io_callback: got error while reading from socket: %d, %s", errno, strerror (errno));
+                       free_session (session);
+               }
+               else if (session->pos + r == (u_char *)&session->cmd + sizeof (struct fuzzy_cmd)) {
+                       /* Assume that the whole command was read */
+                       process_fuzzy_command (session);
+                       free_session (session);
+               }
+               else {
+                       session->pos += r;
+               }
+       }
+       else {
+               free_session (session);
+       }
+}
+
+
+/*
+ * Accept new connection and construct task
+ */
+static void
+accept_socket (int fd, short what, void *arg)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+       struct sockaddr_storage ss;
+    struct sockaddr_in *sin;
+       struct fuzzy_session *session;
+       socklen_t addrlen = sizeof(ss);
+       int nfd;
+       
+       if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
+               msg_warn ("accept_socket: accept failed: %s", strerror (errno));
+               return;
+       }
+       /* Check for EAGAIN */
+       if (nfd == 0) {
+               msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker");
+               return;
+       }
+
+    if (ss.ss_family == AF_UNIX) {
+        msg_info ("accept_socket: accepted connection from unix socket");
+    }
+    else if (ss.ss_family == AF_INET) {
+        sin = (struct sockaddr_in *) &ss;
+        msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
+    }
+       
+       session = g_malloc (sizeof (struct fuzzy_session));
+
+       session->worker = worker;
+       session->fd = nfd;
+       session->tv.tv_sec = WORKER_IO_TIMEOUT;
+       session->tv.tv_usec = 0;
+       session->pos = (u_char *)&session->cmd;
+
+       event_set (&session->ev, session->fd, EV_READ | EV_PERSIST, fuzzy_io_callback, session);
+       event_add (&session->ev, &session->tv);
+
+}
+
+/*
+ * Start worker process
+ */
+void
+start_fuzzy_storage (struct rspamd_worker *worker)
+{
+       struct sigaction signals;
+
+       worker->srv->pid = getpid ();
+       worker->srv->type = TYPE_FUZZY;
+
+       event_init ();
+
+       init_signals (&signals, sig_handler);
+       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+       /* SIGUSR2 handler */
+       signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
+       signal_add (&worker->sig_ev, NULL);
+
+       /* Accept event */
+       event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+       event_add(&worker->bind_ev, NULL);
+
+       /* Send SIGUSR2 to parent */
+       kill (getppid (), SIGUSR2);
+       
+       /* Try to read hashes from file */
+       if (!read_hashes_file (worker)) {
+               msg_err ("read_hashes_file: cannot read hashes file, it can be created after save procedure");
+       }
+
+       event_loop (0);
+       exit (EXIT_SUCCESS);
+}
+
diff --git a/src/fuzzy_storage.h b/src/fuzzy_storage.h
new file mode 100644 (file)
index 0000000..533ecaf
--- /dev/null
@@ -0,0 +1,30 @@
+#ifndef RSPAMD_FUZZY_STORAGE_H
+#define RSPAMD_FUZZY_STORAGE_H
+
+#include "config.h"
+#include "main.h"
+#include "fuzzy.h"
+
+/* Commands for fuzzy storage */
+#define FUZZY_CHECK 0
+#define FUZZY_WRITE 1
+#define FUZZY_DEL 2
+
+struct fuzzy_cmd {
+       u_char cmd;
+       uint32_t blocksize;
+       u_char hash[FUZZY_HASHLEN];
+};
+
+struct fuzzy_session {
+       struct rspamd_worker *worker;
+       struct event ev;
+       struct fuzzy_cmd cmd;
+       struct timeval tv;
+       int fd;
+       u_char *pos;
+};
+
+void start_fuzzy_storage (struct rspamd_worker *worker);
+
+#endif
index 00cf6820360454f8037c727169aaee761fa510f5..40c4c68f825485f731c898ca4c8a04dda3e5e6d9 100644 (file)
@@ -27,6 +27,7 @@
 #include "cfg_file.h"
 #include "util.h"
 #include "lmtp.h"
+#include "fuzzy_storage.h"
 
 #ifndef WITHOUT_PERL
 
@@ -302,6 +303,13 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
                                                pidfile_close (rspamd->pfh);
                                                msg_info ("fork_worker: starting lmtp process %d", getpid ());
                                                start_lmtp_worker (cur);
+                                               break;
+                                       case TYPE_FUZZY:
+                                               setproctitle ("fuzzy storage");
+                                               pidfile_close (rspamd->pfh);
+                                               msg_info ("fork_worker: starting fuzzy storage process %d", getpid ());
+                                               start_fuzzy_storage (cur);
+                                               break;
                                        case TYPE_WORKER:
                                        default:
                                                setproctitle ("worker process");
@@ -731,7 +739,7 @@ main (int argc, char **argv, char **env)
                        if (active_worker == NULL) {
                                /* reread_config (rspamd); */
                                TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
-                                       if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP) {
+                                       if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP || cur->type == TYPE_FUZZY) {
                                                /* Start new workers that would reread configuration */
                                                active_worker = fork_worker (rspamd, cur->cf);
                                        }