]> source.dussan.org Git - rspamd.git/commitdiff
* Fix process dispatcher
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 20 Aug 2009 15:13:45 +0000 (19:13 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 20 Aug 2009 15:13:45 +0000 (19:13 +0400)
* Use bloom filter in fuzzy storage

src/fuzzy_storage.c
src/main.c
src/main.h

index 01e7e8f448af6781baf2959086b75801d3703644..97c33eed89b86a9bad08c8ebe427844a3888a5c5 100644 (file)
@@ -36,6 +36,7 @@
 #include "modules.h"
 #include "message.h"
 #include "fuzzy.h"
+#include "bloom.h"
 #include "fuzzy_storage.h"
 
 /* This number is used as limit while comparing two fuzzy hashes, this value can vary from 0 to 100 */
@@ -50,6 +51,7 @@
 #define BUCKETS 1024
 
 static GQueue *hashes[BUCKETS];
+static bloom_filter_t *bf;
 
 /* Number of cache modifications */
 static uint32_t mods = 0;
@@ -121,6 +123,7 @@ sync_cache (struct rspamd_worker *wrk)
                                tmp = cur;
                                cur = g_list_next (cur);
                                g_queue_delete_link (hashes[i], tmp);
+                               bloom_del (bf, node->h.hash_pipe);
                                g_free (node);
                                continue;
                        }
@@ -198,6 +201,7 @@ read_hashes_file (struct rspamd_worker *wrk)
                        break;  
                }
                g_queue_push_head (hashes[node->h.block_size % BUCKETS], node);
+               bloom_add (bf, node->h.hash_pipe);
        }
 
        if (r > 0) {
@@ -229,6 +233,10 @@ process_check_command (struct fuzzy_cmd *cmd)
        fuzzy_hash_t s;
        int prob = 0;
        
+       if (!bloom_check (bf, cmd->hash)) {
+               return FALSE;   
+       }
+
        memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
        s.block_size = cmd->blocksize;
        cur = hashes[cmd->blocksize % BUCKETS]->head;
@@ -252,11 +260,16 @@ process_write_command (struct fuzzy_cmd *cmd)
 {
        struct rspamd_fuzzy_node *h;
 
+       if (bloom_check (bf, cmd->hash)) {
+               return FALSE;   
+       }
+
        h = g_malloc (sizeof (struct rspamd_fuzzy_node));
        memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash));
        h->h.block_size = cmd->blocksize;
        h->time = (uint64_t)time (NULL);
        g_queue_push_head (hashes[cmd->blocksize % BUCKETS], h);
+       bloom_add (bf, cmd->hash);
        mods ++;
        msg_info ("process_write_command: fuzzy hash was successfully added");
        
@@ -270,6 +283,10 @@ process_delete_command (struct fuzzy_cmd *cmd)
        struct rspamd_fuzzy_node *h;
        fuzzy_hash_t s;
        gboolean res = FALSE;
+
+       if (!bloom_check (bf, cmd->hash)) {
+               return FALSE;   
+       }
        
        memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
        s.block_size = cmd->blocksize;
@@ -283,6 +300,7 @@ process_delete_command (struct fuzzy_cmd *cmd)
                        tmp = cur;
                        cur = g_list_next (cur);
                        g_queue_delete_link (hashes[cmd->blocksize % BUCKETS], tmp);
+                       bloom_del (bf, cmd->hash);
                        msg_info ("process_delete_command: fuzzy hash was successfully deleted");
                        res = TRUE;
                        mods ++;
@@ -443,7 +461,8 @@ start_fuzzy_storage (struct rspamd_worker *worker)
        /* Send SIGUSR2 to parent */
        kill (getppid (), SIGUSR2);
 
-       
+       /* Init bloom filter */
+       bf = bloom_create (20000000L, DEFAULT_BLOOM_HASHES);
        /* Try to read hashes from file */
        if (!read_hashes_file (worker)) {
                msg_err ("read_hashes_file: cannot read hashes file, it can be created after save procedure");
index 40c4c68f825485f731c898ca4c8a04dda3e5e6d9..4353174f86a7ab835d9de42d04300b20209ee244 100644 (file)
@@ -287,6 +287,7 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
                cur->type = cf->type;
                cur->pid = fork();
                cur->cf = cf;
+               cur->pending = FALSE;
                switch (cur->pid) {
                        case 0:
                                /* Drop privilleges */
@@ -717,7 +718,7 @@ main (int argc, char **argv, char **env)
                                        else {
                                                if (WIFSIGNALED (res)) {
                                                        msg_warn ("main: %s process %d terminated abnormally by signal: %d", 
-                                                                               (cur->type != TYPE_WORKER) ? "controller" : "worker",
+                                                                               (cur->type == TYPE_CONTROLLER) ? "controller" : "worker",
                                                                                cur->pid, WTERMSIG(res));
                                                }
                                                else {
@@ -735,13 +736,15 @@ main (int argc, char **argv, char **env)
                        do_restart = 0;
                        do_reopen_log = 1;
 
-                       msg_info ("main: rspamd "RVERSION " is restarting");
+                       msg_info ("main: rspamd " RVERSION " is restarting");
                        if (active_worker == NULL) {
                                /* reread_config (rspamd); */
                                TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
                                        if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP || cur->type == TYPE_FUZZY) {
                                                /* Start new workers that would reread configuration */
+                                               cur->pending = FALSE;
                                                active_worker = fork_worker (rspamd, cur->cf);
+                                               active_worker->pending = TRUE;
                                        }
                                        /* Immideately send termination request to conroller and wait for SIGCHLD */
                                        if (cur->type == TYPE_CONTROLLER) {
@@ -758,13 +761,12 @@ main (int argc, char **argv, char **env)
                        if (active_worker != NULL) {
                                msg_info ("main: worker process %d has been successfully started", active_worker->pid);
                                TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
-                                       if (cur != active_worker && !cur->is_dying && cur->type != TYPE_CONTROLLER) {
+                                       if (!cur->pending && !cur->is_dying && cur->type != TYPE_CONTROLLER) {
                                                /* Send to old workers SIGUSR2 */
                                                kill (cur->pid, SIGUSR2);
                                                cur->is_dying = 1;
                                        }
                                }
-                               active_worker = NULL;
                        }
                }
                if (got_alarm) {
index ee34a2caebc85dd86e9c6f31fa1e791ef231f69b..ef9a295371feae641aac8451e782792dea45ab79 100644 (file)
@@ -56,8 +56,9 @@ enum process_type {
  */
 struct rspamd_worker {
        pid_t pid;                                                                                                      /**< pid of worker                                                                      */
-       char is_initialized;                                                                            /**< is initialized                                                                     */
-       char is_dying;                                                                                          /**< if worker is going to shutdown                                     */
+       gboolean is_initialized;                                                                        /**< is initialized                                                                     */
+       gboolean is_dying;                                                                                      /**< if worker is going to shutdown                                     */
+       gboolean pending;                                                                                       /**< if worker is pending to run                                        */
        struct rspamd_main *srv;                                                                        /**< pointer to server structure                                        */
        enum process_type type;                                                                         /**< process type                                                                       */
        struct event sig_ev;                                                                            /**< signals event                                                                      */