diff options
Diffstat (limited to 'src/libserver/rspamd_symcache.cxx')
-rw-r--r-- | src/libserver/rspamd_symcache.cxx | 3898 |
1 files changed, 3898 insertions, 0 deletions
diff --git a/src/libserver/rspamd_symcache.cxx b/src/libserver/rspamd_symcache.cxx new file mode 100644 index 000000000..a1aa8c504 --- /dev/null +++ b/src/libserver/rspamd_symcache.cxx @@ -0,0 +1,3898 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "util.h" +#include "rspamd.h" +#include "message.h" +#include "rspamd_symcache.h" +#include "cfg_file.h" +#include "lua/lua_common.h" +#include "unix-std.h" +#include "contrib/t1ha/t1ha.h" +#include "libserver/worker_util.h" + +#include <cmath> +#include <cstdint> +#include <vector> +#include <string> +#include <string_view> +#include <memory> +#include <variant> +#include "libutil/cxx/local_shared_ptr.hxx" + +#include "contrib/robin-hood/robin_hood.h" + +#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ + cache->static_pool->tag.tagname, cache->cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ + cache->static_pool->tag.tagname, cache->cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ + cache->static_pool->tag.tagname, cache->cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_symcache_log_id, "symcache", cache->cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) + +INIT_LOG_MODULE(symcache) + +#define CHECK_START_BIT(checkpoint, dyn_item) \ + ((dyn_item)->started) +#define SET_START_BIT(checkpoint, dyn_item) \ + (dyn_item)->started = 1 +#define CLR_START_BIT(checkpoint, dyn_item) \ + (dyn_item)->started = 0 + +#define CHECK_FINISH_BIT(checkpoint, dyn_item) \ + ((dyn_item)->finished) +#define SET_FINISH_BIT(checkpoint, dyn_item) \ + (dyn_item)->finished = 1 +#define CLR_FINISH_BIT(checkpoint, dyn_item) \ + (dyn_item)->finished = 0 + +namespace rspamd::symcache { + +static const std::uint8_t rspamd_symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0}; + +struct rspamd_symcache_header { + std::uint8_t magic[8]; + unsigned int nitems; + std::uint8_t checksum[64]; + std::uint8_t unused[128]; +}; + +struct cache_item; +using cache_item_ptr = rspamd::local_shared_ptr<cache_item>; +using cache_item_weak_ptr = rspamd::local_weak_ptr<cache_item>; + +struct order_generation { + std::vector<cache_item_weak_ptr> d; + unsigned int generation_id; +}; + +using order_generation_ptr = rspamd::local_shared_ptr<order_generation>; + +/* + * This structure is optimised to store ids list: + * - If the first element is -1 then use dynamic part, else use static part + * There is no std::variant to save space + */ +struct id_list { + union { + guint32 st[4]; + struct { + guint32 e; /* First element */ + guint16 len; + guint16 allocated; + guint *n; + } dyn; + }; +}; + +struct item_condition { +private: + gint cb; + lua_State *L; +public: + item_condition() { + // TODO + } + virtual ~item_condition() { + // TODO + } +}; + +class normal_item { +private: + symbol_func_t func; + void *user_data; + std::vector<item_condition> conditions; +public: + explicit normal_item() { + // TODO + } + auto add_condition() -> void { + // TODO + } + auto call() -> void { + // TODO + } +}; + +class virtual_item { +private: + int parent_id; + cache_item_ptr parent; +public: + explicit virtual_item() { + // TODO + } +}; + +struct cache_item { + /* This block is likely shared */ + struct rspamd_symcache_item_stat *st; + struct rspamd_counter_data *cd; + + std::uint64_t last_count; + std::string symbol; + std::string_view type_descr; + int type; + + /* Callback data */ + std::variant<normal_item, virtual_item> specific; + + /* Condition of execution */ + bool enabled; + + /* Priority */ + int priority; + /* Topological order */ + unsigned int order; + /* Unique id - counter */ + int id; + + int frequency_peaks; + /* Settings ids */ + id_list allowed_ids; + /* Allows execution but not symbols insertion */ + id_list exec_only_ids; + id_list forbidden_ids; + + /* Dependencies */ + std::vector<cache_item_ptr> deps; + /* Reverse dependencies */ + std::vector<cache_item_ptr> rdeps; +}; + +struct delayed_cache_dependency { + std::string from; + std::string to; +}; + +struct delayed_cache_condition { + std::string sym; + int cbref; + lua_State *L; +}; + +struct rspamd_symcache { + /* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */ + robin_hood::unordered_flat_map<std::string_view, cache_item_ptr> items_by_symbol; + std::vector<cache_item_weak_ptr> items_by_id; + + /* Items sorted into some order */ + order_generation_ptr items_by_order; + unsigned int cur_order_gen; + + std::vector<cache_item_weak_ptr> connfilters; + std::vector<cache_item_weak_ptr> prefilters; + std::vector<cache_item_weak_ptr> filters; + std::vector<cache_item_weak_ptr> postfilters; + std::vector<cache_item_weak_ptr> composites; + std::vector<cache_item_weak_ptr> idempotent; + std::vector<cache_item_weak_ptr> virtual_symbols; + + std::vector<delayed_cache_dependency> delayed_deps; + std::vector<delayed_cache_condition> delayed_conditions; + + rspamd_mempool_t *static_pool; + std::uint64_t cksum; + double total_weight; + std::size_t used_items; + std::size_t stats_symbols_count; + std::uint64_t total_hits; + + struct rspamd_config *cfg; + double reload_time; + double last_profile; + int peak_cb; +}; + +struct cache_dynamic_item { + guint16 start_msec; /* Relative to task time */ + unsigned started: 1; + unsigned finished: 1; + /* unsigned pad:14; */ + guint32 async_events; +}; + + +struct cache_dependency { + struct rspamd_symcache_item *item; /* Real dependency */ + gchar *sym; /* Symbolic dep name */ + gint id; /* Real from */ + gint vid; /* Virtual from */ +}; + +struct cache_savepoint { + guint version; + guint items_inflight; + gboolean profile; + gboolean has_slow; + gdouble profile_start; + + struct rspamd_scan_result *rs; + gdouble lim; + + struct rspamd_symcache_item *cur_item; + struct symcache_order *order; + struct rspamd_symcache_dynamic_item dynamic_items[]; +}; + +struct rspamd_cache_refresh_cbdata { + gdouble last_resort; + ev_timer resort_ev; + struct rspamd_symcache *cache; + struct rspamd_worker *w; + struct ev_loop *event_loop; +}; +} // namespace rspamd + +/* At least once per minute */ +#define PROFILE_MAX_TIME (60.0) +/* For messages larger than 2Mb enable profiling */ +#define PROFILE_MESSAGE_SIZE_THRESHOLD (1024 * 1024 * 2) +/* Enable profile at least once per this amount of messages processed */ +#define PROFILE_PROBABILITY (0.01) + +/* weight, frequency, time */ +#define TIME_ALPHA (1.0) +#define WEIGHT_ALPHA (0.1) +#define FREQ_ALPHA (0.01) +#define SCORE_FUN(w, f, t) (((w) > 0 ? (w) : WEIGHT_ALPHA) \ + * ((f) > 0 ? (f) : FREQ_ALPHA) \ + / (t > TIME_ALPHA ? t : TIME_ALPHA)) + +static gboolean rspamd_symcache_check_symbol (struct rspamd_task *task, + struct rspamd_symcache *cache, + struct rspamd_symcache_item *item, + struct cache_savepoint *checkpoint); +static gboolean rspamd_symcache_check_deps (struct rspamd_task *task, + struct rspamd_symcache *cache, + struct rspamd_symcache_item *item, + struct cache_savepoint *checkpoint, + guint recursion, + gboolean check_only); +static void rspamd_symcache_disable_symbol_checkpoint (struct rspamd_task *task, + struct rspamd_symcache *cache, const gchar *symbol); +static void rspamd_symcache_enable_symbol_checkpoint (struct rspamd_task *task, + struct rspamd_symcache *cache, const gchar *symbol); + +static void +rspamd_symcache_order_dtor (gpointer p) +{ + struct symcache_order *ord = p; + + g_ptr_array_free (ord->d, TRUE); + g_free (ord); +} + +static void +rspamd_symcache_order_unref (gpointer p) +{ + struct symcache_order *ord = p; + + REF_RELEASE (ord); +} + +static gint +rspamd_id_cmp (const void * a, const void * b) +{ + return (*(guint32*)a - *(guint32*)b); +} + +static struct symcache_order * +rspamd_symcache_order_new (struct rspamd_symcache *cache, + gsize nelts) +{ + struct symcache_order *ord; + + ord = g_malloc0 (sizeof (*ord)); + ord->d = g_ptr_array_sized_new (nelts); + ord->id = cache->id; + REF_INIT_RETAIN (ord, rspamd_symcache_order_dtor); + + return ord; +} + +static inline struct rspamd_symcache_dynamic_item* +rspamd_symcache_get_dynamic (struct cache_savepoint *checkpoint, + struct rspamd_symcache_item *item) +{ + return &checkpoint->dynamic_items[item->id]; +} + +static inline struct rspamd_symcache_item * +rspamd_symcache_find_filter (struct rspamd_symcache *cache, + const gchar *name, + bool resolve_parent) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + + if (name == NULL) { + return NULL; + } + + item = g_hash_table_lookup (cache->items_by_symbol, name); + + if (item != NULL) { + + if (resolve_parent && item->is_virtual && !(item->type & SYMBOL_TYPE_GHOST)) { + item =item->specific.virtual.parent_item; + } + + return item; + } + + return NULL; +} + +const gchar * +rspamd_symcache_get_parent (struct rspamd_symcache *cache, + const gchar *symbol) +{ + struct rspamd_symcache_item *item, *parent; + + g_assert (cache != NULL); + + if (symbol == NULL) { + return NULL; + } + + item = g_hash_table_lookup (cache->items_by_symbol, symbol); + + if (item != NULL) { + + if (item->is_virtual && !(item->type & SYMBOL_TYPE_GHOST)) { + parent = item->specific.virtual.parent_item; + + if (!parent) { + item->specific.virtual.parent_item = g_ptr_array_index (cache->items_by_id, + item->specific.virtual.parent); + parent = item->specific.virtual.parent_item; + } + + item = parent; + } + + return item->symbol; + } + + return NULL; +} + +static gint +postfilters_cmp (const void *p1, const void *p2, gpointer ud) +{ + const struct rspamd_symcache_item *i1 = *(struct rspamd_symcache_item **)p1, + *i2 = *(struct rspamd_symcache_item **)p2; + double w1, w2; + + w1 = i1->priority; + w2 = i2->priority; + + if (w1 > w2) { + return 1; + } + else if (w1 < w2) { + return -1; + } + + return 0; +} + +static gint +prefilters_cmp (const void *p1, const void *p2, gpointer ud) +{ + const struct rspamd_symcache_item *i1 = *(struct rspamd_symcache_item **)p1, + *i2 = *(struct rspamd_symcache_item **)p2; + double w1, w2; + + w1 = i1->priority; + w2 = i2->priority; + + if (w1 < w2) { + return 1; + } + else if (w1 > w2) { + return -1; + } + + return 0; +} + +#define TSORT_MARK_PERM(it) (it)->order |= (1u << 31) +#define TSORT_MARK_TEMP(it) (it)->order |= (1u << 30) +#define TSORT_IS_MARKED_PERM(it) ((it)->order & (1u << 31)) +#define TSORT_IS_MARKED_TEMP(it) ((it)->order & (1u << 30)) +#define TSORT_UNMASK(it) ((it)->order & ~((1u << 31) | (1u << 30))) + +static gint +cache_logic_cmp (const void *p1, const void *p2, gpointer ud) +{ + const struct rspamd_symcache_item *i1 = *(struct rspamd_symcache_item **)p1, + *i2 = *(struct rspamd_symcache_item **)p2; + struct rspamd_symcache *cache = ud; + double w1, w2; + double weight1, weight2; + double f1 = 0, f2 = 0, t1, t2, avg_freq, avg_weight; + guint o1 = TSORT_UNMASK (i1), o2 = TSORT_UNMASK (i2); + + + if (o1 == o2) { + /* Heuristic */ + if (i1->priority == i2->priority) { + avg_freq = ((gdouble) cache->total_hits / cache->used_items); + avg_weight = (cache->total_weight / cache->used_items); + f1 = (double) i1->st->total_hits / avg_freq; + f2 = (double) i2->st->total_hits / avg_freq; + weight1 = fabs (i1->st->weight) / avg_weight; + weight2 = fabs (i2->st->weight) / avg_weight; + t1 = i1->st->avg_time; + t2 = i2->st->avg_time; + w1 = SCORE_FUN (weight1, f1, t1); + w2 = SCORE_FUN (weight2, f2, t2); + } else { + /* Strict sorting */ + w1 = abs (i1->priority); + w2 = abs (i2->priority); + } + } + else { + w1 = o1; + w2 = o2; + } + + if (w2 > w1) { + return 1; + } + else if (w2 < w1) { + return -1; + } + + return 0; +} + +static void +rspamd_symcache_tsort_visit (struct rspamd_symcache *cache, + struct rspamd_symcache_item *it, + guint cur_order) +{ + struct cache_dependency *dep; + guint i; + + if (TSORT_IS_MARKED_PERM (it)) { + if (cur_order > TSORT_UNMASK (it)) { + /* Need to recalculate the whole chain */ + it->order = cur_order; /* That also removes all masking */ + } + else { + /* We are fine, stop DFS */ + return; + } + } + else if (TSORT_IS_MARKED_TEMP (it)) { + msg_err_cache ("cyclic dependencies found when checking '%s'!", + it->symbol); + return; + } + + TSORT_MARK_TEMP (it); + msg_debug_cache ("visiting node: %s (%d)", it->symbol, cur_order); + + PTR_ARRAY_FOREACH (it->deps, i, dep) { + msg_debug_cache ("visiting dep: %s (%d)", dep->item->symbol, cur_order + 1); + rspamd_symcache_tsort_visit (cache, dep->item, cur_order + 1); + } + + it->order = cur_order; + + TSORT_MARK_PERM (it); +} + +static void +rspamd_symcache_resort (struct rspamd_symcache *cache) +{ + struct symcache_order *ord; + guint i; + guint64 total_hits = 0; + struct rspamd_symcache_item *it; + + ord = rspamd_symcache_order_new (cache, cache->filters->len); + + for (i = 0; i < cache->filters->len; i ++) { + it = g_ptr_array_index (cache->filters, i); + total_hits += it->st->total_hits; + it->order = 0; + g_ptr_array_add (ord->d, it); + } + + /* Topological sort, intended to be O(N) but my implementation + * is not linear (semi-linear usually) as I want to make it as + * simple as possible. + * On each stage it does DFS for unseen nodes. In theory, that + * can be more complicated than linear - O(N^2) for specially + * crafted data. But I don't care. + */ + PTR_ARRAY_FOREACH (ord->d, i, it) { + if (it->order == 0) { + rspamd_symcache_tsort_visit (cache, it, 1); + } + } + + /* + * Now we have all sorted and can do some heuristical sort, keeping + * topological order invariant + */ + g_ptr_array_sort_with_data (ord->d, cache_logic_cmp, cache); + cache->total_hits = total_hits; + + if (cache->items_by_order) { + REF_RELEASE (cache->items_by_order); + } + + cache->items_by_order = ord; +} + +static void +rspamd_symcache_propagate_dep (struct rspamd_symcache *cache, + struct rspamd_symcache_item *it, + struct rspamd_symcache_item *dit) +{ + const guint *ids; + guint nids = 0; + + msg_debug_cache ("check id propagation for dependency %s from %s", + it->symbol, dit->symbol); + ids = rspamd_symcache_get_allowed_settings_ids (cache, dit->symbol, &nids); + + /* TODO: merge? */ + if (nids > 0) { + msg_info_cache ("propagate allowed ids from %s to %s", + dit->symbol, it->symbol); + + rspamd_symcache_set_allowed_settings_ids (cache, it->symbol, ids, + nids); + } + + ids = rspamd_symcache_get_forbidden_settings_ids (cache, dit->symbol, &nids); + + if (nids > 0) { + msg_info_cache ("propagate forbidden ids from %s to %s", + dit->symbol, it->symbol); + + rspamd_symcache_set_forbidden_settings_ids (cache, it->symbol, ids, + nids); + } +} + +static void +rspamd_symcache_process_dep (struct rspamd_symcache *cache, + struct rspamd_symcache_item *it, + struct cache_dependency *dep) +{ + struct rspamd_symcache_item *dit = NULL, *vdit = NULL; + struct cache_dependency *rdep; + + if (dep->id >= 0) { + msg_debug_cache ("process real dependency %s on %s", it->symbol, dep->sym); + dit = rspamd_symcache_find_filter (cache, dep->sym, true); + } + + if (dep->vid >= 0) { + /* Case of the virtual symbol that depends on another (maybe virtual) symbol */ + vdit = rspamd_symcache_find_filter (cache, dep->sym, false); + + if (!vdit) { + if (dit) { + msg_err_cache ("cannot add dependency from %s on %s: no dependency symbol registered", + dep->sym, dit->symbol); + } + } + else { + msg_debug_cache ("process virtual dependency %s(%d) on %s(%d)", it->symbol, + dep->vid, vdit->symbol, vdit->id); + } + } + else { + vdit = dit; + } + + if (dit != NULL) { + if (!dit->is_filter) { + /* + * Check sanity: + * - filters -> prefilter dependency is OK and always satisfied + * - postfilter -> (filter, prefilter) dep is ok + * - idempotent -> (any) dep is OK + * + * Otherwise, emit error + * However, even if everything is fine this dep is useless ¯\_(ツ)_/¯ + */ + gboolean ok_dep = FALSE; + + if (it->is_filter) { + if (dit->is_filter) { + ok_dep = TRUE; + } + else if (dit->type & SYMBOL_TYPE_PREFILTER) { + ok_dep = TRUE; + } + } + else if (it->type & SYMBOL_TYPE_POSTFILTER) { + if (dit->type & SYMBOL_TYPE_PREFILTER) { + ok_dep = TRUE; + } + } + else if (it->type & SYMBOL_TYPE_IDEMPOTENT) { + if (dit->type & (SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER)) { + ok_dep = TRUE; + } + } + else if (it->type & SYMBOL_TYPE_PREFILTER) { + if (it->priority < dit->priority) { + /* Also OK */ + ok_dep = TRUE; + } + } + + if (!ok_dep) { + msg_err_cache ("cannot add dependency from %s on %s: invalid symbol types", + dep->sym, dit->symbol); + + return; + } + } + else { + if (dit->id == it->id) { + msg_err_cache ("cannot add dependency on self: %s -> %s " + "(resolved to %s)", + it->symbol, dep->sym, dit->symbol); + } else { + rdep = rspamd_mempool_alloc (cache->static_pool, + sizeof (*rdep)); + + rdep->sym = dep->sym; + rdep->item = it; + rdep->id = it->id; + g_assert (dit->rdeps != NULL); + g_ptr_array_add (dit->rdeps, rdep); + dep->item = dit; + dep->id = dit->id; + + msg_debug_cache ("add dependency from %d on %d", it->id, + dit->id); + } + } + } + else if (dep->id >= 0) { + msg_err_cache ("cannot find dependency on symbol %s for symbol %s", + dep->sym, it->symbol); + + return; + } + + if (vdit) { + /* Use virtual symbol to propagate deps */ + rspamd_symcache_propagate_dep (cache, it, vdit); + } +} + +/* Sort items in logical order */ +static void +rspamd_symcache_post_init (struct rspamd_symcache *cache) +{ + struct rspamd_symcache_item *it, *vit; + struct cache_dependency *dep; + struct delayed_cache_dependency *ddep; + struct delayed_cache_condition *dcond; + GList *cur; + gint i, j; + + cur = cache->delayed_deps; + while (cur) { + ddep = cur->data; + + vit = rspamd_symcache_find_filter (cache, ddep->from, false); + it = rspamd_symcache_find_filter (cache, ddep->from, true); + + if (it == NULL || vit == NULL) { + msg_err_cache ("cannot register delayed dependency between %s and %s: " + "%s is missing", ddep->from, ddep->to, ddep->from); + } + else { + msg_debug_cache ("delayed between %s(%d:%d) -> %s", ddep->from, + it->id, vit->id, ddep->to); + rspamd_symcache_add_dependency (cache, it->id, ddep->to, vit != it ? + vit->id : -1); + } + + cur = g_list_next (cur); + } + + cur = cache->delayed_conditions; + while (cur) { + dcond = cur->data; + + it = rspamd_symcache_find_filter (cache, dcond->sym, true); + + if (it == NULL) { + msg_err_cache ( + "cannot register delayed condition for %s", + dcond->sym); + luaL_unref (dcond->L, LUA_REGISTRYINDEX, dcond->cbref); + } + else { + struct rspamd_symcache_condition *ncond = rspamd_mempool_alloc0 (cache->static_pool, + sizeof (*ncond)); + ncond->cb = dcond->cbref; + DL_APPEND (it->specific.normal.conditions, ncond); + } + + cur = g_list_next (cur); + } + + PTR_ARRAY_FOREACH (cache->items_by_id, i, it) { + + PTR_ARRAY_FOREACH (it->deps, j, dep) { + rspamd_symcache_process_dep (cache, it, dep); + } + + if (it->deps) { + /* Reversed loop to make removal safe */ + for (j = it->deps->len - 1; j >= 0; j--) { + dep = g_ptr_array_index (it->deps, j); + + if (dep->item == NULL) { + /* Remove useless dep */ + g_ptr_array_remove_index (it->deps, j); + } + } + } + } + + /* Special case for virtual symbols */ + PTR_ARRAY_FOREACH (cache->virtual, i, it) { + + PTR_ARRAY_FOREACH (it->deps, j, dep) { + rspamd_symcache_process_dep (cache, it, dep); + } + } + + g_ptr_array_sort_with_data (cache->connfilters, prefilters_cmp, cache); + g_ptr_array_sort_with_data (cache->prefilters, prefilters_cmp, cache); + g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache); + g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache); + + rspamd_symcache_resort (cache); +} + +static gboolean +rspamd_symcache_load_items (struct rspamd_symcache *cache, const gchar *name) +{ + struct rspamd_symcache_header *hdr; + struct stat st; + struct ucl_parser *parser; + ucl_object_t *top; + const ucl_object_t *cur, *elt; + ucl_object_iter_t it; + struct rspamd_symcache_item *item, *parent; + const guchar *p; + gint fd; + gpointer map; + + fd = open (name, O_RDONLY); + + if (fd == -1) { + msg_info_cache ("cannot open file %s, error %d, %s", name, + errno, strerror (errno)); + return FALSE; + } + + rspamd_file_lock (fd, FALSE); + + if (fstat (fd, &st) == -1) { + rspamd_file_unlock (fd, FALSE); + close (fd); + msg_info_cache ("cannot stat file %s, error %d, %s", name, + errno, strerror (errno)); + return FALSE; + } + + if (st.st_size < (gint)sizeof (*hdr)) { + rspamd_file_unlock (fd, FALSE); + close (fd); + errno = EINVAL; + msg_info_cache ("cannot use file %s, error %d, %s", name, + errno, strerror (errno)); + return FALSE; + } + + map = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, fd, 0); + + if (map == MAP_FAILED) { + rspamd_file_unlock (fd, FALSE); + close (fd); + msg_info_cache ("cannot mmap file %s, error %d, %s", name, + errno, strerror (errno)); + return FALSE; + } + + hdr = map; + + if (memcmp (hdr->magic, rspamd_symcache_magic, + sizeof (rspamd_symcache_magic)) != 0) { + msg_info_cache ("cannot use file %s, bad magic", name); + munmap (map, st.st_size); + rspamd_file_unlock (fd, FALSE); + close (fd); + + return FALSE; + } + + parser = ucl_parser_new (0); + p = (const guchar *)(hdr + 1); + + if (!ucl_parser_add_chunk (parser, p, st.st_size - sizeof (*hdr))) { + msg_info_cache ("cannot use file %s, cannot parse: %s", name, + ucl_parser_get_error (parser)); + munmap (map, st.st_size); + ucl_parser_free (parser); + rspamd_file_unlock (fd, FALSE); + close (fd); + + return FALSE; + } + + top = ucl_parser_get_object (parser); + munmap (map, st.st_size); + rspamd_file_unlock (fd, FALSE); + close (fd); + ucl_parser_free (parser); + + if (top == NULL || ucl_object_type (top) != UCL_OBJECT) { + msg_info_cache ("cannot use file %s, bad object", name); + ucl_object_unref (top); + return FALSE; + } + + it = ucl_object_iterate_new (top); + + while ((cur = ucl_object_iterate_safe (it, true))) { + item = g_hash_table_lookup (cache->items_by_symbol, ucl_object_key (cur)); + + if (item) { + /* Copy saved info */ + /* + * XXX: don't save or load weight, it should be obtained from the + * metric + */ +#if 0 + elt = ucl_object_lookup (cur, "weight"); + + if (elt) { + w = ucl_object_todouble (elt); + if (w != 0) { + item->weight = w; + } + } +#endif + elt = ucl_object_lookup (cur, "time"); + if (elt) { + item->st->avg_time = ucl_object_todouble (elt); + } + + elt = ucl_object_lookup (cur, "count"); + if (elt) { + item->st->total_hits = ucl_object_toint (elt); + item->last_count = item->st->total_hits; + } + + elt = ucl_object_lookup (cur, "frequency"); + if (elt && ucl_object_type (elt) == UCL_OBJECT) { + const ucl_object_t *freq_elt; + + freq_elt = ucl_object_lookup (elt, "avg"); + + if (freq_elt) { + item->st->avg_frequency = ucl_object_todouble (freq_elt); + } + freq_elt = ucl_object_lookup (elt, "stddev"); + + if (freq_elt) { + item->st->stddev_frequency = ucl_object_todouble (freq_elt); + } + } + + if (item->is_virtual && !(item->type & SYMBOL_TYPE_GHOST)) { + g_assert (item->specific.virtual.parent < (gint)cache->items_by_id->len); + parent = g_ptr_array_index (cache->items_by_id, + item->specific.virtual.parent); + item->specific.virtual.parent_item = parent; + + if (parent->st->weight < item->st->weight) { + parent->st->weight = item->st->weight; + } + + /* + * We maintain avg_time for virtual symbols equal to the + * parent item avg_time + */ + item->st->avg_time = parent->st->avg_time; + } + + cache->total_weight += fabs (item->st->weight); + cache->total_hits += item->st->total_hits; + } + } + + ucl_object_iterate_free (it); + ucl_object_unref (top); + + return TRUE; +} + +#define ROUND_DOUBLE(x) (floor((x) * 100.0) / 100.0) + +static gboolean +rspamd_symcache_save_items (struct rspamd_symcache *cache, const gchar *name) +{ + struct rspamd_symcache_header hdr; + ucl_object_t *top, *elt, *freq; + GHashTableIter it; + struct rspamd_symcache_item *item; + struct ucl_emitter_functions *efunc; + gpointer k, v; + gint fd; + FILE *fp; + bool ret; + gchar path[PATH_MAX]; + + rspamd_snprintf (path, sizeof (path), "%s.new", name); + + fd = open (path, O_CREAT | O_WRONLY | O_EXCL, 00644); + + if (fd == -1) { + if (errno == EEXIST) { + /* Some other process is already writing data, give up silently */ + return TRUE; + } + + msg_err_cache ("cannot open file %s, error %d, %s", path, + errno, strerror (errno)); + return FALSE; + } + + rspamd_file_lock (fd, FALSE); + fp = fdopen (fd, "w"); + + memset (&hdr, 0, sizeof (hdr)); + memcpy (hdr.magic, rspamd_symcache_magic, + sizeof (rspamd_symcache_magic)); + + if (fwrite (&hdr, sizeof (hdr), 1, fp) == -1) { + msg_err_cache ("cannot write to file %s, error %d, %s", path, + errno, strerror (errno)); + rspamd_file_unlock (fd, FALSE); + fclose (fp); + + return FALSE; + } + + top = ucl_object_typed_new (UCL_OBJECT); + g_hash_table_iter_init (&it, cache->items_by_symbol); + + while (g_hash_table_iter_next (&it, &k, &v)) { + item = v; + elt = ucl_object_typed_new (UCL_OBJECT); + ucl_object_insert_key (elt, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->weight)), + "weight", 0, false); + ucl_object_insert_key (elt, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->time_counter.mean)), + "time", 0, false); + ucl_object_insert_key (elt, ucl_object_fromint (item->st->total_hits), + "count", 0, false); + + freq = ucl_object_typed_new (UCL_OBJECT); + ucl_object_insert_key (freq, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->frequency_counter.mean)), + "avg", 0, false); + ucl_object_insert_key (freq, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->frequency_counter.stddev)), + "stddev", 0, false); + ucl_object_insert_key (elt, freq, "frequency", 0, false); + + ucl_object_insert_key (top, elt, k, 0, false); + } + + efunc = ucl_object_emit_file_funcs (fp); + ret = ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, efunc, NULL); + ucl_object_emit_funcs_free (efunc); + ucl_object_unref (top); + rspamd_file_unlock (fd, FALSE); + fclose (fp); + + if (rename (path, name) == -1) { + msg_err_cache ("cannot rename %s -> %s, error %d, %s", path, name, + errno, strerror (errno)); + (void)unlink (path); + ret = FALSE; + } + + return ret; +} + +#undef ROUND_DOUBLE + +gint +rspamd_symcache_add_symbol (struct rspamd_symcache *cache, + const gchar *name, + gint priority, + symbol_func_t func, + gpointer user_data, + enum rspamd_symbol_type type, + gint parent) +{ + struct rspamd_symcache_item *item = NULL; + const gchar *type_str = "normal"; + + g_assert (cache != NULL); + + if (name == NULL && !(type & SYMBOL_TYPE_CALLBACK)) { + msg_warn_cache ("no name for non-callback symbol!"); + } + else if ((type & SYMBOL_TYPE_VIRTUAL & (~SYMBOL_TYPE_GHOST)) && parent == -1) { + msg_warn_cache ("no parent symbol is associated with virtual symbol %s", + name); + } + + if (name != NULL && !(type & SYMBOL_TYPE_CALLBACK)) { + struct rspamd_symcache_item *existing; + + if (strcspn (name, " \t\n\r") != strlen (name)) { + msg_warn_cache ("bogus characters in symbol name: \"%s\"", + name); + } + + if ((existing = g_hash_table_lookup (cache->items_by_symbol, name)) != NULL) { + + if (existing->type & SYMBOL_TYPE_GHOST) { + /* + * Complicated part: + * - we need to remove the existing ghost symbol + * - we need to cleanup containers: + * - symbols hash + * - specific array + * - items_by_it + * - decrement used_items + */ + msg_info_cache ("duplicate ghost symbol %s is removed", name); + + if (existing->container) { + g_ptr_array_remove (existing->container, existing); + } + + g_ptr_array_remove (cache->items_by_id, existing->container); + cache->used_items --; + g_hash_table_remove (cache->items_by_symbol, name); + /* + * Here can be memory leak, but we assume that ghost symbols + * are also virtual + */ + } + else { + msg_err_cache ("skip duplicate symbol registration for %s", name); + return -1; + } + } + } + + if (type & (SYMBOL_TYPE_CLASSIFIER|SYMBOL_TYPE_CALLBACK| + SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER| + SYMBOL_TYPE_IDEMPOTENT|SYMBOL_TYPE_GHOST)) { + type |= SYMBOL_TYPE_NOSTAT; + } + + item = rspamd_mempool_alloc0 (cache->static_pool, + sizeof (struct rspamd_symcache_item)); + item->st = rspamd_mempool_alloc0_shared (cache->static_pool, + sizeof (*item->st)); + item->enabled = TRUE; + + /* + * We do not share cd to skip locking, instead we'll just calculate it on + * save or accumulate + */ + item->cd = rspamd_mempool_alloc0 (cache->static_pool, + sizeof (struct rspamd_counter_data)); + item->priority = priority; + item->type = type; + + if ((type & SYMBOL_TYPE_FINE) && item->priority == 0) { + /* Make priority for negative weighted symbols */ + item->priority = 1; + } + + if (func) { + /* Non-virtual symbol */ + g_assert (parent == -1); + + if (item->type & SYMBOL_TYPE_PREFILTER) { + type_str = "prefilter"; + g_ptr_array_add (cache->prefilters, item); + item->container = cache->prefilters; + } + else if (item->type & SYMBOL_TYPE_IDEMPOTENT) { + type_str = "idempotent"; + g_ptr_array_add (cache->idempotent, item); + item->container = cache->idempotent; + } + else if (item->type & SYMBOL_TYPE_POSTFILTER) { + type_str = "postfilter"; + g_ptr_array_add (cache->postfilters, item); + item->container = cache->postfilters; + } + else if (item->type & SYMBOL_TYPE_CONNFILTER) { + type_str = "connfilter"; + g_ptr_array_add (cache->connfilters, item); + item->container = cache->connfilters; + } + else { + item->is_filter = TRUE; + g_ptr_array_add (cache->filters, item); + item->container = cache->filters; + } + + item->id = cache->items_by_id->len; + g_ptr_array_add (cache->items_by_id, item); + + item->specific.normal.func = func; + item->specific.normal.user_data = user_data; + item->specific.normal.conditions = NULL; + } + else { + /* + * Three possibilities here when no function is specified: + * - virtual symbol (beware of ghosts!) + * - classifier symbol + * - composite symbol + */ + if (item->type & SYMBOL_TYPE_COMPOSITE) { + item->specific.normal.conditions = NULL; + item->specific.normal.user_data = user_data; + g_assert (user_data != NULL); + g_ptr_array_add (cache->composites, item); + + item->id = cache->items_by_id->len; + g_ptr_array_add (cache->items_by_id, item); + item->container = cache->composites; + type_str = "composite"; + } + else if (item->type & SYMBOL_TYPE_CLASSIFIER) { + /* Treat it as normal symbol to allow enable/disable */ + item->id = cache->items_by_id->len; + g_ptr_array_add (cache->items_by_id, item); + + item->is_filter = TRUE; + item->specific.normal.func = NULL; + item->specific.normal.user_data = NULL; + item->specific.normal.conditions = NULL; + type_str = "classifier"; + } + else { + item->is_virtual = TRUE; + item->specific.virtual.parent = parent; + item->specific.virtual.parent_item = + g_ptr_array_index (cache->items_by_id, parent); + item->id = cache->virtual->len; + g_ptr_array_add (cache->virtual, item); + item->container = cache->virtual; + /* Not added to items_by_id, handled by parent */ + type_str = "virtual"; + } + } + + cache->used_items ++; + cache->id ++; + + if (!(item->type & + (SYMBOL_TYPE_IDEMPOTENT|SYMBOL_TYPE_NOSTAT|SYMBOL_TYPE_CLASSIFIER))) { + if (name != NULL) { + cache->cksum = t1ha (name, strlen (name), + cache->cksum); + } else { + cache->cksum = t1ha (&item->id, sizeof (item->id), + cache->cksum); + } + + cache->stats_symbols_count ++; + } + + if (name != NULL) { + item->symbol = rspamd_mempool_strdup (cache->static_pool, name); + msg_debug_cache ("used items: %d, added symbol: %s, %d; symbol type: %s", + cache->used_items, name, item->id, type_str); + } else { + g_assert (func != NULL); + msg_debug_cache ("used items: %d, added unnamed symbol: %d; symbol type: %s", + cache->used_items, item->id, type_str); + } + + item->deps = g_ptr_array_new (); + item->rdeps = g_ptr_array_new (); + item->type_descr = type_str; + rspamd_mempool_add_destructor (cache->static_pool, + rspamd_ptr_array_free_hard, item->deps); + rspamd_mempool_add_destructor (cache->static_pool, + rspamd_ptr_array_free_hard, item->rdeps); + + if (name != NULL) { + g_hash_table_insert (cache->items_by_symbol, item->symbol, item); + } + + return item->id; +} + +void +rspamd_symcache_set_peak_callback (struct rspamd_symcache *cache, + gint cbref) +{ + g_assert (cache != NULL); + + if (cache->peak_cb != -1) { + luaL_unref (cache->cfg->lua_state, LUA_REGISTRYINDEX, + cache->peak_cb); + } + + cache->peak_cb = cbref; + msg_info_cache ("registered peak callback"); +} + +gboolean +rspamd_symcache_add_condition_delayed (struct rspamd_symcache *cache, + const gchar *sym, lua_State *L, gint cbref) +{ + struct delayed_cache_condition *ncond; + + g_assert (cache != NULL); + g_assert (sym != NULL); + + ncond = g_malloc0 (sizeof (*ncond)); + ncond->sym = g_strdup (sym); + ncond->cbref = cbref; + ncond->L = L; + cache->id ++; + + cache->delayed_conditions = g_list_prepend (cache->delayed_conditions, ncond); + + return TRUE; +} + +void +rspamd_symcache_save (struct rspamd_symcache *cache) +{ + if (cache != NULL) { + + if (cache->cfg->cache_filename) { + /* Try to sync values to the disk */ + if (!rspamd_symcache_save_items (cache, + cache->cfg->cache_filename)) { + msg_err_cache ("cannot save cache data to %s: %s", + cache->cfg->cache_filename, strerror (errno)); + } + } + } +} + +void +rspamd_symcache_destroy (struct rspamd_symcache *cache) +{ + GList *cur; + struct delayed_cache_dependency *ddep; + struct delayed_cache_condition *dcond; + + if (cache != NULL) { + if (cache->delayed_deps) { + cur = cache->delayed_deps; + + while (cur) { + ddep = cur->data; + g_free (ddep->from); + g_free (ddep->to); + g_free (ddep); + cur = g_list_next (cur); + } + + g_list_free (cache->delayed_deps); + } + + if (cache->delayed_conditions) { + cur = cache->delayed_conditions; + + while (cur) { + dcond = cur->data; + g_free (dcond->sym); + g_free (dcond); + cur = g_list_next (cur); + } + + g_list_free (cache->delayed_conditions); + } + + g_hash_table_destroy (cache->items_by_symbol); + g_ptr_array_free (cache->items_by_id, TRUE); + rspamd_mempool_delete (cache->static_pool); + g_ptr_array_free (cache->connfilters, TRUE); + g_ptr_array_free (cache->prefilters, TRUE); + g_ptr_array_free (cache->filters, TRUE); + g_ptr_array_free (cache->postfilters, TRUE); + g_ptr_array_free (cache->idempotent, TRUE); + g_ptr_array_free (cache->composites, TRUE); + g_ptr_array_free (cache->virtual, TRUE); + REF_RELEASE (cache->items_by_order); + + if (cache->peak_cb != -1) { + luaL_unref (cache->cfg->lua_state, LUA_REGISTRYINDEX, cache->peak_cb); + } + + g_free (cache); + } +} + +struct rspamd_symcache* +rspamd_symcache_new (struct rspamd_config *cfg) +{ + struct rspamd_symcache *cache; + + cache = g_malloc0 (sizeof (struct rspamd_symcache)); + cache->static_pool = + rspamd_mempool_new (rspamd_mempool_suggest_size (), "symcache", 0); + cache->items_by_symbol = g_hash_table_new (rspamd_str_hash, + rspamd_str_equal); + cache->items_by_id = g_ptr_array_new (); + cache->connfilters = g_ptr_array_new (); + cache->prefilters = g_ptr_array_new (); + cache->filters = g_ptr_array_new (); + cache->postfilters = g_ptr_array_new (); + cache->idempotent = g_ptr_array_new (); + cache->composites = g_ptr_array_new (); + cache->virtual = g_ptr_array_new (); + cache->reload_time = cfg->cache_reload_time; + cache->total_hits = 1; + cache->total_weight = 1.0; + cache->cfg = cfg; + cache->cksum = 0xdeadbabe; + cache->peak_cb = -1; + cache->id = (guint)rspamd_random_uint64_fast (); + + return cache; +} + +static void +rspamd_symcache_metric_connect_cb (gpointer k, gpointer v, gpointer ud) +{ + struct rspamd_symcache *cache = (struct rspamd_symcache *)ud; + const gchar *sym = k; + struct rspamd_symbol *s = (struct rspamd_symbol *)v; + gdouble weight; + struct rspamd_symcache_item *item; + + weight = *s->weight_ptr; + item = g_hash_table_lookup (cache->items_by_symbol, sym); + + if (item) { + item->st->weight = weight; + s->cache_item = item; + } +} + +gboolean +rspamd_symcache_init (struct rspamd_symcache *cache) +{ + gboolean res = TRUE; + + g_assert (cache != NULL); + + cache->reload_time = cache->cfg->cache_reload_time; + + if (cache->cfg->cache_filename != NULL) { + res = rspamd_symcache_load_items (cache, cache->cfg->cache_filename); + } + + rspamd_symcache_post_init (cache); + + /* Connect metric symbols with symcache symbols */ + if (cache->cfg->symbols) { + g_hash_table_foreach (cache->cfg->symbols, + rspamd_symcache_metric_connect_cb, + cache); + } + + return res; +} + + +static void +rspamd_symcache_validate_cb (gpointer k, gpointer v, gpointer ud) +{ + struct rspamd_symcache_item *item = v, *parent; + struct rspamd_config *cfg; + struct rspamd_symcache *cache = (struct rspamd_symcache *)ud; + struct rspamd_symbol *s; + gboolean skipped, ghost; + gint p1, p2; + + ghost = item->st->weight == 0 ? TRUE : FALSE; + cfg = cache->cfg; + + /* Check whether this item is skipped */ + skipped = !ghost; + g_assert (cfg != NULL); + + if ((item->type & + (SYMBOL_TYPE_NORMAL|SYMBOL_TYPE_VIRTUAL|SYMBOL_TYPE_COMPOSITE|SYMBOL_TYPE_CLASSIFIER)) + && g_hash_table_lookup (cfg->symbols, item->symbol) == NULL) { + + if (!isnan(cfg->unknown_weight)) { + + skipped = FALSE; + item->st->weight = cfg->unknown_weight; + s = rspamd_mempool_alloc0 (cache->static_pool, + sizeof (*s)); + s->name = item->symbol; + s->weight_ptr = &item->st->weight; + g_hash_table_insert (cfg->symbols, item->symbol, s); + + msg_info_cache ("adding unknown symbol %s with weight: %.2f", + item->symbol, cfg->unknown_weight); + ghost = FALSE; + } + else { + skipped = TRUE; + } + } + else { + skipped = FALSE; + } + + if (!ghost && skipped) { + if (!(item->type & SYMBOL_TYPE_SKIPPED)) { + item->type |= SYMBOL_TYPE_SKIPPED; + msg_warn_cache ("symbol %s has no score registered, skip its check", + item->symbol); + } + } + + if (ghost) { + msg_debug_cache ("symbol %s is registered as ghost symbol, it won't be inserted " + "to any metric", item->symbol); + } + + if (item->st->weight < 0 && item->priority == 0) { + item->priority ++; + } + + if (item->is_virtual) { + if (!(item->type & SYMBOL_TYPE_GHOST)) { + g_assert (item->specific.virtual.parent != -1); + g_assert (item->specific.virtual.parent < (gint) cache->items_by_id->len); + parent = g_ptr_array_index (cache->items_by_id, + item->specific.virtual.parent); + item->specific.virtual.parent_item = parent; + + if (fabs (parent->st->weight) < fabs (item->st->weight)) { + parent->st->weight = item->st->weight; + } + + p1 = abs (item->priority); + p2 = abs (parent->priority); + + if (p1 != p2) { + parent->priority = MAX (p1, p2); + item->priority = parent->priority; + } + } + } + + cache->total_weight += fabs (item->st->weight); +} + +gboolean +rspamd_symcache_validate (struct rspamd_symcache *cache, + struct rspamd_config *cfg, + gboolean strict) +{ + struct rspamd_symcache_item *item; + GHashTableIter it; + gpointer k, v; + struct rspamd_symbol *sym_def; + gboolean ignore_symbol = FALSE, ret = TRUE; + + if (cache == NULL) { + msg_err ("empty cache is invalid"); + return FALSE; + } + + + g_hash_table_foreach (cache->items_by_symbol, + rspamd_symcache_validate_cb, + cache); + /* Now check each metric item and find corresponding symbol in a cache */ + g_hash_table_iter_init (&it, cfg->symbols); + + while (g_hash_table_iter_next (&it, &k, &v)) { + ignore_symbol = FALSE; + sym_def = v; + + if (sym_def && (sym_def->flags & + (RSPAMD_SYMBOL_FLAG_IGNORE_METRIC|RSPAMD_SYMBOL_FLAG_DISABLED))) { + ignore_symbol = TRUE; + } + + if (!ignore_symbol) { + item = g_hash_table_lookup (cache->items_by_symbol, k); + + if (item == NULL) { + msg_warn_cache ( + "symbol '%s' has its score defined but there is no " + "corresponding rule registered", + k); + if (strict) { + ret = FALSE; + } + } + } + else if (sym_def->flags & RSPAMD_SYMBOL_FLAG_DISABLED) { + item = g_hash_table_lookup (cache->items_by_symbol, k); + + if (item) { + item->enabled = FALSE; + } + } + } + + return ret; +} + +/* Return true if metric has score that is more than spam score for it */ +static gboolean +rspamd_symcache_metric_limit (struct rspamd_task *task, + struct cache_savepoint *cp) +{ + struct rspamd_scan_result *res; + double ms; + + if (task->flags & RSPAMD_TASK_FLAG_PASS_ALL) { + return FALSE; + } + + if (cp->lim == 0.0) { + res = task->result; + + if (res) { + ms = rspamd_task_get_required_score (task, res); + + if (!isnan (ms) && cp->lim < ms) { + cp->rs = res; + cp->lim = ms; + } + } + } + + if (cp->rs) { + + if (cp->rs->score > cp->lim) { + return TRUE; + } + } + else { + /* No reject score define, always check all rules */ + cp->lim = -1; + } + + return FALSE; +} + +static inline gboolean +rspamd_symcache_check_id_list (const struct rspamd_symcache_id_list *ls, guint32 id) +{ + guint i; + + if (ls->dyn.e == -1) { + guint *res = bsearch (&id, ls->dyn.n, ls->dyn.len, sizeof (guint32), + rspamd_id_cmp); + + if (res) { + return TRUE; + } + } + else { + for (i = 0; i < G_N_ELEMENTS (ls->st); i ++) { + if (ls->st[i] == id) { + return TRUE; + } + else if (ls->st[i] == 0) { + return FALSE; + } + } + } + + return FALSE; +} + +gboolean +rspamd_symcache_is_item_allowed (struct rspamd_task *task, + struct rspamd_symcache_item *item, + gboolean exec_only) +{ + const gchar *what = "execution"; + + if (!exec_only) { + what = "symbol insertion"; + } + + /* Static checks */ + if (!item->enabled || + (RSPAMD_TASK_IS_EMPTY (task) && !(item->type & SYMBOL_TYPE_EMPTY)) || + (item->type & SYMBOL_TYPE_MIME_ONLY && !RSPAMD_TASK_IS_MIME(task))) { + + if (!item->enabled) { + msg_debug_cache_task ("skipping %s of %s as it is permanently disabled; symbol type=%s", + what, item->symbol, item->type_descr); + + return FALSE; + } + else { + /* + * Exclude virtual symbols + */ + if (exec_only) { + msg_debug_cache_task ("skipping check of %s as it cannot be " + "executed for this task type; symbol type=%s", + item->symbol, item->type_descr); + + return FALSE; + } + } + } + + /* Settings checks */ + if (task->settings_elt != 0) { + guint32 id = task->settings_elt->id; + + if (item->forbidden_ids.st[0] != 0 && + rspamd_symcache_check_id_list (&item->forbidden_ids, + id)) { + msg_debug_cache_task ("deny %s of %s as it is forbidden for " + "settings id %ud; symbol type=%s", + what, + item->symbol, + id, + item->type_descr); + + return FALSE; + } + + if (!(item->type & SYMBOL_TYPE_EXPLICIT_DISABLE)) { + if (item->allowed_ids.st[0] == 0 || + !rspamd_symcache_check_id_list (&item->allowed_ids, + id)) { + + if (task->settings_elt->policy == RSPAMD_SETTINGS_POLICY_IMPLICIT_ALLOW) { + msg_debug_cache_task ("allow execution of %s settings id %ud " + "allows implicit execution of the symbols;" + "symbol type=%s", + item->symbol, + id, + item->type_descr); + + return TRUE; + } + + if (exec_only) { + /* + * Special case if any of our virtual children are enabled + */ + if (rspamd_symcache_check_id_list (&item->exec_only_ids, id)) { + return TRUE; + } + } + + msg_debug_cache_task ("deny %s of %s as it is not listed " + "as allowed for settings id %ud; symbol type=%s", + what, + item->symbol, + id, + item->type_descr); + return FALSE; + } + } + else { + msg_debug_cache_task ("allow %s of %s for " + "settings id %ud as it can be only disabled explicitly;" + " symbol type=%s", + what, + item->symbol, + id, + item->type_descr); + } + } + else if (item->type & SYMBOL_TYPE_EXPLICIT_ENABLE) { + msg_debug_cache_task ("deny %s of %s as it must be explicitly enabled; symbol type=%s", + what, + item->symbol, + item->type_descr); + return FALSE; + } + + /* Allow all symbols with no settings id */ + return TRUE; +} + +static gboolean +rspamd_symcache_check_symbol (struct rspamd_task *task, + struct rspamd_symcache *cache, + struct rspamd_symcache_item *item, + struct cache_savepoint *checkpoint) +{ + struct rspamd_task **ptask; + lua_State *L; + gboolean check = TRUE; + struct rspamd_symcache_dynamic_item *dyn_item = + rspamd_symcache_get_dynamic (checkpoint, item); + + if (item->type & (SYMBOL_TYPE_CLASSIFIER|SYMBOL_TYPE_COMPOSITE)) { + /* Classifiers are special :( */ + return TRUE; + } + + if (rspamd_session_blocked (task->s)) { + /* + * We cannot add new events as session is either destroyed or + * being cleaned up. + */ + return TRUE; + } + + g_assert (!item->is_virtual); + g_assert (item->specific.normal.func != NULL); + if (CHECK_START_BIT (checkpoint, dyn_item)) { + /* + * This can actually happen when deps span over different layers + */ + return CHECK_FINISH_BIT (checkpoint, dyn_item); + } + + /* Check has been started */ + SET_START_BIT (checkpoint, dyn_item); + + if (!rspamd_symcache_is_item_allowed (task, item, TRUE)) { + check = FALSE; + } + else if (item->specific.normal.conditions) { + struct rspamd_symcache_condition *cur_cond; + + DL_FOREACH (item->specific.normal.conditions, cur_cond) { + /* We also executes condition callback to check if we need this symbol */ + L = task->cfg->lua_state; + lua_rawgeti (L, LUA_REGISTRYINDEX, cur_cond->cb); + ptask = lua_newuserdata (L, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (L, "rspamd{task}", -1); + *ptask = task; + + if (lua_pcall (L, 1, 1, 0) != 0) { + msg_info_task ("call to condition for %s failed: %s", + item->symbol, lua_tostring (L, -1)); + lua_pop (L, 1); + } + else { + check = lua_toboolean (L, -1); + lua_pop (L, 1); + } + + if (!check) { + break; + } + } + + if (!check) { + msg_debug_cache_task ("skipping check of %s as its start condition is false; " + "symbol type = %s", + item->symbol, item->type_descr); + } + } + + if (check) { + msg_debug_cache_task ("execute %s, %d; symbol type = %s", item->symbol, + item->id, item->type_descr); + + if (checkpoint->profile) { + ev_now_update_if_cheap (task->event_loop); + dyn_item->start_msec = (ev_now (task->event_loop) - + checkpoint->profile_start) * 1e3; + } + + dyn_item->async_events = 0; + checkpoint->cur_item = item; + checkpoint->items_inflight ++; + /* Callback now must finalize itself */ + item->specific.normal.func (task, item, item->specific.normal.user_data); + checkpoint->cur_item = NULL; + + if (checkpoint->items_inflight == 0) { + + return TRUE; + } + + if (dyn_item->async_events == 0 && !CHECK_FINISH_BIT (checkpoint, dyn_item)) { + msg_err_cache ("critical error: item %s has no async events pending, " + "but it is not finalised", item->symbol); + g_assert_not_reached (); + } + + return FALSE; + } + else { + SET_FINISH_BIT (checkpoint, dyn_item); + } + + return TRUE; +} + +static gboolean +rspamd_symcache_check_deps (struct rspamd_task *task, + struct rspamd_symcache *cache, + struct rspamd_symcache_item *item, + struct cache_savepoint *checkpoint, + guint recursion, + gboolean check_only) +{ + struct cache_dependency *dep; + guint i; + gboolean ret = TRUE; + static const guint max_recursion = 20; + struct rspamd_symcache_dynamic_item *dyn_item; + + if (recursion > max_recursion) { + msg_err_task ("cyclic dependencies: maximum check level %ud exceed when " + "checking dependencies for %s", max_recursion, item->symbol); + + return TRUE; + } + + if (item->deps != NULL && item->deps->len > 0) { + for (i = 0; i < item->deps->len; i ++) { + dep = g_ptr_array_index (item->deps, i); + + if (dep->item == NULL) { + /* Assume invalid deps as done */ + msg_debug_cache_task ("symbol %d(%s) has invalid dependencies on %d(%s)", + item->id, item->symbol, dep->id, dep->sym); + continue; + } + + dyn_item = rspamd_symcache_get_dynamic (checkpoint, dep->item); + + if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) { + if (!CHECK_START_BIT (checkpoint, dyn_item)) { + /* Not started */ + if (!check_only) { + if (!rspamd_symcache_check_deps (task, cache, + dep->item, + checkpoint, + recursion + 1, + check_only)) { + + ret = FALSE; + msg_debug_cache_task ("delayed dependency %d(%s) for " + "symbol %d(%s)", + dep->id, dep->sym, item->id, item->symbol); + } + else if (!rspamd_symcache_check_symbol (task, cache, + dep->item, + checkpoint)) { + /* Now started, but has events pending */ + ret = FALSE; + msg_debug_cache_task ("started check of %d(%s) symbol " + "as dep for " + "%d(%s)", + dep->id, dep->sym, item->id, item->symbol); + } + else { + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is " + "already processed", + dep->id, dep->sym, item->id, item->symbol); + } + } + else { + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) " + "cannot be started now", + dep->id, dep->sym, + item->id, item->symbol); + ret = FALSE; + } + } + else { + /* Started but not finished */ + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is " + "still executing", + dep->id, dep->sym, + item->id, item->symbol); + ret = FALSE; + } + } + else { + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is already " + "checked", + dep->id, dep->sym, + item->id, item->symbol); + } + } + } + + return ret; +} + +static struct cache_savepoint * +rspamd_symcache_make_checkpoint (struct rspamd_task *task, + struct rspamd_symcache *cache) +{ + struct cache_savepoint *checkpoint; + + if (cache->items_by_order->id != cache->id) { + /* + * Cache has been modified, need to resort it + */ + msg_info_cache ("symbols cache has been modified since last check:" + " old id: %ud, new id: %ud", + cache->items_by_order->id, cache->id); + rspamd_symcache_resort (cache); + } + + checkpoint = rspamd_mempool_alloc0 (task->task_pool, + sizeof (*checkpoint) + + sizeof (struct rspamd_symcache_dynamic_item) * cache->items_by_id->len); + + g_assert (cache->items_by_order != NULL); + checkpoint->version = cache->items_by_order->d->len; + checkpoint->order = cache->items_by_order; + REF_RETAIN (checkpoint->order); + rspamd_mempool_add_destructor (task->task_pool, + rspamd_symcache_order_unref, checkpoint->order); + + /* Calculate profile probability */ + ev_now_update_if_cheap (task->event_loop); + ev_tstamp now = ev_now (task->event_loop); + checkpoint->profile_start = now; + + if ((cache->last_profile == 0.0 || now > cache->last_profile + PROFILE_MAX_TIME) || + (task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) || + (rspamd_random_double_fast () >= (1 - PROFILE_PROBABILITY))) { + msg_debug_cache_task ("enable profiling of symbols for task"); + checkpoint->profile = TRUE; + cache->last_profile = now; + } + + task->checkpoint = checkpoint; + + return checkpoint; +} + +gboolean +rspamd_symcache_process_settings (struct rspamd_task *task, + struct rspamd_symcache *cache) +{ + const ucl_object_t *wl, *cur, *disabled, *enabled; + struct rspamd_symbols_group *gr; + GHashTableIter gr_it; + ucl_object_iter_t it = NULL; + gboolean already_disabled = FALSE; + gpointer k, v; + + wl = ucl_object_lookup (task->settings, "whitelist"); + + if (wl != NULL) { + msg_info_task ("task is whitelisted"); + task->flags |= RSPAMD_TASK_FLAG_SKIP; + return TRUE; + } + + enabled = ucl_object_lookup (task->settings, "symbols_enabled"); + + if (enabled) { + /* Disable all symbols but selected */ + rspamd_symcache_disable_all_symbols (task, cache, + SYMBOL_TYPE_EXPLICIT_DISABLE); + already_disabled = TRUE; + it = NULL; + + while ((cur = ucl_iterate_object (enabled, &it, true)) != NULL) { + rspamd_symcache_enable_symbol_checkpoint (task, cache, + ucl_object_tostring (cur)); + } + } + + /* Enable groups of symbols */ + enabled = ucl_object_lookup (task->settings, "groups_enabled"); + + if (enabled) { + it = NULL; + + if (!already_disabled) { + rspamd_symcache_disable_all_symbols (task, cache, + SYMBOL_TYPE_EXPLICIT_DISABLE); + } + + while ((cur = ucl_iterate_object (enabled, &it, true)) != NULL) { + if (ucl_object_type (cur) == UCL_STRING) { + gr = g_hash_table_lookup (task->cfg->groups, + ucl_object_tostring (cur)); + + if (gr) { + g_hash_table_iter_init (&gr_it, gr->symbols); + + while (g_hash_table_iter_next (&gr_it, &k, &v)) { + rspamd_symcache_enable_symbol_checkpoint (task, cache, k); + } + } + } + } + } + + disabled = ucl_object_lookup (task->settings, "symbols_disabled"); + + if (disabled) { + it = NULL; + + while ((cur = ucl_iterate_object (disabled, &it, true)) != NULL) { + rspamd_symcache_disable_symbol_checkpoint (task, cache, + ucl_object_tostring (cur)); + } + } + + /* Disable groups of symbols */ + disabled = ucl_object_lookup (task->settings, "groups_disabled"); + + if (disabled) { + it = NULL; + + while ((cur = ucl_iterate_object (disabled, &it, true)) != NULL) { + if (ucl_object_type (cur) == UCL_STRING) { + gr = g_hash_table_lookup (task->cfg->groups, + ucl_object_tostring (cur)); + + if (gr) { + g_hash_table_iter_init (&gr_it, gr->symbols); + + while (g_hash_table_iter_next (&gr_it, &k, &v)) { + rspamd_symcache_disable_symbol_checkpoint (task, cache, k); + } + } + } + } + } + + return FALSE; +} + +gboolean +rspamd_symcache_process_symbols (struct rspamd_task *task, + struct rspamd_symcache *cache, + gint stage) +{ + struct rspamd_symcache_item *item = NULL; + struct rspamd_symcache_dynamic_item *dyn_item; + struct cache_savepoint *checkpoint; + gint i; + gboolean all_done = TRUE; + gint saved_priority; + guint start_events_pending; + + g_assert (cache != NULL); + + if (task->checkpoint == NULL) { + checkpoint = rspamd_symcache_make_checkpoint (task, cache); + task->checkpoint = checkpoint; + } + else { + checkpoint = task->checkpoint; + } + + msg_debug_cache_task ("symbols processing stage at pass: %d", stage); + start_events_pending = rspamd_session_events_pending (task->s); + + switch (stage) { + case RSPAMD_TASK_STAGE_CONNFILTERS: + /* Check for connection filters */ + saved_priority = G_MININT; + all_done = TRUE; + + for (i = 0; i < (gint) cache->connfilters->len; i++) { + item = g_ptr_array_index (cache->connfilters, i); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + + if (RSPAMD_TASK_IS_SKIPPED (task)) { + return TRUE; + } + + if (!CHECK_START_BIT (checkpoint, dyn_item) && + !CHECK_FINISH_BIT (checkpoint, dyn_item)) { + + if (checkpoint->has_slow) { + /* Delay */ + checkpoint->has_slow = FALSE; + + return FALSE; + } + /* Check priorities */ + if (saved_priority == G_MININT) { + saved_priority = item->priority; + } + else { + if (item->priority < saved_priority && + rspamd_session_events_pending (task->s) > start_events_pending) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + return FALSE; + } + } + + rspamd_symcache_check_symbol (task, cache, item, + checkpoint); + all_done = FALSE; + } + } + break; + + case RSPAMD_TASK_STAGE_PRE_FILTERS: + /* Check for prefilters */ + saved_priority = G_MININT; + all_done = TRUE; + + for (i = 0; i < (gint) cache->prefilters->len; i++) { + item = g_ptr_array_index (cache->prefilters, i); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + + if (RSPAMD_TASK_IS_SKIPPED (task)) { + return TRUE; + } + + if (!CHECK_START_BIT (checkpoint, dyn_item) && + !CHECK_FINISH_BIT (checkpoint, dyn_item)) { + /* Check priorities */ + if (checkpoint->has_slow) { + /* Delay */ + checkpoint->has_slow = FALSE; + + return FALSE; + } + + if (saved_priority == G_MININT) { + saved_priority = item->priority; + } + else { + if (item->priority < saved_priority && + rspamd_session_events_pending (task->s) > start_events_pending) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + return FALSE; + } + } + + rspamd_symcache_check_symbol (task, cache, item, + checkpoint); + all_done = FALSE; + } + } + + break; + + case RSPAMD_TASK_STAGE_FILTERS: + all_done = TRUE; + + for (i = 0; i < (gint) checkpoint->version; i++) { + if (RSPAMD_TASK_IS_SKIPPED (task)) { + return TRUE; + } + + item = g_ptr_array_index (checkpoint->order->d, i); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + + if (item->type & SYMBOL_TYPE_CLASSIFIER) { + continue; + } + + if (!CHECK_START_BIT (checkpoint, dyn_item)) { + all_done = FALSE; + + if (!rspamd_symcache_check_deps (task, cache, item, + checkpoint, 0, FALSE)) { + + msg_debug_cache_task ("blocked execution of %d(%s) unless deps are " + "resolved", + item->id, item->symbol); + + continue; + } + + rspamd_symcache_check_symbol (task, cache, item, + checkpoint); + + if (checkpoint->has_slow) { + /* Delay */ + checkpoint->has_slow = FALSE; + + return FALSE; + } + } + + if (!(item->type & SYMBOL_TYPE_FINE)) { + if (rspamd_symcache_metric_limit (task, checkpoint)) { + msg_info_task ("task has already scored more than %.2f, so do " + "not " + "plan more checks", + checkpoint->rs->score); + all_done = TRUE; + break; + } + } + } + + break; + + case RSPAMD_TASK_STAGE_POST_FILTERS: + /* Check for postfilters */ + saved_priority = G_MININT; + all_done = TRUE; + + for (i = 0; i < (gint) cache->postfilters->len; i++) { + if (RSPAMD_TASK_IS_SKIPPED (task)) { + return TRUE; + } + + item = g_ptr_array_index (cache->postfilters, i); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + + if (!CHECK_START_BIT (checkpoint, dyn_item) && + !CHECK_FINISH_BIT (checkpoint, dyn_item)) { + /* Check priorities */ + all_done = FALSE; + + if (checkpoint->has_slow) { + /* Delay */ + checkpoint->has_slow = FALSE; + + return FALSE; + } + + if (saved_priority == G_MININT) { + saved_priority = item->priority; + } + else { + if (item->priority > saved_priority && + rspamd_session_events_pending (task->s) > start_events_pending) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + + return FALSE; + } + } + + rspamd_symcache_check_symbol (task, cache, item, + checkpoint); + } + } + + break; + + case RSPAMD_TASK_STAGE_IDEMPOTENT: + /* Check for postfilters */ + saved_priority = G_MININT; + + for (i = 0; i < (gint) cache->idempotent->len; i++) { + item = g_ptr_array_index (cache->idempotent, i); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + + if (!CHECK_START_BIT (checkpoint, dyn_item) && + !CHECK_FINISH_BIT (checkpoint, dyn_item)) { + /* Check priorities */ + if (checkpoint->has_slow) { + /* Delay */ + checkpoint->has_slow = FALSE; + + return FALSE; + } + + if (saved_priority == G_MININT) { + saved_priority = item->priority; + } + else { + if (item->priority > saved_priority && + rspamd_session_events_pending (task->s) > start_events_pending) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + return FALSE; + } + } + rspamd_symcache_check_symbol (task, cache, item, + checkpoint); + } + } + break; + default: + g_assert_not_reached (); + } + + return all_done; +} + +struct counters_cbdata { + ucl_object_t *top; + struct rspamd_symcache *cache; +}; + +/* Leave several digits */ +#define P10(X) (1e##X) +#define ROUND_DOUBLE_DIGITS(x, dig) (floor((x) * P10(dig)) / P10(dig)) +#define ROUND_DOUBLE(x) ROUND_DOUBLE_DIGITS(x, 3) + +static void +rspamd_symcache_counters_cb (gpointer k, gpointer v, gpointer ud) +{ + struct counters_cbdata *cbd = ud; + ucl_object_t *obj, *top; + struct rspamd_symcache_item *item = v, *parent; + const gchar *symbol = k; + + top = cbd->top; + + obj = ucl_object_typed_new (UCL_OBJECT); + ucl_object_insert_key (obj, ucl_object_fromstring (symbol ? symbol : "unknown"), + "symbol", 0, false); + + if (item->is_virtual) { + if (!(item->type & SYMBOL_TYPE_GHOST)) { + parent = g_ptr_array_index (cbd->cache->items_by_id, + item->specific.virtual.parent); + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->weight)), + "weight", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (parent->st->avg_frequency)), + "frequency", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromint (parent->st->total_hits), + "hits", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (parent->st->avg_time)), + "time", 0, false); + } + else { + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->weight)), + "weight", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromdouble (0.0), + "frequency", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromdouble (0.0), + "hits", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromdouble (0.0), + "time", 0, false); + } + } + else { + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->weight)), + "weight", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->avg_frequency)), + "frequency", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromint (item->st->total_hits), + "hits", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->avg_time)), + "time", 0, false); + } + + ucl_array_append (top, obj); +} + +#undef ROUND_DOUBLE + +ucl_object_t * +rspamd_symcache_counters (struct rspamd_symcache *cache) +{ + ucl_object_t *top; + struct counters_cbdata cbd; + + g_assert (cache != NULL); + top = ucl_object_typed_new (UCL_ARRAY); + cbd.top = top; + cbd.cache = cache; + g_hash_table_foreach (cache->items_by_symbol, + rspamd_symcache_counters_cb, &cbd); + + return top; +} + +static void +rspamd_symcache_call_peak_cb (struct ev_loop *ev_base, + struct rspamd_symcache *cache, + struct rspamd_symcache_item *item, + gdouble cur_value, + gdouble cur_err) +{ + lua_State *L = cache->cfg->lua_state; + struct ev_loop **pbase; + + lua_rawgeti (L, LUA_REGISTRYINDEX, cache->peak_cb); + pbase = lua_newuserdata (L, sizeof (*pbase)); + *pbase = ev_base; + rspamd_lua_setclass (L, "rspamd{ev_base}", -1); + lua_pushstring (L, item->symbol); + lua_pushnumber (L, item->st->avg_frequency); + lua_pushnumber (L, sqrt (item->st->stddev_frequency)); + lua_pushnumber (L, cur_value); + lua_pushnumber (L, cur_err); + + if (lua_pcall (L, 6, 0, 0) != 0) { + msg_info_cache ("call to peak function for %s failed: %s", + item->symbol, lua_tostring (L, -1)); + lua_pop (L, 1); + } +} + +static void +rspamd_symcache_resort_cb (EV_P_ ev_timer *w, int revents) +{ + gdouble tm; + struct rspamd_cache_refresh_cbdata *cbdata = + (struct rspamd_cache_refresh_cbdata *)w->data; + struct rspamd_symcache *cache; + struct rspamd_symcache_item *item; + guint i; + gdouble cur_ticks; + static const double decay_rate = 0.25; + + cache = cbdata->cache; + /* Plan new event */ + tm = rspamd_time_jitter (cache->reload_time, 0); + cur_ticks = rspamd_get_ticks (FALSE); + msg_debug_cache ("resort symbols cache, next reload in %.2f seconds", tm); + g_assert (cache != NULL); + cbdata->resort_ev.repeat = tm; + ev_timer_again (EV_A_ w); + + if (rspamd_worker_is_primary_controller (cbdata->w)) { + /* Gather stats from shared execution times */ + for (i = 0; i < cache->filters->len; i ++) { + item = g_ptr_array_index (cache->filters, i); + item->st->total_hits += item->st->hits; + g_atomic_int_set (&item->st->hits, 0); + + if (item->last_count > 0 && cbdata->w->index == 0) { + /* Calculate frequency */ + gdouble cur_err, cur_value; + + cur_value = (item->st->total_hits - item->last_count) / + (cur_ticks - cbdata->last_resort); + rspamd_set_counter_ema (&item->st->frequency_counter, + cur_value, decay_rate); + item->st->avg_frequency = item->st->frequency_counter.mean; + item->st->stddev_frequency = item->st->frequency_counter.stddev; + + if (cur_value > 0) { + msg_debug_cache ("frequency for %s is %.2f, avg: %.2f", + item->symbol, cur_value, item->st->avg_frequency); + } + + cur_err = (item->st->avg_frequency - cur_value); + cur_err *= cur_err; + + /* + * TODO: replace magic number + */ + if (item->st->frequency_counter.number > 10 && + cur_err > sqrt (item->st->stddev_frequency) * 3) { + item->frequency_peaks ++; + msg_debug_cache ("peak found for %s is %.2f, avg: %.2f, " + "stddev: %.2f, error: %.2f, peaks: %d", + item->symbol, cur_value, + item->st->avg_frequency, + item->st->stddev_frequency, + cur_err, + item->frequency_peaks); + + if (cache->peak_cb != -1) { + rspamd_symcache_call_peak_cb (cbdata->event_loop, + cache, item, + cur_value, cur_err); + } + } + } + + item->last_count = item->st->total_hits; + + if (item->cd->number > 0) { + if (item->type & (SYMBOL_TYPE_CALLBACK|SYMBOL_TYPE_NORMAL)) { + item->st->avg_time = item->cd->mean; + rspamd_set_counter_ema (&item->st->time_counter, + item->st->avg_time, decay_rate); + item->st->avg_time = item->st->time_counter.mean; + memset (item->cd, 0, sizeof (*item->cd)); + } + } + } + + cbdata->last_resort = cur_ticks; + /* We don't do actual sorting due to topological guarantees */ + } +} + +static void +rspamd_symcache_refresh_dtor (void *d) +{ + struct rspamd_cache_refresh_cbdata *cbdata = + (struct rspamd_cache_refresh_cbdata *)d; + + ev_timer_stop (cbdata->event_loop, &cbdata->resort_ev); +} + +void +rspamd_symcache_start_refresh (struct rspamd_symcache *cache, + struct ev_loop *ev_base, struct rspamd_worker *w) +{ + gdouble tm; + struct rspamd_cache_refresh_cbdata *cbdata; + + cbdata = rspamd_mempool_alloc0 (cache->static_pool, sizeof (*cbdata)); + cbdata->last_resort = rspamd_get_ticks (TRUE); + cbdata->event_loop = ev_base; + cbdata->w = w; + cbdata->cache = cache; + tm = rspamd_time_jitter (cache->reload_time, 0); + msg_debug_cache ("next reload in %.2f seconds", tm); + g_assert (cache != NULL); + cbdata->resort_ev.data = cbdata; + ev_timer_init (&cbdata->resort_ev, rspamd_symcache_resort_cb, + tm, tm); + ev_timer_start (cbdata->event_loop, &cbdata->resort_ev); + rspamd_mempool_add_destructor (cache->static_pool, + rspamd_symcache_refresh_dtor, cbdata); +} + +void +rspamd_symcache_inc_frequency (struct rspamd_symcache *cache, + struct rspamd_symcache_item *item) +{ + if (item != NULL) { + g_atomic_int_inc (&item->st->hits); + } +} + +void +rspamd_symcache_add_dependency (struct rspamd_symcache *cache, + gint id_from, const gchar *to, + gint virtual_id_from) +{ + struct rspamd_symcache_item *source, *vsource; + struct cache_dependency *dep; + + g_assert (id_from >= 0 && id_from < (gint)cache->items_by_id->len); + + source = (struct rspamd_symcache_item *)g_ptr_array_index (cache->items_by_id, id_from); + dep = rspamd_mempool_alloc (cache->static_pool, sizeof (*dep)); + dep->id = id_from; + dep->sym = rspamd_mempool_strdup (cache->static_pool, to); + /* Will be filled later */ + dep->item = NULL; + dep->vid = -1; + g_ptr_array_add (source->deps, dep); + + if (virtual_id_from >= 0) { + g_assert (virtual_id_from < (gint)cache->virtual->len); + /* We need that for settings id propagation */ + vsource = (struct rspamd_symcache_item *) + g_ptr_array_index (cache->virtual, virtual_id_from); + dep = rspamd_mempool_alloc (cache->static_pool, sizeof (*dep)); + dep->vid = virtual_id_from; + dep->id = -1; + dep->sym = rspamd_mempool_strdup (cache->static_pool, to); + /* Will be filled later */ + dep->item = NULL; + g_ptr_array_add (vsource->deps, dep); + } +} + +void +rspamd_symcache_add_delayed_dependency (struct rspamd_symcache *cache, + const gchar *from, const gchar *to) +{ + struct delayed_cache_dependency *ddep; + + g_assert (from != NULL); + g_assert (to != NULL); + + ddep = g_malloc0 (sizeof (*ddep)); + ddep->from = g_strdup (from); + ddep->to = g_strdup (to); + + cache->delayed_deps = g_list_prepend (cache->delayed_deps, ddep); +} + +gint +rspamd_symcache_find_symbol (struct rspamd_symcache *cache, const gchar *name) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + + if (name == NULL) { + return -1; + } + + item = g_hash_table_lookup (cache->items_by_symbol, name); + + if (item != NULL) { + return item->id; + } + + return -1; +} + +gboolean +rspamd_symcache_stat_symbol (struct rspamd_symcache *cache, + const gchar *name, + gdouble *frequency, + gdouble *freq_stddev, + gdouble *tm, + guint *nhits) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + + if (name == NULL) { + return FALSE; + } + + item = g_hash_table_lookup (cache->items_by_symbol, name); + + if (item != NULL) { + *frequency = item->st->avg_frequency; + *freq_stddev = sqrt (item->st->stddev_frequency); + *tm = item->st->time_counter.mean; + + if (nhits) { + *nhits = item->st->hits; + } + + return TRUE; + } + + return FALSE; +} + +const gchar * +rspamd_symcache_symbol_by_id (struct rspamd_symcache *cache, + gint id) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + + if (id < 0 || id >= (gint)cache->items_by_id->len) { + return NULL; + } + + item = g_ptr_array_index (cache->items_by_id, id); + + return item->symbol; +} + +guint +rspamd_symcache_stats_symbols_count (struct rspamd_symcache *cache) +{ + g_assert (cache != NULL); + + return cache->stats_symbols_count; +} + + +void +rspamd_symcache_disable_all_symbols (struct rspamd_task *task, + struct rspamd_symcache *cache, + guint skip_mask) +{ + struct cache_savepoint *checkpoint; + guint i; + struct rspamd_symcache_item *item; + struct rspamd_symcache_dynamic_item *dyn_item; + + if (task->checkpoint == NULL) { + checkpoint = rspamd_symcache_make_checkpoint (task, cache); + task->checkpoint = checkpoint; + } + else { + checkpoint = task->checkpoint; + } + + /* Enable for squeezed symbols */ + PTR_ARRAY_FOREACH (cache->items_by_id, i, item) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + + if (!(item->type & (skip_mask))) { + SET_FINISH_BIT (checkpoint, dyn_item); + SET_START_BIT (checkpoint, dyn_item); + } + } +} + +static void +rspamd_symcache_disable_symbol_checkpoint (struct rspamd_task *task, + struct rspamd_symcache *cache, const gchar *symbol) +{ + struct cache_savepoint *checkpoint; + struct rspamd_symcache_item *item; + struct rspamd_symcache_dynamic_item *dyn_item; + + if (task->checkpoint == NULL) { + checkpoint = rspamd_symcache_make_checkpoint (task, cache); + task->checkpoint = checkpoint; + } + else { + checkpoint = task->checkpoint; + } + + item = rspamd_symcache_find_filter (cache, symbol, true); + + if (item) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + SET_FINISH_BIT (checkpoint, dyn_item); + SET_START_BIT (checkpoint, dyn_item); + msg_debug_cache_task ("disable execution of %s", symbol); + } + else { + msg_info_task ("cannot disable %s: not found", symbol); + } +} + +static void +rspamd_symcache_enable_symbol_checkpoint (struct rspamd_task *task, + struct rspamd_symcache *cache, const gchar *symbol) +{ + struct cache_savepoint *checkpoint; + struct rspamd_symcache_item *item; + struct rspamd_symcache_dynamic_item *dyn_item; + + if (task->checkpoint == NULL) { + checkpoint = rspamd_symcache_make_checkpoint (task, cache); + task->checkpoint = checkpoint; + } + else { + checkpoint = task->checkpoint; + } + + item = rspamd_symcache_find_filter (cache, symbol, true); + + if (item) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + dyn_item->finished = 0; + dyn_item->started = 0; + msg_debug_cache_task ("enable execution of %s", symbol); + } + else { + msg_info_task ("cannot enable %s: not found", symbol); + } +} + +struct rspamd_abstract_callback_data* +rspamd_symcache_get_cbdata (struct rspamd_symcache *cache, + const gchar *symbol) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + item = rspamd_symcache_find_filter (cache, symbol, true); + + if (item) { + return item->specific.normal.user_data; + } + + return NULL; +} + +gboolean +rspamd_symcache_is_checked (struct rspamd_task *task, + struct rspamd_symcache *cache, const gchar *symbol) +{ + struct cache_savepoint *checkpoint; + struct rspamd_symcache_item *item; + struct rspamd_symcache_dynamic_item *dyn_item; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + if (task->checkpoint == NULL) { + checkpoint = rspamd_symcache_make_checkpoint (task, cache); + task->checkpoint = checkpoint; + } + else { + checkpoint = task->checkpoint; + } + + item = rspamd_symcache_find_filter (cache, symbol, true); + + if (item) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + return dyn_item->started; + } + + return FALSE; +} + +void +rspamd_symcache_disable_symbol_perm (struct rspamd_symcache *cache, + const gchar *symbol, + gboolean resolve_parent) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + item = rspamd_symcache_find_filter (cache, symbol, resolve_parent); + + if (item) { + item->enabled = FALSE; + } +} + +void +rspamd_symcache_enable_symbol_perm (struct rspamd_symcache *cache, + const gchar *symbol) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + item = rspamd_symcache_find_filter (cache, symbol, true); + + if (item) { + item->enabled = TRUE; + } +} + +guint64 +rspamd_symcache_get_cksum (struct rspamd_symcache *cache) +{ + g_assert (cache != NULL); + + return cache->cksum; +} + + +gboolean +rspamd_symcache_is_symbol_enabled (struct rspamd_task *task, + struct rspamd_symcache *cache, + const gchar *symbol) +{ + struct cache_savepoint *checkpoint; + struct rspamd_symcache_item *item; + struct rspamd_symcache_dynamic_item *dyn_item; + lua_State *L; + struct rspamd_task **ptask; + gboolean ret = TRUE; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + checkpoint = task->checkpoint; + + + if (checkpoint) { + item = rspamd_symcache_find_filter (cache, symbol, true); + + if (item) { + + if (!rspamd_symcache_is_item_allowed (task, item, TRUE)) { + ret = FALSE; + } + else { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + if (CHECK_START_BIT (checkpoint, dyn_item)) { + ret = FALSE; + } + else { + if (item->specific.normal.conditions) { + struct rspamd_symcache_condition *cur_cond; + + DL_FOREACH (item->specific.normal.conditions, cur_cond) { + /* + * We also executes condition callback to check + * if we need this symbol + */ + L = task->cfg->lua_state; + lua_rawgeti (L, LUA_REGISTRYINDEX, cur_cond->cb); + ptask = lua_newuserdata (L, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (L, "rspamd{task}", -1); + *ptask = task; + + if (lua_pcall (L, 1, 1, 0) != 0) { + msg_info_task ("call to condition for %s failed: %s", + item->symbol, lua_tostring (L, -1)); + lua_pop (L, 1); + } + else { + ret = lua_toboolean (L, -1); + lua_pop (L, 1); + } + + if (!ret) { + break; + } + } + } + } + } + } + } + + return ret; +} + + +gboolean +rspamd_symcache_enable_symbol (struct rspamd_task *task, + struct rspamd_symcache *cache, + const gchar *symbol) +{ + struct cache_savepoint *checkpoint; + struct rspamd_symcache_item *item; + struct rspamd_symcache_dynamic_item *dyn_item; + gboolean ret = FALSE; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + checkpoint = task->checkpoint; + + if (checkpoint) { + item = rspamd_symcache_find_filter (cache, symbol, true); + + if (item) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + + if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) { + ret = TRUE; + CLR_START_BIT (checkpoint, dyn_item); + CLR_FINISH_BIT (checkpoint, dyn_item); + } + else { + msg_debug_task ("cannot enable symbol %s: already started", symbol); + } + } + } + + return ret; +} + + +gboolean +rspamd_symcache_disable_symbol (struct rspamd_task *task, + struct rspamd_symcache *cache, + const gchar *symbol) +{ + struct cache_savepoint *checkpoint; + struct rspamd_symcache_item *item; + struct rspamd_symcache_dynamic_item *dyn_item; + gboolean ret = FALSE; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + checkpoint = task->checkpoint; + + if (checkpoint) { + item = rspamd_symcache_find_filter (cache, symbol, true); + + if (item) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + + if (!CHECK_START_BIT (checkpoint, dyn_item)) { + ret = TRUE; + SET_START_BIT (checkpoint, dyn_item); + SET_FINISH_BIT (checkpoint, dyn_item); + } + else { + if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) { + msg_warn_task ("cannot disable symbol %s: already started", + symbol); + } + } + } + } + + return ret; +} + +void +rspamd_symcache_foreach (struct rspamd_symcache *cache, + void (*func) (struct rspamd_symcache_item *, gpointer), + gpointer ud) +{ + struct rspamd_symcache_item *item; + GHashTableIter it; + gpointer k, v; + + g_hash_table_iter_init (&it, cache->items_by_symbol); + + while (g_hash_table_iter_next (&it, &k, &v)) { + item = (struct rspamd_symcache_item *)v; + func (item, ud); + } +} + +struct rspamd_symcache_item * +rspamd_symcache_get_cur_item (struct rspamd_task *task) +{ + struct cache_savepoint *checkpoint = task->checkpoint; + + if (checkpoint == NULL) { + return NULL; + } + + return checkpoint->cur_item; +} + +/** + * Replaces the current item being processed. + * Returns the current item being processed (if any) + * @param task + * @param item + * @return + */ +struct rspamd_symcache_item * +rspamd_symcache_set_cur_item (struct rspamd_task *task, + struct rspamd_symcache_item *item) +{ + struct cache_savepoint *checkpoint = task->checkpoint; + struct rspamd_symcache_item *ex; + + ex = checkpoint->cur_item; + checkpoint->cur_item = item; + + return ex; +} + +struct rspamd_symcache_delayed_cbdata { + struct rspamd_symcache_item *item; + struct rspamd_task *task; + struct rspamd_async_event *event; + struct ev_timer tm; +}; + +static void +rspamd_symcache_delayed_item_fin (gpointer ud) +{ + struct rspamd_symcache_delayed_cbdata *cbd = + (struct rspamd_symcache_delayed_cbdata *)ud; + struct rspamd_task *task; + struct cache_savepoint *checkpoint; + + task = cbd->task; + checkpoint = task->checkpoint; + checkpoint->has_slow = FALSE; + ev_timer_stop (task->event_loop, &cbd->tm); +} + +static void +rspamd_symcache_delayed_item_cb (EV_P_ ev_timer *w, int what) +{ + struct rspamd_symcache_delayed_cbdata *cbd = + (struct rspamd_symcache_delayed_cbdata *)w->data; + struct rspamd_symcache_item *item; + struct rspamd_task *task; + struct cache_dependency *rdep; + struct cache_savepoint *checkpoint; + struct rspamd_symcache_dynamic_item *dyn_item; + guint i; + + item = cbd->item; + task = cbd->task; + checkpoint = task->checkpoint; + cbd->event = NULL; + + /* Timer will be stopped here */ + rspamd_session_remove_event (task->s, + rspamd_symcache_delayed_item_fin, cbd); + + /* Process all reverse dependencies */ + PTR_ARRAY_FOREACH (item->rdeps, i, rdep) { + if (rdep->item) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, rdep->item); + if (!CHECK_START_BIT (checkpoint, dyn_item)) { + msg_debug_cache_task ("check item %d(%s) rdep of %s ", + rdep->item->id, rdep->item->symbol, item->symbol); + + if (!rspamd_symcache_check_deps (task, task->cfg->cache, + rdep->item, + checkpoint, 0, FALSE)) { + msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s " + "unless deps are resolved", + rdep->item->id, rdep->item->symbol, item->symbol); + } + else { + rspamd_symcache_check_symbol (task, task->cfg->cache, + rdep->item, + checkpoint); + } + } + } + } +} + +static void +rspamd_delayed_timer_dtor (gpointer d) +{ + struct rspamd_symcache_delayed_cbdata *cbd = + (struct rspamd_symcache_delayed_cbdata *)d; + + if (cbd->event) { + /* Event has not been executed */ + rspamd_session_remove_event (cbd->task->s, + rspamd_symcache_delayed_item_fin, cbd); + cbd->event = NULL; + } +} + +/** + * Finalize the current async element potentially calling its deps + */ +void +rspamd_symcache_finalize_item (struct rspamd_task *task, + struct rspamd_symcache_item *item) +{ + struct cache_savepoint *checkpoint = task->checkpoint; + struct cache_dependency *rdep; + struct rspamd_symcache_dynamic_item *dyn_item; + gdouble diff; + guint i; + gboolean enable_slow_timer = FALSE; + const gdouble slow_diff_limit = 300; + + /* Sanity checks */ + g_assert (checkpoint->items_inflight > 0); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + + if (dyn_item->async_events > 0) { + /* + * XXX: Race condition + * + * It is possible that some async event is still in flight, but we + * already know its result, however, it is the responsibility of that + * event to decrease async events count and call this function + * one more time + */ + msg_debug_cache_task ("postpone finalisation of %s(%d) as there are %d " + "async events pending", + item->symbol, item->id, dyn_item->async_events); + + return; + } + + msg_debug_cache_task ("process finalize for item %s(%d)", item->symbol, item->id); + SET_FINISH_BIT (checkpoint, dyn_item); + checkpoint->items_inflight --; + checkpoint->cur_item = NULL; + + if (checkpoint->profile) { + ev_now_update_if_cheap (task->event_loop); + diff = ((ev_now (task->event_loop) - checkpoint->profile_start) * 1e3 - + dyn_item->start_msec); + + if (diff > slow_diff_limit) { + + if (!checkpoint->has_slow) { + checkpoint->has_slow = TRUE; + enable_slow_timer = TRUE; + msg_info_task ("slow rule: %s(%d): %.2f ms; enable slow timer delay", + item->symbol, item->id, + diff); + } + else { + msg_info_task ("slow rule: %s(%d): %.2f ms", + item->symbol, item->id, + diff); + } + } + + if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) { + rspamd_task_profile_set (task, item->symbol, diff); + } + + if (rspamd_worker_is_scanner (task->worker)) { + rspamd_set_counter (item->cd, diff); + } + } + + if (enable_slow_timer) { + struct rspamd_symcache_delayed_cbdata *cbd = + rspamd_mempool_alloc (task->task_pool,sizeof (*cbd)); + /* Add timer to allow something else to be executed */ + ev_timer *tm = &cbd->tm; + + cbd->event = rspamd_session_add_event (task->s, + rspamd_symcache_delayed_item_fin, cbd, + "symcache"); + + /* + * If no event could be added, then we are already in the destruction + * phase. So the main issue is to deal with has slow here + */ + if (cbd->event) { + ev_timer_init (tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0); + ev_set_priority (tm, EV_MINPRI); + rspamd_mempool_add_destructor (task->task_pool, + rspamd_delayed_timer_dtor, cbd); + + cbd->task = task; + cbd->item = item; + tm->data = cbd; + ev_timer_start (task->event_loop, tm); + } + else { + /* Just reset as no timer is added */ + checkpoint->has_slow = FALSE; + } + + return; + } + + /* Process all reverse dependencies */ + PTR_ARRAY_FOREACH (item->rdeps, i, rdep) { + if (rdep->item) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, rdep->item); + if (!CHECK_START_BIT (checkpoint, dyn_item)) { + msg_debug_cache_task ("check item %d(%s) rdep of %s ", + rdep->item->id, rdep->item->symbol, item->symbol); + + if (!rspamd_symcache_check_deps (task, task->cfg->cache, + rdep->item, + checkpoint, 0, FALSE)) { + msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s " + "unless deps are resolved", + rdep->item->id, rdep->item->symbol, item->symbol); + } + else { + rspamd_symcache_check_symbol (task, task->cfg->cache, + rdep->item, + checkpoint); + } + } + } + } +} + +guint +rspamd_symcache_item_async_inc_full (struct rspamd_task *task, + struct rspamd_symcache_item *item, + const gchar *subsystem, + const gchar *loc) +{ + struct rspamd_symcache_dynamic_item *dyn_item; + struct cache_savepoint *checkpoint = task->checkpoint; + + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + msg_debug_cache_task ("increase async events counter for %s(%d) = %d + 1; " + "subsystem %s (%s)", + item->symbol, item->id, dyn_item->async_events, subsystem, loc); + return ++dyn_item->async_events; +} + +guint +rspamd_symcache_item_async_dec_full (struct rspamd_task *task, + struct rspamd_symcache_item *item, + const gchar *subsystem, + const gchar *loc) +{ + struct rspamd_symcache_dynamic_item *dyn_item; + struct cache_savepoint *checkpoint = task->checkpoint; + + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + msg_debug_cache_task ("decrease async events counter for %s(%d) = %d - 1; " + "subsystem %s (%s)", + item->symbol, item->id, dyn_item->async_events, subsystem, loc); + g_assert (dyn_item->async_events > 0); + + return --dyn_item->async_events; +} + +gboolean +rspamd_symcache_item_async_dec_check_full (struct rspamd_task *task, + struct rspamd_symcache_item *item, + const gchar *subsystem, + const gchar *loc) +{ + if (rspamd_symcache_item_async_dec_full (task, item, subsystem, loc) == 0) { + rspamd_symcache_finalize_item (task, item); + + return TRUE; + } + + return FALSE; +} + +gboolean +rspamd_symcache_add_symbol_flags (struct rspamd_symcache *cache, + const gchar *symbol, + guint flags) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + item = rspamd_symcache_find_filter (cache, symbol, true); + + if (item) { + item->type |= flags; + + return TRUE; + } + + return FALSE; +} + +gboolean +rspamd_symcache_set_symbol_flags (struct rspamd_symcache *cache, + const gchar *symbol, + guint flags) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + item = rspamd_symcache_find_filter (cache, symbol, true); + + if (item) { + item->type = flags; + + return TRUE; + } + + return FALSE; +} + +void +rspamd_symcache_get_symbol_details(struct rspamd_symcache *cache, + const gchar *symbol, + ucl_object_t *this_sym_ucl) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + item = rspamd_symcache_find_filter (cache, symbol, false); + + if (item) { + ucl_object_insert_key (this_sym_ucl, + ucl_object_fromstring(item->type_descr), + "type", strlen("type"), false); + + // any other data? + } +} + +guint +rspamd_symcache_get_symbol_flags (struct rspamd_symcache *cache, + const gchar *symbol) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + item = rspamd_symcache_find_filter (cache, symbol, true); + + if (item) { + return item->type; + } + + return 0; +} + +void +rspamd_symcache_composites_foreach (struct rspamd_task *task, + struct rspamd_symcache *cache, + GHFunc func, + gpointer fd) +{ + guint i; + struct rspamd_symcache_item *item; + struct rspamd_symcache_dynamic_item *dyn_item; + + if (task->checkpoint == NULL) { + return; + } + + PTR_ARRAY_FOREACH (cache->composites, i, item) { + dyn_item = rspamd_symcache_get_dynamic (task->checkpoint, item); + + if (!CHECK_START_BIT (task->checkpoint, dyn_item)) { + /* Cannot do it due to 2 passes */ + /* SET_START_BIT (task->checkpoint, dyn_item); */ + func (item->symbol, item->specific.normal.user_data, fd); + SET_FINISH_BIT (task->checkpoint, dyn_item); + } + } +} + +bool +rspamd_symcache_set_allowed_settings_ids (struct rspamd_symcache *cache, + const gchar *symbol, + const guint32 *ids, + guint nids) +{ + struct rspamd_symcache_item *item; + + item = rspamd_symcache_find_filter (cache, symbol, false); + + if (item == NULL) { + return false; + } + + if (nids <= G_N_ELEMENTS (item->allowed_ids.st)) { + /* Use static version */ + memset (&item->allowed_ids, 0, sizeof (item->allowed_ids)); + for (guint i = 0; i < nids; i++) { + item->allowed_ids.st[i] = ids[i]; + } + } + else { + /* Need to use a separate list */ + item->allowed_ids.dyn.e = -1; /* Flag */ + item->allowed_ids.dyn.n = rspamd_mempool_alloc (cache->static_pool, + sizeof (guint32) * nids); + item->allowed_ids.dyn.len = nids; + item->allowed_ids.dyn.allocated = nids; + + for (guint i = 0; i < nids; i++) { + item->allowed_ids.dyn.n[i] = ids[i]; + } + + /* Keep sorted */ + qsort (item->allowed_ids.dyn.n, nids, sizeof (guint32), rspamd_id_cmp); + } + + return true; +} + +bool +rspamd_symcache_set_forbidden_settings_ids (struct rspamd_symcache *cache, + const gchar *symbol, + const guint32 *ids, + guint nids) +{ + struct rspamd_symcache_item *item; + + item = rspamd_symcache_find_filter (cache, symbol, false); + + if (item == NULL) { + return false; + } + + g_assert (nids < G_MAXUINT16); + + if (nids <= G_N_ELEMENTS (item->forbidden_ids.st)) { + /* Use static version */ + memset (&item->forbidden_ids, 0, sizeof (item->forbidden_ids)); + for (guint i = 0; i < nids; i++) { + item->forbidden_ids.st[i] = ids[i]; + } + } + else { + /* Need to use a separate list */ + item->forbidden_ids.dyn.e = -1; /* Flag */ + item->forbidden_ids.dyn.n = rspamd_mempool_alloc (cache->static_pool, + sizeof (guint32) * nids); + item->forbidden_ids.dyn.len = nids; + item->forbidden_ids.dyn.allocated = nids; + + for (guint i = 0; i < nids; i++) { + item->forbidden_ids.dyn.n[i] = ids[i]; + } + + /* Keep sorted */ + qsort (item->forbidden_ids.dyn.n, nids, sizeof (guint32), rspamd_id_cmp); + } + + return true; +} + +const guint32* +rspamd_symcache_get_allowed_settings_ids (struct rspamd_symcache *cache, + const gchar *symbol, + guint *nids) +{ + struct rspamd_symcache_item *item; + guint cnt = 0; + + item = rspamd_symcache_find_filter (cache, symbol, false); + + if (item == NULL) { + return NULL; + } + + if (item->allowed_ids.dyn.e == -1) { + /* Dynamic list */ + *nids = item->allowed_ids.dyn.len; + + return item->allowed_ids.dyn.n; + } + else { + while (item->allowed_ids.st[cnt] != 0 && cnt < G_N_ELEMENTS (item->allowed_ids.st)) { + cnt ++; + } + + *nids = cnt; + + return item->allowed_ids.st; + } +} + +const guint32* +rspamd_symcache_get_forbidden_settings_ids (struct rspamd_symcache *cache, + const gchar *symbol, + guint *nids) +{ + struct rspamd_symcache_item *item; + guint cnt = 0; + + item = rspamd_symcache_find_filter (cache, symbol, false); + + if (item == NULL) { + return NULL; + } + + if (item->forbidden_ids.dyn.e == -1) { + /* Dynamic list */ + *nids = item->allowed_ids.dyn.len; + + return item->allowed_ids.dyn.n; + } + else { + while (item->forbidden_ids.st[cnt] != 0 && cnt < G_N_ELEMENTS (item->allowed_ids.st)) { + cnt ++; + } + + *nids = cnt; + + return item->forbidden_ids.st; + } +} + +/* Insertion sort: usable for near-sorted ids list */ +static inline void +rspamd_ids_insertion_sort (guint *a, guint n) +{ + for (guint i = 1; i < n; i++) { + guint32 tmp = a[i]; + guint j = i; + + while (j > 0 && tmp < a[j - 1]) { + a[j] = a[j - 1]; + j --; + } + + a[j] = tmp; + } +} + +static inline void +rspamd_symcache_add_id_to_list (rspamd_mempool_t *pool, + struct rspamd_symcache_id_list *ls, + guint32 id) +{ + guint cnt = 0; + guint *new_array; + + if (ls->st[0] == -1) { + /* Dynamic array */ + if (ls->dyn.len < ls->dyn.allocated) { + /* Trivial, append + sort */ + ls->dyn.n[ls->dyn.len++] = id; + } + else { + /* Reallocate */ + g_assert (ls->dyn.allocated <= G_MAXINT16); + ls->dyn.allocated *= 2; + + new_array = rspamd_mempool_alloc (pool, + ls->dyn.allocated * sizeof (guint32)); + memcpy (new_array, ls->dyn.n, ls->dyn.len * sizeof (guint32)); + ls->dyn.n = new_array; + ls->dyn.n[ls->dyn.len++] = id; + } + + rspamd_ids_insertion_sort (ls->dyn.n, ls->dyn.len); + } + else { + /* Static part */ + while (ls->st[cnt] != 0 && cnt < G_N_ELEMENTS (ls->st)) { + cnt ++; + } + + if (cnt < G_N_ELEMENTS (ls->st)) { + ls->st[cnt] = id; + } + else { + /* Switch to dynamic */ + new_array = rspamd_mempool_alloc (pool, + G_N_ELEMENTS (ls->st) * 2 * sizeof (guint32)); + memcpy (new_array, ls->st, G_N_ELEMENTS (ls->st) * sizeof (guint32)); + ls->dyn.n = new_array; + ls->dyn.e = -1; + ls->dyn.allocated = G_N_ELEMENTS (ls->st) * 2; + ls->dyn.len = G_N_ELEMENTS (ls->st); + + /* Recursively jump to dynamic branch that will handle insertion + sorting */ + rspamd_symcache_add_id_to_list (pool, ls, id); + } + } +} + +void +rspamd_symcache_process_settings_elt (struct rspamd_symcache *cache, + struct rspamd_config_settings_elt *elt) +{ + guint32 id = elt->id; + ucl_object_iter_t iter; + struct rspamd_symcache_item *item, *parent; + const ucl_object_t *cur; + + + if (elt->symbols_disabled) { + /* Process denied symbols */ + iter = NULL; + + while ((cur = ucl_object_iterate (elt->symbols_disabled, &iter, true)) != NULL) { + const gchar *sym = ucl_object_key (cur); + item = rspamd_symcache_find_filter (cache, sym, false); + + if (item) { + if (item->is_virtual) { + /* + * Virtual symbols are special: + * we ignore them in symcache but prevent them from being + * inserted. + */ + rspamd_symcache_add_id_to_list (cache->static_pool, + &item->forbidden_ids, id); + msg_debug_cache ("deny virtual symbol %s for settings %ud (%s); " + "parent can still be executed", + sym, id, elt->name); + } + else { + /* Normal symbol, disable it */ + rspamd_symcache_add_id_to_list (cache->static_pool, + &item->forbidden_ids, id); + msg_debug_cache ("deny symbol %s for settings %ud (%s)", + sym, id, elt->name); + } + } + else { + msg_warn_cache ("cannot find a symbol to disable %s " + "when processing settings %ud (%s)", + sym, id, elt->name); + } + } + } + + if (elt->symbols_enabled) { + iter = NULL; + + while ((cur = ucl_object_iterate (elt->symbols_enabled, &iter, true)) != NULL) { + /* Here, we resolve parent and explicitly allow it */ + const gchar *sym = ucl_object_key (cur); + item = rspamd_symcache_find_filter (cache, sym, false); + + if (item) { + if (item->is_virtual) { + if (!(item->type & SYMBOL_TYPE_GHOST)) { + parent = rspamd_symcache_find_filter (cache, sym, true); + + if (parent) { + if (elt->symbols_disabled && + ucl_object_lookup (elt->symbols_disabled, parent->symbol)) { + msg_err_cache ("conflict in %s: cannot enable disabled symbol %s, " + "wanted to enable symbol %s", + elt->name, parent->symbol, sym); + continue; + } + + rspamd_symcache_add_id_to_list (cache->static_pool, + &parent->exec_only_ids, id); + msg_debug_cache ("allow just execution of symbol %s for settings %ud (%s)", + parent->symbol, id, elt->name); + } + } + /* Ignore ghosts */ + } + + rspamd_symcache_add_id_to_list (cache->static_pool, + &item->allowed_ids, id); + msg_debug_cache ("allow execution of symbol %s for settings %ud (%s)", + sym, id, elt->name); + } + else { + msg_warn_cache ("cannot find a symbol to enable %s " + "when processing settings %ud (%s)", + sym, id, elt->name); + } + } + } +} + +gint +rspamd_symcache_item_flags (struct rspamd_symcache_item *item) +{ + if (item) { + return item->type; + } + + return 0; +} + +const gchar* +rspamd_symcache_item_name (struct rspamd_symcache_item *item) +{ + return item ? item->symbol : NULL; +} + +const struct rspamd_symcache_item_stat * +rspamd_symcache_item_stat (struct rspamd_symcache_item *item) +{ + return item ? item->st : NULL; +} + +gboolean +rspamd_symcache_item_is_enabled (struct rspamd_symcache_item *item) +{ + if (item) { + if (!item->enabled) { + return FALSE; + } + + if (item->is_virtual && item->specific.virtual.parent_item != NULL) { + return rspamd_symcache_item_is_enabled (item->specific.virtual.parent_item); + } + + return TRUE; + } + + return FALSE; +} + +struct rspamd_symcache_item * rspamd_symcache_item_get_parent ( + struct rspamd_symcache_item *item) +{ + if (item && item->is_virtual && item->specific.virtual.parent_item != NULL) { + return item->specific.virtual.parent_item; + } + + return NULL; +} + +const GPtrArray* +rspamd_symcache_item_get_deps (struct rspamd_symcache_item *item) +{ + struct rspamd_symcache_item *parent; + + if (item) { + parent = rspamd_symcache_item_get_parent (item); + + if (parent) { + item = parent; + } + + return item->deps; + } + + return NULL; +} + +const GPtrArray* +rspamd_symcache_item_get_rdeps (struct rspamd_symcache_item *item) +{ + struct rspamd_symcache_item *parent; + + if (item) { + parent = rspamd_symcache_item_get_parent (item); + + if (parent) { + item = parent; + } + + return item->rdeps; + } + + return NULL; +} + +void +rspamd_symcache_enable_profile (struct rspamd_task *task) +{ + struct cache_savepoint *checkpoint = task->checkpoint; + + if (checkpoint && !checkpoint->profile) { + ev_now_update_if_cheap (task->event_loop); + ev_tstamp now = ev_now (task->event_loop); + checkpoint->profile_start = now; + + msg_debug_cache_task ("enable profiling of symbols for task"); + checkpoint->profile = TRUE; + } +} |