]> source.dussan.org Git - rspamd.git/commitdiff
* Add support for Judy storage for fuzzy hashes
authorcebka@lenovo-laptop <cebka@lenovo-laptop>
Mon, 22 Mar 2010 15:41:35 +0000 (18:41 +0300)
committercebka@lenovo-laptop <cebka@lenovo-laptop>
Mon, 22 Mar 2010 15:41:35 +0000 (18:41 +0300)
CMakeLists.txt
src/fuzzy_storage.c
src/main.c
src/plugins/fuzzy_check.c

index 7e866f719ea3da3bddd1241a82aeb36936316893..4d1d60ce049f7c942a65bc009dad133be7cd089f 100644 (file)
@@ -172,22 +172,22 @@ INCLUDE_DIRECTORIES("${LIBEVENT_INCLUDE}")
 LINK_DIRECTORIES("${LIBEVENT_PATH}")
 
 # Find libjudy
-#FIND_LIBRARY(LIBJUDY_LIBRARY NAMES judy PATHS /lib
-#                                                      /opt/lib
-#                                              /usr/lib
-#                                              /usr/local/lib
-#                 DOC "Path where the lijudy library can be found")
-#IF(LIBJUDY_LIBRARY)
-#      FIND_PATH(LIBJUDY_INCLUDE Judy.h PATHS  /opt/include
-#                                                                                      /usr/include
-#                                                                                      /usr/local/include
-#                                                                                      DOC "Path where the judy header files can be found")
-#
-#      GET_FILENAME_COMPONENT(LIBJUDY_PATH "${LIBJUDY_LIBRARY}" PATH)
-#      INCLUDE_DIRECTORIES("${LIBJUDY_INCLUDE}")
-#      LINK_DIRECTORIES("${LIBJUDY_PATH}")
-#      SET(WITH_JUDY 1)
-#ENDIF(LIBJUDY_LIBRARY)
+FIND_LIBRARY(LIBJUDY_LIBRARY NAMES Judy PATHS /lib
+                                                       /opt/lib
+                                               /usr/lib
+                                               /usr/local/lib
+                 DOC "Path where the libjudy library can be found")
+IF(LIBJUDY_LIBRARY)
+       FIND_PATH(LIBJUDY_INCLUDE Judy.h PATHS  /opt/include
+                                                                                       /usr/include
+                                                                                       /usr/local/include
+                                                                                       DOC "Path where the judy header files can be found")
+
+       GET_FILENAME_COMPONENT(LIBJUDY_PATH "${LIBJUDY_LIBRARY}" PATH)
+       INCLUDE_DIRECTORIES("${LIBJUDY_INCLUDE}")
+       LINK_DIRECTORIES("${LIBJUDY_PATH}")
+       SET(WITH_JUDY 1)
+ENDIF(LIBJUDY_LIBRARY)
 
 IF(ENABLE_PROFILING MATCHES "ON")
        SET(WITH_PROFILER 1)
@@ -554,7 +554,7 @@ IF(LIBUTIL_LIBRARY)
        TARGET_LINK_LIBRARIES(rspamd util)
 ENDIF(LIBUTIL_LIBRARY)
 IF(LIBJUDY_LIBRARY)
-       TARGET_LINK_LIBRARIES(rspamd judy)
+       TARGET_LINK_LIBRARIES(rspamd Judy)
 ENDIF(LIBJUDY_LIBRARY)
 TARGET_LINK_LIBRARIES(rspamd rspamd_evdns)
 TARGET_LINK_LIBRARIES(rspamd event)
index 882130a04c72bef32565f8f5ff44b6dc583016e0..bd9096af988df3137d1f35f60d3dc9362b98245c 100644 (file)
 #include "bloom.h"
 #include "fuzzy_storage.h"
 
+#ifdef WITH_JUDY
+#include <Judy.h>
+#endif
+
 /* This number is used as limit while comparing two fuzzy hashes, this value can vary from 0 to 100 */
 #define LEV_LIMIT 99
 /* This number is used as limit while we are making decision to write new hash file or not */
 /* Number of insuccessfull bind retries */
 #define MAX_RETRIES 40
 /* Weight of hash to consider it frequent */
