summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Vsevolod Stakhov <vsevolod@rspamd.com> 2022-04-30 18:42:57 +0100
committer Vsevolod Stakhov <vsevolod@rspamd.com> 2022-04-30 18:42:57 +0100
commit eb743eebb8968bf74b714065fa1976f9508d73ec (patch)
tree 70e67692fe1ce676d1d046989aeec4459cf1855f
parent f4b361fac864fe2ac0b8b2e2d7cd277bf93e5e73 (diff)
download rspamd-eb743eebb8968bf74b714065fa1976f9508d73ec.tar.gz
rspamd-eb743eebb8968bf74b714065fa1976f9508d73ec.zip
[Project] Rework symbols execution
-rw-r--r-- src/libserver/symcache/symcache_c.cxx 15
-rw-r--r-- src/libserver/symcache/symcache_internal.hxx 45
-rw-r--r-- src/libserver/symcache/symcache_item.hxx 27
-rw-r--r-- src/libserver/symcache/symcache_runtime.cxx 217
-rw-r--r-- src/libserver/symcache/symcache_runtime.hxx 22
5 files changed, 319 insertions, 7 deletions
diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx
index ba3ccd9a6..bef932488 100644
--- a/src/libserver/symcache/symcache_c.cxx
+++ b/src/libserver/symcache/symcache_c.cxx
@@ -528,4 +528,19 @@ rspamd_symcache_composites_foreach(struct rspamd_task *task,
dyn_item->finished = true;
}
});
+}
+
+gboolean
+rspamd_symcache_process_symbols(struct rspamd_task *task,
+								 struct rspamd_symcache *cache,
+								 gint stage)
+{
+	/* The C handle is converted to the C++ cache object that is passed on by reference below */
+	auto *cxx_cache = C_API_SYMCACHE(cache);
+
+	/* Create the runtime for this task on first use; later calls reuse the stored one */
+	if (!task->symcache_runtime) {
+		task->symcache_runtime = rspamd::symcache::symcache_runtime::create(task, *cxx_cache);
+	}
+	return C_API_SYMCACHE_RUNTIME(task->symcache_runtime)->process_symbols(task, *cxx_cache, stage);
} \ No newline at end of file
diff --git a/src/libserver/symcache/symcache_internal.hxx b/src/libserver/symcache/symcache_internal.hxx
index 1b4d5a509..00c0d4d8b 100644
--- a/src/libserver/symcache/symcache_internal.hxx
+++ b/src/libserver/symcache/symcache_internal.hxx
@@ -44,6 +44,10 @@
"symcache", log_tag(), \
RSPAMD_LOG_FUNC, \
__VA_ARGS__)
+#define msg_err_cache_task(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+ "symcache", task->task_pool->tag.uid, \
+ RSPAMD_LOG_FUNC, \
+ __VA_ARGS__) /* logs with the task pool uid as tag: needs a `task` pointer in scope at the call site */
#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
"symcache", log_tag(), \
RSPAMD_LOG_FUNC, \
@@ -368,6 +372,47 @@ public:
}
/**
+	 * Call `f` for each element of `connfilters`, in container order, until one call yields false
+	 * @tparam Functor callable that accepts the raw item pointer (`get()` of a stored element)
+	 * @param f functor to invoke; it is taken by value and called once per visited item
+	 * @return true if no call returned false, which includes the case of no items at all
+	 */
+	template<typename Functor>
+	auto connfilters_foreach(Functor f) -> bool {
+		const auto accepts = [&f](const auto &entry) { return f(entry.get()); };
+
+		return std::all_of(std::begin(connfilters), std::end(connfilters), accepts);
+	}
+	template<typename Functor>
+	auto prefilters_foreach(Functor f) -> bool {
+		/* Visits `prefilters` in order; the first false from `f` ends the walk with false */
+		const auto accepts = [&f](const auto &entry) { return f(entry.get()); };
+
+		return std::all_of(std::begin(prefilters), std::end(prefilters), accepts);
+	}
+	template<typename Functor>
+	auto postfilters_foreach(Functor f) -> bool {
+		/* Visits `postfilters` in order; the first false from `f` ends the walk with false */
+		const auto accepts = [&f](const auto &entry) { return f(entry.get()); };
+
+		return std::all_of(std::begin(postfilters), std::end(postfilters), accepts);
+	}
+	template<typename Functor>
+	auto idempotent_foreach(Functor f) -> bool {
+		/* Visits `idempotent` in order; the first false from `f` ends the walk with false */
+		const auto accepts = [&f](const auto &entry) { return f(entry.get()); };
+
+		return std::all_of(std::begin(idempotent), std::end(idempotent), accepts);
+	}
+	template<typename Functor>
+	auto filters_foreach(Functor f) -> bool {
+		/* Visits `filters` in order; the first false from `f` ends the walk with false */
+		const auto accepts = [&f](const auto &entry) { return f(entry.get()); };
+
+		return std::all_of(std::begin(filters), std::end(filters), accepts);
+	}
+
+ /**
* Resort cache if anything has been changed since last time
* @return
*/
diff --git a/src/libserver/symcache/symcache_item.hxx b/src/libserver/symcache/symcache_item.hxx
index a194e9b0d..62297a7da 100644
--- a/src/libserver/symcache/symcache_item.hxx
+++ b/src/libserver/symcache/symcache_item.hxx
@@ -88,9 +88,9 @@ public:
conditions.emplace_back(L, cbref);
}
- auto call() -> void
+ auto call(struct rspamd_task *task, struct rspamd_symcache_item *item) const -> void
{
- // TODO
+ func(task, item, user_data); /* forwards both arguments to `func` together with `user_data` (both declared outside this hunk) */
}
auto check_conditions(std::string_view sym_name, struct rspamd_task *task) const -> bool {
@@ -320,6 +320,29 @@ public:
return nullptr;
}
+	/**
+	 * Evaluate the conditions of this item against a task
+	 *
+	 * Conditions are looked up on the normal_item alternative of `specific` only;
+	 * for every other alternative the answer is always false.
+	 * @param task task handed over to normal_item::check_conditions
+	 * @return outcome of the normal item's check, or false if this is not a normal item
+	 */
+	auto check_conditions(struct rspamd_task *task) const -> auto {
+		if (const auto *filter_data = std::get_if<normal_item>(&specific)) {
+			return filter_data->check_conditions(symbol, task);
+		}
+		return false;
+	}
+
+	auto call(struct rspamd_task *task) const -> void { /* no-op unless `specific` holds a normal_item */
+		if (const auto *filter_data = std::get_if<normal_item>(&specific)) {
+			/* Same conversion as a C-style cast: reinterpret as the C handle, then drop const */
+			auto *c_handle = const_cast<struct rspamd_symcache_item *>(reinterpret_cast<const struct rspamd_symcache_item *>(this));
+			filter_data->call(task, c_handle);
+		}
+	}
+
private:
/**
* Constructor for a normal symbols with callback
diff --git a/src/libserver/symcache/symcache_runtime.cxx b/src/libserver/symcache/symcache_runtime.cxx
index a6ccd5509..f0284f740 100644
--- a/src/libserver/symcache/symcache_runtime.cxx
+++ b/src/libserver/symcache/symcache_runtime.cxx
@@ -19,6 +19,8 @@
#include "symcache_runtime.hxx"
#include "libutil/cxx/util.hxx"
#include "libserver/task.h"
+#include "libmime/scan_result.h"
+#include <limits>
namespace rspamd::symcache {
@@ -30,7 +32,7 @@ constexpr static const auto PROFILE_MESSAGE_SIZE_THRESHOLD = 1024ul * 1024 * 2;
constexpr static const auto PROFILE_PROBABILITY = 0.01;
auto
-symcache_runtime::create_savepoint(struct rspamd_task *task, symcache &cache) -> symcache_runtime *
+symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symcache_runtime *
{
cache.maybe_resort();
@@ -43,7 +45,7 @@ symcache_runtime::create_savepoint(struct rspamd_task *task, symcache &cache) ->
rspamd_mempool_add_destructor(task->task_pool,
symcache_runtime::savepoint_dtor, checkpoint);
- for (auto &pair : checkpoint->last_id_mappings) {
+ for (auto &pair: checkpoint->last_id_mappings) {
pair.first = -1;
pair.second = -1;
}
@@ -274,7 +276,7 @@ auto symcache_runtime::get_dynamic_item(int id, bool save_in_cache) const -> cac
{
/* Lookup in cache */
if (save_in_cache) {
- for (const auto &cache_id : last_id_mappings) {
+ for (const auto &cache_id: last_id_mappings) {
if (cache_id.first == -1) {
break;
}
@@ -324,5 +326,214 @@ auto symcache_runtime::get_dynamic_item(int id, bool save_in_cache) const -> cac
return nullptr;
}
+auto symcache_runtime::process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool
+{
+	msg_debug_cache_task("symbols processing stage at pass: %d", stage);
+
+	/* A skipped task runs nothing: report true straight away */
+	if (RSPAMD_TASK_IS_SKIPPED(task)) {
+		return true;
+	}
+
+	switch (stage) {
+	case RSPAMD_TASK_STAGE_FILTERS:
+		return process_filters(task, cache, rspamd_session_events_pending(task->s));
+
+	case RSPAMD_TASK_STAGE_CONNFILTERS:
+	case RSPAMD_TASK_STAGE_PRE_FILTERS:
+	case RSPAMD_TASK_STAGE_POST_FILTERS:
+	case RSPAMD_TASK_STAGE_IDEMPOTENT:
+		/* These four stages share one runner, which needs to know the stage */
+		return process_pre_postfilters(task, cache,
+				rspamd_session_events_pending(task->s), stage);
+	default:
+		/* Any other stage value is a programming error */
+		g_assert_not_reached ();
+	}
+}
+/* Run the not yet started items of one priority-ordered stage; returns what the stage's *_foreach call returned, i.e. false if an item asked to delay */
+auto
+symcache_runtime::process_pre_postfilters(struct rspamd_task *task,
+ symcache &cache,
+ int start_events,
+ int stage) -> bool
+{
+ auto saved_priority = std::numeric_limits<int>::min(); /* min() means "no priority recorded yet" */
+ auto all_done = true;
+ auto compare_functor = +[](int a, int b) { return a < b; }; /* replaced by `a > b` in the postfilter/idempotent cases below */
+ /* Visitor for *_foreach: returning false ends the iteration */
+ auto proc_func = [&](cache_item *item) {
+ auto dyn_item = get_dynamic_item(item->id, true); /* NOTE(review): dereferenced below without a null check */
+
+ if (!dyn_item->started && !dyn_item->finished) {
+ if (has_slow) {
+ /* Delay */
+ has_slow = false;
+
+ return false;
+ }
+
+ if (saved_priority == std::numeric_limits<int>::min()) {
+ saved_priority = item->priority; /* priority of the first item that gets started in this call */
+ }
+ else {
+ if (compare_functor(item->priority, saved_priority) &&
+ rspamd_session_events_pending(task->s) > start_events) {
+ /*
+ * Delay further checks as we have higher
+ * priority filters to be processed
+ */
+ return false;
+ }
+ }
+
+ process_symbol(task, cache, item, dyn_item);
+ all_done = false; /* NOTE(review): overwritten by the *_foreach result assigned in the switch below */
+ }
+
+ /* Continue processing */
+ return true;
+ };
+ /* proc_func captures by reference, so a compare_functor assigned in a case is the one it calls */
+ switch (stage) {
+ case RSPAMD_TASK_STAGE_CONNFILTERS:
+ all_done = cache.connfilters_foreach(proc_func);
+ break;
+ case RSPAMD_TASK_STAGE_PRE_FILTERS:
+ all_done = cache.prefilters_foreach(proc_func);
+ break;
+ case RSPAMD_TASK_STAGE_POST_FILTERS:
+ compare_functor = +[](int a, int b) { return a > b; };
+ all_done = cache.postfilters_foreach(proc_func);
+ break;
+ case RSPAMD_TASK_STAGE_IDEMPOTENT:
+ compare_functor = +[](int a, int b) { return a > b; };
+ all_done = cache.idempotent_foreach(proc_func);
+ break;
+ default:
+ g_error("invalid invocation");
+ break;
+ }
+
+ return all_done;
+}
+
+/* Start every pending regular filter whose deps are resolved; true if no pending filter was met or the score limit ended planning */
+auto symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool
+{
+	auto all_done = true;
+
+	cache.filters_foreach([&](cache_item *item) -> bool {
+		if (item->type == symcache_item_type::CLASSIFIER) {
+			return true;
+		}
+
+		auto dyn_item = get_dynamic_item(item->id, true);
+
+		if (!dyn_item->started && !dyn_item->finished) {
+			all_done = false;
+			/* NOTE(review): `checkpoint` is not declared in this function; confirm what it resolves to */
+			if (!rspamd_symcache_check_deps(task, cache, item,
+					checkpoint, 0, FALSE)) {
+				msg_debug_cache_task ("blocked execution of %d(%s) unless deps are "
+						"resolved", item->id, item->symbol.c_str());
+
+				return true;
+			}
+
+			process_symbol(task, cache, item, dyn_item);
+
+			if (has_slow) {
+				/* Delay */
+				has_slow = false;
+
+				return false;
+			}
+		}
+
+		if (!(item->flags & SYMBOL_TYPE_FINE) &&
+			rspamd_symcache_metric_limit(task, checkpoint)) {
+			msg_info_task ("task has already scored more than %.2f, so do "
+					"not plan more checks", rs->score);
+			all_done = true;
+			return false;
+		}
+
+		/* A `-> bool` lambda must return on every path: go on with the next filter */
+		return true;
+	});
+
+	return all_done;
+}
+
+/**
+ * Start one symbol for a task, unless it is exempt, disallowed or was started before
+ * @return false if symbols are still in flight after the callback ran, or if the item was started earlier and has not finished; true otherwise
+ */
+auto
+symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item,
+		cache_dynamic_item *dyn_item) -> bool
+{
+	if (item->type == symcache_item_type::CLASSIFIER || item->type == symcache_item_type::COMPOSITE) {
+		/* Classifiers and composites are special: they are never called from here */
+		return true;
+	}
+
+	if (rspamd_session_blocked(task->s)) {
+		/*
+		 * We cannot add new events as session is either destroyed or
+		 * being cleaned up.
+		 */
+		return true;
+	}
+
+	g_assert (!item->is_virtual());
+	if (dyn_item->started) {
+		/*
+		 * This can actually happen when deps span over different layers
+		 */
+		return dyn_item->finished;
+	}
+
+	/* Check has been started */
+	dyn_item->started = true;
+
+	if (!item->is_allowed(task, true) || !item->check_conditions(task)) {
+		/* Not applicable to this task: finish the item without calling it */
+		dyn_item->finished = true;
+
+		return true;
+	}
+
+	/* Two conversions for the two arguments passed; an unmatched third `%s` was dropped */
+	msg_debug_cache_task("execute %s, %d", item->symbol.data(),
+			item->id);
+
+	if (profile) {
+		ev_now_update_if_cheap(task->event_loop);
+		dyn_item->start_msec = (ev_now(task->event_loop) -
+				profile_start) * 1e3;
+	}
+
+	dyn_item->async_events = 0;
+	cur_item = item;
+	items_inflight++;
+	/* Callback now must finalize itself */
+	item->call(task);
+	cur_item = nullptr;
+
+	if (items_inflight == 0) {
+		return true;
+	}
+
+	if (dyn_item->async_events == 0 && !dyn_item->finished) {
+		msg_err_cache_task("critical error: item %s has no async events pending, "
+				"but it is not finalised", item->symbol.data());
+		g_assert_not_reached ();
+	}
+
+	return false;
+}
+
}
diff --git a/src/libserver/symcache/symcache_runtime.hxx b/src/libserver/symcache/symcache_runtime.hxx
index a505f22e8..837b1eee2 100644
--- a/src/libserver/symcache/symcache_runtime.hxx
+++ b/src/libserver/symcache/symcache_runtime.hxx
@@ -27,6 +27,8 @@
#include "symcache_internal.hxx"
+struct rspamd_scan_result;
+
namespace rspamd::symcache {
/**
* These items are saved within task structure and are used to track
@@ -52,7 +54,7 @@ class symcache_runtime {
double profile_start;
double lim;
- struct rspamd_scan_result *rs;
+ struct ::rspamd_scan_result *rs;
struct cache_item *cur_item;
order_generation_ptr order;
@@ -69,6 +71,13 @@ class symcache_runtime {
/* Drop shared ownership */
real_savepoint->order.reset();
}
+
+ auto process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item,
+ cache_dynamic_item *dyn_item) -> bool; /* runs one item; callers pass the entry get_dynamic_item() returned for it */
+ /* Stage runners called by process_symbols(): the first handles connfilters, prefilters, postfilters and idempotent items, the second regular filters */
+ auto process_pre_postfilters(struct rspamd_task *task, symcache &cache, int start_events, int stage) -> bool;
+ auto process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool;
+
public:
/**
* Creates a cache runtime using task mempool
@@ -76,7 +85,7 @@ public:
* @param cache
* @return
*/
- static auto create_savepoint(struct rspamd_task *task, symcache &cache) -> symcache_runtime *;
+ static auto create(struct rspamd_task *task, symcache &cache) -> symcache_runtime *;
/**
* Process task settings
* @param task
@@ -155,6 +164,15 @@ public:
* @return
*/
auto get_dynamic_item(int id, bool save_in_cache) const -> cache_dynamic_item *;
+
+ /**
+ * Process the symbols of one processing stage for a task
+ * @param task task whose symbols are executed
+ * @param cache symbols cache that supplies the items of the requested stage
+ * @param stage one of the RSPAMD_TASK_STAGE_* values handled by the implementation
+ * @return result of the stage runner; true straight away when the task is skipped
+ */
+ auto process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool;
};