diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-07-28 20:09:50 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-07-28 20:09:50 +0400 |
commit | b92225677f444858c81b9bd4900d5ddcf9eb801a (patch) | |
tree | 833cb69b4a62948636e20ddc5808a2913fd0bd19 /src/fuzzy_storage.c | |
parent | 188018fe88416dbe918402a650da4153066d1658 (diff) | |
download | rspamd-b92225677f444858c81b9bd4900d5ddcf9eb801a.tar.gz rspamd-b92225677f444858c81b9bd4900d5ddcf9eb801a.zip |
* Add client part for fuzzy checksums storage
* Add fuzzy storage syncing to file
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r-- | src/fuzzy_storage.c | 123 |
1 files changed, 115 insertions, 8 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index fa4acce09..a0aa00e50 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -26,7 +26,6 @@ * Rspamd fuzzy storage server */ - #include "config.h" #include "util.h" #include "main.h" @@ -41,19 +40,33 @@ /* This number is used as limit while comparing two fuzzy hashes, this value can vary from 0 to 100 */ #define LEV_LIMIT 99 +/* This number is used as limit while we are making decision to write new hash file or not */ +#define MOD_LIMIT 10 +/* This number is used as expire time in seconds for cache items (2 days) */ +#define DEFAULT_EXPIRE 172800L +/* Resync value in seconds */ +#define SYNC_TIMEOUT 60 static GQueue *hashes; +/* Number of cache modifications */ +static uint32_t mods = 0; +/* For evtimer */ +static struct timeval tmv; +static struct event tev; + struct rspamd_fuzzy_node { fuzzy_hash_t h; uint64_t time; }; -static -void sig_handler (int signo) -{ +static void +sig_handler (int signo) +{ switch (signo) { case SIGINT: + /* Ignore SIGINT as we should got SIGTERM after it anyway */ + return; case SIGTERM: #ifdef WITH_PROFILER exit (0); @@ -64,6 +77,73 @@ void sig_handler (int signo) } } +static void +sync_cache (struct rspamd_worker *wrk) +{ + int fd; + char *filename, *exp_str; + GList *cur, *tmp; + struct rspamd_fuzzy_node *node; + uint64_t expire, now; + + /* Check for modifications */ + if (mods < MOD_LIMIT) { + return; + } + + msg_info ("sync_cache: syncing fuzzy hash storage"); + filename = g_hash_table_lookup (wrk->cf->params, "hashfile"); + if (filename == NULL) { + return; + } + exp_str = g_hash_table_lookup (wrk->cf->params, "expire"); + if (exp_str != NULL) { + expire = parse_seconds (exp_str) / 1000; + } + else { + expire = DEFAULT_EXPIRE; + } + + if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) { + msg_err ("sync_cache: cannot create hash file %s: %s", filename, strerror (errno)); + return; + } + + now = (uint64_t)time (NULL); + cur = hashes->head; + while (cur) { + node = cur->data; + if (now - node->time > expire) { + /* Remove expired item */ + cur = g_list_next (cur); + hashes->head = g_list_remove_link (hashes->head, cur); + g_free (node); + g_list_free1 (tmp); + continue; + } + if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) { + msg_err ("sync_cache: cannot write file %s: %s", filename, strerror (errno)); + } + cur = g_list_next (cur); + } + + close (fd); +} + +static void +sigterm_handler (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + static struct timeval tv = { + .tv_sec = 0, + .tv_usec = 0, + }; + + mods = MOD_LIMIT + 1; + sync_cache (worker); + (void)event_loopexit (&tv); +} + /* * Config reload is designed by sending sigusr to active workers and pending shutdown of them */ @@ -165,6 +245,7 @@ process_write_command (struct fuzzy_cmd *cmd) h->h.block_size = cmd->blocksize; h->time = (uint64_t)time (NULL); g_queue_push_head (hashes, h); + mods ++; return TRUE; } @@ -187,6 +268,7 @@ process_delete_command (struct fuzzy_cmd *cmd) hashes->head = g_list_remove_link (hashes->head, cur); g_free (h); g_list_free1 (cur); + mods ++; return TRUE; } cur = g_list_next (cur); @@ -304,6 +386,20 @@ accept_socket (int fd, short what, void *arg) } +static void +sync_callback (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + /* Timer event */ + evtimer_set (&tev, sync_callback, worker); + /* Plan event with jitter */ + tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double (); + tmv.tv_usec = 0; + evtimer_add (&tev, &tmv); + + sync_cache (worker); +} + /* * Start worker process */ @@ -311,6 +407,7 @@ void start_fuzzy_storage (struct rspamd_worker *worker) { struct sigaction signals; + struct event sev; worker->srv->pid = getpid (); worker->srv->type = TYPE_FUZZY; @@ -323,18 +420,28 @@ start_fuzzy_storage (struct rspamd_worker *worker) /* SIGUSR2 handler */ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); signal_add (&worker->sig_ev, NULL); - - /* Accept event */ - event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); - event_add(&worker->bind_ev, NULL); + signal_set (&sev, SIGTERM, sigterm_handler, (void *) worker); + signal_add (&sev, NULL); /* Send SIGUSR2 to parent */ kill (getppid (), SIGUSR2); + /* Try to read hashes from file */ if (!read_hashes_file (worker)) { msg_err ("read_hashes_file: cannot read hashes file, it can be created after save procedure"); } + /* Timer event */ + evtimer_set (&tev, sync_callback, worker); + /* Plan event with jitter */ + tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double (); + tmv.tv_usec = 0; + evtimer_add (&tev, &tmv); + + /* Accept event */ + event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_add(&worker->bind_ev, NULL); + event_loop (0); exit (EXIT_SUCCESS); |