aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2024-06-18 14:48:37 +0100
committerVsevolod Stakhov <vsevolod@rspamd.com>2024-06-18 14:48:37 +0100
commit24a4b03e8a5acd33f72c5747936e6e96650a8992 (patch)
tree009326aff7ce2e0b92c47abde2838fff524973d9
parent6e72d056bd78174c8a16c4316deb44be43af552e (diff)
downloadrspamd-24a4b03e8a5acd33f72c5747936e6e96650a8992.tar.gz
rspamd-24a4b03e8a5acd33f72c5747936e6e96650a8992.zip
[Rework] Use explicit item status
-rw-r--r--src/libserver/symcache/symcache_c.cxx8
-rw-r--r--src/libserver/symcache/symcache_impl.cxx16
-rw-r--r--src/libserver/symcache/symcache_item.cxx8
-rw-r--r--src/libserver/symcache/symcache_item.hxx7
-rw-r--r--src/libserver/symcache/symcache_runtime.cxx108
-rw-r--r--src/libserver/symcache/symcache_runtime.hxx18
6 files changed, 97 insertions, 68 deletions
diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx
index 1ffcd9ceb..3214aab2e 100644
--- a/src/libserver/symcache/symcache_c.cxx
+++ b/src/libserver/symcache/symcache_c.cxx
@@ -1,5 +1,5 @@
/*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -673,10 +673,10 @@ void rspamd_symcache_composites_foreach(struct rspamd_task *task,
real_cache->composites_foreach([&](const auto *item) {
auto *dyn_item = cache_runtime->get_dynamic_item(item->id);
- if (dyn_item && !dyn_item->started) {
+ if (dyn_item && dyn_item->status == rspamd::symcache::cache_item_status::not_started) {
auto *old_item = cache_runtime->set_cur_item(dyn_item);
func((void *) item->get_name().c_str(), item->get_cbdata(), fd);
- dyn_item->finished = true;
+ dyn_item->status = rspamd::symcache::cache_item_status::finished;
cache_runtime->set_cur_item(old_item);
}
});
@@ -711,5 +711,5 @@ void rspamd_symcache_finalize_item(struct rspamd_task *task,
void rspamd_symcache_runtime_destroy(struct rspamd_task *task)
{
auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime);
- cache_runtime->savepoint_dtor();
+ cache_runtime->savepoint_dtor(task);
} \ No newline at end of file
diff --git a/src/libserver/symcache/symcache_impl.cxx b/src/libserver/symcache/symcache_impl.cxx
index 869e025b3..34d399c5d 100644
--- a/src/libserver/symcache/symcache_impl.cxx
+++ b/src/libserver/symcache/symcache_impl.cxx
@@ -638,6 +638,7 @@ auto symcache::resort() -> void
*/
total_hits = 0;
auto used_items = ord->d.size();
+ msg_debug_cache("topologically sort %d filters", used_items);
for (const auto &it: ord->d) {
if (it->order == 0) {
@@ -696,7 +697,8 @@ auto symcache::resort() -> void
* We enrich ord with all other symbol types without any sorting,
* as it is done in another place
*/
- constexpr auto append_items_vec = [](const auto &vec, auto &out) {
+ const auto append_items_vec = [&](const auto &vec, auto &out, const char *what) {
+ msg_debug_cache_lambda("append %d items; type = %s", (int) vec.size(), what);
for (const auto &it: vec) {
if (it) {
out.emplace_back(it->getptr());
@@ -704,12 +706,12 @@ auto symcache::resort() -> void
}
};
- append_items_vec(connfilters, ord->d);
- append_items_vec(prefilters, ord->d);
- append_items_vec(postfilters, ord->d);
- append_items_vec(idempotent, ord->d);
- append_items_vec(composites, ord->d);
- append_items_vec(classifiers, ord->d);
+ append_items_vec(connfilters, ord->d, "connection filters");
+ append_items_vec(prefilters, ord->d, "prefilters");
+ append_items_vec(postfilters, ord->d, "postfilters");
+ append_items_vec(idempotent, ord->d, "idempotent filters");
+ append_items_vec(composites, ord->d, "composites");
+ append_items_vec(classifiers, ord->d, "classifiers");
/* After sorting is done, we can assign all elements in the by_symbol hash */
for (const auto [i, it]: rspamd::enumerate(ord->d)) {
diff --git a/src/libserver/symcache/symcache_item.cxx b/src/libserver/symcache/symcache_item.cxx
index ac901f5cf..ca81267c4 100644
--- a/src/libserver/symcache/symcache_item.cxx
+++ b/src/libserver/symcache/symcache_item.cxx
@@ -1,5 +1,5 @@
/*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -200,12 +200,6 @@ auto cache_item::resolve_parent(const symcache &cache) -> bool
if (is_virtual()) {
auto &virt = std::get<virtual_item>(specific);
- if (virt.get_parent(cache)) {
- msg_debug_cache("trying to resolve parent twice for %s", symbol.c_str());
-
- return false;
- }
-
return virt.resolve_parent(cache);
}
else {
diff --git a/src/libserver/symcache/symcache_item.hxx b/src/libserver/symcache/symcache_item.hxx
index a60213a61..95127f850 100644
--- a/src/libserver/symcache/symcache_item.hxx
+++ b/src/libserver/symcache/symcache_item.hxx
@@ -1,5 +1,5 @@
/*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -425,13 +425,16 @@ public:
return false;
}
- auto call(struct rspamd_task *task, cache_dynamic_item *dyn_item) const -> void
+ auto call(struct rspamd_task *task, cache_dynamic_item *dyn_item) const -> bool
{
if (std::holds_alternative<normal_item>(specific)) {
const auto &filter_data = std::get<normal_item>(specific);
filter_data.call(task, (struct rspamd_symcache_dynamic_item *) dyn_item);
+ return true;
}
+
+ return false;
}
/**
diff --git a/src/libserver/symcache/symcache_runtime.cxx b/src/libserver/symcache/symcache_runtime.cxx
index 741e75183..8c2df4696 100644
--- a/src/libserver/symcache/symcache_runtime.cxx
+++ b/src/libserver/symcache/symcache_runtime.cxx
@@ -38,13 +38,14 @@ auto symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symc
{
cache.maybe_resort();
- auto &&cur_order = cache.get_cache_order();
+ auto cur_order = cache.get_cache_order();
+ auto allocated_size = sizeof(symcache_runtime) +
+ sizeof(struct cache_dynamic_item) * cur_order->size();
auto *checkpoint = (symcache_runtime *) rspamd_mempool_alloc0(task->task_pool,
- sizeof(symcache_runtime) +
- sizeof(struct cache_dynamic_item) * cur_order->size());
-
- checkpoint->order = cache.get_cache_order();
-
+ allocated_size);
+ msg_debug_cache_task("create symcache runtime for task: %d bytes, %d items", (int) allocated_size, (int) cur_order->size());
+ checkpoint->order = std::move(cur_order);
+ checkpoint->slow_status = slow_status::none;
/* Calculate profile probability */
ev_now_update_if_cheap(task->event_loop);
ev_tstamp now = ev_now(task->event_loop);
@@ -160,14 +161,20 @@ auto symcache_runtime::process_settings(struct rspamd_task *task, const symcache
return false;
}
+auto symcache_runtime::savepoint_dtor(struct rspamd_task *task) -> void
+{
+ msg_debug_cache_task("destroying savepoint");
+ /* Drop shared ownership */
+ order.reset();
+}
+
auto symcache_runtime::disable_all_symbols(int skip_mask) -> void
{
for (auto [i, item]: rspamd::enumerate(order->d)) {
auto *dyn_item = &dynamic_items[i];
if (!(item->get_flags() & skip_mask)) {
- dyn_item->finished = true;
- dyn_item->started = true;
+ dyn_item->status = cache_item_status::finished;
}
}
}
@@ -181,8 +188,7 @@ auto symcache_runtime::disable_symbol(struct rspamd_task *task, const symcache &
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- dyn_item->finished = true;
- dyn_item->started = true;
+ dyn_item->status = cache_item_status::finished;
msg_debug_cache_task("disable execution of %s", name.data());
return true;
@@ -207,8 +213,7 @@ auto symcache_runtime::enable_symbol(struct rspamd_task *task, const symcache &c
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- dyn_item->finished = false;
- dyn_item->started = false;
+ dyn_item->status = cache_item_status::not_started;
msg_debug_cache_task("enable execution of %s", name.data());
return true;
@@ -233,7 +238,7 @@ auto symcache_runtime::is_symbol_checked(const symcache &cache, std::string_view
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- return dyn_item->started;
+ return dyn_item->status != cache_item_status::not_started;
}
}
@@ -253,7 +258,7 @@ auto symcache_runtime::is_symbol_enabled(struct rspamd_task *task, const symcach
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- if (dyn_item->started) {
+ if (dyn_item->status != cache_item_status::not_started) {
/* Already started */
return false;
}
@@ -337,7 +342,7 @@ auto symcache_runtime::process_pre_postfilters(struct rspamd_task *task,
auto dyn_item = get_dynamic_item(item->id);
- if (!dyn_item->started && !dyn_item->finished) {
+ if (dyn_item->status == cache_item_status::not_started) {
if (slow_status == slow_status::enabled) {
return false;
}
@@ -413,7 +418,7 @@ auto symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache
auto dyn_item = &dynamic_items[idx];
- if (!dyn_item->started) {
+ if (dyn_item->status == cache_item_status::not_started) {
all_done = false;
if (!check_item_deps(task, cache, item.get(),
@@ -453,15 +458,16 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache,
}
g_assert(!item->is_virtual());
- if (dyn_item->started) {
+ if (dyn_item->status != cache_item_status::not_started) {
/*
* This can actually happen when deps span over different layers
*/
- return dyn_item->finished;
+ msg_debug_cache_task("skip already started %s(%d) symbol", item->symbol.c_str(), item->id);
+
+ return dyn_item->status == cache_item_status::finished;
}
/* Check has been started */
- dyn_item->started = true;
auto check = true;
if (!item->is_allowed(task, true) || !item->check_conditions(task)) {
@@ -469,6 +475,7 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache,
}
if (check) {
+ dyn_item->status = cache_item_status::started;
msg_debug_cache_task("execute %s, %d; symbol type = %s", item->symbol.data(),
item->id, item_type_to_str(item->type));
@@ -482,24 +489,41 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache,
cur_item = dyn_item;
items_inflight++;
/* Callback now must finalize itself */
- item->call(task, dyn_item);
- cur_item = nullptr;
+ if (item->call(task, dyn_item)) {
+ cur_item = nullptr;
- if (items_inflight == 0) {
- return true;
- }
+ if (items_inflight == 0) {
+ msg_debug_cache_task("item %s, %d is now finished (no async events)", item->symbol.data(),
+ item->id);
+ dyn_item->status = cache_item_status::finished;
+ return true;
+ }
- if (dyn_item->async_events == 0 && !dyn_item->finished) {
- msg_err_cache_task("critical error: item %s has no async events pending, "
- "but it is not finalised",
- item->symbol.data());
- g_assert_not_reached();
- }
+ if (dyn_item->async_events == 0 && dyn_item->status != cache_item_status::finished) {
+ msg_err_cache_task("critical error: item %s has no async events pending, "
+ "but it is not finalised",
+ item->symbol.data());
+ g_assert_not_reached();
+ }
- return false;
+ msg_debug_cache_task("item %s, %d is now pending", item->symbol.data(),
+ item->id);
+ dyn_item->status = cache_item_status::pending;
+
+ return false;
+ }
+ else {
+ /* We were not able to call item, so we assume it is not callable */
+ msg_debug_cache_task("cannot call %s, %d; symbol type = %s", item->symbol.data(),
+ item->id, item_type_to_str(item->type));
+ dyn_item->status = cache_item_status::finished;
+ return true;
+ }
}
else {
- dyn_item->finished = true;
+ msg_debug_cache_task("do not check %s, %d", item->symbol.data(),
+ item->id);
+ dyn_item->status = cache_item_status::finished;
}
return true;
@@ -551,6 +575,9 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
auto log_func = RSPAMD_LOG_FUNC;
auto inner_functor = [&](int recursion, cache_item *item, cache_dynamic_item *dyn_item, auto rec_functor) -> bool {
+ msg_debug_cache_task_lambda("recursively (%d) check dependencies for %s(%d)", recursion,
+ item->symbol.c_str(), item->id);
+
if (recursion > max_recursion) {
msg_err_task_lambda("cyclic dependencies: maximum check level %ud exceed when "
"checking dependencies for %s",
@@ -571,8 +598,8 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
auto *dep_dyn_item = get_dynamic_item(dep.item->id);
- if (!dep_dyn_item->finished) {
- if (!dep_dyn_item->started) {
+ if (dep_dyn_item->status != cache_item_status::finished) {
+ if (dep_dyn_item->status == cache_item_status::not_started) {
/* Not started */
if (!check_only) {
if (!rec_functor(recursion + 1,
@@ -609,14 +636,17 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
else {
/* Started but not finished */
msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is "
- "still executing",
- dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
+ "still executing (%d events pending)",
+ dep.id, dep.sym.c_str(),
+ item->id, item->symbol.c_str(),
+ dep_dyn_item->async_events);
+ g_assert(dep_dyn_item->async_events > 0);
ret = false;
}
}
else {
msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is already "
- "checked",
+ "finished",
dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
}
}
@@ -701,7 +731,7 @@ auto symcache_runtime::finalize_item(struct rspamd_task *task, cache_dynamic_ite
}
msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id);
- dyn_item->finished = true;
+ dyn_item->status = cache_item_status::finished;
items_inflight--;
cur_item = nullptr;
@@ -787,7 +817,7 @@ auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item *
for (const auto &rdep: item->rdeps) {
if (rdep.item) {
auto *dyn_item = get_dynamic_item(rdep.item->id);
- if (!dyn_item->started) {
+ if (dyn_item->status == cache_item_status::not_started) {
msg_debug_cache_task("check item %d(%s) rdep of %s ",
rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());
diff --git a/src/libserver/symcache/symcache_runtime.hxx b/src/libserver/symcache/symcache_runtime.hxx
index 181b32880..7e4a41269 100644
--- a/src/libserver/symcache/symcache_runtime.hxx
+++ b/src/libserver/symcache/symcache_runtime.hxx
@@ -30,6 +30,12 @@
struct rspamd_scan_result;
namespace rspamd::symcache {
+enum class cache_item_status : std::uint16_t {
+ not_started = 0,
+ started = 1,
+ pending = 2,
+ finished = 3,
+};
/**
* These items are saved within task structure and are used to track
* symbols execution.
@@ -38,8 +44,7 @@ namespace rspamd::symcache {
*/
struct cache_dynamic_item {
std::uint16_t start_msec; /* Relative to task time */
- bool started;
- bool finished;
+ cache_item_status status;
std::uint32_t async_events;
};
@@ -49,12 +54,12 @@ static_assert(std::is_trivial_v<cache_dynamic_item>);
class symcache_runtime {
unsigned items_inflight;
+
enum class slow_status : std::uint8_t {
none = 0,
enabled = 1,
disabled = 2,
} slow_status;
-
bool profile;
double profile_start;
@@ -78,12 +83,7 @@ class symcache_runtime {
public:
/* Dropper for a shared ownership */
- auto savepoint_dtor() -> void
- {
-
- /* Drop shared ownership */
- order.reset();
- }
+ auto savepoint_dtor(struct rspamd_task *task) -> void;
/**
* Creates a cache runtime using task mempool
* @param task