]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Rework symbols execution
author Vsevolod Stakhov <vsevolod@rspamd.com>
Sat, 30 Apr 2022 17:42:57 +0000 (18:42 +0100)
committer Vsevolod Stakhov <vsevolod@rspamd.com>
Sat, 30 Apr 2022 17:42:57 +0000 (18:42 +0100)
src/libserver/symcache/symcache_c.cxx
src/libserver/symcache/symcache_internal.hxx
src/libserver/symcache/symcache_item.hxx
src/libserver/symcache/symcache_runtime.cxx
src/libserver/symcache/symcache_runtime.hxx

index ba3ccd9a673fa5705b871e2d0618cb94b763afd6..bef932488fecab44ca79b9221074bea3558e2884 100644 (file)
@@ -528,4 +528,19 @@ rspamd_symcache_composites_foreach(struct rspamd_task *task,
                        dyn_item->finished = true;
                }
        });
+}
+
+/*
+ * C API entry point: runs the requested processing stage for a task,
+ * creating the per-task symcache runtime on first use.
+ */
+gboolean
+rspamd_symcache_process_symbols(struct rspamd_task *task,
+                                                               struct rspamd_symcache *cache,
+                                                               gint stage)
+{
+       /* Unwrap the opaque C handle into the C++ cache object */
+       auto &cache_ref = *C_API_SYMCACHE(cache);
+
+       /* The runtime is created lazily and then stored in the task */
+       if (!task->symcache_runtime) {
+               task->symcache_runtime = rspamd::symcache::symcache_runtime::create(task, cache_ref);
+       }
+
+       return C_API_SYMCACHE_RUNTIME(task->symcache_runtime)->process_symbols(task, cache_ref, stage);
 }
\ No newline at end of file
index 1b4d5a509e3c88348131dc39d17f71be92315645..00c0d4d8b4f70a9dc8583a0e5fc49bfb3351d254 100644 (file)
         "symcache", log_tag(), \
         RSPAMD_LOG_FUNC, \
         __VA_ARGS__)
+/*
+ * Logs a message at G_LOG_LEVEL_CRITICAL for the "symcache" module, tagged
+ * with task->task_pool->tag.uid; a variable named `task` must be in scope
+ * at the point of use.
+ */
+#define msg_err_cache_task(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+        "symcache", task->task_pool->tag.uid, \
+        RSPAMD_LOG_FUNC, \
+        __VA_ARGS__)
 #define msg_warn_cache(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
         "symcache", log_tag(), \
         RSPAMD_LOG_FUNC, \
@@ -367,6 +371,47 @@ public:
                }
        }
 
