From ca72f2d5e8e90d6dfb61fe407e8b532894a885ac Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 20 Aug 2009 19:13:45 +0400 Subject: [PATCH] * Fix process dispatcher * Use bloom filter in fuzzy storage --- src/fuzzy_storage.c | 21 ++++++++++++++++++++- src/main.c | 10 ++++++---- src/main.h | 5 +++-- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 01e7e8f44..97c33eed8 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -36,6 +36,7 @@ #include "modules.h" #include "message.h" #include "fuzzy.h" +#include "bloom.h" #include "fuzzy_storage.h" /* This number is used as limit while comparing two fuzzy hashes, this value can vary from 0 to 100 */ @@ -50,6 +51,7 @@ #define BUCKETS 1024 static GQueue *hashes[BUCKETS]; +static bloom_filter_t *bf; /* Number of cache modifications */ static uint32_t mods = 0; @@ -121,6 +123,7 @@ sync_cache (struct rspamd_worker *wrk) tmp = cur; cur = g_list_next (cur); g_queue_delete_link (hashes[i], tmp); + bloom_del (bf, node->h.hash_pipe); g_free (node); continue; } @@ -198,6 +201,7 @@ read_hashes_file (struct rspamd_worker *wrk) break; } g_queue_push_head (hashes[node->h.block_size % BUCKETS], node); + bloom_add (bf, node->h.hash_pipe); } if (r > 0) { @@ -229,6 +233,10 @@ process_check_command (struct fuzzy_cmd *cmd) fuzzy_hash_t s; int prob = 0; + if (!bloom_check (bf, cmd->hash)) { + return FALSE; + } + memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); s.block_size = cmd->blocksize; cur = hashes[cmd->blocksize % BUCKETS]->head; @@ -252,11 +260,16 @@ process_write_command (struct fuzzy_cmd *cmd) { struct rspamd_fuzzy_node *h; + if (bloom_check (bf, cmd->hash)) { + return FALSE; + } + h = g_malloc (sizeof (struct rspamd_fuzzy_node)); memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash)); h->h.block_size = cmd->blocksize; h->time = (uint64_t)time (NULL); g_queue_push_head (hashes[cmd->blocksize % BUCKETS], h); + bloom_add (bf, cmd->hash); mods ++; msg_info ("process_write_command: fuzzy hash was successfully added"); @@ -270,6 +283,10 @@ process_delete_command (struct fuzzy_cmd *cmd) struct rspamd_fuzzy_node *h; fuzzy_hash_t s; gboolean res = FALSE; + + if (!bloom_check (bf, cmd->hash)) { + return FALSE; + } memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); s.block_size = cmd->blocksize; @@ -283,6 +300,7 @@ process_delete_command (struct fuzzy_cmd *cmd) tmp = cur; cur = g_list_next (cur); g_queue_delete_link (hashes[cmd->blocksize % BUCKETS], tmp); + bloom_del (bf, cmd->hash); msg_info ("process_delete_command: fuzzy hash was successfully deleted"); res = TRUE; mods ++; @@ -443,7 +461,8 @@ start_fuzzy_storage (struct rspamd_worker *worker) /* Send SIGUSR2 to parent */ kill (getppid (), SIGUSR2); - + /* Init bloom filter */ + bf = bloom_create (20000000L, DEFAULT_BLOOM_HASHES); /* Try to read hashes from file */ if (!read_hashes_file (worker)) { msg_err ("read_hashes_file: cannot read hashes file, it can be created after save procedure"); diff --git a/src/main.c b/src/main.c index 40c4c68f8..4353174f8 100644 --- a/src/main.c +++ b/src/main.c @@ -287,6 +287,7 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) cur->type = cf->type; cur->pid = fork(); cur->cf = cf; + cur->pending = FALSE; switch (cur->pid) { case 0: /* Drop privilleges */ @@ -717,7 +718,7 @@ main (int argc, char **argv, char **env) else { if (WIFSIGNALED (res)) { msg_warn ("main: %s process %d terminated abnormally by signal: %d", - (cur->type != TYPE_WORKER) ? "controller" : "worker", + (cur->type == TYPE_CONTROLLER) ? "controller" : "worker", cur->pid, WTERMSIG(res)); } else { @@ -735,13 +736,15 @@ main (int argc, char **argv, char **env) do_restart = 0; do_reopen_log = 1; - msg_info ("main: rspamd "RVERSION " is restarting"); + msg_info ("main: rspamd " RVERSION " is restarting"); if (active_worker == NULL) { /* reread_config (rspamd); */ TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP || cur->type == TYPE_FUZZY) { /* Start new workers that would reread configuration */ + cur->pending = FALSE; active_worker = fork_worker (rspamd, cur->cf); + active_worker->pending = TRUE; } /* Immideately send termination request to conroller and wait for SIGCHLD */ if (cur->type == TYPE_CONTROLLER) { @@ -758,13 +761,12 @@ main (int argc, char **argv, char **env) if (active_worker != NULL) { msg_info ("main: worker process %d has been successfully started", active_worker->pid); TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { - if (cur != active_worker && !cur->is_dying && cur->type != TYPE_CONTROLLER) { + if (!cur->pending && !cur->is_dying && cur->type != TYPE_CONTROLLER) { /* Send to old workers SIGUSR2 */ kill (cur->pid, SIGUSR2); cur->is_dying = 1; } } - active_worker = NULL; } } if (got_alarm) { diff --git a/src/main.h b/src/main.h index ee34a2cae..ef9a29537 100644 --- a/src/main.h +++ b/src/main.h @@ -56,8 +56,9 @@ enum process_type { */ struct rspamd_worker { pid_t pid; /**< pid of worker */ - char is_initialized; /**< is initialized */ - char is_dying; /**< if worker is going to shutdown */ + gboolean is_initialized; /**< is initialized */ + gboolean is_dying; /**< if worker is going to shutdown */ + gboolean pending; /**< if worker is pending to run */ struct rspamd_main *srv; /**< pointer to server structure */ enum process_type type; /**< process type */ struct event sig_ev; /**< signals event */ -- 2.39.5