diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2022-04-30 18:42:57 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rspamd.com> | 2022-04-30 18:42:57 +0100 |
commit | eb743eebb8968bf74b714065fa1976f9508d73ec (patch) | |
tree | 70e67692fe1ce676d1d046989aeec4459cf1855f | |
parent | f4b361fac864fe2ac0b8b2e2d7cd277bf93e5e73 (diff) | |
download | rspamd-eb743eebb8968bf74b714065fa1976f9508d73ec.tar.gz rspamd-eb743eebb8968bf74b714065fa1976f9508d73ec.zip |
[Project] Rework symbols execution
-rw-r--r-- | src/libserver/symcache/symcache_c.cxx | 15 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_internal.hxx | 45 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_item.hxx | 27 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_runtime.cxx | 217 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_runtime.hxx | 22 |
5 files changed, 319 insertions, 7 deletions
diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx index ba3ccd9a6..bef932488 100644 --- a/src/libserver/symcache/symcache_c.cxx +++ b/src/libserver/symcache/symcache_c.cxx @@ -528,4 +528,19 @@ rspamd_symcache_composites_foreach(struct rspamd_task *task, dyn_item->finished = true; } }); +} + +gboolean +rspamd_symcache_process_symbols(struct rspamd_task *task, + struct rspamd_symcache *cache, + gint stage) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + if (task->symcache_runtime == nullptr) { + task->symcache_runtime = rspamd::symcache::symcache_runtime::create(task, *real_cache); + } + + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + return cache_runtime->process_symbols(task, *real_cache, stage); }
\ No newline at end of file diff --git a/src/libserver/symcache/symcache_internal.hxx b/src/libserver/symcache/symcache_internal.hxx index 1b4d5a509..00c0d4d8b 100644 --- a/src/libserver/symcache/symcache_internal.hxx +++ b/src/libserver/symcache/symcache_internal.hxx @@ -44,6 +44,10 @@ "symcache", log_tag(), \ RSPAMD_LOG_FUNC, \ __VA_ARGS__) +#define msg_err_cache_task(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ + "symcache", task->task_pool->tag.uid, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) #define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ "symcache", log_tag(), \ RSPAMD_LOG_FUNC, \ @@ -368,6 +372,47 @@ public: } /** + * Iterate over all composites using a specific functor + * @tparam Functor + * @param f + */ + template<typename Functor> + auto connfilters_foreach(Functor f) -> bool { + return std::all_of(std::begin(connfilters), std::end(connfilters), + [&](const auto &sym_it){ + return f(sym_it.get()); + }); + } + template<typename Functor> + auto prefilters_foreach(Functor f) -> bool { + return std::all_of(std::begin(prefilters), std::end(prefilters), + [&](const auto &sym_it){ + return f(sym_it.get()); + }); + } + template<typename Functor> + auto postfilters_foreach(Functor f) -> bool { + return std::all_of(std::begin(postfilters), std::end(postfilters), + [&](const auto &sym_it){ + return f(sym_it.get()); + }); + } + template<typename Functor> + auto idempotent_foreach(Functor f) -> bool { + return std::all_of(std::begin(idempotent), std::end(idempotent), + [&](const auto &sym_it){ + return f(sym_it.get()); + }); + } + template<typename Functor> + auto filters_foreach(Functor f) -> bool { + return std::all_of(std::begin(filters), std::end(filters), + [&](const auto &sym_it){ + return f(sym_it.get()); + }); + } + + /** * Resort cache if anything has been changed since last time * @return */ diff --git a/src/libserver/symcache/symcache_item.hxx b/src/libserver/symcache/symcache_item.hxx index a194e9b0d..62297a7da 100644 --- a/src/libserver/symcache/symcache_item.hxx +++ b/src/libserver/symcache/symcache_item.hxx @@ -88,9 +88,9 @@ public: conditions.emplace_back(L, cbref); } - auto call() -> void + auto call(struct rspamd_task *task, struct rspamd_symcache_item *item) const -> void { - // TODO + func(task, item, user_data); } auto check_conditions(std::string_view sym_name, struct rspamd_task *task) const -> bool { @@ -320,6 +320,29 @@ public: return nullptr; } + /** + * Check all conditions for an item + * @param task + * @return + */ + auto check_conditions(struct rspamd_task *task) const -> auto { + if (std::holds_alternative<normal_item>(specific)) { + const auto &filter_data = std::get<normal_item>(specific); + + return filter_data.check_conditions(symbol, task); + } + + return false; + } + + auto call(struct rspamd_task *task) const -> void { + if (std::holds_alternative<normal_item>(specific)) { + const auto &filter_data = std::get<normal_item>(specific); + + filter_data.call(task, (struct rspamd_symcache_item *)this); + } + } + private: /** * Constructor for a normal symbols with callback diff --git a/src/libserver/symcache/symcache_runtime.cxx b/src/libserver/symcache/symcache_runtime.cxx index a6ccd5509..f0284f740 100644 --- a/src/libserver/symcache/symcache_runtime.cxx +++ b/src/libserver/symcache/symcache_runtime.cxx @@ -19,6 +19,8 @@ #include "symcache_runtime.hxx" #include "libutil/cxx/util.hxx" #include "libserver/task.h" +#include "libmime/scan_result.h" +#include <limits> namespace rspamd::symcache { @@ -30,7 +32,7 @@ constexpr static const auto PROFILE_MESSAGE_SIZE_THRESHOLD = 1024ul * 1024 * 2; constexpr static const auto PROFILE_PROBABILITY = 0.01; auto -symcache_runtime::create_savepoint(struct rspamd_task *task, symcache &cache) -> symcache_runtime * +symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symcache_runtime * { cache.maybe_resort(); @@ -43,7 +45,7 @@ symcache_runtime::create_savepoint(struct rspamd_task *task, symcache &cache) -> rspamd_mempool_add_destructor(task->task_pool, symcache_runtime::savepoint_dtor, checkpoint); - for (auto &pair : checkpoint->last_id_mappings) { + for (auto &pair: checkpoint->last_id_mappings) { pair.first = -1; pair.second = -1; } @@ -274,7 +276,7 @@ auto symcache_runtime::get_dynamic_item(int id, bool save_in_cache) const -> cac { /* Lookup in cache */ if (save_in_cache) { - for (const auto &cache_id : last_id_mappings) { + for (const auto &cache_id: last_id_mappings) { if (cache_id.first == -1) { break; } @@ -324,5 +326,214 @@ auto symcache_runtime::get_dynamic_item(int id, bool save_in_cache) const -> cac return nullptr; } +auto symcache_runtime::process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool +{ + msg_debug_cache_task("symbols processing stage at pass: %d", stage); + + if (RSPAMD_TASK_IS_SKIPPED(task)) { + return true; + } + + switch (stage) { + case RSPAMD_TASK_STAGE_CONNFILTERS: + case RSPAMD_TASK_STAGE_PRE_FILTERS: + case RSPAMD_TASK_STAGE_POST_FILTERS: + case RSPAMD_TASK_STAGE_IDEMPOTENT: + return process_pre_postfilters(task, cache, + rspamd_session_events_pending(task->s), stage); + break; + + case RSPAMD_TASK_STAGE_FILTERS: + return process_filters(task, cache,rspamd_session_events_pending(task->s)); + break; + + default: + g_assert_not_reached (); + } +} + +auto +symcache_runtime::process_pre_postfilters(struct rspamd_task *task, + symcache &cache, + int start_events, + int stage) -> bool +{ + auto saved_priority = std::numeric_limits<int>::min(); + auto all_done = true; + auto compare_functor = +[](int a, int b) { return a < b; }; + + auto proc_func = [&](cache_item *item) { + auto dyn_item = get_dynamic_item(item->id, true); + + if (!dyn_item->started && !dyn_item->finished) { + if (has_slow) { + /* Delay */ + has_slow = false; + + return false; + } + + if (saved_priority == std::numeric_limits<int>::min()) { + saved_priority = item->priority; + } + else { + if (compare_functor(item->priority, saved_priority) && + rspamd_session_events_pending(task->s) > start_events) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + return false; + } + } + + process_symbol(task, cache, item, dyn_item); + all_done = false; + } + + /* Continue processing */ + return true; + }; + + switch (stage) { + case RSPAMD_TASK_STAGE_CONNFILTERS: + all_done = cache.connfilters_foreach(proc_func); + break; + case RSPAMD_TASK_STAGE_PRE_FILTERS: + all_done = cache.prefilters_foreach(proc_func); + break; + case RSPAMD_TASK_STAGE_POST_FILTERS: + compare_functor = +[](int a, int b) { return a > b; }; + all_done = cache.postfilters_foreach(proc_func); + break; + case RSPAMD_TASK_STAGE_IDEMPOTENT: + compare_functor = +[](int a, int b) { return a > b; }; + all_done = cache.idempotent_foreach(proc_func); + break; + default: + g_error("invalid invocation"); + break; + } + + return all_done; +} + +auto +symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool +{ + auto all_done = true; + + cache.filters_foreach([&](cache_item *item) -> bool { + if (item->type == symcache_item_type::CLASSIFIER) { + return true; + } + + auto dyn_item = get_dynamic_item(item->id, true); + + if (!dyn_item->started && !dyn_item->finished) { + all_done = false; + + if (!rspamd_symcache_check_deps(task, cache, item, + checkpoint, 0, FALSE)) { + msg_debug_cache_task ("blocked execution of %d(%s) unless deps are " + "resolved", item->id, item->symbol.c_str()); + + return true; + } + + process_symbol(task, cache, item, dyn_item); + + if (has_slow) { + /* Delay */ + has_slow = false; + + return false; + } + } + + if (!(item->flags & SYMBOL_TYPE_FINE)) { + if (rspamd_symcache_metric_limit(task, checkpoint)) { + msg_info_task ("task has already scored more than %.2f, so do " + "not " + "plan more checks", + rs->score); + all_done = true; + return false; + } + } + }); + + return all_done; +} + +auto +symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item) -> bool +{ + if (item->type == symcache_item_type::CLASSIFIER || item->type == symcache_item_type::COMPOSITE) { + /* Classifiers are special :( */ + return true; + } + + if (rspamd_session_blocked(task->s)) { + /* + * We cannot add new events as session is either destroyed or + * being cleaned up. + */ + return true; + } + + g_assert (!item->is_virtual()); + if (dyn_item->started) { + /* + * This can actually happen when deps span over different layers + */ + return dyn_item->finished; + } + + /* Check has been started */ + dyn_item->started = true; + auto check = true; + + if (!item->is_allowed(task, true) || !item->check_conditions(task)) { + check = false; + } + + if (check) { + msg_debug_cache_task("execute %s, %d; symbol type = %s", item->symbol.data(), + item->id); + + if (profile) { + ev_now_update_if_cheap(task->event_loop); + dyn_item->start_msec = (ev_now(task->event_loop) - + profile_start) * 1e3; + } + + dyn_item->async_events = 0; + cur_item = item; + items_inflight++; + /* Callback now must finalize itself */ + item->call(task); + cur_item = NULL; + + if (items_inflight == 0) { + return true; + } + + if (dyn_item->async_events == 0 && !dyn_item->finished) { + msg_err_cache_task("critical error: item %s has no async events pending, " + "but it is not finalised", item->symbol.data()); + g_assert_not_reached (); + } + + return false; + } + else { + dyn_item->finished = true; + } + + return true; +} + } diff --git a/src/libserver/symcache/symcache_runtime.hxx b/src/libserver/symcache/symcache_runtime.hxx index a505f22e8..837b1eee2 100644 --- a/src/libserver/symcache/symcache_runtime.hxx +++ b/src/libserver/symcache/symcache_runtime.hxx @@ -27,6 +27,8 @@ #include "symcache_internal.hxx" +struct rspamd_scan_result; + namespace rspamd::symcache { /** * These items are saved within task structure and are used to track @@ -52,7 +54,7 @@ class symcache_runtime { double profile_start; double lim; - struct rspamd_scan_result *rs; + struct ::rspamd_scan_result *rs; struct cache_item *cur_item; order_generation_ptr order; @@ -69,6 +71,13 @@ class symcache_runtime { /* Drop shared ownership */ real_savepoint->order.reset(); } + + auto process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item) -> bool; + /* Specific stages of the processing */ + auto process_pre_postfilters(struct rspamd_task *task, symcache &cache, int start_events, int stage) -> bool; + auto process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool; + public: /** * Creates a cache runtime using task mempool @@ -76,7 +85,7 @@ public: * @param cache * @return */ - static auto create_savepoint(struct rspamd_task *task, symcache &cache) -> symcache_runtime *; + static auto create(struct rspamd_task *task, symcache &cache) -> symcache_runtime *; /** * Process task settings * @param task @@ -155,6 +164,15 @@ public: * @return */ auto get_dynamic_item(int id, bool save_in_cache) const -> cache_dynamic_item *; + + /** + * Process symbols in the cache + * @param task + * @param cache + * @param stage + * @return + */ + auto process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool; }; |