"symcache", log_tag(), \
RSPAMD_LOG_FUNC, \
__VA_ARGS__)
+/*
+ * Log a message at G_LOG_LEVEL_CRITICAL under the "symcache" module, tagged
+ * with the uid of the task's memory pool (task->task_pool->tag.uid).
+ * The macro refers to `task` by name, so a variable called `task` must be in
+ * scope wherever it is expanded.
+ */
+#define msg_err_cache_task(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+ "symcache", task->task_pool->tag.uid, \
+ RSPAMD_LOG_FUNC, \
+ __VA_ARGS__)
#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
"symcache", log_tag(), \
RSPAMD_LOG_FUNC, \
}
}
+	/**
+	 * Call `f` on each element of `connfilters`, in container order, and stop
+	 * at the first element for which `f` yields a false value
+	 * @tparam Functor callable taking the result of `.get()` on an element
+	 * @param f functor to call
+	 * @return true if `f` yielded a true value for every element (also when
+	 * the container is empty), false as soon as one call yields false
+	 */
+	template<typename Functor>
+	auto connfilters_foreach(Functor f) -> bool {
+		for (const auto &elt: connfilters) {
+			if (!f(elt.get())) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+	/**
+	 * Call `f` on each element of `prefilters`, in container order, and stop
+	 * at the first element for which `f` yields a false value
+	 * @tparam Functor callable taking the result of `.get()` on an element
+	 * @param f functor to call
+	 * @return true if no call yielded false (also for an empty container)
+	 */
+	template<typename Functor>
+	auto prefilters_foreach(Functor f) -> bool {
+		for (const auto &elt: prefilters) {
+			if (!f(elt.get())) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+	/**
+	 * Call `f` on each element of `postfilters`, in container order, and stop
+	 * at the first element for which `f` yields a false value
+	 * @tparam Functor callable taking the result of `.get()` on an element
+	 * @param f functor to call
+	 * @return true if no call yielded false (also for an empty container)
+	 */
+	template<typename Functor>
+	auto postfilters_foreach(Functor f) -> bool {
+		for (const auto &elt: postfilters) {
+			if (!f(elt.get())) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+	/**
+	 * Call `f` on each element of `idempotent`, in container order, and stop
+	 * at the first element for which `f` yields a false value
+	 * @tparam Functor callable taking the result of `.get()` on an element
+	 * @param f functor to call
+	 * @return true if no call yielded false (also for an empty container)
+	 */
+	template<typename Functor>
+	auto idempotent_foreach(Functor f) -> bool {
+		for (const auto &elt: idempotent) {
+			if (!f(elt.get())) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+	/**
+	 * Call `f` on each element of `filters`, in container order, and stop
+	 * at the first element for which `f` yields a false value
+	 * @tparam Functor callable taking the result of `.get()` on an element
+	 * @param f functor to call
+	 * @return true if no call yielded false (also for an empty container)
+	 */
+	template<typename Functor>
+	auto filters_foreach(Functor f) -> bool {
+		for (const auto &elt: filters) {
+			if (!f(elt.get())) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+
/**
* Resort cache if anything has been changed since last time
* @return
#include "symcache_runtime.hxx"
#include "libutil/cxx/util.hxx"
#include "libserver/task.h"
+#include "libmime/scan_result.h"
+#include <limits>
namespace rspamd::symcache {
constexpr static const auto PROFILE_PROBABILITY = 0.01;
auto
-symcache_runtime::create_savepoint(struct rspamd_task *task, symcache &cache) -> symcache_runtime *
+symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symcache_runtime *
{
cache.maybe_resort();
rspamd_mempool_add_destructor(task->task_pool,
symcache_runtime::savepoint_dtor, checkpoint);
- for (auto &pair : checkpoint->last_id_mappings) {
+ for (auto &pair: checkpoint->last_id_mappings) {
pair.first = -1;
pair.second = -1;
}
{
/* Lookup in cache */
if (save_in_cache) {
- for (const auto &cache_id : last_id_mappings) {
+ for (const auto &cache_id: last_id_mappings) {
if (cache_id.first == -1) {
break;
}
return nullptr;
}
+/**
+ * Run one processing pass for the given task stage.
+ *
+ * Skipped tasks are reported as done without touching any symbol. The
+ * connfilters, prefilters, postfilters and idempotent stages are delegated to
+ * process_pre_postfilters(); the filters stage is delegated to
+ * process_filters(). Both handlers receive the number of session events that
+ * are pending at the moment of the call.
+ *
+ * @param task task being processed
+ * @param cache symbols cache to take the items from
+ * @param stage one of the RSPAMD_TASK_STAGE_* values handled below; any other
+ * value is a programming error
+ * @return true for a skipped task, otherwise the result of the stage handler
+ */
+auto symcache_runtime::process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool
+{
+	msg_debug_cache_task("symbols processing stage at pass: %d", stage);
+
+	if (RSPAMD_TASK_IS_SKIPPED(task)) {
+		return true;
+	}
+
+	switch (stage) {
+	case RSPAMD_TASK_STAGE_CONNFILTERS:
+	case RSPAMD_TASK_STAGE_PRE_FILTERS:
+	case RSPAMD_TASK_STAGE_POST_FILTERS:
+	case RSPAMD_TASK_STAGE_IDEMPOTENT:
+		return process_pre_postfilters(task, cache,
+				rspamd_session_events_pending(task->s), stage);
+
+	case RSPAMD_TASK_STAGE_FILTERS:
+		return process_filters(task, cache, rspamd_session_events_pending(task->s));
+
+	default:
+		g_assert_not_reached ();
+	}
+
+	/*
+	 * Only reachable when assertions are compiled out: never flow off the end
+	 * of a value-returning function. Report an unknown stage as done so the
+	 * caller is not asked to repeat a pass that can never make progress.
+	 */
+	return true;
+}
+
+/**
+ * Run one pass over the items of a pre/post-filter like stage
+ * (connfilters, prefilters, postfilters or idempotent).
+ *
+ * Items that are neither started nor finished are handed to process_symbol().
+ * The walk is interrupted when `has_slow` is set (the flag is cleared), or
+ * when an item's priority compares unfavourably with the first priority seen
+ * in this pass while more session events are pending than at the start.
+ *
+ * @param task task being processed
+ * @param cache symbols cache providing the per-stage item lists
+ * @param start_events number of pending session events when the pass began
+ * @param stage RSPAMD_TASK_STAGE_* value selecting the item list
+ * @return true only if the whole list was walked and no item had to be
+ * started during this pass; false if the walk was interrupted or at least one
+ * item was started
+ */
+auto
+symcache_runtime::process_pre_postfilters(struct rspamd_task *task,
+		symcache &cache,
+		int start_events,
+		int stage) -> bool
+{
+	auto saved_priority = std::numeric_limits<int>::min();
+	/* Becomes true once any item is handed to process_symbol() in this pass */
+	auto any_started = false;
+	/* Unary plus decays the lambda to a function pointer so it can be reassigned below */
+	auto compare_functor = +[](int a, int b) { return a < b; };
+
+	auto proc_func = [&](cache_item *item) {
+		auto dyn_item = get_dynamic_item(item->id, true);
+
+		if (!dyn_item->started && !dyn_item->finished) {
+			if (has_slow) {
+				/* Delay */
+				has_slow = false;
+
+				return false;
+			}
+
+			if (saved_priority == std::numeric_limits<int>::min()) {
+				/* The first unprocessed item defines the reference priority */
+				saved_priority = item->priority;
+			}
+			else {
+				if (compare_functor(item->priority, saved_priority) &&
+					rspamd_session_events_pending(task->s) > start_events) {
+					/*
+					 * Delay further checks as we have higher
+					 * priority filters to be processed
+					 */
+					return false;
+				}
+			}
+
+			process_symbol(task, cache, item, dyn_item);
+			any_started = true;
+		}
+
+		/* Continue processing */
+		return true;
+	};
+
+	/*
+	 * Keep the result of the walk apart from `any_started`: assigning the
+	 * walk result to the flag the functor writes would discard those writes.
+	 */
+	auto walked_all = true;
+
+	switch (stage) {
+	case RSPAMD_TASK_STAGE_CONNFILTERS:
+		walked_all = cache.connfilters_foreach(proc_func);
+		break;
+	case RSPAMD_TASK_STAGE_PRE_FILTERS:
+		walked_all = cache.prefilters_foreach(proc_func);
+		break;
+	case RSPAMD_TASK_STAGE_POST_FILTERS:
+		compare_functor = +[](int a, int b) { return a > b; };
+		walked_all = cache.postfilters_foreach(proc_func);
+		break;
+	case RSPAMD_TASK_STAGE_IDEMPOTENT:
+		compare_functor = +[](int a, int b) { return a > b; };
+		walked_all = cache.idempotent_foreach(proc_func);
+		break;
+	default:
+		g_error("invalid invocation");
+		break;
+	}
+
+	return walked_all && !any_started;
+}
+
+/**
+ * Run one pass over the items of the filters stage.
+ *
+ * Classifier items are ignored. An item that is neither started nor finished
+ * is started through process_symbol() once its dependencies allow it; the
+ * walk stops early when `has_slow` is set (the flag is cleared), or when the
+ * metric limit is reached while looking at an item without SYMBOL_TYPE_FINE.
+ *
+ * @param task task being processed
+ * @param cache symbols cache providing the filters list
+ * @param start_events number of pending session events when the pass began
+ * (not used by this function)
+ * @return false if at least one unstarted item was seen during the walk,
+ * true otherwise; forced to true when the metric limit stops the walk
+ */
+auto
+symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool
+{
+	auto all_done = true;
+
+	/*
+	 * The result of the walk itself is deliberately unused: completion is
+	 * tracked through `all_done`.
+	 * NOTE(review): `checkpoint` is not declared in the visible part of this
+	 * file - confirm what it is expected to refer to inside a member function.
+	 */
+	cache.filters_foreach([&](cache_item *item) -> bool {
+		if (item->type == symcache_item_type::CLASSIFIER) {
+			return true;
+		}
+
+		auto dyn_item = get_dynamic_item(item->id, true);
+
+		if (!dyn_item->started && !dyn_item->finished) {
+			all_done = false;
+
+			if (!rspamd_symcache_check_deps(task, cache, item,
+					checkpoint, 0, FALSE)) {
+				msg_debug_cache_task ("blocked execution of %d(%s) unless deps are "
+						"resolved", item->id, item->symbol.c_str());
+
+				return true;
+			}
+
+			process_symbol(task, cache, item, dyn_item);
+
+			if (has_slow) {
+				/* Delay */
+				has_slow = false;
+
+				return false;
+			}
+		}
+
+		if (!(item->flags & SYMBOL_TYPE_FINE)) {
+			if (rspamd_symcache_metric_limit(task, checkpoint)) {
+				msg_info_task ("task has already scored more than %.2f, so do "
+						"not "
+						"plan more checks",
+						rs->score);
+				all_done = true;
+				return false;
+			}
+		}
+
+		/*
+		 * Continue with the next item. Without this return a functor declared
+		 * `-> bool` flows off its end, which is undefined behaviour.
+		 */
+		return true;
+	});
+
+	return all_done;
+}
+
+/**
+ * Start a single cache item for the task, if it can and should be run.
+ *
+ * @param task task being processed
+ * @param cache symbols cache owning the item
+ * @param item static item to run; must not be virtual
+ * @param dyn_item per-task state of `item`; its `started`, `finished`,
+ * `async_events` and (when profiling) `start_msec` fields are updated here
+ * @return true when there is nothing to wait for: the item is a classifier or
+ * a composite, the session is blocked, the item is not allowed or its
+ * conditions failed (it is then marked finished), or no items are in flight
+ * after the call. For an item that was already started the current `finished`
+ * flag is returned. false when the item has been called and is still pending.
+ */
+auto
+symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item,
+		cache_dynamic_item *dyn_item) -> bool
+{
+	if (item->type == symcache_item_type::CLASSIFIER || item->type == symcache_item_type::COMPOSITE) {
+		/* Classifiers are special :( */
+		return true;
+	}
+
+	if (rspamd_session_blocked(task->s)) {
+		/*
+		 * We cannot add new events as session is either destroyed or
+		 * being cleaned up.
+		 */
+		return true;
+	}
+
+	g_assert (!item->is_virtual());
+	if (dyn_item->started) {
+		/*
+		 * This can actually happen when deps span over different layers
+		 */
+		return dyn_item->finished;
+	}
+
+	/* Check has been started */
+	dyn_item->started = true;
+
+	if (!item->is_allowed(task, true) || !item->check_conditions(task)) {
+		/* Not going to be executed for this task: nothing to wait for */
+		dyn_item->finished = true;
+
+		return true;
+	}
+
+	/* The format string must consume exactly the arguments that are passed */
+	msg_debug_cache_task("execute %s, %d", item->symbol.data(),
+			item->id);
+
+	if (profile) {
+		ev_now_update_if_cheap(task->event_loop);
+		dyn_item->start_msec = (ev_now(task->event_loop) -
+				profile_start) * 1e3;
+	}
+
+	dyn_item->async_events = 0;
+	cur_item = item;
+	items_inflight++;
+	/* Callback now must finalize itself */
+	item->call(task);
+	cur_item = nullptr;
+
+	if (items_inflight == 0) {
+		return true;
+	}
+
+	if (dyn_item->async_events == 0 && !dyn_item->finished) {
+		msg_err_cache_task("critical error: item %s has no async events pending, "
+				"but it is not finalised", item->symbol.data());
+		g_assert_not_reached ();
+	}
+
+	return false;
+}
+
}
#include "symcache_internal.hxx"
+struct rspamd_scan_result;
+
namespace rspamd::symcache {
/**
* These items are saved within task structure and are used to track
double profile_start;
double lim;
- struct rspamd_scan_result *rs;
+ struct ::rspamd_scan_result *rs;
struct cache_item *cur_item;
order_generation_ptr order;
/* Drop shared ownership */
real_savepoint->order.reset();
}
+
+ /**
+ * Starts a single item for the task unless it must be skipped
+ * @param task task being processed
+ * @param cache symbols cache owning the item
+ * @param item static item to run
+ * @param dyn_item per-task state of the item
+ * @return false if the item has been called and is still pending, true if
+ * there is nothing to wait for
+ */
+ auto process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item,
+ cache_dynamic_item *dyn_item) -> bool;
+ /*
+ * Specific stages of the processing: both helpers are called from
+ * process_symbols() with `start_events` set to the number of session events
+ * pending at that moment, and return whether the stage is considered done
+ */
+ auto process_pre_postfilters(struct rspamd_task *task, symcache &cache, int start_events, int stage) -> bool;
+ auto process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool;
+
public:
/**
* Creates a cache runtime using task mempool
* @param cache
* @return
*/
- static auto create_savepoint(struct rspamd_task *task, symcache &cache) -> symcache_runtime *;
+ static auto create(struct rspamd_task *task, symcache &cache) -> symcache_runtime *;
/**
* Process task settings
* @param task
* @return
*/
auto get_dynamic_item(int id, bool save_in_cache) const -> cache_dynamic_item *;
+
+ /**
+ * Process symbols in the cache for a single task stage
+ * @param task task being processed; a skipped task is reported as done
+ * without running anything
+ * @param cache symbols cache to take the items from
+ * @param stage one of RSPAMD_TASK_STAGE_CONNFILTERS, RSPAMD_TASK_STAGE_PRE_FILTERS,
+ * RSPAMD_TASK_STAGE_FILTERS, RSPAMD_TASK_STAGE_POST_FILTERS or
+ * RSPAMD_TASK_STAGE_IDEMPOTENT; any other value trips an assertion
+ * @return true for a skipped task, otherwise the result of the handler of
+ * the selected stage
+ */
+ auto process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool;
};