]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Add preliminary support of idempotent symbols
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 11 Aug 2017 18:04:11 +0000 (19:04 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 11 Aug 2017 18:04:33 +0000 (19:04 +0100)
src/libserver/symbols_cache.c
src/libserver/symbols_cache.h
src/libserver/task.c
src/libserver/task.h

index 1c18c5559325109eb43e836d6032984287d94aa4..4240ba652fa7cd13df2e312f22334431d199a965 100644 (file)
@@ -67,6 +67,7 @@ struct symbols_cache {
        GPtrArray *prefilters;
        GPtrArray *postfilters;
        GPtrArray *composites;
+       GPtrArray *idempotent;
        GList *delayed_deps;
        GList *delayed_conditions;
        rspamd_mempool_t *static_pool;
@@ -153,6 +154,8 @@ enum rspamd_cache_savepoint_stage {
        RSPAMD_CACHE_PASS_WAIT_FILTERS,
        RSPAMD_CACHE_PASS_POSTFILTERS,
        RSPAMD_CACHE_PASS_WAIT_POSTFILTERS,
+       RSPAMD_CACHE_PASS_IDEMPOTENT,
+       RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT,
        RSPAMD_CACHE_PASS_DONE,
 };
 
@@ -360,7 +363,9 @@ rspamd_symbols_cache_resort (struct symbols_cache *cache)
                it = g_ptr_array_index (cache->items_by_id, i);
                total_hits += it->st->total_hits;
 
-               if (!(it->type & (SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER|SYMBOL_TYPE_COMPOSITE))) {
+               if (!(it->type & (SYMBOL_TYPE_PREFILTER|
+                               SYMBOL_TYPE_POSTFILTER|
+                               SYMBOL_TYPE_COMPOSITE))) {
                        g_ptr_array_add (ord->d, it);
                }
        }
@@ -468,6 +473,7 @@ rspamd_symbols_cache_post_init (struct symbols_cache *cache)
 
        g_ptr_array_sort_with_data (cache->prefilters, prefilters_cmp, cache);
        g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache);
+       g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache);
 }
 
 static gboolean
@@ -731,7 +737,8 @@ rspamd_symbols_cache_add_symbol (struct symbols_cache *cache,
        }
 
        if (type & (SYMBOL_TYPE_CLASSIFIER|SYMBOL_TYPE_CALLBACK|
-                       SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER)) {
+                       SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER|
+                       SYMBOL_TYPE_IDEMPOTENT)) {
                type |= SYMBOL_TYPE_NOSTAT;
        }
 
