From 7a4c7de7035499eee2d68d525dc003feb18a749b Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 11 Aug 2017 19:04:11 +0100 Subject: [PATCH] [Feature] Add preliminary support of idempotent symbols --- src/libserver/symbols_cache.c | 79 +++++++++++++++++++++++++++++++++-- src/libserver/symbols_cache.h | 1 + src/libserver/task.c | 9 +++- src/libserver/task.h | 6 ++- 4 files changed, 88 insertions(+), 7 deletions(-) diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c index 1c18c5559..4240ba652 100644 --- a/src/libserver/symbols_cache.c +++ b/src/libserver/symbols_cache.c @@ -67,6 +67,7 @@ struct symbols_cache { GPtrArray *prefilters; GPtrArray *postfilters; GPtrArray *composites; + GPtrArray *idempotent; GList *delayed_deps; GList *delayed_conditions; rspamd_mempool_t *static_pool; @@ -153,6 +154,8 @@ enum rspamd_cache_savepoint_stage { RSPAMD_CACHE_PASS_WAIT_FILTERS, RSPAMD_CACHE_PASS_POSTFILTERS, RSPAMD_CACHE_PASS_WAIT_POSTFILTERS, + RSPAMD_CACHE_PASS_IDEMPOTENT, + RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT, RSPAMD_CACHE_PASS_DONE, }; @@ -360,7 +363,9 @@ rspamd_symbols_cache_resort (struct symbols_cache *cache) it = g_ptr_array_index (cache->items_by_id, i); total_hits += it->st->total_hits; - if (!(it->type & (SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER|SYMBOL_TYPE_COMPOSITE))) { + if (!(it->type & (SYMBOL_TYPE_PREFILTER| + SYMBOL_TYPE_POSTFILTER| + SYMBOL_TYPE_COMPOSITE))) { g_ptr_array_add (ord->d, it); } } @@ -468,6 +473,7 @@ rspamd_symbols_cache_post_init (struct symbols_cache *cache) g_ptr_array_sort_with_data (cache->prefilters, prefilters_cmp, cache); g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache); + g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache); } static gboolean @@ -731,7 +737,8 @@ rspamd_symbols_cache_add_symbol (struct symbols_cache *cache, } if (type & (SYMBOL_TYPE_CLASSIFIER|SYMBOL_TYPE_CALLBACK| - SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER)) { + 
SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER| + SYMBOL_TYPE_IDEMPOTENT)) { type |= SYMBOL_TYPE_NOSTAT; } @@ -791,6 +798,9 @@ rspamd_symbols_cache_add_symbol (struct symbols_cache *cache, if (item->type & SYMBOL_TYPE_PREFILTER) { g_ptr_array_add (cache->prefilters, item); } + else if (item->type & SYMBOL_TYPE_IDEMPOTENT) { + g_ptr_array_add (cache->idempotent, item); + } else if (item->type & SYMBOL_TYPE_POSTFILTER) { g_ptr_array_add (cache->postfilters, item); } @@ -929,6 +939,7 @@ rspamd_symbols_cache_destroy (struct symbols_cache *cache) g_ptr_array_free (cache->items_by_id, TRUE); g_ptr_array_free (cache->prefilters, TRUE); g_ptr_array_free (cache->postfilters, TRUE); + g_ptr_array_free (cache->idempotent, TRUE); g_ptr_array_free (cache->composites, TRUE); REF_RELEASE (cache->items_by_order); @@ -953,6 +964,7 @@ rspamd_symbols_cache_new (struct rspamd_config *cfg) cache->items_by_id = g_ptr_array_new (); cache->prefilters = g_ptr_array_new (); cache->postfilters = g_ptr_array_new (); + cache->idempotent = g_ptr_array_new (); cache->composites = g_ptr_array_new (); cache->mtx = rspamd_mempool_get_mutex (cache->static_pool); cache->reload_time = cfg->cache_reload_time; @@ -1458,7 +1470,8 @@ rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task, guint nitems; nitems = cache->items_by_id->len - cache->postfilters->len - - cache->prefilters->len - cache->composites->len; + cache->prefilters->len - cache->composites->len - + cache->idempotent->len; if (nitems != cache->items_by_order->d->len) { /* @@ -1835,6 +1848,66 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, } } + if (all_done) { + checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; + + return TRUE; + } + + if (checkpoint->waitq->len == 0 || + stage == RSPAMD_TASK_STAGE_IDEMPOTENT) { + checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; + } + + if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT) { + return rspamd_symbols_cache_process_symbols (task, cache, stage); + } + + break; + + case 
RSPAMD_CACHE_PASS_IDEMPOTENT: + /* Check for idempotent filters */ + saved_priority = G_MININT; + + for (i = 0; i < (gint)cache->idempotent->len; i ++) { + item = g_ptr_array_index (cache->idempotent, i); + + if (!isset (checkpoint->processed_bits, item->id * 2) && + !isset (checkpoint->processed_bits, item->id * 2 + 1)) { + /* Check priorities */ + if (saved_priority == G_MININT) { + saved_priority = item->priority; + } + else { + if (item->priority > saved_priority && + rspamd_session_events_pending (task->s) > start_events_pending) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; + return TRUE; + } + } + rspamd_symbols_cache_check_symbol (task, cache, item, + checkpoint, &total_microseconds); + } + } + checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT; + break; + + case RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT: + all_done = TRUE; + + for (i = 0; i < (gint)cache->idempotent->len; i ++) { + item = g_ptr_array_index (cache->idempotent, i); + + if (!isset (checkpoint->processed_bits, item->id * 2 + 1)) { + all_done = FALSE; + break; + } + } + if (all_done) { checkpoint->pass = RSPAMD_CACHE_PASS_DONE; diff --git a/src/libserver/symbols_cache.h b/src/libserver/symbols_cache.h index ade8d95b9..f6bbd599b 100644 --- a/src/libserver/symbols_cache.h +++ b/src/libserver/symbols_cache.h @@ -41,6 +41,7 @@ enum rspamd_symbol_type { SYMBOL_TYPE_PREFILTER = (1 << 9), SYMBOL_TYPE_POSTFILTER = (1 << 10), SYMBOL_TYPE_NOSTAT = (1 << 11), /* Skip as statistical symbol */ + SYMBOL_TYPE_IDEMPOTENT = (1 << 12), /* Symbol cannot change metric */ }; /** diff --git a/src/libserver/task.c b/src/libserver/task.c index 876ab51ca..4aa2e21ad 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -779,9 +779,14 @@ rspamd_task_process (struct rspamd_task *task, guint stages) } break; - case RSPAMD_TASK_STAGE_DONE: - /* Second 
run of composites processing before idempotent filters */ rspamd_make_composites (task); + rspamd_symbols_cache_process_symbols (task, task->cfg->cache, + RSPAMD_TASK_STAGE_IDEMPOTENT); + break; + + case RSPAMD_TASK_STAGE_DONE: task->processed_stages |= RSPAMD_TASK_STAGE_DONE; break; diff --git a/src/libserver/task.h b/src/libserver/task.h index 1efa45309..194003e72 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -50,8 +50,9 @@ enum rspamd_task_stage { RSPAMD_TASK_STAGE_LEARN_PRE = (1 << 10), RSPAMD_TASK_STAGE_LEARN = (1 << 11), RSPAMD_TASK_STAGE_LEARN_POST = (1 << 12), - RSPAMD_TASK_STAGE_DONE = (1 << 13), - RSPAMD_TASK_STAGE_REPLIED = (1 << 14) + RSPAMD_TASK_STAGE_IDEMPOTENT = (1 << 13), + RSPAMD_TASK_STAGE_DONE = (1 << 14), + RSPAMD_TASK_STAGE_REPLIED = (1 << 15) }; #define RSPAMD_TASK_PROCESS_ALL (RSPAMD_TASK_STAGE_CONNECT | \ @@ -67,6 +68,7 @@ enum rspamd_task_stage { RSPAMD_TASK_STAGE_LEARN_PRE | \ RSPAMD_TASK_STAGE_LEARN | \ RSPAMD_TASK_STAGE_LEARN_POST | \ + RSPAMD_TASK_STAGE_IDEMPOTENT | \ RSPAMD_TASK_STAGE_DONE) #define RSPAMD_TASK_PROCESS_LEARN (RSPAMD_TASK_STAGE_CONNECT | \ RSPAMD_TASK_STAGE_ENVELOPE | \ -- 2.39.5