]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Use explicit item status
authorVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 18 Jun 2024 13:48:37 +0000 (14:48 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Tue, 18 Jun 2024 13:48:37 +0000 (14:48 +0100)
src/libserver/symcache/symcache_c.cxx
src/libserver/symcache/symcache_impl.cxx
src/libserver/symcache/symcache_item.cxx
src/libserver/symcache/symcache_item.hxx
src/libserver/symcache/symcache_runtime.cxx
src/libserver/symcache/symcache_runtime.hxx

index 1ffcd9ceb87eb7c779e278db73b60eac86d98e10..3214aab2e4c2b86ecae51054b8970db2ddfbdb69 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -673,10 +673,10 @@ void rspamd_symcache_composites_foreach(struct rspamd_task *task,
        real_cache->composites_foreach([&](const auto *item) {
                auto *dyn_item = cache_runtime->get_dynamic_item(item->id);
 
-               if (dyn_item && !dyn_item->started) {
+               if (dyn_item && dyn_item->status == rspamd::symcache::cache_item_status::not_started) {
                        auto *old_item = cache_runtime->set_cur_item(dyn_item);
                        func((void *) item->get_name().c_str(), item->get_cbdata(), fd);
-                       dyn_item->finished = true;
+                       dyn_item->status = rspamd::symcache::cache_item_status::finished;
                        cache_runtime->set_cur_item(old_item);
                }
        });
@@ -711,5 +711,5 @@ void rspamd_symcache_finalize_item(struct rspamd_task *task,
 void rspamd_symcache_runtime_destroy(struct rspamd_task *task)
 {
        auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime);
-       cache_runtime->savepoint_dtor();
+       cache_runtime->savepoint_dtor(task);
 }
\ No newline at end of file
index 869e025b38e2636381fe5c45717fac3caff9723f..34d399c5dbd903637cfd2571c52c87b880e89f00 100644 (file)
@@ -638,6 +638,7 @@ auto symcache::resort() -> void
         */
        total_hits = 0;
        auto used_items = ord->d.size();
+       msg_debug_cache("topologically sort %d filters", used_items);
 
        for (const auto &it: ord->d) {
                if (it->order == 0) {
@@ -696,7 +697,8 @@ auto symcache::resort() -> void
         * We enrich ord with all other symbol types without any sorting,
         * as it is done in another place
         */
-       constexpr auto append_items_vec = [](const auto &vec, auto &out) {
+       const auto append_items_vec = [&](const auto &vec, auto &out, const char *what) {
+               msg_debug_cache_lambda("append %d items; type = %s", (int) vec.size(), what);
                for (const auto &it: vec) {
                        if (it) {
                                out.emplace_back(it->getptr());
@@ -704,12 +706,12 @@ auto symcache::resort() -> void
                }
        };
 
-       append_items_vec(connfilters, ord->d);
-       append_items_vec(prefilters, ord->d);
-       append_items_vec(postfilters, ord->d);
-       append_items_vec(idempotent, ord->d);
-       append_items_vec(composites, ord->d);
-       append_items_vec(classifiers, ord->d);
+       append_items_vec(connfilters, ord->d, "connection filters");
+       append_items_vec(prefilters, ord->d, "prefilters");
+       append_items_vec(postfilters, ord->d, "postfilters");
+       append_items_vec(idempotent, ord->d, "idempotent filters");
+       append_items_vec(composites, ord->d, "composites");
+       append_items_vec(classifiers, ord->d, "classifiers");
 
        /* After sorting is done, we can assign all elements in the by_symbol hash */
        for (const auto [i, it]: rspamd::enumerate(ord->d)) {
index ac901f5cf563009fc13cc6828712fd417df48cd8..ca81267c489613c2426e20a1114dddf5ed1800cd 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -200,12 +200,6 @@ auto cache_item::resolve_parent(const symcache &cache) -> bool
        if (is_virtual()) {
                auto &virt = std::get<virtual_item>(specific);
 
-               if (virt.get_parent(cache)) {
-                       msg_debug_cache("trying to resolve parent twice for %s", symbol.c_str());
-
-                       return false;
-               }
-
                return virt.resolve_parent(cache);
        }
        else {
index a60213a610808806eb2e45c16bf280d852b85b0f..95127f850e9753fa7bde83e9fdba54cf85f0f794 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -425,13 +425,16 @@ public:
                return false;
        }
 
-       auto call(struct rspamd_task *task, cache_dynamic_item *dyn_item) const -> void
+       auto call(struct rspamd_task *task, cache_dynamic_item *dyn_item) const -> bool
        {
                if (std::holds_alternative<normal_item>(specific)) {
                        const auto &filter_data = std::get<normal_item>(specific);
 
                        filter_data.call(task, (struct rspamd_symcache_dynamic_item *) dyn_item);
+                       return true;
                }
+
+               return false;
        }
 
        /**
index 741e75183b7a9e929cc7796fcd0bd713cd61ada5..8c2df4696b34e201f0f9cb22b2dcb97664c33043 100644 (file)
@@ -38,13 +38,14 @@ auto symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symc
 {
        cache.maybe_resort();
 
-       auto &&cur_order = cache.get_cache_order();
+       auto cur_order = cache.get_cache_order();
+       auto allocated_size = sizeof(symcache_runtime) +
+                                                 sizeof(struct cache_dynamic_item) * cur_order->size();
        auto *checkpoint = (symcache_runtime *) rspamd_mempool_alloc0(task->task_pool,
-                                                                                                                                 sizeof(symcache_runtime) +
-                                                                                                                                         sizeof(struct cache_dynamic_item) * cur_order->size());
-
-       checkpoint->order = cache.get_cache_order();
-
+                                                                                                                                 allocated_size);
+       msg_debug_cache_task("create symcache runtime for task: %d bytes, %d items", (int) allocated_size, (int) cur_order->size());
+       checkpoint->order = std::move(cur_order);
+       checkpoint->slow_status = slow_status::none;
        /* Calculate profile probability */
        ev_now_update_if_cheap(task->event_loop);
        ev_tstamp now = ev_now(task->event_loop);
@@ -160,14 +161,20 @@ auto symcache_runtime::process_settings(struct rspamd_task *task, const symcache
        return false;
 }
 
+auto symcache_runtime::savepoint_dtor(struct rspamd_task *task) -> void
+{
+       msg_debug_cache_task("destroying savepoint");
+       /* Drop shared ownership */
+       order.reset();
+}
+
 auto symcache_runtime::disable_all_symbols(int skip_mask) -> void
 {
        for (auto [i, item]: rspamd::enumerate(order->d)) {
                auto *dyn_item = &dynamic_items[i];
 
                if (!(item->get_flags() & skip_mask)) {
-                       dyn_item->finished = true;
-                       dyn_item->started = true;
+                       dyn_item->status = cache_item_status::finished;
                }
        }
 }
@@ -181,8 +188,7 @@ auto symcache_runtime::disable_symbol(struct rspamd_task *task, const symcache &
                auto *dyn_item = get_dynamic_item(item->id);
 
                if (dyn_item) {
-                       dyn_item->finished = true;
-                       dyn_item->started = true;
+                       dyn_item->status = cache_item_status::finished;
                        msg_debug_cache_task("disable execution of %s", name.data());
 
                        return true;
@@ -207,8 +213,7 @@ auto symcache_runtime::enable_symbol(struct rspamd_task *task, const symcache &c
                auto *dyn_item = get_dynamic_item(item->id);
 
                if (dyn_item) {
-                       dyn_item->finished = false;
-                       dyn_item->started = false;
+                       dyn_item->status = cache_item_status::not_started;
                        msg_debug_cache_task("enable execution of %s", name.data());
 
                        return true;
@@ -233,7 +238,7 @@ auto symcache_runtime::is_symbol_checked(const symcache &cache, std::string_view
                auto *dyn_item = get_dynamic_item(item->id);
 
                if (dyn_item) {
-                       return dyn_item->started;
+                       return dyn_item->status != cache_item_status::not_started;
                }
        }
 
@@ -253,7 +258,7 @@ auto symcache_runtime::is_symbol_enabled(struct rspamd_task *task, const symcach
                        auto *dyn_item = get_dynamic_item(item->id);
 
                        if (dyn_item) {
-                               if (dyn_item->started) {
+                               if (dyn_item->status != cache_item_status::not_started) {
                                        /* Already started */
                                        return false;
                                }
@@ -337,7 +342,7 @@ auto symcache_runtime::process_pre_postfilters(struct rspamd_task *task,
 
                auto dyn_item = get_dynamic_item(item->id);
 
-               if (!dyn_item->started && !dyn_item->finished) {
+               if (dyn_item->status == cache_item_status::not_started) {
                        if (slow_status == slow_status::enabled) {
                                return false;
                        }
@@ -413,7 +418,7 @@ auto symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache
 
                auto dyn_item = &dynamic_items[idx];
 
-               if (!dyn_item->started) {
+               if (dyn_item->status == cache_item_status::not_started) {
                        all_done = false;
 
                        if (!check_item_deps(task, cache, item.get(),
@@ -453,15 +458,16 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache,
        }
 
        g_assert(!item->is_virtual());
-       if (dyn_item->started) {
+       if (dyn_item->status != cache_item_status::not_started) {
                /*
                 * This can actually happen when deps span over different layers
                 */
-               return dyn_item->finished;
+               msg_debug_cache_task("skip already started %s(%d) symbol", item->symbol.c_str(), item->id);
+
+               return dyn_item->status == cache_item_status::finished;
        }
 
        /* Check has been started */
-       dyn_item->started = true;
        auto check = true;
 
        if (!item->is_allowed(task, true) || !item->check_conditions(task)) {
@@ -469,6 +475,7 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache,
        }
 
        if (check) {
+               dyn_item->status = cache_item_status::started;
                msg_debug_cache_task("execute %s, %d; symbol type = %s", item->symbol.data(),
                                                         item->id, item_type_to_str(item->type));
 
@@ -482,24 +489,41 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache,
                cur_item = dyn_item;
                items_inflight++;
                /* Callback now must finalize itself */
-               item->call(task, dyn_item);
-               cur_item = nullptr;
+               if (item->call(task, dyn_item)) {
+                       cur_item = nullptr;
 
-               if (items_inflight == 0) {
-                       return true;
-               }
+                       if (items_inflight == 0) {
+                               msg_debug_cache_task("item %s, %d is now finished (no async events)", item->symbol.data(),
+                                                                        item->id);
+                               dyn_item->status = cache_item_status::finished;
+                               return true;
+                       }
 
-               if (dyn_item->async_events == 0 && !dyn_item->finished) {
-                       msg_err_cache_task("critical error: item %s has no async events pending, "
-                                                          "but it is not finalised",
-                                                          item->symbol.data());
-                       g_assert_not_reached();
-               }
+                       if (dyn_item->async_events == 0 && dyn_item->status != cache_item_status::finished) {
+                               msg_err_cache_task("critical error: item %s has no async events pending, "
+                                                                  "but it is not finalised",
+                                                                  item->symbol.data());
+                               g_assert_not_reached();
+                       }
 
-               return false;
+                       msg_debug_cache_task("item %s, %d is now pending", item->symbol.data(),
+                                                                item->id);
+                       dyn_item->status = cache_item_status::pending;
+
+                       return false;
+               }
+               else {
+                       /* We were not able to call item, so we assume it is not callable */
+                       msg_debug_cache_task("cannot call %s, %d; symbol type = %s", item->symbol.data(),
+                                                                item->id, item_type_to_str(item->type));
+                       dyn_item->status = cache_item_status::finished;
+                       return true;
+               }
        }
        else {
-               dyn_item->finished = true;
+               msg_debug_cache_task("do not check %s, %d", item->symbol.data(),
+                                                        item->id);
+               dyn_item->status = cache_item_status::finished;
        }
 
        return true;
@@ -551,6 +575,9 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
        auto log_func = RSPAMD_LOG_FUNC;
 
        auto inner_functor = [&](int recursion, cache_item *item, cache_dynamic_item *dyn_item, auto rec_functor) -> bool {
+               msg_debug_cache_task_lambda("recursively (%d) check dependencies for %s(%d)", recursion,
+                                                                       item->symbol.c_str(), item->id);
+
                if (recursion > max_recursion) {
                        msg_err_task_lambda("cyclic dependencies: maximum check level %ud exceed when "
                                                                "checking dependencies for %s",
@@ -571,8 +598,8 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
 
                        auto *dep_dyn_item = get_dynamic_item(dep.item->id);
 
-                       if (!dep_dyn_item->finished) {
-                               if (!dep_dyn_item->started) {
+                       if (dep_dyn_item->status != cache_item_status::finished) {
+                               if (dep_dyn_item->status == cache_item_status::not_started) {
                                        /* Not started */
                                        if (!check_only) {
                                                if (!rec_functor(recursion + 1,
@@ -609,14 +636,17 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
                                else {
                                        /* Started but not finished */
                                        msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is "
-                                                                                               "still executing",
-                                                                                               dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
+                                                                                               "still executing (%d events pending)",
+                                                                                               dep.id, dep.sym.c_str(),
+                                                                                               item->id, item->symbol.c_str(),
+                                                                                               dep_dyn_item->async_events);
+                                       g_assert(dep_dyn_item->async_events > 0);
                                        ret = false;
                                }
                        }
                        else {
                                msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is already "
-                                                                                       "checked",
+                                                                                       "finished",
                                                                                        dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
                        }
                }
@@ -701,7 +731,7 @@ auto symcache_runtime::finalize_item(struct rspamd_task *task, cache_dynamic_ite
        }
 
        msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id);
-       dyn_item->finished = true;
+       dyn_item->status = cache_item_status::finished;
        items_inflight--;
        cur_item = nullptr;
 
@@ -787,7 +817,7 @@ auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item *
        for (const auto &rdep: item->rdeps) {
                if (rdep.item) {
                        auto *dyn_item = get_dynamic_item(rdep.item->id);
-                       if (!dyn_item->started) {
+                       if (dyn_item->status == cache_item_status::not_started) {
                                msg_debug_cache_task("check item %d(%s) rdep of %s ",
                                                                         rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());
 
index 181b32880d38f41ae681fcdffb36ec5fafd69ad7..7e4a4126956d5d0e1cf1e5c0627da81526f2ffe2 100644 (file)
 struct rspamd_scan_result;
 
 namespace rspamd::symcache {
+enum class cache_item_status : std::uint16_t {
+       not_started = 0,
+       started = 1,
+       pending = 2,
+       finished = 3,
+};
 /**
  * These items are saved within task structure and are used to track
  * symbols execution.
@@ -38,8 +44,7 @@ namespace rspamd::symcache {
  */
 struct cache_dynamic_item {
        std::uint16_t start_msec; /* Relative to task time */
-       bool started;
-       bool finished;
+       cache_item_status status;
        std::uint32_t async_events;
 };
 
@@ -49,12 +54,12 @@ static_assert(std::is_trivial_v<cache_dynamic_item>);
 
 class symcache_runtime {
        unsigned items_inflight;
+
        enum class slow_status : std::uint8_t {
                none = 0,
                enabled = 1,
                disabled = 2,
        } slow_status;
-
        bool profile;
 
        double profile_start;
@@ -78,12 +83,7 @@ class symcache_runtime {
 
 public:
        /* Dropper for a shared ownership */
-       auto savepoint_dtor() -> void
-       {
-
-               /* Drop shared ownership */
-               order.reset();
-       }
+       auto savepoint_dtor(struct rspamd_task *task) -> void;
        /**
         * Creates a cache runtime using task mempool
         * @param task