Diffstat (limited to 'src/libserver/rspamd_symcache.c')
-rw-r--r-- | src/libserver/rspamd_symcache.c | 2748
1 file changed, 2748 insertions, 0 deletions
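The file added below implements rspamd's symbols cache: symbol registration, dependency tracking, statistics persistence and per-task execution ordering. As a quick orientation before the diff itself, here is a minimal, hypothetical usage sketch built only from functions declared in this patch (rspamd_symcache_new, rspamd_symcache_add_symbol, rspamd_symcache_init, rspamd_symcache_validate, rspamd_symcache_process_symbols, rspamd_symcache_destroy). The callback shape is inferred from how the cache invokes item->specific.normal.func (task, item, user_data); the exact symbol_func_t typedef lives in rspamd_symcache.h and is assumed here to return void. Names prefixed with example_ are invented for illustration.

#include "rspamd.h"
#include "rspamd_symcache.h"

/* Hypothetical filter callback: the cache calls func (task, item, user_data)
 * and, per the comment in the patch, the callback must finalise itself. */
static void
example_filter_cb (struct rspamd_task *task,
        struct rspamd_symcache_item *item,
        gpointer user_data)
{
    /* inspect the task and insert results as needed */
}

static void
example_setup (struct rspamd_config *cfg)
{
    struct rspamd_symcache *cache = rspamd_symcache_new (cfg);

    /* Register an ordinary filter symbol; parent is -1 for non-virtual symbols */
    rspamd_symcache_add_symbol (cache, "EXAMPLE_SYMBOL", 0 /* priority */,
            example_filter_cb, NULL /* user_data */,
            SYMBOL_TYPE_NORMAL, -1);

    /* Load saved statistics (if cfg->cache_filename is set) and resort items */
    rspamd_symcache_init (cache);
    /* Cross-check registered symbols against configured metric scores */
    rspamd_symcache_validate (cache, cfg, FALSE);

    /* Per task, the scanner then drives:
     *   rspamd_symcache_process_symbols (task, cache, stage);
     * for each stage (prefilters, filters, postfilters, idempotent). */

    rspamd_symcache_destroy (cache);
}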
diff --git a/src/libserver/rspamd_symcache.c b/src/libserver/rspamd_symcache.c
new file mode 100644
index 000000000..70aff0d1b
--- /dev/null
+++ b/src/libserver/rspamd_symcache.c
@@ -0,0 +1,2748 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "util.h"
+#include "rspamd.h"
+#include "message.h"
+#include "rspamd_symcache.h"
+#include "cfg_file.h"
+#include "lua/lua_common.h"
+#include "unix-std.h"
+#include "contrib/t1ha/t1ha.h"
+#include "libserver/worker_util.h"
+#include <math.h>
+
+#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
+# include <stdalign.h>
+#endif
+
+#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+        cache->static_pool->tag.tagname, cache->cfg->checksum, \
+        G_STRFUNC, \
+        __VA_ARGS__)
+#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
+        cache->static_pool->tag.tagname, cache->cfg->checksum, \
+        G_STRFUNC, \
+        __VA_ARGS__)
+#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
+        cache->static_pool->tag.tagname, cache->cfg->checksum, \
+        G_STRFUNC, \
+        __VA_ARGS__)
+#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \
+        rspamd_symcache_log_id, "symcache", cache->cfg->checksum, \
+        G_STRFUNC, \
+        __VA_ARGS__)
+#define msg_debug_cache_task(...) \
rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) + +INIT_LOG_MODULE(symcache) + +#define CHECK_START_BIT(checkpoint, item) \ + isset((checkpoint)->cur_bits->started, (item)->id) +#define SET_START_BIT(checkpoint, item) \ + setbit((checkpoint)->cur_bits->started, (item)->id ) + +#define CHECK_FINISH_BIT(checkpoint, item) \ + isset((checkpoint)->cur_bits->finished, (item)->id) +#define SET_FINISH_BIT(checkpoint, item) \ + setbit((checkpoint)->cur_bits->finished, (item)->id) + +static const guchar rspamd_symbols_cache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0 }; + +static gint rspamd_symbols_cache_find_filter (struct rspamd_symcache *cache, + const gchar *name); + +struct rspamd_symbols_cache_header { + guchar magic[8]; + guint nitems; + guchar checksum[64]; + guchar unused[128]; +}; + +struct symbols_cache_order { + GPtrArray *d; + guint id; + ref_entry_t ref; +}; + +struct rspamd_symcache { + /* Hash table for fast access */ + GHashTable *items_by_symbol; + GPtrArray *items_by_id; + struct symbols_cache_order *items_by_order; + GPtrArray *filters; + GPtrArray *prefilters; + GPtrArray *postfilters; + GPtrArray *composites; + GPtrArray *idempotent; + GPtrArray *virtual; + GPtrArray *squeezed; + GList *delayed_deps; + GList *delayed_conditions; + rspamd_mempool_t *static_pool; + guint64 cksum; + gdouble total_weight; + guint used_items; + guint stats_symbols_count; + guint64 total_hits; + guint id; + struct rspamd_config *cfg; + gdouble reload_time; + gint peak_cb; +}; + +struct item_stat { + struct rspamd_counter_data time_counter; + gdouble avg_time; + gdouble weight; + guint hits; + guint64 total_hits; + struct rspamd_counter_data frequency_counter; + gdouble avg_frequency; + gdouble stddev_frequency; +}; + +struct rspamd_symcache_dynamic_item { + guint16 start_msec; /* Relative to task time */ + unsigned started:1; + unsigned finished:1; + /* unsigned pad:14; */ + guint32 async_events; +}; + +struct rspamd_symcache_item { + /* This block is likely shared */ + struct item_stat *st; + + guint64 last_count; + struct rspamd_counter_data *cd; + gchar *symbol; + enum rspamd_symbol_type type; + + /* Callback data */ + union { + struct { + symbol_func_t func; + gpointer user_data; + gint condition_cb; + } normal; + struct { + gint parent; + } virtual; + } specific; + + /* Condition of execution */ + gboolean enabled; + /* Used for async stuff checks */ + gboolean is_filter; + gboolean is_virtual; + + /* Priority */ + gint priority; + /* Topological order */ + guint order; + gint id; + gint frequency_peaks; + + /* Dependencies */ + GPtrArray *deps; + GPtrArray *rdeps; +}; + +struct cache_dependency { + struct rspamd_symcache_item *item; + gchar *sym; + gint id; +}; + +struct delayed_cache_dependency { + gchar *from; + gchar *to; +}; + +struct delayed_cache_condition { + gchar *sym; + gint cbref; + lua_State *L; +}; + +enum rspamd_cache_savepoint_stage { + RSPAMD_CACHE_PASS_INIT = 0, + RSPAMD_CACHE_PASS_PREFILTERS, + RSPAMD_CACHE_PASS_FILTERS, + RSPAMD_CACHE_PASS_POSTFILTERS, + RSPAMD_CACHE_PASS_IDEMPOTENT, + RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT, + RSPAMD_CACHE_PASS_DONE, +}; + +struct cache_savepoint { + enum rspamd_cache_savepoint_stage pass; + guint version; + guint items_inflight; + + struct rspamd_metric_result *rs; + gdouble lim; + + struct rspamd_symcache_item *cur_item; + struct symbols_cache_order *order; + GArray *dynamic_items; +}; + +struct rspamd_cache_refresh_cbdata { + gdouble last_resort; + struct 
event resort_ev; + struct rspamd_symcache *cache; + struct rspamd_worker *w; + struct event_base *ev_base; +}; + +/* weight, frequency, time */ +#define TIME_ALPHA (1.0) +#define WEIGHT_ALPHA (0.1) +#define FREQ_ALPHA (0.01) +#define SCORE_FUN(w, f, t) (((w) > 0 ? (w) : WEIGHT_ALPHA) \ + * ((f) > 0 ? (f) : FREQ_ALPHA) \ + / (t > TIME_ALPHA ? t : TIME_ALPHA)) + +static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task, + struct rspamd_symcache *cache, + struct rspamd_symcache_item *item, + struct cache_savepoint *checkpoint); +static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task, + struct rspamd_symcache *cache, + struct rspamd_symcache_item *item, + struct cache_savepoint *checkpoint, + guint recursion, + gboolean check_only); +static void rspamd_symbols_cache_disable_symbol_checkpoint (struct rspamd_task *task, + struct rspamd_symcache *cache, const gchar *symbol); +static void rspamd_symbols_cache_enable_symbol_checkpoint (struct rspamd_task *task, + struct rspamd_symcache *cache, const gchar *symbol); +static void rspamd_symbols_cache_disable_all_symbols (struct rspamd_task *task, + struct rspamd_symcache *cache); + +static void +rspamd_symbols_cache_order_dtor (gpointer p) +{ + struct symbols_cache_order *ord = p; + + g_ptr_array_free (ord->d, TRUE); + g_free (ord); +} + +static void +rspamd_symbols_cache_order_unref (gpointer p) +{ + struct symbols_cache_order *ord = p; + + REF_RELEASE (ord); +} + +static struct symbols_cache_order * +rspamd_symbols_cache_order_new (struct rspamd_symcache *cache, + gsize nelts) +{ + struct symbols_cache_order *ord; + + ord = g_malloc0 (sizeof (*ord)); + ord->d = g_ptr_array_sized_new (nelts); + ord->id = cache->id; + REF_INIT_RETAIN (ord, rspamd_symbols_cache_order_dtor); + + return ord; +} + +static inline struct rspamd_symcache_item* +rspamd_symcache_get_dynamic (struct cache_savepoint *checkpoint, + struct rspamd_symcache_item *item) +{ + return &g_array_index (checkpoint->dynamic_items, + struct rspamd_symcache_item, item->id); +} + +static gint +postfilters_cmp (const void *p1, const void *p2, gpointer ud) +{ + const struct rspamd_symcache_item *i1 = *(struct rspamd_symcache_item **)p1, + *i2 = *(struct rspamd_symcache_item **)p2; + double w1, w2; + + w1 = i1->priority; + w2 = i2->priority; + + if (w1 > w2) { + return 1; + } + else if (w1 < w2) { + return -1; + } + + return 0; +} + +static gint +prefilters_cmp (const void *p1, const void *p2, gpointer ud) +{ + const struct rspamd_symcache_item *i1 = *(struct rspamd_symcache_item **)p1, + *i2 = *(struct rspamd_symcache_item **)p2; + double w1, w2; + + w1 = i1->priority; + w2 = i2->priority; + + if (w1 < w2) { + return 1; + } + else if (w1 > w2) { + return -1; + } + + return 0; +} + +#define TSORT_MARK_PERM(it) (it)->order |= (1u << 31) +#define TSORT_MARK_TEMP(it) (it)->order |= (1u << 30) +#define TSORT_IS_MARKED_PERM(it) ((it)->order & (1u << 31)) +#define TSORT_IS_MARKED_TEMP(it) ((it)->order & (1u << 30)) +#define TSORT_UNMASK(it) ((it)->order & ~((1u << 31) | (1u << 30))) + +static gint +cache_logic_cmp (const void *p1, const void *p2, gpointer ud) +{ + const struct rspamd_symcache_item *i1 = *(struct rspamd_symcache_item **)p1, + *i2 = *(struct rspamd_symcache_item **)p2; + struct rspamd_symcache *cache = ud; + double w1, w2; + double weight1, weight2; + double f1 = 0, f2 = 0, t1, t2, avg_freq, avg_weight; + guint o1 = TSORT_UNMASK (i1), o2 = TSORT_UNMASK (i2); + + + if (o1 == o2) { + /* Heurstic */ + if (i1->priority == i2->priority) { + avg_freq = 
((gdouble) cache->total_hits / cache->used_items); + avg_weight = (cache->total_weight / cache->used_items); + f1 = (double) i1->st->total_hits / avg_freq; + f2 = (double) i2->st->total_hits / avg_freq; + weight1 = fabs (i1->st->weight) / avg_weight; + weight2 = fabs (i2->st->weight) / avg_weight; + t1 = i1->st->avg_time; + t2 = i2->st->avg_time; + w1 = SCORE_FUN (weight1, f1, t1); + w2 = SCORE_FUN (weight2, f2, t2); + } else { + /* Strict sorting */ + w1 = abs (i1->priority); + w2 = abs (i2->priority); + } + } + else { + w1 = o1; + w2 = o2; + } + + if (w2 > w1) { + return 1; + } + else if (w2 < w1) { + return -1; + } + + return 0; +} + +static void +rspamd_symbols_cache_tsort_visit (struct rspamd_symcache *cache, + struct rspamd_symcache_item *it, + guint cur_order) +{ + struct cache_dependency *dep; + guint i; + + if (TSORT_IS_MARKED_PERM (it)) { + if (cur_order > TSORT_UNMASK (it)) { + /* Need to recalculate the whole chain */ + it->order = cur_order; /* That also removes all masking */ + } + else { + /* We are fine, stop DFS */ + return; + } + } + else if (TSORT_IS_MARKED_TEMP (it)) { + msg_err_cache ("cyclic dependencies found when checking '%s'!", + it->symbol); + return; + } + + TSORT_MARK_TEMP (it); + msg_debug_cache ("visiting node: %s (%d)", it->symbol, cur_order); + + PTR_ARRAY_FOREACH (it->deps, i, dep) { + msg_debug_cache ("visiting dep: %s (%d)", dep->item->symbol, cur_order + 1); + rspamd_symbols_cache_tsort_visit (cache, dep->item, cur_order + 1); + } + + it->order = cur_order; + + TSORT_MARK_PERM (it); +} + +static void +rspamd_symbols_cache_resort (struct rspamd_symcache *cache) +{ + struct symbols_cache_order *ord; + guint i; + guint64 total_hits = 0; + struct rspamd_symcache_item *it; + + ord = rspamd_symbols_cache_order_new (cache, cache->filters->len); + + for (i = 0; i < cache->filters->len; i ++) { + it = g_ptr_array_index (cache->filters, i); + total_hits += it->st->total_hits; + it->order = 0; + g_ptr_array_add (ord->d, it); + } + + /* Topological sort, intended to be O(N) but my implementation + * is not linear (semi-linear usually) as I want to make it as + * simple as possible. + * On each stage it does DFS for unseen nodes. In theory, that + * can be more complicated than linear - O(N^2) for specially + * crafted data. But I don't care. 
+ */ + PTR_ARRAY_FOREACH (ord->d, i, it) { + if (it->order == 0) { + rspamd_symbols_cache_tsort_visit (cache, it, 1); + } + } + + /* + * Now we have all sorted and can do some heuristical sort, keeping + * topological order invariant + */ + g_ptr_array_sort_with_data (ord->d, cache_logic_cmp, cache); + cache->total_hits = total_hits; + + if (cache->items_by_order) { + REF_RELEASE (cache->items_by_order); + } + + cache->items_by_order = ord; +} + +/* Sort items in logical order */ +static void +rspamd_symbols_cache_post_init (struct rspamd_symcache *cache) +{ + struct rspamd_symcache_item *it, *dit; + struct cache_dependency *dep, *rdep; + struct delayed_cache_dependency *ddep; + struct delayed_cache_condition *dcond; + GList *cur; + gint i, j; + gint id; + + cur = cache->delayed_deps; + while (cur) { + ddep = cur->data; + + id = rspamd_symbols_cache_find_filter (cache, ddep->from); + + if (id != -1) { + it = g_ptr_array_index (cache->filters, id); + } + else { + it = NULL; + } + + if (it == NULL) { + msg_err_cache ("cannot register delayed dependency between %s and %s, " + "%s is missing", ddep->from, ddep->to, ddep->from); + } + else { + msg_debug_cache ("delayed between %s(%d) -> %s", ddep->from, + it->id, ddep->to); + rspamd_symcache_add_dependency (cache, it->id, ddep->to); + } + + cur = g_list_next (cur); + } + + cur = cache->delayed_conditions; + while (cur) { + dcond = cur->data; + + it = g_hash_table_lookup (cache->items_by_symbol, dcond->sym); + + if (it == NULL) { + msg_err_cache ( + "cannot register delayed condition for %s", + dcond->sym); + luaL_unref (dcond->L, LUA_REGISTRYINDEX, dcond->cbref); + } + else { + it->specific.normal.condition_cb = dcond->cbref; + } + + cur = g_list_next (cur); + } + + for (i = 0; i < cache->filters->len; i ++) { + it = g_ptr_array_index (cache->filters, i); + + for (j = 0; j < it->deps->len; j ++) { + dep = g_ptr_array_index (it->deps, j); + dit = g_hash_table_lookup (cache->items_by_symbol, dep->sym); + + if (dit != NULL) { + if (dit->is_virtual) { + dit = g_ptr_array_index (cache->filters, + dit->specific.virtual.parent); + } + + if (dit->id == i) { + msg_err_cache ("cannot add dependency on self: %s -> %s " + "(resolved to %s)", + it->symbol, dep->sym, dit->symbol); + } + else { + rdep = rspamd_mempool_alloc (cache->static_pool, + sizeof (*rdep)); + + rdep->sym = dep->sym; + rdep->item = it; + rdep->id = i; + g_ptr_array_add (dit->rdeps, rdep); + dep->item = dit; + dep->id = dit->id; + + msg_debug_cache ("add dependency from %d on %d", it->id, + dit->id); + } + } + else { + msg_err_cache ("cannot find dependency on symbol %s", dep->sym); + } + } + + /* Reversed loop to make removal safe */ + for (j = it->deps->len - 1; j >= 0; j --) { + dep = g_ptr_array_index (it->deps, j); + + if (dep->item == NULL) { + /* Remove useless dep */ + g_ptr_array_remove_index (it->deps, j); + } + } + } + + g_ptr_array_sort_with_data (cache->prefilters, prefilters_cmp, cache); + g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache); + g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache); + + rspamd_symbols_cache_resort (cache); +} + +static gboolean +rspamd_symbols_cache_load_items (struct rspamd_symcache *cache, const gchar *name) +{ + struct rspamd_symbols_cache_header *hdr; + struct stat st; + struct ucl_parser *parser; + ucl_object_t *top; + const ucl_object_t *cur, *elt; + ucl_object_iter_t it; + struct rspamd_symcache_item *item, *parent; + const guchar *p; + gint fd; + gpointer map; + + fd = open (name, O_RDONLY); + + if 
(fd == -1) { + msg_info_cache ("cannot open file %s, error %d, %s", name, + errno, strerror (errno)); + return FALSE; + } + + rspamd_file_lock (fd, FALSE); + + if (fstat (fd, &st) == -1) { + rspamd_file_unlock (fd, FALSE); + close (fd); + msg_info_cache ("cannot stat file %s, error %d, %s", name, + errno, strerror (errno)); + return FALSE; + } + + if (st.st_size < (gint)sizeof (*hdr)) { + rspamd_file_unlock (fd, FALSE); + close (fd); + errno = EINVAL; + msg_info_cache ("cannot use file %s, error %d, %s", name, + errno, strerror (errno)); + return FALSE; + } + + map = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, fd, 0); + + if (map == MAP_FAILED) { + rspamd_file_unlock (fd, FALSE); + close (fd); + msg_info_cache ("cannot mmap file %s, error %d, %s", name, + errno, strerror (errno)); + return FALSE; + } + + hdr = map; + + if (memcmp (hdr->magic, rspamd_symbols_cache_magic, + sizeof (rspamd_symbols_cache_magic)) != 0) { + msg_info_cache ("cannot use file %s, bad magic", name); + munmap (map, st.st_size); + rspamd_file_unlock (fd, FALSE); + close (fd); + + return FALSE; + } + + parser = ucl_parser_new (0); + p = (const guchar *)(hdr + 1); + + if (!ucl_parser_add_chunk (parser, p, st.st_size - sizeof (*hdr))) { + msg_info_cache ("cannot use file %s, cannot parse: %s", name, + ucl_parser_get_error (parser)); + munmap (map, st.st_size); + ucl_parser_free (parser); + rspamd_file_unlock (fd, FALSE); + close (fd); + + return FALSE; + } + + top = ucl_parser_get_object (parser); + munmap (map, st.st_size); + rspamd_file_unlock (fd, FALSE); + close (fd); + ucl_parser_free (parser); + + if (top == NULL || ucl_object_type (top) != UCL_OBJECT) { + msg_info_cache ("cannot use file %s, bad object", name); + ucl_object_unref (top); + return FALSE; + } + + it = ucl_object_iterate_new (top); + + while ((cur = ucl_object_iterate_safe (it, true))) { + item = g_hash_table_lookup (cache->items_by_symbol, ucl_object_key (cur)); + + if (item) { + /* Copy saved info */ + /* + * XXX: don't save or load weight, it should be obtained from the + * metric + */ +#if 0 + elt = ucl_object_lookup (cur, "weight"); + + if (elt) { + w = ucl_object_todouble (elt); + if (w != 0) { + item->weight = w; + } + } +#endif + elt = ucl_object_lookup (cur, "time"); + if (elt) { + item->st->avg_time = ucl_object_todouble (elt); + } + + elt = ucl_object_lookup (cur, "count"); + if (elt) { + item->st->total_hits = ucl_object_toint (elt); + item->last_count = item->st->total_hits; + } + + elt = ucl_object_lookup (cur, "frequency"); + if (elt && ucl_object_type (elt) == UCL_OBJECT) { + const ucl_object_t *cur; + + cur = ucl_object_lookup (elt, "avg"); + + if (cur) { + item->st->avg_frequency = ucl_object_todouble (cur); + } + cur = ucl_object_lookup (elt, "stddev"); + + if (cur) { + item->st->stddev_frequency = ucl_object_todouble (cur); + } + } + + if (item->is_virtual) { + g_assert (item->specific.virtual.parent < (gint)cache->filters->len); + parent = g_ptr_array_index (cache->filters, + item->specific.virtual.parent); + + if (parent->st->weight < item->st->weight) { + parent->st->weight = item->st->weight; + } + + /* + * We maintain avg_time for virtual symbols equal to the + * parent item avg_time + */ + item->st->avg_time = parent->st->avg_time; + } + + cache->total_weight += fabs (item->st->weight); + cache->total_hits += item->st->total_hits; + } + } + + ucl_object_iterate_free (it); + ucl_object_unref (top); + + return TRUE; +} + +#define ROUND_DOUBLE(x) (floor((x) * 100.0) / 100.0) + +static gboolean +rspamd_symbols_cache_save_items 
(struct rspamd_symcache *cache, const gchar *name) +{ + struct rspamd_symbols_cache_header hdr; + ucl_object_t *top, *elt, *freq; + GHashTableIter it; + struct rspamd_symcache_item *item; + struct ucl_emitter_functions *efunc; + gpointer k, v; + gint fd; + bool ret; + gchar path[PATH_MAX]; + + rspamd_snprintf (path, sizeof (path), "%s.new", name); + + for (;;) { + fd = open (path, O_CREAT | O_WRONLY | O_EXCL, 00644); + + if (fd == -1) { + if (errno == EEXIST) { + /* Some other process is already writing data, give up silently */ + return TRUE; + } + + msg_info_cache ("cannot open file %s, error %d, %s", path, + errno, strerror (errno)); + return FALSE; + } + + break; + } + + rspamd_file_lock (fd, FALSE); + + memset (&hdr, 0, sizeof (hdr)); + memcpy (hdr.magic, rspamd_symbols_cache_magic, + sizeof (rspamd_symbols_cache_magic)); + + if (write (fd, &hdr, sizeof (hdr)) == -1) { + msg_info_cache ("cannot write to file %s, error %d, %s", path, + errno, strerror (errno)); + rspamd_file_unlock (fd, FALSE); + close (fd); + + return FALSE; + } + + top = ucl_object_typed_new (UCL_OBJECT); + g_hash_table_iter_init (&it, cache->items_by_symbol); + + while (g_hash_table_iter_next (&it, &k, &v)) { + item = v; + elt = ucl_object_typed_new (UCL_OBJECT); + ucl_object_insert_key (elt, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->weight)), + "weight", 0, false); + ucl_object_insert_key (elt, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->time_counter.mean)), + "time", 0, false); + ucl_object_insert_key (elt, ucl_object_fromint (item->st->total_hits), + "count", 0, false); + + freq = ucl_object_typed_new (UCL_OBJECT); + ucl_object_insert_key (freq, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->frequency_counter.mean)), + "avg", 0, false); + ucl_object_insert_key (freq, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->frequency_counter.stddev)), + "stddev", 0, false); + ucl_object_insert_key (elt, freq, "frequency", 0, false); + + ucl_object_insert_key (top, elt, k, 0, false); + } + + efunc = ucl_object_emit_fd_funcs (fd); + ret = ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, efunc, NULL); + ucl_object_emit_funcs_free (efunc); + ucl_object_unref (top); + rspamd_file_unlock (fd, FALSE); + close (fd); + + if (rename (path, name) == -1) { + msg_info_cache ("cannot rename %s -> %s, error %d, %s", path, name, + errno, strerror (errno)); + (void)unlink (path); + ret = FALSE; + } + + return ret; +} + +#undef ROUND_DOUBLE + +gint +rspamd_symcache_add_symbol (struct rspamd_symcache *cache, + const gchar *name, + gint priority, + symbol_func_t func, + gpointer user_data, + enum rspamd_symbol_type type, + gint parent) +{ + struct rspamd_symcache_item *item = NULL; + + g_assert (cache != NULL); + + if (name == NULL && !(type & SYMBOL_TYPE_CALLBACK)) { + msg_warn_cache ("no name for non-callback symbol!"); + } + else if ((type & SYMBOL_TYPE_VIRTUAL) && parent == -1) { + msg_warn_cache ("no parent symbol is associated with virtual symbol %s", + name); + } + + if (name != NULL && !(type & SYMBOL_TYPE_CALLBACK)) { + if (g_hash_table_lookup (cache->items_by_symbol, name) != NULL) { + msg_err_cache ("skip duplicate symbol registration for %s", name); + return -1; + } + } + + if (type & (SYMBOL_TYPE_CLASSIFIER|SYMBOL_TYPE_CALLBACK| + SYMBOL_TYPE_PREFILTER|SYMBOL_TYPE_POSTFILTER| + SYMBOL_TYPE_IDEMPOTENT)) { + type |= SYMBOL_TYPE_NOSTAT; + } + + item = rspamd_mempool_alloc0 (cache->static_pool, + sizeof (struct rspamd_symcache_item)); + item->st = rspamd_mempool_alloc0_shared (cache->static_pool, + sizeof 
(*item->st)); + item->enabled = TRUE; + + /* + * We do not share cd to skip locking, instead we'll just calculate it on + * save or accumulate + */ + item->cd = rspamd_mempool_alloc0 (cache->static_pool, + sizeof (struct rspamd_counter_data)); + item->priority = priority; + item->type = type; + + if ((type & SYMBOL_TYPE_FINE) && item->priority == 0) { + /* Make priority for negative weighted symbols */ + item->priority = 1; + } + + if (func) { + /* Non-virtual symbol */ + g_assert (parent == -1); + + if (item->type & SYMBOL_TYPE_PREFILTER) { + g_ptr_array_add (cache->prefilters, item); + } + else if (item->type & SYMBOL_TYPE_IDEMPOTENT) { + g_ptr_array_add (cache->idempotent, item); + } + else if (item->type & SYMBOL_TYPE_POSTFILTER) { + g_ptr_array_add (cache->postfilters, item); + } + else { + item->is_filter = TRUE; + g_ptr_array_add (cache->filters, item); + } + + item->id = cache->items_by_id->len; + g_ptr_array_add (cache->items_by_id, item); + + item->specific.normal.func = func; + item->specific.normal.user_data = user_data; + item->specific.normal.condition_cb = -1; + } + else { + /* + * Three possibilities here when no function is specified: + * - virtual symbol + * - classifier symbol + * - composite symbol + */ + if (item->type & SYMBOL_TYPE_COMPOSITE) { + item->specific.normal.condition_cb = -1; + g_ptr_array_add (cache->composites, item); + + item->id = cache->items_by_id->len; + g_ptr_array_add (cache->items_by_id, item); + } + else if (item->type & SYMBOL_TYPE_CLASSIFIER) { + /* Treat it as normal symbol to allow enable/disable */ + item->id = cache->items_by_id->len; + g_ptr_array_add (cache->items_by_id, item); + + item->is_filter = TRUE; + item->specific.normal.func = NULL; + item->specific.normal.user_data = NULL; + item->specific.normal.condition_cb = -1; + } + else { + /* Require parent */ + g_assert (parent != -1); + + item->is_virtual = TRUE; + item->specific.virtual.parent = parent; + item->id = cache->virtual->len; + g_ptr_array_add (cache->virtual, item); + } + } + + if (item->type & SYMBOL_TYPE_SQUEEZED) { + g_ptr_array_add (cache->squeezed, item); + } + + cache->used_items ++; + cache->id ++; + + if (!(item->type & + (SYMBOL_TYPE_IDEMPOTENT|SYMBOL_TYPE_NOSTAT|SYMBOL_TYPE_CLASSIFIER))) { + if (name != NULL) { + cache->cksum = t1ha (name, strlen (name), + cache->cksum); + } else { + cache->cksum = t1ha (&item->id, sizeof (item->id), + cache->cksum); + } + + cache->stats_symbols_count ++; + } + + if (name != NULL) { + item->symbol = rspamd_mempool_strdup (cache->static_pool, name); + msg_debug_cache ("used items: %d, added symbol: %s, %d", + cache->used_items, name, item->id); + } else { + g_assert (func != NULL); + msg_debug_cache ("used items: %d, added unnamed symbol: %d", + cache->used_items, item->id); + } + + if (item->is_filter) { + /* Only plain filters can have deps and rdeps */ + item->deps = g_ptr_array_new (); + item->rdeps = g_ptr_array_new (); + rspamd_mempool_add_destructor (cache->static_pool, + rspamd_ptr_array_free_hard, item->deps); + rspamd_mempool_add_destructor (cache->static_pool, + rspamd_ptr_array_free_hard, item->rdeps); + } + + if (name != NULL) { + g_hash_table_insert (cache->items_by_symbol, item->symbol, item); + } + + return item->id; +} + +void +rspamd_symcache_set_peak_callback (struct rspamd_symcache *cache, + gint cbref) +{ + g_assert (cache != NULL); + + if (cache->peak_cb != -1) { + luaL_unref (cache->cfg->lua_state, LUA_REGISTRYINDEX, + cache->peak_cb); + } + + cache->peak_cb = cbref; + msg_info_cache ("registered peak 
callback"); +} + +gboolean +rspamd_symcache_add_condition_delayed (struct rspamd_symcache *cache, + const gchar *sym, lua_State *L, gint cbref) +{ + struct delayed_cache_condition *ncond; + + g_assert (cache != NULL); + g_assert (sym != NULL); + + ncond = g_malloc0 (sizeof (*ncond)); + ncond->sym = g_strdup (sym); + ncond->cbref = cbref; + ncond->L = L; + cache->id ++; + + cache->delayed_conditions = g_list_prepend (cache->delayed_conditions, ncond); + + return TRUE; +} + +void +rspamd_symcache_save (struct rspamd_symcache *cache) +{ + if (cache != NULL) { + + if (cache->cfg->cache_filename) { + /* Try to sync values to the disk */ + if (!rspamd_symbols_cache_save_items (cache, + cache->cfg->cache_filename)) { + msg_err_cache ("cannot save cache data to %s", + cache->cfg->cache_filename); + } + } + } +} + +void +rspamd_symcache_destroy (struct rspamd_symcache *cache) +{ + GList *cur; + struct delayed_cache_dependency *ddep; + struct delayed_cache_condition *dcond; + + if (cache != NULL) { + rspamd_symcache_save (cache); + + if (cache->delayed_deps) { + cur = cache->delayed_deps; + + while (cur) { + ddep = cur->data; + g_free (ddep->from); + g_free (ddep->to); + g_free (ddep); + cur = g_list_next (cur); + } + + g_list_free (cache->delayed_deps); + } + + if (cache->delayed_conditions) { + cur = cache->delayed_conditions; + + while (cur) { + dcond = cur->data; + g_free (dcond->sym); + g_free (dcond); + cur = g_list_next (cur); + } + + g_list_free (cache->delayed_conditions); + } + + g_hash_table_destroy (cache->items_by_symbol); + g_ptr_array_free (cache->items_by_order, TRUE); + rspamd_mempool_delete (cache->static_pool); + g_ptr_array_free (cache->filters, TRUE); + g_ptr_array_free (cache->prefilters, TRUE); + g_ptr_array_free (cache->postfilters, TRUE); + g_ptr_array_free (cache->idempotent, TRUE); + g_ptr_array_free (cache->composites, TRUE); + g_ptr_array_free (cache->squeezed, TRUE); + REF_RELEASE (cache->items_by_order); + + if (cache->peak_cb != -1) { + luaL_unref (cache->cfg->lua_state, LUA_REGISTRYINDEX, cache->peak_cb); + } + + g_free (cache); + } +} + +struct rspamd_symcache* +rspamd_symcache_new (struct rspamd_config *cfg) +{ + struct rspamd_symcache *cache; + + cache = g_malloc0 (sizeof (struct rspamd_symcache)); + cache->static_pool = + rspamd_mempool_new (rspamd_mempool_suggest_size (), "symcache"); + cache->items_by_symbol = g_hash_table_new (rspamd_str_hash, + rspamd_str_equal); + cache->items_by_order = g_ptr_array_new (); + cache->filters = g_ptr_array_new (); + cache->prefilters = g_ptr_array_new (); + cache->postfilters = g_ptr_array_new (); + cache->idempotent = g_ptr_array_new (); + cache->composites = g_ptr_array_new (); + cache->virtual = g_ptr_array_new (); + cache->squeezed = g_ptr_array_new (); + cache->reload_time = cfg->cache_reload_time; + cache->total_hits = 1; + cache->total_weight = 1.0; + cache->cfg = cfg; + cache->cksum = 0xdeadbabe; + cache->peak_cb = -1; + cache->id = rspamd_random_uint64_fast (); + + return cache; +} + +gboolean +rspamd_symcache_init (struct rspamd_symcache *cache) +{ + gboolean res; + + g_assert (cache != NULL); + + cache->reload_time = cache->cfg->cache_reload_time; + + /* Just in-memory cache */ + if (cache->cfg->cache_filename == NULL) { + rspamd_symbols_cache_post_init (cache); + return TRUE; + } + + /* Copy saved cache entries */ + res = rspamd_symbols_cache_load_items (cache, cache->cfg->cache_filename); + rspamd_symbols_cache_post_init (cache); + + return res; +} + + +static void +rspamd_symbols_cache_validate_cb (gpointer k, 
gpointer v, gpointer ud) +{ + struct rspamd_symcache_item *item = v, *parent; + struct rspamd_config *cfg; + struct rspamd_symcache *cache = (struct rspamd_symcache *)ud; + struct rspamd_symbol *s; + gboolean skipped, ghost; + gint p1, p2; + + ghost = item->st->weight == 0 ? TRUE : FALSE; + cfg = cache->cfg; + + /* Check whether this item is skipped */ + skipped = !ghost; + g_assert (cfg != NULL); + + if ((item->type & + (SYMBOL_TYPE_NORMAL|SYMBOL_TYPE_VIRTUAL|SYMBOL_TYPE_COMPOSITE|SYMBOL_TYPE_CLASSIFIER)) + && g_hash_table_lookup (cfg->symbols, item->symbol) == NULL) { + + if (cfg->unknown_weight != 0) { + + skipped = FALSE; + item->st->weight = cfg->unknown_weight; + s = rspamd_mempool_alloc0 (cache->static_pool, + sizeof (*s)); + s->name = item->symbol; + s->weight_ptr = &item->st->weight; + g_hash_table_insert (cfg->symbols, item->symbol, s); + + msg_info_cache ("adding unknown symbol %s", item->symbol); + ghost = FALSE; + } + else { + skipped = TRUE; + } + } + else { + skipped = FALSE; + } + + if (!ghost && skipped) { + item->type |= SYMBOL_TYPE_SKIPPED; + msg_warn_cache ("symbol %s has no score registered, skip its check", + item->symbol); + } + + if (ghost) { + msg_debug_cache ("symbol %s is registered as ghost symbol, it won't be inserted " + "to any metric", item->symbol); + } + + if (item->st->weight < 0 && item->priority == 0) { + item->priority ++; + } + + if (item->is_virtual) { + g_assert (item->specific.virtual.parent < (gint)cache->filters->len); + parent = g_ptr_array_index (cache->filters, + item->specific.virtual.parent); + + if (fabs (parent->st->weight) < fabs (item->st->weight)) { + parent->st->weight = item->st->weight; + } + + p1 = abs (item->priority); + p2 = abs (parent->priority); + + if (p1 != p2) { + parent->priority = MAX (p1, p2); + item->priority = parent->priority; + } + } + + cache->total_weight += fabs (item->st->weight); +} + +static void +rspamd_symbols_cache_metric_validate_cb (gpointer k, gpointer v, gpointer ud) +{ + struct rspamd_symcache *cache = (struct rspamd_symcache *)ud; + const gchar *sym = k; + struct rspamd_symbol *s = (struct rspamd_symbol *)v; + gdouble weight; + struct rspamd_symcache_item *item; + + weight = *s->weight_ptr; + item = g_hash_table_lookup (cache->items_by_symbol, sym); + + if (item) { + item->st->weight = weight; + } +} + +gboolean +rspamd_symcache_validate (struct rspamd_symcache *cache, + struct rspamd_config *cfg, + gboolean strict) +{ + struct rspamd_symcache_item *item; + GHashTableIter it; + gpointer k, v; + struct rspamd_symbol *sym_def; + gboolean ignore_symbol = FALSE, ret = TRUE; + + if (cache == NULL) { + msg_err ("empty cache is invalid"); + return FALSE; + } + + /* Now adjust symbol weights according to default metric */ + g_hash_table_foreach (cfg->symbols, + rspamd_symbols_cache_metric_validate_cb, + cache); + + g_hash_table_foreach (cache->items_by_symbol, + rspamd_symbols_cache_validate_cb, + cache); + /* Now check each metric item and find corresponding symbol in a cache */ + g_hash_table_iter_init (&it, cfg->symbols); + + while (g_hash_table_iter_next (&it, &k, &v)) { + ignore_symbol = FALSE; + sym_def = v; + + if (sym_def && (sym_def->flags & RSPAMD_SYMBOL_FLAG_IGNORE)) { + ignore_symbol = TRUE; + } + + if (!ignore_symbol) { + item = g_hash_table_lookup (cache->items_by_symbol, k); + + if (item == NULL) { + msg_warn_cache ( + "symbol '%s' has its score defined but there is no " + "corresponding rule registered", + k); + if (strict) { + ret = FALSE; + } + } + } + } + + return ret; +} + +/* Return true if 
metric has score that is more than spam score for it */ +static gboolean +rspamd_symbols_cache_metric_limit (struct rspamd_task *task, + struct cache_savepoint *cp) +{ + struct rspamd_metric_result *res; + double ms; + + if (task->flags & RSPAMD_TASK_FLAG_PASS_ALL) { + return FALSE; + } + + if (cp->lim == 0.0) { + res = task->result; + + if (res) { + ms = rspamd_task_get_required_score (task, res); + + if (!isnan (ms) && cp->lim < ms) { + cp->rs = res; + cp->lim = ms; + } + } + } + + if (cp->rs) { + + if (cp->rs->score > cp->lim) { + return TRUE; + } + } + else { + /* No reject score define, always check all rules */ + cp->lim = -1; + } + + return FALSE; +} + +static gboolean +rspamd_symbols_cache_check_symbol (struct rspamd_task *task, + struct rspamd_symcache *cache, + struct rspamd_symcache_item *item, + struct cache_savepoint *checkpoint) +{ + double t1 = 0; + struct rspamd_task **ptask; + lua_State *L; + gboolean check = TRUE; + + if (item->type & SYMBOL_TYPE_CLASSIFIER) { + /* Classifiers are special :( */ + return TRUE; + } + + g_assert (!item->is_virtual); + g_assert (item->specific.normal.func != NULL); + if (CHECK_START_BIT (checkpoint, item)) { + /* + * This can actually happen when deps span over different layers + */ + return CHECK_FINISH_BIT (checkpoint, item); + } + + /* Check has been started */ + SET_START_BIT (checkpoint, item); + + if (!item->enabled || + (RSPAMD_TASK_IS_EMPTY (task) && !(item->type & SYMBOL_TYPE_EMPTY))) { + check = FALSE; + } + else if (item->specific.normal.condition_cb != -1) { + /* We also executes condition callback to check if we need this symbol */ + L = task->cfg->lua_state; + lua_rawgeti (L, LUA_REGISTRYINDEX, item->specific.normal.condition_cb); + ptask = lua_newuserdata (L, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (L, "rspamd{task}", -1); + *ptask = task; + + if (lua_pcall (L, 1, 1, 0) != 0) { + msg_info_task ("call to condition for %s failed: %s", + item->symbol, lua_tostring (L, -1)); + lua_pop (L, 1); + } + else { + check = lua_toboolean (L, -1); + lua_pop (L, 1); + } + } + + if (check) { + msg_debug_cache_task ("execute %s, %d", item->symbol, item->id); +#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC + struct timeval tv; + + event_base_update_cache_time (task->ev_base); + event_base_gettimeofday_cached (task->ev_base, &tv); + t1 = tv_to_double (&tv); +#else + t1 = rspamd_get_ticks (FALSE); +#endif + item->start_ticks = t1; + item->async_events = 0; + g_assert (checkpoint->cur_item == NULL); + checkpoint->cur_item = item; + checkpoint->items_inflight ++; + /* Callback now must finalize itself */ + item->specific.normal.func (task, item, item->specific.normal.user_data); + checkpoint->cur_item = NULL; + + if (checkpoint->items_inflight == 0) { + + return TRUE; + } + + if (item->async_events == 0 && !CHECK_FINISH_BIT (checkpoint, item)) { + msg_err_cache ("critical error: item %s has no async events pending, " + "but it is not finalised", item->symbol); + g_assert_not_reached (); + } + + return FALSE; + } + else { + msg_debug_cache_task ("skipping check of %s as its start condition is false", + item->symbol); + SET_FINISH_BIT (checkpoint, item); + } + + return TRUE; +} + +static gboolean +rspamd_symbols_cache_check_deps (struct rspamd_task *task, + struct rspamd_symcache *cache, + struct rspamd_symcache_item *item, + struct cache_savepoint *checkpoint, + guint recursion, + gboolean check_only) +{ + struct cache_dependency *dep; + guint i; + gboolean ret = TRUE; + static const guint max_recursion = 20; + + if (recursion > max_recursion) { 
+ msg_err_task ("cyclic dependencies: maximum check level %ud exceed when " + "checking dependencies for %s", max_recursion, item->symbol); + + return TRUE; + } + + if (item->deps != NULL && item->deps->len > 0) { + for (i = 0; i < item->deps->len; i ++) { + dep = g_ptr_array_index (item->deps, i); + + if (dep->item == NULL) { + /* Assume invalid deps as done */ + msg_debug_cache_task ("symbol %d(%s) has invalid dependencies on %d(%s)", + item->id, item->symbol, dep->id, dep->sym); + continue; + } + + if (!CHECK_FINISH_BIT (checkpoint, dep->item)) { + if (!CHECK_START_BIT (checkpoint, dep->item)) { + /* Not started */ + if (!check_only) { + if (!rspamd_symbols_cache_check_deps (task, cache, + dep->item, + checkpoint, + recursion + 1, + check_only)) { + + ret = FALSE; + msg_debug_cache_task ("delayed dependency %d(%s) for " + "symbol %d(%s)", + dep->id, dep->sym, item->id, item->symbol); + } + else if (!rspamd_symbols_cache_check_symbol (task, cache, + dep->item, + checkpoint)) { + /* Now started, but has events pending */ + ret = FALSE; + msg_debug_cache_task ("started check of %d(%s) symbol " + "as dep for " + "%d(%s)", + dep->id, dep->sym, item->id, item->symbol); + } + else { + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is " + "already processed", + dep->id, dep->sym, item->id, item->symbol); + } + } + else { + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) " + "cannot be started now", + dep->id, dep->sym, + item->id, item->symbol); + ret = FALSE; + } + } + else { + /* Started but not finished */ + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is " + "still executing", + dep->id, dep->sym, + item->id, item->symbol); + ret = FALSE; + } + } + else { + msg_debug_cache_task ("dependency %d(%s) for symbol %d(%s) is already " + "checked", + dep->id, dep->sym, + item->id, item->symbol); + } + } + } + + return ret; +} + +#define BITS_PER_UINT64 (NBBY * sizeof (guint64)) +#define UINT64_BITMAP_SIZE(nbits) (((nbits) + BITS_PER_UINT64 - 1) / BITS_PER_UINT64) +#define ALLOC_BITMAP(bmap, nelts) do { \ + (bmap).started = rspamd_mempool_alloc0 (task->task_pool, \ + UINT64_BITMAP_SIZE (nelts) * sizeof (guint64)); \ + (bmap).finished = rspamd_mempool_alloc0 (task->task_pool, \ + UINT64_BITMAP_SIZE (nelts) * sizeof (guint64)); \ +} while (0) + +static struct cache_savepoint * +rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task, + struct rspamd_symcache *cache) +{ + struct cache_savepoint *checkpoint; + + if (cache->items_by_order->id != cache->id) { + /* + * Cache has been modified, need to resort it + */ + msg_info_cache ("symbols cache has been modified since last check:" + " old id: %ud, new id: %ud", + cache->items_by_order->id, cache->id); + rspamd_symbols_cache_resort (cache); + } + + checkpoint = rspamd_mempool_alloc0 (task->task_pool, sizeof (*checkpoint)); + + ALLOC_BITMAP (checkpoint->prefilters, cache->prefilters->len); + ALLOC_BITMAP (checkpoint->filters, cache->filters->len); + ALLOC_BITMAP (checkpoint->postfilters, cache->postfilters->len); + ALLOC_BITMAP (checkpoint->idempotent, cache->idempotent->len); + + g_assert (cache->items_by_order != NULL); + checkpoint->version = cache->items_by_order->d->len; + checkpoint->order = cache->items_by_order; + REF_RETAIN (checkpoint->order); + rspamd_mempool_add_destructor (task->task_pool, + rspamd_symbols_cache_order_unref, checkpoint->order); + + checkpoint->pass = RSPAMD_CACHE_PASS_INIT; + checkpoint->cur_bits = &checkpoint->prefilters; + task->checkpoint = checkpoint; + + task->result = 
task->result; + + return checkpoint; +} + +gboolean +rspamd_symcache_process_settings (struct rspamd_task *task, + struct rspamd_symcache *cache) +{ + const ucl_object_t *wl, *cur, *disabled, *enabled; + struct rspamd_symbols_group *gr; + GHashTableIter gr_it; + ucl_object_iter_t it = NULL; + gboolean already_disabled = FALSE; + gpointer k, v; + + wl = ucl_object_lookup (task->settings, "whitelist"); + + if (wl != NULL) { + msg_info_task ("<%s> is whitelisted", task->message_id); + task->flags |= RSPAMD_TASK_FLAG_SKIP; + return TRUE; + } + + enabled = ucl_object_lookup (task->settings, "symbols_enabled"); + + if (enabled) { + /* Disable all symbols but selected */ + rspamd_symbols_cache_disable_all_symbols (task, cache); + already_disabled = TRUE; + it = NULL; + + while ((cur = ucl_iterate_object (enabled, &it, true)) != NULL) { + rspamd_symbols_cache_enable_symbol_checkpoint (task, cache, + ucl_object_tostring (cur)); + } + } + + /* Enable groups of symbols */ + enabled = ucl_object_lookup (task->settings, "groups_enabled"); + + if (enabled) { + it = NULL; + + if (!already_disabled) { + rspamd_symbols_cache_disable_all_symbols (task, cache); + } + + while ((cur = ucl_iterate_object (enabled, &it, true)) != NULL) { + if (ucl_object_type (cur) == UCL_STRING) { + gr = g_hash_table_lookup (task->cfg->groups, + ucl_object_tostring (cur)); + + if (gr) { + g_hash_table_iter_init (&gr_it, gr->symbols); + + while (g_hash_table_iter_next (&gr_it, &k, &v)) { + rspamd_symbols_cache_enable_symbol_checkpoint (task, cache, k); + } + } + } + } + } + + disabled = ucl_object_lookup (task->settings, "symbols_disabled"); + + if (disabled) { + it = NULL; + + while ((cur = ucl_iterate_object (disabled, &it, true)) != NULL) { + rspamd_symbols_cache_disable_symbol_checkpoint (task, cache, + ucl_object_tostring (cur)); + } + } + + /* Disable groups of symbols */ + disabled = ucl_object_lookup (task->settings, "groups_disabled"); + + if (disabled) { + it = NULL; + + while ((cur = ucl_iterate_object (disabled, &it, true)) != NULL) { + if (ucl_object_type (cur) == UCL_STRING) { + gr = g_hash_table_lookup (task->cfg->groups, + ucl_object_tostring (cur)); + + if (gr) { + g_hash_table_iter_init (&gr_it, gr->symbols); + + while (g_hash_table_iter_next (&gr_it, &k, &v)) { + rspamd_symbols_cache_disable_symbol_checkpoint (task, cache, k); + } + } + } + } + } + + return FALSE; +} + +gboolean +rspamd_symcache_process_symbols (struct rspamd_task *task, + struct rspamd_symcache *cache, gint stage) +{ + struct rspamd_symcache_item *item = NULL; + struct cache_savepoint *checkpoint; + gint i; + gboolean all_done; + gint saved_priority; + guint start_events_pending; + + g_assert (cache != NULL); + + if (task->checkpoint == NULL) { + checkpoint = rspamd_symbols_cache_make_checkpoint (task, cache); + task->checkpoint = checkpoint; + } + else { + checkpoint = task->checkpoint; + } + + if (stage == RSPAMD_TASK_STAGE_POST_FILTERS && checkpoint->pass < + RSPAMD_CACHE_PASS_POSTFILTERS) { + checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS; + } + + if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT && checkpoint->pass < + RSPAMD_CACHE_PASS_IDEMPOTENT) { + checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; + } + + msg_debug_cache_task ("symbols processing stage at pass: %d", checkpoint->pass); + start_events_pending = rspamd_session_events_pending (task->s); + + switch (checkpoint->pass) { + case RSPAMD_CACHE_PASS_INIT: + case RSPAMD_CACHE_PASS_PREFILTERS: + /* Check for prefilters */ + saved_priority = G_MININT; + all_done = TRUE; + 
checkpoint->cur_bits = &checkpoint->prefilters; + + for (i = 0; i < (gint)cache->prefilters->len; i ++) { + item = g_ptr_array_index (cache->prefilters, i); + + if (RSPAMD_TASK_IS_SKIPPED (task)) { + return TRUE; + } + + if (!CHECK_START_BIT (checkpoint, item) && + !CHECK_FINISH_BIT (checkpoint, item)) { + /* Check priorities */ + if (saved_priority == G_MININT) { + saved_priority = item->priority; + } + else { + if (item->priority < saved_priority && + rspamd_session_events_pending (task->s) > start_events_pending) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + checkpoint->pass = RSPAMD_CACHE_PASS_PREFILTERS; + + return TRUE; + } + } + + rspamd_symbols_cache_check_symbol (task, cache, item, + checkpoint); + all_done = FALSE; + } + } + + if (all_done || stage == RSPAMD_TASK_STAGE_FILTERS) { + checkpoint->pass = RSPAMD_CACHE_PASS_FILTERS; + } + + if (stage == RSPAMD_TASK_STAGE_FILTERS) { + return rspamd_symcache_process_symbols (task, cache, stage); + } + + break; + + case RSPAMD_CACHE_PASS_FILTERS: + /* + * On the first pass we check symbols that do not have dependencies + * If we figure out symbol that has no dependencies satisfied, then + * we just save it for another pass + */ + all_done = TRUE; + checkpoint->cur_bits = &checkpoint->filters; + + for (i = 0; i < (gint)checkpoint->version; i ++) { + if (RSPAMD_TASK_IS_SKIPPED (task)) { + return TRUE; + } + + item = g_ptr_array_index (checkpoint->order->d, i); + + if (item->type & SYMBOL_TYPE_CLASSIFIER) { + continue; + } + + if (!CHECK_START_BIT (checkpoint, item)) { + all_done = FALSE; + + if (!rspamd_symbols_cache_check_deps (task, cache, item, + checkpoint, 0, FALSE)) { + + msg_debug_cache_task ("blocked execution of %d(%s) unless deps are " + "resolved", + item->id, item->symbol); + + continue; + } + + rspamd_symbols_cache_check_symbol (task, cache, item, + checkpoint); + } + + if (!(item->type & SYMBOL_TYPE_FINE)) { + if (rspamd_symbols_cache_metric_limit (task, checkpoint)) { + msg_info_task ("<%s> has already scored more than %.2f, so do " + "not " + "plan more checks", task->message_id, + checkpoint->rs->score); + all_done = TRUE; + break; + } + } + } + + if (all_done || stage == RSPAMD_TASK_STAGE_POST_FILTERS) { + checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS; + } + + if (stage == RSPAMD_TASK_STAGE_POST_FILTERS) { + + return rspamd_symcache_process_symbols (task, cache, stage); + } + + break; + + case RSPAMD_CACHE_PASS_POSTFILTERS: + /* Check for postfilters */ + saved_priority = G_MININT; + all_done = TRUE; + checkpoint->cur_bits = &checkpoint->postfilters; + + for (i = 0; i < (gint)cache->postfilters->len; i ++) { + if (RSPAMD_TASK_IS_SKIPPED (task)) { + return TRUE; + } + + item = g_ptr_array_index (cache->postfilters, i); + + if (!CHECK_START_BIT (checkpoint, item) && + !CHECK_FINISH_BIT (checkpoint, item)) { + /* Check priorities */ + all_done = FALSE; + + if (saved_priority == G_MININT) { + saved_priority = item->priority; + } + else { + if (item->priority > saved_priority && + rspamd_session_events_pending (task->s) > start_events_pending) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS; + + return TRUE; + } + } + + rspamd_symbols_cache_check_symbol (task, cache, item, + checkpoint); + } + } + + if (all_done) { + checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; + } + + if (checkpoint->items_inflight == 0 || + stage == RSPAMD_TASK_STAGE_IDEMPOTENT) { + checkpoint->pass = 
RSPAMD_CACHE_PASS_IDEMPOTENT; + } + + if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT) { + return rspamd_symcache_process_symbols (task, cache, stage); + } + + break; + + case RSPAMD_CACHE_PASS_IDEMPOTENT: + /* Check for postfilters */ + saved_priority = G_MININT; + checkpoint->cur_bits = &checkpoint->idempotent; + + for (i = 0; i < (gint)cache->idempotent->len; i ++) { + item = g_ptr_array_index (cache->idempotent, i); + + if (!CHECK_START_BIT (checkpoint, item) && + !CHECK_FINISH_BIT (checkpoint, item)) { + /* Check priorities */ + if (saved_priority == G_MININT) { + saved_priority = item->priority; + } + else { + if (item->priority > saved_priority && + rspamd_session_events_pending (task->s) > start_events_pending) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; + + return TRUE; + } + } + rspamd_symbols_cache_check_symbol (task, cache, item, + checkpoint); + } + } + checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT; + break; + + case RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT: + all_done = TRUE; + checkpoint->cur_bits = &checkpoint->idempotent; + + for (i = 0; i < (gint)cache->idempotent->len; i ++) { + item = g_ptr_array_index (cache->idempotent, i); + + if (!CHECK_FINISH_BIT (checkpoint, item)) { + all_done = FALSE; + break; + } + } + + if (all_done) { + checkpoint->pass = RSPAMD_CACHE_PASS_DONE; + + return TRUE; + } + break; + + case RSPAMD_CACHE_PASS_DONE: + return TRUE; + break; + } + + return FALSE; +} + +struct counters_cbdata { + ucl_object_t *top; + struct rspamd_symcache *cache; +}; + +#define ROUND_DOUBLE(x) (floor((x) * 100.0) / 100.0) + +static void +rspamd_symbols_cache_counters_cb (gpointer k, gpointer v, gpointer ud) +{ + struct counters_cbdata *cbd = ud; + ucl_object_t *obj, *top; + struct rspamd_symcache_item *item = v, *parent; + const gchar *symbol = k; + + top = cbd->top; + + obj = ucl_object_typed_new (UCL_OBJECT); + ucl_object_insert_key (obj, ucl_object_fromstring (symbol ? 
symbol : "unknown"), + "symbol", 0, false); + + if (item->is_virtual) { + parent = g_ptr_array_index (cbd->cache->filters, + item->specific.virtual.parent); + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->weight)), + "weight", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (parent->st->avg_frequency)), + "frequency", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromint (parent->st->total_hits), + "hits", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (parent->st->avg_time)), + "time", 0, false); + } + else { + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->weight)), + "weight", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->avg_frequency)), + "frequency", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromint (item->st->total_hits), + "hits", 0, false); + ucl_object_insert_key (obj, + ucl_object_fromdouble (ROUND_DOUBLE (item->st->avg_time)), + "time", 0, false); + } + + ucl_array_append (top, obj); +} + +#undef ROUND_DOUBLE + +ucl_object_t * +rspamd_symcache_counters (struct rspamd_symcache *cache) +{ + ucl_object_t *top; + struct counters_cbdata cbd; + + g_assert (cache != NULL); + top = ucl_object_typed_new (UCL_ARRAY); + cbd.top = top; + cbd.cache = cache; + g_hash_table_foreach (cache->items_by_symbol, + rspamd_symbols_cache_counters_cb, &cbd); + + return top; +} + +static void +rspamd_symbols_cache_call_peak_cb (struct event_base *ev_base, + struct rspamd_symcache *cache, + struct rspamd_symcache_item *item, + gdouble cur_value, + gdouble cur_err) +{ + lua_State *L = cache->cfg->lua_state; + struct event_base **pbase; + + lua_rawgeti (L, LUA_REGISTRYINDEX, cache->peak_cb); + pbase = lua_newuserdata (L, sizeof (*pbase)); + *pbase = ev_base; + rspamd_lua_setclass (L, "rspamd{ev_base}", -1); + lua_pushstring (L, item->symbol); + lua_pushnumber (L, item->st->avg_frequency); + lua_pushnumber (L, sqrt (item->st->stddev_frequency)); + lua_pushnumber (L, cur_value); + lua_pushnumber (L, cur_err); + + if (lua_pcall (L, 6, 0, 0) != 0) { + msg_info_cache ("call to peak function for %s failed: %s", + item->symbol, lua_tostring (L, -1)); + lua_pop (L, 1); + } +} + +static void +rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud) +{ + struct timeval tv; + gdouble tm; + struct rspamd_cache_refresh_cbdata *cbdata = ud; + struct rspamd_symcache *cache; + struct rspamd_symcache_item *item; + guint i; + gdouble cur_ticks; + static const double decay_rate = 0.7; + + cache = cbdata->cache; + /* Plan new event */ + tm = rspamd_time_jitter (cache->reload_time, 0); + cur_ticks = rspamd_get_ticks (FALSE); + msg_debug_cache ("resort symbols cache, next reload in %.2f seconds", tm); + g_assert (cache != NULL); + evtimer_set (&cbdata->resort_ev, rspamd_symbols_cache_resort_cb, cbdata); + event_base_set (cbdata->ev_base, &cbdata->resort_ev); + double_to_tv (tm, &tv); + event_add (&cbdata->resort_ev, &tv); + + if (rspamd_worker_is_primary_controller (cbdata->w)) { + /* Gather stats from shared execution times */ + for (i = 0; i < cache->filters->len; i ++) { + item = g_ptr_array_index (cache->filters, i); + item->st->total_hits += item->st->hits; + g_atomic_int_set (&item->st->hits, 0); + + if (item->last_count > 0 && cbdata->w->index == 0) { + /* Calculate frequency */ + gdouble cur_err, cur_value; + + cur_value = (item->st->total_hits - item->last_count) / + (cur_ticks - cbdata->last_resort); + rspamd_set_counter_ema 
(&item->st->frequency_counter, + cur_value, decay_rate); + item->st->avg_frequency = item->st->frequency_counter.mean; + item->st->stddev_frequency = item->st->frequency_counter.stddev; + + if (cur_value > 0) { + msg_debug_cache ("frequency for %s is %.2f, avg: %.2f", + item->symbol, cur_value, item->st->avg_frequency); + } + + cur_err = (item->st->avg_frequency - cur_value); + cur_err *= cur_err; + + /* + * TODO: replace magic number + */ + if (item->st->frequency_counter.number > 10 && + cur_err > sqrt (item->st->stddev_frequency) * 3) { + item->frequency_peaks ++; + msg_debug_cache ("peak found for %s is %.2f, avg: %.2f, " + "stddev: %.2f, error: %.2f, peaks: %d", + item->symbol, cur_value, + item->st->avg_frequency, + item->st->stddev_frequency, + cur_err, + item->frequency_peaks); + + if (cache->peak_cb != -1) { + rspamd_symbols_cache_call_peak_cb (cbdata->ev_base, + cache, item, + cur_value, cur_err); + } + } + } + + item->last_count = item->st->total_hits; + + if (item->cd->number > 0) { + if (item->type & (SYMBOL_TYPE_CALLBACK|SYMBOL_TYPE_NORMAL)) { + item->st->avg_time = item->cd->mean; + rspamd_set_counter_ema (&item->st->time_counter, + item->st->avg_time, decay_rate); + item->st->avg_time = item->st->time_counter.mean; + memset (item->cd, 0, sizeof (*item->cd)); + } + } + } + + + cbdata->last_resort = cur_ticks; + /* We don't do actual sorting due to topological guarantees */ + } +} + +void +rspamd_symcache_start_refresh (struct rspamd_symcache *cache, + struct event_base *ev_base, struct rspamd_worker *w) +{ + struct timeval tv; + gdouble tm; + struct rspamd_cache_refresh_cbdata *cbdata; + + cbdata = rspamd_mempool_alloc0 (cache->static_pool, sizeof (*cbdata)); + cbdata->last_resort = rspamd_get_ticks (TRUE); + cbdata->ev_base = ev_base; + cbdata->w = w; + cbdata->cache = cache; + tm = rspamd_time_jitter (cache->reload_time, 0); + msg_debug_cache ("next reload in %.2f seconds", tm); + g_assert (cache != NULL); + evtimer_set (&cbdata->resort_ev, rspamd_symbols_cache_resort_cb, + cbdata); + event_base_set (ev_base, &cbdata->resort_ev); + double_to_tv (tm, &tv); + event_add (&cbdata->resort_ev, &tv); + rspamd_mempool_add_destructor (cache->static_pool, + (rspamd_mempool_destruct_t) event_del, + &cbdata->resort_ev); +} + +void +rspamd_symcache_inc_frequency (struct rspamd_symcache *cache, + const gchar *symbol) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + + item = g_hash_table_lookup (cache->items_by_symbol, symbol); + + if (item != NULL) { + g_atomic_int_inc (&item->st->hits); + } +} + +void +rspamd_symcache_add_dependency (struct rspamd_symcache *cache, + gint id_from, const gchar *to) +{ + struct rspamd_symcache_item *source; + struct cache_dependency *dep; + + g_assert (id_from >= 0 && id_from < (gint)cache->filters->len); + + source = g_ptr_array_index (cache->filters, id_from); + dep = rspamd_mempool_alloc (cache->static_pool, sizeof (*dep)); + dep->id = id_from; + dep->sym = rspamd_mempool_strdup (cache->static_pool, to); + /* Will be filled later */ + dep->item = NULL; + g_ptr_array_add (source->deps, dep); +} + +void +rspamd_symcache_add_delayed_dependency (struct rspamd_symcache *cache, + const gchar *from, const gchar *to) +{ + struct delayed_cache_dependency *ddep; + + g_assert (from != NULL); + g_assert (to != NULL); + + ddep = g_malloc0 (sizeof (*ddep)); + ddep->from = g_strdup (from); + ddep->to = g_strdup (to); + + cache->delayed_deps = g_list_prepend (cache->delayed_deps, ddep); +} + +gint +rspamd_symcache_find_symbol (struct 
rspamd_symcache *cache, const gchar *name) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + + if (name == NULL) { + return -1; + } + + item = g_hash_table_lookup (cache->items_by_symbol, name); + + if (item != NULL) { + return item->id; + } + + return -1; +} + +gboolean +rspamd_symcache_stat_symbol (struct rspamd_symcache *cache, + const gchar *name, gdouble *frequency, gdouble *freq_stddev, + gdouble *tm, guint *nhits) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + + if (name == NULL) { + return FALSE; + } + + item = g_hash_table_lookup (cache->items_by_symbol, name); + + if (item != NULL) { + *frequency = item->st->avg_frequency; + *freq_stddev = sqrt (item->st->stddev_frequency); + *tm = item->st->time_counter.mean; + + if (nhits) { + *nhits = item->st->hits; + } + + return TRUE; + } + + return FALSE; +} + +static gint +rspamd_symbols_cache_find_filter (struct rspamd_symcache *cache, + const gchar *name) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + + if (name == NULL) { + return -1; + } + + item = g_hash_table_lookup (cache->items_by_symbol, name); + + if (item != NULL) { + + if (item->is_virtual) { + item = g_ptr_array_index (cache->filters, + item->specific.virtual.parent); + } + + if (!item->is_filter) { + return -1; + } + + return item->id; + } + + return -1; +} + +const gchar * +rspamd_symcache_symbol_by_id (struct rspamd_symcache *cache, + gint id) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + + if (id < 0 || id >= (gint)cache->filters->len) { + return NULL; + } + + item = g_ptr_array_index (cache->filters, id); + + return item->symbol; +} + +guint +rspamd_symcache_stats_symbols_count (struct rspamd_symcache *cache) +{ + g_assert (cache != NULL); + + return cache->stats_symbols_count; +} + +#define DISABLE_BITS(what) do { \ + memset (checkpoint->what.started, 0xff, NBYTES (cache->what->len)); \ + memset (checkpoint->what.finished, 0xff, NBYTES (cache->what->len)); \ +} while(0) + +static void +rspamd_symbols_cache_disable_all_symbols (struct rspamd_task *task, + struct rspamd_symcache *cache) +{ + struct cache_savepoint *checkpoint; + guint i; + struct rspamd_symcache_item *item; + + if (task->checkpoint == NULL) { + checkpoint = rspamd_symbols_cache_make_checkpoint (task, cache); + task->checkpoint = checkpoint; + } + else { + checkpoint = task->checkpoint; + } + + /* Set all symbols as started + finished to disable their execution */ + DISABLE_BITS(prefilters); + DISABLE_BITS(filters); + DISABLE_BITS(postfilters); + DISABLE_BITS(idempotent); + + /* Enable for squeezed symbols */ + PTR_ARRAY_FOREACH (cache->squeezed, i, item) { + clrbit (checkpoint->filters.started, item->id); + clrbit (checkpoint->filters.finished, item->id); + } +} + +#undef DISABLE_BITS + +static struct rspamd_symcache_item * +rspamd_symbols_cache_get_item_and_bits (struct rspamd_task *task, + struct rspamd_symcache *cache, + struct cache_savepoint *checkpoint, + const gchar *symbol, + struct cache_bits **bits) +{ + struct rspamd_symcache_item *item; + + item = g_hash_table_lookup (cache->items_by_symbol, symbol); + + if (item) { + if (item->is_virtual) { + item = g_ptr_array_index (cache->filters, item->specific.virtual.parent); + *bits = &checkpoint->filters; + } + else { + if (item->type & SYMBOL_TYPE_PREFILTER) { + *bits = &checkpoint->prefilters; + } + else if (item->type & SYMBOL_TYPE_POSTFILTER) { + *bits = &checkpoint->postfilters; + } + else if (item->type & SYMBOL_TYPE_IDEMPOTENT) { + *bits = 
&checkpoint->idempotent; + } + else { + *bits = &checkpoint->filters; + } + } + } + + return item; +} + +static void +rspamd_symbols_cache_disable_symbol_checkpoint (struct rspamd_task *task, + struct rspamd_symcache *cache, const gchar *symbol) +{ + struct cache_savepoint *checkpoint; + struct rspamd_symcache_item *item; + struct cache_bits *bits = NULL; + + if (task->checkpoint == NULL) { + checkpoint = rspamd_symbols_cache_make_checkpoint (task, cache); + task->checkpoint = checkpoint; + } + else { + checkpoint = task->checkpoint; + } + + item = rspamd_symbols_cache_get_item_and_bits (task, cache, checkpoint, + symbol, &bits); + + if (item) { + + if (!(item->type & SYMBOL_TYPE_SQUEEZED)) { + setbit (bits->started, item->id); + setbit (bits->finished, item->id); + msg_debug_cache_task ("disable execution of %s", symbol); + } + else { + msg_debug_cache_task ("skip disabling squeezed symbol %s", symbol); + } + } + else { + msg_info_task ("cannot disable %s: not found", symbol); + } +} + +static void +rspamd_symbols_cache_enable_symbol_checkpoint (struct rspamd_task *task, + struct rspamd_symcache *cache, const gchar *symbol) +{ + struct cache_savepoint *checkpoint; + struct rspamd_symcache_item *item; + struct cache_bits *bits = NULL; + + if (task->checkpoint == NULL) { + checkpoint = rspamd_symbols_cache_make_checkpoint (task, cache); + task->checkpoint = checkpoint; + } + else { + checkpoint = task->checkpoint; + } + + item = rspamd_symbols_cache_get_item_and_bits (task, cache, checkpoint, + symbol, &bits); + + if (item) { + + if (!(item->type & SYMBOL_TYPE_SQUEEZED)) { + clrbit (bits->started, item->id); + clrbit (bits->finished, item->id); + msg_debug_cache_task ("enable execution of %s", symbol); + } + else { + msg_debug_cache_task ("skip enabling of squeezed symbol %s", symbol); + } + } + else { + msg_info_task ("cannot enable %s: not found", symbol); + } +} + +struct rspamd_abstract_callback_data* +rspamd_symcache_get_cbdata (struct rspamd_symcache *cache, + const gchar *symbol) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + item = g_hash_table_lookup (cache->items_by_symbol, symbol); + + if (item) { + + if (item->is_virtual) { + item = g_ptr_array_index (cache->filters, item->specific.virtual.parent); + } + + return item->specific.normal.user_data; + } + + return NULL; +} + +gboolean +rspamd_symcache_is_checked (struct rspamd_task *task, + struct rspamd_symcache *cache, const gchar *symbol) +{ + struct cache_savepoint *checkpoint; + struct rspamd_symcache_item *item; + struct cache_bits *bits = NULL; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + if (task->checkpoint == NULL) { + checkpoint = rspamd_symbols_cache_make_checkpoint (task, cache); + task->checkpoint = checkpoint; + } + else { + checkpoint = task->checkpoint; + } + + item = rspamd_symbols_cache_get_item_and_bits (task, cache, checkpoint, + symbol, &bits); + + if (item) { + return isset (bits->started, item->id); + } + + return FALSE; +} + +void +rspamd_symcache_disable_symbol (struct rspamd_symcache *cache, + const gchar *symbol) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + item = g_hash_table_lookup (cache->items_by_symbol, symbol); + + if (item) { + item->enabled = FALSE; + } +} + +void +rspamd_symcache_enable_symbol (struct rspamd_symcache *cache, + const gchar *symbol) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + item = 
g_hash_table_lookup (cache->items_by_symbol, symbol); + + if (item) { + item->enabled = TRUE; + } +} + +guint64 +rspamd_symcache_get_cksum (struct rspamd_symcache *cache) +{ + g_assert (cache != NULL); + + return cache->cksum; +} + + +gboolean +rspamd_symcache_is_symbol_enabled (struct rspamd_task *task, + struct rspamd_symcache *cache, const gchar *symbol) +{ + struct cache_savepoint *checkpoint; + struct rspamd_symcache_item *item; + lua_State *L; + struct rspamd_task **ptask; + struct cache_bits *bits = NULL; + gboolean ret = TRUE; + + g_assert (cache != NULL); + g_assert (symbol != NULL); + + checkpoint = task->checkpoint; + + + if (checkpoint) { + item = rspamd_symbols_cache_get_item_and_bits (task, cache, checkpoint, + symbol, &bits); + + if (item) { + if (CHECK_START_BIT (checkpoint, item)) { + ret = FALSE; + } + else { + if (item->specific.normal.condition_cb != -1) { + /* We also execute the condition callback to check if we need this symbol */ + L = task->cfg->lua_state; + lua_rawgeti (L, LUA_REGISTRYINDEX, + item->specific.normal.condition_cb); + ptask = lua_newuserdata (L, sizeof (struct rspamd_task *)); + rspamd_lua_setclass (L, "rspamd{task}", -1); + *ptask = task; + + if (lua_pcall (L, 1, 1, 0) != 0) { + msg_info_task ("call to condition for %s failed: %s", + item->symbol, lua_tostring (L, -1)); + lua_pop (L, 1); + } + else { + ret = lua_toboolean (L, -1); + lua_pop (L, 1); + } + } + } + } + } + + return ret; +} + +void +rspamd_symcache_foreach (struct rspamd_symcache *cache, + void (*func) (gint, const gchar *, gint, gpointer), + gpointer ud) +{ + struct rspamd_symcache_item *item; + GHashTableIter it; + gpointer k, v; + + g_hash_table_iter_init (&it, cache->items_by_symbol); + + while (g_hash_table_iter_next (&it, &k, &v)) { + item = (struct rspamd_symcache_item *)v; + func (item->id, item->symbol, item->type, ud); + } +} + +struct rspamd_symcache_item * +rspamd_symcache_get_cur_item (struct rspamd_task *task) +{ + struct cache_savepoint *checkpoint = task->checkpoint; + + if (checkpoint == NULL) { + return NULL; + } + + return checkpoint->cur_item; +} + +/** + * Replaces the current item being processed. 
+ * Returns the item that was previously being processed (if any) + * @param task + * @param item + * @return + */ +struct rspamd_symcache_item * +rspamd_symcache_set_cur_item (struct rspamd_task *task, + struct rspamd_symcache_item *item) +{ + struct cache_savepoint *checkpoint = task->checkpoint; + struct rspamd_symcache_item *ex; + + ex = checkpoint->cur_item; + checkpoint->cur_item = item; + + return ex; +} + + +/** + * Finalize the current async element potentially calling its reverse dependencies + */ +void +rspamd_symcache_finalize_item (struct rspamd_task *task, + struct rspamd_symcache_item *item) +{ + struct cache_savepoint *checkpoint = task->checkpoint; + struct cache_dependency *rdep; + gdouble t2, diff; + guint i; + struct timeval tv; + const gdouble slow_diff_limit = 0.3; + + /* Sanity checks */ + g_assert (checkpoint->items_inflight > 0); + + if (item->async_events > 0) { + /* + * XXX: Race condition + * + * It is possible that some async event is still in flight, but we + * already know its result; however, it is the responsibility of that + * event to decrease the async events count and call this function + * one more time + */ + msg_debug_cache_task ("postpone finalisation of %s(%d) as there are %d " + "async events pending", + item->symbol, item->id, item->async_events); + + return; + } + + msg_debug_cache_task ("process finalize for item %s(%d)", item->symbol, item->id); + SET_FINISH_BIT (checkpoint, item); + checkpoint->items_inflight --; + checkpoint->cur_item = NULL; + +#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC + event_base_update_cache_time (task->ev_base); + event_base_gettimeofday_cached (task->ev_base, &tv); + t2 = tv_to_double (&tv); +#else + t2 = rspamd_get_ticks (FALSE); +#endif + + diff = (t2 - item->start_ticks); + + if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) { + rspamd_task_profile_set (task, item->symbol, diff); + } + + if (!(item->type & SYMBOL_TYPE_SQUEEZED)) { + if (diff > slow_diff_limit) { + msg_info_task ("slow rule: %s(%d): %.2f ms", item->symbol, item->id, + diff * 1000); + } + + if (rspamd_worker_is_scanner (task->worker)) { + rspamd_set_counter (item->cd, diff); + } + } + + /* Process all reverse dependencies */ + PTR_ARRAY_FOREACH (item->rdeps, i, rdep) { + if (rdep->item) { + if (!CHECK_START_BIT (checkpoint, rdep->item)) { + msg_debug_cache_task ("check item %d(%s) rdep of %s ", + rdep->item->id, rdep->item->symbol, item->symbol); + + if (!rspamd_symbols_cache_check_deps (task, task->cfg->cache, + rdep->item, + checkpoint, 0, FALSE)) { + msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s " + "until deps are resolved", + rdep->item->id, rdep->item->symbol, item->symbol); + } + else { + rspamd_symbols_cache_check_symbol (task, task->cfg->cache, + rdep->item, + checkpoint); + } + } + } + } +} + +guint +rspamd_symcache_item_async_inc_full (struct rspamd_task *task, + struct rspamd_symcache_item *item, + const gchar *subsystem, + const gchar *loc) +{ + msg_debug_cache_task ("increase async events counter for %s(%d) = %d + 1; subsystem %s (%s)", + item->symbol, item->id, item->async_events, subsystem, loc); + return ++item->async_events; +} + +guint +rspamd_symcache_item_async_dec_full (struct rspamd_task *task, + struct rspamd_symcache_item *item, + const gchar *subsystem, + const gchar *loc) +{ + msg_debug_cache_task ("decrease async events counter for %s(%d) = %d - 1; subsystem %s (%s)", + item->symbol, item->id, item->async_events, subsystem, loc); + g_assert (item->async_events > 0); + + return --item->async_events; +} + +gboolean 
+rspamd_symcache_item_async_dec_check_full (struct rspamd_task *task, + struct rspamd_symcache_item *item, + const gchar *subsystem, + const gchar *loc) +{ + if (rspamd_symcache_item_async_dec_full (task, item, subsystem, loc) == 0) { + rspamd_symcache_finalize_item (task, item); + + return TRUE; + } + + return FALSE; +}
\ No newline at end of file
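For reference, a minimal sketch of how a rule callback is expected to pair the async event counters above with item finalisation: the callback records the current cache item, increments the counter before starting an event-driven operation, and the completion handler decrements it, which triggers rspamd_symcache_finalize_item() once the last pending event is gone. The struct example_cbdata, example_completion_cb(), example_symbol_cb() and start_async_request() names below are assumptions made for illustration; only the rspamd_symcache_* calls are taken from this file.

/* Hypothetical private state handed to the completion callback */
struct example_cbdata {
	struct rspamd_task *task;
	struct rspamd_symcache_item *item;
};

static void
example_completion_cb (void *ud)
{
	struct example_cbdata *cbd = (struct example_cbdata *)ud;

	/* ... handle the asynchronous reply here ... */

	/*
	 * Drop our pending event; when the counter reaches zero this also
	 * calls rspamd_symcache_finalize_item() for us
	 */
	rspamd_symcache_item_async_dec_check_full (cbd->task, cbd->item,
			"example", G_STRLOC);
}

static void
example_symbol_cb (struct rspamd_task *task, gpointer ud)
{
	struct rspamd_symcache_item *item;
	struct example_cbdata *cbd;

	/* Remember which cache item is being executed before going async */
	item = rspamd_symcache_get_cur_item (task);

	cbd = rspamd_mempool_alloc (task->task_pool, sizeof (*cbd));
	cbd->task = task;
	cbd->item = item;

	/* Announce one pending async event so the item is not finalised early */
	rspamd_symcache_item_async_inc_full (task, item, "example", G_STRLOC);

	/* start_async_request() stands in for any event-driven API (DNS, redis, ...) */
	start_async_request (task, example_completion_cb, cbd);
}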