*/
total_hits = 0;
auto used_items = ord->d.size();
+ msg_debug_cache("topologically sort %d filters", used_items);
for (const auto &it: ord->d) {
if (it->order == 0) {
* We enrich ord with all other symbol types without any sorting,
* as it is done in another place
*/
- constexpr auto append_items_vec = [](const auto &vec, auto &out) {
+ const auto append_items_vec = [&](const auto &vec, auto &out, const char *what) {
+ msg_debug_cache_lambda("append %d items; type = %s", (int) vec.size(), what);
for (const auto &it: vec) {
if (it) {
out.emplace_back(it->getptr());
}
};
- append_items_vec(connfilters, ord->d);
- append_items_vec(prefilters, ord->d);
- append_items_vec(postfilters, ord->d);
- append_items_vec(idempotent, ord->d);
- append_items_vec(composites, ord->d);
- append_items_vec(classifiers, ord->d);
+ append_items_vec(connfilters, ord->d, "connection filters");
+ append_items_vec(prefilters, ord->d, "prefilters");
+ append_items_vec(postfilters, ord->d, "postfilters");
+ append_items_vec(idempotent, ord->d, "idempotent filters");
+ append_items_vec(composites, ord->d, "composites");
+ append_items_vec(classifiers, ord->d, "classifiers");
/* After sorting is done, we can assign all elements in the by_symbol hash */
for (const auto [i, it]: rspamd::enumerate(ord->d)) {
{
cache.maybe_resort();
- auto &&cur_order = cache.get_cache_order();
+ auto cur_order = cache.get_cache_order();
+ auto allocated_size = sizeof(symcache_runtime) +
+ sizeof(struct cache_dynamic_item) * cur_order->size();
auto *checkpoint = (symcache_runtime *) rspamd_mempool_alloc0(task->task_pool,
- sizeof(symcache_runtime) +
- sizeof(struct cache_dynamic_item) * cur_order->size());
-
- checkpoint->order = cache.get_cache_order();
-
+ allocated_size);
+ msg_debug_cache_task("create symcache runtime for task: %d bytes, %d items", (int) allocated_size, (int) cur_order->size());
+ checkpoint->order = std::move(cur_order);
+ checkpoint->slow_status = slow_status::none;
/* Calculate profile probability */
ev_now_update_if_cheap(task->event_loop);
ev_tstamp now = ev_now(task->event_loop);
return false;
}
+auto symcache_runtime::savepoint_dtor(struct rspamd_task *task) -> void
+{
+ msg_debug_cache_task("destroying savepoint");
+ /* Drop shared ownership */
+ order.reset();
+}
+
auto symcache_runtime::disable_all_symbols(int skip_mask) -> void
{
for (auto [i, item]: rspamd::enumerate(order->d)) {
auto *dyn_item = &dynamic_items[i];
if (!(item->get_flags() & skip_mask)) {
- dyn_item->finished = true;
- dyn_item->started = true;
+ dyn_item->status = cache_item_status::finished;
}
}
}
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- dyn_item->finished = true;
- dyn_item->started = true;
+ dyn_item->status = cache_item_status::finished;
msg_debug_cache_task("disable execution of %s", name.data());
return true;
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- dyn_item->finished = false;
- dyn_item->started = false;
+ dyn_item->status = cache_item_status::not_started;
msg_debug_cache_task("enable execution of %s", name.data());
return true;
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- return dyn_item->started;
+ return dyn_item->status != cache_item_status::not_started;
}
}
auto *dyn_item = get_dynamic_item(item->id);
if (dyn_item) {
- if (dyn_item->started) {
+ if (dyn_item->status != cache_item_status::not_started) {
/* Already started */
return false;
}
auto dyn_item = get_dynamic_item(item->id);
- if (!dyn_item->started && !dyn_item->finished) {
+ if (dyn_item->status == cache_item_status::not_started) {
if (slow_status == slow_status::enabled) {
return false;
}
auto dyn_item = &dynamic_items[idx];
- if (!dyn_item->started) {
+ if (dyn_item->status == cache_item_status::not_started) {
all_done = false;
if (!check_item_deps(task, cache, item.get(),
}
g_assert(!item->is_virtual());
- if (dyn_item->started) {
+ if (dyn_item->status != cache_item_status::not_started) {
/*
* This can actually happen when deps span over different layers
*/
- return dyn_item->finished;
+ msg_debug_cache_task("skip already started %s(%d) symbol", item->symbol.c_str(), item->id);
+
+ return dyn_item->status == cache_item_status::finished;
}
/* Check has been started */
- dyn_item->started = true;
auto check = true;
if (!item->is_allowed(task, true) || !item->check_conditions(task)) {
}
if (check) {
+ dyn_item->status = cache_item_status::started;
msg_debug_cache_task("execute %s, %d; symbol type = %s", item->symbol.data(),
item->id, item_type_to_str(item->type));
cur_item = dyn_item;
items_inflight++;
/* Callback now must finalize itself */
- item->call(task, dyn_item);
- cur_item = nullptr;
+ if (item->call(task, dyn_item)) {
+ cur_item = nullptr;
- if (items_inflight == 0) {
- return true;
- }
+ if (items_inflight == 0) {
+ msg_debug_cache_task("item %s, %d is now finished (no async events)", item->symbol.data(),
+ item->id);
+ dyn_item->status = cache_item_status::finished;
+ return true;
+ }
- if (dyn_item->async_events == 0 && !dyn_item->finished) {
- msg_err_cache_task("critical error: item %s has no async events pending, "
- "but it is not finalised",
- item->symbol.data());
- g_assert_not_reached();
- }
+ if (dyn_item->async_events == 0 && dyn_item->status != cache_item_status::finished) {
+ msg_err_cache_task("critical error: item %s has no async events pending, "
+ "but it is not finalised",
+ item->symbol.data());
+ g_assert_not_reached();
+ }
- return false;
+ msg_debug_cache_task("item %s, %d is now pending", item->symbol.data(),
+ item->id);
+ dyn_item->status = cache_item_status::pending;
+
+ return false;
+ }
+ else {
+ /* We were not able to call item, so we assume it is not callable */
+ msg_debug_cache_task("cannot call %s, %d; symbol type = %s", item->symbol.data(),
+ item->id, item_type_to_str(item->type));
+ dyn_item->status = cache_item_status::finished;
+ return true;
+ }
}
else {
- dyn_item->finished = true;
+ msg_debug_cache_task("do not check %s, %d", item->symbol.data(),
+ item->id);
+ dyn_item->status = cache_item_status::finished;
}
return true;
auto log_func = RSPAMD_LOG_FUNC;
auto inner_functor = [&](int recursion, cache_item *item, cache_dynamic_item *dyn_item, auto rec_functor) -> bool {
+ msg_debug_cache_task_lambda("recursively (%d) check dependencies for %s(%d)", recursion,
+ item->symbol.c_str(), item->id);
+
if (recursion > max_recursion) {
msg_err_task_lambda("cyclic dependencies: maximum check level %ud exceed when "
"checking dependencies for %s",
auto *dep_dyn_item = get_dynamic_item(dep.item->id);
- if (!dep_dyn_item->finished) {
- if (!dep_dyn_item->started) {
+ if (dep_dyn_item->status != cache_item_status::finished) {
+ if (dep_dyn_item->status == cache_item_status::not_started) {
/* Not started */
if (!check_only) {
if (!rec_functor(recursion + 1,
else {
/* Started but not finished */
msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is "
- "still executing",
- dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
+ "still executing (%d events pending)",
+ dep.id, dep.sym.c_str(),
+ item->id, item->symbol.c_str(),
+ dep_dyn_item->async_events);
+ g_assert(dep_dyn_item->async_events > 0);
ret = false;
}
}
else {
msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is already "
- "checked",
+ "finished",
dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
}
}
}
msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id);
- dyn_item->finished = true;
+ dyn_item->status = cache_item_status::finished;
items_inflight--;
cur_item = nullptr;
for (const auto &rdep: item->rdeps) {
if (rdep.item) {
auto *dyn_item = get_dynamic_item(rdep.item->id);
- if (!dyn_item->started) {
+ if (dyn_item->status == cache_item_status::not_started) {
msg_debug_cache_task("check item %d(%s) rdep of %s ",
rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());