aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2024-06-21 16:14:56 +0600
committerGitHub <noreply@github.com>2024-06-21 16:14:56 +0600
commit90b73439d20d6e5b9b9e61cecbaa9809d7d0ddcd (patch)
tree555a09d3b715d3cf1f6e0405e6d360d610b3c7fc /src
parentb44099c96e279a0d60b0688e84e0ef6293194c59 (diff)
parentb2df28925f5ffc785969a973e0bfb5c5670d9d2b (diff)
downloadrspamd-90b73439d20d6e5b9b9e61cecbaa9809d7d0ddcd.tar.gz
rspamd-90b73439d20d6e5b9b9e61cecbaa9809d7d0ddcd.zip
Merge pull request #5020 from rspamd/vstakhov-slow-timer
[Rework] Rething slow timer
Diffstat (limited to 'src')
-rw-r--r--src/libserver/composites/composites.cxx6
-rw-r--r--src/libserver/composites/composites_internal.hxx6
-rw-r--r--src/libserver/html/html_tag_defs.hxx6
-rw-r--r--src/libserver/symcache/symcache_c.cxx17
-rw-r--r--src/libserver/symcache/symcache_impl.cxx111
-rw-r--r--src/libserver/symcache/symcache_internal.hxx4
-rw-r--r--src/libserver/symcache/symcache_item.cxx70
-rw-r--r--src/libserver/symcache/symcache_item.hxx39
-rw-r--r--src/libserver/symcache/symcache_runtime.cxx222
-rw-r--r--src/libserver/symcache/symcache_runtime.hxx30
10 files changed, 319 insertions, 192 deletions
diff --git a/src/libserver/composites/composites.cxx b/src/libserver/composites/composites.cxx
index 64f8f2daf..0e7adfb8a 100644
--- a/src/libserver/composites/composites.cxx
+++ b/src/libserver/composites/composites.cxx
@@ -1,11 +1,11 @@
-/*-
- * Copyright 2021 Vsevolod Stakhov
+/*
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/src/libserver/composites/composites_internal.hxx b/src/libserver/composites/composites_internal.hxx
index 55aaa2ee1..2ae4219eb 100644
--- a/src/libserver/composites/composites_internal.hxx
+++ b/src/libserver/composites/composites_internal.hxx
@@ -1,11 +1,11 @@
-/*-
- * Copyright 2021 Vsevolod Stakhov
+/*
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/src/libserver/html/html_tag_defs.hxx b/src/libserver/html/html_tag_defs.hxx
index 4cff79855..05a5e3cde 100644
--- a/src/libserver/html/html_tag_defs.hxx
+++ b/src/libserver/html/html_tag_defs.hxx
@@ -1,11 +1,11 @@
-/*-
- * Copyright 2021 Vsevolod Stakhov
+/*
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx
index 1ffcd9ceb..047fc1181 100644
--- a/src/libserver/symcache/symcache_c.cxx
+++ b/src/libserver/symcache/symcache_c.cxx
@@ -1,5 +1,5 @@
/*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -602,8 +602,15 @@ unsigned int rspamd_symcache_item_async_inc_full(struct rspamd_task *task,
"subsystem %s (%s)",
static_item->symbol.c_str(), static_item->id,
real_dyn_item->async_events, subsystem, loc);
+ auto nevents = ++real_dyn_item->async_events;
- return ++real_dyn_item->async_events;
+ if (nevents > 1) {
+ /* Item is async */
+ static_item->internal_flags &= ~rspamd::symcache::cache_item::bit_sync;
+ real_dyn_item->status = rspamd::symcache::cache_item_status::pending;
+ }
+
+ return nevents;
}
unsigned int rspamd_symcache_item_async_dec_full(struct rspamd_task *task,
@@ -673,10 +680,10 @@ void rspamd_symcache_composites_foreach(struct rspamd_task *task,
real_cache->composites_foreach([&](const auto *item) {
auto *dyn_item = cache_runtime->get_dynamic_item(item->id);
- if (dyn_item && !dyn_item->started) {
+ if (dyn_item && dyn_item->status == rspamd::symcache::cache_item_status::not_started) {
auto *old_item = cache_runtime->set_cur_item(dyn_item);
func((void *) item->get_name().c_str(), item->get_cbdata(), fd);
- dyn_item->finished = true;
+ dyn_item->status = rspamd::symcache::cache_item_status::finished;
cache_runtime->set_cur_item(old_item);
}
});
@@ -711,5 +718,5 @@ void rspamd_symcache_finalize_item(struct rspamd_task *task,
void rspamd_symcache_runtime_destroy(struct rspamd_task *task)
{
auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime);
- cache_runtime->savepoint_dtor();
+ cache_runtime->savepoint_dtor(task);
} \ No newline at end of file
diff --git a/src/libserver/symcache/symcache_impl.cxx b/src/libserver/symcache/symcache_impl.cxx
index 869e025b3..4ea087024 100644
--- a/src/libserver/symcache/symcache_impl.cxx
+++ b/src/libserver/symcache/symcache_impl.cxx
@@ -112,29 +112,39 @@ auto symcache::init() -> bool
/* Deal with the delayed dependencies */
msg_debug_cache("resolving delayed dependencies: %d in list", (int) delayed_deps->size());
for (const auto &delayed_dep: *delayed_deps) {
- auto virt_item = get_item_by_name(delayed_dep.from, false);
- auto real_item = get_item_by_name(delayed_dep.from, true);
-
- if (virt_item == nullptr || real_item == nullptr) {
- msg_err_cache("cannot register delayed dependency between %s and %s: "
- "%s is missing",
- delayed_dep.from.data(),
- delayed_dep.to.data(), delayed_dep.from.data());
+ auto virt_source = get_item_by_name(delayed_dep.from, false);
+ auto real_source = get_item_by_name(delayed_dep.from, true);
+
+ auto real_destination = get_item_by_name(delayed_dep.to, true);
+
+ if (virt_source == nullptr || real_source == nullptr || real_destination == nullptr) {
+ if (real_destination != nullptr) {
+ msg_err_cache("cannot register delayed dependency %s -> %s: "
+ "source %s is missing",
+ delayed_dep.from.data(),
+ delayed_dep.to.data(), delayed_dep.from.data());
+ }
+ else {
+ msg_err_cache("cannot register delayed dependency %s -> %s: "
+ "destionation %s is missing",
+ delayed_dep.from.data(),
+ delayed_dep.to.data(), delayed_dep.to.data());
+ }
}
else {
- if (!disabled_ids.contains(real_item->id)) {
+ if (!disabled_ids.contains(real_source->id)) {
msg_debug_cache("delayed between %s(%d:%d) -> %s",
delayed_dep.from.data(),
- real_item->id, virt_item->id,
+ real_source->id, virt_source->id,
delayed_dep.to.data());
- add_dependency(real_item->id, delayed_dep.to,
- virt_item != real_item ? virt_item->id : -1);
+ add_dependency(real_source->id, delayed_dep.to, real_destination->id,
+ virt_source != real_source ? virt_source->id : -1);
}
else {
msg_debug_cache("no delayed between %s(%d:%d) -> %s; %s is disabled",
delayed_dep.from.data(),
- real_item->id, virt_item->id,
+ real_source->id, virt_source->id,
delayed_dep.to.data(),
delayed_dep.from.data());
}
@@ -529,16 +539,27 @@ auto symcache::get_item_by_name_mut(std::string_view name, bool resolve_parent)
return it->second;
}
-auto symcache::add_dependency(int id_from, std::string_view to, int virtual_id_from) -> void
+auto symcache::add_dependency(int id_from, std::string_view to, int id_to, int virtual_id_from) -> void
{
g_assert(id_from >= 0 && id_from < (int) items_by_id.size());
+ g_assert(id_to >= 0 && id_to < (int) items_by_id.size());
const auto &source = items_by_id[id_from];
+ const auto &dest = items_by_id[id_to];
g_assert(source.get() != nullptr);
-
- source->deps.emplace_back(nullptr,
- std::string(to),
- id_from,
- -1);
+ g_assert(dest.get() != nullptr);
+
+ if (!source->deps.contains(id_to)) {
+ msg_debug_cache("add dependency %s(%d) -> %s(%d)",
+ source->symbol.c_str(), source->id, to.data(), dest->id);
+ source->deps.emplace(id_to, cache_dependency{dest.get(),
+ std::string(to),
+ -1});
+ }
+ else {
+ msg_debug_cache("duplicate dependency %s -> %s",
+ source->symbol.c_str(), to.data());
+ return;
+ }
if (virtual_id_from >= 0) {
@@ -546,10 +567,18 @@ auto symcache::add_dependency(int id_from, std::string_view to, int virtual_id_f
/* We need that for settings id propagation */
const auto &vsource = items_by_id[virtual_id_from];
g_assert(vsource.get() != nullptr);
- vsource->deps.emplace_back(nullptr,
- std::string(to),
- -1,
- virtual_id_from);
+
+ if (!vsource->deps.contains(id_to)) {
+ msg_debug_cache("add virtual dependency %s -> %s",
+ vsource->symbol.c_str(), to.data());
+ vsource->deps.emplace(id_to, cache_dependency{dest.get(),
+ std::string(to),
+ virtual_id_from});
+ }
+ else {
+ msg_debug_cache("duplicate virtual dependency %s -> %s",
+ vsource->symbol.c_str(), to.data());
+ }
}
}
@@ -625,7 +654,7 @@ auto symcache::resort() -> void
tsort_mark(it, tsort_mask::TEMP);
msg_debug_cache_lambda("visiting node: %s (%d)", it->symbol.c_str(), cur_order);
- for (const auto &dep: it->deps) {
+ for (const auto &[id, dep]: it->deps) {
msg_debug_cache_lambda("visiting dep: %s (%d)", dep.item->symbol.c_str(), cur_order + 1);
rec(dep.item, cur_order + 1, rec);
}
@@ -638,6 +667,7 @@ auto symcache::resort() -> void
*/
total_hits = 0;
auto used_items = ord->d.size();
+ msg_debug_cache("topologically sort %d filters", used_items);
for (const auto &it: ord->d) {
if (it->order == 0) {
@@ -696,7 +726,8 @@ auto symcache::resort() -> void
* We enrich ord with all other symbol types without any sorting,
* as it is done in another place
*/
- constexpr auto append_items_vec = [](const auto &vec, auto &out) {
+ const auto append_items_vec = [&](const auto &vec, auto &out, const char *what) {
+ msg_debug_cache_lambda("append %d items; type = %s", (int) vec.size(), what);
for (const auto &it: vec) {
if (it) {
out.emplace_back(it->getptr());
@@ -704,12 +735,12 @@ auto symcache::resort() -> void
}
};
- append_items_vec(connfilters, ord->d);
- append_items_vec(prefilters, ord->d);
- append_items_vec(postfilters, ord->d);
- append_items_vec(idempotent, ord->d);
- append_items_vec(composites, ord->d);
- append_items_vec(classifiers, ord->d);
+ append_items_vec(connfilters, ord->d, "connection filters");
+ append_items_vec(prefilters, ord->d, "prefilters");
+ append_items_vec(postfilters, ord->d, "postfilters");
+ append_items_vec(idempotent, ord->d, "idempotent filters");
+ append_items_vec(composites, ord->d, "composites");
+ append_items_vec(classifiers, ord->d, "classifiers");
/* After sorting is done, we can assign all elements in the by_symbol hash */
for (const auto [i, it]: rspamd::enumerate(ord->d)) {
@@ -856,8 +887,7 @@ auto symcache::validate(bool strict) -> bool
for (auto &pair: items_by_symbol) {
auto &item = pair.second;
- auto ghost = item->st->weight == 0;
- auto skipped = !ghost;
+ auto skipped = item->st->weight != 0;
if (item->is_scoreable() && g_hash_table_lookup(cfg->symbols, item->symbol.c_str()) == nullptr) {
if (!std::isnan(cfg->unknown_weight)) {
@@ -871,18 +901,19 @@ auto symcache::validate(bool strict) -> bool
msg_info_cache("adding unknown symbol %s with weight: %.2f",
item->symbol.c_str(), cfg->unknown_weight);
- ghost = false;
skipped = false;
}
else {
+ /* No `unknown weight`, no static score, and no dynamic score */
skipped = true;
}
}
else {
+ /* We have a score, so we are not skipped */
skipped = false;
}
- if (!ghost && skipped) {
+ if (skipped) {
if (!(item->flags & SYMBOL_TYPE_SKIPPED)) {
item->flags |= SYMBOL_TYPE_SKIPPED;
msg_warn_cache("symbol %s has no score registered, skip its check",
@@ -890,12 +921,6 @@ auto symcache::validate(bool strict) -> bool
}
}
- if (ghost) {
- msg_debug_cache("symbol %s is registered as ghost symbol, it won't be inserted "
- "to any metric",
- item->symbol.c_str());
- }
-
if (item->st->weight < 0 && item->priority == 0) {
item->priority++;
}
@@ -953,7 +978,7 @@ auto symcache::validate(bool strict) -> bool
auto item = get_item_by_name_mut((const char *) k, false);
if (item) {
- item->enabled = FALSE;
+ item->internal_flags &= ~cache_item::bit_enabled;
}
}
}
@@ -1215,7 +1240,7 @@ auto symcache::get_max_timeout(std::vector<std::pair<double, const cache_item *>
auto own_timeout = get_item_timeout(it);
auto max_child_timeout = 0.0;
- for (const auto &dep: it->deps) {
+ for (const auto &[id, dep]: it->deps) {
auto cld_timeout = self(dep.item, self);
if (cld_timeout > max_child_timeout) {
diff --git a/src/libserver/symcache/symcache_internal.hxx b/src/libserver/symcache/symcache_internal.hxx
index 255a4b1c1..c7dda51d1 100644
--- a/src/libserver/symcache/symcache_internal.hxx
+++ b/src/libserver/symcache/symcache_internal.hxx
@@ -1,5 +1,5 @@
/*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -365,7 +365,7 @@ public:
* @param virtual_id_from
* @return
*/
- auto add_dependency(int id_from, std::string_view to, int virtual_id_from) -> void;
+ auto add_dependency(int id_from, std::string_view to, int id_to, int virtual_id_from) -> void;
/**
* Add a delayed dependency between symbols that will be resolved on the init stage
diff --git a/src/libserver/symcache/symcache_item.cxx b/src/libserver/symcache/symcache_item.cxx
index ac901f5cf..490a87880 100644
--- a/src/libserver/symcache/symcache_item.cxx
+++ b/src/libserver/symcache/symcache_item.cxx
@@ -1,5 +1,5 @@
/*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -78,11 +78,11 @@ auto cache_item::process_deps(const symcache &cache) -> void
/* Allow logging macros to work */
auto log_tag = [&]() { return cache.log_tag(); };
- for (auto &dep: deps) {
+ for (auto &[_id, dep]: deps) {
msg_debug_cache("process real dependency %s on %s", symbol.c_str(), dep.sym.c_str());
auto *dit = cache.get_item_by_name_mut(dep.sym, true);
- if (dep.vid >= 0) {
+ if (dep.virtual_source_id >= 0) {
/* Case of the virtual symbol that depends on another (maybe virtual) symbol */
const auto *vdit = cache.get_item_by_name(dep.sym, false);
@@ -94,7 +94,7 @@ auto cache_item::process_deps(const symcache &cache) -> void
}
else {
msg_debug_cache("process virtual dependency %s(%d) on %s(%d)", symbol.c_str(),
- dep.vid, vdit->symbol.c_str(), vdit->id);
+ dep.virtual_source_id, vdit->symbol.c_str(), vdit->id);
unsigned nids = 0;
@@ -148,6 +148,8 @@ auto cache_item::process_deps(const symcache &cache) -> void
continue;
}
+
+ dep.item = dit;
}
else {
if (dit->id == id) {
@@ -161,36 +163,54 @@ auto cache_item::process_deps(const symcache &cache) -> void
auto *parent = get_parent_mut(cache);
if (parent) {
- dit->rdeps.emplace_back(parent, parent->symbol, parent->id, -1);
+ if (!dit->rdeps.contains(parent->id)) {
+ dit->rdeps.emplace(parent->id, cache_dependency{parent, parent->symbol, -1});
+ msg_debug_cache("added reverse dependency from %d on %d", parent->id,
+ dit->id);
+ }
+ else {
+ msg_debug_cache("reverse dependency from %d on %d already exists",
+ parent->id, dit->id);
+ }
dep.item = dit;
- dep.id = dit->id;
-
- msg_debug_cache("added reverse dependency from %d on %d", parent->id,
- dit->id);
+ }
+ else {
+ msg_err_cache("cannot find parent for virtual symbol %s, when resolving dependency %s",
+ symbol.c_str(), dep.sym.c_str());
}
}
else {
dep.item = dit;
- dep.id = dit->id;
- dit->rdeps.emplace_back(this, symbol, id, -1);
- msg_debug_cache("added reverse dependency from %d on %d", id,
- dit->id);
+ if (!dit->rdeps.contains(id)) {
+ dit->rdeps.emplace(id, cache_dependency{this, symbol, -1});
+ msg_debug_cache("added reverse dependency from %d on %d", id,
+ dit->id);
+ }
+ else {
+ msg_debug_cache("reverse dependency from %d on %d already exists",
+ id, dit->id);
+ }
}
}
}
}
- else if (dep.id >= 0) {
- msg_err_cache("cannot find dependency on symbol %s for symbol %s",
+ else {
+ msg_err_cache("cannot find dependency named %s for symbol %s",
dep.sym.c_str(), symbol.c_str());
-
- continue;
}
}
// Remove empty deps
- deps.erase(std::remove_if(std::begin(deps), std::end(deps),
- [](const auto &dep) { return !dep.item; }),
- std::end(deps));
+ for (auto it = deps.begin(); it != deps.end();) {
+ if (it->second.item == nullptr) {
+ msg_info_cache("remove empty dependency on %s for symbol %s",
+ it->second.sym.c_str(), symbol.c_str());
+ it = deps.erase(it);
+ }
+ else {
+ ++it;
+ }
+ }
}
auto cache_item::resolve_parent(const symcache &cache) -> bool
@@ -200,12 +220,6 @@ auto cache_item::resolve_parent(const symcache &cache) -> bool
if (is_virtual()) {
auto &virt = std::get<virtual_item>(specific);
- if (virt.get_parent(cache)) {
- msg_debug_cache("trying to resolve parent twice for %s", symbol.c_str());
-
- return false;
- }
-
return virt.resolve_parent(cache);
}
else {
@@ -320,11 +334,11 @@ auto cache_item::is_allowed(struct rspamd_task *task, bool exec_only) const -> b
}
/* Static checks */
- if (!enabled ||
+ if (!(internal_flags & cache_item::bit_enabled) ||
(RSPAMD_TASK_IS_EMPTY(task) && !(flags & SYMBOL_TYPE_EMPTY)) ||
(flags & SYMBOL_TYPE_MIME_ONLY && !RSPAMD_TASK_IS_MIME(task))) {
- if (!enabled) {
+ if (!(internal_flags & cache_item::bit_enabled)) {
msg_debug_cache_task("skipping %s of %s as it is permanently disabled",
what, symbol.c_str());
diff --git a/src/libserver/symcache/symcache_item.hxx b/src/libserver/symcache/symcache_item.hxx
index a60213a61..8ed973a82 100644
--- a/src/libserver/symcache/symcache_item.hxx
+++ b/src/libserver/symcache/symcache_item.hxx
@@ -1,5 +1,5 @@
/*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -177,14 +177,13 @@ public:
};
struct cache_dependency {
- cache_item *item; /* Real dependency */
- std::string sym; /* Symbolic dep name */
- int id; /* Real from */
- int vid; /* Virtual from */
+ cache_item *item; /* Real dependency */
+ std::string sym; /* Symbolic dep name */
+ int virtual_source_id; /* Virtual source */
public:
/* Default piecewise constructor */
- explicit cache_dependency(cache_item *_item, std::string _sym, int _id, int _vid)
- : item(_item), sym(std::move(_sym)), id(_id), vid(_vid)
+ explicit cache_dependency(cache_item *_item, std::string _sym, int _vid)
+ : item(_item), sym(std::move(_sym)), virtual_source_id(_vid)
{
}
};
@@ -215,15 +214,18 @@ struct cache_item : std::enable_shared_from_this<cache_item> {
struct rspamd_symcache_item_stat *st = nullptr;
struct rspamd_counter_data *cd = nullptr;
+ std::string symbol;
+
/* Unique id - counter */
int id;
std::uint64_t last_count = 0;
- std::string symbol;
symcache_item_type type;
int flags;
- /* Condition of execution */
- bool enabled = true;
+ static constexpr const auto bit_enabled = 0b0001;
+ static constexpr const auto bit_sync = 0b0010;
+ static constexpr const auto bit_slow = 0b0100;
+ int internal_flags = bit_enabled | bit_sync;
/* Priority */
int priority = 0;
@@ -246,9 +248,9 @@ struct cache_item : std::enable_shared_from_this<cache_item> {
augmentations;
/* Dependencies */
- std::vector<cache_dependency> deps;
+ ankerl::unordered_dense::map<int, cache_dependency> deps;
/* Reverse dependencies */
- std::vector<cache_dependency> rdeps;
+ ankerl::unordered_dense::map<int, cache_dependency> rdeps;
public:
/**
@@ -425,13 +427,16 @@ public:
return false;
}
- auto call(struct rspamd_task *task, cache_dynamic_item *dyn_item) const -> void
+ auto call(struct rspamd_task *task, cache_dynamic_item *dyn_item) const -> bool
{
if (std::holds_alternative<normal_item>(specific)) {
const auto &filter_data = std::get<normal_item>(specific);
filter_data.call(task, (struct rspamd_symcache_dynamic_item *) dyn_item);
+ return true;
}
+
+ return false;
}
/**
@@ -512,8 +517,8 @@ private:
void *user_data,
symcache_item_type _type,
int _flags)
- : id(_id),
- symbol(std::move(name)),
+ : symbol(std::move(name)),
+ id(_id),
type(_type),
flags(_flags),
priority(_priority),
@@ -541,8 +546,8 @@ private:
int parent,
symcache_item_type _type,
int _flags)
- : id(_id),
- symbol(std::move(name)),
+ : symbol(std::move(name)),
+ id(_id),
type(_type),
flags(_flags),
specific(virtual_item{parent})
diff --git a/src/libserver/symcache/symcache_runtime.cxx b/src/libserver/symcache/symcache_runtime.cxx
index 15f970d8c..dc7066b32 100644
--- a/src/libserver/symcache/symcache_runtime.cxx
+++ b/src/libserver/symcache/symcache_runtime.cxx
@@ -1,5 +1,5 @@
/*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,19 +38,26 @@ auto symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symc
{
cache.maybe_resort();
- auto &&cur_order = cache.get_cache_order();
+ auto cur_order = cache.get_cache_order();
+ auto allocated_size = sizeof(symcache_runtime) +
+ sizeof(struct cache_dynamic_item) * cur_order->size();
auto *checkpoint = (symcache_runtime *) rspamd_mempool_alloc0(task->task_pool,
- sizeof(symcache_runtime) +
- sizeof(struct cache_dynamic_item) * cur_order->size());
-
- checkpoint->order = cache.get_cache_order();
-
+ allocated_size);
+ msg_debug_cache_task("create symcache runtime for task: %d bytes, %d items", (int) allocated_size, (int) cur_order->size());
+ checkpoint->order = std::move(cur_order);
+ checkpoint->slow_status = slow_status::none;
/* Calculate profile probability */
ev_now_update_if_cheap(task->event_loop);
ev_tstamp now = ev_now(task->event_loop);
checkpoint->profile_start = now;
checkpoint->lim = rspamd_task_get_required_score(task, task->result);
+ /*
+ * We enable profiling if the following conditions are met:
+ * - we have not profiled for a long time
+ * - message is large
+ * - random probability
+ */
if ((cache.get_last_profile() == 0.0 || now > cache.get_last_profile() + PROFILE_MAX_TIME) ||
(task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) ||
(rspamd_random_double_fast() >= (1 - PROFILE_PROBABILITY))) {
@@ -154,14 +161,20 @@ auto symcache_runtime::process_settings(struct rspamd_task *task, const symcache
return false;
}
+auto symcache_runtime::savepoint_dtor(struct rspamd_task *task) -> void
+{
+ msg_debug_cache_task("destroying savepoint");
+ /* Drop shared ownership */
+ order.reset();
+}
+
auto symcache_runtime::disable_all_symbols(int skip_mask) -> void
{
for (auto [i, item]: rspamd::enumerate(order->d)) {
auto *dyn_item = &dynamic_items[i];
if (!(item->get_flags() & skip_mask)) {
- dyn_item->finished = true;
- dyn_item->started = true;
+ dyn_item->status = cache_item_status::finished;
}
}
}
@@ -175,8 +188,7 @@ auto symcache_runtime::disable_symbol(struct rspamd_task *task, const symcache &
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- dyn_item->finished = true;
- dyn_item->started = true;
+ dyn_item->status = cache_item_status::finished;
msg_debug_cache_task("disable execution of %s", name.data());
return true;
@@ -201,8 +213,7 @@ auto symcache_runtime::enable_symbol(struct rspamd_task *task, const symcache &c
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- dyn_item->finished = false;
- dyn_item->started = false;
+ dyn_item->status = cache_item_status::not_started;
msg_debug_cache_task("enable execution of %s", name.data());
return true;
@@ -227,7 +238,7 @@ auto symcache_runtime::is_symbol_checked(const symcache &cache, std::string_view
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- return dyn_item->started;
+ return dyn_item->status != cache_item_status::not_started;
}
}
@@ -247,7 +258,7 @@ auto symcache_runtime::is_symbol_enabled(struct rspamd_task *task, const symcach
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- if (dyn_item->started) {
+ if (dyn_item->status != cache_item_status::not_started) {
/* Already started */
return false;
}
@@ -331,11 +342,8 @@ auto symcache_runtime::process_pre_postfilters(struct rspamd_task *task,
auto dyn_item = get_dynamic_item(item->id);
- if (!dyn_item->started && !dyn_item->finished) {
- if (has_slow) {
- /* Delay */
- has_slow = false;
-
+ if (dyn_item->status == cache_item_status::not_started) {
+ if (slow_status == slow_status::enabled) {
return false;
}
@@ -410,7 +418,7 @@ auto symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache
auto dyn_item = &dynamic_items[idx];
- if (!dyn_item->started) {
+ if (dyn_item->status == cache_item_status::not_started) {
all_done = false;
if (!check_item_deps(task, cache, item.get(),
@@ -424,10 +432,7 @@ auto symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache
process_symbol(task, cache, item.get(), dyn_item);
- if (has_slow) {
- /* Delay */
- has_slow = false;
-
+ if (slow_status == slow_status::enabled) {
return false;
}
}
@@ -453,15 +458,16 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache,
}
g_assert(!item->is_virtual());
- if (dyn_item->started) {
+ if (dyn_item->status != cache_item_status::not_started) {
/*
* This can actually happen when deps span over different layers
*/
- return dyn_item->finished;
+ msg_debug_cache_task("skip already started %s(%d) symbol", item->symbol.c_str(), item->id);
+
+ return dyn_item->status == cache_item_status::finished;
}
/* Check has been started */
- dyn_item->started = true;
auto check = true;
if (!item->is_allowed(task, true) || !item->check_conditions(task)) {
@@ -469,6 +475,7 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache,
}
if (check) {
+ dyn_item->status = cache_item_status::started;
msg_debug_cache_task("execute %s, %d; symbol type = %s", item->symbol.data(),
item->id, item_type_to_str(item->type));
@@ -482,24 +489,43 @@ auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache,
cur_item = dyn_item;
items_inflight++;
/* Callback now must finalize itself */
- item->call(task, dyn_item);
- cur_item = nullptr;
- if (items_inflight == 0) {
- return true;
- }
- if (dyn_item->async_events == 0 && !dyn_item->finished) {
- msg_err_cache_task("critical error: item %s has no async events pending, "
- "but it is not finalised",
- item->symbol.data());
- g_assert_not_reached();
- }
+ if (item->call(task, dyn_item)) {
+ cur_item = nullptr;
- return false;
+ if (items_inflight == 0) {
+ msg_debug_cache_task("item %s, %d is now finished (no async events)", item->symbol.data(),
+ item->id);
+ dyn_item->status = cache_item_status::finished;
+ return true;
+ }
+
+ if (dyn_item->async_events == 0 && dyn_item->status != cache_item_status::finished) {
+ msg_err_cache_task("critical error: item %s has no async events pending, "
+ "but it is not finalised",
+ item->symbol.data());
+ g_assert_not_reached();
+ }
+ else if (dyn_item->async_events > 0) {
+ msg_debug_cache_task("item %s, %d is now pending with %d async events", item->symbol.data(),
+ item->id, dyn_item->async_events);
+ }
+
+ return false;
+ }
+ else {
+ /* We were not able to call item, so we assume it is not callable */
+ msg_debug_cache_task("cannot call %s, %d; symbol type = %s", item->symbol.data(),
+ item->id, item_type_to_str(item->type));
+ dyn_item->status = cache_item_status::finished;
+ return true;
+ }
}
else {
- dyn_item->finished = true;
+ msg_debug_cache_task("do not check %s, %d", item->symbol.data(),
+ item->id);
+ dyn_item->status = cache_item_status::finished;
}
return true;
@@ -551,6 +577,9 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
auto log_func = RSPAMD_LOG_FUNC;
auto inner_functor = [&](int recursion, cache_item *item, cache_dynamic_item *dyn_item, auto rec_functor) -> bool {
+ msg_debug_cache_task_lambda("recursively (%d) check dependencies for %s(%d)", recursion,
+ item->symbol.c_str(), item->id);
+
if (recursion > max_recursion) {
msg_err_task_lambda("cyclic dependencies: maximum check level %ud exceed when "
"checking dependencies for %s",
@@ -561,18 +590,18 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
auto ret = true;
- for (const auto &dep: item->deps) {
+ for (const auto &[dest_id, dep]: item->deps) {
if (!dep.item) {
/* Assume invalid deps as done */
msg_debug_cache_task_lambda("symbol %d(%s) has invalid dependencies on %d(%s)",
- item->id, item->symbol.c_str(), dep.id, dep.sym.c_str());
+ item->id, item->symbol.c_str(), dest_id, dep.sym.c_str());
continue;
}
auto *dep_dyn_item = get_dynamic_item(dep.item->id);
- if (!dep_dyn_item->finished) {
- if (!dep_dyn_item->started) {
+ if (dep_dyn_item->status != cache_item_status::finished) {
+ if (dep_dyn_item->status == cache_item_status::not_started) {
/* Not started */
if (!check_only) {
if (!rec_functor(recursion + 1,
@@ -583,7 +612,7 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
ret = false;
msg_debug_cache_task_lambda("delayed dependency %d(%s) for "
"symbol %d(%s)",
- dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
+ dest_id, dep.sym.c_str(), item->id, item->symbol.c_str());
}
else if (!process_symbol(task, cache, dep.item, dep_dyn_item)) {
/* Now started, but has events pending */
@@ -591,33 +620,36 @@ auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache
msg_debug_cache_task_lambda("started check of %d(%s) symbol "
"as dep for "
"%d(%s)",
- dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
+ dest_id, dep.sym.c_str(), item->id, item->symbol.c_str());
}
else {
msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is "
"already processed",
- dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
+ dest_id, dep.sym.c_str(), item->id, item->symbol.c_str());
}
}
else {
msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) "
"cannot be started now",
- dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
+ dest_id, dep.sym.c_str(), item->id, item->symbol.c_str());
ret = false;
}
}
else {
/* Started but not finished */
msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is "
- "still executing",
- dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
+ "still executing (%d events pending)",
+ dest_id, dep.sym.c_str(),
+ item->id, item->symbol.c_str(),
+ dep_dyn_item->async_events);
+ g_assert(dep_dyn_item->async_events > 0);
ret = false;
}
}
else {
msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is already "
- "checked",
- dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
+ "finished",
+ dest_id, dep.sym.c_str(), item->id, item->symbol.c_str());
}
}
@@ -654,7 +686,7 @@ rspamd_symcache_delayed_item_cb(EV_P_ ev_timer *w, int what)
if (cbd->event) {
cbd->event = nullptr;
- /* Timer will be stopped here */
+ /* Timer will be stopped here; `has_slow` is also reset there */
rspamd_session_remove_event(cbd->task->s,
rspamd_symcache_delayed_item_fin, cbd);
@@ -701,7 +733,7 @@ auto symcache_runtime::finalize_item(struct rspamd_task *task, cache_dynamic_ite
}
msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id);
- dyn_item->finished = true;
+ dyn_item->status = cache_item_status::finished;
items_inflight--;
cur_item = nullptr;
@@ -732,45 +764,81 @@ auto symcache_runtime::finalize_item(struct rspamd_task *task, cache_dynamic_ite
}
else {
/* Just reset as no timer is added */
- has_slow = FALSE;
+ slow_status = slow_status::none;
return false;
}
return true;
};
- if (profile) {
+ /* Check if we need to profile symbol (always profile when we have seen this item to be slow */
+ if (profile || item->flags & cache_item::bit_slow) {
ev_now_update_if_cheap(task->event_loop);
auto diff = ((ev_now(task->event_loop) - profile_start) * 1e3 -
dyn_item->start_msec);
+ if (G_UNLIKELY(RSPAMD_TASK_IS_PROFILING(task))) {
+ rspamd_task_profile_set(task, item->symbol.c_str(), diff);
+ }
+
+ if (rspamd_worker_is_scanner(task->worker)) {
+ rspamd_set_counter(item->cd, diff);
+ }
+
if (diff > slow_diff_limit) {
- if (!has_slow) {
- has_slow = true;
+ item->internal_flags |= cache_item::bit_slow;
+
+ if (item->internal_flags & cache_item::bit_sync) {
+
+ /*
+ * We also need to adjust start timer for all async rules that
+ * are started before this rule, as this rule could delay them
+ * on its own. Hence, we need to make some corrections for all
+ * rules pending
+ */
+ bool need_slow = false;
+ for (const auto &[i, other_item]: rspamd::enumerate(order->d)) {
+ auto *other_dyn_item = &dynamic_items[i];
+
+ if (other_dyn_item->status == cache_item_status::pending && other_dyn_item->start_msec <= dyn_item->start_msec) {
+ other_dyn_item->start_msec += diff;
+
+ msg_debug_cache_task("slow sync rule %s(%d); adjust start time for pending rule %s(%d) by %.2fms to %dms",
+ item->symbol.c_str(), item->id,
+ other_item->symbol.c_str(),
+ other_item->id,
+ diff,
+ (int) other_dyn_item->start_msec);
+ /* We have something pending, so we need to enable slow timer */
+ need_slow = true;
+ }
+ }
- msg_info_task("slow rule: %s(%d): %.2f ms; enable slow timer delay",
- item->symbol.c_str(), item->id,
- diff);
+ if (need_slow && slow_status != slow_status::enabled) {
+ slow_status = slow_status::enabled;
- if (enable_slow_timer()) {
- /* Allow network execution */
- return;
+ msg_info_task("slow synchronous rule: %s(%d): %.2f ms; enable 100ms idle timer to allow other rules to be finished",
+ item->symbol.c_str(), item->id,
+ diff);
+ if (enable_slow_timer()) {
+ return;
+ }
+ }
+ else {
+ msg_info_task("slow synchronous rule: %s(%d): %.2f ms; idle timer has already been activated for this scan",
+ item->symbol.c_str(), item->id,
+ diff);
}
}
else {
- msg_info_task("slow rule: %s(%d): %.2f ms",
- item->symbol.c_str(), item->id,
- diff);
+ msg_notice_task("slow asynchronous rule: %s(%d): %.2f ms; no idle timer is needed",
+ item->symbol.c_str(), item->id,
+ diff);
}
}
-
- if (G_UNLIKELY(RSPAMD_TASK_IS_PROFILING(task))) {
- rspamd_task_profile_set(task, item->symbol.c_str(), diff);
- }
-
- if (rspamd_worker_is_scanner(task->worker)) {
- rspamd_set_counter(item->cd, diff);
+ else {
+ item->internal_flags &= ~cache_item::bit_slow;
}
}
@@ -786,10 +854,10 @@ auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item *
return;
}
- for (const auto &rdep: item->rdeps) {
+ for (const auto &[id, rdep]: item->rdeps.values()) {
if (rdep.item) {
auto *dyn_item = get_dynamic_item(rdep.item->id);
- if (!dyn_item->started) {
+ if (dyn_item->status == cache_item_status::not_started) {
msg_debug_cache_task("check item %d(%s) rdep of %s ",
rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());
diff --git a/src/libserver/symcache/symcache_runtime.hxx b/src/libserver/symcache/symcache_runtime.hxx
index aa8f66c0f..7e4a41269 100644
--- a/src/libserver/symcache/symcache_runtime.hxx
+++ b/src/libserver/symcache/symcache_runtime.hxx
@@ -1,5 +1,5 @@
/*
- * Copyright 2023 Vsevolod Stakhov
+ * Copyright 2024 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,6 +30,12 @@
struct rspamd_scan_result;
namespace rspamd::symcache {
+enum class cache_item_status : std::uint16_t {
+ not_started = 0,
+ started = 1,
+ pending = 2,
+ finished = 3,
+};
/**
* These items are saved within task structure and are used to track
* symbols execution.
@@ -38,18 +44,23 @@ namespace rspamd::symcache {
*/
struct cache_dynamic_item {
std::uint16_t start_msec; /* Relative to task time */
- bool started;
- bool finished;
+ cache_item_status status;
std::uint32_t async_events;
};
static_assert(sizeof(cache_dynamic_item) == sizeof(std::uint64_t));
static_assert(std::is_trivial_v<cache_dynamic_item>);
+
class symcache_runtime {
unsigned items_inflight;
+
+ enum class slow_status : std::uint8_t {
+ none = 0,
+ enabled = 1,
+ disabled = 2,
+ } slow_status;
bool profile;
- bool has_slow;
double profile_start;
double lim;
@@ -72,12 +83,7 @@ class symcache_runtime {
public:
/* Dropper for a shared ownership */
- auto savepoint_dtor() -> void
- {
-
- /* Drop shared ownership */
- order.reset();
- }
+ auto savepoint_dtor(struct rspamd_task *task) -> void;
/**
* Creates a cache runtime using task mempool
* @param task
@@ -199,7 +205,9 @@ public:
/* XXX: a helper to allow hiding internal implementation of the slow timer structure */
auto unset_slow() -> void
{
- has_slow = false;
+ if (slow_status == slow_status::enabled) {
+ slow_status = slow_status::disabled;
+ }
}
};