From 436f77da87ab5b250b573ff4369bf029d9003cc0 Mon Sep 17 00:00:00 2001 From: "cebka@lenovo-laptop" Date: Mon, 22 Mar 2010 18:41:35 +0300 Subject: [PATCH] * Add support for Judy storage for fuzzy hashes --- CMakeLists.txt | 34 ++++---- src/fuzzy_storage.c | 158 ++++++++++++++++++++++++++++++++++---- src/main.c | 12 ++- src/plugins/fuzzy_check.c | 8 +- 4 files changed, 177 insertions(+), 35 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7e866f719..4d1d60ce0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -172,22 +172,22 @@ INCLUDE_DIRECTORIES("${LIBEVENT_INCLUDE}") LINK_DIRECTORIES("${LIBEVENT_PATH}") # Find libjudy -#FIND_LIBRARY(LIBJUDY_LIBRARY NAMES judy PATHS /lib -# /opt/lib -# /usr/lib -# /usr/local/lib -# DOC "Path where the lijudy library can be found") -#IF(LIBJUDY_LIBRARY) -# FIND_PATH(LIBJUDY_INCLUDE Judy.h PATHS /opt/include -# /usr/include -# /usr/local/include -# DOC "Path where the judy header files can be found") -# -# GET_FILENAME_COMPONENT(LIBJUDY_PATH "${LIBJUDY_LIBRARY}" PATH) -# INCLUDE_DIRECTORIES("${LIBJUDY_INCLUDE}") -# LINK_DIRECTORIES("${LIBJUDY_PATH}") -# SET(WITH_JUDY 1) -#ENDIF(LIBJUDY_LIBRARY) +FIND_LIBRARY(LIBJUDY_LIBRARY NAMES Judy PATHS /lib + /opt/lib + /usr/lib + /usr/local/lib + DOC "Path where the libjudy library can be found") +IF(LIBJUDY_LIBRARY) + FIND_PATH(LIBJUDY_INCLUDE Judy.h PATHS /opt/include + /usr/include + /usr/local/include + DOC "Path where the judy header files can be found") + + GET_FILENAME_COMPONENT(LIBJUDY_PATH "${LIBJUDY_LIBRARY}" PATH) + INCLUDE_DIRECTORIES("${LIBJUDY_INCLUDE}") + LINK_DIRECTORIES("${LIBJUDY_PATH}") + SET(WITH_JUDY 1) +ENDIF(LIBJUDY_LIBRARY) IF(ENABLE_PROFILING MATCHES "ON") SET(WITH_PROFILER 1) @@ -554,7 +554,7 @@ IF(LIBUTIL_LIBRARY) TARGET_LINK_LIBRARIES(rspamd util) ENDIF(LIBUTIL_LIBRARY) IF(LIBJUDY_LIBRARY) - TARGET_LINK_LIBRARIES(rspamd judy) + TARGET_LINK_LIBRARIES(rspamd Judy) ENDIF(LIBJUDY_LIBRARY) TARGET_LINK_LIBRARIES(rspamd rspamd_evdns) TARGET_LINK_LIBRARIES(rspamd event) diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 882130a04..bd9096af9 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -39,6 +39,10 @@ #include "bloom.h" #include "fuzzy_storage.h" +#ifdef WITH_JUDY +#include +#endif + /* This number is used as limit while comparing two fuzzy hashes, this value can vary from 0 to 100 */ #define LEV_LIMIT 99 /* This number is used as limit while we are making decision to write new hash file or not */ @@ -52,14 +56,20 @@ /* Number of insuccessfull bind retries */ #define MAX_RETRIES 40 /* Weight of hash to consider it frequent */ -#define FREQUENT_SCORE 100 +#define DEFAULT_FREQUENT_SCORE 100 static GQueue *hashes[BUCKETS]; static GQueue *frequent; +#ifdef WITH_JUDY +static gpointer jtree; +static gboolean use_judy = FALSE; +#endif static bloom_filter_t *bf; /* Number of cache modifications */ static uint32_t mods = 0; +/* Frequent score number */ +static uint32_t frequent_score = DEFAULT_FREQUENT_SCORE; /* For evtimer */ static struct timeval tmv; static struct event tev; @@ -109,6 +119,10 @@ sync_cache (struct rspamd_worker *wrk) GList *cur, *tmp; struct rspamd_fuzzy_node *node; uint64_t expire, now; +#ifdef WITH_JUDY + PPvoid_t pvalue; + char indexbuf[1024], tmpindex[1024]; +#endif /* Check for modifications */ if (mods < MOD_LIMIT) { @@ -128,13 +142,6 @@ sync_cache (struct rspamd_worker *wrk) expire = DEFAULT_EXPIRE; } - /* Sync section */ - if ((fd = open (filename, O_WRONLY)) != -1) { - /* Aqquire a lock */ - (void)lock_file (fd, FALSE); - (void)unlock_file (fd, FALSE); - } - if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) { msg_err ("cannot create hash file %s: %s", filename, strerror (errno)); return; @@ -143,6 +150,33 @@ sync_cache (struct rspamd_worker *wrk) (void)lock_file (fd, FALSE); now = (uint64_t) time (NULL); + +#ifdef WITH_JUDY + if (use_judy) { + indexbuf[0] = '\0'; + pvalue = JudySLFirst (jtree, indexbuf, PJE0); + while (pvalue) { + node = *((struct rspamd_fuzzy_node **)pvalue); + if (now - node->time > expire) { + /* Remove expired item */ + g_strlcpy (tmpindex, indexbuf, sizeof (tmpindex)); + pvalue = JudySLNext (jtree, tmpindex, PJE0); + JudySLDel (&jtree, indexbuf, PJE0); + g_strlcpy (indexbuf, tmpindex, sizeof (indexbuf)); + bloom_del (bf, node->h.hash_pipe); + server_stat->fuzzy_hashes_expired ++; + server_stat->fuzzy_hashes --; + g_free (node); + continue; + } + if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) { + msg_err ("cannot write file %s: %s", filename, strerror (errno)); + } + pvalue = JudySLNext (jtree, indexbuf, PJE0); + } + } + else { +#endif cur = frequent->head; while (cur) { node = cur->data; @@ -172,6 +206,9 @@ sync_cache (struct rspamd_worker *wrk) cur = g_list_next (cur); } } +#ifdef WITH_JUDY + } +#endif (void)unlock_file (fd, FALSE); close (fd); @@ -219,11 +256,21 @@ read_hashes_file (struct rspamd_worker *wrk) struct stat st; char *filename; struct rspamd_fuzzy_node *node; +#ifdef WITH_JUDY + PPvoid_t pvalue; + if (use_judy) { + jtree = NULL; + } + else { +#endif for (i = 0; i < BUCKETS; i++) { hashes[i] = g_queue_new (); } frequent = g_queue_new (); +#ifdef WITH_JUDY + } +#endif filename = g_hash_table_lookup (wrk->cf->params, "hashfile"); if (filename == NULL) { @@ -245,21 +292,37 @@ read_hashes_file (struct rspamd_worker *wrk) if (r != sizeof (struct rspamd_fuzzy_node)) { break; } - if (node->value > FREQUENT_SCORE) { +#ifdef WITH_JUDY + if (use_judy) { + pvalue = JudySLIns (&jtree, node->h.hash_pipe, PJE0); + *pvalue = node; + } + else { +#endif + if (node->value > frequent_score) { g_queue_push_head (frequent, node); } else { g_queue_push_head (hashes[node->h.block_size % BUCKETS], node); } +#ifdef WITH_JUDY + } +#endif bloom_add (bf, node->h.hash_pipe); server_stat->fuzzy_hashes ++; } +#ifdef WITH_JUDY + if (!use_judy) { +#endif /* Sort everything */ g_queue_sort (frequent, compare_nodes, NULL); for (i = 0; i < BUCKETS; i ++) { g_queue_sort (hashes[i], compare_nodes, NULL); } +#ifdef WITH_JUDY + } +#endif (void)unlock_file (fd, FALSE); close (fd); @@ -281,7 +344,22 @@ check_hash_node (GQueue *hash, fuzzy_hash_t *s, int update_value) GList *cur; struct rspamd_fuzzy_node *h; int prob = 0; - +#ifdef WITH_JUDY + PPvoid_t pvalue; + + if (use_judy) { + pvalue = JudySLGet (jtree, s->hash_pipe, PJE0); + if (pvalue != NULL) { + h = *((struct rspamd_fuzzy_node **)pvalue); + msg_info ("fuzzy hash was found in judy tree"); + if (update_value) { + h->value += update_value; + } + return h->value; + } + } + else { +#endif cur = frequent->head; while (cur) { h = cur->data; @@ -305,7 +383,7 @@ check_hash_node (GQueue *hash, fuzzy_hash_t *s, int update_value) h->value += update_value; msg_info ("new hash weight: %d", h->value); } - if (h->value > FREQUENT_SCORE) { + if (h->value > frequent_score) { g_queue_unlink (hash, cur); g_queue_push_head_link (frequent, cur); msg_info ("moved hash to frequent list"); @@ -314,6 +392,9 @@ check_hash_node (GQueue *hash, fuzzy_hash_t *s, int update_value) } cur = g_list_next (cur); } +#ifdef WITH_JUDY + } +#endif return 0; } @@ -336,12 +417,10 @@ process_check_command (struct fuzzy_cmd *cmd) static gboolean update_hash (struct fuzzy_cmd *cmd) { - GList *cur; fuzzy_hash_t s; memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); s.block_size = cmd->blocksize; - cur = hashes[cmd->blocksize % BUCKETS]->head; return check_hash_node (hashes[cmd->blocksize % BUCKETS], &s, cmd->value); } @@ -350,6 +429,9 @@ static gboolean process_write_command (struct fuzzy_cmd *cmd) { struct rspamd_fuzzy_node *h; +#ifdef WITH_JUDY + PPvoid_t pvalue; +#endif if (bloom_check (bf, cmd->hash)) { if (update_hash (cmd)) { @@ -361,7 +443,19 @@ process_write_command (struct fuzzy_cmd *cmd) memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash)); h->h.block_size = cmd->blocksize; h->time = (uint64_t) time (NULL); + h->value = cmd->value; +#ifdef WITH_JUDY + if (use_judy) { + pvalue = JudySLIns (&jtree, h->h.hash_pipe, PJE0); + *pvalue = h; + } + else { +#endif + g_queue_push_head (hashes[cmd->blocksize % BUCKETS], h); +#ifdef WITH_JUDY + } +#endif bloom_add (bf, cmd->hash); mods++; server_stat->fuzzy_hashes ++; @@ -376,7 +470,18 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s) GList *cur, *tmp; struct rspamd_fuzzy_node *h; gboolean res = FALSE; - +#ifdef WITH_JUDY + PPvoid_t pvalue; + + if (use_judy) { + pvalue = JudySLGet (jtree, s->hash_pipe, PJE0); + if (pvalue) { + res = JudySLDel (&jtree, s->hash_pipe, PJE0); + g_free (*pvalue); + } + } + else { +#endif cur = hash->head; /* XXX: too slow way */ @@ -396,6 +501,9 @@ delete_hash (GQueue *hash, fuzzy_hash_t *s) } cur = g_list_next (cur); } +#ifdef WITH_JUDY + } +#endif return res; @@ -413,7 +521,12 @@ process_delete_command (struct fuzzy_cmd *cmd) memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe)); s.block_size = cmd->blocksize; - +#ifdef WITH_JUDY + if (use_judy) { + return delete_hash (NULL, &s); + } + else { +#endif res = delete_hash (frequent, &s); if (!res) { res = delete_hash (hashes[cmd->blocksize % BUCKETS], &s); @@ -421,6 +534,9 @@ process_delete_command (struct fuzzy_cmd *cmd) else { (void)delete_hash (hashes[cmd->blocksize % BUCKETS], &s); } +#ifdef WITH_JUDY + } +#endif return res; } @@ -532,6 +648,7 @@ start_fuzzy_storage (struct rspamd_worker *worker) struct sigaction signals; struct event sev; int retries = 0; + char *value; worker->srv->pid = getpid (); worker->srv->type = TYPE_FUZZY; @@ -548,6 +665,17 @@ start_fuzzy_storage (struct rspamd_worker *worker) signal_add (&worker->sig_ev, NULL); signal_set (&sev, SIGTERM, sigterm_handler, (void *)worker); signal_add (&sev, NULL); + /* Get params */ + if ((value = g_hash_table_lookup (worker->cf->params, "frequent_score")) != NULL) { + frequent_score = strtol (value, NULL, 10); + } + if ((value = g_hash_table_lookup (worker->cf->params, "use_judy")) != NULL) { +#ifdef WITH_JUDY + use_judy = TRUE; +#else + msg_err ("cannot use judy storage as judy support is not compiled in"); +#endif + } /* Init bloom filter */ bf = bloom_create (20000000L, DEFAULT_BLOOM_HASHES); diff --git a/src/main.c b/src/main.c index 49ac3da86..fd250781d 100644 --- a/src/main.c +++ b/src/main.c @@ -493,10 +493,18 @@ spawn_workers (struct rspamd_main *rspamd, gboolean make_sockets) } cf->listen_sock = listen_sock; } - - for (i = 0; i < cf->count; i++) { + + if (cf->type == TYPE_FUZZY) { + if (cf->count > 1) { + msg_err ("cannot spawn more than 1 fuzzy storage worker, so spawn one"); + } fork_worker (rspamd, cf); } + else { + for (i = 0; i < cf->count; i++) { + fork_worker (rspamd, cf); + } + } cur = g_list_next (cur); } diff --git a/src/plugins/fuzzy_check.c b/src/plugins/fuzzy_check.c index c4c012fed..e79c40368 100644 --- a/src/plugins/fuzzy_check.c +++ b/src/plugins/fuzzy_check.c @@ -308,6 +308,7 @@ fuzzy_io_callback (int fd, short what, void *arg) goto ok; } else { + errno = ETIMEDOUT; goto err; } @@ -365,9 +366,14 @@ fuzzy_learn_callback (int fd, short what, void *arg) rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE); goto ok; } - goto err; + else { + r = snprintf (buf, sizeof (buf), "ERR" CRLF); + rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE); + goto ok; + } } else { + errno = ETIMEDOUT; goto err; } -- 2.39.5