-#define FREQUENT_SCORE 100
+#define DEFAULT_FREQUENT_SCORE 100
 
 static GQueue                  *hashes[BUCKETS];
 static GQueue                  *frequent;
+#ifdef WITH_JUDY
+static gpointer                 jtree;
+static gboolean                 use_judy = FALSE;
+#endif
 static bloom_filter_t          *bf;
 
 /* Number of cache modifications */
 static uint32_t                 mods = 0;
+/* Frequent score number */
+static uint32_t                 frequent_score = DEFAULT_FREQUENT_SCORE;
 /* For evtimer */
 static struct timeval           tmv;
 static struct event             tev;
@@ -109,6 +119,10 @@ sync_cache (struct rspamd_worker *wrk)
        GList                          *cur, *tmp;
        struct rspamd_fuzzy_node       *node;
        uint64_t                        expire, now;
+#ifdef WITH_JUDY
+       PPvoid_t                        pvalue;
+       char                            indexbuf[1024], tmpindex[1024];
+#endif
 
        /* Check for modifications */
        if (mods < MOD_LIMIT) {
@@ -128,13 +142,6 @@ sync_cache (struct rspamd_worker *wrk)
                expire = DEFAULT_EXPIRE;
        }
        
-       /* Sync section */
-       if ((fd = open (filename, O_WRONLY)) != -1) {
-               /* Aqquire a lock */
-               (void)lock_file (fd, FALSE);
-               (void)unlock_file (fd, FALSE);
-       }
-
        if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
                msg_err ("cannot create hash file %s: %s", filename, strerror (errno));
                return;
@@ -143,6 +150,33 @@ sync_cache (struct rspamd_worker *wrk)
        (void)lock_file (fd, FALSE);
 
        now = (uint64_t) time (NULL);
+
+#ifdef WITH_JUDY
+       if (use_judy) {
+               indexbuf[0] = '\0';
+               pvalue = JudySLFirst (jtree, indexbuf, PJE0);
+               while (pvalue) {
+                       node = *((struct rspamd_fuzzy_node **)pvalue);
+                       if (now - node->time > expire) {
+                               /* Remove expired item */
+                               g_strlcpy (tmpindex, indexbuf, sizeof (tmpindex));
+                               pvalue = JudySLNext (jtree, tmpindex, PJE0);
+                               JudySLDel (&jtree, indexbuf, PJE0);
+                               g_strlcpy (indexbuf, tmpindex, sizeof (indexbuf));
+                               bloom_del (bf, node->h.hash_pipe);
+                               server_stat->fuzzy_hashes_expired ++;
+                               server_stat->fuzzy_hashes --;
+                               g_free (node);
+                               continue;
+                       }
+                       if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
+                               msg_err ("cannot write file %s: %s", filename, strerror (errno));
+                       }
+                       pvalue = JudySLNext (jtree, indexbuf, PJE0);
+               }
+       }
+       else {
+#endif
        cur = frequent->head;
        while (cur) {
                node = cur->data;
@@ -172,6 +206,9 @@ sync_cache (struct rspamd_worker *wrk)
                        cur = g_list_next (cur);
                }
        }
+#ifdef WITH_JUDY
+       }
+#endif
 
        (void)unlock_file (fd, FALSE);
        close (fd);
@@ -219,11 +256,21 @@ read_hashes_file (struct rspamd_worker *wrk)
        struct stat                     st;
        char                           *filename;
        struct rspamd_fuzzy_node       *node;
+#ifdef WITH_JUDY
+       PPvoid_t                         pvalue;
 
