diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2024-06-21 16:14:56 +0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-21 16:14:56 +0600 |
commit | 90b73439d20d6e5b9b9e61cecbaa9809d7d0ddcd (patch) | |
tree | 555a09d3b715d3cf1f6e0405e6d360d610b3c7fc /src | |
parent | b44099c96e279a0d60b0688e84e0ef6293194c59 (diff) | |
parent | b2df28925f5ffc785969a973e0bfb5c5670d9d2b (diff) | |
download | rspamd-90b73439d20d6e5b9b9e61cecbaa9809d7d0ddcd.tar.gz rspamd-90b73439d20d6e5b9b9e61cecbaa9809d7d0ddcd.zip |
Merge pull request #5020 from rspamd/vstakhov-slow-timer
[Rework] Rethink slow timer
Diffstat (limited to 'src')
-rw-r--r-- | src/libserver/composites/composites.cxx | 6 | ||||
-rw-r--r-- | src/libserver/composites/composites_internal.hxx | 6 | ||||
-rw-r--r-- | src/libserver/html/html_tag_defs.hxx | 6 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_c.cxx | 17 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_impl.cxx | 111 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_internal.hxx | 4 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_item.cxx | 70 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_item.hxx | 39 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_runtime.cxx | 222 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_runtime.hxx | 30 |
10 files changed, 319 insertions, 192 deletions
diff --git a/src/libserver/composites/composites.cxx b/src/libserver/composites/composites.cxx index 64f8f2daf..0e7adfb8a 100644 --- a/src/libserver/composites/composites.cxx +++ b/src/libserver/composites/composites.cxx @@ -1,11 +1,11 @@ -/*- - * Copyright 2021 Vsevolod Stakhov +/* + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/src/libserver/composites/composites_internal.hxx b/src/libserver/composites/composites_internal.hxx index 55aaa2ee1..2ae4219eb 100644 --- a/src/libserver/composites/composites_internal.hxx +++ b/src/libserver/composites/composites_internal.hxx @@ -1,11 +1,11 @@ -/*- - * Copyright 2021 Vsevolod Stakhov +/* + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/src/libserver/html/html_tag_defs.hxx b/src/libserver/html/html_tag_defs.hxx index 4cff79855..05a5e3cde 100644 --- a/src/libserver/html/html_tag_defs.hxx +++ b/src/libserver/html/html_tag_defs.hxx @@ -1,11 +1,11 @@ -/*- - * Copyright 2021 Vsevolod Stakhov +/* + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx index 1ffcd9ceb..047fc1181 100644 --- a/src/libserver/symcache/symcache_c.cxx +++ b/src/libserver/symcache/symcache_c.cxx @@ -1,5 +1,5 @@ /* - * Copyright 2023 Vsevolod Stakhov + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -602,8 +602,15 @@ unsigned int rspamd_symcache_item_async_inc_full(struct rspamd_task *task, "subsystem %s (%s)", static_item->symbol.c_str(), static_item->id, real_dyn_item->async_events, subsystem, loc); + auto nevents = ++real_dyn_item->async_events; - return ++real_dyn_item->async_events; + if (nevents > 1) { + /* Item is async */ + static_item->internal_flags &= ~rspamd::symcache::cache_item::bit_sync; + real_dyn_item->status = rspamd::symcache::cache_item_status::pending; + } + + return nevents; } unsigned int rspamd_symcache_item_async_dec_full(struct rspamd_task *task, @@ -673,10 +680,10 @@ void rspamd_symcache_composites_foreach(struct rspamd_task *task, real_cache->composites_foreach([&](const auto *item) { auto *dyn_item = cache_runtime->get_dynamic_item(item->id); - if (dyn_item && !dyn_item->started) { + if (dyn_item && dyn_item->status == rspamd::symcache::cache_item_status::not_started) { auto *old_item = cache_runtime->set_cur_item(dyn_item); func((void *) item->get_name().c_str(), item->get_cbdata(), fd); - dyn_item->finished = true; + dyn_item->status = rspamd::symcache::cache_item_status::finished; cache_runtime->set_cur_item(old_item); } }); @@ -711,5 +718,5 @@ void rspamd_symcache_finalize_item(struct rspamd_task *task, void 
rspamd_symcache_runtime_destroy(struct rspamd_task *task) { auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); - cache_runtime->savepoint_dtor(); + cache_runtime->savepoint_dtor(task); }
\ No newline at end of file diff --git a/src/libserver/symcache/symcache_impl.cxx b/src/libserver/symcache/symcache_impl.cxx index 869e025b3..4ea087024 100644 --- a/src/libserver/symcache/symcache_impl.cxx +++ b/src/libserver/symcache/symcache_impl.cxx @@ -112,29 +112,39 @@ auto symcache::init() -> bool /* Deal with the delayed dependencies */ msg_debug_cache("resolving delayed dependencies: %d in list", (int) delayed_deps->size()); for (const auto &delayed_dep: *delayed_deps) { - auto virt_item = get_item_by_name(delayed_dep.from, false); - auto real_item = get_item_by_name(delayed_dep.from, true); - - if (virt_item == nullptr || real_item == nullptr) { - msg_err_cache("cannot register delayed dependency between %s and %s: " - "%s is missing", - delayed_dep.from.data(), - delayed_dep.to.data(), delayed_dep.from.data()); + auto virt_source = get_item_by_name(delayed_dep.from, false); + auto real_source = get_item_by_name(delayed_dep.from, true); + + auto real_destination = get_item_by_name(delayed_dep.to, true); + + if (virt_source == nullptr || real_source == nullptr || real_destination == nullptr) { + if (real_destination != nullptr) { + msg_err_cache("cannot register delayed dependency %s -> %s: " + "source %s is missing", + delayed_dep.from.data(), + delayed_dep.to.data(), delayed_dep.from.data()); + } + else { + msg_err_cache("cannot register delayed dependency %s -> %s: " + "destionation %s is missing", + delayed_dep.from.data(), + delayed_dep.to.data(), delayed_dep.to.data()); + } } else { - if (!disabled_ids.contains(real_item->id)) { + if (!disabled_ids.contains(real_source->id)) { msg_debug_cache("delayed between %s(%d:%d) -> %s", delayed_dep.from.data(), - real_item->id, virt_item->id, + real_source->id, virt_source->id, delayed_dep.to.data()); - add_dependency(real_item->id, delayed_dep.to, - virt_item != real_item ? virt_item->id : -1); + add_dependency(real_source->id, delayed_dep.to, real_destination->id, + virt_source != real_source ? 
virt_source->id : -1); } else { msg_debug_cache("no delayed between %s(%d:%d) -> %s; %s is disabled", delayed_dep.from.data(), - real_item->id, virt_item->id, + real_source->id, virt_source->id, delayed_dep.to.data(), delayed_dep.from.data()); } @@ -529,16 +539,27 @@ auto symcache::get_item_by_name_mut(std::string_view name, bool resolve_parent) return it->second; } -auto symcache::add_dependency(int id_from, std::string_view to, int virtual_id_from) -> void +auto symcache::add_dependency(int id_from, std::string_view to, int id_to, int virtual_id_from) -> void { g_assert(id_from >= 0 && id_from < (int) items_by_id.size()); + g_assert(id_to >= 0 && id_to < (int) items_by_id.size()); const auto &source = items_by_id[id_from]; + const auto &dest = items_by_id[id_to]; g_assert(source.get() != nullptr); - - source->deps.emplace_back(nullptr, - std::string(to), - id_from, - -1); + g_assert(dest.get() != nullptr); + + if (!source->deps.contains(id_to)) { + msg_debug_cache("add dependency %s(%d) -> %s(%d)", + source->symbol.c_str(), source->id, to.data(), dest->id); + source->deps.emplace(id_to, cache_dependency{dest.get(), + std::string(to), + -1}); + } + else { + msg_debug_cache("duplicate dependency %s -> %s", + source->symbol.c_str(), to.data()); + return; + } if (virtual_id_from >= 0) { @@ -546,10 +567,18 @@ auto symcache::add_dependency(int id_from, std::string_view to, int virtual_id_f /* We need that for settings id propagation */ const auto &vsource = items_by_id[virtual_id_from]; g_assert(vsource.get() != nullptr); - vsource->deps.emplace_back(nullptr, - std::string(to), - -1, - virtual_id_from); + + if (!vsource->deps.contains(id_to)) { + msg_debug_cache("add virtual dependency %s -> %s", + vsource->symbol.c_str(), to.data()); + vsource->deps.emplace(id_to, cache_dependency{dest.get(), + std::string(to), + virtual_id_from}); + } + else { + msg_debug_cache("duplicate virtual dependency %s -> %s", + vsource->symbol.c_str(), to.data()); + } } } @@ -625,7 +654,7 @@ 
auto symcache::resort() -> void tsort_mark(it, tsort_mask::TEMP); msg_debug_cache_lambda("visiting node: %s (%d)", it->symbol.c_str(), cur_order); - for (const auto &dep: it->deps) { + for (const auto &[id, dep]: it->deps) { msg_debug_cache_lambda("visiting dep: %s (%d)", dep.item->symbol.c_str(), cur_order + 1); rec(dep.item, cur_order + 1, rec); } @@ -638,6 +667,7 @@ auto symcache::resort() -> void */ total_hits = 0; auto used_items = ord->d.size(); + msg_debug_cache("topologically sort %d filters", used_items); for (const auto &it: ord->d) { if (it->order == 0) { @@ -696,7 +726,8 @@ auto symcache::resort() -> void * We enrich ord with all other symbol types without any sorting, * as it is done in another place */ - constexpr auto append_items_vec = [](const auto &vec, auto &out) { + const auto append_items_vec = [&](const auto &vec, auto &out, const char *what) { + msg_debug_cache_lambda("append %d items; type = %s", (int) vec.size(), what); for (const auto &it: vec) { if (it) { out.emplace_back(it->getptr()); @@ -704,12 +735,12 @@ auto symcache::resort() -> void } }; - append_items_vec(connfilters, ord->d); - append_items_vec(prefilters, ord->d); - append_items_vec(postfilters, ord->d); - append_items_vec(idempotent, ord->d); - append_items_vec(composites, ord->d); - append_items_vec(classifiers, ord->d); + append_items_vec(connfilters, ord->d, "connection filters"); + append_items_vec(prefilters, ord->d, "prefilters"); + append_items_vec(postfilters, ord->d, "postfilters"); + append_items_vec(idempotent, ord->d, "idempotent filters"); + append_items_vec(composites, ord->d, "composites"); + append_items_vec(classifiers, ord->d, "classifiers"); /* After sorting is done, we can assign all elements in the by_symbol hash */ for (const auto [i, it]: rspamd::enumerate(ord->d)) { @@ -856,8 +887,7 @@ auto symcache::validate(bool strict) -> bool for (auto &pair: items_by_symbol) { auto &item = pair.second; - auto ghost = item->st->weight == 0; - auto skipped = !ghost; + 
auto skipped = item->st->weight != 0; if (item->is_scoreable() && g_hash_table_lookup(cfg->symbols, item->symbol.c_str()) == nullptr) { if (!std::isnan(cfg->unknown_weight)) { @@ -871,18 +901,19 @@ auto symcache::validate(bool strict) -> bool msg_info_cache("adding unknown symbol %s with weight: %.2f", item->symbol.c_str(), cfg->unknown_weight); - ghost = false; skipped = false; } else { + /* No `unknown weight`, no static score, and no dynamic score */ skipped = true; } } else { + /* We have a score, so we are not skipped */ skipped = false; } - if (!ghost && skipped) { + if (skipped) { if (!(item->flags & SYMBOL_TYPE_SKIPPED)) { item->flags |= SYMBOL_TYPE_SKIPPED; msg_warn_cache("symbol %s has no score registered, skip its check", @@ -890,12 +921,6 @@ auto symcache::validate(bool strict) -> bool } } - if (ghost) { - msg_debug_cache("symbol %s is registered as ghost symbol, it won't be inserted " - "to any metric", - item->symbol.c_str()); - } - if (item->st->weight < 0 && item->priority == 0) { item->priority++; } @@ -953,7 +978,7 @@ auto symcache::validate(bool strict) -> bool auto item = get_item_by_name_mut((const char *) k, false); if (item) { - item->enabled = FALSE; + item->internal_flags &= ~cache_item::bit_enabled; } } } @@ -1215,7 +1240,7 @@ auto symcache::get_max_timeout(std::vector<std::pair<double, const cache_item *> auto own_timeout = get_item_timeout(it); auto max_child_timeout = 0.0; - for (const auto &dep: it->deps) { + for (const auto &[id, dep]: it->deps) { auto cld_timeout = self(dep.item, self); if (cld_timeout > max_child_timeout) { diff --git a/src/libserver/symcache/symcache_internal.hxx b/src/libserver/symcache/symcache_internal.hxx index 255a4b1c1..c7dda51d1 100644 --- a/src/libserver/symcache/symcache_internal.hxx +++ b/src/libserver/symcache/symcache_internal.hxx @@ -1,5 +1,5 @@ /* - * Copyright 2023 Vsevolod Stakhov + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not 
use this file except in compliance with the License. @@ -365,7 +365,7 @@ public: * @param virtual_id_from * @return */ - auto add_dependency(int id_from, std::string_view to, int virtual_id_from) -> void; + auto add_dependency(int id_from, std::string_view to, int id_to, int virtual_id_from) -> void; /** * Add a delayed dependency between symbols that will be resolved on the init stage diff --git a/src/libserver/symcache/symcache_item.cxx b/src/libserver/symcache/symcache_item.cxx index ac901f5cf..490a87880 100644 --- a/src/libserver/symcache/symcache_item.cxx +++ b/src/libserver/symcache/symcache_item.cxx @@ -1,5 +1,5 @@ /* - * Copyright 2023 Vsevolod Stakhov + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,11 +78,11 @@ auto cache_item::process_deps(const symcache &cache) -> void /* Allow logging macros to work */ auto log_tag = [&]() { return cache.log_tag(); }; - for (auto &dep: deps) { + for (auto &[_id, dep]: deps) { msg_debug_cache("process real dependency %s on %s", symbol.c_str(), dep.sym.c_str()); auto *dit = cache.get_item_by_name_mut(dep.sym, true); - if (dep.vid >= 0) { + if (dep.virtual_source_id >= 0) { /* Case of the virtual symbol that depends on another (maybe virtual) symbol */ const auto *vdit = cache.get_item_by_name(dep.sym, false); @@ -94,7 +94,7 @@ auto cache_item::process_deps(const symcache &cache) -> void } else { msg_debug_cache("process virtual dependency %s(%d) on %s(%d)", symbol.c_str(), - dep.vid, vdit->symbol.c_str(), vdit->id); + dep.virtual_source_id, vdit->symbol.c_str(), vdit->id); unsigned nids = 0; @@ -148,6 +148,8 @@ auto cache_item::process_deps(const symcache &cache) -> void continue; } + + dep.item = dit; } else { if (dit->id == id) { @@ -161,36 +163,54 @@ auto cache_item::process_deps(const symcache &cache) -> void auto *parent = get_parent_mut(cache); if (parent) { - 
dit->rdeps.emplace_back(parent, parent->symbol, parent->id, -1); + if (!dit->rdeps.contains(parent->id)) { + dit->rdeps.emplace(parent->id, cache_dependency{parent, parent->symbol, -1}); + msg_debug_cache("added reverse dependency from %d on %d", parent->id, + dit->id); + } + else { + msg_debug_cache("reverse dependency from %d on %d already exists", + parent->id, dit->id); + } dep.item = dit; - dep.id = dit->id; - - msg_debug_cache("added reverse dependency from %d on %d", parent->id, - dit->id); + } + else { + msg_err_cache("cannot find parent for virtual symbol %s, when resolving dependency %s", + symbol.c_str(), dep.sym.c_str()); } } else { dep.item = dit; - dep.id = dit->id; - dit->rdeps.emplace_back(this, symbol, id, -1); - msg_debug_cache("added reverse dependency from %d on %d", id, - dit->id); + if (!dit->rdeps.contains(id)) { + dit->rdeps.emplace(id, cache_dependency{this, symbol, -1}); + msg_debug_cache("added reverse dependency from %d on %d", id, + dit->id); + } + else { + msg_debug_cache("reverse dependency from %d on %d already exists", + id, dit->id); + } } } } } - else if (dep.id >= 0) { - msg_err_cache("cannot find dependency on symbol %s for symbol %s", + else { + msg_err_cache("cannot find dependency named %s for symbol %s", dep.sym.c_str(), symbol.c_str()); - - continue; } } // Remove empty deps - deps.erase(std::remove_if(std::begin(deps), std::end(deps), - [](const auto &dep) { return !dep.item; }), - std::end(deps)); + for (auto it = deps.begin(); it != deps.end();) { + if (it->second.item == nullptr) { + msg_info_cache("remove empty dependency on %s for symbol %s", + it->second.sym.c_str(), symbol.c_str()); + it = deps.erase(it); + } + else { + ++it; + } + } } auto cache_item::resolve_parent(const symcache &cache) -> bool @@ -200,12 +220,6 @@ auto cache_item::resolve_parent(const symcache &cache) -> bool if (is_virtual()) { auto &virt = std::get<virtual_item>(specific); - if (virt.get_parent(cache)) { - msg_debug_cache("trying to resolve 
parent twice for %s", symbol.c_str()); - - return false; - } - return virt.resolve_parent(cache); } else { @@ -320,11 +334,11 @@ auto cache_item::is_allowed(struct rspamd_task *task, bool exec_only) const -> b } /* Static checks */ - if (!enabled || + if (!(internal_flags & cache_item::bit_enabled) || (RSPAMD_TASK_IS_EMPTY(task) && !(flags & SYMBOL_TYPE_EMPTY)) || (flags & SYMBOL_TYPE_MIME_ONLY && !RSPAMD_TASK_IS_MIME(task))) { - if (!enabled) { + if (!(internal_flags & cache_item::bit_enabled)) { msg_debug_cache_task("skipping %s of %s as it is permanently disabled", what, symbol.c_str()); diff --git a/src/libserver/symcache/symcache_item.hxx b/src/libserver/symcache/symcache_item.hxx index a60213a61..8ed973a82 100644 --- a/src/libserver/symcache/symcache_item.hxx +++ b/src/libserver/symcache/symcache_item.hxx @@ -1,5 +1,5 @@ /* - * Copyright 2023 Vsevolod Stakhov + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -177,14 +177,13 @@ public: }; struct cache_dependency { - cache_item *item; /* Real dependency */ - std::string sym; /* Symbolic dep name */ - int id; /* Real from */ - int vid; /* Virtual from */ + cache_item *item; /* Real dependency */ + std::string sym; /* Symbolic dep name */ + int virtual_source_id; /* Virtual source */ public: /* Default piecewise constructor */ - explicit cache_dependency(cache_item *_item, std::string _sym, int _id, int _vid) - : item(_item), sym(std::move(_sym)), id(_id), vid(_vid) + explicit cache_dependency(cache_item *_item, std::string _sym, int _vid) + : item(_item), sym(std::move(_sym)), virtual_source_id(_vid) { } }; @@ -215,15 +214,18 @@ struct cache_item : std::enable_shared_from_this<cache_item> { struct rspamd_symcache_item_stat *st = nullptr; struct rspamd_counter_data *cd = nullptr; + std::string symbol; + /* Unique id - counter */ int id; std::uint64_t last_count = 0; - std::string symbol; symcache_item_type type; int flags; - /* Condition of execution */ - bool enabled = true; + static constexpr const auto bit_enabled = 0b0001; + static constexpr const auto bit_sync = 0b0010; + static constexpr const auto bit_slow = 0b0100; + int internal_flags = bit_enabled | bit_sync; /* Priority */ int priority = 0; @@ -246,9 +248,9 @@ struct cache_item : std::enable_shared_from_this<cache_item> { augmentations; /* Dependencies */ - std::vector<cache_dependency> deps; + ankerl::unordered_dense::map<int, cache_dependency> deps; /* Reverse dependencies */ - std::vector<cache_dependency> rdeps; + ankerl::unordered_dense::map<int, cache_dependency> rdeps; public: /** @@ -425,13 +427,16 @@ public: return false; } - auto call(struct rspamd_task *task, cache_dynamic_item *dyn_item) const -> void + auto call(struct rspamd_task *task, cache_dynamic_item *dyn_item) const -> bool { if (std::holds_alternative<normal_item>(specific)) { const auto &filter_data = std::get<normal_item>(specific); filter_data.call(task, (struct 
rspamd_symcache_dynamic_item *) dyn_item); + return true; } + + return false; } /** @@ -512,8 +517,8 @@ private: void *user_data, symcache_item_type _type, int _flags) - : id(_id), - symbol(std::move(name)), + : symbol(std::move(name)), + id(_id), type(_type), flags(_flags), priority(_priority), @@ -541,8 +546,8 @@ private: int parent, symcache_item_type _type, int _flags) - : id(_id), - symbol(std::move(name)), + : symbol(std::move(name)), + id(_id), type(_type), flags(_flags), specific(virtual_item{parent}) diff --git a/src/libserver/symcache/symcache_runtime.cxx b/src/libserver/symcache/symcache_runtime.cxx index 15f970d8c..dc7066b32 100644 --- a/src/libserver/symcache/symcache_runtime.cxx +++ b/src/libserver/symcache/symcache_runtime.cxx @@ -1,5 +1,5 @@ /* - * Copyright 2023 Vsevolod Stakhov + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,19 +38,26 @@ auto symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symc { cache.maybe_resort(); - auto &&cur_order = cache.get_cache_order(); + auto cur_order = cache.get_cache_order(); + auto allocated_size = sizeof(symcache_runtime) + + sizeof(struct cache_dynamic_item) * cur_order->size(); auto *checkpoint = (symcache_runtime *) rspamd_mempool_alloc0(task->task_pool, - sizeof(symcache_runtime) + - sizeof(struct cache_dynamic_item) * cur_order->size()); - - checkpoint->order = cache.get_cache_order(); - + allocated_size); + msg_debug_cache_task("create symcache runtime for task: %d bytes, %d items", (int) allocated_size, (int) cur_order->size()); + checkpoint->order = std::move(cur_order); + checkpoint->slow_status = slow_status::none; /* Calculate profile probability */ ev_now_update_if_cheap(task->event_loop); ev_tstamp now = ev_now(task->event_loop); checkpoint->profile_start = now; checkpoint->lim = rspamd_task_get_required_score(task, task->result); + /* + * We enable 
profiling if the following conditions are met: + * - we have not profiled for a long time + * - message is large + * - random probability + */ if ((cache.get_last_profile() == 0.0 || now > cache.get_last_profile() + PROFILE_MAX_TIME) || (task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) || (rspamd_random_double_fast() >= (1 - PROFILE_PROBABILITY))) { @@ -154,14 +161,20 @@ auto symcache_runtime::process_settings(struct rspamd_task *task, const symcache return false; } +auto symcache_runtime::savepoint_dtor(struct rspamd_task *task) -> void +{ + msg_debug_cache_task("destroying savepoint"); + /* Drop shared ownership */ + order.reset(); +} + auto symcache_runtime::disable_all_symbols(int skip_mask) -> void { for (auto [i, item]: rspamd::enumerate(order->d)) { auto *dyn_item = &dynamic_items[i]; if (!(item->get_flags() & skip_mask)) { - dyn_item->finished = true; - dyn_item->started = true; + dyn_item->status = cache_item_status::finished; } } } @@ -175,8 +188,7 @@ auto symcache_runtime::disable_symbol(struct rspamd_task *task, const symcache & auto *dyn_item = get_dynamic_item(item->id); if (dyn_item) { - dyn_item->finished = true; - dyn_item->started = true; + dyn_item->status = cache_item_status::finished; msg_debug_cache_task("disable execution of %s", name.data()); return true; @@ -201,8 +213,7 @@ auto symcache_runtime::enable_symbol(struct rspamd_task *task, const symcache &c auto *dyn_item = get_dynamic_item(item->id); if (dyn_item) { - dyn_item->finished = false; - dyn_item->started = false; + dyn_item->status = cache_item_status::not_started; msg_debug_cache_task("enable execution of %s", name.data()); return true; @@ -227,7 +238,7 @@ auto symcache_runtime::is_symbol_checked(const symcache &cache, std::string_view auto *dyn_item = get_dynamic_item(item->id); if (dyn_item) { - return dyn_item->started; + return dyn_item->status != cache_item_status::not_started; } } @@ -247,7 +258,7 @@ auto symcache_runtime::is_symbol_enabled(struct rspamd_task *task, const 
symcach auto *dyn_item = get_dynamic_item(item->id); if (dyn_item) { - if (dyn_item->started) { + if (dyn_item->status != cache_item_status::not_started) { /* Already started */ return false; } @@ -331,11 +342,8 @@ auto symcache_runtime::process_pre_postfilters(struct rspamd_task *task, auto dyn_item = get_dynamic_item(item->id); - if (!dyn_item->started && !dyn_item->finished) { - if (has_slow) { - /* Delay */ - has_slow = false; - + if (dyn_item->status == cache_item_status::not_started) { + if (slow_status == slow_status::enabled) { return false; } @@ -410,7 +418,7 @@ auto symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache auto dyn_item = &dynamic_items[idx]; - if (!dyn_item->started) { + if (dyn_item->status == cache_item_status::not_started) { all_done = false; if (!check_item_deps(task, cache, item.get(), @@ -424,10 +432,7 @@ auto symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache process_symbol(task, cache, item.get(), dyn_item); - if (has_slow) { - /* Delay */ - has_slow = false; - + if (slow_status == slow_status::enabled) { return false; } } @@ -453,15 +458,16 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, } g_assert(!item->is_virtual()); - if (dyn_item->started) { + if (dyn_item->status != cache_item_status::not_started) { /* * This can actually happen when deps span over different layers */ - return dyn_item->finished; + msg_debug_cache_task("skip already started %s(%d) symbol", item->symbol.c_str(), item->id); + + return dyn_item->status == cache_item_status::finished; } /* Check has been started */ - dyn_item->started = true; auto check = true; if (!item->is_allowed(task, true) || !item->check_conditions(task)) { @@ -469,6 +475,7 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, } if (check) { + dyn_item->status = cache_item_status::started; msg_debug_cache_task("execute %s, %d; symbol type = %s", item->symbol.data(), item->id, 
item_type_to_str(item->type)); @@ -482,24 +489,43 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, cur_item = dyn_item; items_inflight++; /* Callback now must finalize itself */ - item->call(task, dyn_item); - cur_item = nullptr; - if (items_inflight == 0) { - return true; - } - if (dyn_item->async_events == 0 && !dyn_item->finished) { - msg_err_cache_task("critical error: item %s has no async events pending, " - "but it is not finalised", - item->symbol.data()); - g_assert_not_reached(); - } + if (item->call(task, dyn_item)) { + cur_item = nullptr; - return false; + if (items_inflight == 0) { + msg_debug_cache_task("item %s, %d is now finished (no async events)", item->symbol.data(), + item->id); + dyn_item->status = cache_item_status::finished; + return true; + } + + if (dyn_item->async_events == 0 && dyn_item->status != cache_item_status::finished) { + msg_err_cache_task("critical error: item %s has no async events pending, " + "but it is not finalised", + item->symbol.data()); + g_assert_not_reached(); + } + else if (dyn_item->async_events > 0) { + msg_debug_cache_task("item %s, %d is now pending with %d async events", item->symbol.data(), + item->id, dyn_item->async_events); + } + + return false; + } + else { + /* We were not able to call item, so we assume it is not callable */ + msg_debug_cache_task("cannot call %s, %d; symbol type = %s", item->symbol.data(), + item->id, item_type_to_str(item->type)); + dyn_item->status = cache_item_status::finished; + return true; + } } else { - dyn_item->finished = true; + msg_debug_cache_task("do not check %s, %d", item->symbol.data(), + item->id); + dyn_item->status = cache_item_status::finished; } return true; @@ -551,6 +577,9 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache auto log_func = RSPAMD_LOG_FUNC; auto inner_functor = [&](int recursion, cache_item *item, cache_dynamic_item *dyn_item, auto rec_functor) -> bool { + 
msg_debug_cache_task_lambda("recursively (%d) check dependencies for %s(%d)", recursion, + item->symbol.c_str(), item->id); + if (recursion > max_recursion) { msg_err_task_lambda("cyclic dependencies: maximum check level %ud exceed when " "checking dependencies for %s", @@ -561,18 +590,18 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache auto ret = true; - for (const auto &dep: item->deps) { + for (const auto &[dest_id, dep]: item->deps) { if (!dep.item) { /* Assume invalid deps as done */ msg_debug_cache_task_lambda("symbol %d(%s) has invalid dependencies on %d(%s)", - item->id, item->symbol.c_str(), dep.id, dep.sym.c_str()); + item->id, item->symbol.c_str(), dest_id, dep.sym.c_str()); continue; } auto *dep_dyn_item = get_dynamic_item(dep.item->id); - if (!dep_dyn_item->finished) { - if (!dep_dyn_item->started) { + if (dep_dyn_item->status != cache_item_status::finished) { + if (dep_dyn_item->status == cache_item_status::not_started) { /* Not started */ if (!check_only) { if (!rec_functor(recursion + 1, @@ -583,7 +612,7 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache ret = false; msg_debug_cache_task_lambda("delayed dependency %d(%s) for " "symbol %d(%s)", - dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + dest_id, dep.sym.c_str(), item->id, item->symbol.c_str()); } else if (!process_symbol(task, cache, dep.item, dep_dyn_item)) { /* Now started, but has events pending */ @@ -591,33 +620,36 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache msg_debug_cache_task_lambda("started check of %d(%s) symbol " "as dep for " "%d(%s)", - dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + dest_id, dep.sym.c_str(), item->id, item->symbol.c_str()); } else { msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is " "already processed", - dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + dest_id, dep.sym.c_str(), item->id, 
item->symbol.c_str()); } } else { msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) " "cannot be started now", - dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + dest_id, dep.sym.c_str(), item->id, item->symbol.c_str()); ret = false; } } else { /* Started but not finished */ msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is " - "still executing", - dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + "still executing (%d events pending)", + dest_id, dep.sym.c_str(), + item->id, item->symbol.c_str(), + dep_dyn_item->async_events); + g_assert(dep_dyn_item->async_events > 0); ret = false; } } else { msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is already " - "checked", - dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + "finished", + dest_id, dep.sym.c_str(), item->id, item->symbol.c_str()); } } @@ -654,7 +686,7 @@ rspamd_symcache_delayed_item_cb(EV_P_ ev_timer *w, int what) if (cbd->event) { cbd->event = nullptr; - /* Timer will be stopped here */ + /* Timer will be stopped here; `has_slow` is also reset there */ rspamd_session_remove_event(cbd->task->s, rspamd_symcache_delayed_item_fin, cbd); @@ -701,7 +733,7 @@ auto symcache_runtime::finalize_item(struct rspamd_task *task, cache_dynamic_ite } msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id); - dyn_item->finished = true; + dyn_item->status = cache_item_status::finished; items_inflight--; cur_item = nullptr; @@ -732,45 +764,81 @@ auto symcache_runtime::finalize_item(struct rspamd_task *task, cache_dynamic_ite } else { /* Just reset as no timer is added */ - has_slow = FALSE; + slow_status = slow_status::none; return false; } return true; }; - if (profile) { + /* Check if we need to profile symbol (always profile when we have seen this item to be slow */ + if (profile || item->flags & cache_item::bit_slow) { ev_now_update_if_cheap(task->event_loop); auto diff = ((ev_now(task->event_loop) - 
profile_start) * 1e3 - dyn_item->start_msec); + if (G_UNLIKELY(RSPAMD_TASK_IS_PROFILING(task))) { + rspamd_task_profile_set(task, item->symbol.c_str(), diff); + } + + if (rspamd_worker_is_scanner(task->worker)) { + rspamd_set_counter(item->cd, diff); + } + if (diff > slow_diff_limit) { - if (!has_slow) { - has_slow = true; + item->internal_flags |= cache_item::bit_slow; + + if (item->internal_flags & cache_item::bit_sync) { + + /* + * We also need to adjust start timer for all async rules that + * are started before this rule, as this rule could delay them + * on its own. Hence, we need to make some corrections for all + * rules pending + */ + bool need_slow = false; + for (const auto &[i, other_item]: rspamd::enumerate(order->d)) { + auto *other_dyn_item = &dynamic_items[i]; + + if (other_dyn_item->status == cache_item_status::pending && other_dyn_item->start_msec <= dyn_item->start_msec) { + other_dyn_item->start_msec += diff; + + msg_debug_cache_task("slow sync rule %s(%d); adjust start time for pending rule %s(%d) by %.2fms to %dms", + item->symbol.c_str(), item->id, + other_item->symbol.c_str(), + other_item->id, + diff, + (int) other_dyn_item->start_msec); + /* We have something pending, so we need to enable slow timer */ + need_slow = true; + } + } - msg_info_task("slow rule: %s(%d): %.2f ms; enable slow timer delay", - item->symbol.c_str(), item->id, - diff); + if (need_slow && slow_status != slow_status::enabled) { + slow_status = slow_status::enabled; - if (enable_slow_timer()) { - /* Allow network execution */ - return; + msg_info_task("slow synchronous rule: %s(%d): %.2f ms; enable 100ms idle timer to allow other rules to be finished", + item->symbol.c_str(), item->id, + diff); + if (enable_slow_timer()) { + return; + } + } + else { + msg_info_task("slow synchronous rule: %s(%d): %.2f ms; idle timer has already been activated for this scan", + item->symbol.c_str(), item->id, + diff); } } else { - msg_info_task("slow rule: %s(%d): %.2f ms", - 
item->symbol.c_str(), item->id, - diff); + msg_notice_task("slow asynchronous rule: %s(%d): %.2f ms; no idle timer is needed", + item->symbol.c_str(), item->id, + diff); } } - - if (G_UNLIKELY(RSPAMD_TASK_IS_PROFILING(task))) { - rspamd_task_profile_set(task, item->symbol.c_str(), diff); - } - - if (rspamd_worker_is_scanner(task->worker)) { - rspamd_set_counter(item->cd, diff); + else { + item->internal_flags &= ~cache_item::bit_slow; } } @@ -786,10 +854,10 @@ auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item * return; } - for (const auto &rdep: item->rdeps) { + for (const auto &[id, rdep]: item->rdeps.values()) { if (rdep.item) { auto *dyn_item = get_dynamic_item(rdep.item->id); - if (!dyn_item->started) { + if (dyn_item->status == cache_item_status::not_started) { msg_debug_cache_task("check item %d(%s) rdep of %s ", rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str()); diff --git a/src/libserver/symcache/symcache_runtime.hxx b/src/libserver/symcache/symcache_runtime.hxx index aa8f66c0f..7e4a41269 100644 --- a/src/libserver/symcache/symcache_runtime.hxx +++ b/src/libserver/symcache/symcache_runtime.hxx @@ -1,5 +1,5 @@ /* - * Copyright 2023 Vsevolod Stakhov + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,12 @@ struct rspamd_scan_result; namespace rspamd::symcache { +enum class cache_item_status : std::uint16_t { + not_started = 0, + started = 1, + pending = 2, + finished = 3, +}; /** * These items are saved within task structure and are used to track * symbols execution. 
@@ -38,18 +44,23 @@ namespace rspamd::symcache { */ struct cache_dynamic_item { std::uint16_t start_msec; /* Relative to task time */ - bool started; - bool finished; + cache_item_status status; std::uint32_t async_events; }; static_assert(sizeof(cache_dynamic_item) == sizeof(std::uint64_t)); static_assert(std::is_trivial_v<cache_dynamic_item>); + class symcache_runtime { unsigned items_inflight; + + enum class slow_status : std::uint8_t { + none = 0, + enabled = 1, + disabled = 2, + } slow_status; bool profile; - bool has_slow; double profile_start; double lim; @@ -72,12 +83,7 @@ class symcache_runtime { public: /* Dropper for a shared ownership */ - auto savepoint_dtor() -> void - { - - /* Drop shared ownership */ - order.reset(); - } + auto savepoint_dtor(struct rspamd_task *task) -> void; /** * Creates a cache runtime using task mempool * @param task @@ -199,7 +205,9 @@ public: /* XXX: a helper to allow hiding internal implementation of the slow timer structure */ auto unset_slow() -> void { - has_slow = false; + if (slow_status == slow_status::enabled) { + slow_status = slow_status::disabled; + } } }; |