aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/symbols_cache.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver/symbols_cache.c')
-rw-r--r--src/libserver/symbols_cache.c79
1 files changed, 76 insertions, 3 deletions
diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c
index 1c18c5559..4240ba652 100644
--- a/src/libserver/symbols_cache.c
+++ b/src/libserver/symbols_cache.c
@@ -67,6 +67,7 @@ struct symbols_cache {
GPtrArray *prefilters;
GPtrArray *postfilters;
GPtrArray *composites;
+ GPtrArray *idempotent;
GList *delayed_deps;
GList *delayed_conditions;
rspamd_mempool_t *static_pool;
@@ -153,6 +154,8 @@ enum rspamd_cache_savepoint_stage {
RSPAMD_CACHE_PASS_WAIT_FILTERS,
RSPAMD_CACHE_PASS_POSTFILTERS,
RSPAMD_CACHE_PASS_WAIT_POSTFILTERS,
+ RSPAMD_CACHE_PASS_IDEMPOTENT,
+ RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT,
RSPAMD_CACHE_PASS_DONE,
};
@@ -360,7 +363,9 @@ rspamd_symbols_cache_resort (struct symbols_cache *cache)
it = g_ptr_array_index (cache->items_by_id, i);
total_hits += it->st->total_hits;
- if (!(it->type & (SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER|SYMBOL_TYPE_COMPOSITE))) {
+ if (!(it->type & (SYMBOL_TYPE_PREFILTER|
+ SYMBOL_TYPE_POSTFILTER|
+ SYMBOL_TYPE_COMPOSITE))) {
g_ptr_array_add (ord->d, it);
}
}
@@ -468,6 +473,7 @@ rspamd_symbols_cache_post_init (struct symbols_cache *cache)
g_ptr_array_sort_with_data (cache->prefilters, prefilters_cmp, cache);
g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache);
+ g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache);
}
static gboolean
@@ -731,7 +737,8 @@ rspamd_symbols_cache_add_symbol (struct symbols_cache *cache,
}
if (type & (SYMBOL_TYPE_CLASSIFIER|SYMBOL_TYPE_CALLBACK|
- SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER)) {
+ SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER|
+ SYMBOL_TYPE_IDEMPOTENT)) {
type |= SYMBOL_TYPE_NOSTAT;
}
@@ -791,6 +798,9 @@ rspamd_symbols_cache_add_symbol (struct symbols_cache *cache,
if (item->type & SYMBOL_TYPE_PREFILTER) {
g_ptr_array_add (cache->prefilters, item);
}
+ else if (item->type & SYMBOL_TYPE_IDEMPOTENT) {
+ g_ptr_array_add (cache->idempotent, item);
+ }
else if (item->type & SYMBOL_TYPE_POSTFILTER) {
g_ptr_array_add (cache->postfilters, item);
}
@@ -929,6 +939,7 @@ rspamd_symbols_cache_destroy (struct symbols_cache *cache)
g_ptr_array_free (cache->items_by_id, TRUE);
g_ptr_array_free (cache->prefilters, TRUE);
g_ptr_array_free (cache->postfilters, TRUE);
+ g_ptr_array_free (cache->idempotent, TRUE);
g_ptr_array_free (cache->composites, TRUE);
REF_RELEASE (cache->items_by_order);
@@ -953,6 +964,7 @@ rspamd_symbols_cache_new (struct rspamd_config *cfg)
cache->items_by_id = g_ptr_array_new ();
cache->prefilters = g_ptr_array_new ();
cache->postfilters = g_ptr_array_new ();
+ cache->idempotent = g_ptr_array_new ();
cache->composites = g_ptr_array_new ();
cache->mtx = rspamd_mempool_get_mutex (cache->static_pool);
cache->reload_time = cfg->cache_reload_time;
@@ -1458,7 +1470,8 @@ rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task,
guint nitems;
nitems = cache->items_by_id->len - cache->postfilters->len -
- cache->prefilters->len - cache->composites->len;
+ cache->prefilters->len - cache->composites->len -
+ cache->idempotent->len;
if (nitems != cache->items_by_order->d->len) {
/*
@@ -1836,6 +1849,66 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
}
if (all_done) {
+ checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
+
+ return TRUE;
+ }
+
+ if (checkpoint->waitq->len == 0 ||
+ stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
+ checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
+ }
+
+ if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
+ return rspamd_symbols_cache_process_symbols (task, cache, stage);
+ }
+
+ break;
+
+ case RSPAMD_CACHE_PASS_IDEMPOTENT:
+ /* Check for postfilters */
+ saved_priority = G_MININT;
+
+ for (i = 0; i < (gint)cache->idempotent->len; i ++) {
+ item = g_ptr_array_index (cache->idempotent, i);
+
+ if (!isset (checkpoint->processed_bits, item->id * 2) &&
+ !isset (checkpoint->processed_bits, item->id * 2 + 1)) {
+ /* Check priorities */
+ if (saved_priority == G_MININT) {
+ saved_priority = item->priority;
+ }
+ else {
+ if (item->priority > saved_priority &&
+ rspamd_session_events_pending (task->s) > start_events_pending) {
+ /*
+ * Delay further checks as we have higher
+ * priority filters to be processed
+ */
+ checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
+ return TRUE;
+ }
+ }
+ rspamd_symbols_cache_check_symbol (task, cache, item,
+ checkpoint, &total_microseconds);
+ }
+ }
+ checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT;
+ break;
+
+ case RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT:
+ all_done = TRUE;
+
+ for (i = 0; i < (gint)cache->idempotent->len; i ++) {
+ item = g_ptr_array_index (cache->idempotent, i);
+
+ if (!isset (checkpoint->processed_bits, item->id * 2 + 1)) {
+ all_done = FALSE;
+ break;
+ }
+ }
+
+ if (all_done) {
checkpoint->pass = RSPAMD_CACHE_PASS_DONE;
return TRUE;