diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-07-27 20:05:33 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-07-27 20:05:33 +0400 |
commit | 7ed8ac7bcb54df908bd4a5f8b915ec1fdc483ac7 (patch) | |
tree | 8cb06875ef55407d7961d11491346b2899971f18 | |
parent | bc7022c5ce4186892ac4d9eee5899fdbc65e62c4 (diff) | |
download | rspamd-7ed8ac7bcb54df908bd4a5f8b915ec1fdc483ac7.tar.gz rspamd-7ed8ac7bcb54df908bd4a5f8b915ec1fdc483ac7.zip |
* Implement fuzzy hashes storage worker
TODO:
- implement client as rspamd plugin
- add support to controller and rspamc
- improve performance by avoiding usage of linked lists
-rw-r--r-- | CMakeLists.txt | 7 | ||||
-rw-r--r-- | src/fuzzy_storage.c | 342 | ||||
-rw-r--r-- | src/fuzzy_storage.h | 30 | ||||
-rw-r--r-- | src/main.c | 10 |
4 files changed, 385 insertions, 4 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 0ed4fbbd0..ce1026afa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,7 +7,7 @@ PROJECT(rspamd C) SET(RSPAMD_VERSION_MAJOR 0) SET(RSPAMD_VERSION_MINOR 2) -SET(RSPAMD_VERSION_PATCH 5) +SET(RSPAMD_VERSION_PATCH 6) SET(RSPAMD_VERSION "${RSPAMD_VERSION_MAJOR}.${RSPAMD_VERSION_MINOR}.${RSPAMD_VERSION_PATCH}") SET(RSPAMD_MASTER_SITE_URL "http://cebka.pp.ru/hg/rspamd") @@ -268,7 +268,7 @@ ENDIF (MD5_INCLUDE) IF(ENABLE_OPTIMIZATION MATCHES "ON") SET(CMAKE_C_OPT_FLAGS "-O3") ELSE(ENABLE_OPTIMIZATION MATCHES "ON") - SET(CMAKE_C_OPT_FLAGS "-O") + SET(CMAKE_C_OPT_FLAGS "-O0") ENDIF(ENABLE_OPTIMIZATION MATCHES "ON") SET(CMAKE_C_WARN_FLAGS "-Wall -W -Wpointer-arith -Wno-unused-parameter -Wno-unused-function -Wunused-variable -Wno-sign-compare -Wunused-value -Wno-declaration-after-statement -Wno-pointer-sign") @@ -322,7 +322,8 @@ SET(RSPAMDSRC src/modules.c src/radix.c src/view.c src/map.c - src/symbols_cache.c) + src/symbols_cache.c + src/fuzzy_storage.c) IF(ENABLE_PERL MATCHES "ON") LIST(APPEND RSPAMDSRC src/perl.c) diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c new file mode 100644 index 000000000..fa4acce09 --- /dev/null +++ b/src/fuzzy_storage.c @@ -0,0 +1,342 @@ +/* + * Copyright (c) 2009, Rambler media + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Rspamd fuzzy storage server + */ + + +#include "config.h" +#include "util.h" +#include "main.h" +#include "protocol.h" +#include "upstream.h" +#include "cfg_file.h" +#include "url.h" +#include "modules.h" +#include "message.h" +#include "fuzzy.h" +#include "fuzzy_storage.h" + +/* This number is used as limit while comparing two fuzzy hashes, this value can vary from 0 to 100 */ +#define LEV_LIMIT 99 + +static GQueue *hashes; + +struct rspamd_fuzzy_node { + fuzzy_hash_t h; + uint64_t time; +}; + +static +void sig_handler (int signo) +{ + switch (signo) { + case SIGINT: + case SIGTERM: +#ifdef WITH_PROFILER + exit (0); +#else + _exit (1); +#endif + break; + } +} + +/* + * Config reload is designed by sending sigusr to active workers and pending shutdown of them + */ +static void +sigusr_handler (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev); + event_del (&worker->bind_ev); + do_reopen_log = 1; + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + return; +} + +static gboolean +read_hashes_file (struct rspamd_worker *wrk) +{ + int r, fd; + struct stat st; + char *filename; + struct rspamd_fuzzy_node *node; + + hashes = g_queue_new (); + + filename = g_hash_table_lookup (wrk->cf->params, "hashfile"); + if (filename == NULL) { + return FALSE; + } + + if ((fd = open (filename, O_RDONLY)) == -1) { + msg_err ("read_hashes_file: cannot open hash file %s: %s", filename, strerror (errno)); + return FALSE; + } + + fstat (fd, &st); + + do { + node = g_malloc (sizeof (struct rspamd_fuzzy_node)); + g_queue_push_head (hashes, node); + } + while ((r = read (fd, node, sizeof (struct rspamd_fuzzy_node))) == sizeof (struct rspamd_fuzzy_node)); + + if (r > 0) { + msg_warn ("read_hashes_file: ignore garbadge at the end of file, length of garbadge: %d", r); + } + else if (r == -1) { + msg_err ("read_hashes_file: cannot open read file %s: %s", filename, strerror (errno)); + return FALSE; + } + + return TRUE; +} + +static void +free_session (struct fuzzy_session *session) +{ + /* Delete IO event */ + event_del (&session->ev); + /* Close socket */ + close (session->fd); + g_free (session); +} + +static gboolean +process_check_command (struct fuzzy_cmd *cmd) +{ + GList *cur; + struct rspamd_fuzzy_node *h; + fuzzy_hash_t s; + + memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); + s.block_size = cmd->blocksize; + cur = hashes->head; + + /* XXX: too slow way */ + while (cur) { + h = cur->data; + if (fuzzy_compare_hashes (&h->h, &s) > LEV_LIMIT) { + return TRUE; + } + cur = g_list_next (cur); + } + + return FALSE; +} + +static gboolean +process_write_command (struct fuzzy_cmd *cmd) +{ + struct rspamd_fuzzy_node *h; + + h = g_malloc (sizeof (struct rspamd_fuzzy_node)); + memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash)); + h->h.block_size = cmd->blocksize; + h->time = (uint64_t)time (NULL); + g_queue_push_head (hashes, h); + + return TRUE; +} + +static gboolean +process_delete_command (struct fuzzy_cmd *cmd) +{ + GList *cur; + struct rspamd_fuzzy_node *h; + fuzzy_hash_t s; + + memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); + s.block_size = cmd->blocksize; + cur = hashes->head; + + /* XXX: too slow way */ + while (cur) { + h = cur->data; + if (fuzzy_compare_hashes (&h->h, &s) > LEV_LIMIT) { + hashes->head = g_list_remove_link (hashes->head, cur); + g_free (h); + g_list_free1 (cur); + return TRUE; + } + cur = g_list_next (cur); + } + + return FALSE; +} + +#define CMD_PROCESS(x) \ +do { \ +if (process_##x##_command (&session->cmd)) { \ + if (write (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1) == -1) { \ + msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \ + } \ +} \ +else { \ + if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) { \ + msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); \ + } \ +} \ +} while(0) + +static void +process_fuzzy_command (struct fuzzy_session *session) +{ + switch (session->cmd.cmd) { + case FUZZY_CHECK: + CMD_PROCESS(check); + break; + case FUZZY_WRITE: + CMD_PROCESS(write); + break; + case FUZZY_DEL: + CMD_PROCESS(delete); + break; + default: + if (write (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1) == -1) { + msg_err ("process_fuzzy_command: error while writing reply: %s", strerror (errno)); + } + break; + } +} + +#undef CMD_PROCESS + +/* Callback for network IO */ +static void +fuzzy_io_callback (int fd, short what, void *arg) +{ + struct fuzzy_session *session = arg; + ssize_t r; + + /* Got some data */ + if (what == EV_READ) { + if ((r = read (fd, session->pos, (u_char *)&session->cmd + sizeof (struct fuzzy_cmd) - session->pos)) == -1) { + msg_err ("fuzzy_io_callback: got error while reading from socket: %d, %s", errno, strerror (errno)); + free_session (session); + } + else if (session->pos + r == (u_char *)&session->cmd + sizeof (struct fuzzy_cmd)) { + /* Assume that the whole command was read */ + process_fuzzy_command (session); + free_session (session); + } + else { + session->pos += r; + } + } + else { + free_session (session); + } +} + + +/* + * Accept new connection and construct task + */ +static void +accept_socket (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct sockaddr_storage ss; + struct sockaddr_in *sin; + struct fuzzy_session *session; + socklen_t addrlen = sizeof(ss); + int nfd; + + if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { + msg_warn ("accept_socket: accept failed: %s", strerror (errno)); + return; + } + /* Check for EAGAIN */ + if (nfd == 0) { + msg_debug ("accept_socket: cannot accept socket as it was already accepted by other worker"); + return; + } + + if (ss.ss_family == AF_UNIX) { + msg_info ("accept_socket: accepted connection from unix socket"); + } + else if (ss.ss_family == AF_INET) { + sin = (struct sockaddr_in *) &ss; + msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port)); + } + + session = g_malloc (sizeof (struct fuzzy_session)); + + session->worker = worker; + session->fd = nfd; + session->tv.tv_sec = WORKER_IO_TIMEOUT; + session->tv.tv_usec = 0; + session->pos = (u_char *)&session->cmd; + + event_set (&session->ev, session->fd, EV_READ | EV_PERSIST, fuzzy_io_callback, session); + event_add (&session->ev, &session->tv); + +} + +/* + * Start worker process + */ +void +start_fuzzy_storage (struct rspamd_worker *worker) +{ + struct sigaction signals; + + worker->srv->pid = getpid (); + worker->srv->type = TYPE_FUZZY; + + event_init (); + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); + signal_add (&worker->sig_ev, NULL); + + /* Accept event */ + event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_add(&worker->bind_ev, NULL); + + /* Send SIGUSR2 to parent */ + kill (getppid (), SIGUSR2); + + /* Try to read hashes from file */ + if (!read_hashes_file (worker)) { + msg_err ("read_hashes_file: cannot read hashes file, it can be created after save procedure"); + } + + event_loop (0); + exit (EXIT_SUCCESS); +} + diff --git a/src/fuzzy_storage.h b/src/fuzzy_storage.h new file mode 100644 index 000000000..533ecaf13 --- /dev/null +++ b/src/fuzzy_storage.h @@ -0,0 +1,30 @@ +#ifndef RSPAMD_FUZZY_STORAGE_H +#define RSPAMD_FUZZY_STORAGE_H + +#include "config.h" +#include "main.h" +#include "fuzzy.h" + +/* Commands for fuzzy storage */ +#define FUZZY_CHECK 0 +#define FUZZY_WRITE 1 +#define FUZZY_DEL 2 + +struct fuzzy_cmd { + u_char cmd; + uint32_t blocksize; + u_char hash[FUZZY_HASHLEN]; +}; + +struct fuzzy_session { + struct rspamd_worker *worker; + struct event ev; + struct fuzzy_cmd cmd; + struct timeval tv; + int fd; + u_char *pos; +}; + +void start_fuzzy_storage (struct rspamd_worker *worker); + +#endif diff --git a/src/main.c b/src/main.c index 00cf68203..40c4c68f8 100644 --- a/src/main.c +++ b/src/main.c @@ -27,6 +27,7 @@ #include "cfg_file.h" #include "util.h" #include "lmtp.h" +#include "fuzzy_storage.h" #ifndef WITHOUT_PERL @@ -302,6 +303,13 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) pidfile_close (rspamd->pfh); msg_info ("fork_worker: starting lmtp process %d", getpid ()); start_lmtp_worker (cur); + break; + case TYPE_FUZZY: + setproctitle ("fuzzy storage"); + pidfile_close (rspamd->pfh); + msg_info ("fork_worker: starting fuzzy storage process %d", getpid ()); + start_fuzzy_storage (cur); + break; case TYPE_WORKER: default: setproctitle ("worker process"); @@ -731,7 +739,7 @@ main (int argc, char **argv, char **env) if (active_worker == NULL) { /* reread_config (rspamd); */ TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { - if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP) { + if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP || cur->type == TYPE_FUZZY) { /* Start new workers that would reread configuration */ active_worker = fork_worker (rspamd, cur->cf); } |