aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2017-08-11 19:04:11 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2017-08-11 19:04:33 +0100
commit7a4c7de7035499eee2d68d525dc003feb18a749b (patch)
treebcd03bde59899bc9a368d5cd735b8760b3096ae9 /src/libserver
parent574aba03daae0891733bf62a35d390a7e637d6a0 (diff)
downloadrspamd-7a4c7de7035499eee2d68d525dc003feb18a749b.tar.gz
rspamd-7a4c7de7035499eee2d68d525dc003feb18a749b.zip
[Feature] Add preliminary support of idempotent symbols
Diffstat (limited to 'src/libserver')
-rw-r--r--src/libserver/symbols_cache.c79
-rw-r--r--src/libserver/symbols_cache.h1
-rw-r--r--src/libserver/task.c9
-rw-r--r--src/libserver/task.h6
4 files changed, 88 insertions, 7 deletions
diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c
index 1c18c5559..4240ba652 100644
--- a/src/libserver/symbols_cache.c
+++ b/src/libserver/symbols_cache.c
@@ -67,6 +67,7 @@ struct symbols_cache {
GPtrArray *prefilters;
GPtrArray *postfilters;
GPtrArray *composites;
+ GPtrArray *idempotent;
GList *delayed_deps;
GList *delayed_conditions;
rspamd_mempool_t *static_pool;
@@ -153,6 +154,8 @@ enum rspamd_cache_savepoint_stage {
RSPAMD_CACHE_PASS_WAIT_FILTERS,
RSPAMD_CACHE_PASS_POSTFILTERS,
RSPAMD_CACHE_PASS_WAIT_POSTFILTERS,
+ RSPAMD_CACHE_PASS_IDEMPOTENT,
+ RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT,
RSPAMD_CACHE_PASS_DONE,
};
@@ -360,7 +363,9 @@ rspamd_symbols_cache_resort (struct symbols_cache *cache)
it = g_ptr_array_index (cache->items_by_id, i);
total_hits += it->st->total_hits;
- if (!(it->type & (SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER|SYMBOL_TYPE_COMPOSITE))) {
+ if (!(it->type & (SYMBOL_TYPE_PREFILTER|
+ SYMBOL_TYPE_POSTFILTER|
+ SYMBOL_TYPE_COMPOSITE))) {
g_ptr_array_add (ord->d, it);
}
}
@@ -468,6 +473,7 @@ rspamd_symbols_cache_post_init (struct symbols_cache *cache)
g_ptr_array_sort_with_data (cache->prefilters, prefilters_cmp, cache);
g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache);
+ g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache);
}
static gboolean
@@ -731,7 +737,8 @@ rspamd_symbols_cache_add_symbol (struct symbols_cache *cache,
}
if (type & (SYMBOL_TYPE_CLASSIFIER|SYMBOL_TYPE_CALLBACK|
- SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER)) {
+ SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER|
+ SYMBOL_TYPE_IDEMPOTENT)) {
type |= SYMBOL_TYPE_NOSTAT;
}
@@ -791,6 +798,9 @@ rspamd_symbols_cache_add_symbol (struct symbols_cache *cache,
if (item->type & SYMBOL_TYPE_PREFILTER) {
g_ptr_array_add (cache->prefilters, item);
}
+ else if (item->type & SYMBOL_TYPE_IDEMPOTENT) {
+ g_ptr_array_add (cache->idempotent, item);
+ }
else if (item->type & SYMBOL_TYPE_POSTFILTER) {
g_ptr_array_add (cache->postfilters, item);
}
@@ -929,6 +939,7 @@ rspamd_symbols_cache_destroy (struct symbols_cache *cache)
g_ptr_array_free (cache->items_by_id, TRUE);
g_ptr_array_free (cache->prefilters, TRUE);
g_ptr_array_free (cache->postfilters, TRUE);
+ g_ptr_array_free (cache->idempotent, TRUE);
g_ptr_array_free (cache->composites, TRUE);
REF_RELEASE (cache->items_by_order);
@@ -953,6 +964,7 @@ rspamd_symbols_cache_new (struct rspamd_config *cfg)
cache->items_by_id = g_ptr_array_new ();
cache->prefilters = g_ptr_array_new ();
cache->postfilters = g_ptr_array_new ();
+ cache->idempotent = g_ptr_array_new ();
cache->composites = g_ptr_array_new ();
cache->mtx = rspamd_mempool_get_mutex (cache->static_pool);
cache->reload_time = cfg->cache_reload_time;
@@ -1458,7 +1470,8 @@ rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task,
guint nitems;
nitems = cache->items_by_id->len - cache->postfilters->len -
- cache->prefilters->len - cache->composites->len;
+ cache->prefilters->len - cache->composites->len -
+ cache->idempotent->len;
if (nitems != cache->items_by_order->d->len) {
/*
@@ -1836,6 +1849,66 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
}
if (all_done) {
+ checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
+
+ return TRUE;
+ }
+
+ if (checkpoint->waitq->len == 0 ||
+ stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
+ checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
+ }
+
+ if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
+ return rspamd_symbols_cache_process_symbols (task, cache, stage);
+ }
+
+ break;
+
+ case RSPAMD_CACHE_PASS_IDEMPOTENT:
+ /* Check for postfilters */
+ saved_priority = G_MININT;
+
+ for (i = 0; i < (gint)cache->idempotent->len; i ++) {
+ item = g_ptr_array_index (cache->idempotent, i);
+
+ if (!isset (checkpoint->processed_bits, item->id * 2) &&
+ !isset (checkpoint->processed_bits, item->id * 2 + 1)) {
+ /* Check priorities */
+ if (saved_priority == G_MININT) {
+ saved_priority = item->priority;
+ }
+ else {
+ if (item->priority > saved_priority &&
+ rspamd_session_events_pending (task->s) > start_events_pending) {
+ /*
+ * Delay further checks as we have higher
+ * priority filters to be processed
+ */
+ checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
+ return TRUE;
+ }
+ }
+ rspamd_symbols_cache_check_symbol (task, cache, item,
+ checkpoint, &total_microseconds);
+ }
+ }
+ checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT;
+ break;
+
+ case RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT:
+ all_done = TRUE;
+
+ for (i = 0; i < (gint)cache->idempotent->len; i ++) {
+ item = g_ptr_array_index (cache->idempotent, i);
+
+ if (!isset (checkpoint->processed_bits, item->id * 2 + 1)) {
+ all_done = FALSE;
+ break;
+ }
+ }
+
+ if (all_done) {
checkpoint->pass = RSPAMD_CACHE_PASS_DONE;
return TRUE;
diff --git a/src/libserver/symbols_cache.h b/src/libserver/symbols_cache.h
index ade8d95b9..f6bbd599b 100644
--- a/src/libserver/symbols_cache.h
+++ b/src/libserver/symbols_cache.h
@@ -41,6 +41,7 @@ enum rspamd_symbol_type {
SYMBOL_TYPE_PREFILTER = (1 << 9),
SYMBOL_TYPE_POSTFILTER = (1 << 10),
SYMBOL_TYPE_NOSTAT = (1 << 11), /* Skip as statistical symbol */
+ SYMBOL_TYPE_IDEMPOTENT = (1 << 12), /* Symbol cannot change metric */
};
/**
diff --git a/src/libserver/task.c b/src/libserver/task.c
index 876ab51ca..4aa2e21ad 100644
--- a/src/libserver/task.c
+++ b/src/libserver/task.c
@@ -779,9 +779,14 @@ rspamd_task_process (struct rspamd_task *task, guint stages)
}
break;
- case RSPAMD_TASK_STAGE_DONE:
- /* Second run of composites processing */
+ case RSPAMD_TASK_STAGE_IDEMPOTENT:
+ /* Second run of composites processing before idempotent filters */
rspamd_make_composites (task);
+ rspamd_symbols_cache_process_symbols (task, task->cfg->cache,
+ RSPAMD_TASK_STAGE_IDEMPOTENT);
+ break;
+
+ case RSPAMD_TASK_STAGE_DONE:
task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
break;
diff --git a/src/libserver/task.h b/src/libserver/task.h
index 1efa45309..194003e72 100644
--- a/src/libserver/task.h
+++ b/src/libserver/task.h
@@ -50,8 +50,9 @@ enum rspamd_task_stage {
RSPAMD_TASK_STAGE_LEARN_PRE = (1 << 10),
RSPAMD_TASK_STAGE_LEARN = (1 << 11),
RSPAMD_TASK_STAGE_LEARN_POST = (1 << 12),
- RSPAMD_TASK_STAGE_DONE = (1 << 13),
- RSPAMD_TASK_STAGE_REPLIED = (1 << 14)
+ RSPAMD_TASK_STAGE_IDEMPOTENT = (1 << 13),
+ RSPAMD_TASK_STAGE_DONE = (1 << 14),
+ RSPAMD_TASK_STAGE_REPLIED = (1 << 15)
};
#define RSPAMD_TASK_PROCESS_ALL (RSPAMD_TASK_STAGE_CONNECT | \
@@ -67,6 +68,7 @@ enum rspamd_task_stage {
RSPAMD_TASK_STAGE_LEARN_PRE | \
RSPAMD_TASK_STAGE_LEARN | \
RSPAMD_TASK_STAGE_LEARN_POST | \
+ RSPAMD_TASK_STAGE_IDEMPOTENT | \
RSPAMD_TASK_STAGE_DONE)
#define RSPAMD_TASK_PROCESS_LEARN (RSPAMD_TASK_STAGE_CONNECT | \
RSPAMD_TASK_STAGE_ENVELOPE | \