From f49b0099acbc16bcbc07f5b47169143bd2c3aef8 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 30 Apr 2022 19:26:28 +0100 Subject: [PATCH] [Project] Add symbols processing methods --- src/libserver/symcache/symcache_runtime.cxx | 127 +++++++++++++++++++- src/libserver/symcache/symcache_runtime.hxx | 3 + 2 files changed, 126 insertions(+), 4 deletions(-) diff --git a/src/libserver/symcache/symcache_runtime.cxx b/src/libserver/symcache/symcache_runtime.cxx index f0284f740..47e6d4c6e 100644 --- a/src/libserver/symcache/symcache_runtime.cxx +++ b/src/libserver/symcache/symcache_runtime.cxx @@ -21,6 +21,7 @@ #include "libserver/task.h" #include "libmime/scan_result.h" #include +#include namespace rspamd::symcache { @@ -433,9 +434,9 @@ symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int if (!dyn_item->started && !dyn_item->finished) { all_done = false; - if (!rspamd_symcache_check_deps(task, cache, item, - checkpoint, 0, FALSE)) { - msg_debug_cache_task ("blocked execution of %d(%s) unless deps are " + if (!check_item_deps(task, cache, item, + dyn_item, false)) { + msg_debug_cache_task("blocked execution of %d(%s) unless deps are " "resolved", item->id, item->symbol.c_str()); return true; @@ -452,7 +453,7 @@ symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int } if (!(item->flags & SYMBOL_TYPE_FINE)) { - if (rspamd_symcache_metric_limit(task, checkpoint)) { + if (check_metric_limit(task)) { msg_info_task ("task has already scored more than %.2f, so do " "not " "plan more checks", @@ -535,5 +536,123 @@ symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, cach return true; } +auto +symcache_runtime::check_metric_limit(struct rspamd_task *task) -> bool +{ + if (task->flags & RSPAMD_TASK_FLAG_PASS_ALL) { + return false; + } + + if (lim == 0.0) { + auto *res = task->result; + + if (res) { + auto ms = rspamd_task_get_required_score(task, res); + + if (!::isnan(ms) && lim < ms) { + rs = res; + lim = ms; + } + } + } + + if (rs) { + + if (rs->score > lim) { + return true; + } + } + else { + /* No reject score define, always check all rules */ + lim = -1; + } + + return false; +} + +auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item, bool check_only) -> bool +{ + static const guint max_recursion = 20; + + auto inner_functor = [&](int recursion, cache_item *item, cache_dynamic_item *dyn_item, auto rec_functor) -> bool { + if (recursion > max_recursion) { + msg_err_task ("cyclic dependencies: maximum check level %ud exceed when " + "checking dependencies for %s", max_recursion, item->symbol.c_str()); + + return true; + } + + auto ret = true; + + if (!item->deps.empty()) { + + for (const auto &dep : item->deps) { + if (!dep.item) { + /* Assume invalid deps as done */ + msg_debug_cache_task("symbol %d(%s) has invalid dependencies on %d(%s)", + item->id, item->symbol.c_str(), dep.id, dep.sym.c_str()); + continue; + } + + auto *dep_dyn_item = get_dynamic_item(dep.item->id, true); + + if (!dep_dyn_item->finished) { + if (!dep_dyn_item->started) { + /* Not started */ + if (!check_only) { + if (!rec_functor(recursion + 1, + dep.item.get(), + dep_dyn_item, + rec_functor)) { + + ret = false; + msg_debug_cache_task("delayed dependency %d(%s) for " + "symbol %d(%s)", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + else if (!process_symbol(task, cache,dep.item.get(),dep_dyn_item)) { + /* Now started, but has events pending */ + ret = false; + msg_debug_cache_task("started check of %d(%s) symbol " + "as dep for " + "%d(%s)", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + else { + msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is " + "already processed", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + } + else { + msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) " + "cannot be started now", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + ret = false; + } + } + else { + /* Started but not finished */ + msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is " + "still executing", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + ret = false; + } + } + else { + msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is already " + "checked", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + } + } + + return ret; + }; + + return inner_functor(0, item, dyn_item, inner_functor); +} + } diff --git a/src/libserver/symcache/symcache_runtime.hxx b/src/libserver/symcache/symcache_runtime.hxx index 837b1eee2..e63d236e7 100644 --- a/src/libserver/symcache/symcache_runtime.hxx +++ b/src/libserver/symcache/symcache_runtime.hxx @@ -77,6 +77,9 @@ class symcache_runtime { /* Specific stages of the processing */ auto process_pre_postfilters(struct rspamd_task *task, symcache &cache, int start_events, int stage) -> bool; auto process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool; + auto check_metric_limit(struct rspamd_task *task) -> bool; + auto check_item_deps(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item, bool check_only) -> bool; public: /** -- 2.39.5