@@ -791,6 +798,9 @@ rspamd_symbols_cache_add_symbol (struct symbols_cache *cache,
        if (item->type & SYMBOL_TYPE_PREFILTER) {
                g_ptr_array_add (cache->prefilters, item);
        }
+       else if (item->type & SYMBOL_TYPE_IDEMPOTENT) {
+               g_ptr_array_add (cache->idempotent, item);
+       }
        else if (item->type & SYMBOL_TYPE_POSTFILTER) {
                g_ptr_array_add (cache->postfilters, item);
        }
@@ -929,6 +939,7 @@ rspamd_symbols_cache_destroy (struct symbols_cache *cache)
                g_ptr_array_free (cache->items_by_id, TRUE);
                g_ptr_array_free (cache->prefilters, TRUE);
                g_ptr_array_free (cache->postfilters, TRUE);
+               g_ptr_array_free (cache->idempotent, TRUE);
                g_ptr_array_free (cache->composites, TRUE);
                REF_RELEASE (cache->items_by_order);
 
@@ -953,6 +964,7 @@ rspamd_symbols_cache_new (struct rspamd_config *cfg)
        cache->items_by_id = g_ptr_array_new ();
        cache->prefilters = g_ptr_array_new ();
        cache->postfilters = g_ptr_array_new ();
+       cache->idempotent = g_ptr_array_new ();
        cache->composites = g_ptr_array_new ();
        cache->mtx = rspamd_mempool_get_mutex (cache->static_pool);
        cache->reload_time = cfg->cache_reload_time;
@@ -1458,7 +1470,8 @@ rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task,
        guint nitems;
 
        nitems = cache->items_by_id->len - cache->postfilters->len -
-                       cache->prefilters->len - cache->composites->len;
+                       cache->prefilters->len - cache->composites->len -
+                       cache->idempotent->len;
 
        if (nitems != cache->items_by_order->d->len) {
                /*
@@ -1835,6 +1848,66 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                        }
                }
 
+               if (all_done) {
+                       checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
+
+                       return TRUE;
+               }
+
+               if (checkpoint->waitq->len == 0 ||
+                               stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
+                       checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
+               }
+
+               if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
+                       return rspamd_symbols_cache_process_symbols (task, cache, stage);
+               }
+
+               break;
+
+       case RSPAMD_CACHE_PASS_IDEMPOTENT:
+               /* Check for postfilters */
+               saved_priority = G_MININT;
+
+               for (i = 0; i < (gint)cache->idempotent->len; i ++) {
+                       item = g_ptr_array_index (cache->idempotent, i);
+
+                       if (!isset (checkpoint->processed_bits, item->id * 2) &&
+                                       !isset (checkpoint->processed_bits, item->id * 2 + 1)) {
+                               /* Check priorities */
+                               if (saved_priority == G_MININT) {
+                                       saved_priority = item->priority;
+                               }
+                               else {
+                                       if (item->priority > saved_priority &&
+                                                       rspamd_session_events_pending (task->s) > start_events_pending) {
+                                               /*
+                                                * Delay further checks as we have higher
+                                                * priority filters to be processed
+                                                */
+                                               checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
+                                               return TRUE;
+                                       }
+                               }
+                               rspamd_symbols_cache_check_symbol (task, cache, item,
+                                               checkpoint, &total_microseconds);
+                       }
+               }
+               checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT;
+               break;
+
+       case RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT:
+               all_done = TRUE;
+
+               for (i = 0; i < (gint)cache->idempotent->len; i ++) {
+                       item = g_ptr_array_index (cache->idempotent, i);
+
+                       if (!isset (checkpoint->processed_bits, item->id * 2 + 1)) {
+                               all_done = FALSE;
+                               break;
+                       }
+               }
+
                if (all_done) {
                        checkpoint->pass = RSPAMD_CACHE_PASS_DONE;
 
index ade8d95b933367770f6c36d0f523ca9907cc35ce..f6bbd599b9616d3b9d0549b56405d86f4d3f801b 100644 (file)
@@ -41,6 +41,7 @@ enum rspamd_symbol_type {
        SYMBOL_TYPE_PREFILTER = (1 << 9),
        SYMBOL_TYPE_POSTFILTER = (1 << 10),
        SYMBOL_TYPE_NOSTAT = (1 << 11), /* Skip as statistical symbol */
+       SYMBOL_TYPE_IDEMPOTENT = (1 << 12), /* Symbol cannot change metric */
 };
 
 /**
index 876ab51caff9a34cd78b31aed32261989fe26408..4aa2e21ad243ee821b3b64d7315a25c8c533d1a8 100644 (file)
@@ -779,9 +779,14 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
                }
                break;
 
-       case RSPAMD_TASK_STAGE_DONE:
-               /* Second run of composites processing */
+       case RSPAMD_TASK_STAGE_IDEMPOTENT:
+               /* Second run of composites processing before idempotent filters */
                rspamd_make_composites (task);
+               rspamd_symbols_cache_process_symbols (task, task->cfg->cache,
+                               RSPAMD_TASK_STAGE_IDEMPOTENT);
+               break;
+
+       case RSPAMD_TASK_STAGE_DONE:
                task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
                break;
 
index 1efa453091f7fabc90a012bdcfcd359b9a30f5a9..194003e7235cb904df8b46fff186961022cf05e1 100644 (file)
@@ -50,8 +50,9 @@ enum rspamd_task_stage {
        RSPAMD_TASK_STAGE_LEARN_PRE = (1 << 10),
        RSPAMD_TASK_STAGE_LEARN = (1 << 11),
        RSPAMD_TASK_STAGE_LEARN_POST = (1 << 12),
-       RSPAMD_TASK_STAGE_DONE = (1 << 13),
-       RSPAMD_TASK_STAGE_REPLIED = (1 << 14)
+       RSPAMD_TASK_STAGE_IDEMPOTENT = (1 << 13),
+       RSPAMD_TASK_STAGE_DONE = (1 << 14),
+       RSPAMD_TASK_STAGE_REPLIED = (1 << 15)
 };
 
 #define RSPAMD_TASK_PROCESS_ALL (RSPAMD_TASK_STAGE_CONNECT | \
@@ -67,6 +68,7 @@ enum rspamd_task_stage {
                RSPAMD_TASK_STAGE_LEARN_PRE | \
                RSPAMD_TASK_STAGE_LEARN | \
                RSPAMD_TASK_STAGE_LEARN_POST | \
+               RSPAMD_TASK_STAGE_IDEMPOTENT | \
                RSPAMD_TASK_STAGE_DONE)
 #define RSPAMD_TASK_PROCESS_LEARN (RSPAMD_TASK_STAGE_CONNECT | \
                RSPAMD_TASK_STAGE_ENVELOPE | \