diff options
author | Mehmet Suslu <msuslu@gmail.com> | 2022-05-05 16:32:45 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-05 16:32:45 +0300 |
commit | 95764f816a9e1251a755c6edad339637345cfe28 (patch) | |
tree | 82b6133183d3a2454ad9864910ae163c5c877a13 /src/libserver | |
parent | 7ac2008888c1373cc59d948c9aa97a14e8001f77 (diff) | |
parent | ba79557df94aa5e8756914338063e59a61ac6328 (diff) | |
download | rspamd-95764f816a9e1251a755c6edad339637345cfe28.tar.gz rspamd-95764f816a9e1251a755c6edad339637345cfe28.zip |
Merge branch 'rspamd:master' into master
Diffstat (limited to 'src/libserver')
-rw-r--r-- | src/libserver/CMakeLists.txt | 7 | ||||
-rw-r--r-- | src/libserver/cfg_file.h | 2 | ||||
-rw-r--r-- | src/libserver/cfg_utils.c | 10 | ||||
-rw-r--r-- | src/libserver/dkim.c | 28 | ||||
-rw-r--r-- | src/libserver/rspamd_symcache.c | 3857 | ||||
-rw-r--r-- | src/libserver/rspamd_symcache.h | 105 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_c.cxx | 557 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_id_list.hxx | 176 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_impl.cxx | 1063 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_internal.hxx | 464 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_item.cxx | 500 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_item.hxx | 432 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_periodic.hxx | 90 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_runtime.cxx | 829 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_runtime.hxx | 203 | ||||
-rw-r--r-- | src/libserver/task.h | 14 |
16 files changed, 4368 insertions, 3969 deletions
diff --git a/src/libserver/CMakeLists.txt b/src/libserver/CMakeLists.txt index a4fdbbfcb..d287f44c1 100644 --- a/src/libserver/CMakeLists.txt +++ b/src/libserver/CMakeLists.txt @@ -16,11 +16,14 @@ SET(LIBRSPAMDSERVERSRC ${CMAKE_CURRENT_SOURCE_DIR}/monitored.c ${CMAKE_CURRENT_SOURCE_DIR}/protocol.c ${CMAKE_CURRENT_SOURCE_DIR}/re_cache.c - ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.cxx + ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.cxx ${CMAKE_CURRENT_SOURCE_DIR}/roll_history.c ${CMAKE_CURRENT_SOURCE_DIR}/spf.c ${CMAKE_CURRENT_SOURCE_DIR}/ssl_util.c - ${CMAKE_CURRENT_SOURCE_DIR}/rspamd_symcache.c + ${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_impl.cxx + ${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_item.cxx + ${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_runtime.cxx + ${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_c.cxx ${CMAKE_CURRENT_SOURCE_DIR}/task.c ${CMAKE_CURRENT_SOURCE_DIR}/url.c ${CMAKE_CURRENT_SOURCE_DIR}/worker_util.c diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index 7532639a7..18524af8d 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -146,7 +146,7 @@ struct rspamd_symbol { struct rspamd_symbols_group *gr; /* Main group */ GPtrArray *groups; /* Other groups */ guint flags; - struct rspamd_symcache_item *cache_item; + void *cache_item; gint nshots; }; diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index 9a96f363c..c9fa31d21 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -249,6 +249,9 @@ rspamd_config_new (enum rspamd_config_init_flags flags) cfg->lua_gc_step = DEFAULT_LUA_GC_STEP; cfg->full_gc_iters = DEFAULT_GC_MAXITERS; + /* Default hyperscan cache */ + cfg->hs_cache_dir = RSPAMD_DBDIR "/"; + if (!(flags & RSPAMD_CONFIG_INIT_SKIP_LUA)) { cfg->lua_state = rspamd_lua_init (flags & RSPAMD_CONFIG_INIT_WIPE_LUA_MEM); cfg->own_lua_state = TRUE; @@ -2860,7 +2863,7 @@ rspamd_libs_reset_decompression (struct rspamd_external_libs_ctx *ctx) return FALSE; } else { - r = 
ZSTD_resetDStream (ctx->in_zstream); + r = ZSTD_DCtx_reset (ctx->in_zstream, ZSTD_reset_session_only); if (ZSTD_isError (r)) { msg_err ("cannot init decompression stream: %s", @@ -2885,7 +2888,10 @@ rspamd_libs_reset_compression (struct rspamd_external_libs_ctx *ctx) } else { /* Dictionary will be reused automatically if specified */ - r = ZSTD_resetCStream (ctx->out_zstream, 0); + r = ZSTD_CCtx_reset (ctx->out_zstream, ZSTD_reset_session_only); + if (!ZSTD_isError (r)) { + r = ZSTD_CCtx_setPledgedSrcSize (ctx->out_zstream, 0); + } if (ZSTD_isError (r)) { msg_err ("cannot init compression stream: %s", diff --git a/src/libserver/dkim.c b/src/libserver/dkim.c index f37fc1005..4bf96b1b6 100644 --- a/src/libserver/dkim.c +++ b/src/libserver/dkim.c @@ -151,6 +151,7 @@ struct rspamd_dkim_context_s { struct rspamd_dkim_key_s { guint8 *keydata; + guint8 *raw_key; gsize keylen; gsize decoded_len; gchar key_id[RSPAMD_DKIM_KEY_ID_LEN]; @@ -1332,11 +1333,26 @@ rspamd_dkim_make_key (const gchar *keydata, key = g_malloc0 (sizeof (rspamd_dkim_key_t)); REF_INIT_RETAIN (key, rspamd_dkim_key_free); key->keydata = g_malloc0 (keylen + 1); + key->raw_key = g_malloc (keylen); key->decoded_len = keylen; - key->keylen = keylen; key->type = type; - if (!rspamd_cryptobox_base64_decode (keydata, keylen, key->keydata, + /* Copy key skipping all spaces and newlines */ + const char *h = keydata; + guint8 *t = key->raw_key; + + while (h - keydata < keylen) { + if (!g_ascii_isspace(*h)) { + *t++ = *h++; + } + else { + h++; + } + } + + key->keylen = t - key->raw_key; + + if (!rspamd_cryptobox_base64_decode (key->raw_key, key->keylen, key->keydata, &key->decoded_len)) { REF_RELEASE (key); g_set_error (err, @@ -1470,6 +1486,7 @@ rspamd_dkim_key_free (rspamd_dkim_key_t *key) BIO_free (key->key_bio); } + g_free (key->raw_key); g_free (key->keydata); g_free (key); } @@ -1578,13 +1595,6 @@ rspamd_dkim_parse_key (const gchar *txt, gsize *keylen, GError **err) tag = '\0'; p++; } - else if (g_ascii_isspace 
(*p)) { - klen = p - c; - key = c; - state = skip_spaces; - next_state = read_tag; - tag = '\0'; - } else { p ++; } diff --git a/src/libserver/rspamd_symcache.c b/src/libserver/rspamd_symcache.c deleted file mode 100644 index d2989d213..000000000 --- a/src/libserver/rspamd_symcache.c +++ /dev/null @@ -1,3857 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "config.h" -#include "util.h" -#include "rspamd.h" -#include "message.h" -#include "rspamd_symcache.h" -#include "cfg_file.h" -#include "lua/lua_common.h" -#include "unix-std.h" -#include "contrib/t1ha/t1ha.h" -#include "libserver/worker_util.h" -#include "khash.h" -#include "utlist.h" -#include <math.h> - -#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L -# include <stdalign.h> -#endif - -#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ - cache->static_pool->tag.tagname, cache->cfg->checksum, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ - cache->static_pool->tag.tagname, cache->cfg->checksum, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ - cache->static_pool->tag.tagname, cache->cfg->checksum, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_debug_cache(...) 
rspamd_conditional_debug_fast (NULL, NULL, \ - rspamd_symcache_log_id, "symcache", cache->cfg->checksum, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \ - rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \ - G_STRFUNC, \ - __VA_ARGS__) - -INIT_LOG_MODULE(symcache) - -#define CHECK_START_BIT(checkpoint, dyn_item) \ - ((dyn_item)->started) -#define SET_START_BIT(checkpoint, dyn_item) \ - (dyn_item)->started = 1 -#define CLR_START_BIT(checkpoint, dyn_item) \ - (dyn_item)->started = 0 - -#define CHECK_FINISH_BIT(checkpoint, dyn_item) \ - ((dyn_item)->finished) -#define SET_FINISH_BIT(checkpoint, dyn_item) \ - (dyn_item)->finished = 1 -#define CLR_FINISH_BIT(checkpoint, dyn_item) \ - (dyn_item)->finished = 0 -static const guchar rspamd_symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0 }; - -struct rspamd_symcache_header { - guchar magic[8]; - guint nitems; - guchar checksum[64]; - guchar unused[128]; -}; - -struct symcache_order { - GPtrArray *d; - guint id; - ref_entry_t ref; -}; - -/* - * This structure is optimised to store ids list: - * - If the first element is -1 then use dynamic part, else use static part - */ -struct rspamd_symcache_id_list { - union { - guint32 st[4]; - struct { - guint32 e; /* First element */ - guint16 len; - guint16 allocated; - guint *n; - } dyn; - }; -}; - -struct rspamd_symcache_condition { - gint cb; - struct rspamd_symcache_condition *prev, *next; -}; - -struct rspamd_symcache_item { - /* This block is likely shared */ - struct rspamd_symcache_item_stat *st; - - guint64 last_count; - struct rspamd_counter_data *cd; - gchar *symbol; - const gchar *type_descr; - gint type; - - /* Callback data */ - union { - struct { - symbol_func_t func; - gpointer user_data; - struct rspamd_symcache_condition *conditions; - } normal; - struct { - gint parent; - struct rspamd_symcache_item *parent_item; - } virtual; - } specific; - - /* Condition of execution */ - gboolean 
enabled; - /* Used for async stuff checks */ - gboolean is_filter; - gboolean is_virtual; - - /* Priority */ - gint priority; - /* Topological order */ - guint order; - gint id; - gint frequency_peaks; - /* Settings ids */ - struct rspamd_symcache_id_list allowed_ids; - /* Allows execution but not symbols insertion */ - struct rspamd_symcache_id_list exec_only_ids; - struct rspamd_symcache_id_list forbidden_ids; - - /* Dependencies */ - GPtrArray *deps; - GPtrArray *rdeps; - - /* Container */ - GPtrArray *container; -}; - -struct rspamd_symcache { - /* Hash table for fast access */ - GHashTable *items_by_symbol; - GPtrArray *items_by_id; - struct symcache_order *items_by_order; - GPtrArray *connfilters; - GPtrArray *prefilters; - GPtrArray *filters; - GPtrArray *postfilters; - GPtrArray *composites; - GPtrArray *idempotent; - GPtrArray *virtual; - GList *delayed_deps; - GList *delayed_conditions; - rspamd_mempool_t *static_pool; - guint64 cksum; - gdouble total_weight; - guint used_items; - guint stats_symbols_count; - guint64 total_hits; - guint id; - struct rspamd_config *cfg; - gdouble reload_time; - gdouble last_profile; - gint peak_cb; -}; - -struct rspamd_symcache_dynamic_item { - guint16 start_msec; /* Relative to task time */ - unsigned started:1; - unsigned finished:1; - /* unsigned pad:14; */ - guint32 async_events; -}; - - - -struct cache_dependency { - struct rspamd_symcache_item *item; /* Real dependency */ - gchar *sym; /* Symbolic dep name */ - gint id; /* Real from */ - gint vid; /* Virtual from */ -}; - -struct delayed_cache_dependency { - gchar *from; - gchar *to; -}; - -struct delayed_cache_condition { - gchar *sym; - gint cbref; - lua_State *L; -}; - -struct cache_savepoint { - guint version; - guint items_inflight; - gboolean profile; - gboolean has_slow; - gdouble profile_start; - - struct rspamd_scan_result *rs; - gdouble lim; - - struct rspamd_symcache_item *cur_item; - struct symcache_order *order; - struct rspamd_symcache_dynamic_item 
dynamic_items[]; -}; - -struct rspamd_cache_refresh_cbdata { - gdouble last_resort; - ev_timer resort_ev; - struct rspamd_symcache *cache; - struct rspamd_worker *w; - struct ev_loop *event_loop; -}; - -/* At least once per minute */ -#define PROFILE_MAX_TIME (60.0) -/* For messages larger than 2Mb enable profiling */ -#define PROFILE_MESSAGE_SIZE_THRESHOLD (1024 * 1024 * 2) -/* Enable profile at least once per this amount of messages processed */ -#define PROFILE_PROBABILITY (0.01) - -/* weight, frequency, time */ -#define TIME_ALPHA (1.0) -#define WEIGHT_ALPHA (0.1) -#define FREQ_ALPHA (0.01) -#define SCORE_FUN(w, f, t) (((w) > 0 ? (w) : WEIGHT_ALPHA) \ - * ((f) > 0 ? (f) : FREQ_ALPHA) \ - / (t > TIME_ALPHA ? t : TIME_ALPHA)) - -static gboolean rspamd_symcache_check_symbol (struct rspamd_task *task, - struct rspamd_symcache *cache, - struct rspamd_symcache_item *item, - struct cache_savepoint *checkpoint); -static gboolean rspamd_symcache_check_deps (struct rspamd_task *task, - struct rspamd_symcache *cache, - struct rspamd_symcache_item *item, - struct cache_savepoint *checkpoint, - guint recursion, - gboolean check_only); -static void rspamd_symcache_disable_symbol_checkpoint (struct rspamd_task *task, - struct rspamd_symcache *cache, const gchar *symbol); -static void rspamd_symcache_enable_symbol_checkpoint (struct rspamd_task *task, - struct rspamd_symcache *cache, const gchar *symbol); - -static void -rspamd_symcache_order_dtor (gpointer p) -{ - struct symcache_order *ord = p; - - g_ptr_array_free (ord->d, TRUE); - g_free (ord); -} - -static void -rspamd_symcache_order_unref (gpointer p) -{ - struct symcache_order *ord = p; - - REF_RELEASE (ord); -} - -static gint -rspamd_id_cmp (const void * a, const void * b) -{ - return (*(guint32*)a - *(guint32*)b); -} - -static struct symcache_order * -rspamd_symcache_order_new (struct rspamd_symcache *cache, - gsize nelts) -{ - struct symcache_order *ord; - - ord = g_malloc0 (sizeof (*ord)); - ord->d = 
g_ptr_array_sized_new (nelts); - ord->id = cache->id; - REF_INIT_RETAIN (ord, rspamd_symcache_order_dtor); - - return ord; -} - -static inline struct rspamd_symcache_dynamic_item* -rspamd_symcache_get_dynamic (struct cache_savepoint *checkpoint, - struct rspamd_symcache_item *item) -{ - return &checkpoint->dynamic_items[item->id]; -} - -static inline struct rspamd_symcache_item * -rspamd_symcache_find_filter (struct rspamd_symcache *cache, - const gchar *name, - bool resolve_parent) -{ - struct rspamd_symcache_item *item; - - g_assert (cache != NULL); - - if (name == NULL) { - return NULL; - } - - item = g_hash_table_lookup (cache->items_by_symbol, name); - - if (item != NULL) { - - if (resolve_parent && item->is_virtual && !(item->type & SYMBOL_TYPE_GHOST)) { - item =item->specific.virtual.parent_item; - } - - return item; - } - - return NULL; -} - -const gchar * -rspamd_symcache_get_parent (struct rspamd_symcache *cache, - const gchar *symbol) -{ - struct rspamd_symcache_item *item, *parent; - - g_assert (cache != NULL); - - if (symbol == NULL) { - return NULL; - } - - item = g_hash_table_lookup (cache->items_by_symbol, symbol); - - if (item != NULL) { - - if (item->is_virtual && !(item->type & SYMBOL_TYPE_GHOST)) { - parent = item->specific.virtual.parent_item; - - if (!parent) { - item->specific.virtual.parent_item = g_ptr_array_index (cache->items_by_id, - item->specific.virtual.parent); - parent = item->specific.virtual.parent_item; - } - - item = parent; - } - - return item->symbol; - } - - return NULL; -} - -static gint -postfilters_cmp (const void *p1, const void *p2, gpointer ud) -{ - const struct rspamd_symcache_item *i1 = *(struct rspamd_symcache_item **)p1, - *i2 = *(struct rspamd_symcache_item **)p2; - double w1, w2; - - w1 = i1->priority; - w2 = i2->priority; - - if (w1 > w2) { - return 1; - } - else if (w1 < w2) { - return -1; - } - - return 0; -} - -static gint -prefilters_cmp (const void *p1, const void *p2, gpointer ud) -{ - const struct 
rspamd_symcache_item *i1 = *(struct rspamd_symcache_item **)p1, - *i2 = *(struct rspamd_symcache_item **)p2; - double w1, w2; - - w1 = i1->priority; - w2 = i2->priority; - - if (w1 < w2) { - return 1; - } - else if (w1 > w2) { - return -1; - } - - return 0; -} - -#define TSORT_MARK_PERM(it) (it)->order |= (1u << 31) -#define TSORT_MARK_TEMP(it) (it)->order |= (1u << 30) -#define TSORT_IS_MARKED_PERM(it) ((it)->order & (1u << 31)) -#define TSORT_IS_MARKED_TEMP(it) ((it)->order & (1u << 30)) -#define TSORT_UNMASK(it) ((it)->order & ~((1u << 31) | (1u << 30))) - -static gint -cache_logic_cmp (const void *p1, const void *p2, gpointer ud) -{ - const struct rspamd_symcache_item *i1 = *(struct rspamd_symcache_item **)p1, - *i2 = *(struct rspamd_symcache_item **)p2; - struct rspamd_symcache *cache = ud; - double w1, w2; - double weight1, weight2; - double f1 = 0, f2 = 0, t1, t2, avg_freq, avg_weight; - guint o1 = TSORT_UNMASK (i1), o2 = TSORT_UNMASK (i2); - - - if (o1 == o2) { - /* Heuristic */ - if (i1->priority == i2->priority) { - avg_freq = ((gdouble) cache->total_hits / cache->used_items); - avg_weight = (cache->total_weight / cache->used_items); - f1 = (double) i1->st->total_hits / avg_freq; - f2 = (double) i2->st->total_hits / avg_freq; - weight1 = fabs (i1->st->weight) / avg_weight; - weight2 = fabs (i2->st->weight) / avg_weight; - t1 = i1->st->avg_time; - t2 = i2->st->avg_time; - w1 = SCORE_FUN (weight1, f1, t1); - w2 = SCORE_FUN (weight2, f2, t2); - } else { - /* Strict sorting */ - w1 = abs (i1->priority); - w2 = abs (i2->priority); - } - } - else { - w1 = o1; - w2 = o2; - } - - if (w2 > w1) { - return 1; - } - else if (w2 < w1) { - return -1; - } - - return 0; -} - -static void -rspamd_symcache_tsort_visit (struct rspamd_symcache *cache, - struct rspamd_symcache_item *it, - guint cur_order) -{ - struct cache_dependency *dep; - guint i; - - if (TSORT_IS_MARKED_PERM (it)) { - if (cur_order > TSORT_UNMASK (it)) { - /* Need to recalculate the whole chain */ - 
it->order = cur_order; /* That also removes all masking */ - } - else { - /* We are fine, stop DFS */ - return; - } - } - else if (TSORT_IS_MARKED_TEMP (it)) { - msg_err_cache ("cyclic dependencies found when checking '%s'!", - it->symbol); - return; - } - - TSORT_MARK_TEMP (it); - msg_debug_cache ("visiting node: %s (%d)", it->symbol, cur_order); - - PTR_ARRAY_FOREACH (it->deps, i, dep) { - msg_debug_cache ("visiting dep: %s (%d)", dep->item->symbol, cur_order + 1); - rspamd_symcache_tsort_visit (cache, dep->item, cur_order + 1); - } - - it->order = cur_order; - - TSORT_MARK_PERM (it); -} - -static void -rspamd_symcache_resort (struct rspamd_symcache *cache) -{ - struct symcache_order *ord; - guint i; - guint64 total_hits = 0; - struct rspamd_symcache_item *it; - - ord = rspamd_symcache_order_new (cache, cache->filters->len); - - for (i = 0; i < cache->filters->len; i ++) { - it = g_ptr_array_index (cache->filters, i); - total_hits += it->st->total_hits; - it->order = 0; - g_ptr_array_add (ord->d, it); - } - - /* Topological sort, intended to be O(N) but my implementation - * is not linear (semi-linear usually) as I want to make it as - * simple as possible. - * On each stage it does DFS for unseen nodes. In theory, that - * can be more complicated than linear - O(N^2) for specially - * crafted data. But I don't care. 
- */ - PTR_ARRAY_FOREACH (ord->d, i, it) { - if (it->order == 0) { - rspamd_symcache_tsort_visit (cache, it, 1); - } - } - - /* - * Now we have all sorted and can do some heuristical sort, keeping - * topological order invariant - */ - g_ptr_array_sort_with_data (ord->d, cache_logic_cmp, cache); - cache->total_hits = total_hits; - - if (cache->items_by_order) { - REF_RELEASE (cache->items_by_order); - } - - cache->items_by_order = ord; -} - -static void -rspamd_symcache_propagate_dep (struct rspamd_symcache *cache, - struct rspamd_symcache_item *it, - struct rspamd_symcache_item *dit) -{ - const guint *ids; - guint nids = 0; - - msg_debug_cache ("check id propagation for dependency %s from %s", - it->symbol, dit->symbol); - ids = rspamd_symcache_get_allowed_settings_ids (cache, dit->symbol, &nids); - - /* TODO: merge? */ - if (nids > 0) { - msg_info_cache ("propagate allowed ids from %s to %s", - dit->symbol, it->symbol); - - rspamd_symcache_set_allowed_settings_ids (cache, it->symbol, ids, - nids); - } - - ids = rspamd_symcache_get_forbidden_settings_ids (cache, dit->symbol, &nids); - - if (nids > 0) { - msg_info_cache ("propagate forbidden ids from %s to %s", - dit->symbol, it->symbol); - - rspamd_symcache_set_forbidden_settings_ids (cache, it->symbol, ids, - nids); - } -} - -static void -rspamd_symcache_process_dep (struct rspamd_symcache *cache, - struct rspamd_symcache_item *it, - struct cache_dependency *dep) -{ - struct rspamd_symcache_item *dit = NULL, *vdit = NULL; - struct cache_dependency *rdep; - - if (dep->id >= 0) { - msg_debug_cache ("process real dependency %s on %s", it->symbol, dep->sym); - dit = rspamd_symcache_find_filter (cache, dep->sym, true); - } - - if (dep->vid >= 0) { - /* Case of the virtual symbol that depends on another (maybe virtual) symbol */ - vdit = rspamd_symcache_find_filter (cache, dep->sym, false); - - if (!vdit) { - if (dit) { - msg_err_cache ("cannot add dependency from %s on %s: no dependency symbol registered", - dep->sym, 
dit->symbol); - } - } - else { - msg_debug_cache ("process virtual dependency %s(%d) on %s(%d)", it->symbol, - dep->vid, vdit->symbol, vdit->id); - } - } - else { - vdit = dit; - } - - if (dit != NULL) { - if (!dit->is_filter) { - /* - * Check sanity: - * - filters -> prefilter dependency is OK and always satisfied - * - postfilter -> (filter, prefilter) dep is ok - * - idempotent -> (any) dep is OK - * - * Otherwise, emit error - * However, even if everything is fine this dep is useless ¯\_(ツ)_/¯ - */ - gboolean ok_dep = FALSE; - - if (it->is_filter) { - if (dit->is_filter) { - ok_dep = TRUE; - } - else if (dit->type & SYMBOL_TYPE_PREFILTER) { - ok_dep = TRUE; - } - } - else if (it->type & SYMBOL_TYPE_POSTFILTER) { - if (dit->type & SYMBOL_TYPE_PREFILTER) { - ok_dep = TRUE; - } - } - else if (it->type & SYMBOL_TYPE_IDEMPOTENT) { - if (dit->type & (SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER)) { - ok_dep = TRUE; - } - } - else if (it->type & SYMBOL_TYPE_PREFILTER) { - if (it->priority < dit->priority) { - /* Also OK */ - ok_dep = TRUE; - } - } - - if (!ok_dep) { - msg_err_cache ("cannot add dependency from %s on %s: invalid symbol types", - dep->sym, dit->symbol); - - return; - } - } - else { - if (dit->id == it->id) { - msg_err_cache ("cannot add dependency on self: %s -> %s " - "(resolved to %s)", - it->symbol, dep->sym, dit->symbol); - } else { - rdep = rspamd_mempool_alloc (cache->static_pool, - sizeof (*rdep)); - - rdep->sym = dep->sym; - rdep->item = it; - rdep->id = it->id; - g_assert (dit->rdeps != NULL); - g_ptr_array_add (dit->rdeps, rdep); - dep->item = dit; - dep->id = dit->id; - - msg_debug_cache ("add dependency from %d on %d", it->id, - dit->id); - } - } - } - else if (dep->id >= 0) { - msg_err_cache ("cannot find dependency on symbol %s for symbol %s", - dep->sym, it->symbol); - - return; - } - - if (vdit) { - /* Use virtual symbol to propagate deps */ - rspamd_symcache_propagate_dep (cache, it, vdit); - } -} - -/* Sort items in logical order */ 
-static void -rspamd_symcache_post_init (struct rspamd_symcache *cache) -{ - struct rspamd_symcache_item *it, *vit; - struct cache_dependency *dep; - struct delayed_cache_dependency *ddep; - struct delayed_cache_condition *dcond; - GList *cur; - gint i, j; - - cur = cache->delayed_deps; - while (cur) { - ddep = cur->data; - - vit = rspamd_symcache_find_filter (cache, ddep->from, false); - it = rspamd_symcache_find_filter (cache, ddep->from, true); - - if (it == NULL || vit == NULL) { - msg_err_cache ("cannot register delayed dependency between %s and %s: " - "%s is missing", ddep->from, ddep->to, ddep->from); - } - else { - msg_debug_cache ("delayed between %s(%d:%d) -> %s", ddep->from, - it->id, vit->id, ddep->to); - rspamd_symcache_add_dependency (cache, it->id, ddep->to, vit != it ? - vit->id : -1); - } - - cur = g_list_next (cur); - } - - cur = cache->delayed_conditions; - while (cur) { - dcond = cur->data; - - it = rspamd_symcache_find_filter (cache, dcond->sym, true); - - if (it == NULL) { - msg_err_cache ( - "cannot register delayed condition for %s", - dcond->sym); - luaL_unref (dcond->L, LUA_REGISTRYINDEX, dcond->cbref); - } - else { - struct rspamd_symcache_condition *ncond = rspamd_mempool_alloc0 (cache->static_pool, - sizeof (*ncond)); - ncond->cb = dcond->cbref; - DL_APPEND (it->specific.normal.conditions, ncond); - } - - cur = g_list_next (cur); - } - - PTR_ARRAY_FOREACH (cache->items_by_id, i, it) { - - PTR_ARRAY_FOREACH (it->deps, j, dep) { - rspamd_symcache_process_dep (cache, it, dep); - } - - if (it->deps) { - /* Reversed loop to make removal safe */ - for (j = it->deps->len - 1; j >= 0; j--) { - dep = g_ptr_array_index (it->deps, j); - - if (dep->item == NULL) { - /* Remove useless dep */ - g_ptr_array_remove_index (it->deps, j); - } - } - } - } - - /* Special case for virtual symbols */ - PTR_ARRAY_FOREACH (cache->virtual, i, it) { - - PTR_ARRAY_FOREACH (it->deps, j, dep) { - rspamd_symcache_process_dep (cache, it, dep); - } - } - - 
g_ptr_array_sort_with_data (cache->connfilters, prefilters_cmp, cache); - g_ptr_array_sort_with_data (cache->prefilters, prefilters_cmp, cache); - g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache); - g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache); - - rspamd_symcache_resort (cache); -} - -static gboolean -rspamd_symcache_load_items (struct rspamd_symcache *cache, const gchar *name) -{ - struct rspamd_symcache_header *hdr; - struct stat st; - struct ucl_parser *parser; - ucl_object_t *top; - const ucl_object_t *cur, *elt; - ucl_object_iter_t it; - struct rspamd_symcache_item *item, *parent; - const guchar *p; - gint fd; - gpointer map; - - fd = open (name, O_RDONLY); - - if (fd == -1) { - msg_info_cache ("cannot open file %s, error %d, %s", name, - errno, strerror (errno)); - return FALSE; - } - - rspamd_file_lock (fd, FALSE); - - if (fstat (fd, &st) == -1) { - rspamd_file_unlock (fd, FALSE); - close (fd); - msg_info_cache ("cannot stat file %s, error %d, %s", name, - errno, strerror (errno)); - return FALSE; - } - - if (st.st_size < (gint)sizeof (*hdr)) { - rspamd_file_unlock (fd, FALSE); - close (fd); - errno = EINVAL; - msg_info_cache ("cannot use file %s, error %d, %s", name, - errno, strerror (errno)); - return FALSE; - } - - map = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, fd, 0); - - if (map == MAP_FAILED) { - rspamd_file_unlock (fd, FALSE); - close (fd); - msg_info_cache ("cannot mmap file %s, error %d, %s", name, - errno, strerror (errno)); - return FALSE; - } - - hdr = map; - - if (memcmp (hdr->magic, rspamd_symcache_magic, - sizeof (rspamd_symcache_magic)) != 0) { - msg_info_cache ("cannot use file %s, bad magic", name); - munmap (map, st.st_size); - rspamd_file_unlock (fd, FALSE); - close (fd); - - return FALSE; - } - - parser = ucl_parser_new (0); - p = (const guchar *)(hdr + 1); - - if (!ucl_parser_add_chunk (parser, p, st.st_size - sizeof (*hdr))) { - msg_info_cache ("cannot use file %s, cannot parse: 
%s", name, - ucl_parser_get_error (parser)); - munmap (map, st.st_size); - ucl_parser_free (parser); - rspamd_file_unlock (fd, FALSE); - close (fd); - - return FALSE; - } - - top = ucl_parser_get_object (parser); - munmap (map, st.st_size); - rspamd_file_unlock (fd, FALSE); - close (fd); - ucl_parser_free (parser); - - if (top == NULL || ucl_object_type (top) != UCL_OBJECT) { - msg_info_cache ("cannot use file %s, bad object", name); - ucl_object_unref (top); - return FALSE; - } - - it = ucl_object_iterate_new (top); - - while ((cur = ucl_object_iterate_safe (it, true))) { - item = g_hash_table_lookup (cache->items_by_symbol, ucl_object_key (cur)); - - if (item) { - /* Copy saved info */ - /* - * XXX: don't save or load weight, it should be obtained from the - * metric - */ -#if 0 - elt = ucl_object_lookup (cur, "weight"); - - if (elt) { - w = ucl_object_todouble (elt); - if (w != 0) { - item->weight = w; - } - } -#endif - elt = ucl_object_lookup (cur, "time"); - if (elt) { - item->st->avg_time = ucl_object_todouble (elt); - } - - elt = ucl_object_lookup (cur, "count"); - if (elt) { - item->st->total_hits = ucl_object_toint (elt); - item->last_count = item->st->total_hits; - } - - elt = ucl_object_lookup (cur, "frequency"); - if (elt && ucl_object_type (elt) == UCL_OBJECT) { - const ucl_object_t *freq_elt; - - freq_elt = ucl_object_lookup (elt, "avg"); - - if (freq_elt) { - item->st->avg_frequency = ucl_object_todouble (freq_elt); - } - freq_elt = ucl_object_lookup (elt, "stddev"); - - if (freq_elt) { - item->st->stddev_frequency = ucl_object_todouble (freq_elt); - } - } - - if (item->is_virtual && !(item->type & SYMBOL_TYPE_GHOST)) { - g_assert (item->specific.virtual.parent < (gint)cache->items_by_id->len); - parent = g_ptr_array_index (cache->items_by_id, - item->specific.virtual.parent); - item->specific.virtual.parent_item = parent; - - if (parent->st->weight < item->st->weight) { - parent->st->weight = item->st->weight; - } - - /* - * We maintain avg_time for 
virtual symbols equal to the - * parent item avg_time - */ - item->st->avg_time = parent->st->avg_time; - } - - cache->total_weight += fabs (item->st->weight); - cache->total_hits += item->st->total_hits; - } - } - - ucl_object_iterate_free (it); - ucl_object_unref (top); - - return TRUE; -} - -#define ROUND_DOUBLE(x) (floor((x) * 100.0) / 100.0) - -static gboolean -rspamd_symcache_save_items (struct rspamd_symcache *cache, const gchar *name) -{ - struct rspamd_symcache_header hdr; - ucl_object_t *top, *elt, *freq; - GHashTableIter it; - struct rspamd_symcache_item *item; - struct ucl_emitter_functions *efunc; - gpointer k, v; - gint fd; - FILE *fp; - bool ret; - gchar path[PATH_MAX]; - - rspamd_snprintf (path, sizeof (path), "%s.new", name); - - fd = open (path, O_CREAT | O_WRONLY | O_EXCL, 00644); - - if (fd == -1) { - if (errno == EEXIST) { - /* Some other process is already writing data, give up silently */ - return TRUE; - } - - msg_err_cache ("cannot open file %s, error %d, %s", path, - errno, strerror (errno)); - return FALSE; - } - - rspamd_file_lock (fd, FALSE); - fp = fdopen (fd, "w"); - - memset (&hdr, 0, sizeof (hdr)); - memcpy (hdr.magic, rspamd_symcache_magic, - sizeof (rspamd_symcache_magic)); - - if (fwrite (&hdr, sizeof (hdr), 1, fp) == -1) { - msg_err_cache ("cannot write to file %s, error %d, %s", path, - errno, strerror (errno)); - rspamd_file_unlock (fd, FALSE); - fclose (fp); - - return FALSE; - } - - top = ucl_object_typed_new (UCL_OBJECT); - g_hash_table_iter_init (&it, cache->items_by_symbol); - - while (g_hash_table_iter_next (&it, &k, &v)) { - item = v; - elt = ucl_object_typed_new (UCL_OBJECT); - ucl_object_insert_key (elt, - ucl_object_fromdouble (ROUND_DOUBLE (item->st->weight)), - "weight", 0, false); - ucl_object_insert_key (elt, - ucl_object_fromdouble (ROUND_DOUBLE (item->st->time_counter.mean)), - "time", 0, false); - ucl_object_insert_key (elt, ucl_object_fromint (item->st->total_hits), - "count", 0, false); - - freq = 
ucl_object_typed_new (UCL_OBJECT); - ucl_object_insert_key (freq, - ucl_object_fromdouble (ROUND_DOUBLE (item->st->frequency_counter.mean)), - "avg", 0, false); - ucl_object_insert_key (freq, - ucl_object_fromdouble (ROUND_DOUBLE (item->st->frequency_counter.stddev)), - "stddev", 0, false); - ucl_object_insert_key (elt, freq, "frequency", 0, false); - - ucl_object_insert_key (top, elt, k, 0, false); - } - - efunc = ucl_object_emit_file_funcs (fp); - ret = ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, efunc, NULL); - ucl_object_emit_funcs_free (efunc); - ucl_object_unref (top); - rspamd_file_unlock (fd, FALSE); - fclose (fp); - - if (rename (path, name) == -1) { - msg_err_cache ("cannot rename %s -> %s, error %d, %s", path, name, - errno, strerror (errno)); - (void)unlink (path); - ret = FALSE; - } - - return ret; -} - -#undef ROUND_DOUBLE - -gint -rspamd_symcache_add_symbol (struct rspamd_symcache *cache, - const gchar *name, - gint priority, - symbol_func_t func, - gpointer user_data, - enum rspamd_symbol_type type, - gint parent) -{ - struct rspamd_symcache_item *item = NULL; - const gchar *type_str = "normal"; - - g_assert (cache != NULL); - - if (name == NULL && !(type & SYMBOL_TYPE_CALLBACK)) { - msg_warn_cache ("no name for non-callback symbol!"); - } - else if ((type & SYMBOL_TYPE_VIRTUAL & (~SYMBOL_TYPE_GHOST)) && parent == -1) { - msg_warn_cache ("no parent symbol is associated with virtual symbol %s", - name); - } - - if (name != NULL && !(type & SYMBOL_TYPE_CALLBACK)) { - struct rspamd_symcache_item *existing; - - if (strcspn (name, " \t\n\r") != strlen (name)) { - msg_warn_cache ("bogus characters in symbol name: \"%s\"", - name); - } - - if ((existing = g_hash_table_lookup (cache->items_by_symbol, name)) != NULL) { - - if (existing->type & SYMBOL_TYPE_GHOST) { - /* - * Complicated part: - * - we need to remove the existing ghost symbol - * - we need to cleanup containers: - * - symbols hash - * - specific array - * - items_by_it - * - decrement 
used_items - */ - msg_info_cache ("duplicate ghost symbol %s is removed", name); - - if (existing->container) { - g_ptr_array_remove (existing->container, existing); - } - - g_ptr_array_remove (cache->items_by_id, existing->container); - cache->used_items --; - g_hash_table_remove (cache->items_by_symbol, name); - /* - * Here can be memory leak, but we assume that ghost symbols - * are also virtual - */ - } - else { - msg_err_cache ("skip duplicate symbol registration for %s", name); - return -1; - } - } - } - - if (type & (SYMBOL_TYPE_CLASSIFIER|SYMBOL_TYPE_CALLBACK| - SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER| - SYMBOL_TYPE_IDEMPOTENT|SYMBOL_TYPE_GHOST)) { - type |= SYMBOL_TYPE_NOSTAT; - } - - item = rspamd_mempool_alloc0 (cache->static_pool, - sizeof (struct rspamd_symcache_item)); - item->st = rspamd_mempool_alloc0_shared (cache->static_pool, - sizeof (*item->st)); - item->enabled = TRUE; - - /* - * We do not share cd to skip locking, instead we'll just calculate it on - * save or accumulate - */ - item->cd = rspamd_mempool_alloc0 (cache->static_pool, - sizeof (struct rspamd_counter_data)); - item->priority = priority; - item->type = type; - - if ((type & SYMBOL_TYPE_FINE) && item->priority == 0) { - /* Make priority for negative weighted symbols */ - item->priority = 1; - } - - if (func) { - /* Non-virtual symbol */ - g_assert (parent == -1); - - if (item->type & SYMBOL_TYPE_PREFILTER) { - type_str = "prefilter"; - g_ptr_array_add (cache->prefilters, item); - item->container = cache->prefilters; - } - else if (item->type & SYMBOL_TYPE_IDEMPOTENT) { - type_str = "idempotent"; - g_ptr_array_add (cache->idempotent, item); - item->container = cache->idempotent; - } - else if (item->type & SYMBOL_TYPE_POSTFILTER) { - type_str = "postfilter"; - g_ptr_array_add (cache->postfilters, item); - item->container = cache->postfilters; - } - else if (item->type & SYMBOL_TYPE_CONNFILTER) { - type_str = "connfilter"; - g_ptr_array_add (cache->connfilters, item); - 
item->container = cache->connfilters; - } - else { - item->is_filter = TRUE; - g_ptr_array_add (cache->filters, item); - item->container = cache->filters; - } - - item->id = cache->items_by_id->len; - g_ptr_array_add (cache->items_by_id, item); - - item->specific.normal.func = func; - item->specific.normal.user_data = user_data; - item->specific.normal.conditions = NULL; - } - else { - /* - * Three possibilities here when no function is specified: - * - virtual symbol (beware of ghosts!) - * - classifier symbol - * - composite symbol - */ - if (item->type & SYMBOL_TYPE_COMPOSITE) { - item->specific.normal.conditions = NULL; - item->specific.normal.user_data = user_data; - g_assert (user_data != NULL); - g_ptr_array_add (cache->composites, item); - - item->id = cache->items_by_id->len; - g_ptr_array_add (cache->items_by_id, item); - item->container = cache->composites; - type_str = "composite"; - } - else if (item->type & SYMBOL_TYPE_CLASSIFIER) { - /* Treat it as normal symbol to allow enable/disable */ - item->id = cache->items_by_id->len; - g_ptr_array_add (cache->items_by_id, item); - - item->is_filter = TRUE; - item->specific.normal.func = NULL; - item->specific.normal.user_data = NULL; - item->specific.normal.conditions = NULL; - type_str = "classifier"; - } - else { - item->is_virtual = TRUE; - item->specific.virtual.parent = parent; - item->specific.virtual.parent_item = - g_ptr_array_index (cache->items_by_id, parent); - item->id = cache->virtual->len; - g_ptr_array_add (cache->virtual, item); - item->container = cache->virtual; - /* Not added to items_by_id, handled by parent */ - type_str = "virtual"; - } - } - - cache->used_items ++; - cache->id ++; - - if (!(item->type & - (SYMBOL_TYPE_IDEMPOTENT|SYMBOL_TYPE_NOSTAT|SYMBOL_TYPE_CLASSIFIER))) { - if (name != NULL) { - cache->cksum = t1ha (name, strlen (name), - cache->cksum); - } else { - cache->cksum = t1ha (&item->id, sizeof (item->id), - cache->cksum); - } - - cache->stats_symbols_count ++; - } - - if 
(name != NULL) { - item->symbol = rspamd_mempool_strdup (cache->static_pool, name); - msg_debug_cache ("used items: %d, added symbol: %s, %d; symbol type: %s", - cache->used_items, name, item->id, type_str); - } else { - g_assert (func != NULL); - msg_debug_cache ("used items: %d, added unnamed symbol: %d; symbol type: %s", - cache->used_items, item->id, type_str); - } - - item->deps = g_ptr_array_new (); - item->rdeps = g_ptr_array_new (); - item->type_descr = type_str; - rspamd_mempool_add_destructor (cache->static_pool, - rspamd_ptr_array_free_hard, item->deps); - rspamd_mempool_add_destructor (cache->static_pool, - rspamd_ptr_array_free_hard, item->rdeps); - - if (name != NULL) { - g_hash_table_insert (cache->items_by_symbol, item->symbol, item); - } - - return item->id; -} - -void -rspamd_symcache_set_peak_callback (struct rspamd_symcache *cache, - gint cbref) -{ - g_assert (cache != NULL); - - if (cache->peak_cb != -1) { - luaL_unref (cache->cfg->lua_state, LUA_REGISTRYINDEX, - cache->peak_cb); - } - - cache->peak_cb = cbref; - msg_info_cache ("registered peak callback"); -} - -gboolean -rspamd_symcache_add_condition_delayed (struct rspamd_symcache *cache, - const gchar *sym, lua_State *L, gint cbref) -{ - struct delayed_cache_condition *ncond; - - g_assert (cache != NULL); - g_assert (sym != NULL); - - ncond = g_malloc0 (sizeof (*ncond)); - ncond->sym = g_strdup (sym); - ncond->cbref = cbref; - ncond->L = L; - cache->id ++; - - cache->delayed_conditions = g_list_prepend (cache->delayed_conditions, ncond); - - return TRUE; -} - -void -rspamd_symcache_save (struct rspamd_symcache *cache) -{ - if (cache != NULL) { - - if (cache->cfg->cache_filename) { - /* Try to sync values to the disk */ - if (!rspamd_symcache_save_items (cache, - cache->cfg->cache_filename)) { - msg_err_cache ("cannot save cache data to %s: %s", - cache->cfg->cache_filename, strerror (errno)); - } - } - } -} - -void -rspamd_symcache_destroy (struct rspamd_symcache *cache) -{ - GList *cur; - 
struct delayed_cache_dependency *ddep; - struct delayed_cache_condition *dcond; - - if (cache != NULL) { - if (cache->delayed_deps) { - cur = cache->delayed_deps; - - while (cur) { - ddep = cur->data; - g_free (ddep->from); - g_free (ddep->to); - g_free (ddep); - cur = g_list_next (cur); - } - - g_list_free (cache->delayed_deps); - } - - if (cache->delayed_conditions) { - cur = cache->delayed_conditions; - - while (cur) { - dcond = cur->data; - g_free (dcond->sym); - g_free (dcond); - cur = g_list_next (cur); - } - - g_list_free (cache->delayed_conditions); - } - - g_hash_table_destroy (cache->items_by_symbol); - g_ptr_array_free (cache->items_by_id, TRUE); - rspamd_mempool_delete (cache->static_pool); - g_ptr_array_free (cache->connfilters, TRUE); - g_ptr_array_free (cache->prefilters, TRUE); - g_ptr_array_free (cache->filters, TRUE); - g_ptr_array_free (cache->postfilters, TRUE); - g_ptr_array_free (cache->idempotent, TRUE); - g_ptr_array_free (cache->composites, TRUE); - g_ptr_array_free (cache->virtual, TRUE); - REF_RELEASE (cache->items_by_order); - - if (cache->peak_cb != -1) { - luaL_unref (cache->cfg->lua_state, LUA_REGISTRYINDEX, cache->peak_cb); - } - - g_free (cache); - } -} - -struct rspamd_symcache* -rspamd_symcache_new (struct rspamd_config *cfg) -{ - struct rspamd_symcache *cache; - - cache = g_malloc0 (sizeof (struct rspamd_symcache)); - cache->static_pool = - rspamd_mempool_new (rspamd_mempool_suggest_size (), "symcache", 0); - cache->items_by_symbol = g_hash_table_new (rspamd_str_hash, - rspamd_str_equal); - cache->items_by_id = g_ptr_array_new (); - cache->connfilters = g_ptr_array_new (); - cache->prefilters = g_ptr_array_new (); - cache->filters = g_ptr_array_new (); - cache->postfilters = g_ptr_array_new (); - cache->idempotent = g_ptr_array_new (); - cache->composites = g_ptr_array_new (); - cache->virtual = g_ptr_array_new (); - cache->reload_time = cfg->cache_reload_time; - cache->total_hits = 1; - cache->total_weight = 1.0; - cache->cfg = 
cfg; - cache->cksum = 0xdeadbabe; - cache->peak_cb = -1; - cache->id = (guint)rspamd_random_uint64_fast (); - - return cache; -} - -static void -rspamd_symcache_metric_connect_cb (gpointer k, gpointer v, gpointer ud) -{ - struct rspamd_symcache *cache = (struct rspamd_symcache *)ud; - const gchar *sym = k; - struct rspamd_symbol *s = (struct rspamd_symbol *)v; - gdouble weight; - struct rspamd_symcache_item *item; - - weight = *s->weight_ptr; - item = g_hash_table_lookup (cache->items_by_symbol, sym); - - if (item) { - item->st->weight = weight; - s->cache_item = item; - } -} - -gboolean -rspamd_symcache_init (struct rspamd_symcache *cache) -{ - gboolean res = TRUE; - - g_assert (cache != NULL); - - cache->reload_time = cache->cfg->cache_reload_time; - - if (cache->cfg->cache_filename != NULL) { - res = rspamd_symcache_load_items (cache, cache->cfg->cache_filename); - } - - rspamd_symcache_post_init (cache); - - /* Connect metric symbols with symcache symbols */ - if (cache->cfg->symbols) { - g_hash_table_foreach (cache->cfg->symbols, - rspamd_symcache_metric_connect_cb, - cache); - } - - return res; -} - - -static void -rspamd_symcache_validate_cb (gpointer k, gpointer v, gpointer ud) -{ - struct rspamd_symcache_item *item = v, *parent; - struct rspamd_config *cfg; - struct rspamd_symcache *cache = (struct rspamd_symcache *)ud; - struct rspamd_symbol *s; - gboolean skipped, ghost; - gint p1, p2; - - ghost = item->st->weight == 0 ? 
TRUE : FALSE; - cfg = cache->cfg; - - /* Check whether this item is skipped */ - skipped = !ghost; - g_assert (cfg != NULL); - - if ((item->type & - (SYMBOL_TYPE_NORMAL|SYMBOL_TYPE_VIRTUAL|SYMBOL_TYPE_COMPOSITE|SYMBOL_TYPE_CLASSIFIER)) - && g_hash_table_lookup (cfg->symbols, item->symbol) == NULL) { - - if (!isnan(cfg->unknown_weight)) { - - skipped = FALSE; - item->st->weight = cfg->unknown_weight; - s = rspamd_mempool_alloc0 (cache->static_pool, - sizeof (*s)); - s->name = item->symbol; - s->weight_ptr = &item->st->weight; - g_hash_table_insert (cfg->symbols, item->symbol, s); - - msg_info_cache ("adding unknown symbol %s with weight: %.2f", - item->symbol, cfg->unknown_weight); - ghost = FALSE; - } - else { - skipped = TRUE; - } - } - else { - skipped = FALSE; - } - - if (!ghost && skipped) { - if (!(item->type & SYMBOL_TYPE_SKIPPED)) { - item->type |= SYMBOL_TYPE_SKIPPED; - msg_warn_cache ("symbol %s has no score registered, skip its check", - item->symbol); - } - } - - if (ghost) { - msg_debug_cache ("symbol %s is registered as ghost symbol, it won't be inserted " - "to any metric", item->symbol); - } - - if (item->st->weight < 0 && item->priority == 0) { - item->priority ++; - } - - if (item->is_virtual) { - if (!(item->type & SYMBOL_TYPE_GHOST)) { - g_assert (item->specific.virtual.parent != -1); - g_assert (item->specific.virtual.parent < (gint) cache->items_by_id->len); - parent = g_ptr_array_index (cache->items_by_id, - item->specific.virtual.parent); - item->specific.virtual.parent_item = parent; - - if (fabs (parent->st->weight) < fabs (item->st->weight)) { - parent->st->weight = item->st->weight; - } - - p1 = abs (item->priority); - p2 = abs (parent->priority); - - if (p1 != p2) { - parent->priority = MAX (p1, p2); - item->priority = parent->priority; - } - } - } - - cache->total_weight += fabs (item->st->weight); -} - -gboolean -rspamd_symcache_validate (struct rspamd_symcache *cache, - struct rspamd_config *cfg, - gboolean strict) -{ - struct 
rspamd_symcache_item *item; - GHashTableIter it; - gpointer k, v; - struct rspamd_symbol *sym_def; - gboolean ignore_symbol = FALSE, ret = TRUE; - - if (cache == NULL) { - msg_err ("empty cache is invalid"); - return FALSE; - } - - - g_hash_table_foreach (cache->items_by_symbol, - rspamd_symcache_validate_cb, - cache); - /* Now check each metric item and find corresponding symbol in a cache */ - g_hash_table_iter_init (&it, cfg->symbols); - - while (g_hash_table_iter_next (&it, &k, &v)) { - ignore_symbol = FALSE; - sym_def = v; - - if (sym_def && (sym_def->flags & - (RSPAMD_SYMBOL_FLAG_IGNORE_METRIC|RSPAMD_SYMBOL_FLAG_DISABLED))) { - ignore_symbol = TRUE; - } - - if (!ignore_symbol) { - item = g_hash_table_lookup (cache->items_by_symbol, k); - - if (item == NULL) { - msg_warn_cache ( - "symbol '%s' has its score defined but there is no " - "corresponding rule registered", - k); - if (strict) { - ret = FALSE; - } - } - } - else if (sym_def->flags & RSPAMD_SYMBOL_FLAG_DISABLED) { - item = g_hash_table_lookup (cache->items_by_symbol, k); - - if (item) { - item->enabled = FALSE; - } - } - } - - return ret; -} - -/* Return true if metric has score that is more than spam score for it */ -static gboolean -rspamd_symcache_metric_limit (struct rspamd_task *task, - struct cache_savepoint *cp) -{ - struct rspamd_scan_result *res; - double ms; - - if (task->flags & RSPAMD_TASK_FLAG_PASS_ALL) { - return FALSE; - } - - if (cp->lim == 0.0) { - res = task->result; - - if (res) { - ms = rspamd_task_get_required_score (task, res); - - if (!isnan (ms) && cp->lim < ms) { - cp->rs = res; - cp->lim = ms; - } - } - } - - if (cp->rs) { - - if (cp->rs->score > cp->lim) { - return TRUE; - } - } - else { - /* No reject score define, always check all rules */ - cp->lim = -1; - } - - return FALSE; -} - -static inline gboolean -rspamd_symcache_check_id_list (const struct rspamd_symcache_id_list *ls, guint32 id) -{ - guint i; - - if (ls->dyn.e == -1) { - guint *res = bsearch (&id, ls->dyn.n, 
ls->dyn.len, sizeof (guint32), - rspamd_id_cmp); - - if (res) { - return TRUE; - } - } - else { - for (i = 0; i < G_N_ELEMENTS (ls->st); i ++) { - if (ls->st[i] == id) { - return TRUE; - } - else if (ls->st[i] == 0) { - return FALSE; - } - } - } - - return FALSE; -} - -gboolean -rspamd_symcache_is_item_allowed (struct rspamd_task *task, - struct rspamd_symcache_item *item, - gboolean exec_only) -{ - const gchar *what = "execution"; - - if (!exec_only) { - what = "symbol insertion"; - } - - /* Static checks */ - if (!item->enabled || - (RSPAMD_TASK_IS_EMPTY (task) && !(item->type & SYMBOL_TYPE_EMPTY)) || - (item->type & SYMBOL_TYPE_MIME_ONLY && !RSPAMD_TASK_IS_MIME(task))) { - - if (!item->enabled) { - msg_debug_cache_task ("skipping %s of %s as it is permanently disabled; symbol type=%s", - what, item->symbol, item->type_descr); - - return FALSE; - } - else { - /* - * Exclude virtual symbols - */ - if (exec_only) { - msg_debug_cache_task ("skipping check of %s as it cannot be " - "executed for this task type; symbol type=%s", - item->symbol, item->type_descr); - - return FALSE; - } - } - } - - /* Settings checks */ - if (task->settings_elt != 0) { - guint32 id = task->settings_elt->id; - - if (item->forbidden_ids.st[0] != 0 && - rspamd_symcache_check_id_list (&item->forbidden_ids, - id)) { - msg_debug_cache_task ("deny %s of %s as it is forbidden for " - "settings id %ud; symbol type=%s", - what, - item->symbol, - id, - item->type_descr); - - return FALSE; - } - - if (!(item->type & SYMBOL_TYPE_EXPLICIT_DISABLE)) { - if (item->allowed_ids.st[0] == 0 || - !rspamd_symcache_check_id_list (&item->allowed_ids, - id)) { - - if (task->settings_elt->policy == RSPAMD_SETTINGS_POLICY_IMPLICIT_ALLOW) { - msg_debug_cache_task ("allow execution of %s settings id %ud " - "allows implicit execution of the symbols;" - "symbol type=%s", - item->symbol, - id, - item->type_descr); - - return TRUE; - } - - if (exec_only) { - /* - * Special case if any of our virtual children are 
enabled - */ - if (rspamd_symcache_check_id_list (&item->exec_only_ids, id)) { - return TRUE; - } - } - - msg_debug_cache_task ("deny %s of %s as it is not listed " - "as allowed for settings id %ud; symbol type=%s", - what, - item->symbol, - id, - item->type_descr); - return FALSE; - } - } - else { - msg_debug_cache_task ("allow %s of %s for " - "settings id %ud as it can be only disabled explicitly;" - " symbol type=%s", - what, - item->symbol, - id, - item->type_descr); - } - } - else if (item->type & SYMBOL_TYPE_EXPLICIT_ENABLE) { - msg_debug_cache_task ("deny %s of %s as it must be explicitly enabled; symbol type=%s", - what, - item->symbol, - item->type_descr); - return FALSE; - } - - /* Allow all symbols with no settings id */ - return TRUE; -} - -static gboolean -rspamd_symcache_check_symbol (struct rspamd_task *task, - struct rspamd_symcache *cache, - struct rspamd_symcache_item *item, - struct cache_savepoint *checkpoint) -{ - struct rspamd_task **ptask; - lua_State *L; - gboolean check = TRUE; - struct rspamd_symcache_dynamic_item *dyn_item = - rspamd_symcache_get_dynamic (checkpoint, item); - - if (item->type & (SYMBOL_TYPE_CLASSIFIER|SYMBOL_TYPE_COMPOSITE)) { - /* Classifiers are special :( */ - return TRUE; - } - - if (rspamd_session_blocked (task->s)) { - /* - * We cannot add new events as session is either destroyed or - * being cleaned up. 
- */ - return TRUE; - } - - g_assert (!item->is_virtual); - g_assert (item->specific.normal.func != NULL); - if (CHECK_START_BIT (checkpoint, dyn_item)) { - /* - * This can actually happen when deps span over different layers - */ - return CHECK_FINISH_BIT (checkpoint, dyn_item); - } - - /* Check has been started */ - SET_START_BIT (checkpoint, dyn_item); - - if (!rspamd_symcache_is_item_allowed (task, item, TRUE)) { - check = FALSE; - } - else if (item->specific.normal.conditions) { - struct rspamd_symcache_condition *cur_cond; - - DL_FOREACH (item->specific.normal.conditions, cur_cond) { - /* We also executes condition callback to check if we need this symbol */ - L = task->cfg->lua_state; - lua_rawgeti (L, LUA_REGISTRYINDEX, cur_cond->cb); - ptask = lua_newuserdata (L, sizeof (struct rspamd_task *)); - rspamd_lua_setclass (L, "rspamd{task}", -1); - *ptask = task; - - if (lua_pcall (L, 1, 1, 0) != 0) { - msg_info_task ("call to condition for %s failed: %s", - item->symbol, lua_tostring (L, -1)); - lua_pop (L, 1); - } - else { - check = lua_toboolean (L, -1); - lua_pop (L, 1); - } - - if (!check) { - break; - } - } - - if (!check) { - msg_debug_cache_task ("skipping check of %s as its start condition is false; " - "symbol type = %s", - item->symbol, item->type_descr); - } - } - - if (check) { - msg_debug_cache_task ("execute %s, %d; symbol type = %s", item->symbol, - item->id, item->type_descr); - - if (checkpoint->profile) { - ev_now_update_if_cheap (task->event_loop); - dyn_item->start_msec = (ev_now (task->event_loop) - - checkpoint->profile_start) * 1e3; - } - - dyn_item->async_events = 0; - checkpoint->cur_item = item; - checkpoint->items_inflight ++; - /* Callback now must finalize itself */ - item->specific.normal.func (task, item, item->specific.normal.user_data); - checkpoint->cur_item = NULL; - - if (checkpoint->items_inflight == 0) { - - return TRUE; - } - - if (dyn_item->async_events == 0 && !CHECK_FINISH_BIT (checkpoint, dyn_item)) { - msg_err_cache 
("critical error: item %s has no async events pending, " - "but it is not finalised", item->symbol); - g_assert_not_reached (); - } - - return FALSE; - } - else { - SET_FINISH_BIT (checkpoint, dyn_item); - } - - return TRUE; -} - -static gboolean -rspamd_symcache_check_deps (struct rspamd_task *task, - struct rspamd_symcache *cache, - struct rspamd_symcache_item *item, - struct cache_savepoint *checkpoint, - guint recursion, - gboolean check_only) -{ - struct cache_dependency *dep; - guint i; - gboolean ret = TRUE; - static const guint max_recursion = 20; - struct rspamd_symcache_dynamic_item *dyn_item; - - if (recursion > max_recursion) { - msg_err_task ("cyclic dependencies: maximum check level %ud exceed when " - "checking dependencies for %s", max_recursion, item->symbol); - - return TRUE; - } - - if (item->deps != NULL && item->deps->len > 0) { - for (i = 0; i < item->deps->len; i ++) { - dep = g_ptr_array_index (item->deps, i); - - if (dep->item == NULL) { - /* Assume invalid deps as done */ - msg_debug_cache_task ("symbol %d(%s) has invalid dependencies on %d(%s)", - item->id, item->symbol, dep->id, dep->sym); - continue; - } - - dyn_item = rspamd_symcache_get_dynamic (checkpoint, dep->item); - - if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) { - if (!CHECK_START_BIT (checkpoint, dyn_item)) { - /* Not started */ - if (!check_only) { - if (!rspamd_symcache_check_deps (task, cache, - dep->item, - checkpoint, - recursion + 1, - check_only)) { - - ret = FALSE; - msg_debug_cache_task ("delayed dependency %d(%s) for " - "symbol %d(%s)", - dep->id, dep->sym, item->id, item->symbol); - } - else if (!rspamd_symcache_check_symbol (task, cache, - dep->item, - checkpoint)) { - /* Now started, but has events pending */ - ret = FALSE; - msg_debug_cache_task ("started check of %d(%s) symbol " - "as dep for " - "%d(%s)", - dep->id, dep->sym, item->id, item->symbol); - } - else { - msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is " - "already processed", - 
dep->id, dep->sym, item->id, item->symbol); - } - } - else { - msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) " - "cannot be started now", - dep->id, dep->sym, - item->id, item->symbol); - ret = FALSE; - } - } - else { - /* Started but not finished */ - msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is " - "still executing", - dep->id, dep->sym, - item->id, item->symbol); - ret = FALSE; - } - } - else { - msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is already " - "checked", - dep->id, dep->sym, - item->id, item->symbol); - } - } - } - - return ret; -} - -static struct cache_savepoint * -rspamd_symcache_make_checkpoint (struct rspamd_task *task, - struct rspamd_symcache *cache) -{ - struct cache_savepoint *checkpoint; - - if (cache->items_by_order->id != cache->id) { - /* - * Cache has been modified, need to resort it - */ - msg_info_cache ("symbols cache has been modified since last check:" - " old id: %ud, new id: %ud", - cache->items_by_order->id, cache->id); - rspamd_symcache_resort (cache); - } - - checkpoint = rspamd_mempool_alloc0 (task->task_pool, - sizeof (*checkpoint) + - sizeof (struct rspamd_symcache_dynamic_item) * cache->items_by_id->len); - - g_assert (cache->items_by_order != NULL); - checkpoint->version = cache->items_by_order->d->len; - checkpoint->order = cache->items_by_order; - REF_RETAIN (checkpoint->order); - rspamd_mempool_add_destructor (task->task_pool, - rspamd_symcache_order_unref, checkpoint->order); - - /* Calculate profile probability */ - ev_now_update_if_cheap (task->event_loop); - ev_tstamp now = ev_now (task->event_loop); - checkpoint->profile_start = now; - - if ((cache->last_profile == 0.0 || now > cache->last_profile + PROFILE_MAX_TIME) || - (task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) || - (rspamd_random_double_fast () >= (1 - PROFILE_PROBABILITY))) { - msg_debug_cache_task ("enable profiling of symbols for task"); - checkpoint->profile = TRUE; - cache->last_profile = now; - } - - 
task->checkpoint = checkpoint; - - return checkpoint; -} - -gboolean -rspamd_symcache_process_settings (struct rspamd_task *task, - struct rspamd_symcache *cache) -{ - const ucl_object_t *wl, *cur, *disabled, *enabled; - struct rspamd_symbols_group *gr; - GHashTableIter gr_it; - ucl_object_iter_t it = NULL; - gboolean already_disabled = FALSE; - gpointer k, v; - - wl = ucl_object_lookup (task->settings, "whitelist"); - - if (wl != NULL) { - msg_info_task ("task is whitelisted"); - task->flags |= RSPAMD_TASK_FLAG_SKIP; - return TRUE; - } - - enabled = ucl_object_lookup (task->settings, "symbols_enabled"); - - if (enabled) { - /* Disable all symbols but selected */ - rspamd_symcache_disable_all_symbols (task, cache, - SYMBOL_TYPE_EXPLICIT_DISABLE); - already_disabled = TRUE; - it = NULL; - - while ((cur = ucl_iterate_object (enabled, &it, true)) != NULL) { - rspamd_symcache_enable_symbol_checkpoint (task, cache, - ucl_object_tostring (cur)); - } - } - - /* Enable groups of symbols */ - enabled = ucl_object_lookup (task->settings, "groups_enabled"); - - if (enabled) { - it = NULL; - - if (!already_disabled) { - rspamd_symcache_disable_all_symbols (task, cache, - SYMBOL_TYPE_EXPLICIT_DISABLE); - } - - while ((cur = ucl_iterate_object (enabled, &it, true)) != NULL) { - if (ucl_object_type (cur) == UCL_STRING) { - gr = g_hash_table_lookup (task->cfg->groups, - ucl_object_tostring (cur)); - - if (gr) { - g_hash_table_iter_init (&gr_it, gr->symbols); - - while (g_hash_table_iter_next (&gr_it, &k, &v)) { - rspamd_symcache_enable_symbol_checkpoint (task, cache, k); - } - } - } - } - } - - disabled = ucl_object_lookup (task->settings, "symbols_disabled"); - - if (disabled) { - it = NULL; - - while ((cur = ucl_iterate_object (disabled, &it, true)) != NULL) { - rspamd_symcache_disable_symbol_checkpoint (task, cache, - ucl_object_tostring (cur)); - } - } - - /* Disable groups of symbols */ - disabled = ucl_object_lookup (task->settings, "groups_disabled"); - - if (disabled) { - 
it = NULL; - - while ((cur = ucl_iterate_object (disabled, &it, true)) != NULL) { - if (ucl_object_type (cur) == UCL_STRING) { - gr = g_hash_table_lookup (task->cfg->groups, - ucl_object_tostring (cur)); - - if (gr) { - g_hash_table_iter_init (&gr_it, gr->symbols); - - while (g_hash_table_iter_next (&gr_it, &k, &v)) { - rspamd_symcache_disable_symbol_checkpoint (task, cache, k); - } - } - } - } - } - - return FALSE; -} - -gboolean -rspamd_symcache_process_symbols (struct rspamd_task *task, - struct rspamd_symcache *cache, - gint stage) -{ - struct rspamd_symcache_item *item = NULL; - struct rspamd_symcache_dynamic_item *dyn_item; - struct cache_savepoint *checkpoint; - gint i; - gboolean all_done = TRUE; - gint saved_priority; - guint start_events_pending; - - g_assert (cache != NULL); - - if (task->checkpoint == NULL) { - checkpoint = rspamd_symcache_make_checkpoint (task, cache); - task->checkpoint = checkpoint; - } - else { - checkpoint = task->checkpoint; - } - - msg_debug_cache_task ("symbols processing stage at pass: %d", stage); - start_events_pending = rspamd_session_events_pending (task->s); - - switch (stage) { - case RSPAMD_TASK_STAGE_CONNFILTERS: - /* Check for connection filters */ - saved_priority = G_MININT; - all_done = TRUE; - - for (i = 0; i < (gint) cache->connfilters->len; i++) { - item = g_ptr_array_index (cache->connfilters, i); - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - - if (RSPAMD_TASK_IS_SKIPPED (task)) { - return TRUE; - } - - if (!CHECK_START_BIT (checkpoint, dyn_item) && - !CHECK_FINISH_BIT (checkpoint, dyn_item)) { - - if (checkpoint->has_slow) { - /* Delay */ - checkpoint->has_slow = FALSE; - - return FALSE; - } - /* Check priorities */ - if (saved_priority == G_MININT) { - saved_priority = item->priority; - } - else { - if (item->priority < saved_priority && - rspamd_session_events_pending (task->s) > start_events_pending) { - /* - * Delay further checks as we have higher - * priority filters to be processed - */ 
- return FALSE; - } - } - - rspamd_symcache_check_symbol (task, cache, item, - checkpoint); - all_done = FALSE; - } - } - break; - - case RSPAMD_TASK_STAGE_PRE_FILTERS: - /* Check for prefilters */ - saved_priority = G_MININT; - all_done = TRUE; - - for (i = 0; i < (gint) cache->prefilters->len; i++) { - item = g_ptr_array_index (cache->prefilters, i); - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - - if (RSPAMD_TASK_IS_SKIPPED (task)) { - return TRUE; - } - - if (!CHECK_START_BIT (checkpoint, dyn_item) && - !CHECK_FINISH_BIT (checkpoint, dyn_item)) { - /* Check priorities */ - if (checkpoint->has_slow) { - /* Delay */ - checkpoint->has_slow = FALSE; - - return FALSE; - } - - if (saved_priority == G_MININT) { - saved_priority = item->priority; - } - else { - if (item->priority < saved_priority && - rspamd_session_events_pending (task->s) > start_events_pending) { - /* - * Delay further checks as we have higher - * priority filters to be processed - */ - return FALSE; - } - } - - rspamd_symcache_check_symbol (task, cache, item, - checkpoint); - all_done = FALSE; - } - } - - break; - - case RSPAMD_TASK_STAGE_FILTERS: - all_done = TRUE; - - for (i = 0; i < (gint) checkpoint->version; i++) { - if (RSPAMD_TASK_IS_SKIPPED (task)) { - return TRUE; - } - - item = g_ptr_array_index (checkpoint->order->d, i); - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - - if (item->type & SYMBOL_TYPE_CLASSIFIER) { - continue; - } - - if (!CHECK_START_BIT (checkpoint, dyn_item)) { - all_done = FALSE; - - if (!rspamd_symcache_check_deps (task, cache, item, - checkpoint, 0, FALSE)) { - - msg_debug_cache_task ("blocked execution of %d(%s) unless deps are " - "resolved", - item->id, item->symbol); - - continue; - } - - rspamd_symcache_check_symbol (task, cache, item, - checkpoint); - - if (checkpoint->has_slow) { - /* Delay */ - checkpoint->has_slow = FALSE; - - return FALSE; - } - } - - if (!(item->type & SYMBOL_TYPE_FINE)) { - if (rspamd_symcache_metric_limit 
(task, checkpoint)) { - msg_info_task ("task has already scored more than %.2f, so do " - "not " - "plan more checks", - checkpoint->rs->score); - all_done = TRUE; - break; - } - } - } - - break; - - case RSPAMD_TASK_STAGE_POST_FILTERS: - /* Check for postfilters */ - saved_priority = G_MININT; - all_done = TRUE; - - for (i = 0; i < (gint) cache->postfilters->len; i++) { - if (RSPAMD_TASK_IS_SKIPPED (task)) { - return TRUE; - } - - item = g_ptr_array_index (cache->postfilters, i); - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - - if (!CHECK_START_BIT (checkpoint, dyn_item) && - !CHECK_FINISH_BIT (checkpoint, dyn_item)) { - /* Check priorities */ - all_done = FALSE; - - if (checkpoint->has_slow) { - /* Delay */ - checkpoint->has_slow = FALSE; - - return FALSE; - } - - if (saved_priority == G_MININT) { - saved_priority = item->priority; - } - else { - if (item->priority > saved_priority && - rspamd_session_events_pending (task->s) > start_events_pending) { - /* - * Delay further checks as we have higher - * priority filters to be processed - */ - - return FALSE; - } - } - - rspamd_symcache_check_symbol (task, cache, item, - checkpoint); - } - } - - break; - - case RSPAMD_TASK_STAGE_IDEMPOTENT: - /* Check for postfilters */ - saved_priority = G_MININT; - - for (i = 0; i < (gint) cache->idempotent->len; i++) { - item = g_ptr_array_index (cache->idempotent, i); - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - - if (!CHECK_START_BIT (checkpoint, dyn_item) && - !CHECK_FINISH_BIT (checkpoint, dyn_item)) { - /* Check priorities */ - if (checkpoint->has_slow) { - /* Delay */ - checkpoint->has_slow = FALSE; - - return FALSE; - } - - if (saved_priority == G_MININT) { - saved_priority = item->priority; - } - else { - if (item->priority > saved_priority && - rspamd_session_events_pending (task->s) > start_events_pending) { - /* - * Delay further checks as we have higher - * priority filters to be processed - */ - return FALSE; - } - } - 
rspamd_symcache_check_symbol (task, cache, item, - checkpoint); - } - } - break; - default: - g_assert_not_reached (); - } - - return all_done; -} - -struct counters_cbdata { - ucl_object_t *top; - struct rspamd_symcache *cache; -}; - -/* Leave several digits */ -#define P10(X) (1e##X) -#define ROUND_DOUBLE_DIGITS(x, dig) (floor((x) * P10(dig)) / P10(dig)) -#define ROUND_DOUBLE(x) ROUND_DOUBLE_DIGITS(x, 3) - -static void -rspamd_symcache_counters_cb (gpointer k, gpointer v, gpointer ud) -{ - struct counters_cbdata *cbd = ud; - ucl_object_t *obj, *top; - struct rspamd_symcache_item *item = v, *parent; - const gchar *symbol = k; - - top = cbd->top; - - obj = ucl_object_typed_new (UCL_OBJECT); - ucl_object_insert_key (obj, ucl_object_fromstring (symbol ? symbol : "unknown"), - "symbol", 0, false); - - if (item->is_virtual) { - if (!(item->type & SYMBOL_TYPE_GHOST)) { - parent = g_ptr_array_index (cbd->cache->items_by_id, - item->specific.virtual.parent); - ucl_object_insert_key (obj, - ucl_object_fromdouble (ROUND_DOUBLE (item->st->weight)), - "weight", 0, false); - ucl_object_insert_key (obj, - ucl_object_fromdouble (ROUND_DOUBLE (parent->st->avg_frequency)), - "frequency", 0, false); - ucl_object_insert_key (obj, - ucl_object_fromint (parent->st->total_hits), - "hits", 0, false); - ucl_object_insert_key (obj, - ucl_object_fromdouble (ROUND_DOUBLE (parent->st->avg_time)), - "time", 0, false); - } - else { - ucl_object_insert_key (obj, - ucl_object_fromdouble (ROUND_DOUBLE (item->st->weight)), - "weight", 0, false); - ucl_object_insert_key (obj, - ucl_object_fromdouble (0.0), - "frequency", 0, false); - ucl_object_insert_key (obj, - ucl_object_fromdouble (0.0), - "hits", 0, false); - ucl_object_insert_key (obj, - ucl_object_fromdouble (0.0), - "time", 0, false); - } - } - else { - ucl_object_insert_key (obj, - ucl_object_fromdouble (ROUND_DOUBLE (item->st->weight)), - "weight", 0, false); - ucl_object_insert_key (obj, - ucl_object_fromdouble (ROUND_DOUBLE 
(item->st->avg_frequency)), - "frequency", 0, false); - ucl_object_insert_key (obj, - ucl_object_fromint (item->st->total_hits), - "hits", 0, false); - ucl_object_insert_key (obj, - ucl_object_fromdouble (ROUND_DOUBLE (item->st->avg_time)), - "time", 0, false); - } - - ucl_array_append (top, obj); -} - -#undef ROUND_DOUBLE - -ucl_object_t * -rspamd_symcache_counters (struct rspamd_symcache *cache) -{ - ucl_object_t *top; - struct counters_cbdata cbd; - - g_assert (cache != NULL); - top = ucl_object_typed_new (UCL_ARRAY); - cbd.top = top; - cbd.cache = cache; - g_hash_table_foreach (cache->items_by_symbol, - rspamd_symcache_counters_cb, &cbd); - - return top; -} - -static void -rspamd_symcache_call_peak_cb (struct ev_loop *ev_base, - struct rspamd_symcache *cache, - struct rspamd_symcache_item *item, - gdouble cur_value, - gdouble cur_err) -{ - lua_State *L = cache->cfg->lua_state; - struct ev_loop **pbase; - - lua_rawgeti (L, LUA_REGISTRYINDEX, cache->peak_cb); - pbase = lua_newuserdata (L, sizeof (*pbase)); - *pbase = ev_base; - rspamd_lua_setclass (L, "rspamd{ev_base}", -1); - lua_pushstring (L, item->symbol); - lua_pushnumber (L, item->st->avg_frequency); - lua_pushnumber (L, sqrt (item->st->stddev_frequency)); - lua_pushnumber (L, cur_value); - lua_pushnumber (L, cur_err); - - if (lua_pcall (L, 6, 0, 0) != 0) { - msg_info_cache ("call to peak function for %s failed: %s", - item->symbol, lua_tostring (L, -1)); - lua_pop (L, 1); - } -} - -static void -rspamd_symcache_resort_cb (EV_P_ ev_timer *w, int revents) -{ - gdouble tm; - struct rspamd_cache_refresh_cbdata *cbdata = - (struct rspamd_cache_refresh_cbdata *)w->data; - struct rspamd_symcache *cache; - struct rspamd_symcache_item *item; - guint i; - gdouble cur_ticks; - static const double decay_rate = 0.25; - - cache = cbdata->cache; - /* Plan new event */ - tm = rspamd_time_jitter (cache->reload_time, 0); - cur_ticks = rspamd_get_ticks (FALSE); - msg_debug_cache ("resort symbols cache, next reload in %.2f 
seconds", tm); - g_assert (cache != NULL); - cbdata->resort_ev.repeat = tm; - ev_timer_again (EV_A_ w); - - if (rspamd_worker_is_primary_controller (cbdata->w)) { - /* Gather stats from shared execution times */ - for (i = 0; i < cache->filters->len; i ++) { - item = g_ptr_array_index (cache->filters, i); - item->st->total_hits += item->st->hits; - g_atomic_int_set (&item->st->hits, 0); - - if (item->last_count > 0 && cbdata->w->index == 0) { - /* Calculate frequency */ - gdouble cur_err, cur_value; - - cur_value = (item->st->total_hits - item->last_count) / - (cur_ticks - cbdata->last_resort); - rspamd_set_counter_ema (&item->st->frequency_counter, - cur_value, decay_rate); - item->st->avg_frequency = item->st->frequency_counter.mean; - item->st->stddev_frequency = item->st->frequency_counter.stddev; - - if (cur_value > 0) { - msg_debug_cache ("frequency for %s is %.2f, avg: %.2f", - item->symbol, cur_value, item->st->avg_frequency); - } - - cur_err = (item->st->avg_frequency - cur_value); - cur_err *= cur_err; - - /* - * TODO: replace magic number - */ - if (item->st->frequency_counter.number > 10 && - cur_err > sqrt (item->st->stddev_frequency) * 3) { - item->frequency_peaks ++; - msg_debug_cache ("peak found for %s is %.2f, avg: %.2f, " - "stddev: %.2f, error: %.2f, peaks: %d", - item->symbol, cur_value, - item->st->avg_frequency, - item->st->stddev_frequency, - cur_err, - item->frequency_peaks); - - if (cache->peak_cb != -1) { - rspamd_symcache_call_peak_cb (cbdata->event_loop, - cache, item, - cur_value, cur_err); - } - } - } - - item->last_count = item->st->total_hits; - - if (item->cd->number > 0) { - if (item->type & (SYMBOL_TYPE_CALLBACK|SYMBOL_TYPE_NORMAL)) { - item->st->avg_time = item->cd->mean; - rspamd_set_counter_ema (&item->st->time_counter, - item->st->avg_time, decay_rate); - item->st->avg_time = item->st->time_counter.mean; - memset (item->cd, 0, sizeof (*item->cd)); - } - } - } - - cbdata->last_resort = cur_ticks; - /* We don't do actual 
sorting due to topological guarantees */ - } -} - -static void -rspamd_symcache_refresh_dtor (void *d) -{ - struct rspamd_cache_refresh_cbdata *cbdata = - (struct rspamd_cache_refresh_cbdata *)d; - - ev_timer_stop (cbdata->event_loop, &cbdata->resort_ev); -} - -void -rspamd_symcache_start_refresh (struct rspamd_symcache *cache, - struct ev_loop *ev_base, struct rspamd_worker *w) -{ - gdouble tm; - struct rspamd_cache_refresh_cbdata *cbdata; - - cbdata = rspamd_mempool_alloc0 (cache->static_pool, sizeof (*cbdata)); - cbdata->last_resort = rspamd_get_ticks (TRUE); - cbdata->event_loop = ev_base; - cbdata->w = w; - cbdata->cache = cache; - tm = rspamd_time_jitter (cache->reload_time, 0); - msg_debug_cache ("next reload in %.2f seconds", tm); - g_assert (cache != NULL); - cbdata->resort_ev.data = cbdata; - ev_timer_init (&cbdata->resort_ev, rspamd_symcache_resort_cb, - tm, tm); - ev_timer_start (cbdata->event_loop, &cbdata->resort_ev); - rspamd_mempool_add_destructor (cache->static_pool, - rspamd_symcache_refresh_dtor, cbdata); -} - -void -rspamd_symcache_inc_frequency (struct rspamd_symcache *cache, - struct rspamd_symcache_item *item) -{ - if (item != NULL) { - g_atomic_int_inc (&item->st->hits); - } -} - -void -rspamd_symcache_add_dependency (struct rspamd_symcache *cache, - gint id_from, const gchar *to, - gint virtual_id_from) -{ - struct rspamd_symcache_item *source, *vsource; - struct cache_dependency *dep; - - g_assert (id_from >= 0 && id_from < (gint)cache->items_by_id->len); - - source = (struct rspamd_symcache_item *)g_ptr_array_index (cache->items_by_id, id_from); - dep = rspamd_mempool_alloc (cache->static_pool, sizeof (*dep)); - dep->id = id_from; - dep->sym = rspamd_mempool_strdup (cache->static_pool, to); - /* Will be filled later */ - dep->item = NULL; - dep->vid = -1; - g_ptr_array_add (source->deps, dep); - - if (virtual_id_from >= 0) { - g_assert (virtual_id_from < (gint)cache->virtual->len); - /* We need that for settings id propagation */ - 
vsource = (struct rspamd_symcache_item *) - g_ptr_array_index (cache->virtual, virtual_id_from); - dep = rspamd_mempool_alloc (cache->static_pool, sizeof (*dep)); - dep->vid = virtual_id_from; - dep->id = -1; - dep->sym = rspamd_mempool_strdup (cache->static_pool, to); - /* Will be filled later */ - dep->item = NULL; - g_ptr_array_add (vsource->deps, dep); - } -} - -void -rspamd_symcache_add_delayed_dependency (struct rspamd_symcache *cache, - const gchar *from, const gchar *to) -{ - struct delayed_cache_dependency *ddep; - - g_assert (from != NULL); - g_assert (to != NULL); - - ddep = g_malloc0 (sizeof (*ddep)); - ddep->from = g_strdup (from); - ddep->to = g_strdup (to); - - cache->delayed_deps = g_list_prepend (cache->delayed_deps, ddep); -} - -gint -rspamd_symcache_find_symbol (struct rspamd_symcache *cache, const gchar *name) -{ - struct rspamd_symcache_item *item; - - g_assert (cache != NULL); - - if (name == NULL) { - return -1; - } - - item = g_hash_table_lookup (cache->items_by_symbol, name); - - if (item != NULL) { - return item->id; - } - - return -1; -} - -gboolean -rspamd_symcache_stat_symbol (struct rspamd_symcache *cache, - const gchar *name, - gdouble *frequency, - gdouble *freq_stddev, - gdouble *tm, - guint *nhits) -{ - struct rspamd_symcache_item *item; - - g_assert (cache != NULL); - - if (name == NULL) { - return FALSE; - } - - item = g_hash_table_lookup (cache->items_by_symbol, name); - - if (item != NULL) { - *frequency = item->st->avg_frequency; - *freq_stddev = sqrt (item->st->stddev_frequency); - *tm = item->st->time_counter.mean; - - if (nhits) { - *nhits = item->st->hits; - } - - return TRUE; - } - - return FALSE; -} - -const gchar * -rspamd_symcache_symbol_by_id (struct rspamd_symcache *cache, - gint id) -{ - struct rspamd_symcache_item *item; - - g_assert (cache != NULL); - - if (id < 0 || id >= (gint)cache->items_by_id->len) { - return NULL; - } - - item = g_ptr_array_index (cache->items_by_id, id); - - return item->symbol; -} - -guint 
-rspamd_symcache_stats_symbols_count (struct rspamd_symcache *cache) -{ - g_assert (cache != NULL); - - return cache->stats_symbols_count; -} - - -void -rspamd_symcache_disable_all_symbols (struct rspamd_task *task, - struct rspamd_symcache *cache, - guint skip_mask) -{ - struct cache_savepoint *checkpoint; - guint i; - struct rspamd_symcache_item *item; - struct rspamd_symcache_dynamic_item *dyn_item; - - if (task->checkpoint == NULL) { - checkpoint = rspamd_symcache_make_checkpoint (task, cache); - task->checkpoint = checkpoint; - } - else { - checkpoint = task->checkpoint; - } - - /* Enable for squeezed symbols */ - PTR_ARRAY_FOREACH (cache->items_by_id, i, item) { - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - - if (!(item->type & (skip_mask))) { - SET_FINISH_BIT (checkpoint, dyn_item); - SET_START_BIT (checkpoint, dyn_item); - } - } -} - -static void -rspamd_symcache_disable_symbol_checkpoint (struct rspamd_task *task, - struct rspamd_symcache *cache, const gchar *symbol) -{ - struct cache_savepoint *checkpoint; - struct rspamd_symcache_item *item; - struct rspamd_symcache_dynamic_item *dyn_item; - - if (task->checkpoint == NULL) { - checkpoint = rspamd_symcache_make_checkpoint (task, cache); - task->checkpoint = checkpoint; - } - else { - checkpoint = task->checkpoint; - } - - item = rspamd_symcache_find_filter (cache, symbol, true); - - if (item) { - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - SET_FINISH_BIT (checkpoint, dyn_item); - SET_START_BIT (checkpoint, dyn_item); - msg_debug_cache_task ("disable execution of %s", symbol); - } - else { - msg_info_task ("cannot disable %s: not found", symbol); - } -} - -static void -rspamd_symcache_enable_symbol_checkpoint (struct rspamd_task *task, - struct rspamd_symcache *cache, const gchar *symbol) -{ - struct cache_savepoint *checkpoint; - struct rspamd_symcache_item *item; - struct rspamd_symcache_dynamic_item *dyn_item; - - if (task->checkpoint == NULL) { - checkpoint = 
rspamd_symcache_make_checkpoint (task, cache); - task->checkpoint = checkpoint; - } - else { - checkpoint = task->checkpoint; - } - - item = rspamd_symcache_find_filter (cache, symbol, true); - - if (item) { - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - dyn_item->finished = 0; - dyn_item->started = 0; - msg_debug_cache_task ("enable execution of %s", symbol); - } - else { - msg_info_task ("cannot enable %s: not found", symbol); - } -} - -struct rspamd_abstract_callback_data* -rspamd_symcache_get_cbdata (struct rspamd_symcache *cache, - const gchar *symbol) -{ - struct rspamd_symcache_item *item; - - g_assert (cache != NULL); - g_assert (symbol != NULL); - - item = rspamd_symcache_find_filter (cache, symbol, true); - - if (item) { - return item->specific.normal.user_data; - } - - return NULL; -} - -gboolean -rspamd_symcache_is_checked (struct rspamd_task *task, - struct rspamd_symcache *cache, const gchar *symbol) -{ - struct cache_savepoint *checkpoint; - struct rspamd_symcache_item *item; - struct rspamd_symcache_dynamic_item *dyn_item; - - g_assert (cache != NULL); - g_assert (symbol != NULL); - - if (task->checkpoint == NULL) { - checkpoint = rspamd_symcache_make_checkpoint (task, cache); - task->checkpoint = checkpoint; - } - else { - checkpoint = task->checkpoint; - } - - item = rspamd_symcache_find_filter (cache, symbol, true); - - if (item) { - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - return dyn_item->started; - } - - return FALSE; -} - -void -rspamd_symcache_disable_symbol_perm (struct rspamd_symcache *cache, - const gchar *symbol, - gboolean resolve_parent) -{ - struct rspamd_symcache_item *item; - - g_assert (cache != NULL); - g_assert (symbol != NULL); - - item = rspamd_symcache_find_filter (cache, symbol, resolve_parent); - - if (item) { - item->enabled = FALSE; - } -} - -void -rspamd_symcache_enable_symbol_perm (struct rspamd_symcache *cache, - const gchar *symbol) -{ - struct rspamd_symcache_item *item; - - 
g_assert (cache != NULL); - g_assert (symbol != NULL); - - item = rspamd_symcache_find_filter (cache, symbol, true); - - if (item) { - item->enabled = TRUE; - } -} - -guint64 -rspamd_symcache_get_cksum (struct rspamd_symcache *cache) -{ - g_assert (cache != NULL); - - return cache->cksum; -} - - -gboolean -rspamd_symcache_is_symbol_enabled (struct rspamd_task *task, - struct rspamd_symcache *cache, - const gchar *symbol) -{ - struct cache_savepoint *checkpoint; - struct rspamd_symcache_item *item; - struct rspamd_symcache_dynamic_item *dyn_item; - lua_State *L; - struct rspamd_task **ptask; - gboolean ret = TRUE; - - g_assert (cache != NULL); - g_assert (symbol != NULL); - - checkpoint = task->checkpoint; - - - if (checkpoint) { - item = rspamd_symcache_find_filter (cache, symbol, true); - - if (item) { - - if (!rspamd_symcache_is_item_allowed (task, item, TRUE)) { - ret = FALSE; - } - else { - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - if (CHECK_START_BIT (checkpoint, dyn_item)) { - ret = FALSE; - } - else { - if (item->specific.normal.conditions) { - struct rspamd_symcache_condition *cur_cond; - - DL_FOREACH (item->specific.normal.conditions, cur_cond) { - /* - * We also executes condition callback to check - * if we need this symbol - */ - L = task->cfg->lua_state; - lua_rawgeti (L, LUA_REGISTRYINDEX, cur_cond->cb); - ptask = lua_newuserdata (L, sizeof (struct rspamd_task *)); - rspamd_lua_setclass (L, "rspamd{task}", -1); - *ptask = task; - - if (lua_pcall (L, 1, 1, 0) != 0) { - msg_info_task ("call to condition for %s failed: %s", - item->symbol, lua_tostring (L, -1)); - lua_pop (L, 1); - } - else { - ret = lua_toboolean (L, -1); - lua_pop (L, 1); - } - - if (!ret) { - break; - } - } - } - } - } - } - } - - return ret; -} - - -gboolean -rspamd_symcache_enable_symbol (struct rspamd_task *task, - struct rspamd_symcache *cache, - const gchar *symbol) -{ - struct cache_savepoint *checkpoint; - struct rspamd_symcache_item *item; - struct 
rspamd_symcache_dynamic_item *dyn_item; - gboolean ret = FALSE; - - g_assert (cache != NULL); - g_assert (symbol != NULL); - - checkpoint = task->checkpoint; - - if (checkpoint) { - item = rspamd_symcache_find_filter (cache, symbol, true); - - if (item) { - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - - if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) { - ret = TRUE; - CLR_START_BIT (checkpoint, dyn_item); - CLR_FINISH_BIT (checkpoint, dyn_item); - } - else { - msg_debug_task ("cannot enable symbol %s: already started", symbol); - } - } - } - - return ret; -} - - -gboolean -rspamd_symcache_disable_symbol (struct rspamd_task *task, - struct rspamd_symcache *cache, - const gchar *symbol) -{ - struct cache_savepoint *checkpoint; - struct rspamd_symcache_item *item; - struct rspamd_symcache_dynamic_item *dyn_item; - gboolean ret = FALSE; - - g_assert (cache != NULL); - g_assert (symbol != NULL); - - checkpoint = task->checkpoint; - - if (checkpoint) { - item = rspamd_symcache_find_filter (cache, symbol, true); - - if (item) { - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - - if (!CHECK_START_BIT (checkpoint, dyn_item)) { - ret = TRUE; - SET_START_BIT (checkpoint, dyn_item); - SET_FINISH_BIT (checkpoint, dyn_item); - } - else { - if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) { - msg_warn_task ("cannot disable symbol %s: already started", - symbol); - } - } - } - } - - return ret; -} - -void -rspamd_symcache_foreach (struct rspamd_symcache *cache, - void (*func) (struct rspamd_symcache_item *, gpointer), - gpointer ud) -{ - struct rspamd_symcache_item *item; - GHashTableIter it; - gpointer k, v; - - g_hash_table_iter_init (&it, cache->items_by_symbol); - - while (g_hash_table_iter_next (&it, &k, &v)) { - item = (struct rspamd_symcache_item *)v; - func (item, ud); - } -} - -struct rspamd_symcache_item * -rspamd_symcache_get_cur_item (struct rspamd_task *task) -{ - struct cache_savepoint *checkpoint = task->checkpoint; - - if (checkpoint == 
NULL) { - return NULL; - } - - return checkpoint->cur_item; -} - -/** - * Replaces the current item being processed. - * Returns the current item being processed (if any) - * @param task - * @param item - * @return - */ -struct rspamd_symcache_item * -rspamd_symcache_set_cur_item (struct rspamd_task *task, - struct rspamd_symcache_item *item) -{ - struct cache_savepoint *checkpoint = task->checkpoint; - struct rspamd_symcache_item *ex; - - ex = checkpoint->cur_item; - checkpoint->cur_item = item; - - return ex; -} - -struct rspamd_symcache_delayed_cbdata { - struct rspamd_symcache_item *item; - struct rspamd_task *task; - struct rspamd_async_event *event; - struct ev_timer tm; -}; - -static void -rspamd_symcache_delayed_item_fin (gpointer ud) -{ - struct rspamd_symcache_delayed_cbdata *cbd = - (struct rspamd_symcache_delayed_cbdata *)ud; - struct rspamd_task *task; - struct cache_savepoint *checkpoint; - - task = cbd->task; - checkpoint = task->checkpoint; - checkpoint->has_slow = FALSE; - ev_timer_stop (task->event_loop, &cbd->tm); -} - -static void -rspamd_symcache_delayed_item_cb (EV_P_ ev_timer *w, int what) -{ - struct rspamd_symcache_delayed_cbdata *cbd = - (struct rspamd_symcache_delayed_cbdata *)w->data; - struct rspamd_symcache_item *item; - struct rspamd_task *task; - struct cache_dependency *rdep; - struct cache_savepoint *checkpoint; - struct rspamd_symcache_dynamic_item *dyn_item; - guint i; - - item = cbd->item; - task = cbd->task; - checkpoint = task->checkpoint; - cbd->event = NULL; - - /* Timer will be stopped here */ - rspamd_session_remove_event (task->s, - rspamd_symcache_delayed_item_fin, cbd); - - /* Process all reverse dependencies */ - PTR_ARRAY_FOREACH (item->rdeps, i, rdep) { - if (rdep->item) { - dyn_item = rspamd_symcache_get_dynamic (checkpoint, rdep->item); - if (!CHECK_START_BIT (checkpoint, dyn_item)) { - msg_debug_cache_task ("check item %d(%s) rdep of %s ", - rdep->item->id, rdep->item->symbol, item->symbol); - - if 
(!rspamd_symcache_check_deps (task, task->cfg->cache, - rdep->item, - checkpoint, 0, FALSE)) { - msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s " - "unless deps are resolved", - rdep->item->id, rdep->item->symbol, item->symbol); - } - else { - rspamd_symcache_check_symbol (task, task->cfg->cache, - rdep->item, - checkpoint); - } - } - } - } -} - -static void -rspamd_delayed_timer_dtor (gpointer d) -{ - struct rspamd_symcache_delayed_cbdata *cbd = - (struct rspamd_symcache_delayed_cbdata *)d; - - if (cbd->event) { - /* Event has not been executed */ - rspamd_session_remove_event (cbd->task->s, - rspamd_symcache_delayed_item_fin, cbd); - cbd->event = NULL; - } -} - -/** - * Finalize the current async element potentially calling its deps - */ -void -rspamd_symcache_finalize_item (struct rspamd_task *task, - struct rspamd_symcache_item *item) -{ - struct cache_savepoint *checkpoint = task->checkpoint; - struct cache_dependency *rdep; - struct rspamd_symcache_dynamic_item *dyn_item; - gdouble diff; - guint i; - gboolean enable_slow_timer = FALSE; - const gdouble slow_diff_limit = 300; - - /* Sanity checks */ - g_assert (checkpoint->items_inflight > 0); - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - - if (dyn_item->async_events > 0) { - /* - * XXX: Race condition - * - * It is possible that some async event is still in flight, but we - * already know its result, however, it is the responsibility of that - * event to decrease async events count and call this function - * one more time - */ - msg_debug_cache_task ("postpone finalisation of %s(%d) as there are %d " - "async events pending", - item->symbol, item->id, dyn_item->async_events); - - return; - } - - msg_debug_cache_task ("process finalize for item %s(%d)", item->symbol, item->id); - SET_FINISH_BIT (checkpoint, dyn_item); - checkpoint->items_inflight --; - checkpoint->cur_item = NULL; - - if (checkpoint->profile) { - ev_now_update_if_cheap (task->event_loop); - diff = ((ev_now 
(task->event_loop) - checkpoint->profile_start) * 1e3 - - dyn_item->start_msec); - - if (diff > slow_diff_limit) { - - if (!checkpoint->has_slow) { - checkpoint->has_slow = TRUE; - enable_slow_timer = TRUE; - msg_info_task ("slow rule: %s(%d): %.2f ms; enable slow timer delay", - item->symbol, item->id, - diff); - } - else { - msg_info_task ("slow rule: %s(%d): %.2f ms", - item->symbol, item->id, - diff); - } - } - - if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) { - rspamd_task_profile_set (task, item->symbol, diff); - } - - if (rspamd_worker_is_scanner (task->worker)) { - rspamd_set_counter (item->cd, diff); - } - } - - if (enable_slow_timer) { - struct rspamd_symcache_delayed_cbdata *cbd = - rspamd_mempool_alloc (task->task_pool,sizeof (*cbd)); - /* Add timer to allow something else to be executed */ - ev_timer *tm = &cbd->tm; - - cbd->event = rspamd_session_add_event (task->s, - rspamd_symcache_delayed_item_fin, cbd, - "symcache"); - - /* - * If no event could be added, then we are already in the destruction - * phase. 
So the main issue is to deal with has slow here - */ - if (cbd->event) { - ev_timer_init (tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0); - ev_set_priority (tm, EV_MINPRI); - rspamd_mempool_add_destructor (task->task_pool, - rspamd_delayed_timer_dtor, cbd); - - cbd->task = task; - cbd->item = item; - tm->data = cbd; - ev_timer_start (task->event_loop, tm); - } - else { - /* Just reset as no timer is added */ - checkpoint->has_slow = FALSE; - } - - return; - } - - /* Process all reverse dependencies */ - PTR_ARRAY_FOREACH (item->rdeps, i, rdep) { - if (rdep->item) { - dyn_item = rspamd_symcache_get_dynamic (checkpoint, rdep->item); - if (!CHECK_START_BIT (checkpoint, dyn_item)) { - msg_debug_cache_task ("check item %d(%s) rdep of %s ", - rdep->item->id, rdep->item->symbol, item->symbol); - - if (!rspamd_symcache_check_deps (task, task->cfg->cache, - rdep->item, - checkpoint, 0, FALSE)) { - msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s " - "unless deps are resolved", - rdep->item->id, rdep->item->symbol, item->symbol); - } - else { - rspamd_symcache_check_symbol (task, task->cfg->cache, - rdep->item, - checkpoint); - } - } - } - } -} - -guint -rspamd_symcache_item_async_inc_full (struct rspamd_task *task, - struct rspamd_symcache_item *item, - const gchar *subsystem, - const gchar *loc) -{ - struct rspamd_symcache_dynamic_item *dyn_item; - struct cache_savepoint *checkpoint = task->checkpoint; - - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - msg_debug_cache_task ("increase async events counter for %s(%d) = %d + 1; " - "subsystem %s (%s)", - item->symbol, item->id, dyn_item->async_events, subsystem, loc); - return ++dyn_item->async_events; -} - -guint -rspamd_symcache_item_async_dec_full (struct rspamd_task *task, - struct rspamd_symcache_item *item, - const gchar *subsystem, - const gchar *loc) -{ - struct rspamd_symcache_dynamic_item *dyn_item; - struct cache_savepoint *checkpoint = task->checkpoint; - - dyn_item = 
rspamd_symcache_get_dynamic (checkpoint, item); - msg_debug_cache_task ("decrease async events counter for %s(%d) = %d - 1; " - "subsystem %s (%s)", - item->symbol, item->id, dyn_item->async_events, subsystem, loc); - g_assert (dyn_item->async_events > 0); - - return --dyn_item->async_events; -} - -gboolean -rspamd_symcache_item_async_dec_check_full (struct rspamd_task *task, - struct rspamd_symcache_item *item, - const gchar *subsystem, - const gchar *loc) -{ - if (rspamd_symcache_item_async_dec_full (task, item, subsystem, loc) == 0) { - rspamd_symcache_finalize_item (task, item); - - return TRUE; - } - - return FALSE; -} - -gboolean -rspamd_symcache_add_symbol_flags (struct rspamd_symcache *cache, - const gchar *symbol, - guint flags) -{ - struct rspamd_symcache_item *item; - - g_assert (cache != NULL); - g_assert (symbol != NULL); - - item = rspamd_symcache_find_filter (cache, symbol, true); - - if (item) { - item->type |= flags; - - return TRUE; - } - - return FALSE; -} - -gboolean -rspamd_symcache_set_symbol_flags (struct rspamd_symcache *cache, - const gchar *symbol, - guint flags) -{ - struct rspamd_symcache_item *item; - - g_assert (cache != NULL); - g_assert (symbol != NULL); - - item = rspamd_symcache_find_filter (cache, symbol, true); - - if (item) { - item->type = flags; - - return TRUE; - } - - return FALSE; -} - -void -rspamd_symcache_get_symbol_details(struct rspamd_symcache *cache, - const gchar *symbol, - ucl_object_t *this_sym_ucl) -{ - struct rspamd_symcache_item *item; - - g_assert (cache != NULL); - g_assert (symbol != NULL); - - item = rspamd_symcache_find_filter (cache, symbol, false); - - if (item) { - ucl_object_insert_key (this_sym_ucl, - ucl_object_fromstring(item->type_descr), - "type", strlen("type"), false); - - // any other data? 
- } -} - -guint -rspamd_symcache_get_symbol_flags (struct rspamd_symcache *cache, - const gchar *symbol) -{ - struct rspamd_symcache_item *item; - - g_assert (cache != NULL); - g_assert (symbol != NULL); - - item = rspamd_symcache_find_filter (cache, symbol, true); - - if (item) { - return item->type; - } - - return 0; -} - -void -rspamd_symcache_composites_foreach (struct rspamd_task *task, - struct rspamd_symcache *cache, - GHFunc func, - gpointer fd) -{ - guint i; - struct rspamd_symcache_item *item; - struct rspamd_symcache_dynamic_item *dyn_item; - - if (task->checkpoint == NULL) { - return; - } - - PTR_ARRAY_FOREACH (cache->composites, i, item) { - dyn_item = rspamd_symcache_get_dynamic (task->checkpoint, item); - - if (!CHECK_START_BIT (task->checkpoint, dyn_item)) { - /* Cannot do it due to 2 passes */ - /* SET_START_BIT (task->checkpoint, dyn_item); */ - func (item->symbol, item->specific.normal.user_data, fd); - SET_FINISH_BIT (task->checkpoint, dyn_item); - } - } -} - -bool -rspamd_symcache_set_allowed_settings_ids (struct rspamd_symcache *cache, - const gchar *symbol, - const guint32 *ids, - guint nids) -{ - struct rspamd_symcache_item *item; - - item = rspamd_symcache_find_filter (cache, symbol, false); - - if (item == NULL) { - return false; - } - - if (nids <= G_N_ELEMENTS (item->allowed_ids.st)) { - /* Use static version */ - memset (&item->allowed_ids, 0, sizeof (item->allowed_ids)); - for (guint i = 0; i < nids; i++) { - item->allowed_ids.st[i] = ids[i]; - } - } - else { - /* Need to use a separate list */ - item->allowed_ids.dyn.e = -1; /* Flag */ - item->allowed_ids.dyn.n = rspamd_mempool_alloc (cache->static_pool, - sizeof (guint32) * nids); - item->allowed_ids.dyn.len = nids; - item->allowed_ids.dyn.allocated = nids; - - for (guint i = 0; i < nids; i++) { - item->allowed_ids.dyn.n[i] = ids[i]; - } - - /* Keep sorted */ - qsort (item->allowed_ids.dyn.n, nids, sizeof (guint32), rspamd_id_cmp); - } - - return true; -} - -bool 
-rspamd_symcache_set_forbidden_settings_ids (struct rspamd_symcache *cache, - const gchar *symbol, - const guint32 *ids, - guint nids) -{ - struct rspamd_symcache_item *item; - - item = rspamd_symcache_find_filter (cache, symbol, false); - - if (item == NULL) { - return false; - } - - g_assert (nids < G_MAXUINT16); - - if (nids <= G_N_ELEMENTS (item->forbidden_ids.st)) { - /* Use static version */ - memset (&item->forbidden_ids, 0, sizeof (item->forbidden_ids)); - for (guint i = 0; i < nids; i++) { - item->forbidden_ids.st[i] = ids[i]; - } - } - else { - /* Need to use a separate list */ - item->forbidden_ids.dyn.e = -1; /* Flag */ - item->forbidden_ids.dyn.n = rspamd_mempool_alloc (cache->static_pool, - sizeof (guint32) * nids); - item->forbidden_ids.dyn.len = nids; - item->forbidden_ids.dyn.allocated = nids; - - for (guint i = 0; i < nids; i++) { - item->forbidden_ids.dyn.n[i] = ids[i]; - } - - /* Keep sorted */ - qsort (item->forbidden_ids.dyn.n, nids, sizeof (guint32), rspamd_id_cmp); - } - - return true; -} - -const guint32* -rspamd_symcache_get_allowed_settings_ids (struct rspamd_symcache *cache, - const gchar *symbol, - guint *nids) -{ - struct rspamd_symcache_item *item; - guint cnt = 0; - - item = rspamd_symcache_find_filter (cache, symbol, false); - - if (item == NULL) { - return NULL; - } - - if (item->allowed_ids.dyn.e == -1) { - /* Dynamic list */ - *nids = item->allowed_ids.dyn.len; - - return item->allowed_ids.dyn.n; - } - else { - while (item->allowed_ids.st[cnt] != 0 && cnt < G_N_ELEMENTS (item->allowed_ids.st)) { - cnt ++; - } - - *nids = cnt; - - return item->allowed_ids.st; - } -} - -const guint32* -rspamd_symcache_get_forbidden_settings_ids (struct rspamd_symcache *cache, - const gchar *symbol, - guint *nids) -{ - struct rspamd_symcache_item *item; - guint cnt = 0; - - item = rspamd_symcache_find_filter (cache, symbol, false); - - if (item == NULL) { - return NULL; - } - - if (item->forbidden_ids.dyn.e == -1) { - /* Dynamic list */ - *nids = 
item->allowed_ids.dyn.len; - - return item->allowed_ids.dyn.n; - } - else { - while (item->forbidden_ids.st[cnt] != 0 && cnt < G_N_ELEMENTS (item->allowed_ids.st)) { - cnt ++; - } - - *nids = cnt; - - return item->forbidden_ids.st; - } -} - -/* Insertion sort: usable for near-sorted ids list */ -static inline void -rspamd_ids_insertion_sort (guint *a, guint n) -{ - for (guint i = 1; i < n; i++) { - guint32 tmp = a[i]; - guint j = i; - - while (j > 0 && tmp < a[j - 1]) { - a[j] = a[j - 1]; - j --; - } - - a[j] = tmp; - } -} - -static inline void -rspamd_symcache_add_id_to_list (rspamd_mempool_t *pool, - struct rspamd_symcache_id_list *ls, - guint32 id) -{ - guint cnt = 0; - guint *new_array; - - if (ls->st[0] == -1) { - /* Dynamic array */ - if (ls->dyn.len < ls->dyn.allocated) { - /* Trivial, append + sort */ - ls->dyn.n[ls->dyn.len++] = id; - } - else { - /* Reallocate */ - g_assert (ls->dyn.allocated <= G_MAXINT16); - ls->dyn.allocated *= 2; - - new_array = rspamd_mempool_alloc (pool, - ls->dyn.allocated * sizeof (guint32)); - memcpy (new_array, ls->dyn.n, ls->dyn.len * sizeof (guint32)); - ls->dyn.n = new_array; - ls->dyn.n[ls->dyn.len++] = id; - } - - rspamd_ids_insertion_sort (ls->dyn.n, ls->dyn.len); - } - else { - /* Static part */ - while (ls->st[cnt] != 0 && cnt < G_N_ELEMENTS (ls->st)) { - cnt ++; - } - - if (cnt < G_N_ELEMENTS (ls->st)) { - ls->st[cnt] = id; - } - else { - /* Switch to dynamic */ - new_array = rspamd_mempool_alloc (pool, - G_N_ELEMENTS (ls->st) * 2 * sizeof (guint32)); - memcpy (new_array, ls->st, G_N_ELEMENTS (ls->st) * sizeof (guint32)); - ls->dyn.n = new_array; - ls->dyn.e = -1; - ls->dyn.allocated = G_N_ELEMENTS (ls->st) * 2; - ls->dyn.len = G_N_ELEMENTS (ls->st); - - /* Recursively jump to dynamic branch that will handle insertion + sorting */ - rspamd_symcache_add_id_to_list (pool, ls, id); - } - } -} - -void -rspamd_symcache_process_settings_elt (struct rspamd_symcache *cache, - struct rspamd_config_settings_elt *elt) -{ - guint32 
id = elt->id; - ucl_object_iter_t iter; - struct rspamd_symcache_item *item, *parent; - const ucl_object_t *cur; - - - if (elt->symbols_disabled) { - /* Process denied symbols */ - iter = NULL; - - while ((cur = ucl_object_iterate (elt->symbols_disabled, &iter, true)) != NULL) { - const gchar *sym = ucl_object_key (cur); - item = rspamd_symcache_find_filter (cache, sym, false); - - if (item) { - if (item->is_virtual) { - /* - * Virtual symbols are special: - * we ignore them in symcache but prevent them from being - * inserted. - */ - rspamd_symcache_add_id_to_list (cache->static_pool, - &item->forbidden_ids, id); - msg_debug_cache ("deny virtual symbol %s for settings %ud (%s); " - "parent can still be executed", - sym, id, elt->name); - } - else { - /* Normal symbol, disable it */ - rspamd_symcache_add_id_to_list (cache->static_pool, - &item->forbidden_ids, id); - msg_debug_cache ("deny symbol %s for settings %ud (%s)", - sym, id, elt->name); - } - } - else { - msg_warn_cache ("cannot find a symbol to disable %s " - "when processing settings %ud (%s)", - sym, id, elt->name); - } - } - } - - if (elt->symbols_enabled) { - iter = NULL; - - while ((cur = ucl_object_iterate (elt->symbols_enabled, &iter, true)) != NULL) { - /* Here, we resolve parent and explicitly allow it */ - const gchar *sym = ucl_object_key (cur); - item = rspamd_symcache_find_filter (cache, sym, false); - - if (item) { - if (item->is_virtual) { - if (!(item->type & SYMBOL_TYPE_GHOST)) { - parent = rspamd_symcache_find_filter (cache, sym, true); - - if (parent) { - if (elt->symbols_disabled && - ucl_object_lookup (elt->symbols_disabled, parent->symbol)) { - msg_err_cache ("conflict in %s: cannot enable disabled symbol %s, " - "wanted to enable symbol %s", - elt->name, parent->symbol, sym); - continue; - } - - rspamd_symcache_add_id_to_list (cache->static_pool, - &parent->exec_only_ids, id); - msg_debug_cache ("allow just execution of symbol %s for settings %ud (%s)", - parent->symbol, id, 
elt->name); - } - } - /* Ignore ghosts */ - } - - rspamd_symcache_add_id_to_list (cache->static_pool, - &item->allowed_ids, id); - msg_debug_cache ("allow execution of symbol %s for settings %ud (%s)", - sym, id, elt->name); - } - else { - msg_warn_cache ("cannot find a symbol to enable %s " - "when processing settings %ud (%s)", - sym, id, elt->name); - } - } - } -} - -gint -rspamd_symcache_item_flags (struct rspamd_symcache_item *item) -{ - if (item) { - return item->type; - } - - return 0; -} - -const gchar* -rspamd_symcache_item_name (struct rspamd_symcache_item *item) -{ - return item ? item->symbol : NULL; -} - -const struct rspamd_symcache_item_stat * -rspamd_symcache_item_stat (struct rspamd_symcache_item *item) -{ - return item ? item->st : NULL; -} - -gboolean -rspamd_symcache_item_is_enabled (struct rspamd_symcache_item *item) -{ - if (item) { - if (!item->enabled) { - return FALSE; - } - - if (item->is_virtual && item->specific.virtual.parent_item != NULL) { - return rspamd_symcache_item_is_enabled (item->specific.virtual.parent_item); - } - - return TRUE; - } - - return FALSE; -} - -struct rspamd_symcache_item * rspamd_symcache_item_get_parent ( - struct rspamd_symcache_item *item) -{ - if (item && item->is_virtual && item->specific.virtual.parent_item != NULL) { - return item->specific.virtual.parent_item; - } - - return NULL; -} - -const GPtrArray* -rspamd_symcache_item_get_deps (struct rspamd_symcache_item *item) -{ - struct rspamd_symcache_item *parent; - - if (item) { - parent = rspamd_symcache_item_get_parent (item); - - if (parent) { - item = parent; - } - - return item->deps; - } - - return NULL; -} - -const GPtrArray* -rspamd_symcache_item_get_rdeps (struct rspamd_symcache_item *item) -{ - struct rspamd_symcache_item *parent; - - if (item) { - parent = rspamd_symcache_item_get_parent (item); - - if (parent) { - item = parent; - } - - return item->rdeps; - } - - return NULL; -} - -void -rspamd_symcache_enable_profile (struct rspamd_task *task) 
-{ - struct cache_savepoint *checkpoint = task->checkpoint; - - if (checkpoint && !checkpoint->profile) { - ev_now_update_if_cheap (task->event_loop); - ev_tstamp now = ev_now (task->event_loop); - checkpoint->profile_start = now; - - msg_debug_cache_task ("enable profiling of symbols for task"); - checkpoint->profile = TRUE; - } -} diff --git a/src/libserver/rspamd_symcache.h b/src/libserver/rspamd_symcache.h index 1d670db04..e47ea3edd 100644 --- a/src/libserver/rspamd_symcache.h +++ b/src/libserver/rspamd_symcache.h @@ -69,6 +69,9 @@ struct rspamd_abstract_callback_data { char data[]; }; +/** + * Shared memory block specific for each symbol + */ struct rspamd_symcache_item_stat { struct rspamd_counter_data time_counter; gdouble avg_time; @@ -169,15 +172,6 @@ gboolean rspamd_symcache_stat_symbol (struct rspamd_symcache *cache, guint *nhits); /** - * Find symbol in cache by its id - * @param cache - * @param id - * @return symbol's name or NULL - */ -const gchar *rspamd_symcache_symbol_by_id (struct rspamd_symcache *cache, - gint id); - -/** * Returns number of symbols registered in symbols cache * @param cache * @return number of symbols in the cache @@ -185,16 +179,6 @@ const gchar *rspamd_symcache_symbol_by_id (struct rspamd_symcache *cache, guint rspamd_symcache_stats_symbols_count (struct rspamd_symcache *cache); /** - * Call function for cached symbol using saved callback - * @param task task object - * @param cache symbols cache - * @param saved_item pointer to currently saved item - */ -gboolean rspamd_symcache_process_symbols (struct rspamd_task *task, - struct rspamd_symcache *cache, - gint stage); - -/** * Validate cache items against theirs weights defined in metrics * @param cache symbols cache * @param cfg configuration @@ -205,6 +189,16 @@ gboolean rspamd_symcache_validate (struct rspamd_symcache *cache, gboolean strict); /** + * Call function for cached symbol using saved callback + * @param task task object + * @param cache symbols cache + * @param 
saved_item pointer to currently saved item + */ +gboolean rspamd_symcache_process_symbols (struct rspamd_task *task, + struct rspamd_symcache *cache, + gint stage); + +/** * Return statistics about the cache as ucl object (array of objects one per item) * @param cache * @return @@ -216,7 +210,7 @@ ucl_object_t *rspamd_symcache_counters (struct rspamd_symcache *cache); * @param cache * @param ev_base */ -void rspamd_symcache_start_refresh (struct rspamd_symcache *cache, +void* rspamd_symcache_start_refresh (struct rspamd_symcache *cache, struct ev_loop *ev_base, struct rspamd_worker *w); @@ -229,17 +223,6 @@ void rspamd_symcache_inc_frequency (struct rspamd_symcache *cache, struct rspamd_symcache_item *item); /** - * Add dependency relation between two symbols identified by id (source) and - * a symbolic name (destination). Destination could be virtual or real symbol. - * Callback destinations are not yet supported. - * @param id_from source symbol - * @param to destination name - */ -void rspamd_symcache_add_dependency (struct rspamd_symcache *cache, - gint id_from, const gchar *to, - gint virtual_id_from); - -/** * Add delayed dependency that is resolved on cache post-load routine * @param cache * @param from @@ -249,23 +232,6 @@ void rspamd_symcache_add_delayed_dependency (struct rspamd_symcache *cache, const gchar *from, const gchar *to); /** - * Disable specific symbol in the cache - * @param cache - * @param symbol - */ -void rspamd_symcache_disable_symbol_perm (struct rspamd_symcache *cache, - const gchar *symbol, - gboolean resolve_parent); - -/** - * Enable specific symbol in the cache - * @param cache - * @param symbol - */ -void rspamd_symcache_enable_symbol_perm (struct rspamd_symcache *cache, - const gchar *symbol); - -/** * Get abstract callback data for a symbol (or its parent symbol) * @param cache cache object * @param symbol symbol name @@ -283,21 +249,6 @@ struct rspamd_abstract_callback_data *rspamd_symcache_get_cbdata ( const gchar 
*rspamd_symcache_get_parent (struct rspamd_symcache *cache, const gchar *symbol); -/** - * Adds flags to a symbol - * @param cache - * @param symbol - * @param flags - * @return - */ -gboolean rspamd_symcache_add_symbol_flags (struct rspamd_symcache *cache, - const gchar *symbol, - guint flags); - -gboolean rspamd_symcache_set_symbol_flags (struct rspamd_symcache *cache, - const gchar *symbol, - guint flags); - guint rspamd_symcache_get_symbol_flags (struct rspamd_symcache *cache, const gchar *symbol); @@ -546,34 +497,6 @@ const gchar* rspamd_symcache_item_name (struct rspamd_symcache_item *item); */ const struct rspamd_symcache_item_stat * rspamd_symcache_item_stat (struct rspamd_symcache_item *item); -/** - * Returns if an item is enabled (for virtual it also means that parent should be enabled) - * @param item - * @return - */ -gboolean rspamd_symcache_item_is_enabled (struct rspamd_symcache_item *item); -/** - * Returns parent for virtual symbols (or NULL) - * @param item - * @return - */ -struct rspamd_symcache_item * rspamd_symcache_item_get_parent ( - struct rspamd_symcache_item *item); -/** - * Returns direct deps for an element - * @param item - * @return array of struct rspamd_symcache_item * - */ -const GPtrArray* rspamd_symcache_item_get_deps ( - struct rspamd_symcache_item *item); -/** - * Returns direct reverse deps for an element - * @param item - * @return array of struct rspamd_symcache_item * - */ -const GPtrArray* rspamd_symcache_item_get_rdeps ( - struct rspamd_symcache_item *item); - /** * Enable profiling for task (e.g. when a slow rule has been found) diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx new file mode 100644 index 000000000..d4ebf4be3 --- /dev/null +++ b/src/libserver/symcache/symcache_c.cxx @@ -0,0 +1,557 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "symcache_internal.hxx" +#include "symcache_periodic.hxx" +#include "symcache_item.hxx" +#include "symcache_runtime.hxx" + +/** + * C API for symcache + */ + +#define C_API_SYMCACHE(ptr) (reinterpret_cast<rspamd::symcache::symcache *>(ptr)) +#define C_API_SYMCACHE_RUNTIME(ptr) (reinterpret_cast<rspamd::symcache::symcache_runtime *>(ptr)) +#define C_API_SYMCACHE_ITEM(ptr) (reinterpret_cast<rspamd::symcache::cache_item *>(ptr)) + +void +rspamd_symcache_destroy(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + delete real_cache; +} + +struct rspamd_symcache * +rspamd_symcache_new(struct rspamd_config *cfg) +{ + auto *ncache = new rspamd::symcache::symcache(cfg); + + return (struct rspamd_symcache *) ncache; +} + +gboolean +rspamd_symcache_init(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + return real_cache->init(); +} + +void +rspamd_symcache_save(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->save_items(); +} + +gint +rspamd_symcache_add_symbol(struct rspamd_symcache *cache, + const gchar *name, + gint priority, + symbol_func_t func, + gpointer user_data, + enum rspamd_symbol_type type, + gint parent) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + if (parent == -1) { + return real_cache->add_symbol_with_callback(name, priority, func, user_data, type); + } + else { + return real_cache->add_virtual_symbol(name, parent, type); + } +} + +void +rspamd_symcache_set_peak_callback(struct rspamd_symcache *cache, gint cbref) 
+{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->set_peak_cb(cbref); +} + +gboolean +rspamd_symcache_add_condition_delayed(struct rspamd_symcache *cache, + const gchar *sym, lua_State *L, gint cbref) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->add_delayed_condition(sym, cbref); + + return TRUE; +} + +gint rspamd_symcache_find_symbol(struct rspamd_symcache *cache, + const gchar *name) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto sym_maybe = real_cache->get_item_by_name(name, false); + + if (sym_maybe != nullptr) { + return sym_maybe->id; + } + + return -1; +} + +gboolean rspamd_symcache_stat_symbol(struct rspamd_symcache *cache, + const gchar *name, + gdouble *frequency, + gdouble *freq_stddev, + gdouble *tm, + guint *nhits) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto sym_maybe = real_cache->get_item_by_name(name, false); + + if (sym_maybe != nullptr) { + *frequency = sym_maybe->st->avg_frequency; + *freq_stddev = sqrt(sym_maybe->st->stddev_frequency); + *tm = sym_maybe->st->time_counter.mean; + + if (nhits) { + *nhits = sym_maybe->st->hits; + } + + return TRUE; + } + + return FALSE; +} + + +guint +rspamd_symcache_stats_symbols_count(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + return real_cache->get_stats_symbols_count(); +} + +guint64 +rspamd_symcache_get_cksum(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + return real_cache->get_cksum(); +} + +gboolean +rspamd_symcache_validate(struct rspamd_symcache *cache, + struct rspamd_config *cfg, + gboolean strict) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + return real_cache->validate(strict); +} + +ucl_object_t * +rspamd_symcache_counters(struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + return real_cache->counters(); +} + +void * +rspamd_symcache_start_refresh(struct rspamd_symcache *cache, + struct ev_loop *ev_base, struct rspamd_worker *w) +{ + auto 
*real_cache = C_API_SYMCACHE(cache); + return new rspamd::symcache::cache_refresh_cbdata{real_cache, ev_base, w}; +} + +void +rspamd_symcache_inc_frequency(struct rspamd_symcache *_cache, struct rspamd_symcache_item *item) +{ + auto *real_item = C_API_SYMCACHE_ITEM(item); + + if (real_item) { + real_item->inc_frequency(); + } +} + +void +rspamd_symcache_add_delayed_dependency(struct rspamd_symcache *cache, + const gchar *from, const gchar *to) +{ + auto *real_cache = C_API_SYMCACHE(cache); + real_cache->add_delayed_dependency(from, to); +} + +const gchar * +rspamd_symcache_get_parent(struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *sym = real_cache->get_item_by_name(symbol, false); + + if (sym && sym->is_virtual()) { + auto *parent = sym->get_parent(*real_cache); + + if (parent) { + return parent->get_name().c_str(); + } + } + + return nullptr; +} + +const gchar * +rspamd_symcache_item_name(struct rspamd_symcache_item *item) +{ + auto *real_item = C_API_SYMCACHE_ITEM(item); + return real_item->get_name().c_str(); +} + +gint +rspamd_symcache_item_flags(struct rspamd_symcache_item *item) +{ + auto *real_item = C_API_SYMCACHE_ITEM(item); + return real_item->get_flags(); +} + +guint +rspamd_symcache_get_symbol_flags(struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *sym = real_cache->get_item_by_name(symbol, false); + + if (sym) { + return sym->get_flags(); + } + + return 0; +} + +const struct rspamd_symcache_item_stat * +rspamd_symcache_item_stat(struct rspamd_symcache_item *item) +{ + auto *real_item = C_API_SYMCACHE_ITEM(item); + return real_item->st; +} + +void +rspamd_symcache_get_symbol_details(struct rspamd_symcache *cache, + const gchar *symbol, + ucl_object_t *this_sym_ucl) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *sym = real_cache->get_item_by_name(symbol, false); + + if (sym) { + ucl_object_insert_key(this_sym_ucl, + 
ucl_object_fromstring(sym->get_type_str()), + "type", strlen("type"), false); + } +} + +void +rspamd_symcache_foreach(struct rspamd_symcache *cache, + void (*func)(struct rspamd_symcache_item *item, gpointer /* userdata */), + gpointer ud) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->symbols_foreach([&](const rspamd::symcache::cache_item *item) { + func((struct rspamd_symcache_item *) item, ud); + }); +} + +void +rspamd_symcache_process_settings_elt(struct rspamd_symcache *cache, + struct rspamd_config_settings_elt *elt) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + real_cache->process_settings_elt(elt); +} + +bool +rspamd_symcache_set_allowed_settings_ids(struct rspamd_symcache *cache, + const gchar *symbol, + const guint32 *ids, + guint nids) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *item = real_cache->get_item_by_name_mut(symbol, false); + + if (item == nullptr) { + return false; + } + + item->allowed_ids.set_ids(ids, nids, real_cache->get_pool()); + return true; +} + +bool +rspamd_symcache_set_forbidden_settings_ids(struct rspamd_symcache *cache, + const gchar *symbol, + const guint32 *ids, + guint nids) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *item = real_cache->get_item_by_name_mut(symbol, false); + + if (item == nullptr) { + return false; + } + + item->forbidden_ids.set_ids(ids, nids, real_cache->get_pool()); + return true; +} + +const guint32 * +rspamd_symcache_get_allowed_settings_ids(struct rspamd_symcache *cache, + const gchar *symbol, + guint *nids) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + const auto *item = real_cache->get_item_by_name(symbol, false); + return item->allowed_ids.get_ids(*nids); + +} + +const guint32 * +rspamd_symcache_get_forbidden_settings_ids(struct rspamd_symcache *cache, + const gchar *symbol, + guint *nids) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + const auto *item = real_cache->get_item_by_name(symbol, false); + return item->forbidden_ids.get_ids(*nids); +} + 
+void +rspamd_symcache_disable_all_symbols(struct rspamd_task *task, + struct rspamd_symcache *_cache, + guint skip_mask) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + + cache_runtime->disable_all_symbols(skip_mask); +} + +gboolean +rspamd_symcache_disable_symbol(struct rspamd_task *task, + struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_cache = C_API_SYMCACHE(cache); + + return cache_runtime->disable_symbol(task, *real_cache, symbol); +} + +gboolean +rspamd_symcache_enable_symbol(struct rspamd_task *task, + struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_cache = C_API_SYMCACHE(cache); + + return cache_runtime->enable_symbol(task, *real_cache, symbol); +} + +gboolean +rspamd_symcache_is_checked(struct rspamd_task *task, + struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_cache = C_API_SYMCACHE(cache); + + return cache_runtime->is_symbol_checked(*real_cache, symbol); +} + +gboolean +rspamd_symcache_process_settings(struct rspamd_task *task, + struct rspamd_symcache *cache) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_cache = C_API_SYMCACHE(cache); + + return cache_runtime->process_settings(task, *real_cache); +} + +gboolean +rspamd_symcache_is_item_allowed(struct rspamd_task *task, + struct rspamd_symcache_item *item, + gboolean exec_only) +{ + auto *real_item = C_API_SYMCACHE_ITEM(item); + + return real_item->is_allowed(task, exec_only); +} + +gboolean +rspamd_symcache_is_symbol_enabled(struct rspamd_task *task, + struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_cache = C_API_SYMCACHE(cache); + + return 
cache_runtime->is_symbol_enabled(task, *real_cache, symbol); +} + +struct rspamd_symcache_item * +rspamd_symcache_get_cur_item(struct rspamd_task *task) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + + return (struct rspamd_symcache_item *) cache_runtime->get_cur_item(); +} + +struct rspamd_symcache_item * +rspamd_symcache_set_cur_item(struct rspamd_task *task, struct rspamd_symcache_item *item) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_item = C_API_SYMCACHE_ITEM(item); + + return (struct rspamd_symcache_item *) cache_runtime->set_cur_item(real_item); +} + +void +rspamd_symcache_enable_profile(struct rspamd_task *task) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + + cache_runtime->set_profile_mode(true); +} + +guint +rspamd_symcache_item_async_inc_full(struct rspamd_task *task, + struct rspamd_symcache_item *item, + const gchar *subsystem, + const gchar *loc) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_item = C_API_SYMCACHE_ITEM(item); + + auto *dyn_item = cache_runtime->get_dynamic_item(real_item->id, true); + msg_debug_cache_task("increase async events counter for %s(%d) = %d + 1; " + "subsystem %s (%s)", + real_item->symbol.c_str(), real_item->id, + dyn_item->async_events, subsystem, loc); + + return ++dyn_item->async_events; +} + +guint +rspamd_symcache_item_async_dec_full(struct rspamd_task *task, + struct rspamd_symcache_item *item, + const gchar *subsystem, + const gchar *loc) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_item = C_API_SYMCACHE_ITEM(item); + + auto *dyn_item = cache_runtime->get_dynamic_item(real_item->id, true); + msg_debug_cache_task("increase async events counter for %s(%d) = %d + 1; " + "subsystem %s (%s)", + real_item->symbol.c_str(), real_item->id, + dyn_item->async_events, subsystem, loc); + g_assert(dyn_item->async_events > 0); + + return 
--dyn_item->async_events; +} + +gboolean +rspamd_symcache_item_async_dec_check_full(struct rspamd_task *task, + struct rspamd_symcache_item *item, + const gchar *subsystem, + const gchar *loc) +{ + if (rspamd_symcache_item_async_dec_full(task, item, subsystem, loc) == 0) { + rspamd_symcache_finalize_item(task, item); + + return TRUE; + } + + return FALSE; +} + +struct rspamd_abstract_callback_data * +rspamd_symcache_get_cbdata(struct rspamd_symcache *cache, + const gchar *symbol) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + auto *item = real_cache->get_item_by_name(symbol, true); + + if (item) { + return (struct rspamd_abstract_callback_data *) item->get_cbdata(); + } + + return nullptr; +} + +void +rspamd_symcache_composites_foreach(struct rspamd_task *task, + struct rspamd_symcache *cache, + GHFunc func, + gpointer fd) +{ + auto *real_cache = C_API_SYMCACHE(cache); + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + + real_cache->composites_foreach([&](const auto *item) { + auto *dyn_item = cache_runtime->get_dynamic_item(item->id, false); + + if (dyn_item->started) { + func((void *)item->get_name().c_str(), item->get_cbdata(), fd); + dyn_item->finished = true; + } + }); +} + +gboolean +rspamd_symcache_process_symbols(struct rspamd_task *task, + struct rspamd_symcache *cache, + gint stage) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + if (task->symcache_runtime == nullptr) { + task->symcache_runtime = rspamd::symcache::symcache_runtime::create(task, *real_cache); + } + + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + return cache_runtime->process_symbols(task, *real_cache, stage); +} + +void +rspamd_symcache_finalize_item(struct rspamd_task *task, + struct rspamd_symcache_item *item) +{ + auto *cache_runtime = C_API_SYMCACHE_RUNTIME(task->symcache_runtime); + auto *real_item = C_API_SYMCACHE_ITEM(item); + + cache_runtime->finalize_item(task, real_item); +}
\ No newline at end of file diff --git a/src/libserver/symcache/symcache_id_list.hxx b/src/libserver/symcache/symcache_id_list.hxx new file mode 100644 index 000000000..10a3ba2b7 --- /dev/null +++ b/src/libserver/symcache/symcache_id_list.hxx @@ -0,0 +1,176 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RSPAMD_SYMCACHE_ID_LIST_HXX +#define RSPAMD_SYMCACHE_ID_LIST_HXX +#pragma once + +#include <cstdint> +#include <cstring> // for memset +#include <algorithm> // for sort/bsearch + +#include "config.h" +#include "libutil/mem_pool.h" + +namespace rspamd::symcache { +/* + * This structure is optimised to store ids list: + * - If the first element is -1 then use dynamic part, else use static part + * There is no std::variant to save space + */ +struct id_list { + union { + std::uint32_t st[4]; + struct { + std::uint32_t e; /* First element */ + std::uint16_t len; + std::uint16_t allocated; + std::uint32_t *n; + } dyn; + } data; + + id_list() = default; + + auto reset() + { + std::memset(&data, 0, sizeof(data)); + } + + /** + * Returns ids from a compressed list, accepting a mutable reference for number of elements + * @param nids output of the number of elements + * @return + */ + auto get_ids(unsigned &nids) const -> const std::uint32_t * + { + if (data.dyn.e == -1) { + /* Dynamic list */ + nids = data.dyn.len; + + return data.dyn.n; + } + else { + auto cnt = 0; + + while (data.st[cnt] != 0 && cnt < 
G_N_ELEMENTS(data.st)) { + cnt++; + } + + nids = cnt; + + return data.st; + } + } + + auto add_id(std::uint32_t id, rspamd_mempool_t *pool) -> void + { + if (data.st[0] == -1) { + /* Dynamic array */ + if (data.dyn.len < data.dyn.allocated) { + /* Trivial, append + sort */ + data.dyn.n[data.dyn.len++] = id; + } + else { + /* Reallocate */ + g_assert(data.dyn.allocated <= G_MAXINT16); + data.dyn.allocated *= 2; + + auto *new_array = rspamd_mempool_alloc_array_type(pool, + data.dyn.allocated, std::uint32_t); + memcpy(new_array, data.dyn.n, data.dyn.len * sizeof(std::uint32_t)); + data.dyn.n = new_array; + data.dyn.n[data.dyn.len++] = id; + } + + std::sort(data.dyn.n, data.dyn.n + data.dyn.len); + } + else { + /* Static part */ + auto cnt = 0u; + while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS(data.st)) { + cnt++; + } + + if (cnt < G_N_ELEMENTS(data.st)) { + data.st[cnt] = id; + } + else { + /* Switch to dynamic */ + data.dyn.allocated = G_N_ELEMENTS(data.st) * 2; + auto *new_array = rspamd_mempool_alloc_array_type(pool, + data.dyn.allocated, std::uint32_t); + memcpy(new_array, data.st, sizeof(data.st)); + data.dyn.n = new_array; + data.dyn.e = -1; /* Marker */ + data.dyn.len = G_N_ELEMENTS(data.st); + + /* Recursively jump to dynamic branch that will handle insertion + sorting */ + add_id(id, pool); // tail call + } + } + } + + auto set_ids(const std::uint32_t *ids, std::size_t nids, rspamd_mempool_t *pool) -> void + { + if (nids <= G_N_ELEMENTS(data.st)) { + /* Use static version */ + reset(); + + for (auto i = 0; i < nids; i++) { + data.st[i] = ids[i]; + } + } + else { + /* Need to use a separate list */ + data.dyn.e = -1; /* Flag */ + data.dyn.n = rspamd_mempool_alloc_array_type(pool, nids, std::uint32_t); + data.dyn.len = nids; + data.dyn.allocated = nids; + + for (auto i = 0; i < nids; i++) { + data.dyn.n[i] = ids[i]; + } + + /* Keep sorted */ + std::sort(data.dyn.n, data.dyn.n + data.dyn.len); + } + } + + auto check_id(unsigned int id) const -> bool + { + if 
(data.dyn.e == -1) { + return std::binary_search(data.dyn.n, data.dyn.n + data.dyn.len, id); + } + else { + for (auto elt: data.st) { + if (elt == id) { + return true; + } + else if (elt == 0) { + return false; + } + } + } + + return false; + } +}; + +static_assert(std::is_trivial_v<id_list>); + +} + +#endif //RSPAMD_SYMCACHE_ID_LIST_HXX diff --git a/src/libserver/symcache/symcache_impl.cxx b/src/libserver/symcache/symcache_impl.cxx new file mode 100644 index 000000000..ab1b41fc4 --- /dev/null +++ b/src/libserver/symcache/symcache_impl.cxx @@ -0,0 +1,1063 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "lua/lua_common.h" +#include "symcache_internal.hxx" +#include "symcache_item.hxx" +#include "symcache_runtime.hxx" +#include "unix-std.h" +#include "libutil/cxx/locked_file.hxx" +#include "fmt/core.h" +#include "contrib/t1ha/t1ha.h" + +#include <cmath> + +namespace rspamd::symcache { + +INIT_LOG_MODULE_PUBLIC(symcache) + +auto symcache::init() -> bool +{ + auto res = true; + reload_time = cfg->cache_reload_time; + + if (cfg->cache_filename != nullptr) { + res = load_items(); + } + + /* Deal with the delayed dependencies */ + for (const auto &delayed_dep: *delayed_deps) { + auto virt_item = get_item_by_name(delayed_dep.from, false); + auto real_item = get_item_by_name(delayed_dep.from, true); + + if (virt_item == nullptr || real_item == nullptr) { + msg_err_cache("cannot register delayed dependency between %s and %s: " + "%s is missing", + delayed_dep.from.data(), + delayed_dep.to.data(), delayed_dep.from.data()); + } + else { + msg_debug_cache("delayed between %s(%d:%d) -> %s", + delayed_dep.from.data(), + real_item->id, virt_item->id, + delayed_dep.to.data()); + add_dependency(real_item->id, delayed_dep.to, virt_item != real_item ? 
+ virt_item->id : -1); + } + } + + /* Remove delayed dependencies, as they are no longer needed at this point */ + delayed_deps.reset(); + + + /* Deal with the delayed conditions */ + for (const auto &delayed_cond: *delayed_conditions) { + auto it = get_item_by_name_mut(delayed_cond.sym, true); + + if (it == nullptr) { + msg_err_cache ( + "cannot register delayed condition for %s", + delayed_cond.sym.c_str()); + luaL_unref(delayed_cond.L, LUA_REGISTRYINDEX, delayed_cond.cbref); + } + else { + if (!it->add_condition(delayed_cond.L, delayed_cond.cbref)) { + msg_err_cache ( + "cannot register delayed condition for %s: virtual parent; qed", + delayed_cond.sym.c_str()); + g_abort(); + } + } + } + delayed_conditions.reset(); + + for (auto &it: items_by_id) { + it->process_deps(*this); + } + + /* Sorting stuff */ + auto postfilters_cmp = [](const auto &it1, const auto &it2) -> int { + if (it1->priority > it2->priority) { + return 1; + } + else if (it1->priority == it2->priority) { + return 0; + } + + return -1; + }; + auto prefilters_cmp = [](const auto &it1, const auto &it2) -> int { + if (it1->priority > it2->priority) { + return -1; + } + else if (it1->priority == it2->priority) { + return 0; + } + + return 1; + }; + + + std::stable_sort(std::begin(connfilters), std::end(connfilters), prefilters_cmp); + std::stable_sort(std::begin(prefilters), std::end(prefilters), prefilters_cmp); + std::stable_sort(std::begin(postfilters), std::end(postfilters), postfilters_cmp); + std::stable_sort(std::begin(idempotent), std::end(idempotent), postfilters_cmp); + + resort(); + + /* Connect metric symbols with symcache symbols */ + if (cfg->symbols) { + g_hash_table_foreach(cfg->symbols, + symcache::metric_connect_cb, + (void *) this); + } + + return res; +} + +auto symcache::load_items() -> bool +{ + auto cached_map = util::raii_mmaped_locked_file::mmap_shared(cfg->cache_filename, + O_RDONLY, PROT_READ); + + if (!cached_map.has_value()) { + msg_info_cache("%s", 
cached_map.error().c_str()); + return false; + } + + + if (cached_map->get_size() < (gint) sizeof(symcache_header)) { + msg_info_cache("cannot use file %s, truncated: %z", cfg->cache_filename, + errno, strerror(errno)); + return false; + } + + const auto *hdr = (struct symcache_header *) cached_map->get_map(); + + if (memcmp(hdr->magic, symcache_magic, + sizeof(symcache_magic)) != 0) { + msg_info_cache("cannot use file %s, bad magic", cfg->cache_filename); + + return false; + } + + auto *parser = ucl_parser_new(0); + const auto *p = (const std::uint8_t *) (hdr + 1); + + if (!ucl_parser_add_chunk(parser, p, cached_map->get_size() - sizeof(*hdr))) { + msg_info_cache ("cannot use file %s, cannot parse: %s", cfg->cache_filename, + ucl_parser_get_error(parser)); + ucl_parser_free(parser); + + return false; + } + + auto *top = ucl_parser_get_object(parser); + ucl_parser_free(parser); + + if (top == nullptr || ucl_object_type(top) != UCL_OBJECT) { + msg_info_cache ("cannot use file %s, bad object", cfg->cache_filename); + ucl_object_unref(top); + + return false; + } + + auto it = ucl_object_iterate_new(top); + const ucl_object_t *cur; + while ((cur = ucl_object_iterate_safe(it, true)) != nullptr) { + auto item_it = items_by_symbol.find(ucl_object_key(cur)); + + if (item_it != items_by_symbol.end()) { + auto item = item_it->second; + /* Copy saved info */ + /* + * XXX: don't save or load weight, it should be obtained from the + * metric + */ +#if 0 + elt = ucl_object_lookup (cur, "weight"); + + if (elt) { + w = ucl_object_todouble (elt); + if (w != 0) { + item->weight = w; + } + } +#endif + const auto *elt = ucl_object_lookup(cur, "time"); + if (elt) { + item->st->avg_time = ucl_object_todouble(elt); + } + + elt = ucl_object_lookup(cur, "count"); + if (elt) { + item->st->total_hits = ucl_object_toint(elt); + item->last_count = item->st->total_hits; + } + + elt = ucl_object_lookup(cur, "frequency"); + if (elt && ucl_object_type(elt) == UCL_OBJECT) { + const ucl_object_t 
*freq_elt; + + freq_elt = ucl_object_lookup(elt, "avg"); + + if (freq_elt) { + item->st->avg_frequency = ucl_object_todouble(freq_elt); + } + freq_elt = ucl_object_lookup(elt, "stddev"); + + if (freq_elt) { + item->st->stddev_frequency = ucl_object_todouble(freq_elt); + } + } + + if (item->is_virtual() && !item->is_ghost()) { + const auto &parent = item->get_parent(*this); + + if (parent) { + if (parent->st->weight < item->st->weight) { + parent->st->weight = item->st->weight; + } + } + /* + * We maintain avg_time for virtual symbols equal to the + * parent item avg_time + */ + item->st->avg_time = parent->st->avg_time; + } + + total_weight += fabs(item->st->weight); + total_hits += item->st->total_hits; + } + } + + ucl_object_iterate_free(it); + ucl_object_unref(top); + + return true; +} + +template<typename T> +static constexpr auto round_to_hundreds(T x) +{ + return (::floor(x) * 100.0) / 100.0; +} + +bool symcache::save_items() const +{ + if (cfg->cache_filename == nullptr) { + return false; + } + + auto file_sink = util::raii_file_sink::create(cfg->cache_filename, + O_WRONLY | O_TRUNC, 00644); + + if (!file_sink.has_value()) { + if (errno == EEXIST) { + /* Some other process is already writing data, give up silently */ + return false; + } + + msg_err_cache("%s", file_sink.error().c_str()); + + return false; + } + + struct symcache_header hdr; + memset(&hdr, 0, sizeof(hdr)); + memcpy(hdr.magic, symcache_magic, sizeof(symcache_magic)); + + if (write(file_sink->get_fd(), &hdr, sizeof(hdr)) == -1) { + msg_err_cache("cannot write to file %s, error %d, %s", cfg->cache_filename, + errno, strerror(errno)); + + return false; + } + + auto *top = ucl_object_typed_new(UCL_OBJECT); + + for (const auto &it: items_by_symbol) { + auto item = it.second; + auto elt = ucl_object_typed_new(UCL_OBJECT); + ucl_object_insert_key(elt, + ucl_object_fromdouble(round_to_hundreds(item->st->weight)), + "weight", 0, false); + ucl_object_insert_key(elt, + 
ucl_object_fromdouble(round_to_hundreds(item->st->time_counter.mean)), + "time", 0, false); + ucl_object_insert_key(elt, ucl_object_fromint(item->st->total_hits), + "count", 0, false); + + auto *freq = ucl_object_typed_new(UCL_OBJECT); + ucl_object_insert_key(freq, + ucl_object_fromdouble(round_to_hundreds(item->st->frequency_counter.mean)), + "avg", 0, false); + ucl_object_insert_key(freq, + ucl_object_fromdouble(round_to_hundreds(item->st->frequency_counter.stddev)), + "stddev", 0, false); + ucl_object_insert_key(elt, freq, "frequency", 0, false); + + ucl_object_insert_key(top, elt, it.first.data(), 0, true); + } + + auto fp = fdopen(file_sink->get_fd(), "a"); + auto *efunc = ucl_object_emit_file_funcs(fp); + auto ret = ucl_object_emit_full(top, UCL_EMIT_JSON_COMPACT, efunc, nullptr); + ucl_object_emit_funcs_free(efunc); + ucl_object_unref(top); + fclose(fp); + + return ret; +} + +auto symcache::metric_connect_cb(void *k, void *v, void *ud) -> void +{ + auto *cache = (symcache *) ud; + const auto *sym = (const char *) k; + auto *s = (struct rspamd_symbol *) v; + auto weight = *s->weight_ptr; + auto *item = cache->get_item_by_name_mut(sym, false); + + if (item) { + item->st->weight = weight; + s->cache_item = (void *) item; + } +} + + +auto symcache::get_item_by_id(int id, bool resolve_parent) const -> const cache_item * +{ + if (id < 0 || id >= items_by_id.size()) { + msg_err_cache("internal error: requested item with id %d, when we have just %d items in the cache", + id, (int) items_by_id.size()); + return nullptr; + } + + auto &ret = items_by_id[id]; + + if (!ret) { + msg_err_cache("internal error: requested item with id %d but it is empty; qed", + id); + return nullptr; + } + + if (resolve_parent && ret->is_virtual()) { + return ret->get_parent(*this); + } + + return ret.get(); +} + +auto symcache::get_item_by_name(std::string_view name, bool resolve_parent) const -> const cache_item * +{ + auto it = items_by_symbol.find(name); + + if (it == 
items_by_symbol.end()) { + return nullptr; + } + + if (resolve_parent && it->second->is_virtual()) { + return it->second->get_parent(*this); + } + + return it->second.get(); +} + +auto symcache::get_item_by_name_mut(std::string_view name, bool resolve_parent) const -> cache_item * +{ + auto it = items_by_symbol.find(name); + + if (it == items_by_symbol.end()) { + return nullptr; + } + + if (resolve_parent && it->second->is_virtual()) { + return (cache_item *) it->second->get_parent(*this); + } + + return it->second.get(); +} + +auto symcache::add_dependency(int id_from, std::string_view to, int virtual_id_from) -> void +{ + g_assert (id_from >= 0 && id_from < (gint) items_by_id.size()); + const auto &source = items_by_id[id_from]; + g_assert (source.get() != nullptr); + + source->deps.emplace_back(cache_item_ptr{nullptr}, + std::string(to), + id_from, + -1); + + + if (virtual_id_from >= 0) { + g_assert (virtual_id_from < (gint) items_by_id.size()); + /* We need that for settings id propagation */ + const auto &vsource = items_by_id[virtual_id_from]; + g_assert (vsource.get() != nullptr); + vsource->deps.emplace_back(cache_item_ptr{nullptr}, + std::string(to), + -1, + virtual_id_from); + } +} + +auto symcache::resort() -> void +{ + auto ord = std::make_shared<order_generation>(filters.size(), cur_order_gen); + + for (auto &it: filters) { + if (it) { + total_hits += it->st->total_hits; + it->order = 0; + ord->d.emplace_back(it); + } + } + + enum class tsort_mask { + PERM, + TEMP + }; + + constexpr auto tsort_unmask = [](cache_item *it) -> auto { + return (it->order & ~((1u << 31) | (1u << 30))); + }; + + /* Recursive topological sort helper */ + const auto tsort_visit = [&](cache_item *it, unsigned cur_order, auto &&rec) { + constexpr auto tsort_mark = [](cache_item *it, tsort_mask how) { + switch (how) { + case tsort_mask::PERM: + it->order |= (1u << 31); + break; + case tsort_mask::TEMP: + it->order |= (1u << 30); + break; + } + }; + constexpr auto tsort_is_marked 
= [](cache_item *it, tsort_mask how) { + switch (how) { + case tsort_mask::PERM: + return (it->order & (1u << 31)); + case tsort_mask::TEMP: + return (it->order & (1u << 30)); + } + + return 100500u; /* Because fuck compilers, that's why */ + }; + + if (tsort_is_marked(it, tsort_mask::PERM)) { + if (cur_order > tsort_unmask(it)) { + /* Need to recalculate the whole chain */ + it->order = cur_order; /* That also removes all masking */ + } + else { + /* We are fine, stop DFS */ + return; + } + } + else if (tsort_is_marked(it, tsort_mask::TEMP)) { + msg_err_cache("cyclic dependencies found when checking '%s'!", + it->symbol.c_str()); + return; + } + + tsort_mark(it, tsort_mask::TEMP); + msg_debug_cache("visiting node: %s (%d)", it->symbol.c_str(), cur_order); + + for (const auto &dep: it->deps) { + msg_debug_cache ("visiting dep: %s (%d)", dep.item->symbol.c_str(), cur_order + 1); + rec(dep.item.get(), cur_order + 1, rec); + } + + it->order = cur_order; + tsort_mark(it, tsort_mask::PERM); + }; + /* + * Topological sort + */ + total_hits = 0; + auto used_items = ord->d.size(); + + for (const auto &it: ord->d) { + if (it->order == 0) { + tsort_visit(it.get(), 0, tsort_visit); + } + } + + + /* Main sorting comparator */ + constexpr auto score_functor = [](auto w, auto f, auto t) -> auto { + auto time_alpha = 1.0, weight_alpha = 0.1, freq_alpha = 0.01; + + return ((w > 0.0 ? w : weight_alpha) * (f > 0.0 ? f : freq_alpha) / + (t > time_alpha ? 
t : time_alpha)); + }; + + auto cache_order_cmp = [&](const auto &it1, const auto &it2) -> auto { + auto o1 = tsort_unmask(it1.get()), o2 = tsort_unmask(it2.get()); + double w1 = 0., w2 = 0.; + + if (o1 == o2) { + /* No topological order */ + if (it1->priority == it2->priority) { + auto avg_freq = ((double) total_hits / used_items); + auto avg_weight = (total_weight / used_items); + auto f1 = (double) it1->st->total_hits / avg_freq; + auto f2 = (double) it2->st->total_hits / avg_freq; + auto weight1 = std::fabs(it1->st->weight) / avg_weight; + auto weight2 = std::fabs(it2->st->weight) / avg_weight; + auto t1 = it1->st->avg_time; + auto t2 = it2->st->avg_time; + w1 = score_functor(weight1, f1, t1); + w2 = score_functor(weight2, f2, t2); + } + else { + /* Strict sorting */ + w1 = std::abs(it1->priority); + w2 = std::abs(it2->priority); + } + } + else { + w1 = o1; + w2 = o2; + } + + /* + * std::stable_sort wants a boolean `goes before` predicate, not a qsort-style + * -1/0/1 result: it1 is placed before it2 iff its score is strictly greater + */ + return w2 < w1; + }; + + std::stable_sort(std::begin(ord->d), std::end(ord->d), cache_order_cmp); + /* + * Here lives some ugly legacy! + * We have several filters classes, connfilters, prefilters, filters... etc + * + * Our order is meaningful merely for filters, but we have to add other classes + * to understand if those symbols are checked or disabled. + * We can disable symbols for almost everything but not for virtual symbols.
+ * The rule of thumb is that if a symbol has explicit parent, then it is a + * virtual symbol that follows it's special rules + */ + + /* + * We enrich ord with all other symbol types without any sorting, + * as it is done in another place + */ + constexpr auto append_items_vec = [](const auto &vec, auto &out) { + for (const auto &it: vec) { + if (it) { + out.emplace_back(it); + } + } + }; + + append_items_vec(connfilters, ord->d); + append_items_vec(prefilters, ord->d); + append_items_vec(postfilters, ord->d); + append_items_vec(idempotent, ord->d); + append_items_vec(composites, ord->d); + append_items_vec(classifiers, ord->d); + + /* After sorting is done, we can assign all elements in the by_symbol hash */ + for (auto i = 0; i < ord->size(); i++) { + const auto &it = ord->d[i]; + ord->by_symbol[it->get_name()] = i; + ord->by_cache_id[it->id] = i; + } + /* Finally set the current order */ + std::swap(ord, items_by_order); +} + +auto symcache::add_symbol_with_callback(std::string_view name, + int priority, + symbol_func_t func, + void *user_data, + enum rspamd_symbol_type flags_and_type) -> int +{ + auto real_type_pair_maybe = item_type_from_c(flags_and_type); + + if (!real_type_pair_maybe.has_value()) { + msg_err_cache("incompatible flags when adding %s: %s", name.data(), + real_type_pair_maybe.error().c_str()); + return -1; + } + + auto real_type_pair = real_type_pair_maybe.value(); + + if (real_type_pair.first != symcache_item_type::FILTER) { + real_type_pair.second |= SYMBOL_TYPE_NOSTAT; + } + if (real_type_pair.second & (SYMBOL_TYPE_GHOST | SYMBOL_TYPE_CALLBACK)) { + real_type_pair.second |= SYMBOL_TYPE_NOSTAT; + } + + if (real_type_pair.first == symcache_item_type::VIRTUAL) { + msg_err_cache("trying to add virtual symbol %s as real (no parent)", name.data()); + return -1; + } + + if ((real_type_pair.second & SYMBOL_TYPE_FINE) && priority == 0) { + /* Adjust priority for negative weighted symbols */ + priority = 1; + } + + std::string static_string_name; + 
+ if (name.empty()) { + static_string_name = fmt::format("AUTO_{}", (void *) func); + } + else { + static_string_name = name; + } + + if (items_by_symbol.contains(static_string_name)) { + msg_err_cache("duplicate symbol name: %s", static_string_name.data()); + return -1; + } + + auto id = items_by_id.size(); + + auto item = cache_item::create_with_function(static_pool, id, + std::move(static_string_name), + priority, func, user_data, + real_type_pair.first, real_type_pair.second); + + items_by_symbol[item->get_name()] = item; + get_item_specific_vector(*item).push_back(item); + items_by_id.push_back(item); + + if (!(real_type_pair.second & SYMBOL_TYPE_NOSTAT)) { + cksum = t1ha(name.data(), name.size(), cksum); + stats_symbols_count++; + } + + return id; +} + +auto symcache::add_virtual_symbol(std::string_view name, int parent_id, enum rspamd_symbol_type flags_and_type) -> int +{ + if (name.empty()) { + msg_err_cache("cannot register a virtual symbol with no name; qed"); + return -1; + } + + auto real_type_pair_maybe = item_type_from_c(flags_and_type); + + if (!real_type_pair_maybe.has_value()) { + msg_err_cache("incompatible flags when adding %s: %s", name.data(), + real_type_pair_maybe.error().c_str()); + return -1; + } + + auto real_type_pair = real_type_pair_maybe.value(); + + if (items_by_symbol.contains(name)) { + msg_err_cache("duplicate symbol name: %s", name.data()); + return -1; + } + + auto id = items_by_id.size(); + + auto item = cache_item::create_with_virtual(static_pool, + id, + std::string{name}, + parent_id, real_type_pair.first, real_type_pair.second); + items_by_symbol[item->get_name()] = item; + get_item_specific_vector(*item).push_back(item); + items_by_id.push_back(item); + + return id; +} + +auto symcache::set_peak_cb(int cbref) -> void +{ + if (peak_cb != -1) { + luaL_unref(L, LUA_REGISTRYINDEX, peak_cb); + } + + peak_cb = cbref; + msg_info_cache("registered peak callback"); +} + +auto symcache::add_delayed_condition(std::string_view sym, int 
cbref) -> void +{ + delayed_conditions->emplace_back(sym, cbref, (lua_State *) cfg->lua_state); +} + +auto symcache::validate(bool strict) -> bool +{ + total_weight = 1.0; + + for (auto &pair: items_by_symbol) { + auto &item = pair.second; + auto ghost = item->st->weight == 0 ? true : false; + auto skipped = !ghost; + + if (item->is_scoreable() && g_hash_table_lookup(cfg->symbols, item->symbol.c_str()) == nullptr) { + if (!std::isnan(cfg->unknown_weight)) { + item->st->weight = cfg->unknown_weight; + auto *s = rspamd_mempool_alloc0_type(static_pool, + struct rspamd_symbol); + /* Legit as we actually never modify this data */ + s->name = (char *) item->symbol.c_str(); + s->weight_ptr = &item->st->weight; + g_hash_table_insert(cfg->symbols, (void *) s->name, (void *) s); + + msg_info_cache ("adding unknown symbol %s with weight: %.2f", + item->symbol.c_str(), cfg->unknown_weight); + ghost = false; + skipped = false; + } + else { + skipped = true; + } + } + else { + skipped = false; + } + + if (!ghost && skipped) { + if (!(item->flags & SYMBOL_TYPE_SKIPPED)) { + item->flags |= SYMBOL_TYPE_SKIPPED; + msg_warn_cache("symbol %s has no score registered, skip its check", + item->symbol.c_str()); + } + } + + if (ghost) { + msg_debug_cache ("symbol %s is registered as ghost symbol, it won't be inserted " + "to any metric", item->symbol.c_str()); + } + + if (item->st->weight < 0 && item->priority == 0) { + item->priority++; + } + + if (item->is_virtual()) { + if (!(item->flags & SYMBOL_TYPE_GHOST)) { + auto *parent = const_cast<cache_item *>(item->get_parent(*this)); + + if (parent == nullptr) { + item->resolve_parent(*this); + parent = const_cast<cache_item *>(item->get_parent(*this)); + } + + if (::fabs(parent->st->weight) < ::fabs(item->st->weight)) { + parent->st->weight = item->st->weight; + } + + auto p1 = ::abs(item->priority); + auto p2 = ::abs(parent->priority); + + if (p1 != p2) { + parent->priority = MAX(p1, p2); + item->priority = parent->priority; + } + } + } + + 
total_weight += fabs(item->st->weight); + } + + /* Now check each metric item and find corresponding symbol in a cache */ + auto ret = true; + GHashTableIter it; + void *k, *v; + g_hash_table_iter_init(&it, cfg->symbols); + + while (g_hash_table_iter_next(&it, &k, &v)) { + auto ignore_symbol = false; + auto sym_def = (struct rspamd_symbol *) v; + + if (sym_def && (sym_def->flags & + (RSPAMD_SYMBOL_FLAG_IGNORE_METRIC | RSPAMD_SYMBOL_FLAG_DISABLED))) { + ignore_symbol = true; + } + + if (!ignore_symbol) { + if (!items_by_symbol.contains((const char *) k)) { + msg_warn_cache ( + "symbol '%s' has its score defined but there is no " + "corresponding rule registered", + k); + if (strict) { + ret = FALSE; + } + } + } + else if (sym_def->flags & RSPAMD_SYMBOL_FLAG_DISABLED) { + auto item = get_item_by_name_mut((const char *) k, false); + + if (item) { + item->enabled = FALSE; + } + } + } + + return ret; +} + +auto symcache::counters() const -> ucl_object_t * +{ + auto *top = ucl_object_typed_new(UCL_ARRAY); + constexpr const auto round_float = [](const auto x, const int digits) -> auto { + const auto power10 = ::pow(10, digits); + return (::floor(x * power10) / power10); + }; + + for (auto &pair: items_by_symbol) { + auto &item = pair.second; + auto symbol = pair.first; + + auto *obj = ucl_object_typed_new(UCL_OBJECT); + ucl_object_insert_key(obj, ucl_object_fromlstring(symbol.data(), symbol.size()), + "symbol", 0, false); + + if (item->is_virtual()) { + if (!(item->flags & SYMBOL_TYPE_GHOST)) { + const auto *parent = item->get_parent(*this); + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(item->st->weight, 3)), + "weight", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(parent->st->avg_frequency, 3)), + "frequency", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromint(parent->st->total_hits), + "hits", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(parent->st->avg_time, 3)), + "time", 0, 
false); + } + else { + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(item->st->weight, 3)), + "weight", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(0.0), + "frequency", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(0.0), + "hits", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(0.0), + "time", 0, false); + } + } + else { + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(item->st->weight, 3)), + "weight", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(item->st->avg_frequency, 3)), + "frequency", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromint(item->st->total_hits), + "hits", 0, false); + ucl_object_insert_key(obj, + ucl_object_fromdouble(round_float(item->st->avg_time, 3)), + "time", 0, false); + } + + ucl_array_append(top, obj); + } + + return top; +} + +auto symcache::periodic_resort(struct ev_loop *ev_loop, double cur_time, double last_resort) -> void +{ + for (const auto &item: filters) { + + if (item->update_counters_check_peak(L, ev_loop, cur_time, last_resort)) { + auto cur_value = (item->st->total_hits - item->last_count) / + (cur_time - last_resort); + auto cur_err = (item->st->avg_frequency - cur_value); + cur_err *= cur_err; + msg_debug_cache ("peak found for %s is %.2f, avg: %.2f, " + "stddev: %.2f, error: %.2f, peaks: %d", + item->symbol.c_str(), cur_value, + item->st->avg_frequency, + item->st->stddev_frequency, + cur_err, + item->frequency_peaks); + + if (peak_cb != -1) { + struct ev_loop **pbase; + + lua_rawgeti(L, LUA_REGISTRYINDEX, peak_cb); + pbase = (struct ev_loop **) lua_newuserdata(L, sizeof(*pbase)); + *pbase = ev_loop; + rspamd_lua_setclass(L, "rspamd{ev_base}", -1); + lua_pushlstring(L, item->symbol.c_str(), item->symbol.size()); + lua_pushnumber(L, item->st->avg_frequency); + lua_pushnumber(L, ::sqrt(item->st->stddev_frequency)); + lua_pushnumber(L, cur_value); + lua_pushnumber(L, cur_err); + + if (lua_pcall(L, 6, 
0, 0) != 0) { + msg_info_cache ("call to peak function for %s failed: %s", + item->symbol.c_str(), lua_tostring(L, -1)); + lua_pop (L, 1); + } + } + } + } +} + +symcache::~symcache() +{ + if (peak_cb != -1) { + luaL_unref(L, LUA_REGISTRYINDEX, peak_cb); + } +} + +auto symcache::maybe_resort() -> bool +{ + if (items_by_order->generation_id != cur_order_gen) { + /* + * Cache has been modified, need to resort it + */ + msg_info_cache("symbols cache has been modified since last check:" + " old id: %ud, new id: %ud", + items_by_order->generation_id, cur_order_gen); + resort(); + + return true; + } + + return false; +} + +auto +symcache::get_item_specific_vector(const cache_item &it) -> symcache::items_ptr_vec & +{ + switch (it.get_type()) { + case symcache_item_type::CONNFILTER: + return connfilters; + case symcache_item_type::FILTER: + return filters; + case symcache_item_type::IDEMPOTENT: + return idempotent; + case symcache_item_type::PREFILTER: + return prefilters; + case symcache_item_type::POSTFILTER: + return postfilters; + case symcache_item_type::COMPOSITE: + return composites; + case symcache_item_type::CLASSIFIER: + return classifiers; + case symcache_item_type::VIRTUAL: + return virtual_symbols; + } + + RSPAMD_UNREACHABLE; +} + +auto +symcache::process_settings_elt(struct rspamd_config_settings_elt *elt) -> void +{ + + auto id = elt->id; + + if (elt->symbols_disabled) { + /* Process denied symbols */ + ucl_object_iter_t iter = nullptr; + const ucl_object_t *cur; + + while ((cur = ucl_object_iterate(elt->symbols_disabled, &iter, true)) != NULL) { + const auto *sym = ucl_object_key(cur); + auto *item = get_item_by_name_mut(sym, false); + + if (item != nullptr) { + if (item->is_virtual()) { + /* + * Virtual symbols are special: + * we ignore them in symcache but prevent them from being + * inserted. 
+ */ + item->forbidden_ids.add_id(id, static_pool); + msg_debug_cache("deny virtual symbol %s for settings %ud (%s); " + "parent can still be executed", + sym, id, elt->name); + } + else { + /* Normal symbol, disable it */ + item->forbidden_ids.add_id(id, static_pool); + msg_debug_cache ("deny symbol %s for settings %ud (%s)", + sym, id, elt->name); + } + } + else { + msg_warn_cache ("cannot find a symbol to disable %s " + "when processing settings %ud (%s)", + sym, id, elt->name); + } + } + } + + if (elt->symbols_enabled) { + ucl_object_iter_t iter = nullptr; + const ucl_object_t *cur; + + while ((cur = ucl_object_iterate (elt->symbols_enabled, &iter, true)) != nullptr) { + /* Here, we resolve parent and explicitly allow it */ + const auto *sym = ucl_object_key(cur); + + auto *item = get_item_by_name_mut(sym, false); + + if (item != nullptr) { + if (item->is_virtual()) { + if (!(item->flags & SYMBOL_TYPE_GHOST)) { + auto *parent = get_item_by_name_mut(sym, true); + + if (parent) { + if (elt->symbols_disabled && + ucl_object_lookup(elt->symbols_disabled, parent->symbol.data())) { + msg_err_cache ("conflict in %s: cannot enable disabled symbol %s, " + "wanted to enable symbol %s", + elt->name, parent->symbol.data(), sym); + continue; + } + + parent->exec_only_ids.add_id(id, static_pool); + msg_debug_cache ("allow just execution of symbol %s for settings %ud (%s)", + parent->symbol.data(), id, elt->name); + } + } + /* Ignore ghosts */ + } + + item->allowed_ids.add_id(id, static_pool); + msg_debug_cache ("allow execution of symbol %s for settings %ud (%s)", + sym, id, elt->name); + } + else { + msg_warn_cache ("cannot find a symbol to enable %s " + "when processing settings %ud (%s)", + sym, id, elt->name); + } + } + } +} + +}
\ No newline at end of file diff --git a/src/libserver/symcache/symcache_internal.hxx b/src/libserver/symcache/symcache_internal.hxx new file mode 100644 index 000000000..00c0d4d8b --- /dev/null +++ b/src/libserver/symcache/symcache_internal.hxx @@ -0,0 +1,464 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal C++ structures and classes for symcache + */ + +#ifndef RSPAMD_SYMCACHE_INTERNAL_HXX +#define RSPAMD_SYMCACHE_INTERNAL_HXX +#pragma once + +#include <cmath> +#include <cstdlib> +#include <cstdint> +#include <utility> +#include <vector> +#include <string> +#include <string_view> +#include <memory> +#include <variant> + +#include "rspamd_symcache.h" +#include "contrib/libev/ev.h" +#include "contrib/robin-hood/robin_hood.h" +#include "contrib/expected/expected.hpp" +#include "cfg_file.h" + +#include "symcache_id_list.hxx" + +#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ + "symcache", log_tag(), \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_err_cache_task(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ + "symcache", task->task_pool->tag.uid, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ + "symcache", log_tag(), \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_info_cache(...) 
rspamd_default_log_function (G_LOG_LEVEL_INFO, \ + "symcache", log_tag(), \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \ + ::rspamd::symcache::rspamd_symcache_log_id, "symcache", log_tag(), \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) +#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \ + ::rspamd::symcache::rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) + +struct lua_State; + +namespace rspamd::symcache { + +/* Defined in symcache_impl.cxx */ +extern int rspamd_symcache_log_id; + +static const std::uint8_t symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0}; + +struct symcache_header { + std::uint8_t magic[8]; + unsigned int nitems; + std::uint8_t checksum[64]; + std::uint8_t unused[128]; +}; + +struct cache_item; +using cache_item_ptr = std::shared_ptr<cache_item>; + +/** + * This structure is intended to keep the current ordering for all symbols + * It is designed to be shared among all tasks and keep references to the real + * symbols. + * If some symbol has been added or removed to the symbol cache, it will not affect + * the current order, and it will only be regenerated for the subsequent tasks. + * This allows safe and no copy sharing and keeping track of all symbols in the + * cache runtime. 
+ */ +struct order_generation { + /* All items ordered */ + std::vector<cache_item_ptr> d; + /* Mapping from symbol name to the position in the order array */ + robin_hood::unordered_flat_map<std::string_view, unsigned int> by_symbol; + /* Mapping from symbol id to the position in the order array */ + robin_hood::unordered_flat_map<unsigned int, unsigned int> by_cache_id; + /* It matches cache->generation_id; if not, a fresh ordering is required */ + unsigned int generation_id; + + explicit order_generation(std::size_t nelts, unsigned id) : generation_id(id) { + d.reserve(nelts); + by_symbol.reserve(nelts); + by_cache_id.reserve(nelts); + } + + auto size() const -> auto { return d.size(); } +}; + +using order_generation_ptr = std::shared_ptr<order_generation>; + + +struct delayed_cache_dependency { + std::string from; + std::string to; + + delayed_cache_dependency(std::string_view _from, std::string_view _to) : from(_from), to(_to) {} +}; + +struct delayed_cache_condition { + std::string sym; + int cbref; + lua_State *L; +public: + delayed_cache_condition(std::string_view _sym, int _cbref, lua_State *_L) : + sym(_sym), cbref(_cbref), L(_L) {} +}; + +class symcache { +private: + using items_ptr_vec = std::vector<cache_item_ptr>; + /* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */ + robin_hood::unordered_flat_map<std::string_view, cache_item_ptr> items_by_symbol; + items_ptr_vec items_by_id; + + /* Items sorted into some order */ + order_generation_ptr items_by_order; + unsigned int cur_order_gen; + + /* Specific vectors for execution/iteration */ + items_ptr_vec connfilters; + items_ptr_vec prefilters; + items_ptr_vec filters; + items_ptr_vec postfilters; + items_ptr_vec composites; + items_ptr_vec idempotent; + items_ptr_vec classifiers; + items_ptr_vec virtual_symbols; + + /* These are stored within pointer to clean up after init */ + std::unique_ptr<std::vector<delayed_cache_dependency>> delayed_deps; + 
std::unique_ptr<std::vector<delayed_cache_condition>> delayed_conditions; + + rspamd_mempool_t *static_pool; + std::uint64_t cksum; + double total_weight; + std::size_t stats_symbols_count; + +private: + std::uint64_t total_hits; + + struct rspamd_config *cfg; + lua_State *L; + double reload_time; + double last_profile; + +private: + int peak_cb; + int cache_id; + +private: + /* Internal methods */ + auto load_items() -> bool; + auto resort() -> void; + auto get_item_specific_vector(const cache_item &) -> items_ptr_vec&; + /* Helper for g_hash_table_foreach */ + static auto metric_connect_cb(void *k, void *v, void *ud) -> void; + +public: + explicit symcache(struct rspamd_config *cfg) : cfg(cfg) { + /* XXX: do we need a special pool for symcache? I don't think so */ + static_pool = cfg->cfg_pool; + reload_time = cfg->cache_reload_time; + total_hits = 1; + total_weight = 1.0; + cksum = 0xdeadbabe; + peak_cb = -1; + cache_id = rspamd_random_uint64_fast(); + L = (lua_State *)cfg->lua_state; + delayed_conditions = std::make_unique<std::vector<delayed_cache_condition>>(); + delayed_deps = std::make_unique<std::vector<delayed_cache_dependency>>(); + } + + virtual ~symcache(); + + /** + * Saves items on disk (if possible) + * @return + */ + auto save_items() const -> bool; + + /** + * Get an item by ID + * @param id + * @param resolve_parent + * @return + */ + auto get_item_by_id(int id, bool resolve_parent) const -> const cache_item *; + /** + * Get an item by it's name + * @param name + * @param resolve_parent + * @return + */ + auto get_item_by_name(std::string_view name, bool resolve_parent) const -> const cache_item *; + /** + * Get an item by it's name, mutable pointer + * @param name + * @param resolve_parent + * @return + */ + auto get_item_by_name_mut(std::string_view name, bool resolve_parent) const -> cache_item *; + + /** + * Add a direct dependency + * @param id_from + * @param to + * @param virtual_id_from + * @return + */ + auto add_dependency(int id_from, 
std::string_view to, int virtual_id_from) -> void; + + /** + * Add a delayed dependency between symbols that will be resolved on the init stage + * @param from + * @param to + */ + auto add_delayed_dependency(std::string_view from, std::string_view to) -> void { + if (!delayed_deps) { + delayed_deps = std::make_unique<std::vector<delayed_cache_dependency>>(); + } + + delayed_deps->emplace_back(from, to); + } + + /** + * Initialises the symbols cache, must be called after all symbols are added + * and the config file is loaded + */ + auto init() -> bool; + + /** + * Log helper that returns cfg checksum + * @return + */ + auto log_tag() const -> const char* { + return cfg->checksum; + } + + /** + * Helper to return a memory pool associated with the cache + * @return + */ + auto get_pool() const { + return static_pool; + } + + /** + * A method to add a generic symbol with a callback to couple with C API + * @param name name of the symbol, unlike C API it must be "" for callback only (compat) symbols, in this case an automatic name is generated + * @param priority + * @param func + * @param user_data + * @param flags_and_type mix of flags and type in a messy C enum + * @return id of a new symbol or -1 in case of failure + */ + auto add_symbol_with_callback(std::string_view name, + int priority, + symbol_func_t func, + void *user_data, + enum rspamd_symbol_type flags_and_type) -> int; + /** + * A method to add a generic virtual symbol with no function associated + * @param name must have some value, or a fatal error will strike you + * @param parent_id if this param is -1 then this symbol is associated with nothing + * @param flags_and_type mix of flags and type in a messy C enum + * @return id of a new symbol or -1 in case of failure + */ + auto add_virtual_symbol(std::string_view name, int parent_id, + enum rspamd_symbol_type flags_and_type) -> int; + + /** + * Sets a lua callback to be called on peaks in execution time + * @param cbref + */ + auto set_peak_cb(int 
cbref) -> void; + + /** + * Add a delayed condition for a symbol that might not be registered yet + * @param sym + * @param cbref + */ + auto add_delayed_condition(std::string_view sym, int cbref) -> void; + + /** + * Returns number of symbols that needs to be checked in statistical algorithm + * @return + */ + auto get_stats_symbols_count() const { + return stats_symbols_count; + } + + /** + * Returns a checksum for the cache + * @return + */ + auto get_cksum() const { + return cksum; + } + + /** + * Validate symbols in the cache + * @param strict + * @return + */ + auto validate(bool strict) -> bool; + + /** + * Returns counters for the cache + * @return + */ + auto counters() const -> ucl_object_t *; + + /** + * Adjusts stats of the cache for the periodic counter + */ + auto periodic_resort(struct ev_loop *ev_loop, double cur_time, double last_resort) -> void; + + /** + * A simple helper to get the reload time + * @return + */ + auto get_reload_time() const { return reload_time; }; + + /** + * Iterate over all symbols using a specific functor + * @tparam Functor + * @param f + */ + template<typename Functor> + auto symbols_foreach(Functor f) -> void { + for (const auto &sym_it : items_by_symbol) { + f(sym_it.second.get()); + } + } + + /** + * Iterate over all composites using a specific functor + * @tparam Functor + * @param f + */ + template<typename Functor> + auto composites_foreach(Functor f) -> void { + for (const auto &sym_it : composites) { + f(sym_it.get()); + } + } + + /** + * Iterate over all composites using a specific functor + * @tparam Functor + * @param f + */ + template<typename Functor> + auto connfilters_foreach(Functor f) -> bool { + return std::all_of(std::begin(connfilters), std::end(connfilters), + [&](const auto &sym_it){ + return f(sym_it.get()); + }); + } + template<typename Functor> + auto prefilters_foreach(Functor f) -> bool { + return std::all_of(std::begin(prefilters), std::end(prefilters), + [&](const auto &sym_it){ + return 
f(sym_it.get()); + }); + } + template<typename Functor> + auto postfilters_foreach(Functor f) -> bool { + return std::all_of(std::begin(postfilters), std::end(postfilters), + [&](const auto &sym_it){ + return f(sym_it.get()); + }); + } + template<typename Functor> + auto idempotent_foreach(Functor f) -> bool { + return std::all_of(std::begin(idempotent), std::end(idempotent), + [&](const auto &sym_it){ + return f(sym_it.get()); + }); + } + template<typename Functor> + auto filters_foreach(Functor f) -> bool { + return std::all_of(std::begin(filters), std::end(filters), + [&](const auto &sym_it){ + return f(sym_it.get()); + }); + } + + /** + * Resort cache if anything has been changed since last time + * @return + */ + auto maybe_resort() -> bool; + + /** + * Returns number of items with ids + * @return + */ + auto get_items_count() const -> auto { + return items_by_id.size(); + } + + /** + * Returns current set of items ordered for sharing ownership + * @return + */ + auto get_cache_order() const -> auto { + return items_by_order; + } + + /** + * Get last profile timestamp + * @return + */ + auto get_last_profile() const -> auto { + return last_profile; + } + + /** + * Sets last profile timestamp + * @param last_profile + * @return + */ + auto set_last_profile(double last_profile){ + symcache::last_profile = last_profile; + } + + /** + * Process settings elt identified by id + * @param elt + */ + auto process_settings_elt(struct rspamd_config_settings_elt *elt) -> void; +}; + + +} // namespace rspamd + +#endif //RSPAMD_SYMCACHE_INTERNAL_HXX diff --git a/src/libserver/symcache/symcache_item.cxx b/src/libserver/symcache/symcache_item.cxx new file mode 100644 index 000000000..f78605b13 --- /dev/null +++ b/src/libserver/symcache/symcache_item.cxx @@ -0,0 +1,500 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "lua/lua_common.h" +#include "symcache_internal.hxx" +#include "symcache_item.hxx" +#include "fmt/core.h" +#include "libserver/task.h" + +namespace rspamd::symcache { + +auto cache_item::get_parent(const symcache &cache) const -> const cache_item * +{ + if (is_virtual()) { + const auto &virtual_sp = std::get<virtual_item>(specific); + + return virtual_sp.get_parent(cache); + } + + return nullptr; +} + +auto cache_item::get_parent_mut(const symcache &cache) -> cache_item * +{ + if (is_virtual()) { + auto &virtual_sp = std::get<virtual_item>(specific); + + return virtual_sp.get_parent_mut(cache); + } + + return nullptr; +} + +auto cache_item::process_deps(const symcache &cache) -> void +{ + /* Allow logging macros to work */ + auto log_tag = [&]() { return cache.log_tag(); }; + + for (auto &dep: deps) { + msg_debug_cache ("process real dependency %s on %s", symbol.c_str(), dep.sym.c_str()); + auto *dit = cache.get_item_by_name_mut(dep.sym, true); + + if (dep.vid >= 0) { + /* Case of the virtual symbol that depends on another (maybe virtual) symbol */ + const auto *vdit = cache.get_item_by_name(dep.sym, false); + + if (!vdit) { + if (dit) { + msg_err_cache("cannot add dependency from %s on %s: no dependency symbol registered", + dep.sym.c_str(), dit->symbol.c_str()); + } + } + else { + msg_debug_cache("process virtual dependency %s(%d) on %s(%d)", symbol.c_str(), + dep.vid, vdit->symbol.c_str(), vdit->id); + + unsigned nids = 0; + + /* Propagate ids */ + msg_debug_cache("check id propagation for dependency %s from %s", + 
symbol.c_str(), dit->symbol.c_str()); + + const auto *ids = dit->allowed_ids.get_ids(nids); + + if (nids > 0) { + msg_debug_cache("propagate allowed ids from %s to %s", + dit->symbol.c_str(), symbol.c_str()); + + allowed_ids.set_ids(ids, nids, cache.get_pool()); + } + + ids = dit->forbidden_ids.get_ids(nids); + + if (nids > 0) { + msg_debug_cache("propagate forbidden ids from %s to %s", + dit->symbol.c_str(), symbol.c_str()); + + forbidden_ids.set_ids(ids, nids, cache.get_pool()); + } + } + } + + if (dit != nullptr) { + if (!dit->is_filter()) { + /* + * Check sanity: + * - filters -> prefilter dependency is OK and always satisfied + * - postfilter -> (filter, prefilter) dep is ok + * - idempotent -> (any) dep is OK + * + * Otherwise, emit error + * However, even if everything is fine this dep is useless ¯\_(ツ)_/¯ + */ + auto ok_dep = false; + + if (dit->get_type() == type) { + ok_dep = true; + } + else if (type < dit->get_type()) { + ok_dep = true; + } + + if (!ok_dep) { + msg_err_cache ("cannot add dependency from %s on %s: invalid symbol types", + dep.sym.c_str(), symbol.c_str()); + + continue; + } + } + else { + if (dit->id == id) { + msg_err_cache ("cannot add dependency on self: %s -> %s " + "(resolved to %s)", + symbol.c_str(), dep.sym.c_str(), dit->symbol.c_str()); + } + else { + /* Create a reverse dep */ + if (is_virtual()) { + auto *parent = get_parent_mut(cache); + + if (parent) { + dit->rdeps.emplace_back(parent->getptr(), dep.sym, parent->id, -1); + dep.item = dit->getptr(); + dep.id = dit->id; + + msg_debug_cache ("added reverse dependency from %d on %d", parent->id, + dit->id); + } + } + else { + dep.item = dit->getptr(); + dep.id = dit->id; + dit->rdeps.emplace_back(getptr(), dep.sym, id, -1); + msg_debug_cache ("added reverse dependency from %d on %d", id, + dit->id); + } + } + } + } + else if (dep.id >= 0) { + msg_err_cache ("cannot find dependency on symbol %s for symbol %s", + dep.sym.c_str(), symbol.c_str()); + + continue; + } + } + + // Remove 
empty deps + deps.erase(std::remove_if(std::begin(deps), std::end(deps), + [](const auto &dep) { return !dep.item; }), std::end(deps)); +} + +auto cache_item::resolve_parent(const symcache &cache) -> bool +{ + auto log_tag = [&]() { return cache.log_tag(); }; + + if (is_virtual()) { + auto &virt = std::get<virtual_item>(specific); + + if (virt.get_parent(cache)) { + msg_warn_cache("trying to resolve parent twice for %s", symbol.c_str()); + + return false; + } + + return virt.resolve_parent(cache); + } + else { + msg_warn_cache("trying to resolve a parent for non-virtual symbol %s", symbol.c_str()); + } + + return false; +} + +auto cache_item::update_counters_check_peak(lua_State *L, + struct ev_loop *ev_loop, + double cur_time, + double last_resort) -> bool +{ + auto ret = false; + static const double decay_rate = 0.25; + + st->total_hits += st->hits; + g_atomic_int_set(&st->hits, 0); + + if (last_count > 0) { + auto cur_value = (st->total_hits - last_count) / + (cur_time - last_resort); + rspamd_set_counter_ema(&st->frequency_counter, + cur_value, decay_rate); + st->avg_frequency = st->frequency_counter.mean; + st->stddev_frequency = st->frequency_counter.stddev; + + auto cur_err = (st->avg_frequency - cur_value); + cur_err *= cur_err; + + if (st->frequency_counter.number > 10 && + cur_err > ::sqrt(st->stddev_frequency) * 3) { + frequency_peaks++; + ret = true; + } + } + + last_count = st->total_hits; + + if (cd->number > 0) { + if (!is_virtual()) { + st->avg_time = cd->mean; + rspamd_set_counter_ema(&st->time_counter, + st->avg_time, decay_rate); + st->avg_time = st->time_counter.mean; + memset(cd, 0, sizeof(*cd)); + } + } + + return ret; +} + +auto cache_item::get_type_str() const -> const char * +{ + switch (type) { + case symcache_item_type::CONNFILTER: + return "connfilter"; + case symcache_item_type::FILTER: + return "filter"; + case symcache_item_type::IDEMPOTENT: + return "idempotent"; + case symcache_item_type::PREFILTER: + return "prefilter"; + case 
symcache_item_type::POSTFILTER: + return "postfilter"; + case symcache_item_type::COMPOSITE: + return "composite"; + case symcache_item_type::CLASSIFIER: + return "classifier"; + case symcache_item_type::VIRTUAL: + return "virtual"; + } + + RSPAMD_UNREACHABLE; +} + +auto cache_item::is_allowed(struct rspamd_task *task, bool exec_only) const -> bool +{ + const auto *what = "execution"; + + if (!exec_only) { + what = "symbol insertion"; + } + + /* Static checks */ + if (!enabled || + (RSPAMD_TASK_IS_EMPTY(task) && !(flags & SYMBOL_TYPE_EMPTY)) || + (flags & SYMBOL_TYPE_MIME_ONLY && !RSPAMD_TASK_IS_MIME(task))) { + + if (!enabled) { + msg_debug_cache_task("skipping %s of %s as it is permanently disabled", + what, symbol.c_str()); + + return false; + } + else { + /* + * If we check merely execution (not insertion), then we disallow + * mime symbols for non mime tasks and vice versa + */ + if (exec_only) { + msg_debug_cache_task("skipping check of %s as it cannot be " + "executed for this task type", + symbol.c_str()); + + return FALSE; + } + } + } + + /* Settings checks */ + if (task->settings_elt != nullptr) { + if (forbidden_ids.check_id(task->settings_elt->id)) { + msg_debug_cache_task ("deny %s of %s as it is forbidden for " + "settings id %ud", + what, + symbol.c_str(), + task->settings_elt->id); + + return false; + } + + if (!(flags & SYMBOL_TYPE_EXPLICIT_DISABLE)) { + if (allowed_ids.check_id(task->settings_elt->id)) { + + if (task->settings_elt->policy == RSPAMD_SETTINGS_POLICY_IMPLICIT_ALLOW) { + msg_debug_cache_task("allow execution of %s settings id %ud " + "allows implicit execution of the symbols;", + symbol.c_str(), + id); + + return true; + } + + if (exec_only) { + /* + * Special case if any of our virtual children are enabled + */ + if (exec_only_ids.check_id(task->settings_elt->id)) { + return true; + } + } + + msg_debug_cache_task ("deny %s of %s as it is not listed " + "as allowed for settings id %ud", + what, + symbol.c_str(), + 
task->settings_elt->id); + return false; + } + } + else { + msg_debug_cache_task ("allow %s of %s for " + "settings id %ud as it can be only disabled explicitly", + what, + symbol.c_str(), + task->settings_elt->id); + } + } + else if (flags & SYMBOL_TYPE_EXPLICIT_ENABLE) { + msg_debug_cache_task ("deny %s of %s as it must be explicitly enabled", + what, + symbol.c_str()); + return false; + } + + /* Allow all symbols with no settings id */ + return true; +} + +auto virtual_item::get_parent(const symcache &cache) const -> const cache_item * +{ + if (parent) { + return parent.get(); + } + + return cache.get_item_by_id(parent_id, false); +} + +auto virtual_item::get_parent_mut(const symcache &cache) -> cache_item * +{ + if (parent) { + return parent.get(); + } + + return const_cast<cache_item *>(cache.get_item_by_id(parent_id, false)); +} + +auto virtual_item::resolve_parent(const symcache &cache) -> bool +{ + if (parent) { + return false; + } + + auto item_ptr = cache.get_item_by_id(parent_id, true); + + if (item_ptr) { + parent = const_cast<cache_item *>(item_ptr)->getptr(); + + return true; + } + + return false; +} + +auto item_type_from_c(enum rspamd_symbol_type type) -> tl::expected<std::pair<symcache_item_type, int>, std::string> +{ + constexpr const auto trivial_types = SYMBOL_TYPE_CONNFILTER | SYMBOL_TYPE_PREFILTER + | SYMBOL_TYPE_POSTFILTER | SYMBOL_TYPE_IDEMPOTENT + | SYMBOL_TYPE_COMPOSITE | SYMBOL_TYPE_CLASSIFIER + | SYMBOL_TYPE_VIRTUAL; + + constexpr auto all_but_one_ty = [&](int type, int exclude_bit) -> auto { + return (type & trivial_types) & (trivial_types & ~exclude_bit); + }; + + if (type & trivial_types) { + auto check_trivial = [&](auto flag, + symcache_item_type ty) -> tl::expected<std::pair<symcache_item_type, int>, std::string> { + if (all_but_one_ty(type, flag)) { + return tl::make_unexpected(fmt::format("invalid flags for a symbol: {}", type)); + } + + return std::make_pair(ty, type & ~flag); + }; + if (type & SYMBOL_TYPE_CONNFILTER) { + return 
check_trivial(SYMBOL_TYPE_CONNFILTER, symcache_item_type::CONNFILTER); + } + else if (type & SYMBOL_TYPE_PREFILTER) { + return check_trivial(SYMBOL_TYPE_PREFILTER, symcache_item_type::PREFILTER); + } + else if (type & SYMBOL_TYPE_POSTFILTER) { + return check_trivial(SYMBOL_TYPE_POSTFILTER, symcache_item_type::POSTFILTER); + } + else if (type & SYMBOL_TYPE_IDEMPOTENT) { + return check_trivial(SYMBOL_TYPE_IDEMPOTENT, symcache_item_type::IDEMPOTENT); + } + else if (type & SYMBOL_TYPE_COMPOSITE) { + return check_trivial(SYMBOL_TYPE_COMPOSITE, symcache_item_type::COMPOSITE); + } + else if (type & SYMBOL_TYPE_CLASSIFIER) { + return check_trivial(SYMBOL_TYPE_CLASSIFIER, symcache_item_type::CLASSIFIER); + } + else if (type & SYMBOL_TYPE_VIRTUAL) { + return check_trivial(SYMBOL_TYPE_VIRTUAL, symcache_item_type::VIRTUAL); + } + + return tl::make_unexpected(fmt::format("internal error: impossible flags combination", type)); + } + + /* Maybe check other flags combination here? */ + return std::make_pair(symcache_item_type::FILTER, type); +} + +bool operator<(symcache_item_type lhs, symcache_item_type rhs) +{ + auto ret = false; + switch (lhs) { + case symcache_item_type::CONNFILTER: + break; + case symcache_item_type::PREFILTER: + if (rhs == symcache_item_type::CONNFILTER) { + ret = true; + } + break; + case symcache_item_type::FILTER: + if (rhs == symcache_item_type::CONNFILTER || rhs == symcache_item_type::PREFILTER) { + ret = true; + } + break; + case symcache_item_type::POSTFILTER: + if (rhs != symcache_item_type::IDEMPOTENT) { + ret = true; + } + break; + case symcache_item_type::IDEMPOTENT: + default: + break; + } + + return ret; +} + +item_condition::~item_condition() +{ + if (cb != -1 && L != nullptr) { + luaL_unref(L, LUA_REGISTRYINDEX, cb); + } +} + +auto item_condition::check(std::string_view sym_name, struct rspamd_task *task) const -> bool +{ + if (cb != -1 && L != nullptr) { + auto ret = false; + + lua_rawgeti(L, LUA_REGISTRYINDEX, cb); + + lua_pushcfunction (L, 
&rspamd_lua_traceback); + auto err_idx = lua_gettop(L); + + auto **ptask = (struct rspamd_task **) lua_newuserdata(L, sizeof(struct rspamd_task *)); + rspamd_lua_setclass(L, "rspamd{task}", -1); + *ptask = task; + + if (lua_pcall(L, 1, 1, err_idx) != 0) { + msg_info_task("call to condition for %s failed: %s", + sym_name.data(), lua_tostring(L, -1)); + } + else { + ret = lua_toboolean(L, -1); + } + + lua_settop(L, err_idx - 1); + + return ret; + } + + return true; +} + +} diff --git a/src/libserver/symcache/symcache_item.hxx b/src/libserver/symcache/symcache_item.hxx new file mode 100644 index 000000000..1d0cd7e35 --- /dev/null +++ b/src/libserver/symcache/symcache_item.hxx @@ -0,0 +1,432 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef RSPAMD_SYMCACHE_ITEM_HXX +#define RSPAMD_SYMCACHE_ITEM_HXX + +#pragma once + +#include <utility> +#include <vector> +#include <string> +#include <string_view> +#include <memory> +#include <variant> +#include <algorithm> + +#include "rspamd_symcache.h" +#include "symcache_id_list.hxx" +#include "contrib/expected/expected.hpp" +#include "contrib/libev/ev.h" + +namespace rspamd::symcache { + +class symcache; +struct cache_item; +using cache_item_ptr = std::shared_ptr<cache_item>; + +enum class symcache_item_type { + CONNFILTER, /* Executed on connection stage */ + PREFILTER, /* Executed before all filters */ + FILTER, /* Normal symbol with a callback */ + POSTFILTER, /* Executed after all filters */ + IDEMPOTENT, /* Executed after postfilters, cannot change results */ + CLASSIFIER, /* A virtual classifier symbol */ + COMPOSITE, /* A virtual composite symbol */ + VIRTUAL, /* A virtual symbol... */ +}; + +/* + * Compare item types: earlier stages symbols are > than later stages symbols + * Order for virtual stuff is not defined. 
+ */ +bool operator<(symcache_item_type lhs, symcache_item_type rhs); + +constexpr static auto item_type_to_str(symcache_item_type t) -> const char * { + switch(t) { + case symcache_item_type::CONNFILTER: + return "connfilter"; + case symcache_item_type::PREFILTER: + return "prefilter"; + case symcache_item_type::FILTER: + return "filter"; + case symcache_item_type::POSTFILTER: + return "postfilter"; + case symcache_item_type::IDEMPOTENT: + return "idempotent"; + case symcache_item_type::CLASSIFIER: + return "classifier"; + case symcache_item_type::COMPOSITE: + return "composite"; + case symcache_item_type::VIRTUAL: + return "virtual"; + } +} + +/** + * This is a public helper to convert a legacy C type to a more static type + * @param type input type as a C enum + * @return pair of type safe symcache_item_type + the remaining flags or an error + */ +auto item_type_from_c(enum rspamd_symbol_type type) -> tl::expected<std::pair<symcache_item_type, int>, std::string>; + +struct item_condition { +private: + lua_State *L; + int cb; +public: + item_condition(lua_State *_L, int _cb) : L(_L), cb(_cb) {} + virtual ~item_condition(); + + auto check(std::string_view sym_name, struct rspamd_task *task) const -> bool; +}; + +class normal_item { +private: + symbol_func_t func; + void *user_data; + std::vector<item_condition> conditions; +public: + explicit normal_item(symbol_func_t _func, void *_user_data) : func(_func), user_data(_user_data) + { + } + + auto add_condition(lua_State *L, int cbref) -> void + { + conditions.emplace_back(L, cbref); + } + + auto call(struct rspamd_task *task, struct rspamd_symcache_item *item) const -> void + { + func(task, item, user_data); + } + + auto check_conditions(std::string_view sym_name, struct rspamd_task *task) const -> bool { + return std::all_of(std::begin(conditions), std::end(conditions), + [&](const auto &cond) { return cond.check(sym_name, task); }); + } + + auto get_cbdata() const -> auto { + return user_data; + } +}; + +class 
virtual_item { +private: + int parent_id; + cache_item_ptr parent; +public: + explicit virtual_item(int _parent_id) : parent_id(_parent_id) + { + } + + auto get_parent(const symcache &cache) const -> const cache_item *; + auto get_parent_mut(const symcache &cache) -> cache_item *; + + auto resolve_parent(const symcache &cache) -> bool; +}; + +struct cache_dependency { + cache_item_ptr item; /* Real dependency */ + std::string sym; /* Symbolic dep name */ + int id; /* Real from */ + int vid; /* Virtual from */ +public: + /* Default piecewise constructor */ + cache_dependency(cache_item_ptr _item, std::string _sym, int _id, int _vid) : + item(std::move(_item)), sym(std::move(_sym)), id(_id), vid(_vid) + { + } +}; + +struct cache_item : std::enable_shared_from_this<cache_item> { + /* This block is likely shared */ + struct rspamd_symcache_item_stat *st = nullptr; + struct rspamd_counter_data *cd = nullptr; + + /* Unique id - counter */ + int id; + std::uint64_t last_count = 0; + std::string symbol; + symcache_item_type type; + int flags; + + /* Condition of execution */ + bool enabled = true; + + /* Priority */ + int priority = 0; + /* Topological order */ + unsigned int order = 0; + int frequency_peaks = 0; + + /* Specific data for virtual and callback symbols */ + std::variant<normal_item, virtual_item> specific; + + /* Settings ids */ + id_list allowed_ids{}; + /* Allows execution but not symbols insertion */ + id_list exec_only_ids{}; + id_list forbidden_ids{}; + + /* Dependencies */ + std::vector<cache_dependency> deps; + /* Reverse dependencies */ + std::vector<cache_dependency> rdeps; + +public: + /** + * Create a normal item with a callback + * @param name + * @param priority + * @param func + * @param user_data + * @param type + * @param flags + * @return + */ + [[nodiscard]] static auto create_with_function(rspamd_mempool_t *pool, + int id, + std::string &&name, + int priority, + symbol_func_t func, + void *user_data, + symcache_item_type type, + int flags) 
-> cache_item_ptr + { + return std::shared_ptr<cache_item>(new cache_item(pool, + id, std::move(name), priority, + func, user_data, + type, flags)); + } + + /** + * Create a virtual item + * @param name + * @param priority + * @param parent + * @param type + * @param flags + * @return + */ + [[nodiscard]] static auto create_with_virtual(rspamd_mempool_t *pool, + int id, + std::string &&name, + int parent, + symcache_item_type type, + int flags) -> cache_item_ptr + { + return std::shared_ptr<cache_item>(new cache_item(pool, id, std::move(name), + parent, type, flags)); + } + + /** + * Share ownership on the item + * @return + */ + auto getptr() -> cache_item_ptr + { + return shared_from_this(); + } + + /** + * Process and resolve dependencies for the item + * @param cache + */ + auto process_deps(const symcache &cache) -> void; + + auto is_virtual() const -> bool + { + return std::holds_alternative<virtual_item>(specific); + } + + auto is_filter() const -> bool + { + return std::holds_alternative<normal_item>(specific) && + (type == symcache_item_type::FILTER); + } + + /** + * Returns true if a symbol should have some score defined + * @return + */ + auto is_scoreable() const -> bool + { + return !(flags & SYMBOL_TYPE_CALLBACK) && + ((type == symcache_item_type::FILTER) || + is_virtual() || + (type == symcache_item_type::COMPOSITE) || + (type == symcache_item_type::CLASSIFIER)); + } + + auto is_ghost() const -> bool + { + return flags & SYMBOL_TYPE_GHOST; + } + + auto get_parent(const symcache &cache) const -> const cache_item *; + auto get_parent_mut(const symcache &cache) -> cache_item *; + + auto resolve_parent(const symcache &cache) -> bool; + + auto get_type() const -> auto + { + return type; + } + + auto get_type_str() const -> const char*; + + auto get_name() const -> const std::string & + { + return symbol; + } + + auto get_flags() const -> auto { + return flags; + }; + + auto add_condition(lua_State *L, int cbref) -> bool + { + if (!is_virtual()) { + auto 
&normal = std::get<normal_item>(specific); + normal.add_condition(L, cbref); + + return true; + } + + return false; + } + + auto update_counters_check_peak(lua_State *L, + struct ev_loop *ev_loop, + double cur_time, + double last_resort) -> bool; + + /** + * Increase frequency for a symbol + */ + auto inc_frequency() -> void { + g_atomic_int_inc(&st->hits); + } + + /** + * Check if an item is allowed to be executed not checking item conditions + * @param task + * @param exec_only + * @return + */ + auto is_allowed(struct rspamd_task *task, bool exec_only) const -> bool; + + /** + * Returns callback data + * @return + */ + auto get_cbdata() const -> void * { + if (std::holds_alternative<normal_item>(specific)) { + const auto &filter_data = std::get<normal_item>(specific); + + return filter_data.get_cbdata(); + } + + return nullptr; + } + + /** + * Check all conditions for an item + * @param task + * @return + */ + auto check_conditions(struct rspamd_task *task) const -> auto { + if (std::holds_alternative<normal_item>(specific)) { + const auto &filter_data = std::get<normal_item>(specific); + + return filter_data.check_conditions(symbol, task); + } + + return false; + } + + auto call(struct rspamd_task *task) const -> void { + if (std::holds_alternative<normal_item>(specific)) { + const auto &filter_data = std::get<normal_item>(specific); + + filter_data.call(task, (struct rspamd_symcache_item *)this); + } + } + +private: + /** + * Constructor for a normal symbols with callback + * @param name + * @param _priority + * @param func + * @param user_data + * @param _type + * @param _flags + */ + cache_item(rspamd_mempool_t *pool, + int _id, + std::string &&name, + int _priority, + symbol_func_t func, + void *user_data, + symcache_item_type _type, + int _flags) : id(_id), + symbol(std::move(name)), + type(_type), + flags(_flags), + priority(_priority), + specific(normal_item{func, user_data}) + { + /* These structures are kept trivial, so they need to be explicitly reset 
*/ + forbidden_ids.reset(); + allowed_ids.reset(); + exec_only_ids.reset(); + st = rspamd_mempool_alloc0_shared_type(pool, std::remove_pointer_t<decltype(st)>); + cd = rspamd_mempool_alloc0_shared_type(pool, std::remove_pointer_t<decltype(cd)>); + } + + /** + * Constructor for a virtual symbol + * @param name + * @param _priority + * @param parent + * @param _type + * @param _flags + */ + cache_item(rspamd_mempool_t *pool, + int _id, + std::string &&name, + int parent, + symcache_item_type _type, + int _flags) : id(_id), + symbol(std::move(name)), + type(_type), + flags(_flags), + specific(virtual_item{parent}) + { + /* These structures are kept trivial, so they need to be explicitly reset */ + forbidden_ids.reset(); + allowed_ids.reset(); + exec_only_ids.reset(); + st = rspamd_mempool_alloc0_shared_type(pool, std::remove_pointer_t<decltype(st)>); + cd = rspamd_mempool_alloc0_shared_type(pool, std::remove_pointer_t<decltype(cd)>); + } +}; + +} + +#endif //RSPAMD_SYMCACHE_ITEM_HXX diff --git a/src/libserver/symcache/symcache_periodic.hxx b/src/libserver/symcache/symcache_periodic.hxx new file mode 100644 index 000000000..2719fca25 --- /dev/null +++ b/src/libserver/symcache/symcache_periodic.hxx @@ -0,0 +1,90 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +#ifndef RSPAMD_SYMCACHE_PERIODIC_HXX +#define RSPAMD_SYMCACHE_PERIODIC_HXX + +#pragma once + +#include "config.h" +#include "contrib/libev/ev.h" +#include "symcache_internal.hxx" +#include "worker_util.h" + +namespace rspamd::symcache { +struct cache_refresh_cbdata { +private: + + symcache *cache; + struct ev_loop *event_loop; + struct rspamd_worker *w; + double reload_time; + double last_resort; + ev_timer resort_ev; + +public: + explicit cache_refresh_cbdata(symcache *_cache, + struct ev_loop *_ev_base, + struct rspamd_worker *_w) + : cache(_cache), event_loop(_ev_base), w(_w) + { + auto log_tag = [&]() { return cache->log_tag(); }; + last_resort = rspamd_get_ticks(TRUE); + reload_time = cache->get_reload_time(); + auto tm = rspamd_time_jitter(reload_time, 0); + msg_debug_cache("next reload in %.2f seconds", tm); + ev_timer_init (&resort_ev, cache_refresh_cbdata::resort_cb, + tm, tm); + resort_ev.data = (void *) this; + ev_timer_start(event_loop, &resort_ev); + rspamd_mempool_add_destructor(cache->get_pool(), + cache_refresh_cbdata::refresh_dtor, (void *) this); + } + + static void refresh_dtor(void *d) + { + auto *cbdata = (struct cache_refresh_cbdata *) d; + delete cbdata; + } + + static void resort_cb(EV_P_ ev_timer *w, int _revents) { + auto *cbdata = (struct cache_refresh_cbdata *)w->data; + static const double decay_rate = 0.25; + + auto log_tag = [&]() { return cbdata->cache->log_tag(); }; + + if (rspamd_worker_is_primary_controller(cbdata->w)) { + /* Plan new event */ + auto tm = rspamd_time_jitter(cbdata->reload_time, 0); + msg_debug_cache("resort symbols cache, next reload in %.2f seconds", tm); + cbdata->resort_ev.repeat = tm; + ev_timer_again(EV_A_ w); + auto cur_time = rspamd_get_ticks(FALSE); + cbdata->cache->periodic_resort(cbdata->event_loop, cur_time, cbdata->last_resort); + cbdata->last_resort = cur_time; + } + } + +private: + ~cache_refresh_cbdata() + { + ev_timer_stop(event_loop, &resort_ev); + } +}; +} + +#endif 
//RSPAMD_SYMCACHE_PERIODIC_HXX diff --git a/src/libserver/symcache/symcache_runtime.cxx b/src/libserver/symcache/symcache_runtime.cxx new file mode 100644 index 000000000..7ee8b9cd9 --- /dev/null +++ b/src/libserver/symcache/symcache_runtime.cxx @@ -0,0 +1,829 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "symcache_internal.hxx" +#include "symcache_item.hxx" +#include "symcache_runtime.hxx" +#include "libutil/cxx/util.hxx" +#include "libserver/task.h" +#include "libmime/scan_result.h" +#include "libserver/worker_util.h" +#include <limits> +#include <cmath> + +namespace rspamd::symcache { + +/* At least once per minute */ +constexpr static const auto PROFILE_MAX_TIME = 60.0; +/* For messages larger than 2Mb enable profiling */ +constexpr static const auto PROFILE_MESSAGE_SIZE_THRESHOLD = 1024ul * 1024 * 2; +/* Enable profile at least once per this amount of messages processed */ +constexpr static const auto PROFILE_PROBABILITY = 0.01; + +auto +symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symcache_runtime * +{ + cache.maybe_resort(); + + auto &&cur_order = cache.get_cache_order(); + auto *checkpoint = (symcache_runtime *) rspamd_mempool_alloc0 (task->task_pool, + sizeof(symcache_runtime) + + sizeof(struct cache_dynamic_item) * cur_order->size()); + + checkpoint->order = cache.get_cache_order(); + rspamd_mempool_add_destructor(task->task_pool, + 
symcache_runtime::savepoint_dtor, checkpoint); + + for (auto &pair: checkpoint->last_id_mappings) { + pair.first = -1; + pair.second = -1; + } + + /* Calculate profile probability */ + ev_now_update_if_cheap(task->event_loop); + ev_tstamp now = ev_now(task->event_loop); + checkpoint->profile_start = now; + + if ((cache.get_last_profile() == 0.0 || now > cache.get_last_profile() + PROFILE_MAX_TIME) || + (task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) || + (rspamd_random_double_fast() >= (1 - PROFILE_PROBABILITY))) { + msg_debug_cache_task("enable profiling of symbols for task"); + checkpoint->profile = true; + cache.set_last_profile(now); + } + + task->symcache_runtime = (void *) checkpoint; + + return checkpoint; +} + +auto +symcache_runtime::process_settings(struct rspamd_task *task, const symcache &cache) -> bool +{ + if (!task->settings) { + msg_err_task("`process_settings` is called with no settings"); + return false; + } + + const auto *wl = ucl_object_lookup(task->settings, "whitelist"); + + if (wl != nullptr) { + msg_info_task("task is whitelisted"); + task->flags |= RSPAMD_TASK_FLAG_SKIP; + return true; + } + + auto already_disabled = false; + + auto process_group = [&](const ucl_object_t *gr_obj, auto functor) -> void { + ucl_object_iter_t it = nullptr; + const ucl_object_t *cur; + + if (gr_obj) { + while ((cur = ucl_iterate_object(gr_obj, &it, true)) != nullptr) { + if (ucl_object_type(cur) == UCL_STRING) { + auto *gr = (struct rspamd_symbols_group *) + g_hash_table_lookup(task->cfg->groups, + ucl_object_tostring(cur)); + + if (gr) { + GHashTableIter gr_it; + void *k, *v; + g_hash_table_iter_init(&gr_it, gr->symbols); + + while (g_hash_table_iter_next(&gr_it, &k, &v)) { + functor((const char *) k); + } + } + } + } + } + }; + + ucl_object_iter_t it = nullptr; + const ucl_object_t *cur; + + const auto *enabled = ucl_object_lookup(task->settings, "symbols_enabled"); + + if (enabled) { + /* Disable all symbols but selected */ + 
disable_all_symbols(SYMBOL_TYPE_EXPLICIT_DISABLE); + already_disabled = true; + it = nullptr; + + while ((cur = ucl_iterate_object(enabled, &it, true)) != nullptr) { + enable_symbol(task, cache, ucl_object_tostring(cur)); + } + } + + /* Enable groups of symbols */ + enabled = ucl_object_lookup(task->settings, "groups_enabled"); + if (enabled && !already_disabled) { + disable_all_symbols(SYMBOL_TYPE_EXPLICIT_DISABLE); + } + process_group(enabled, [&](const char *sym) { + enable_symbol(task, cache, sym); + }); + + const auto *disabled = ucl_object_lookup(task->settings, "symbols_disabled"); + + if (disabled) { + it = nullptr; + + while ((cur = ucl_iterate_object (disabled, &it, true)) != nullptr) { + disable_symbol(task, cache, ucl_object_tostring(cur)); + } + } + + /* Disable groups of symbols */ + disabled = ucl_object_lookup(task->settings, "groups_disabled"); + process_group(disabled, [&](const char *sym) { + disable_symbol(task, cache, sym); + }); + + return false; +} + +auto symcache_runtime::disable_all_symbols(int skip_mask) -> void +{ + for (auto i = 0; i < order->size(); i++) { + auto *dyn_item = &dynamic_items[i]; + const auto &item = order->d[i]; + + if (!(item->get_flags() & skip_mask)) { + dyn_item->finished = true; + dyn_item->started = true; + } + } +} + +auto +symcache_runtime::disable_symbol(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool +{ + const auto *item = cache.get_item_by_name(name, true); + + if (item != nullptr) { + + auto *dyn_item = get_dynamic_item(item->id, false); + + if (dyn_item) { + dyn_item->finished = true; + dyn_item->started = true; + msg_debug_cache_task("disable execution of %s", name.data()); + + return true; + } + else { + msg_debug_cache_task("cannot disable %s: id not found %d", name.data(), item->id); + } + } + else { + msg_debug_cache_task("cannot disable %s: symbol not found", name.data()); + } + + return false; +} + +auto +symcache_runtime::enable_symbol(struct rspamd_task *task, const 
symcache &cache, std::string_view name) -> bool +{ + const auto *item = cache.get_item_by_name(name, true); + + if (item != nullptr) { + + auto *dyn_item = get_dynamic_item(item->id, false); + + if (dyn_item) { + dyn_item->finished = false; + dyn_item->started = false; + msg_debug_cache_task("enable execution of %s", name.data()); + + return true; + } + else { + msg_debug_cache_task("cannot enable %s: id not found %d", name.data(), item->id); + } + } + else { + msg_debug_cache_task("cannot enable %s: symbol not found", name.data()); + } + + return false; +} + +auto +symcache_runtime::is_symbol_checked(const symcache &cache, std::string_view name) -> bool +{ + const auto *item = cache.get_item_by_name(name, true); + + if (item != nullptr) { + + auto *dyn_item = get_dynamic_item(item->id, true); + + if (dyn_item) { + return dyn_item->started; + } + } + + return false; +} + +auto +symcache_runtime::is_symbol_enabled(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool +{ + + const auto *item = cache.get_item_by_name(name, true); + if (item) { + + if (!item->is_allowed(task, true)) { + return false; + } + else { + auto *dyn_item = get_dynamic_item(item->id, true); + + if (dyn_item) { + if (dyn_item->started) { + /* Already started */ + return false; + } + + if (!item->is_virtual()) { + return std::get<normal_item>(item->specific).check_conditions(item->symbol, task); + } + } + else { + /* Unknown item */ + msg_debug_cache_task("cannot enable %s: symbol not found", name.data()); + } + } + } + + return true; +} + +auto symcache_runtime::get_dynamic_item(int id, bool save_in_cache) const -> cache_dynamic_item * +{ + /* Lookup in cache */ + for (const auto &cache_id: last_id_mappings) { + if (cache_id.first == -1) { + break; + } + if (cache_id.first == id) { + auto *dyn_item = &dynamic_items[cache_id.second]; + + return dyn_item; + } + } + + /* Not found in the cache, do a hash lookup */ + auto our_id_maybe = rspamd::find_map(order->by_cache_id, 
id); + + if (our_id_maybe) { + auto *dyn_item = &dynamic_items[our_id_maybe.value()]; + + if (!save_in_cache) { + return dyn_item; + } + + /* Insert in the cache, swapping the first item with the last empty item */ + auto first_known = last_id_mappings[0]; + last_id_mappings[0].first = id; + last_id_mappings[0].second = our_id_maybe.value(); + + if (first_known.first != -1) { + /* This loop is guaranteed to finish as we have just inserted one item */ + + constexpr const auto cache_size = sizeof(last_id_mappings) / sizeof(last_id_mappings[0]); + int i = cache_size - 1; + for (;; --i) { + if (last_id_mappings[i].first != -1) { + if (i < cache_size - 1) { + i++; + } + break; + } + } + + last_id_mappings[i] = first_known; + } + + return dyn_item; + } + + return nullptr; +} + +auto symcache_runtime::process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool +{ + msg_debug_cache_task("symbols processing stage at pass: %d", stage); + + if (RSPAMD_TASK_IS_SKIPPED(task)) { + return true; + } + + switch (stage) { + case RSPAMD_TASK_STAGE_CONNFILTERS: + case RSPAMD_TASK_STAGE_PRE_FILTERS: + case RSPAMD_TASK_STAGE_POST_FILTERS: + case RSPAMD_TASK_STAGE_IDEMPOTENT: + return process_pre_postfilters(task, cache, + rspamd_session_events_pending(task->s), stage); + break; + + case RSPAMD_TASK_STAGE_FILTERS: + return process_filters(task, cache, rspamd_session_events_pending(task->s)); + break; + + default: + g_assert_not_reached (); + } +} + +auto +symcache_runtime::process_pre_postfilters(struct rspamd_task *task, + symcache &cache, + int start_events, + int stage) -> bool +{ + auto saved_priority = std::numeric_limits<int>::min(); + auto all_done = true; + auto compare_functor = +[](int a, int b) { return a < b; }; + + auto proc_func = [&](cache_item *item) { + auto dyn_item = get_dynamic_item(item->id, true); + + if (!dyn_item->started && !dyn_item->finished) { + if (has_slow) { + /* Delay */ + has_slow = false; + + return false; + } + + if (saved_priority == 
std::numeric_limits<int>::min()) { + saved_priority = item->priority; + } + else { + if (compare_functor(item->priority, saved_priority) && + rspamd_session_events_pending(task->s) > start_events) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + return false; + } + } + + process_symbol(task, cache, item, dyn_item); + all_done = false; + } + + /* Continue processing */ + return true; + }; + + switch (stage) { + case RSPAMD_TASK_STAGE_CONNFILTERS: + all_done = cache.connfilters_foreach(proc_func); + break; + case RSPAMD_TASK_STAGE_PRE_FILTERS: + all_done = cache.prefilters_foreach(proc_func); + break; + case RSPAMD_TASK_STAGE_POST_FILTERS: + compare_functor = +[](int a, int b) { return a > b; }; + all_done = cache.postfilters_foreach(proc_func); + break; + case RSPAMD_TASK_STAGE_IDEMPOTENT: + compare_functor = +[](int a, int b) { return a > b; }; + all_done = cache.idempotent_foreach(proc_func); + break; + default: + g_error("invalid invocation"); + break; + } + + return all_done; +} + +auto +symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool +{ + auto all_done = true; + + for (const auto [idx, item] : rspamd::enumerate(order->d)) { + if (item->type == symcache_item_type::CLASSIFIER) { + continue; + } + + auto dyn_item = &dynamic_items[idx]; + + if (!dyn_item->started && !dyn_item->finished) { + all_done = false; + + if (!check_item_deps(task, cache, item.get(), + dyn_item, false)) { + msg_debug_cache_task("blocked execution of %d(%s) unless deps are " + "resolved", item->id, item->symbol.c_str()); + + break; + } + + process_symbol(task, cache, item.get(), dyn_item); + + if (has_slow) { + /* Delay */ + has_slow = false; + + break; + } + } + + if (!(item->flags & SYMBOL_TYPE_FINE)) { + if (check_metric_limit(task)) { + msg_info_task ("task has already scored more than %.2f, so do " + "not " + "plan more checks", + rs->score); + all_done = true; + break; + } + } + } + + 
return all_done; +} + +auto +symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item) -> bool +{ + if (item->type == symcache_item_type::CLASSIFIER || item->type == symcache_item_type::COMPOSITE) { + /* Classifiers are special :( */ + return true; + } + + if (rspamd_session_blocked(task->s)) { + /* + * We cannot add new events as session is either destroyed or + * being cleaned up. + */ + return true; + } + + g_assert (!item->is_virtual()); + if (dyn_item->started) { + /* + * This can actually happen when deps span over different layers + */ + return dyn_item->finished; + } + + /* Check has been started */ + dyn_item->started = true; + auto check = true; + + if (!item->is_allowed(task, true) || !item->check_conditions(task)) { + check = false; + } + + if (check) { + msg_debug_cache_task("execute %s, %d; symbol type = %s", item->symbol.data(), + item->id, item_type_to_str(item->type)); + + if (profile) { + ev_now_update_if_cheap(task->event_loop); + dyn_item->start_msec = (ev_now(task->event_loop) - + profile_start) * 1e3; + } + + dyn_item->async_events = 0; + cur_item = item; + items_inflight++; + /* Callback now must finalize itself */ + item->call(task); + cur_item = NULL; + + if (items_inflight == 0) { + return true; + } + + if (dyn_item->async_events == 0 && !dyn_item->finished) { + msg_err_cache_task("critical error: item %s has no async events pending, " + "but it is not finalised", item->symbol.data()); + g_assert_not_reached (); + } + + return false; + } + else { + dyn_item->finished = true; + } + + return true; +} + +auto +symcache_runtime::check_metric_limit(struct rspamd_task *task) -> bool +{ + if (task->flags & RSPAMD_TASK_FLAG_PASS_ALL) { + return false; + } + + if (lim == 0.0) { + auto *res = task->result; + + if (res) { + auto ms = rspamd_task_get_required_score(task, res); + + if (!std::isnan(ms) && lim < ms) { + rs = res; + lim = ms; + } + } + } + + if (rs) { + + if (rs->score > 
lim) { + return true; + } + } + else { + /* No reject score define, always check all rules */ + lim = -1; + } + + return false; +} + +auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item, bool check_only) -> bool +{ + constexpr const auto max_recursion = 20; + + auto inner_functor = [&](int recursion, cache_item *item, cache_dynamic_item *dyn_item, auto rec_functor) -> bool { + if (recursion > max_recursion) { + msg_err_task ("cyclic dependencies: maximum check level %ud exceed when " + "checking dependencies for %s", max_recursion, item->symbol.c_str()); + + return true; + } + + auto ret = true; + + if (!item->deps.empty()) { + + for (const auto &dep: item->deps) { + if (!dep.item) { + /* Assume invalid deps as done */ + msg_debug_cache_task("symbol %d(%s) has invalid dependencies on %d(%s)", + item->id, item->symbol.c_str(), dep.id, dep.sym.c_str()); + continue; + } + + auto *dep_dyn_item = get_dynamic_item(dep.item->id, true); + + if (!dep_dyn_item->finished) { + if (!dep_dyn_item->started) { + /* Not started */ + if (!check_only) { + if (!rec_functor(recursion + 1, + dep.item.get(), + dep_dyn_item, + rec_functor)) { + + ret = false; + msg_debug_cache_task("delayed dependency %d(%s) for " + "symbol %d(%s)", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + else if (!process_symbol(task, cache, dep.item.get(), dep_dyn_item)) { + /* Now started, but has events pending */ + ret = false; + msg_debug_cache_task("started check of %d(%s) symbol " + "as dep for " + "%d(%s)", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + else { + msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is " + "already processed", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + } + else { + msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) " + "cannot be started now", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + ret = false; + } + } 
+ else { + /* Started but not finished */ + msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is " + "still executing", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + ret = false; + } + } + else { + msg_debug_cache_task("dependency %d(%s) for symbol %d(%s) is already " + "checked", + dep.id, dep.sym.c_str(), item->id, item->symbol.c_str()); + } + } + } + + return ret; + }; + + return inner_functor(0, item, dyn_item, inner_functor); +} + + +struct rspamd_symcache_delayed_cbdata { + cache_item *item; + struct rspamd_task *task; + symcache_runtime *runtime; + struct rspamd_async_event *event; + struct ev_timer tm; +}; + +static void +rspamd_symcache_delayed_item_fin(gpointer ud) +{ + auto *cbd = (struct rspamd_symcache_delayed_cbdata *) ud; + + cbd->runtime->unset_slow(); + ev_timer_stop(cbd->task->event_loop, &cbd->tm); +} + +static void +rspamd_symcache_delayed_item_cb(EV_P_ ev_timer *w, int what) +{ + auto *cbd = (struct rspamd_symcache_delayed_cbdata *) w->data; + + cbd->event = NULL; + + /* Timer will be stopped here */ + rspamd_session_remove_event (cbd->task->s, + rspamd_symcache_delayed_item_fin, cbd); + cbd->runtime->process_item_rdeps(cbd->task, cbd->item); + +} + +static void +rspamd_delayed_timer_dtor(gpointer d) +{ + auto *cbd = (struct rspamd_symcache_delayed_cbdata *) d; + + if (cbd->event) { + /* Event has not been executed */ + rspamd_session_remove_event (cbd->task->s, + rspamd_symcache_delayed_item_fin, cbd); + cbd->event = nullptr; + } +} + +auto +symcache_runtime::finalize_item(struct rspamd_task *task, cache_item *item) -> void +{ + /* Limit to consider a rule as slow (in milliseconds) */ + constexpr const gdouble slow_diff_limit = 300; + /* Sanity checks */ + g_assert (items_inflight > 0); + auto *dyn_item = get_dynamic_item(item->id, false); + + if (dyn_item->async_events > 0) { + /* + * XXX: Race condition + * + * It is possible that some async event is still in flight, but we + * already know its result, however, it is 
the responsibility of that + * event to decrease async events count and call this function + * one more time + */ + msg_debug_cache_task("postpone finalisation of %s(%d) as there are %d " + "async events pending", + item->symbol.c_str(), item->id, dyn_item->async_events); + + return; + } + + msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id); + dyn_item->finished = true; + items_inflight--; + cur_item = nullptr; + + auto enable_slow_timer = [&]() -> bool { + auto *cbd = rspamd_mempool_alloc0_type(task->task_pool, rspamd_symcache_delayed_cbdata); + /* Add timer to allow something else to be executed */ + ev_timer *tm = &cbd->tm; + + cbd->event = rspamd_session_add_event (task->s, + rspamd_symcache_delayed_item_fin, cbd, + "symcache"); + cbd->runtime = this; + + /* + * If no event could be added, then we are already in the destruction + * phase. So the main issue is to deal with has slow here + */ + if (cbd->event) { + ev_timer_init (tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0); + ev_set_priority (tm, EV_MINPRI); + rspamd_mempool_add_destructor (task->task_pool, + rspamd_delayed_timer_dtor, cbd); + + cbd->task = task; + cbd->item = item; + tm->data = cbd; + ev_timer_start(task->event_loop, tm); + } + else { + /* Just reset as no timer is added */ + has_slow = FALSE; + return false; + } + + return true; + }; + + if (profile) { + ev_now_update_if_cheap(task->event_loop); + auto diff = ((ev_now(task->event_loop) - profile_start) * 1e3 - + dyn_item->start_msec); + + if (diff > slow_diff_limit) { + + if (!has_slow) { + has_slow = true; + + msg_info_task ("slow rule: %s(%d): %.2f ms; enable slow timer delay", + item->symbol.c_str(), item->id, + diff); + + if (enable_slow_timer()) { + /* Allow network execution */ + return; + } + } + else { + msg_info_task ("slow rule: %s(%d): %.2f ms", + item->symbol.c_str(), item->id, + diff); + } + } + + if (G_UNLIKELY(RSPAMD_TASK_IS_PROFILING(task))) { + rspamd_task_profile_set(task, 
item->symbol.c_str(), diff); + } + + if (rspamd_worker_is_scanner(task->worker)) { + rspamd_set_counter(item->cd, diff); + } + } + + process_item_rdeps(task, item); +} + +auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item *item) -> void +{ + auto *cache_ptr = reinterpret_cast<symcache *>(task->cfg->cache); + + for (const auto &rdep: item->rdeps) { + if (rdep.item) { + auto *dyn_item = get_dynamic_item(rdep.item->id, true); + if (!dyn_item->started) { + msg_debug_cache_task ("check item %d(%s) rdep of %s ", + rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str()); + + if (!check_item_deps(task, *cache_ptr, rdep.item.get(), dyn_item, false)) { + msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s " + "unless deps are resolved", + rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str()); + } + else { + process_symbol(task, *cache_ptr, rdep.item.get(), + dyn_item); + } + } + } + } +} + +} + diff --git a/src/libserver/symcache/symcache_runtime.hxx b/src/libserver/symcache/symcache_runtime.hxx new file mode 100644 index 000000000..1d77bfd4a --- /dev/null +++ b/src/libserver/symcache/symcache_runtime.hxx @@ -0,0 +1,203 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +/** + * Symcache runtime is produced for each task and it consists of symbols + * being executed, being dynamically disabled/enabled and it also captures + * the current order of the symbols (produced by resort periodic) + */ + +#ifndef RSPAMD_SYMCACHE_RUNTIME_HXX +#define RSPAMD_SYMCACHE_RUNTIME_HXX +#pragma once + +#include "symcache_internal.hxx" + +struct rspamd_scan_result; + +namespace rspamd::symcache { +/** + * These items are saved within task structure and are used to track + * symbols execution. + * Each symcache item occupies a single dynamic item, that currently has 8 bytes + * length + */ +struct cache_dynamic_item { + std::uint16_t start_msec; /* Relative to task time */ + bool started; + bool finished; + std::uint32_t async_events; +}; + +static_assert(sizeof(cache_dynamic_item) == sizeof(std::uint64_t)); +static_assert(std::is_trivial_v<cache_dynamic_item>); + +class symcache_runtime { + unsigned items_inflight; + bool profile; + bool has_slow; + + double profile_start; + double lim; + + struct ::rspamd_scan_result *rs; + + struct cache_item *cur_item; + order_generation_ptr order; + /* Cache of the last items to speed up lookups */ + mutable std::pair<int, int> last_id_mappings[8]; + /* Dynamically expanded as needed */ + mutable struct cache_dynamic_item dynamic_items[]; + /* We allocate this structure merely in memory pool, so destructor is absent */ + ~symcache_runtime() = delete; + /* Dropper for a shared ownership */ + static auto savepoint_dtor(void *ptr) -> void { + auto *real_savepoint = (symcache_runtime *)ptr; + + /* Drop shared ownership */ + real_savepoint->order.reset(); + } + + auto process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item) -> bool; + /* Specific stages of the processing */ + auto process_pre_postfilters(struct rspamd_task *task, symcache &cache, int start_events, int stage) -> bool; + auto process_filters(struct rspamd_task *task, symcache &cache, int 
start_events) -> bool; + auto check_metric_limit(struct rspamd_task *task) -> bool; + auto check_item_deps(struct rspamd_task *task, symcache &cache, cache_item *item, + cache_dynamic_item *dyn_item, bool check_only) -> bool; + +public: + /** + * Creates a cache runtime using task mempool + * @param task + * @param cache + * @return + */ + static auto create(struct rspamd_task *task, symcache &cache) -> symcache_runtime *; + /** + * Process task settings + * @param task + * @return + */ + auto process_settings(struct rspamd_task *task, const symcache &cache) -> bool; + + /** + * Disable all symbols but not touching ones that are in the specific mask + * @param skip_mask + */ + auto disable_all_symbols(int skip_mask) -> void; + + /** + * Disable a symbol (or it's parent) + * @param name + * @return + */ + auto disable_symbol(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool; + + /** + * Enable a symbol (or it's parent) + * @param name + * @return + */ + auto enable_symbol(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool; + + /** + * Checks if an item has been checked/disabled + * @param cache + * @param name + * @return + */ + auto is_symbol_checked(const symcache &cache, std::string_view name) -> bool; + + /** + * Checks if a symbol is enabled for execution, checking all pending conditions + * @param task + * @param cache + * @param name + * @return + */ + auto is_symbol_enabled(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool; + + /** + * Get the current processed item + * @return + */ + auto get_cur_item() const -> auto { + return cur_item; + } + + /** + * Set the current processed item + * @param item + * @return + */ + auto set_cur_item(cache_item *item) -> auto { + std::swap(item, cur_item); + return item; + } + + /** + * Set profile mode for the runtime + * @param enable + * @return + */ + auto set_profile_mode(bool enable) -> auto { + std::swap(profile, enable); + 
return enable; + } + + /** + * Returns the dynamic item by static item id + * @param id + * @return + */ + auto get_dynamic_item(int id, bool save_in_cache) const -> cache_dynamic_item *; + + /** + * Process symbols in the cache + * @param task + * @param cache + * @param stage + * @return + */ + auto process_symbols(struct rspamd_task *task, symcache &cache, int stage) -> bool; + + /** + * Finalize execution of some item in the cache + * @param task + * @param item + */ + auto finalize_item(struct rspamd_task *task, cache_item *item) -> void; + + /** + * Process unblocked reverse dependencies of the specific item + * @param task + * @param item + */ + auto process_item_rdeps(struct rspamd_task *task, cache_item *item) -> void; + + /* XXX: a helper to allow hiding internal implementation of the slow timer structure */ + auto unset_slow() -> void { + has_slow = false; + } +}; + + +} + +#endif //RSPAMD_SYMCACHE_RUNTIME_HXX diff --git a/src/libserver/task.h b/src/libserver/task.h index 9c9db32e9..40d8f2705 100644 --- a/src/libserver/task.h +++ b/src/libserver/task.h @@ -131,13 +131,13 @@ enum rspamd_task_stage { #define RSPAMD_TASK_PROTOCOL_FLAG_GROUPS (1u << 6u) #define RSPAMD_TASK_PROTOCOL_FLAG_MAX_SHIFT (6u) -#define RSPAMD_TASK_IS_SKIPPED(task) (((task)->flags & RSPAMD_TASK_FLAG_SKIP)) -#define RSPAMD_TASK_IS_SPAMC(task) (((task)->cmd == CMD_CHECK_SPAMC)) -#define RSPAMD_TASK_IS_PROCESSED(task) (((task)->processed_stages & RSPAMD_TASK_STAGE_DONE)) +#define RSPAMD_TASK_IS_SKIPPED(task) (G_UNLIKELY((task)->flags & RSPAMD_TASK_FLAG_SKIP)) +#define RSPAMD_TASK_IS_SPAMC(task) (G_UNLIKELY((task)->cmd == CMD_CHECK_SPAMC)) +#define RSPAMD_TASK_IS_PROCESSED(task) (G_UNLIKELY((task)->processed_stages & RSPAMD_TASK_STAGE_DONE)) #define RSPAMD_TASK_IS_CLASSIFIED(task) (((task)->processed_stages & RSPAMD_TASK_STAGE_CLASSIFIERS)) -#define RSPAMD_TASK_IS_EMPTY(task) (((task)->flags & RSPAMD_TASK_FLAG_EMPTY)) -#define RSPAMD_TASK_IS_PROFILING(task) (((task)->flags & 
RSPAMD_TASK_FLAG_PROFILE)) -#define RSPAMD_TASK_IS_MIME(task) (((task)->flags & RSPAMD_TASK_FLAG_MIME)) +#define RSPAMD_TASK_IS_EMPTY(task) (G_UNLIKELY((task)->flags & RSPAMD_TASK_FLAG_EMPTY)) +#define RSPAMD_TASK_IS_PROFILING(task) (G_UNLIKELY((task)->flags & RSPAMD_TASK_FLAG_PROFILE)) +#define RSPAMD_TASK_IS_MIME(task) (G_LIKELY((task)->flags & RSPAMD_TASK_FLAG_MIME)) struct rspamd_email_address; struct rspamd_lang_detector; @@ -207,7 +207,7 @@ struct rspamd_task { struct ev_timer timeout_ev; /**< Global task timeout */ struct ev_io guard_ev; /**< Event for input sanity guard */ - gpointer checkpoint; /**< Opaque checkpoint data */ + gpointer symcache_runtime; /**< Opaque checkpoint data */ ucl_object_t *settings; /**< Settings applied to task */ struct rspamd_config_settings_elt *settings_elt; /**< preprocessed settings id elt */ |