From a3c0fbd979b0f901a89ad4c9106a263476965e98 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 29 Oct 2018 12:58:58 +0000 Subject: [PATCH] [Project] Rework dynamic parts of symcache items --- src/libserver/rspamd_symcache.c | 488 ++++++++++++++------------------ 1 file changed, 214 insertions(+), 274 deletions(-) diff --git a/src/libserver/rspamd_symcache.c b/src/libserver/rspamd_symcache.c index 70aff0d1b..37fdb568e 100644 --- a/src/libserver/rspamd_symcache.c +++ b/src/libserver/rspamd_symcache.c @@ -52,29 +52,26 @@ INIT_LOG_MODULE(symcache) -#define CHECK_START_BIT(checkpoint, item) \ - isset((checkpoint)->cur_bits->started, (item)->id) -#define SET_START_BIT(checkpoint, item) \ - setbit((checkpoint)->cur_bits->started, (item)->id ) +#define CHECK_START_BIT(checkpoint, dyn_item) \ + ((dyn_item)->started) +#define SET_START_BIT(checkpoint, dyn_item) \ + (dyn_item)->started = 1 -#define CHECK_FINISH_BIT(checkpoint, item) \ - isset((checkpoint)->cur_bits->finished, (item)->id) -#define SET_FINISH_BIT(checkpoint, item) \ - setbit((checkpoint)->cur_bits->finished, (item)->id) +#define CHECK_FINISH_BIT(checkpoint, dyn_item) \ + ((dyn_item)->finished) +#define SET_FINISH_BIT(checkpoint, dyn_item) \ + (dyn_item)->finished = 1 -static const guchar rspamd_symbols_cache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0 }; +static const guchar rspamd_symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0 }; -static gint rspamd_symbols_cache_find_filter (struct rspamd_symcache *cache, - const gchar *name); - -struct rspamd_symbols_cache_header { +struct rspamd_symcache_header { guchar magic[8]; guint nitems; guchar checksum[64]; guchar unused[128]; }; -struct symbols_cache_order { +struct symcache_order { GPtrArray *d; guint id; ref_entry_t ref; @@ -84,7 +81,7 @@ struct rspamd_symcache { /* Hash table for fast access */ GHashTable *items_by_symbol; GPtrArray *items_by_id; - struct symbols_cache_order *items_by_order; + struct symcache_order *items_by_order; GPtrArray *filters; GPtrArray *prefilters; GPtrArray *postfilters; @@ -200,8 +197,8 @@ struct cache_savepoint { gdouble lim; struct rspamd_symcache_item *cur_item; - struct symbols_cache_order *order; - GArray *dynamic_items; + struct symcache_order *order; + struct rspamd_symcache_dynamic_item dynamic_items[]; }; struct rspamd_cache_refresh_cbdata { @@ -220,60 +217,86 @@ struct rspamd_cache_refresh_cbdata { * ((f) > 0 ? (f) : FREQ_ALPHA) \ / (t > TIME_ALPHA ? t : TIME_ALPHA)) -static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task, +static gboolean rspamd_symcache_check_symbol (struct rspamd_task *task, struct rspamd_symcache *cache, struct rspamd_symcache_item *item, struct cache_savepoint *checkpoint); -static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task, +static gboolean rspamd_symcache_check_deps (struct rspamd_task *task, struct rspamd_symcache *cache, struct rspamd_symcache_item *item, struct cache_savepoint *checkpoint, guint recursion, gboolean check_only); -static void rspamd_symbols_cache_disable_symbol_checkpoint (struct rspamd_task *task, +static void rspamd_symcache_disable_symbol_checkpoint (struct rspamd_task *task, struct rspamd_symcache *cache, const gchar *symbol); -static void rspamd_symbols_cache_enable_symbol_checkpoint (struct rspamd_task *task, +static void rspamd_symcache_enable_symbol_checkpoint (struct rspamd_task *task, struct rspamd_symcache *cache, const gchar *symbol); -static void rspamd_symbols_cache_disable_all_symbols (struct rspamd_task *task, +static void rspamd_symcache_disable_all_symbols (struct rspamd_task *task, struct rspamd_symcache *cache); static void -rspamd_symbols_cache_order_dtor (gpointer p) +rspamd_symcache_order_dtor (gpointer p) { - struct symbols_cache_order *ord = p; + struct symcache_order *ord = p; g_ptr_array_free (ord->d, TRUE); g_free (ord); } static void -rspamd_symbols_cache_order_unref (gpointer p) +rspamd_symcache_order_unref (gpointer p) { - struct symbols_cache_order *ord = p; + struct symcache_order *ord = p; REF_RELEASE (ord); } -static struct symbols_cache_order * -rspamd_symbols_cache_order_new (struct rspamd_symcache *cache, +static struct symcache_order * +rspamd_symcache_order_new (struct rspamd_symcache *cache, gsize nelts) { - struct symbols_cache_order *ord; + struct symcache_order *ord; ord = g_malloc0 (sizeof (*ord)); ord->d = g_ptr_array_sized_new (nelts); ord->id = cache->id; - REF_INIT_RETAIN (ord, rspamd_symbols_cache_order_dtor); + REF_INIT_RETAIN (ord, rspamd_symcache_order_dtor); return ord; } -static inline struct rspamd_symcache_item* +static inline struct rspamd_symcache_dynamic_item* rspamd_symcache_get_dynamic (struct cache_savepoint *checkpoint, struct rspamd_symcache_item *item) { - return &g_array_index (checkpoint->dynamic_items, - struct rspamd_symcache_item, item->id); + return &checkpoint->dynamic_items[item->id]; +} + +static inline struct rspamd_symcache_item * +rspamd_symcache_find_filter (struct rspamd_symcache *cache, + const gchar *name) +{ + struct rspamd_symcache_item *item; + + g_assert (cache != NULL); + + if (name == NULL) { + return NULL; + } + + item = g_hash_table_lookup (cache->items_by_symbol, name); + + if (item != NULL) { + + if (item->is_virtual) { + item = g_ptr_array_index (cache->filters, + item->specific.virtual.parent); + } + + return item; + } + + return NULL; } static gint @@ -369,7 +392,7 @@ cache_logic_cmp (const void *p1, const void *p2, gpointer ud) } static void -rspamd_symbols_cache_tsort_visit (struct rspamd_symcache *cache, +rspamd_symcache_tsort_visit (struct rspamd_symcache *cache, struct rspamd_symcache_item *it, guint cur_order) { @@ -397,7 +420,7 @@ rspamd_symbols_cache_tsort_visit (struct rspamd_symcache *cache, PTR_ARRAY_FOREACH (it->deps, i, dep) { msg_debug_cache ("visiting dep: %s (%d)", dep->item->symbol, cur_order + 1); - rspamd_symbols_cache_tsort_visit (cache, dep->item, cur_order + 1); + rspamd_symcache_tsort_visit (cache, dep->item, cur_order + 1); } it->order = cur_order; @@ -406,14 +429,14 @@ rspamd_symbols_cache_tsort_visit (struct rspamd_symcache *cache, } static void -rspamd_symbols_cache_resort (struct rspamd_symcache *cache) +rspamd_symcache_resort (struct rspamd_symcache *cache) { - struct symbols_cache_order *ord; + struct symcache_order *ord; guint i; guint64 total_hits = 0; struct rspamd_symcache_item *it; - ord = rspamd_symbols_cache_order_new (cache, cache->filters->len); + ord = rspamd_symcache_order_new (cache, cache->filters->len); for (i = 0; i < cache->filters->len; i ++) { it = g_ptr_array_index (cache->filters, i); @@ -431,7 +454,7 @@ rspamd_symbols_cache_resort (struct rspamd_symcache *cache) */ PTR_ARRAY_FOREACH (ord->d, i, it) { if (it->order == 0) { - rspamd_symbols_cache_tsort_visit (cache, it, 1); + rspamd_symcache_tsort_visit (cache, it, 1); } } @@ -451,7 +474,7 @@ rspamd_symbols_cache_resort (struct rspamd_symcache *cache) /* Sort items in logical order */ static void -rspamd_symbols_cache_post_init (struct rspamd_symcache *cache) +rspamd_symcache_post_init (struct rspamd_symcache *cache) { struct rspamd_symcache_item *it, *dit; struct cache_dependency *dep, *rdep; @@ -459,20 +482,12 @@ rspamd_symbols_cache_post_init (struct rspamd_symcache *cache) struct delayed_cache_condition *dcond; GList *cur; gint i, j; - gint id; cur = cache->delayed_deps; while (cur) { ddep = cur->data; - id = rspamd_symbols_cache_find_filter (cache, ddep->from); - - if (id != -1) { - it = g_ptr_array_index (cache->filters, id); - } - else { - it = NULL; - } + it = rspamd_symcache_find_filter (cache, ddep->from); if (it == NULL) { msg_err_cache ("cannot register delayed dependency between %s and %s, " @@ -491,7 +506,7 @@ rspamd_symbols_cache_post_init (struct rspamd_symcache *cache) while (cur) { dcond = cur->data; - it = g_hash_table_lookup (cache->items_by_symbol, dcond->sym); + it = rspamd_symcache_find_filter (cache, dcond->sym); if (it == NULL) { msg_err_cache ( @@ -511,14 +526,9 @@ rspamd_symbols_cache_post_init (struct rspamd_symcache *cache) for (j = 0; j < it->deps->len; j ++) { dep = g_ptr_array_index (it->deps, j); - dit = g_hash_table_lookup (cache->items_by_symbol, dep->sym); + dit = rspamd_symcache_find_filter (cache, dep->sym); if (dit != NULL) { - if (dit->is_virtual) { - dit = g_ptr_array_index (cache->filters, - dit->specific.virtual.parent); - } - if (dit->id == i) { msg_err_cache ("cannot add dependency on self: %s -> %s " "(resolved to %s)", @@ -559,13 +569,13 @@ rspamd_symbols_cache_post_init (struct rspamd_symcache *cache) g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache); g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache); - rspamd_symbols_cache_resort (cache); + rspamd_symcache_resort (cache); } static gboolean -rspamd_symbols_cache_load_items (struct rspamd_symcache *cache, const gchar *name) +rspamd_symcache_load_items (struct rspamd_symcache *cache, const gchar *name) { - struct rspamd_symbols_cache_header *hdr; + struct rspamd_symcache_header *hdr; struct stat st; struct ucl_parser *parser; ucl_object_t *top; @@ -615,8 +625,8 @@ rspamd_symbols_cache_load_items (struct rspamd_symcache *cache, const gchar *nam hdr = map; - if (memcmp (hdr->magic, rspamd_symbols_cache_magic, - sizeof (rspamd_symbols_cache_magic)) != 0) { + if (memcmp (hdr->magic, rspamd_symcache_magic, + sizeof (rspamd_symcache_magic)) != 0) { msg_info_cache ("cannot use file %s, bad magic", name); munmap (map, st.st_size); rspamd_file_unlock (fd, FALSE); @@ -729,9 +739,9 @@ rspamd_symbols_cache_load_items (struct rspamd_symcache *cache, const gchar *nam #define ROUND_DOUBLE(x) (floor((x) * 100.0) / 100.0) static gboolean -rspamd_symbols_cache_save_items (struct rspamd_symcache *cache, const gchar *name) +rspamd_symcache_save_items (struct rspamd_symcache *cache, const gchar *name) { - struct rspamd_symbols_cache_header hdr; + struct rspamd_symcache_header hdr; ucl_object_t *top, *elt, *freq; GHashTableIter it; struct rspamd_symcache_item *item; @@ -763,8 +773,8 @@ rspamd_symbols_cache_save_items (struct rspamd_symcache *cache, const gchar *nam rspamd_file_lock (fd, FALSE); memset (&hdr, 0, sizeof (hdr)); - memcpy (hdr.magic, rspamd_symbols_cache_magic, - sizeof (rspamd_symbols_cache_magic)); + memcpy (hdr.magic, rspamd_symcache_magic, + sizeof (rspamd_symcache_magic)); if (write (fd, &hdr, sizeof (hdr)) == -1) { msg_info_cache ("cannot write to file %s, error %d, %s", path, @@ -1024,7 +1034,7 @@ rspamd_symcache_save (struct rspamd_symcache *cache) if (cache->cfg->cache_filename) { /* Try to sync values to the disk */ - if (!rspamd_symbols_cache_save_items (cache, + if (!rspamd_symcache_save_items (cache, cache->cfg->cache_filename)) { msg_err_cache ("cannot save cache data to %s", cache->cfg->cache_filename); @@ -1071,7 +1081,7 @@ rspamd_symcache_destroy (struct rspamd_symcache *cache) } g_hash_table_destroy (cache->items_by_symbol); - g_ptr_array_free (cache->items_by_order, TRUE); + g_ptr_array_free (cache->items_by_id, TRUE); rspamd_mempool_delete (cache->static_pool); g_ptr_array_free (cache->filters, TRUE); g_ptr_array_free (cache->prefilters, TRUE); @@ -1099,7 +1109,7 @@ rspamd_symcache_new (struct rspamd_config *cfg) rspamd_mempool_new (rspamd_mempool_suggest_size (), "symcache"); cache->items_by_symbol = g_hash_table_new (rspamd_str_hash, rspamd_str_equal); - cache->items_by_order = g_ptr_array_new (); + cache->items_by_id = g_ptr_array_new (); cache->filters = g_ptr_array_new (); cache->prefilters = g_ptr_array_new (); cache->postfilters = g_ptr_array_new (); @@ -1129,20 +1139,20 @@ rspamd_symcache_init (struct rspamd_symcache *cache) /* Just in-memory cache */ if (cache->cfg->cache_filename == NULL) { - rspamd_symbols_cache_post_init (cache); + rspamd_symcache_post_init (cache); return TRUE; } /* Copy saved cache entries */ - res = rspamd_symbols_cache_load_items (cache, cache->cfg->cache_filename); - rspamd_symbols_cache_post_init (cache); + res = rspamd_symcache_load_items (cache, cache->cfg->cache_filename); + rspamd_symcache_post_init (cache); return res; } static void -rspamd_symbols_cache_validate_cb (gpointer k, gpointer v, gpointer ud) +rspamd_symcache_validate_cb (gpointer k, gpointer v, gpointer ud) { struct rspamd_symcache_item *item = v, *parent; struct rspamd_config *cfg; @@ -1220,7 +1230,7 @@ rspamd_symbols_cache_validate_cb (gpointer k, gpointer v, gpointer ud) } static void -rspamd_symbols_cache_metric_validate_cb (gpointer k, gpointer v, gpointer ud) +rspamd_symcache_metric_validate_cb (gpointer k, gpointer v, gpointer ud) { struct rspamd_symcache *cache = (struct rspamd_symcache *)ud; const gchar *sym = k; @@ -1254,11 +1264,11 @@ rspamd_symcache_validate (struct rspamd_symcache *cache, /* Now adjust symbol weights according to default metric */ g_hash_table_foreach (cfg->symbols, - rspamd_symbols_cache_metric_validate_cb, + rspamd_symcache_metric_validate_cb, cache); g_hash_table_foreach (cache->items_by_symbol, - rspamd_symbols_cache_validate_cb, + rspamd_symcache_validate_cb, cache); /* Now check each metric item and find corresponding symbol in a cache */ g_hash_table_iter_init (&it, cfg->symbols); @@ -1291,7 +1301,7 @@ rspamd_symcache_validate (struct rspamd_symcache *cache, /* Return true if metric has score that is more than spam score for it */ static gboolean -rspamd_symbols_cache_metric_limit (struct rspamd_task *task, +rspamd_symcache_metric_limit (struct rspamd_task *task, struct cache_savepoint *cp) { struct rspamd_metric_result *res; @@ -1329,7 +1339,7 @@ rspamd_symbols_cache_metric_limit (struct rspamd_task *task, } static gboolean -rspamd_symbols_cache_check_symbol (struct rspamd_task *task, +rspamd_symcache_check_symbol (struct rspamd_task *task, struct rspamd_symcache *cache, struct rspamd_symcache_item *item, struct cache_savepoint *checkpoint) @@ -1338,23 +1348,25 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, struct rspamd_task **ptask; lua_State *L; gboolean check = TRUE; + struct rspamd_symcache_dynamic_item *dyn_item = + rspamd_symcache_get_dynamic (checkpoint, item); - if (item->type & SYMBOL_TYPE_CLASSIFIER) { + if (item->type & (SYMBOL_TYPE_CLASSIFIER|SYMBOL_TYPE_COMPOSITE)) { /* Classifiers are special :( */ return TRUE; } g_assert (!item->is_virtual); g_assert (item->specific.normal.func != NULL); - if (CHECK_START_BIT (checkpoint, item)) { + if (CHECK_START_BIT (checkpoint, dyn_item)) { /* * This can actually happen when deps span over different layers */ - return CHECK_FINISH_BIT (checkpoint, item); + return CHECK_FINISH_BIT (checkpoint, dyn_item); } /* Check has been started */ - SET_START_BIT (checkpoint, item); + SET_START_BIT (checkpoint, dyn_item); if (!item->enabled || (RSPAMD_TASK_IS_EMPTY (task) && !(item->type & SYMBOL_TYPE_EMPTY))) { @@ -1390,8 +1402,8 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, #else t1 = rspamd_get_ticks (FALSE); #endif - item->start_ticks = t1; - item->async_events = 0; + dyn_item->start_msec = (t1 - task->time_real) * 1e3; + dyn_item->async_events = 0; g_assert (checkpoint->cur_item == NULL); checkpoint->cur_item = item; checkpoint->items_inflight ++; @@ -1404,7 +1416,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, return TRUE; } - if (item->async_events == 0 && !CHECK_FINISH_BIT (checkpoint, item)) { + if (dyn_item->async_events == 0 && !CHECK_FINISH_BIT (checkpoint, dyn_item)) { msg_err_cache ("critical error: item %s has no async events pending, " "but it is not finalised", item->symbol); g_assert_not_reached (); @@ -1415,14 +1427,14 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, else { msg_debug_cache_task ("skipping check of %s as its start condition is false", item->symbol); - SET_FINISH_BIT (checkpoint, item); + SET_FINISH_BIT (checkpoint, dyn_item); } return TRUE; } static gboolean -rspamd_symbols_cache_check_deps (struct rspamd_task *task, +rspamd_symcache_check_deps (struct rspamd_task *task, struct rspamd_symcache *cache, struct rspamd_symcache_item *item, struct cache_savepoint *checkpoint, @@ -1433,6 +1445,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, guint i; gboolean ret = TRUE; static const guint max_recursion = 20; + struct rspamd_symcache_dynamic_item *dyn_item; if (recursion > max_recursion) { msg_err_task ("cyclic dependencies: maximum check level %ud exceed when " @@ -1452,11 +1465,13 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, continue; } - if (!CHECK_FINISH_BIT (checkpoint, dep->item)) { - if (!CHECK_START_BIT (checkpoint, dep->item)) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, dep->item); + + if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) { + if (!CHECK_START_BIT (checkpoint, dyn_item)) { /* Not started */ if (!check_only) { - if (!rspamd_symbols_cache_check_deps (task, cache, + if (!rspamd_symcache_check_deps (task, cache, dep->item, checkpoint, recursion + 1, @@ -1467,7 +1482,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, "symbol %d(%s)", dep->id, dep->sym, item->id, item->symbol); } - else if (!rspamd_symbols_cache_check_symbol (task, cache, + else if (!rspamd_symcache_check_symbol (task, cache, dep->item, checkpoint)) { /* Now started, but has events pending */ @@ -1512,17 +1527,8 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, return ret; } -#define BITS_PER_UINT64 (NBBY * sizeof (guint64)) -#define UINT64_BITMAP_SIZE(nbits) (((nbits) + BITS_PER_UINT64 - 1) / BITS_PER_UINT64) -#define ALLOC_BITMAP(bmap, nelts) do { \ - (bmap).started = rspamd_mempool_alloc0 (task->task_pool, \ - UINT64_BITMAP_SIZE (nelts) * sizeof (guint64)); \ - (bmap).finished = rspamd_mempool_alloc0 (task->task_pool, \ - UINT64_BITMAP_SIZE (nelts) * sizeof (guint64)); \ -} while (0) - static struct cache_savepoint * -rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task, +rspamd_symcache_make_checkpoint (struct rspamd_task *task, struct rspamd_symcache *cache) { struct cache_savepoint *checkpoint; @@ -1534,25 +1540,21 @@ rspamd_symbols_cache_make_checkpoint (struct rspamd_task *task, msg_info_cache ("symbols cache has been modified since last check:" " old id: %ud, new id: %ud", cache->items_by_order->id, cache->id); - rspamd_symbols_cache_resort (cache); + rspamd_symcache_resort (cache); } - checkpoint = rspamd_mempool_alloc0 (task->task_pool, sizeof (*checkpoint)); - - ALLOC_BITMAP (checkpoint->prefilters, cache->prefilters->len); - ALLOC_BITMAP (checkpoint->filters, cache->filters->len); - ALLOC_BITMAP (checkpoint->postfilters, cache->postfilters->len); - ALLOC_BITMAP (checkpoint->idempotent, cache->idempotent->len); + checkpoint = rspamd_mempool_alloc0 (task->task_pool, + sizeof (*checkpoint) + + sizeof (struct rspamd_symcache_dynamic_item) * cache->items_by_id->len); g_assert (cache->items_by_order != NULL); checkpoint->version = cache->items_by_order->d->len; checkpoint->order = cache->items_by_order; REF_RETAIN (checkpoint->order); rspamd_mempool_add_destructor (task->task_pool, - rspamd_symbols_cache_order_unref, checkpoint->order); + rspamd_symcache_order_unref, checkpoint->order); checkpoint->pass = RSPAMD_CACHE_PASS_INIT; - checkpoint->cur_bits = &checkpoint->prefilters; task->checkpoint = checkpoint; task->result = task->result; @@ -1583,12 +1585,12 @@ rspamd_symcache_process_settings (struct rspamd_task *task, if (enabled) { /* Disable all symbols but selected */ - rspamd_symbols_cache_disable_all_symbols (task, cache); + rspamd_symcache_disable_all_symbols (task, cache); already_disabled = TRUE; it = NULL; while ((cur = ucl_iterate_object (enabled, &it, true)) != NULL) { - rspamd_symbols_cache_enable_symbol_checkpoint (task, cache, + rspamd_symcache_enable_symbol_checkpoint (task, cache, ucl_object_tostring (cur)); } } @@ -1600,7 +1602,7 @@ rspamd_symcache_process_settings (struct rspamd_task *task, it = NULL; if (!already_disabled) { - rspamd_symbols_cache_disable_all_symbols (task, cache); + rspamd_symcache_disable_all_symbols (task, cache); } while ((cur = ucl_iterate_object (enabled, &it, true)) != NULL) { @@ -1612,7 +1614,7 @@ rspamd_symcache_process_settings (struct rspamd_task *task, g_hash_table_iter_init (&gr_it, gr->symbols); while (g_hash_table_iter_next (&gr_it, &k, &v)) { - rspamd_symbols_cache_enable_symbol_checkpoint (task, cache, k); + rspamd_symcache_enable_symbol_checkpoint (task, cache, k); } } } @@ -1625,7 +1627,7 @@ rspamd_symcache_process_settings (struct rspamd_task *task, it = NULL; while ((cur = ucl_iterate_object (disabled, &it, true)) != NULL) { - rspamd_symbols_cache_disable_symbol_checkpoint (task, cache, + rspamd_symcache_disable_symbol_checkpoint (task, cache, ucl_object_tostring (cur)); } } @@ -1645,7 +1647,7 @@ rspamd_symcache_process_settings (struct rspamd_task *task, g_hash_table_iter_init (&gr_it, gr->symbols); while (g_hash_table_iter_next (&gr_it, &k, &v)) { - rspamd_symbols_cache_disable_symbol_checkpoint (task, cache, k); + rspamd_symcache_disable_symbol_checkpoint (task, cache, k); } } } @@ -1660,6 +1662,7 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, struct rspamd_symcache *cache, gint stage) { struct rspamd_symcache_item *item = NULL; + struct rspamd_symcache_dynamic_item *dyn_item; struct cache_savepoint *checkpoint; gint i; gboolean all_done; @@ -1669,7 +1672,7 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, g_assert (cache != NULL); if (task->checkpoint == NULL) { - checkpoint = rspamd_symbols_cache_make_checkpoint (task, cache); + checkpoint = rspamd_symcache_make_checkpoint (task, cache); task->checkpoint = checkpoint; } else { @@ -1695,17 +1698,17 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, /* Check for prefilters */ saved_priority = G_MININT; all_done = TRUE; - checkpoint->cur_bits = &checkpoint->prefilters; for (i = 0; i < (gint)cache->prefilters->len; i ++) { item = g_ptr_array_index (cache->prefilters, i); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); if (RSPAMD_TASK_IS_SKIPPED (task)) { return TRUE; } - if (!CHECK_START_BIT (checkpoint, item) && - !CHECK_FINISH_BIT (checkpoint, item)) { + if (!CHECK_START_BIT (checkpoint, dyn_item) && + !CHECK_FINISH_BIT (checkpoint, dyn_item)) { /* Check priorities */ if (saved_priority == G_MININT) { saved_priority = item->priority; @@ -1723,7 +1726,7 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, } } - rspamd_symbols_cache_check_symbol (task, cache, item, + rspamd_symcache_check_symbol (task, cache, item, checkpoint); all_done = FALSE; } @@ -1746,7 +1749,6 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, * we just save it for another pass */ all_done = TRUE; - checkpoint->cur_bits = &checkpoint->filters; for (i = 0; i < (gint)checkpoint->version; i ++) { if (RSPAMD_TASK_IS_SKIPPED (task)) { @@ -1754,15 +1756,16 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, } item = g_ptr_array_index (checkpoint->order->d, i); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); if (item->type & SYMBOL_TYPE_CLASSIFIER) { continue; } - if (!CHECK_START_BIT (checkpoint, item)) { + if (!CHECK_START_BIT (checkpoint, dyn_item)) { all_done = FALSE; - if (!rspamd_symbols_cache_check_deps (task, cache, item, + if (!rspamd_symcache_check_deps (task, cache, item, checkpoint, 0, FALSE)) { msg_debug_cache_task ("blocked execution of %d(%s) unless deps are " @@ -1772,12 +1775,12 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, continue; } - rspamd_symbols_cache_check_symbol (task, cache, item, + rspamd_symcache_check_symbol (task, cache, item, checkpoint); } if (!(item->type & SYMBOL_TYPE_FINE)) { - if (rspamd_symbols_cache_metric_limit (task, checkpoint)) { + if (rspamd_symcache_metric_limit (task, checkpoint)) { msg_info_task ("<%s> has already scored more than %.2f, so do " "not " "plan more checks", task->message_id, @@ -1803,7 +1806,6 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, /* Check for postfilters */ saved_priority = G_MININT; all_done = TRUE; - checkpoint->cur_bits = &checkpoint->postfilters; for (i = 0; i < (gint)cache->postfilters->len; i ++) { if (RSPAMD_TASK_IS_SKIPPED (task)) { @@ -1811,9 +1813,10 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, } item = g_ptr_array_index (cache->postfilters, i); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - if (!CHECK_START_BIT (checkpoint, item) && - !CHECK_FINISH_BIT (checkpoint, item)) { + if (!CHECK_START_BIT (checkpoint, dyn_item) && + !CHECK_FINISH_BIT (checkpoint, dyn_item)) { /* Check priorities */ all_done = FALSE; @@ -1833,7 +1836,7 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, } } - rspamd_symbols_cache_check_symbol (task, cache, item, + rspamd_symcache_check_symbol (task, cache, item, checkpoint); } } @@ -1856,13 +1859,13 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, case RSPAMD_CACHE_PASS_IDEMPOTENT: /* Check for postfilters */ saved_priority = G_MININT; - checkpoint->cur_bits = &checkpoint->idempotent; for (i = 0; i < (gint)cache->idempotent->len; i ++) { item = g_ptr_array_index (cache->idempotent, i); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - if (!CHECK_START_BIT (checkpoint, item) && - !CHECK_FINISH_BIT (checkpoint, item)) { + if (!CHECK_START_BIT (checkpoint, dyn_item) && + !CHECK_FINISH_BIT (checkpoint, dyn_item)) { /* Check priorities */ if (saved_priority == G_MININT) { saved_priority = item->priority; @@ -1879,7 +1882,7 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, return TRUE; } } - rspamd_symbols_cache_check_symbol (task, cache, item, + rspamd_symcache_check_symbol (task, cache, item, checkpoint); } } @@ -1888,12 +1891,12 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, case RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT: all_done = TRUE; - checkpoint->cur_bits = &checkpoint->idempotent; for (i = 0; i < (gint)cache->idempotent->len; i ++) { item = g_ptr_array_index (cache->idempotent, i); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - if (!CHECK_FINISH_BIT (checkpoint, item)) { + if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) { all_done = FALSE; break; } @@ -1922,7 +1925,7 @@ struct counters_cbdata { #define ROUND_DOUBLE(x) (floor((x) * 100.0) / 100.0) static void -rspamd_symbols_cache_counters_cb (gpointer k, gpointer v, gpointer ud) +rspamd_symcache_counters_cb (gpointer k, gpointer v, gpointer ud) { struct counters_cbdata *cbd = ud; ucl_object_t *obj, *top; @@ -1982,13 +1985,13 @@ rspamd_symcache_counters (struct rspamd_symcache *cache) cbd.top = top; cbd.cache = cache; g_hash_table_foreach (cache->items_by_symbol, - rspamd_symbols_cache_counters_cb, &cbd); + rspamd_symcache_counters_cb, &cbd); return top; } static void -rspamd_symbols_cache_call_peak_cb (struct event_base *ev_base, +rspamd_symcache_call_peak_cb (struct event_base *ev_base, struct rspamd_symcache *cache, struct rspamd_symcache_item *item, gdouble cur_value, @@ -2015,7 +2018,7 @@ rspamd_symbols_cache_call_peak_cb (struct event_base *ev_base, } static void -rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud) +rspamd_symcache_resort_cb (gint fd, short what, gpointer ud) { struct timeval tv; gdouble tm; @@ -2032,7 +2035,7 @@ rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud) cur_ticks = rspamd_get_ticks (FALSE); msg_debug_cache ("resort symbols cache, next reload in %.2f seconds", tm); g_assert (cache != NULL); - evtimer_set (&cbdata->resort_ev, rspamd_symbols_cache_resort_cb, cbdata); + evtimer_set (&cbdata->resort_ev, rspamd_symcache_resort_cb, cbdata); event_base_set (cbdata->ev_base, &cbdata->resort_ev); double_to_tv (tm, &tv); event_add (&cbdata->resort_ev, &tv); @@ -2078,7 +2081,7 @@ rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud) item->frequency_peaks); if (cache->peak_cb != -1) { - rspamd_symbols_cache_call_peak_cb (cbdata->ev_base, + rspamd_symcache_call_peak_cb (cbdata->ev_base, cache, item, cur_value, cur_err); } @@ -2120,7 +2123,7 @@ rspamd_symcache_start_refresh (struct rspamd_symcache *cache, tm = rspamd_time_jitter (cache->reload_time, 0); msg_debug_cache ("next reload in %.2f seconds", tm); g_assert (cache != NULL); - evtimer_set (&cbdata->resort_ev, rspamd_symbols_cache_resort_cb, + evtimer_set (&cbdata->resort_ev, rspamd_symcache_resort_cb, cbdata); event_base_set (ev_base, &cbdata->resort_ev); double_to_tv (tm, &tv); @@ -2201,8 +2204,11 @@ rspamd_symcache_find_symbol (struct rspamd_symcache *cache, const gchar *name) gboolean rspamd_symcache_stat_symbol (struct rspamd_symcache *cache, - const gchar *name, gdouble *frequency, gdouble *freq_stddev, - gdouble *tm, guint *nhits) + const gchar *name, + gdouble *frequency, + gdouble *freq_stddev, + gdouble *tm, + guint *nhits) { struct rspamd_symcache_item *item; @@ -2229,37 +2235,6 @@ rspamd_symcache_stat_symbol (struct rspamd_symcache *cache, return FALSE; } -static gint -rspamd_symbols_cache_find_filter (struct rspamd_symcache *cache, - const gchar *name) -{ - struct rspamd_symcache_item *item; - - g_assert (cache != NULL); - - if (name == NULL) { - return -1; - } - - item = g_hash_table_lookup (cache->items_by_symbol, name); - - if (item != NULL) { - - if (item->is_virtual) { - item = g_ptr_array_index (cache->filters, - item->specific.virtual.parent); - } - - if (!item->is_filter) { - return -1; - } - - return item->id; - } - - return -1; -} - const gchar * rspamd_symcache_symbol_by_id (struct rspamd_symcache *cache, gint id) @@ -2285,101 +2260,58 @@ rspamd_symcache_stats_symbols_count (struct rspamd_symcache *cache) return cache->stats_symbols_count; } -#define DISABLE_BITS(what) do { \ - memset (checkpoint->what.started, 0xff, NBYTES (cache->what->len)); \ - memset (checkpoint->what.finished, 0xff, NBYTES (cache->what->len)); \ -} while(0) static void -rspamd_symbols_cache_disable_all_symbols (struct rspamd_task *task, +rspamd_symcache_disable_all_symbols (struct rspamd_task *task, struct rspamd_symcache *cache) { struct cache_savepoint *checkpoint; guint i; struct rspamd_symcache_item *item; + struct rspamd_symcache_dynamic_item *dyn_item; if (task->checkpoint == NULL) { - checkpoint = rspamd_symbols_cache_make_checkpoint (task, cache); + checkpoint = rspamd_symcache_make_checkpoint (task, cache); task->checkpoint = checkpoint; } else { checkpoint = task->checkpoint; } - /* Set all symbols as started + finished to disable their execution */ - DISABLE_BITS(prefilters); - DISABLE_BITS(filters); - DISABLE_BITS(postfilters); - DISABLE_BITS(idempotent); - /* Enable for squeezed symbols */ - PTR_ARRAY_FOREACH (cache->squeezed, i, item) { - clrbit (checkpoint->filters.started, item->id); - clrbit (checkpoint->filters.finished, item->id); - } -} - -#undef DISABLE_BITS - -static struct rspamd_symcache_item * -rspamd_symbols_cache_get_item_and_bits (struct rspamd_task *task, - struct rspamd_symcache *cache, - struct cache_savepoint *checkpoint, - const gchar *symbol, - struct cache_bits **bits) -{ - struct rspamd_symcache_item *item; - - item = g_hash_table_lookup (cache->items_by_symbol, symbol); + PTR_ARRAY_FOREACH (cache->items_by_id, i, item) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - if (item) { - if (item->is_virtual) { - item = g_ptr_array_index (cache->filters, item->specific.virtual.parent); - *bits = &checkpoint->filters; - } - else { - if (item->type & SYMBOL_TYPE_PREFILTER) { - *bits = &checkpoint->prefilters; - } - else if (item->type & SYMBOL_TYPE_POSTFILTER) { - *bits = &checkpoint->postfilters; - } - else if (item->type & SYMBOL_TYPE_IDEMPOTENT) { - *bits = &checkpoint->idempotent; - } - else { - *bits = &checkpoint->filters; - } + if (!(item->type & SYMBOL_TYPE_SQUEEZED)) { + SET_FINISH_BIT (checkpoint, dyn_item); + SET_START_BIT (checkpoint, dyn_item); } } - - return item; } static void -rspamd_symbols_cache_disable_symbol_checkpoint (struct rspamd_task *task, +rspamd_symcache_disable_symbol_checkpoint (struct rspamd_task *task, struct rspamd_symcache *cache, const gchar *symbol) { struct cache_savepoint *checkpoint; struct rspamd_symcache_item *item; - struct cache_bits *bits = NULL; + struct rspamd_symcache_dynamic_item *dyn_item; if (task->checkpoint == NULL) { - checkpoint = rspamd_symbols_cache_make_checkpoint (task, cache); + checkpoint = rspamd_symcache_make_checkpoint (task, cache); task->checkpoint = checkpoint; } else { checkpoint = task->checkpoint; } - item = rspamd_symbols_cache_get_item_and_bits (task, cache, checkpoint, - symbol, &bits); + item = rspamd_symcache_find_filter (cache, symbol); if (item) { - if (!(item->type & SYMBOL_TYPE_SQUEEZED)) { - setbit (bits->started, item->id); - setbit (bits->finished, item->id); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + SET_FINISH_BIT (checkpoint, dyn_item); + SET_START_BIT (checkpoint, dyn_item); msg_debug_cache_task ("disable execution of %s", symbol); } else { @@ -2392,33 +2324,32 @@ rspamd_symbols_cache_disable_symbol_checkpoint (struct rspamd_task *task, } static void -rspamd_symbols_cache_enable_symbol_checkpoint (struct rspamd_task *task, +rspamd_symcache_enable_symbol_checkpoint (struct rspamd_task *task, struct rspamd_symcache *cache, const gchar *symbol) { struct cache_savepoint *checkpoint; struct rspamd_symcache_item *item; - struct cache_bits *bits = NULL; + struct rspamd_symcache_dynamic_item *dyn_item; if (task->checkpoint == NULL) { - checkpoint = rspamd_symbols_cache_make_checkpoint (task, cache); + checkpoint = rspamd_symcache_make_checkpoint (task, cache); task->checkpoint = checkpoint; } else { checkpoint = task->checkpoint; } - item = rspamd_symbols_cache_get_item_and_bits (task, cache, checkpoint, - symbol, &bits); + item = rspamd_symcache_find_filter (cache, symbol); if (item) { - if (!(item->type & SYMBOL_TYPE_SQUEEZED)) { - clrbit (bits->started, item->id); - clrbit (bits->finished, item->id); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + dyn_item->finished = 0; + dyn_item->started = 0; msg_debug_cache_task ("enable execution of %s", symbol); } else { - msg_debug_cache_task ("skip enabling of squeezed symbol %s", symbol); + msg_debug_cache_task ("skip enabling squeezed symbol %s", symbol); } } else { @@ -2435,14 +2366,9 @@ rspamd_symcache_get_cbdata (struct rspamd_symcache *cache, g_assert (cache != NULL); g_assert (symbol != NULL); - item = g_hash_table_lookup (cache->items_by_symbol, symbol); + item = rspamd_symcache_find_filter (cache, symbol); if (item) { - - if (item->is_virtual) { - item = g_ptr_array_index (cache->filters, item->specific.virtual.parent); - } - return item->specific.normal.user_data; } @@ -2455,24 +2381,24 @@ rspamd_symcache_is_checked (struct rspamd_task *task, { struct cache_savepoint *checkpoint; struct rspamd_symcache_item *item; - struct cache_bits *bits = NULL; + struct rspamd_symcache_dynamic_item *dyn_item; g_assert (cache != NULL); g_assert (symbol != NULL); if (task->checkpoint == NULL) { - checkpoint = rspamd_symbols_cache_make_checkpoint (task, cache); + checkpoint = rspamd_symcache_make_checkpoint (task, cache); task->checkpoint = checkpoint; } else { checkpoint = task->checkpoint; } - item = rspamd_symbols_cache_get_item_and_bits (task, cache, checkpoint, - symbol, &bits); + item = rspamd_symcache_find_filter (cache, symbol); if (item) { - return isset (bits->started, item->id); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + return dyn_item->started; } return FALSE; @@ -2521,13 +2447,14 @@ rspamd_symcache_get_cksum (struct rspamd_symcache *cache) gboolean rspamd_symcache_is_symbol_enabled (struct rspamd_task *task, - struct rspamd_symcache *cache, const gchar *symbol) + struct rspamd_symcache *cache, + const gchar *symbol) { struct cache_savepoint *checkpoint; struct rspamd_symcache_item *item; + struct rspamd_symcache_dynamic_item *dyn_item; lua_State *L; struct rspamd_task **ptask; - struct cache_bits *bits = NULL; gboolean ret = TRUE; g_assert (cache != NULL); @@ -2537,11 +2464,11 @@ rspamd_symcache_is_symbol_enabled (struct rspamd_task *task, if (checkpoint) { - item = rspamd_symbols_cache_get_item_and_bits (task, cache, checkpoint, - symbol, &bits); + item = rspamd_symcache_find_filter (cache, symbol); if (item) { - if (CHECK_START_BIT (checkpoint, item)) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + if (CHECK_START_BIT (checkpoint, dyn_item)) { ret = FALSE; } else { @@ -2630,15 +2557,17 @@ rspamd_symcache_finalize_item (struct rspamd_task *task, { struct cache_savepoint *checkpoint = task->checkpoint; struct cache_dependency *rdep; + struct rspamd_symcache_dynamic_item *dyn_item; gdouble t2, diff; guint i; struct timeval tv; - const gdouble slow_diff_limit = 0.3; + const gdouble slow_diff_limit = 300; /* Sanity checks */ g_assert (checkpoint->items_inflight > 0); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - if (item->async_events > 0) { + if (dyn_item->async_events > 0) { /* * XXX: Race condition * @@ -2649,13 +2578,13 @@ rspamd_symcache_finalize_item (struct rspamd_task *task, */ msg_debug_cache_task ("postpone finalisation of %s(%d) as there are %d " "async events pendning", - item->symbol, item->id, item->async_events); + item->symbol, item->id, dyn_item->async_events); return; } msg_debug_cache_task ("process finalize for item %s(%d)", item->symbol, item->id); - SET_FINISH_BIT (checkpoint, item); + SET_FINISH_BIT (checkpoint, dyn_item); checkpoint->items_inflight --; checkpoint->cur_item = NULL; @@ -2667,7 +2596,7 @@ rspamd_symcache_finalize_item (struct rspamd_task *task, t2 = rspamd_get_ticks (FALSE); #endif - diff = (t2 - item->start_ticks); + diff = ((t2 - task->time_real) * 1e3 - dyn_item->start_msec); if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) { rspamd_task_profile_set (task, item->symbol, diff); @@ -2676,7 +2605,7 @@ rspamd_symcache_finalize_item (struct rspamd_task *task, if (!(item->type & SYMBOL_TYPE_SQUEEZED)) { if (diff > slow_diff_limit) { msg_info_task ("slow rule: %s(%d): %.2f ms", item->symbol, item->id, - diff * 1000); + diff); } if (rspamd_worker_is_scanner (task->worker)) { @@ -2687,11 +2616,12 @@ rspamd_symcache_finalize_item (struct rspamd_task *task, /* Process all reverse dependencies */ PTR_ARRAY_FOREACH (item->rdeps, i, rdep) { if (rdep->item) { - if (!CHECK_START_BIT (checkpoint, rdep->item)) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, rdep->item); + if (!CHECK_START_BIT (checkpoint, dyn_item)) { msg_debug_cache_task ("check item %d(%s) rdep of %s ", rdep->item->id, rdep->item->symbol, item->symbol); - if (!rspamd_symbols_cache_check_deps (task, task->cfg->cache, + if (!rspamd_symcache_check_deps (task, task->cfg->cache, rdep->item, checkpoint, 0, FALSE)) { msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s " @@ -2699,7 +2629,7 @@ rspamd_symcache_finalize_item (struct rspamd_task *task, rdep->item->id, rdep->item->symbol, item->symbol); } else { - rspamd_symbols_cache_check_symbol (task, task->cfg->cache, + rspamd_symcache_check_symbol (task, task->cfg->cache, rdep->item, checkpoint); } @@ -2714,9 +2644,14 @@ rspamd_symcache_item_async_inc_full (struct rspamd_task *task, const gchar *subsystem, const gchar *loc) { - msg_debug_cache_task ("increase async events counter for %s(%d) = %d + 1; subsystem %s (%s)", - item->symbol, item->id, item->async_events, subsystem, loc); - return ++item->async_events; + struct rspamd_symcache_dynamic_item *dyn_item; + struct cache_savepoint *checkpoint = task->checkpoint; + + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + msg_debug_cache_task ("increase async events counter for %s(%d) = %d + 1; " + "subsystem %s (%s)", + item->symbol, item->id, dyn_item->async_events, subsystem, loc); + return ++dyn_item->async_events; } guint @@ -2725,11 +2660,16 @@ rspamd_symcache_item_async_dec_full (struct rspamd_task *task, const gchar *subsystem, const gchar *loc) { - msg_debug_cache_task ("decrease async events counter for %s(%d) = %d - 1; subsystem %s (%s)", - item->symbol, item->id, item->async_events, subsystem, loc); - g_assert (item->async_events > 0); + struct rspamd_symcache_dynamic_item *dyn_item; + struct cache_savepoint *checkpoint = task->checkpoint; + + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + msg_debug_cache_task ("decrease async events counter for %s(%d) = %d - 1; " + "subsystem %s (%s)", + item->symbol, item->id, dyn_item->async_events, subsystem, loc); + g_assert (dyn_item->async_events > 0); - return --item->async_events; + return --dyn_item->async_events; } gboolean -- 2.39.5