aboutsummaryrefslogtreecommitdiffstats
path: root/src/fuzzy_storage.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-07-28 20:09:50 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-07-28 20:09:50 +0400
commitb92225677f444858c81b9bd4900d5ddcf9eb801a (patch)
tree833cb69b4a62948636e20ddc5808a2913fd0bd19 /src/fuzzy_storage.c
parent188018fe88416dbe918402a650da4153066d1658 (diff)
downloadrspamd-b92225677f444858c81b9bd4900d5ddcf9eb801a.tar.gz
rspamd-b92225677f444858c81b9bd4900d5ddcf9eb801a.zip
* Add client part for fuzzy checksums storage
* Add fuzzy storage syncing to file
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r--src/fuzzy_storage.c123
1 files changed, 115 insertions, 8 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index fa4acce09..a0aa00e50 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -26,7 +26,6 @@
* Rspamd fuzzy storage server
*/
-
#include "config.h"
#include "util.h"
#include "main.h"
@@ -41,19 +40,33 @@
/* This number is used as the limit when comparing two fuzzy hashes; it can vary from 0 to 100 */
#define LEV_LIMIT 99
+/* Minimum number of cache modifications required before the hash file is rewritten */
+#define MOD_LIMIT 10
+/* This number is used as expire time in seconds for cache items (2 days) */
+#define DEFAULT_EXPIRE 172800L
+/* Resync value in seconds */
+#define SYNC_TIMEOUT 60
static GQueue *hashes;
+/* Number of cache modifications */
+static uint32_t mods = 0;
+/* For evtimer */
+static struct timeval tmv;
+static struct event tev;
+
/* One entry of the in-memory hash queue; written to the hash file as a raw struct by sync_cache */
struct rspamd_fuzzy_node {
fuzzy_hash_t h; /* the stored fuzzy hash */
uint64_t time; /* time(NULL) at insertion; compared against the expire value in sync_cache */
};
-static
-void sig_handler (int signo)
-{
+static void
+sig_handler (int signo)
+{
switch (signo) {
case SIGINT:
+ /* Ignore SIGINT as we should get SIGTERM after it anyway */
+ return;
case SIGTERM:
#ifdef WITH_PROFILER
exit (0);
@@ -64,6 +77,73 @@ void sig_handler (int signo)
}
}
+/*
+ * Dump the in-memory hash queue to the file named by the "hashfile" worker
+ * parameter, removing expired nodes from the queue while walking it.
+ *
+ * Returns without doing anything when fewer than MOD_LIMIT modifications have
+ * been counted in `mods`, when "hashfile" is not configured, or when the file
+ * cannot be opened. Nodes older than the "expire" parameter (DEFAULT_EXPIRE
+ * when it is absent) are freed and are not written.
+ *
+ * NOTE(review): `mods` is not reset here, so once the limit has been reached
+ * every later call rewrites the file; confirm whether that is intended.
+ */
+static void
+sync_cache (struct rspamd_worker *wrk)
+{
+	int fd;
+	char *filename, *exp_str;
+	GList *cur, *tmp;
+	struct rspamd_fuzzy_node *node;
+	uint64_t expire, now;
+
+	/* Check for modifications */
+	if (mods < MOD_LIMIT) {
+		return;
+	}
+
+	msg_info ("sync_cache: syncing fuzzy hash storage");
+	filename = g_hash_table_lookup (wrk->cf->params, "hashfile");
+	if (filename == NULL) {
+		return;
+	}
+	exp_str = g_hash_table_lookup (wrk->cf->params, "expire");
+	if (exp_str != NULL) {
+		/* presumably parse_seconds yields milliseconds while node times are seconds; verify */
+		expire = parse_seconds (exp_str) / 1000;
+	}
+	else {
+		expire = DEFAULT_EXPIRE;
+	}
+
+	if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
+		msg_err ("sync_cache: cannot create hash file %s: %s", filename, strerror (errno));
+		return;
+	}
+
+	now = (uint64_t)time (NULL);
+	cur = hashes->head;
+	while (cur) {
+		node = cur->data;
+		if (now - node->time > expire) {
+			/*
+			 * Remove expired item: remember its link and step to the next
+			 * one BEFORE unlinking, so that the link removed is the one
+			 * whose data is freed and iteration continues from a live link.
+			 */
+			tmp = cur;
+			cur = g_list_next (cur);
+			/* Unlinks and frees the link, keeping head/tail/length consistent */
+			g_queue_delete_link (hashes, tmp);
+			g_free (node);
+			continue;
+		}
+		/* Nodes are stored as raw structs; a failed write is logged and the walk goes on */
+		if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
+			msg_err ("sync_cache: cannot write file %s: %s", filename, strerror (errno));
+		}
+		cur = g_list_next (cur);
+	}
+
+	close (fd);
+}
+
+/*
+ * Event callback installed for SIGTERM: force a sync of the hash storage
+ * and then ask the event loop to exit. `arg` is the worker pointer that
+ * was passed at registration time; `fd` and `what` are unused.
+ */
+static void
+sigterm_handler (int fd, short what, void *arg)
+{
+	/* Static storage, hence zero-initialised: exit the loop with no delay */
+	static struct timeval no_delay;
+	struct rspamd_worker *wrk = arg;
+
+	/* Push the counter over the limit so that sync_cache does not return early */
+	mods = MOD_LIMIT + 1;
+	sync_cache (wrk);
+
+	(void)event_loopexit (&no_delay);
+}
+
/*
* Config reload is designed by sending sigusr to active workers and pending shutdown of them
*/
@@ -165,6 +245,7 @@ process_write_command (struct fuzzy_cmd *cmd)
h->h.block_size = cmd->blocksize;
h->time = (uint64_t)time (NULL);
g_queue_push_head (hashes, h);
+ mods ++;
return TRUE;
}
@@ -187,6 +268,7 @@ process_delete_command (struct fuzzy_cmd *cmd)
hashes->head = g_list_remove_link (hashes->head, cur);
g_free (h);
g_list_free1 (cur);
+ mods ++;
return TRUE;
}
cur = g_list_next (cur);
@@ -304,6 +386,20 @@ accept_socket (int fd, short what, void *arg)
}
+/*
+ * Timer callback: re-arm the sync timer and then try to sync the hash
+ * storage. `arg` is the worker pointer; `fd` and `what` are unused.
+ */
+static void
+sync_callback (int fd, short what, void *arg)
+{
+	struct rspamd_worker *wrk = arg;
+	double jitter;
+
+	/* Re-register ourselves on the shared timer event */
+	evtimer_set (&tev, sync_callback, wrk);
+
+	/* Next run is SYNC_TIMEOUT seconds plus a random share of SYNC_TIMEOUT */
+	jitter = SYNC_TIMEOUT * g_random_double ();
+	tmv.tv_sec = SYNC_TIMEOUT + jitter;
+	tmv.tv_usec = 0;
+	evtimer_add (&tev, &tmv);
+
+	sync_cache (wrk);
+}
+
/*
* Start worker process
*/
@@ -311,6 +407,7 @@ void
start_fuzzy_storage (struct rspamd_worker *worker)
{
struct sigaction signals;
+ struct event sev;
worker->srv->pid = getpid ();
worker->srv->type = TYPE_FUZZY;
@@ -323,18 +420,28 @@ start_fuzzy_storage (struct rspamd_worker *worker)
/* SIGUSR2 handler */
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
signal_add (&worker->sig_ev, NULL);
-
- /* Accept event */
- event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
- event_add(&worker->bind_ev, NULL);
+ signal_set (&sev, SIGTERM, sigterm_handler, (void *) worker);
+ signal_add (&sev, NULL);
/* Send SIGUSR2 to parent */
kill (getppid (), SIGUSR2);
+
/* Try to read hashes from file */
if (!read_hashes_file (worker)) {
msg_err ("read_hashes_file: cannot read hashes file, it can be created after save procedure");
}
+ /* Timer event */
+ evtimer_set (&tev, sync_callback, worker);
+ /* Plan event with jitter */
+ tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
+ tmv.tv_usec = 0;
+ evtimer_add (&tev, &tmv);
+
+ /* Accept event */
+ event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_add(&worker->bind_ev, NULL);
+
event_loop (0);
exit (EXIT_SUCCESS);