diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-05-28 20:31:10 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-05-28 20:31:10 +0400 |
commit | ae3eb4dfd787052bebc732c3e37b56f0800e1aa2 (patch) | |
tree | 9ffbf987783e0802ed1c6b311e69b42beba0e142 | |
parent | 0c72b8d2de2081e8b8605c86bba3f743387f981f (diff) | |
download | rspamd-ae3eb4dfd787052bebc732c3e37b56f0800e1aa2.tar.gz rspamd-ae3eb4dfd787052bebc732c3e37b56f0800e1aa2.zip |
* New symbols sorter:
- add ability to have dynamic rules inside items cache
- make 3 types of rules: negative, dynamic and static
- make logic of cache more simple by using glib lists instead of arrays
- do checks of symbols in more logically correct way (negative->dynamic->static)
-rw-r--r-- | CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/filter.c | 25 | ||||
-rw-r--r-- | src/main.c | 20 | ||||
-rw-r--r-- | src/symbols_cache.c | 405 | ||||
-rw-r--r-- | src/symbols_cache.h | 49 |
5 files changed, 414 insertions, 87 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 09e4b1583..615cbb52f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,7 +7,7 @@ PROJECT(rspamd C) SET(RSPAMD_VERSION_MAJOR 0) SET(RSPAMD_VERSION_MINOR 3) -SET(RSPAMD_VERSION_PATCH 0) +SET(RSPAMD_VERSION_PATCH 1) SET(RSPAMD_VERSION "${RSPAMD_VERSION_MAJOR}.${RSPAMD_VERSION_MINOR}.${RSPAMD_VERSION_PATCH}") SET(RSPAMD_MASTER_SITE_URL "http://cebka.pp.ru/hg/rspamd") diff --git a/src/filter.c b/src/filter.c index e2d6d80b8..0a18bd793 100644 --- a/src/filter.c +++ b/src/filter.c @@ -50,7 +50,7 @@ insert_result (struct worker_task *task, const char *metric_name, const char *sy struct metric_result *metric_res; struct symbol *s; struct cache_item *item; - int i; + GList *cur; metric = g_hash_table_lookup (task->worker->srv->cfg->metrics, metric_name); if (metric == NULL) { @@ -99,12 +99,25 @@ insert_result (struct worker_task *task, const char *metric_name, const char *sy /* Process cache item */ if (metric->cache) { - for (i = 0; i < metric->cache->used_items; i++) { - item = &metric->cache->items[i]; + cur = metric->cache->static_items; + while (cur) + { + item = cur->data; - if (flag > 0 && strcmp (item->s->symbol, symbol) == 0) { + if (strcmp (item->s->symbol, symbol) == 0) { item->s->frequency++; } + cur = g_list_next (cur); + } + cur = metric->cache->negative_items; + while (cur) + { + item = cur->data; + + if (strcmp (item->s->symbol, symbol) == 0) { + item->s->frequency++; + } + cur = g_list_next (cur); } } } @@ -272,7 +285,7 @@ static int continue_process_filters (struct worker_task *task) { GList *cur = task->save.entry; - struct cache_item *item = task->save.item; + gpointer item = task->save.item; struct metric *metric = cur->data; @@ -306,7 +319,7 @@ process_filters (struct worker_task *task) { GList *cur; struct metric *metric; - struct cache_item *item = NULL; + gpointer item = NULL; if (task->save.saved) { task->save.saved = 0; diff --git a/src/main.c b/src/main.c index 844a76925..b05315c1c 100644 --- a/src/main.c +++ b/src/main.c @@ -704,7 +704,7 @@ static void print_metrics_cache (struct config_file *cfg) { struct metric *metric; - GList *l; + GList *l, *cur; struct cache_item *item; int i; @@ -718,12 +718,24 @@ print_metrics_cache (struct config_file *cfg) printf ("Cache for metric: %s\n", metric->name); printf ("-----------------------------------------------------------------\n"); printf ("| Pri | Symbol | Weight | Frequency | Avg. time |\n"); - for (i = 0; i < metric->cache->used_items; i++) { - item = &metric->cache->items[i]; + i = 0; + cur = metric->cache->negative_items; + while (cur) { + item = cur->data; printf ("-----------------------------------------------------------------\n"); printf ("| %3d | %22s | %6.1f | %9d | %9.3f |\n", i, item->s->symbol, item->s->weight, item->s->frequency, item->s->avg_time); - + cur = g_list_next (cur); + i ++; + } + cur = metric->cache->static_items; + while (cur) { + item = cur->data; + printf ("-----------------------------------------------------------------\n"); + printf ("| %3d | %22s | %6.1f | %9d | %9.3f |\n", i, item->s->symbol, item->s->weight, item->s->frequency, item->s->avg_time); + cur = g_list_next (cur); + i ++; } + printf ("-----------------------------------------------------------------\n"); } l = g_list_next (l); diff --git a/src/symbols_cache.c b/src/symbols_cache.c index 487efd15e..f1589226a 100644 --- a/src/symbols_cache.c +++ b/src/symbols_cache.c @@ -70,36 +70,31 @@ cache_logic_cmp (const void *p1, const void *p2) return (int)w2 - w1; } -static void -grow_cache (struct symbols_cache *cache) -{ - guint old = cache->cur_items, i; - void *new; - - cache->cur_items = cache->cur_items * 2; - new = g_new0 (struct cache_item, cache->cur_items); - memcpy (new, cache->items, old * sizeof (struct cache_item)); - g_free (cache->items); - cache->items = new; - - /* Create new saved_cache_items */ - for (i = old; i < cache->cur_items; i++) { - cache->items[i].s = g_new0 (struct saved_cache_item, 1); - } -} - static GChecksum * get_mem_cksum (struct symbols_cache *cache) { - int i; GChecksum *result; + GList *cur; + struct cache_item *item; result = g_checksum_new (G_CHECKSUM_SHA1); - for (i = 0; i < cache->used_items; i++) { - if (cache->items[i].s->symbol[0] != '\0') { - g_checksum_update (result, cache->items[i].s->symbol, strlen (cache->items[i].s->symbol)); + cur = g_list_first (cache->negative_items); + while (cur) { + item = cur->data; + if (item->s->symbol[0] != '\0') { + g_checksum_update (result, item->s->symbol, strlen (item->s->symbol)); + } + cur = g_list_next (cur); + } + cur = g_list_first (cache->static_items); + while (cur) { + item = cur->data; + if (item->s->symbol[0] != '\0') { + g_checksum_update (result, item->s->symbol, strlen (item->s->symbol)); } + total_frequency += item->s->frequency; + cur = g_list_next (cur); } return result; @@ -109,15 +104,26 @@ get_mem_cksum (struct symbols_cache *cache) static void post_cache_init (struct symbols_cache *cache) { - int i; + GList *cur; + struct cache_item *item; total_frequency = 0; nsymbols = cache->used_items; - for (i = 0; i < cache->used_items; i++) { - total_frequency += cache->items[i].s->frequency; + cur = g_list_first (cache->negative_items); + while (cur) { + item = cur->data; + total_frequency += item->s->frequency; + cur = g_list_next (cur); + } + cur = g_list_first (cache->static_items); + while (cur) { + item = cur->data; + total_frequency += item->s->frequency; + cur = g_list_next (cur); } - qsort (cache->items, cache->used_items, sizeof (struct cache_item), cache_logic_cmp); + cache->negative_items = g_list_sort (cache->negative_items, cache_logic_cmp); + cache->static_items = g_list_sort (cache->static_items, cache_logic_cmp); } /* Unmap cache file */ @@ -135,6 +141,8 @@ mmap_cache_file (struct symbols_cache *cache, int fd, memory_pool_t *pool) { void *map; int i; + GList *cur; + struct cache_item *item; map = mmap (NULL, cache->used_items * sizeof (struct saved_cache_item), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (map == MAP_FAILED) { @@ -146,11 +154,22 @@ mmap_cache_file (struct symbols_cache *cache, int fd, memory_pool_t *pool) close (fd); cache->map = map; /* Now free old values for saved cache items and fill them with mmapped ones */ - for (i = 0; i < cache->used_items; i++) { - g_free (cache->items[i].s); - cache->items[i].s = ((struct saved_cache_item *)map) + i; + i = 0; + cur = g_list_first (cache->negative_items); + while (cur) { + item = cur->data; + item->s = ((struct saved_cache_item *)map) + i; + cur = g_list_next (cur); + i ++; } - + cur = g_list_first (cache->static_items); + while (cur) { + item = cur->data; + item->s = ((struct saved_cache_item *)map) + i; + cur = g_list_next (cur); + i ++; + } + post_cache_init (cache); return TRUE; @@ -160,10 +179,11 @@ mmap_cache_file (struct symbols_cache *cache, int fd, memory_pool_t *pool) static gboolean create_cache_file (struct symbols_cache *cache, const char *filename, int fd, memory_pool_t *pool) { - int i; GChecksum *cksum; u_char *digest; gsize cklen; + GList *cur; + struct cache_item *item; /* Calculate checksum */ cksum = get_mem_cksum (cache); @@ -178,14 +198,29 @@ create_cache_file (struct symbols_cache *cache, const char *filename, int fd, me g_checksum_get_digest (cksum, digest, &cklen); /* Now write data to file */ - for (i = 0; i < cache->used_items; i++) { - if (write (fd, cache->items[i].s, sizeof (struct saved_cache_item)) == -1) { + cur = g_list_first (cache->negative_items); + while (cur) { + item = cur->data; + if (write (fd, &item->s, sizeof (struct saved_cache_item)) == -1) { + msg_err ("cannot write to file %d, %s", errno, strerror (errno)); + close (fd); + g_checksum_free (cksum); + g_free (digest); + return FALSE; + } + cur = g_list_next (cur); + } + cur = g_list_first (cache->static_items); + while (cur) { + item = cur->data; + if (write (fd, &item->s, sizeof (struct saved_cache_item)) == -1) { msg_err ("cannot write to file %d, %s", errno, strerror (errno)); close (fd); g_checksum_free (cksum); g_free (digest); return FALSE; } + cur = g_list_next (cur); } /* Write checksum */ if (write (fd, digest, cklen) == -1) { @@ -212,55 +247,143 @@ void register_symbol (struct symbols_cache **cache, const char *name, double weight, symbol_func_t func, gpointer user_data) { struct cache_item *item = NULL; - int i; + struct symbols_cache *pcache = *cache; + GList **target; if (*cache == NULL) { - *cache = g_new0 (struct symbols_cache, 1); + pcache = g_new0 (struct symbols_cache, 1); + *cache = pcache; + pcache->static_pool = memory_pool_new (memory_pool_get_size ()); } - if ((*cache)->items == NULL) { - (*cache)->cur_items = MIN_CACHE; - (*cache)->used_items = 0; - (*cache)->items = g_new0 (struct cache_item, (*cache)->cur_items); - for (i = 0; i < (*cache)->cur_items; i++) { - (*cache)->items[i].s = g_new0 (struct saved_cache_item, 1); - } + + if (weight > 0) { + target = &(*cache)->static_items; } - - if ((*cache)->used_items >= (*cache)->cur_items) { - grow_cache (*cache); - /* Call once more */ - register_symbol (cache, name, weight, func, user_data); - return; + else { + target = &(*cache)->negative_items; } + + item = memory_pool_alloc0 (pcache->static_pool, sizeof (struct cache_item)); + item->s = memory_pool_alloc (pcache->static_pool, sizeof (struct saved_cache_item)); + g_strlcpy (item->s->symbol, name, sizeof (item->s->symbol)); + item->func = func; + item->user_data = user_data; + item->s->weight = weight; + pcache->used_items++; + msg_debug ("used items: %d, added symbol: %s", (*cache)->used_items, name); + set_counter (item->s->symbol, 0); - item = &(*cache)->items[(*cache)->used_items]; + *target = g_list_prepend (*target, item); +} +void +register_dynamic_symbol (struct symbols_cache **cache, const char *name, double weight, symbol_func_t func, + gpointer user_data, struct dynamic_map_item *networks, gsize network_count) +{ + struct cache_item *item = NULL; + struct symbols_cache *pcache = *cache; + GList **target, *t; + gsize i; + uintptr_t r; + uint32_t mask = 0xFFFFFFFF; + + if (*cache == NULL) { + pcache = g_new0 (struct symbols_cache, 1); + *cache = pcache; + pcache->static_pool = memory_pool_new (memory_pool_get_size ()); + } + + if (pcache->dynamic_pool == NULL) { + pcache->dynamic_pool = memory_pool_new (memory_pool_get_size ()); + } + item = memory_pool_alloc0 (pcache->dynamic_pool, sizeof (struct cache_item)); + item->s = memory_pool_alloc (pcache->dynamic_pool, sizeof (struct saved_cache_item)); g_strlcpy (item->s->symbol, name, sizeof (item->s->symbol)); item->func = func; item->user_data = user_data; item->s->weight = weight; - (*cache)->used_items++; + item->is_dynamic = TRUE; + + pcache->used_items++; msg_debug ("used items: %d, added symbol: %s", (*cache)->used_items, name); set_counter (item->s->symbol, 0); + + if (network_count == 0 || networks == NULL) { + target = &pcache->dynamic_items; + } + else { + if (pcache->dynamic_map == NULL) { + pcache->dynamic_map = radix_tree_create (); + } + for (i = 0; i < network_count; i ++) { + mask = mask << (32 - networks[i].mask); + r = ntohl (networks[i].addr.s_addr & mask); + if ((r = radix32tree_find (pcache->dynamic_map, r)) != RADIX_NO_VALUE) { + t = (GList *)((gpointer)r); + target = &t; + } + else { + t = g_list_prepend (NULL, item); + memory_pool_add_destructor (pcache->dynamic_pool, (pool_destruct_func)g_list_free, t); + r = radix32tree_insert (pcache->dynamic_map, ntohl (networks[i].addr.s_addr), mask, (uintptr_t)t); + if (r == -1) { + msg_warn ("cannot insert ip to tree: %s, mask %X", inet_ntoa (networks[i].addr), mask); + } + else if (r == 1) { + msg_warn ("ip %s, mask %X, value already exists", inet_ntoa (networks[i].addr), mask); + } + return; + } + } + } + *target = g_list_prepend (*target, item); +} + +void +remove_dynamic_items (struct symbols_cache *cache) +{ + if (cache->dynamic_items) { + g_list_free (cache->dynamic_items); + cache->dynamic_items = NULL; + } + + if (cache->dynamic_map) { + radix_tree_free (cache->dynamic_map); + } + + /* Do magic */ + memory_pool_delete (cache->dynamic_pool); + cache->dynamic_pool = NULL; } static void free_cache (gpointer arg) { struct symbols_cache *cache = arg; - int i; - if (cache->map == NULL) { - /* Free items in memory, otherwise */ - for (i = 0; i < cache->cur_items; i++) { - g_free (cache->items[i].s); - } - } - else { + if (cache->map != NULL) { unmap_cache_file (cache); } + if (cache->static_items) { + g_list_free (cache->static_items); + } + if (cache->negative_items) { + g_list_free (cache->negative_items); + } + if (cache->dynamic_items) { + g_list_free (cache->dynamic_items); + } + if (cache->dynamic_map) { + radix_tree_free (cache->dynamic_map); + } + + memory_pool_delete (cache->static_pool); + if (cache->dynamic_pool) { + memory_pool_delete (cache->dynamic_pool); + } + g_free (cache); } @@ -274,13 +397,10 @@ init_symbols_cache (memory_pool_t * pool, struct symbols_cache *cache, const cha gsize cklen; gboolean res; - if (cache == NULL || cache->items == NULL) { + if (cache == NULL) { return FALSE; } - /* Sort items in cache */ - qsort (cache->items, cache->used_items, sizeof (struct cache_item), cache_cmp); - /* Init locking */ cache->lock = memory_pool_get_rwlock (pool); @@ -290,7 +410,6 @@ init_symbols_cache (memory_pool_t * pool, struct symbols_cache *cache, const cha return TRUE; } - /* First of all try to stat file */ if (stat (filename, &st) == -1) { /* Check errno */ @@ -371,14 +490,45 @@ init_symbols_cache (memory_pool_t * pool, struct symbols_cache *cache, const cha return res; } +static GList * +check_dynamic_item (struct worker_task *task, struct symbols_cache *cache) +{ + GList *res = NULL; + uintptr_t r; + + if (cache->dynamic_map != NULL && task->from_addr.s_addr != INADDR_NONE) { + if ((r = radix32tree_find (cache->dynamic_map, ntohl (task->from_addr.s_addr))) != RADIX_NO_VALUE) { + res = (GList *)((gpointer)r); + return res; + } + else { + return NULL; + } + } + + return res; +} + +struct symbol_callback_data { + enum { + CACHE_STATE_NEGATIVE, + CACHE_STATE_DYNAMIC_MAP, + CACHE_STATE_DYNAMIC, + CACHE_STATE_STATIC + } state; + struct cache_item *saved_item; + GList *list_pointer; +}; + gboolean -call_symbol_callback (struct worker_task * task, struct symbols_cache * cache, struct cache_item ** saved_item) +call_symbol_callback (struct worker_task * task, struct symbols_cache * cache, gpointer *save) { struct timespec ts1, ts2; uint64_t diff; struct cache_item *item; + struct symbol_callback_data *s = *save; - if (*saved_item == NULL) { + if (s == NULL) { if (cache == NULL) { return FALSE; } @@ -390,20 +540,127 @@ call_symbol_callback (struct worker_task * task, struct symbols_cache * cache, s post_cache_init (cache); memory_pool_wunlock_rwlock (cache->lock); } - item = &cache->items[0]; + s = memory_pool_alloc0 (task->task_pool, sizeof (struct symbol_callback_data)); + *save = s; + if (cache->negative_items != NULL) { + s->list_pointer = g_list_first (cache->negative_items); + s->saved_item = s->list_pointer->data; + s->state = CACHE_STATE_NEGATIVE; + } + else if ((s->list_pointer = check_dynamic_item (task, cache)) || cache->dynamic_items != NULL) { + if (s->list_pointer == NULL) { + s->list_pointer = g_list_first (cache->dynamic_items); + s->saved_item = s->list_pointer->data; + s->state = CACHE_STATE_DYNAMIC; + } + else { + s->saved_item = s->list_pointer->data; + s->state = CACHE_STATE_DYNAMIC_MAP; + } + } + else { + s->state = CACHE_STATE_STATIC; + s->list_pointer = g_list_first (cache->static_items); + if (s->list_pointer) { + s->saved_item = s->list_pointer->data; + } + else { + return FALSE; + } + } + item = s->saved_item; } else { if (cache == NULL) { return FALSE; } - /* Next pointer */ - if (*saved_item - cache->items >= cache->used_items - 1) { - /* No more items in cache */ - return FALSE; + switch (s->state) { + case CACHE_STATE_NEGATIVE: + s->list_pointer = g_list_next (s->list_pointer); + if (s->list_pointer == NULL) { + if ((s->list_pointer = check_dynamic_item (task, cache)) || cache->dynamic_items != NULL) { + if (s->list_pointer == NULL) { + s->list_pointer = g_list_first (cache->dynamic_items); + s->saved_item = s->list_pointer->data; + s->state = CACHE_STATE_DYNAMIC; + } + else { + s->saved_item = s->list_pointer->data; + s->state = CACHE_STATE_DYNAMIC_MAP; + } + } + else { + s->state = CACHE_STATE_STATIC; + s->list_pointer = g_list_first (cache->static_items); + if (s->list_pointer) { + s->saved_item = s->list_pointer->data; + } + else { + return FALSE; + } + } + } + else { + s->saved_item = s->list_pointer->data; + } + item = s->saved_item; + break; + case CACHE_STATE_DYNAMIC_MAP: + s->list_pointer = g_list_next (s->list_pointer); + if (s->list_pointer == NULL) { + s->list_pointer = g_list_first (cache->dynamic_items); + if (s->list_pointer) { + s->saved_item = s->list_pointer->data; + s->state = CACHE_STATE_DYNAMIC; + } + else { + s->state = CACHE_STATE_STATIC; + s->list_pointer = g_list_first (cache->static_items); + if (s->list_pointer) { + s->saved_item = s->list_pointer->data; + } + else { + return FALSE; + } + } + } + else { + s->saved_item = s->list_pointer->data; + } + item = s->saved_item; + break; + case CACHE_STATE_DYNAMIC: + s->list_pointer = g_list_next (s->list_pointer); + if (s->list_pointer == NULL) { + s->state = CACHE_STATE_STATIC; + s->list_pointer = g_list_first (cache->static_items); + if (s->list_pointer) { + s->saved_item = s->list_pointer->data; + } + else { + return FALSE; + } + } + else { + s->saved_item = s->list_pointer->data; + } + item = s->saved_item; + break; + case CACHE_STATE_STATIC: + /* Next pointer */ + s->list_pointer = g_list_next (s->list_pointer); + if (s->list_pointer) { + s->saved_item = s->list_pointer->data; + } + else { + return FALSE; + } + item = s->saved_item; + break; } - memory_pool_rlock_rwlock (cache->lock); - item = *saved_item + 1; - memory_pool_runlock_rwlock (cache->lock); + } + if (!item) { + return FALSE; } if (check_view (task->cfg->views, item->s->symbol, task)) { #ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID @@ -427,7 +684,7 @@ call_symbol_callback (struct worker_task * task, struct symbols_cache * cache, s item->s->avg_time = set_counter (item->s->symbol, diff); } - *saved_item = item; + s->saved_item = item; return TRUE; diff --git a/src/symbols_cache.h b/src/symbols_cache.h index 246205260..af92b4891 100644 --- a/src/symbols_cache.h +++ b/src/symbols_cache.h @@ -2,6 +2,7 @@ #define RSPAMD_SYMBOLS_CACHE_H #include "config.h" +#include "radix.h" #define MAX_SYMBOL 128 @@ -16,14 +17,42 @@ struct saved_cache_item { double avg_time; }; +struct dynamic_map_item { + struct in_addr addr; + uint32_t mask; +}; + struct cache_item { + /* Static item's data */ struct saved_cache_item *s; + + /* For dynamic rules */ + struct dynamic_map_item *networks; + uint32_t networks_number; + gboolean is_dynamic; + + /* Callback data */ symbol_func_t func; gpointer user_data; }; + struct symbols_cache { - struct cache_item *items; + /* Normal cache items */ + GList *static_items; + + /* Items that have negative weights */ + GList *negative_items; + + /* Radix map of dynamic rules with ip mappings */ + radix_tree_t *dynamic_map; + + /* Common dynamic rules */ + GList *dynamic_items; + + memory_pool_t *static_pool; + memory_pool_t *dynamic_pool; + guint cur_items; guint used_items; guint uses; @@ -45,11 +74,27 @@ gboolean init_symbols_cache (memory_pool_t *pool, struct symbols_cache *cache, c void register_symbol (struct symbols_cache **cache, const char *name, double weight, symbol_func_t func, gpointer user_data); /** + * Register function for dynamic symbols parsing + * @param name name of symbol + * @param func pointer to handler + * @param user_data pointer to user_data + */ +void register_dynamic_symbol (struct symbols_cache **cache, const char *name, double weight, symbol_func_t func, + gpointer user_data, struct dynamic_map_item *networks, gsize network_count); + +/** * Call function for cached symbol using saved callback * @param task task object * @param cache symbols cache * @param saved_item pointer to currently saved item */ -gboolean call_symbol_callback (struct worker_task *task, struct symbols_cache *cache, struct cache_item **saved_item); +gboolean call_symbol_callback (struct worker_task *task, struct symbols_cache *cache, gpointer *save); + +/** + * Remove all dynamic rules from cache + * @param cache symbols cache + */ +void remove_dynamic_rules (struct symbols_cache *cache); + #endif |