+       /**
+        * Call a functor for each entry of `connfilters`, stopping at the
+        * first call that yields false
+        * @tparam Functor callable taking the pointer returned by an entry's get()
+        * @param f functor to invoke
+        * @return true if no call returned false (also when the list is empty)
+        */
+       template<typename Functor>
+       auto connfilters_foreach(Functor f) -> bool {
+               for (const auto &entry: connfilters) {
+                       if (!f(entry.get())) {
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+       /**
+        * Call a functor for each entry of `prefilters`, stopping at the
+        * first call that yields false
+        * @param f functor to invoke
+        * @return true if no call returned false (also when the list is empty)
+        */
+       template<typename Functor>
+       auto prefilters_foreach(Functor f) -> bool {
+               for (const auto &entry: prefilters) {
+                       if (!f(entry.get())) {
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+       /**
+        * Call a functor for each entry of `postfilters`, stopping at the
+        * first call that yields false
+        * @param f functor to invoke
+        * @return true if no call returned false (also when the list is empty)
+        */
+       template<typename Functor>
+       auto postfilters_foreach(Functor f) -> bool {
+               for (const auto &entry: postfilters) {
+                       if (!f(entry.get())) {
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+       /**
+        * Call a functor for each entry of `idempotent`, stopping at the
+        * first call that yields false
+        * @param f functor to invoke
+        * @return true if no call returned false (also when the list is empty)
+        */
+       template<typename Functor>
+       auto idempotent_foreach(Functor f) -> bool {
+               for (const auto &entry: idempotent) {
+                       if (!f(entry.get())) {
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+       /**
+        * Call a functor for each entry of `filters`, stopping at the
+        * first call that yields false
+        * @param f functor to invoke
+        * @return true if no call returned false (also when the list is empty)
+        */
+       template<typename Functor>
+       auto filters_foreach(Functor f) -> bool {
+               for (const auto &entry: filters) {
+                       if (!f(entry.get())) {
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+
        /**
         * Resort cache if anything has been changed since last time
         * @return
index a194e9b0d51a34d0848cb9db2bb01e8ddeaddbbb..62297a7da4b73ef596909330ab72760cef3f6dec 100644 (file)
@@ -88,9 +88,9 @@ public:
                conditions.emplace_back(L, cbref);
        }
 
-       auto call() -> void
+       /**
+        * Invoke the stored callback `func` for a task
+        * @param task task passed to the callback
+        * @param item item handle passed to the callback
+        */
+       auto call(struct rspamd_task *task, struct rspamd_symcache_item *item) const -> void
        {
-               // TODO
+               /* The stored user_data is forwarded as the last argument */
+               func(task, item, user_data);
        }
 
        auto check_conditions(std::string_view sym_name, struct rspamd_task *task) const -> bool {
@@ -320,6 +320,29 @@ public:
                return nullptr;
        }
 
+       /**
+        * Evaluate the conditions attached to this item for a task
+        * @param task task being processed
+        * @return result of the normal item's own condition check, or false
+        * when this item does not hold a normal_item
+        */
+       auto check_conditions(struct rspamd_task *task) const -> auto {
+               if (const auto *normal = std::get_if<normal_item>(&specific)) {
+                       return normal->check_conditions(symbol, task);
+               }
+
+               return false;
+       }
+
+       /**
+        * Run the callback of a normal item; does nothing for other item kinds
+        * @param task task being processed
+        */
+       auto call(struct rspamd_task *task) const -> void {
+               const auto *normal = std::get_if<normal_item>(&specific);
+
+               if (normal == nullptr) {
+                       return;
+               }
+
+               /* This object is handed to the callback as the C item handle */
+               normal->call(task, (struct rspamd_symcache_item *)this);
+       }
+
 private:
        /**
         * Constructor for a normal symbols with callback
index a6ccd550952b8f05c539239abc3e83f8ae51dd0f..f0284f740cf70f99fa1cb30ef7433f35b3782d08 100644 (file)
@@ -19,6 +19,8 @@
 #include "symcache_runtime.hxx"
 #include "libutil/cxx/util.hxx"
 #include "libserver/task.h"
+#include "libmime/scan_result.h"
+#include <limits>
 
 namespace rspamd::symcache {
 
@@ -30,7 +32,7 @@ constexpr static const auto PROFILE_MESSAGE_SIZE_THRESHOLD = 1024ul * 1024 * 2;
 constexpr static const auto PROFILE_PROBABILITY = 0.01;
 
 auto
-symcache_runtime::create_savepoint(struct rspamd_task *task, symcache &cache) -> symcache_runtime *
+symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symcache_runtime *
 {
        cache.maybe_resort();
 
@@ -43,7 +45,7 @@ symcache_runtime::create_savepoint(struct rspamd_task *task, symcache &cache) ->
        rspamd_mempool_add_destructor(task->task_pool,
                        symcache_runtime::savepoint_dtor, checkpoint);
 
-       for (auto &pair : checkpoint->last_id_mappings) {
+       for (auto &pair: checkpoint->last_id_mappings) {
                pair.first = -1;
                pair.second = -1;
        }
@@ -274,7 +276,7 @@ auto symcache_runtime::get_dynamic_item(int id, bool save_in_cache) const -> cac
 {
        /* Lookup in cache */
        if (save_in_cache) {
-               for (const auto &cache_id : last_id_mappings) {
+               for (const auto &cache_id: last_id_mappings) {
                        if (cache_id.first == -1) {
                                break;
                        }
@@ -324,5 +326,214 @@ auto symcache_runtime::get_dynamic_item(int id, bool save_in_cache) const -> cac
        return nullptr;
 }
 
+/*
+ * Dispatches a processing stage to the matching handler: the filters stage
+ * goes to process_filters, the connfilters/prefilters/postfilters/idempotent
+ * stages go to process_pre_postfilters. Skipped tasks return true at once;
+ * any other stage value hits g_assert_not_reached.
+ */
+auto symcache_runtime::process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool
+{
+       msg_debug_cache_task("symbols processing stage at pass: %d", stage);
+
+       if (RSPAMD_TASK_IS_SKIPPED(task)) {
+               return true;
+       }
+
+       if (stage == RSPAMD_TASK_STAGE_FILTERS) {
+               /* Pending events are sampled before any symbol is started */
+               return process_filters(task, cache, rspamd_session_events_pending(task->s));
+       }
+
+       const auto is_pre_post_stage = stage == RSPAMD_TASK_STAGE_CONNFILTERS ||
+                       stage == RSPAMD_TASK_STAGE_PRE_FILTERS ||
+                       stage == RSPAMD_TASK_STAGE_POST_FILTERS ||
+                       stage == RSPAMD_TASK_STAGE_IDEMPOTENT;
+
+       if (is_pre_post_stage) {
+               return process_pre_postfilters(task, cache,
+                               rspamd_session_events_pending(task->s), stage);
+       }
+
+       g_assert_not_reached ();
+}
+
+/*
+ * Starts not yet started symbols from the list selected by `stage`
+ * (connfilters, prefilters, postfilters or idempotent).
+ *
+ * start_events is the number of pending session events sampled by the
+ * caller; a symbol whose priority compares "worse" than the first started
+ * one is delayed if the number of pending events has grown above it.
+ *
+ * Returns the value produced by the selected *_foreach helper, i.e. false
+ * when the functor below asked to stop the iteration.
+ */
+auto
+symcache_runtime::process_pre_postfilters(struct rspamd_task *task,
+                                                                                 symcache &cache,
+                                                                                 int start_events,
+                                                                                 int stage) -> bool
+{
+       /* numeric_limits<int>::min() marks "no priority recorded yet" */
+       auto saved_priority = std::numeric_limits<int>::min();
+       auto all_done = true;
+       /* Default ordering check; replaced for postfilters/idempotent below */
+       auto compare_functor = +[](int a, int b) { return a < b; };
+
+       /* Per-item functor: returning false stops the *_foreach iteration */
+       auto proc_func = [&](cache_item *item) {
+               auto dyn_item = get_dynamic_item(item->id, true);
+
+               if (!dyn_item->started && !dyn_item->finished) {
+                       if (has_slow) {
+                               /* Delay */
+                               has_slow = false;
+
+                               return false;
+                       }
+
+                       if (saved_priority == std::numeric_limits<int>::min()) {
+                               /* First item started in this pass defines the reference priority */
+                               saved_priority = item->priority;
+                       }
+                       else {
+                               if (compare_functor(item->priority, saved_priority) &&
+                                       rspamd_session_events_pending(task->s) > start_events) {
+                                       /*
+                                        * Delay further checks as we have higher
+                                        * priority filters to be processed
+                                        */
+                                       return false;
+                               }
+                       }
+
+                       process_symbol(task, cache, item, dyn_item);
+                       /*
+                        * NOTE(review): this value is overwritten by the result of
+                        * the *_foreach call below - confirm that is intended
+                        */
+                       all_done = false;
+               }
+
+               /* Continue processing */
+               return true;
+       };
+
+       switch (stage) {
+       case RSPAMD_TASK_STAGE_CONNFILTERS:
+               all_done = cache.connfilters_foreach(proc_func);
+               break;
+       case RSPAMD_TASK_STAGE_PRE_FILTERS:
+               all_done = cache.prefilters_foreach(proc_func);
+               break;
+       case RSPAMD_TASK_STAGE_POST_FILTERS:
+               /* Priority comparison is inverted for postfilters */
+               compare_functor = +[](int a, int b) { return a > b; };
+               all_done = cache.postfilters_foreach(proc_func);
+               break;
+       case RSPAMD_TASK_STAGE_IDEMPOTENT:
+               /* Priority comparison is inverted for idempotent symbols */
+               compare_functor = +[](int a, int b) { return a > b; };
+               all_done = cache.idempotent_foreach(proc_func);
+               break;
+       default:
+               g_error("invalid invocation");
+               break;
+       }
+
+       return all_done;
+}
+
+/*
+ * Starts not yet started symbols from the cache's filters list.
+ *
+ * Classifier items are skipped. An item whose dependencies are not resolved
+ * is left for a later pass. Iteration stops early when a slow symbol has
+ * been seen (has_slow is reset) or when the metric limit is reached for an
+ * item without SYMBOL_TYPE_FINE.
+ *
+ * Returns true if no unstarted item was found or the metric limit stopped
+ * the processing, false otherwise. start_events is currently not used here.
+ */
+auto
+symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool
+{
+       auto all_done = true;
+
+       cache.filters_foreach([&](cache_item *item) -> bool {
+               if (item->type == symcache_item_type::CLASSIFIER) {
+                       return true;
+               }
+
+               auto dyn_item = get_dynamic_item(item->id, true);
+
+               if (!dyn_item->started && !dyn_item->finished) {
+                       all_done = false;
+
+                       /*
+                        * NOTE(review): `checkpoint` is neither a parameter nor a local
+                        * of this function - confirm it resolves to the intended object
+                        */
+                       if (!rspamd_symcache_check_deps(task, cache, item,
+                                       checkpoint, 0, FALSE)) {
+                               msg_debug_cache_task ("blocked execution of %d(%s) unless deps are "
+                                                                         "resolved", item->id, item->symbol.c_str());
+
+                               return true;
+                       }
+
+                       process_symbol(task, cache, item, dyn_item);
+
+                       if (has_slow) {
+                               /* Delay */
+                               has_slow = false;
+
+                               return false;
+                       }
+               }
+
+               if (!(item->flags & SYMBOL_TYPE_FINE)) {
+                       if (rspamd_symcache_metric_limit(task, checkpoint)) {
+                               msg_info_task ("task has already scored more than %.2f, so do "
+                                                          "not "
+                                                          "plan more checks",
+                                               rs->score);
+                               all_done = true;
+                               return false;
+                       }
+               }
+
+               /*
+                * Continue with the next item: the functor is declared to return
+                * bool, so flowing off its end would be undefined behaviour
+                */
+               return true;
+       });
+
+       return all_done;
+}
+
+/*
+ * Starts execution of a single symbol for a task.
+ *
+ * Returns true when the item is not executed (classifier/composite item,
+ * blocked session, item not allowed or its conditions failed) or when no
+ * items are in flight after the callback returned. For an item that was
+ * already started the current `finished` flag is returned. Returns false
+ * when the callback has been invoked and items are still in flight.
+ */
+auto
+symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item,
+                                                                cache_dynamic_item *dyn_item) -> bool
+{
+       if (item->type == symcache_item_type::CLASSIFIER || item->type == symcache_item_type::COMPOSITE) {
+               /* Classifiers are special :( */
+               return true;
+       }
+
+       if (rspamd_session_blocked(task->s)) {
+               /*
+                * We cannot add new events as session is either destroyed or
+                * being cleaned up.
+                */
+               return true;
+       }
+
+       g_assert (!item->is_virtual());
+       if (dyn_item->started) {
+               /*
+                * This can actually happen when deps span over different layers
+                */
+               return dyn_item->finished;
+       }
+
+       /* Check has been started */
+       dyn_item->started = true;
+
+       if (!item->is_allowed(task, true) || !item->check_conditions(task)) {
+               /* Not executed for this task: mark it as finished straight away */
+               dyn_item->finished = true;
+
+               return true;
+       }
+
+       /* The format string must consume exactly the arguments that are passed */
+       msg_debug_cache_task("execute %s, %d", item->symbol.data(), item->id);
+
+       if (profile) {
+               ev_now_update_if_cheap(task->event_loop);
+               dyn_item->start_msec = (ev_now(task->event_loop) -
+                                                               profile_start) * 1e3;
+       }
+
+       dyn_item->async_events = 0;
+       cur_item = item;
+       items_inflight++;
+       /* Callback now must finalize itself */
+       item->call(task);
+       cur_item = nullptr;
+
+       if (items_inflight == 0) {
+               return true;
+       }
+
+       if (dyn_item->async_events == 0 && !dyn_item->finished) {
+               msg_err_cache_task("critical error: item %s has no async events pending, "
+                                                  "but it is not finalised", item->symbol.data());
+               g_assert_not_reached ();
+       }
+
+       return false;
+}
+
 }
 
index a505f22e8e0000af3ad570076300e82a34da1990..837b1eee2ec34d23da5ecc6a2027c3bf895e209b 100644 (file)
@@ -27,6 +27,8 @@
 
 #include "symcache_internal.hxx"
 
+struct rspamd_scan_result;
+
 namespace rspamd::symcache {
 /**
  * These items are saved within task structure and are used to track
@@ -52,7 +54,7 @@ class symcache_runtime {
        double profile_start;
        double lim;
 
-       struct rspamd_scan_result *rs;
+       struct ::rspamd_scan_result *rs;
 
        struct cache_item *cur_item;
        order_generation_ptr order;
@@ -69,6 +71,13 @@ class symcache_runtime {
                /* Drop shared ownership */
                real_savepoint->order.reset();
        }
+
+       /**
+        * Start execution of a single symbol unless it has been started already
+        * @param task task being processed
+        * @param cache symbols cache
+        * @param item static cache item to execute
+        * @param dyn_item per-task state of that item
+        * @return false if the callback was invoked and items are still in flight
+        */
+       auto process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item,
+                       cache_dynamic_item *dyn_item) -> bool;
+       /* Specific stages of the processing */
+       /* Handles the connfilters, prefilters, postfilters and idempotent stages */
+       auto process_pre_postfilters(struct rspamd_task *task, symcache &cache, int start_events, int stage) -> bool;
+       /* Handles the filters stage */
+       auto process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool;
+
 public:
        /**
         * Creates a cache runtime using task mempool
@@ -76,7 +85,7 @@ public:
         * @param cache
         * @return
         */
-       static auto create_savepoint(struct rspamd_task *task, symcache &cache) -> symcache_runtime *;
+       static auto create(struct rspamd_task *task, symcache &cache) -> symcache_runtime *;
        /**
         * Process task settings
         * @param task
@@ -155,6 +164,15 @@ public:
         * @return
         */
        auto get_dynamic_item(int id, bool save_in_cache) const -> cache_dynamic_item *;
+
+       /**
+        * Process symbols in the cache for one processing stage
+        * @param task task being processed
+        * @param cache symbols cache to take items from
+        * @param stage one of the RSPAMD_TASK_STAGE_* values for connfilters,
+        * prefilters, filters, postfilters or idempotent symbols
+        * @return true if the task is skipped, otherwise the result of the
+        * handler selected by `stage`
+        */
+       auto process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool;
 };