From 24a4b03e8a5acd33f72c5747936e6e96650a8992 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 18 Jun 2024 14:48:37 +0100 Subject: [PATCH] [Rework] Use explicit item status --- src/libserver/symcache/symcache_c.cxx | 8 +- src/libserver/symcache/symcache_impl.cxx | 16 +-- src/libserver/symcache/symcache_item.cxx | 8 +- src/libserver/symcache/symcache_item.hxx | 7 +- src/libserver/symcache/symcache_runtime.cxx | 108 +++++++++++++------- src/libserver/symcache/symcache_runtime.hxx | 18 ++-- 6 files changed, 97 insertions(+), 68 deletions(-) diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx index 1ffcd9ceb..3214aab2e 100644 --- a/src/libserver/symcache/symcache_c.cxx +++ b/src/libserver/symcache/symcache_c.cxx @@ -1,5 +1,5 @@ /* - * Copyright 2023 Vsevolod Stakhov + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -673,10 +673,10 @@ void rspamd_symcache_composites_foreach(struct rspamd_task *task, real_cache->composites_foreach([&](const auto *item) { auto *dyn_item = cache_runtime->get_dynamic_item(item->id); - if (dyn_item && !dyn_item->started) { + if (dyn_item && dyn_item->status == rspamd::symcache::cache_item_status::not_started) { auto *old_item = cache_runtime->set_cur_item(dyn_item); func((void *) item->get_name().c_str(), item->get_cbdata(), fd); - dyn_item->finished = true; + dyn_item->status = rspamd::symcache::cache_item_status::finished; cache_runtime->set_cur_item(old_item); } }); @@ -711,5 +711,5 @@ void rspamd_symcache_finalize_item(struct rspamd_task *task, void rspamd_symcache_runtime_destroy(struct rspamd_task *task) { auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); - cache_runtime->savepoint_dtor(); + cache_runtime->savepoint_dtor(task); } \ No newline at end of file diff --git a/src/libserver/symcache/symcache_impl.cxx b/src/libserver/symcache/symcache_impl.cxx index 869e025b3..34d399c5d 100644 --- a/src/libserver/symcache/symcache_impl.cxx +++ b/src/libserver/symcache/symcache_impl.cxx @@ -638,6 +638,7 @@ auto symcache::resort() -> void */ total_hits = 0; auto used_items = ord->d.size(); + msg_debug_cache("topologically sort %d filters", used_items); for (const auto &it: ord->d) { if (it->order == 0) { @@ -696,7 +697,8 @@ auto symcache::resort() -> void * We enrich ord with all other symbol types without any sorting, * as it is done in another place */ - constexpr auto append_items_vec = [](const auto &vec, auto &out) { + const auto append_items_vec = [&](const auto &vec, auto &out, const char *what) { + msg_debug_cache_lambda("append %d items; type = %s", (int) vec.size(), what); for (const auto &it: vec) { if (it) { out.emplace_back(it->getptr()); @@ -704,12 +706,12 @@ auto symcache::resort() -> void } }; - append_items_vec(connfilters, ord->d); - append_items_vec(prefilters, ord->d); - append_items_vec(postfilters, ord->d); - append_items_vec(idempotent, ord->d); - append_items_vec(composites, ord->d); - append_items_vec(classifiers, ord->d); + append_items_vec(connfilters, ord->d, "connection filters"); + append_items_vec(prefilters, ord->d, "prefilters"); + append_items_vec(postfilters, ord->d, "postfilters"); + append_items_vec(idempotent, ord->d, "idempotent filters"); + append_items_vec(composites, ord->d, "composites"); + append_items_vec(classifiers, ord->d, "classifiers"); /* After sorting is done, we can assign all elements in the by_symbol hash */ for (const auto [i, it]: rspamd::enumerate(ord->d)) { diff --git a/src/libserver/symcache/symcache_item.cxx b/src/libserver/symcache/symcache_item.cxx index ac901f5cf..ca81267c4 100644 --- a/src/libserver/symcache/symcache_item.cxx +++ b/src/libserver/symcache/symcache_item.cxx @@ -1,5 +1,5 @@ /* - * Copyright 2023 Vsevolod Stakhov + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -200,12 +200,6 @@ auto cache_item::resolve_parent(const symcache &cache) -> bool if (is_virtual()) { auto &virt = std::get(specific); - if (virt.get_parent(cache)) { - msg_debug_cache("trying to resolve parent twice for %s", symbol.c_str()); - - return false; - } - return virt.resolve_parent(cache); } else { diff --git a/src/libserver/symcache/symcache_item.hxx b/src/libserver/symcache/symcache_item.hxx index a60213a61..95127f850 100644 --- a/src/libserver/symcache/symcache_item.hxx +++ b/src/libserver/symcache/symcache_item.hxx @@ -1,5 +1,5 @@ /* - * Copyright 2023 Vsevolod Stakhov + * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -425,13 +425,16 @@ public: return false; } - auto call(struct rspamd_task *task, cache_dynamic_item *dyn_item) const -> void + auto call(struct rspamd_task *task, cache_dynamic_item *dyn_item) const -> bool { if (std::holds_alternative(specific)) { const auto &filter_data = std::get(specific); filter_data.call(task, (struct rspamd_symcache_dynamic_item *) dyn_item); + return true; } + + return false; } /** diff --git a/src/libserver/symcache/symcache_runtime.cxx b/src/libserver/symcache/symcache_runtime.cxx index 741e75183..8c2df4696 100644 --- a/src/libserver/symcache/symcache_runtime.cxx +++ b/src/libserver/symcache/symcache_runtime.cxx @@ -38,13 +38,14 @@ auto symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symc { cache.maybe_resort(); - auto &&cur_order = cache.get_cache_order(); + auto cur_order = cache.get_cache_order(); + auto allocated_size = sizeof(symcache_runtime) + + sizeof(struct cache_dynamic_item) * cur_order->size(); auto *checkpoint = (symcache_runtime *) rspamd_mempool_alloc0(task->task_pool, - sizeof(symcache_runtime) + - sizeof(struct cache_dynamic_item) * cur_order->size()); - - checkpoint->order = cache.get_cache_order(); - + allocated_size); + msg_debug_cache_task("create symcache runtime for task: %d bytes, %d items", (int) allocated_size, (int) cur_order->size()); + checkpoint->order = std::move(cur_order); + checkpoint->slow_status = slow_status::none; /* Calculate profile probability */ ev_now_update_if_cheap(task->event_loop); ev_tstamp now = ev_now(task->event_loop); @@ -160,14 +161,20 @@ auto symcache_runtime::process_settings(struct rspamd_task *task, const symcache return false; } +auto symcache_runtime::savepoint_dtor(struct rspamd_task *task) -> void +{ + msg_debug_cache_task("destroying savepoint"); + /* Drop shared ownership */ + order.reset(); +} + auto symcache_runtime::disable_all_symbols(int skip_mask) -> void { for (auto [i, item]: rspamd::enumerate(order->d)) { auto *dyn_item = &dynamic_items[i]; if (!(item->get_flags() & skip_mask)) { - dyn_item->finished = true; - dyn_item->started = true; + dyn_item->status = cache_item_status::finished; } } } @@ -181,8 +188,7 @@ auto symcache_runtime::disable_symbol(struct rspamd_task *task, const symcache & auto *dyn_item = get_dynamic_item(item->id); if (dyn_item) { - dyn_item->finished = true; - dyn_item->started = true; + dyn_item->status = cache_item_status::finished; msg_debug_cache_task("disable execution of %s", name.data()); return true; @@ -207,8 +213,7 @@ auto symcache_runtime::enable_symbol(struct rspamd_task *task, const symcache &c auto *dyn_item = get_dynamic_item(item->id); if (dyn_item) { - dyn_item->finished = false; - dyn_item->started = false; + dyn_item->status = cache_item_status::not_started; msg_debug_cache_task("enable execution of %s", name.data()); return true; @@ -233,7 +238,7 @@ auto symcache_runtime::is_symbol_checked(const symcache &cache, std::string_view auto *dyn_item = get_dynamic_item(item->id); if (dyn_item) { - return dyn_item->started; + return dyn_item->status != cache_item_status::not_started; } } @@ -253,7 +258,7 @@ auto symcache_runtime::is_symbol_enabled(struct rspamd_task *task, const symcach auto *dyn_item = get_dynamic_item(item->id); if (dyn_item) { - if (dyn_item->started) { + if (dyn_item->status != cache_item_status::not_started) { /* Already started */ return false; } @@ -337,7 +342,7 @@ auto symcache_runtime::process_pre_postfilters(struct rspamd_task *task, auto dyn_item = get_dynamic_item(item->id); - if (!dyn_item->started && !dyn_item->finished) { + if (dyn_item->status == cache_item_status::not_started) { if (slow_status == slow_status::enabled) { return false; } @@ -413,7 +418,7 @@ auto symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache auto dyn_item = &dynamic_items[idx]; - if (!dyn_item->started) { + if (dyn_item->status == cache_item_status::not_started) { all_done = false; if (!check_item_deps(task, cache, item.get(), @@ -453,15 +458,16 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, } g_assert(!item->is_virtual()); - if (dyn_item->started) { + if (dyn_item->status != cache_item_status::not_started) { /* * This can actually happen when deps span over different layers */ - return dyn_item->finished; + msg_debug_cache_task("skip already started %s(%d) symbol", item->symbol.c_str(), item->id); + + return dyn_item->status == cache_item_status::finished; } /* Check has been started */ - dyn_item->started = true; auto check = true; if (!item->is_allowed(task, true) || !item->check_conditions(task)) { @@ -469,6 +475,7 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, } if (check) { + dyn_item->status = cache_item_status::started; msg_debug_cache_task("execute %s, %d; symbol type = %s", item->symbol.data(), item->id, item_type_to_str(item->type)); @@ -482,24 +489,41 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, cur_item = dyn_item; items_inflight++; /* Callback now must finalize itself */ - item->call(task, dyn_item); - cur_item = nullptr; + if (item->call(task, dyn_item)) { + cur_item = nullptr; - if (items_inflight == 0) { - return true; - } + if (items_inflight == 0) { + msg_debug_cache_task("item %s, %d is now finished (no async events)", item->symbol.data(), + item->id); + dyn_item->status = cache_item_status::finished; + return true; + } - if (dyn_item->async_events == 0 && !dyn_item->finished) { - msg_err_cache_task("critical error: item %s has no async events pending, " - "but it is not finalised", - item->symbol.data()); - g_assert_not_reached(); - } + if (dyn_item->async_events == 0 && dyn_item->status != cache_item_status::finished) { + msg_err_cache_task("critical error: item %s has no async events pending, " + "but it is not finalised", + item->symbol.data()); + g_assert_not_reached(); + } - return false; + msg_debug_cache_task("item %s, %d is now pending", item->symbol.data(), + item->id); + dyn_item->status = cache_item_status::pending; + + return false; + } + else { + /* We were not able to call item, so we assume it is not callable */ + msg_debug_cache_task("cannot call %s, %d; symbol type = %s", item->symbol.data(), + item->id, item_type_to_str(item->type)); + dyn_item->status = cache_item_status::finished; + return true; + } } else { - dyn_item->finished = true; + msg_debug_cache_task("do not check %s, %d", item->symbol.data(), + item->id); + dyn_item->status = cache_item_status::finished; } return true; @@ -551,6 +575,9 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache auto log_func = RSPAMD_LOG_FUNC; auto inner_functor = [&](int recursion, cache_item *item, cache_dynamic_item *dyn_item, auto rec_functor) -> bool { + msg_debug_cache_task_lambda("recursively (%d) check dependencies for %s(%d)", recursion, + item->symbol.c_str(), item->id); + if (recursion > max_recursion) { msg_err_task_lambda("cyclic dependencies: maximum check level %ud exceed when " "checking dependencies for %s", @@ -571,8 +598,8 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache auto *dep_dyn_item = get_dynamic_item(dep.item->id); - if (!dep_dyn_item->finished) { - if (!dep_dyn_item->started) { + if (dep_dyn_item->status != cache_item_status::finished) { + if (dep_dyn_item->status == cache_item_status::not_started) { /* Not started */ if (!check_only) { if (!rec_functor(recursion + 1, @@ -609,14 +636,17 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache else { /* Started but not finished */ msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is " - "still executing", - dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + "still executing (%d events pending)", + dep.id, dep.sym.c_str(), + item->id, item->symbol.c_str(), + dep_dyn_item->async_events); + g_assert(dep_dyn_item->async_events > 0); ret = false; } } else { msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is already " - "checked", + "finished", dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); } } @@ -701,7 +731,7 @@ auto symcache_runtime::finalize_item(struct rspamd_task *task, cache_dynamic_ite } msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id); - dyn_item->finished = true; + dyn_item->status = cache_item_status::finished; items_inflight--; cur_item = nullptr; @@ -787,7 +817,7 @@ auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item * for (const auto &rdep: item->rdeps) { if (rdep.item) { auto *dyn_item = get_dynamic_item(rdep.item->id); - if (!dyn_item->started) { + if (dyn_item->status == cache_item_status::not_started) { msg_debug_cache_task("check item %d(%s) rdep of %s ", rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str()); diff --git a/src/libserver/symcache/symcache_runtime.hxx b/src/libserver/symcache/symcache_runtime.hxx index 181b32880..7e4a41269 100644 --- a/src/libserver/symcache/symcache_runtime.hxx +++ b/src/libserver/symcache/symcache_runtime.hxx @@ -30,6 +30,12 @@ struct rspamd_scan_result; namespace rspamd::symcache { +enum class cache_item_status : std::uint16_t { + not_started = 0, + started = 1, + pending = 2, + finished = 3, +}; /** * These items are saved within task structure and are used to track * symbols execution. @@ -38,8 +44,7 @@ namespace rspamd::symcache { */ struct cache_dynamic_item { std::uint16_t start_msec; /* Relative to task time */ - bool started; - bool finished; + cache_item_status status; std::uint32_t async_events; }; @@ -49,12 +54,12 @@ static_assert(std::is_trivial_v); class symcache_runtime { unsigned items_inflight; + enum class slow_status : std::uint8_t { none = 0, enabled = 1, disabled = 2, } slow_status; - bool profile; double profile_start; @@ -78,12 +83,7 @@ class symcache_runtime { public: /* Dropper for a shared ownership */ - auto savepoint_dtor() -> void - { - - /* Drop shared ownership */ - order.reset(); - } + auto savepoint_dtor(struct rspamd_task *task) -> void; /** * Creates a cache runtime using task mempool * @param task -- 2.39.5