+       if (use_judy) {
+               jtree = NULL;
+       }
+       else {
+#endif
        for (i = 0; i < BUCKETS; i++) {
                hashes[i] = g_queue_new ();
        }
        frequent = g_queue_new ();
+#ifdef WITH_JUDY
+       }
+#endif
 
        filename = g_hash_table_lookup (wrk->cf->params, "hashfile");
        if (filename == NULL) {
@@ -245,21 +292,37 @@ read_hashes_file (struct rspamd_worker *wrk)
                if (r != sizeof (struct rspamd_fuzzy_node)) {
                        break;
                }
-               if (node->value > FREQUENT_SCORE) {
+#ifdef WITH_JUDY
+               if (use_judy) {
+                       pvalue = JudySLIns (&jtree, node->h.hash_pipe, PJE0);
+                       *pvalue = node;
+               }
+               else {
+#endif
+               if (node->value > frequent_score) {
                        g_queue_push_head (frequent, node);
                }
                else {
                        g_queue_push_head (hashes[node->h.block_size % BUCKETS], node);
                }
+#ifdef WITH_JUDY
+               }
+#endif
                bloom_add (bf, node->h.hash_pipe);
                server_stat->fuzzy_hashes ++;
        }
 
+#ifdef WITH_JUDY
+       if (!use_judy) {
+#endif
        /* Sort everything */
        g_queue_sort (frequent, compare_nodes, NULL);
        for (i = 0; i < BUCKETS; i ++) {
                g_queue_sort (hashes[i], compare_nodes, NULL);
        }
+#ifdef WITH_JUDY
+       }
+#endif
 
        (void)unlock_file (fd, FALSE);
        close (fd);
@@ -281,7 +344,22 @@ check_hash_node (GQueue *hash, fuzzy_hash_t *s, int update_value)
        GList                          *cur;
        struct rspamd_fuzzy_node       *h;
        int                             prob = 0;
-       
+#ifdef WITH_JUDY
+       PPvoid_t                         pvalue;
+
+       if (use_judy) {
+               pvalue = JudySLGet (jtree, s->hash_pipe, PJE0);
+               if (pvalue != NULL) {
+                       h = *((struct rspamd_fuzzy_node **)pvalue);
+                       msg_info ("fuzzy hash was found in judy tree");
+                       if (update_value) {
+                               h->value += update_value;
+                       }
+                       return h->value;
+               }
+       }
+       else {
+#endif
        cur = frequent->head;
        while (cur) {
                h = cur->data;
@@ -305,7 +383,7 @@ check_hash_node (GQueue *hash, fuzzy_hash_t *s, int update_value)
                                h->value += update_value;
                                msg_info ("new hash weight: %d", h->value);
                        }
-                       if (h->value > FREQUENT_SCORE) {
+                       if (h->value > frequent_score) {
                                g_queue_unlink (hash, cur);
                                g_queue_push_head_link (frequent, cur);
                                msg_info ("moved hash to frequent list");
@@ -314,6 +392,9 @@ check_hash_node (GQueue *hash, fuzzy_hash_t *s, int update_value)
                }
                cur = g_list_next (cur);
        }
+#ifdef WITH_JUDY
+       }
+#endif
 
        return 0;
 }
@@ -336,12 +417,10 @@ process_check_command (struct fuzzy_cmd *cmd)
 static                          gboolean
 update_hash (struct fuzzy_cmd *cmd)
 {
-       GList                          *cur;
        fuzzy_hash_t                    s;
 
        memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
        s.block_size = cmd->blocksize;
-       cur = hashes[cmd->blocksize % BUCKETS]->head;
 
        return check_hash_node (hashes[cmd->blocksize % BUCKETS], &s, cmd->value);
 }
@@ -350,6 +429,9 @@ static                          gboolean
 process_write_command (struct fuzzy_cmd *cmd)
 {
        struct rspamd_fuzzy_node       *h;
+#ifdef WITH_JUDY
+       PPvoid_t                         pvalue;
+#endif
 
        if (bloom_check (bf, cmd->hash)) {
                if (update_hash (cmd)) {
@@ -361,7 +443,19 @@ process_write_command (struct fuzzy_cmd *cmd)
        memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash));
        h->h.block_size = cmd->blocksize;
        h->time = (uint64_t) time (NULL);
+       h->value = cmd->value;
+#ifdef WITH_JUDY
+       if (use_judy) {
+               pvalue = JudySLIns (&jtree, h->h.hash_pipe, PJE0);
+               *pvalue = h;
+       }
+       else {
+#endif
+
        g_queue_push_head (hashes[cmd->blocksize % BUCKETS], h);
+#ifdef WITH_JUDY
+       }
+#endif
        bloom_add (bf, cmd->hash);
        mods++;
        server_stat->fuzzy_hashes ++;
@@ -376,7 +470,18 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s)
        GList                          *cur, *tmp;
        struct rspamd_fuzzy_node       *h;
        gboolean                        res = FALSE;
-       
+#ifdef WITH_JUDY
+       PPvoid_t                         pvalue;
+
+       if (use_judy) {
+               pvalue = JudySLGet (jtree, s->hash_pipe, PJE0);
+               if (pvalue) {
+                       res = JudySLDel (&jtree, s->hash_pipe, PJE0);
+                       g_free (*pvalue);
+               }
+       }
+       else {
+#endif
        cur = hash->head;
 
        /* XXX: too slow way */
@@ -396,6 +501,9 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s)
                }
                cur = g_list_next (cur);
        }
+#ifdef WITH_JUDY
+       }
+#endif
 
        return res;
 
@@ -413,7 +521,12 @@ process_delete_command (struct fuzzy_cmd *cmd)
 
        memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
        s.block_size = cmd->blocksize;
-
+#ifdef WITH_JUDY
+       if (use_judy) {
+               return delete_hash (NULL, &s);
+       }
+       else {
+#endif
        res = delete_hash (frequent, &s);
        if (!res) {
                res = delete_hash (hashes[cmd->blocksize % BUCKETS], &s);
@@ -421,6 +534,9 @@ process_delete_command (struct fuzzy_cmd *cmd)
        else {
                (void)delete_hash (hashes[cmd->blocksize % BUCKETS], &s);
        }
+#ifdef WITH_JUDY
+       }
+#endif
 
        return res;
 }
@@ -532,6 +648,7 @@ start_fuzzy_storage (struct rspamd_worker *worker)
        struct sigaction                signals;
        struct event                    sev;
        int                             retries = 0;
+       char                           *value;
 
        worker->srv->pid = getpid ();
        worker->srv->type = TYPE_FUZZY;
@@ -548,6 +665,17 @@ start_fuzzy_storage (struct rspamd_worker *worker)
        signal_add (&worker->sig_ev, NULL);
        signal_set (&sev, SIGTERM, sigterm_handler, (void *)worker);
        signal_add (&sev, NULL);
+       /* Get params */
+       if ((value = g_hash_table_lookup (worker->cf->params, "frequent_score")) != NULL) {
+               frequent_score = strtol (value, NULL, 10);
+       }
+       if ((value = g_hash_table_lookup (worker->cf->params, "use_judy")) != NULL) {
+#ifdef WITH_JUDY
+               use_judy = TRUE;
+#else
+               msg_err ("cannot use judy storage as judy support is not compiled in");
+#endif
+       }
 
        /* Init bloom filter */
        bf = bloom_create (20000000L, DEFAULT_BLOOM_HASHES);
index 49ac3da861d12318d158efacff9d4be756eac9ea..fd250781dbcf8370a9c3a23400f981f34a846a5f 100644 (file)
@@ -493,10 +493,18 @@ spawn_workers (struct rspamd_main *rspamd, gboolean make_sockets)
                        }
                        cf->listen_sock = listen_sock;
                }
-
-               for (i = 0; i < cf->count; i++) {
+               
+               if (cf->type == TYPE_FUZZY) {
+                       if (cf->count > 1) {
+                               msg_err ("cannot spawn more than 1 fuzzy storage worker, so spawn one");
+                       }
                        fork_worker (rspamd, cf);
                }
+               else {
+                       for (i = 0; i < cf->count; i++) {
+                               fork_worker (rspamd, cf);
+                       }
+               }
 
                cur = g_list_next (cur);
        }
index c4c012fedfe19faf9da9b1a518078d702aa27d77..e79c4036896d22d5b675603c87803b2379a12c4e 100644 (file)
@@ -308,6 +308,7 @@ fuzzy_io_callback (int fd, short what, void *arg)
                goto ok;
        }
        else {
+               errno = ETIMEDOUT;
                goto err;       
        }
 
@@ -365,9 +366,14 @@ fuzzy_learn_callback (int fd, short what, void *arg)
                        rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
                        goto ok;
                }
-               goto err;
+               else {
+                       r = snprintf (buf, sizeof (buf), "ERR" CRLF);
+                       rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
+                       goto ok;
+               }
        }
        else {
+               errno = ETIMEDOUT;
                goto err;       
        }