diff options
-rw-r--r-- | src/libserver/CMakeLists.txt | 5 | ||||
-rw-r--r-- | src/libserver/rspamd_symcache.cxx | 616 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_c.cxx | 48 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_impl.cxx | 292 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_internal.hxx | 385 | ||||
-rw-r--r-- | src/libutil/CMakeLists.txt | 3 |
6 files changed, 730 insertions, 619 deletions
diff --git a/src/libserver/CMakeLists.txt b/src/libserver/CMakeLists.txt index 7371e8ade..17f5ca751 100644 --- a/src/libserver/CMakeLists.txt +++ b/src/libserver/CMakeLists.txt @@ -16,11 +16,12 @@ SET(LIBRSPAMDSERVERSRC ${CMAKE_CURRENT_SOURCE_DIR}/monitored.c ${CMAKE_CURRENT_SOURCE_DIR}/protocol.c ${CMAKE_CURRENT_SOURCE_DIR}/re_cache.c - ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.cxx + ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.cxx ${CMAKE_CURRENT_SOURCE_DIR}/roll_history.c ${CMAKE_CURRENT_SOURCE_DIR}/spf.c ${CMAKE_CURRENT_SOURCE_DIR}/ssl_util.c - ${CMAKE_CURRENT_SOURCE_DIR}/rspamd_symcache.cxx + ${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_impl.cxx + ${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_c.cxx ${CMAKE_CURRENT_SOURCE_DIR}/task.c ${CMAKE_CURRENT_SOURCE_DIR}/url.c ${CMAKE_CURRENT_SOURCE_DIR}/worker_util.c diff --git a/src/libserver/rspamd_symcache.cxx b/src/libserver/rspamd_symcache.cxx index cd44c2b84..e1e9ade4e 100644 --- a/src/libserver/rspamd_symcache.cxx +++ b/src/libserver/rspamd_symcache.cxx @@ -36,26 +36,6 @@ #include "contrib/robin-hood/robin_hood.h" -#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ - cache->static_pool->tag.tagname, cache->cfg->checksum, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ - cache->static_pool->tag.tagname, cache->cfg->checksum, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ - cache->static_pool->tag.tagname, cache->cfg->checksum, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \ - rspamd_symcache_log_id, "symcache", cache->cfg->checksum, \ - G_STRFUNC, \ - __VA_ARGS__) -#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \ - rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \ - G_STRFUNC, \ - __VA_ARGS__) INIT_LOG_MODULE(symcache) @@ -73,311 +53,7 @@ INIT_LOG_MODULE(symcache) #define CLR_FINISH_BIT(checkpoint, dyn_item) \ (dyn_item)->finished = 0 -namespace rspamd::symcache { -static const std::uint8_t rspamd_symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0}; - -struct rspamd_symcache_header { - std::uint8_t magic[8]; - unsigned int nitems; - std::uint8_t checksum[64]; - std::uint8_t unused[128]; -}; - -struct cache_item; -using cache_item_ptr = std::shared_ptr<cache_item>; -using cache_item_weak_ptr = std::weak_ptr<cache_item>; - -struct order_generation { - std::vector<cache_item_weak_ptr> d; - unsigned int generation_id; -}; - -using order_generation_ptr = std::shared_ptr<order_generation>; - -/* - * This structure is optimised to store ids list: - * - If the first element is -1 then use dynamic part, else use static part - * There is no std::variant to save space - */ -struct id_list { - union { - std::uint32_t st[4]; - struct { - std::uint32_t e; /* First element */ - std::uint16_t len; - std::uint16_t allocated; - std::uint32_t *n; - } dyn; - } data; - - id_list() { - std::memset((void *)&data, 0, sizeof(data)); - } - /** - * Returns ids from a compressed list, accepting a mutable reference for number of elements - * @param nids output of the number of elements - * @return - */ - auto get_ids(std::size_t &nids) const -> const std::uint32_t * { - if (data.dyn.e == -1) { - /* Dynamic list */ - nids = data.dyn.len; - - return data.dyn.n; - } - else { - auto cnt = 0; - - while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS(data.st)) { - cnt ++; - } - - nids = cnt; - - return data.st; - } - } - - auto add_id(std::uint32_t id, rspamd_mempool_t *pool) -> void { - if (data.st[0] == -1) { - /* Dynamic array */ - if (data.dyn.len < data.dyn.allocated) { - /* Trivial, append + sort */ - data.dyn.n[data.dyn.len++] = id; - } - else { - /* Reallocate */ - g_assert (data.dyn.allocated <= G_MAXINT16); - data.dyn.allocated *= 2; - - auto *new_array = rspamd_mempool_alloc_array_type(pool, - data.dyn.allocated, std::uint32_t); - memcpy(new_array, data.dyn.n, data.dyn.len * sizeof(std::uint32_t)); - data.dyn.n = new_array; - data.dyn.n[data.dyn.len++] = id; - } - - std::sort(data.dyn.n, data.dyn.n + data.dyn.len); - } - else { - /* Static part */ - auto cnt = 0u; - while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS (data.st)) { - cnt ++; - } - - if (cnt < G_N_ELEMENTS (data.st)) { - data.st[cnt] = id; - } - else { - /* Switch to dynamic */ - data.dyn.allocated = G_N_ELEMENTS (data.st) * 2; - auto *new_array = rspamd_mempool_alloc_array_type(pool, - data.dyn.allocated, std::uint32_t); - memcpy (new_array, data.st, sizeof(data.st)); - data.dyn.n = new_array; - data.dyn.e = -1; /* Marker */ - data.dyn.len = G_N_ELEMENTS (data.st); - - /* Recursively jump to dynamic branch that will handle insertion + sorting */ - add_id(id, pool); // tail call - } - } - } -}; - -struct item_condition { -private: - gint cb; - lua_State *L; -public: - item_condition() { - // TODO - } - virtual ~item_condition() { - // TODO - } -}; - -class normal_item { -private: - symbol_func_t func; - void *user_data; - std::vector<item_condition> conditions; -public: - explicit normal_item() { - // TODO - } - auto add_condition() -> void { - // TODO - } - auto call() -> void { - // TODO - } -}; - -class virtual_item { -private: - int parent_id; - cache_item_ptr parent; -public: - explicit virtual_item() { - // TODO - } -}; - -struct cache_item { - /* This block is likely shared */ - struct rspamd_symcache_item_stat *st; - struct rspamd_counter_data *cd; - - std::uint64_t last_count; - std::string symbol; - std::string_view type_descr; - int type; - - /* Callback data */ - std::variant<normal_item, virtual_item> specific; - - /* Condition of execution */ - bool enabled; - - /* Priority */ - int priority; - /* Topological order */ - unsigned int order; - /* Unique id - counter */ - int id; - - int frequency_peaks; - /* Settings ids */ - id_list allowed_ids; - /* Allows execution but not symbols insertion */ - id_list exec_only_ids; - id_list forbidden_ids; - - /* Dependencies */ - std::vector<cache_item_ptr> deps; - /* Reverse dependencies */ - std::vector<cache_item_ptr> rdeps; -}; - -struct delayed_cache_dependency { - std::string from; - std::string to; -}; - -struct delayed_cache_condition { - std::string sym; - int cbref; - lua_State *L; -}; - -struct symcache { - /* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */ - robin_hood::unordered_flat_map<std::string_view, cache_item_ptr> items_by_symbol; - std::vector<cache_item_weak_ptr> items_by_id; - - /* Items sorted into some order */ - order_generation_ptr items_by_order; - unsigned int cur_order_gen; - - std::vector<cache_item_weak_ptr> connfilters; - std::vector<cache_item_weak_ptr> prefilters; - std::vector<cache_item_weak_ptr> filters; - std::vector<cache_item_weak_ptr> postfilters; - std::vector<cache_item_weak_ptr> composites; - std::vector<cache_item_weak_ptr> idempotent; - std::vector<cache_item_weak_ptr> virtual_symbols; - - /* These are stored within pointer to clean up after init */ - std::unique_ptr<std::vector<delayed_cache_dependency>> delayed_deps; - std::unique_ptr<std::vector<delayed_cache_condition>> delayed_conditions; - - rspamd_mempool_t *static_pool; - std::uint64_t cksum; - double total_weight; - std::size_t used_items; - std::size_t stats_symbols_count; - std::uint64_t total_hits; - - struct rspamd_config *cfg; - lua_State *L; - double reload_time; - double last_profile; - int peak_cb; - int id; - -public: - explicit symcache(struct rspamd_config *cfg) : cfg(cfg) { - /* XXX: do we need a special pool for symcache? I don't think so */ - static_pool = cfg->cfg_pool; - reload_time = cfg->cache_reload_time; - total_hits = 1; - total_weight = 1.0; - cksum = 0xdeadbabe; - peak_cb = -1; - id = rspamd_random_uint64_fast(); - L = (lua_State *)cfg->lua_state; - } - - virtual ~symcache() { - if (peak_cb != -1) { - luaL_unref(L, LUA_REGISTRYINDEX, peak_cb); - } - } -}; - - -/* - * These items are saved within task structure and are used to track - * symbols execution - */ -struct cache_dynamic_item { - std::uint16_t start_msec; /* Relative to task time */ - unsigned started: 1; - unsigned finished: 1; - /* unsigned pad:14; */ - std::uint32_t async_events; -}; - - -struct cache_dependency { - cache_item_ptr item; /* Owning pointer to the real dep */ - std::string_view sym; /* Symbolic dep name */ - int id; /* Real from */ - int vid; /* Virtual from */ -}; - -struct cache_savepoint { - unsigned order_gen; - unsigned items_inflight; - bool profile; - bool has_slow; - - double profile_start; - double lim; - - struct rspamd_scan_result *rs; - - struct cache_item *cur_item; - order_generation_ptr order; - /* Dynamically expanded as needed */ - struct cache_dynamic_item dynamic_items[]; -}; - -struct cache_refresh_cbdata { - double last_resort; - ev_timer resort_ev; - struct symcache *cache; - struct rspamd_worker *w; - struct ev_loop *event_loop; -}; - -} // namespace rspamd - -#define C_API_SYMCACHE(ptr) (reinterpret_cast<rspamd::symcache::symcache *>(ptr)) /* At least once per minute */ #define PROFILE_MAX_TIME (60.0) @@ -831,260 +507,6 @@ rspamd_symcache_process_dep (struct rspamd_symcache *cache, } } -/* Sort items in logical order */ -static void -rspamd_symcache_post_init (struct rspamd_symcache *cache) -{ - struct rspamd_symcache_item *it, *vit; - struct cache_dependency *dep; - struct delayed_cache_dependency *ddep; - struct delayed_cache_condition *dcond; - GList *cur; - gint i, j; - - cur = cache->delayed_deps; - while (cur) { - ddep = cur->data; - - vit = rspamd_symcache_find_filter (cache, ddep->from, false); - it = rspamd_symcache_find_filter (cache, ddep->from, true); - - if (it == NULL || vit == NULL) { - msg_err_cache ("cannot register delayed dependency between %s and %s: " - "%s is missing", ddep->from, ddep->to, ddep->from); - } - else { - msg_debug_cache ("delayed between %s(%d:%d) -> %s", ddep->from, - it->id, vit->id, ddep->to); - rspamd_symcache_add_dependency (cache, it->id, ddep->to, vit != it ? - vit->id : -1); - } - - cur = g_list_next (cur); - } - - cur = cache->delayed_conditions; - while (cur) { - dcond = cur->data; - - it = rspamd_symcache_find_filter (cache, dcond->sym, true); - - if (it == NULL) { - msg_err_cache ( - "cannot register delayed condition for %s", - dcond->sym); - luaL_unref (dcond->L, LUA_REGISTRYINDEX, dcond->cbref); - } - else { - struct rspamd_symcache_condition *ncond = rspamd_mempool_alloc0 (cache->static_pool, - sizeof (*ncond)); - ncond->cb = dcond->cbref; - DL_APPEND (it->specific.normal.conditions, ncond); - } - - cur = g_list_next (cur); - } - - PTR_ARRAY_FOREACH (cache->items_by_id, i, it) { - - PTR_ARRAY_FOREACH (it->deps, j, dep) { - rspamd_symcache_process_dep (cache, it, dep); - } - - if (it->deps) { - /* Reversed loop to make removal safe */ - for (j = it->deps->len - 1; j >= 0; j--) { - dep = g_ptr_array_index (it->deps, j); - - if (dep->item == NULL) { - /* Remove useless dep */ - g_ptr_array_remove_index (it->deps, j); - } - } - } - } - - /* Special case for virtual symbols */ - PTR_ARRAY_FOREACH (cache->virtual, i, it) { - - PTR_ARRAY_FOREACH (it->deps, j, dep) { - rspamd_symcache_process_dep (cache, it, dep); - } - } - - g_ptr_array_sort_with_data (cache->connfilters, prefilters_cmp, cache); - g_ptr_array_sort_with_data (cache->prefilters, prefilters_cmp, cache); - g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache); - g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache); - - rspamd_symcache_resort (cache); -} - -static gboolean -rspamd_symcache_load_items (struct rspamd_symcache *cache, const gchar *name) -{ - struct rspamd_symcache_header *hdr; - struct stat st; - struct ucl_parser *parser; - ucl_object_t *top; - const ucl_object_t *cur, *elt; - ucl_object_iter_t it; - struct rspamd_symcache_item *item, *parent; - const guchar *p; - gint fd; - gpointer map; - - fd = open (name, O_RDONLY); - - if (fd == -1) { - msg_info_cache ("cannot open file %s, error %d, %s", name, - errno, strerror (errno)); - return FALSE; - } - - rspamd_file_lock (fd, FALSE); - - if (fstat (fd, &st) == -1) { - rspamd_file_unlock (fd, FALSE); - close (fd); - msg_info_cache ("cannot stat file %s, error %d, %s", name, - errno, strerror (errno)); - return FALSE; - } - - if (st.st_size < (gint)sizeof (*hdr)) { - rspamd_file_unlock (fd, FALSE); - close (fd); - errno = EINVAL; - msg_info_cache ("cannot use file %s, error %d, %s", name, - errno, strerror (errno)); - return FALSE; - } - - map = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, fd, 0); - - if (map == MAP_FAILED) { - rspamd_file_unlock (fd, FALSE); - close (fd); - msg_info_cache ("cannot mmap file %s, error %d, %s", name, - errno, strerror (errno)); - return FALSE; - } - - hdr = map; - - if (memcmp (hdr->magic, rspamd_symcache_magic, - sizeof (rspamd_symcache_magic)) != 0) { - msg_info_cache ("cannot use file %s, bad magic", name); - munmap (map, st.st_size); - rspamd_file_unlock (fd, FALSE); - close (fd); - - return FALSE; - } - - parser = ucl_parser_new (0); - p = (const guchar *)(hdr + 1); - - if (!ucl_parser_add_chunk (parser, p, st.st_size - sizeof (*hdr))) { - msg_info_cache ("cannot use file %s, cannot parse: %s", name, - ucl_parser_get_error (parser)); - munmap (map, st.st_size); - ucl_parser_free (parser); - rspamd_file_unlock (fd, FALSE); - close (fd); - - return FALSE; - } - - top = ucl_parser_get_object (parser); - munmap (map, st.st_size); - rspamd_file_unlock (fd, FALSE); - close (fd); - ucl_parser_free (parser); - - if (top == NULL || ucl_object_type (top) != UCL_OBJECT) { - msg_info_cache ("cannot use file %s, bad object", name); - ucl_object_unref (top); - return FALSE; - } - - it = ucl_object_iterate_new (top); - - while ((cur = ucl_object_iterate_safe (it, true))) { - item = g_hash_table_lookup (cache->items_by_symbol, ucl_object_key (cur)); - - if (item) { - /* Copy saved info */ - /* - * XXX: don't save or load weight, it should be obtained from the - * metric - */ -#if 0 - elt = ucl_object_lookup (cur, "weight"); - - if (elt) { - w = ucl_object_todouble (elt); - if (w != 0) { - item->weight = w; - } - } -#endif - elt = ucl_object_lookup (cur, "time"); - if (elt) { - item->st->avg_time = ucl_object_todouble (elt); - } - - elt = ucl_object_lookup (cur, "count"); - if (elt) { - item->st->total_hits = ucl_object_toint (elt); - item->last_count = item->st->total_hits; - } - - elt = ucl_object_lookup (cur, "frequency"); - if (elt && ucl_object_type (elt) == UCL_OBJECT) { - const ucl_object_t *freq_elt; - - freq_elt = ucl_object_lookup (elt, "avg"); - - if (freq_elt) { - item->st->avg_frequency = ucl_object_todouble (freq_elt); - } - freq_elt = ucl_object_lookup (elt, "stddev"); - - if (freq_elt) { - item->st->stddev_frequency = ucl_object_todouble (freq_elt); - } - } - - if (item->is_virtual && !(item->type & SYMBOL_TYPE_GHOST)) { - g_assert (item->specific.virtual.parent < (gint)cache->items_by_id->len); - parent = g_ptr_array_index (cache->items_by_id, - item->specific.virtual.parent); - item->specific.virtual.parent_item = parent; - - if (parent->st->weight < item->st->weight) { - parent->st->weight = item->st->weight; - } - - /* - * We maintain avg_time for virtual symbols equal to the - * parent item avg_time - */ - item->st->avg_time = parent->st->avg_time; - } - - cache->total_weight += fabs (item->st->weight); - cache->total_hits += item->st->total_hits; - } - } - - ucl_object_iterate_free (it); - ucl_object_unref (top); - - return TRUE; -} - #define ROUND_DOUBLE(x) (floor((x) * 100.0) / 100.0) static gboolean @@ -1438,21 +860,6 @@ rspamd_symcache_save (struct rspamd_symcache *cache) } } -void -rspamd_symcache_destroy (struct rspamd_symcache *cache) -{ - auto *real_cache = C_API_SYMCACHE(cache); - - delete real_cache; -} - -struct rspamd_symcache* -rspamd_symcache_new (struct rspamd_config *cfg) -{ - auto *ncache = new rspamd::symcache::symcache(cfg); - - return (struct rspamd_symcache*)ncache; -} static void rspamd_symcache_metric_connect_cb (gpointer k, gpointer v, gpointer ud) @@ -1472,30 +879,7 @@ rspamd_symcache_metric_connect_cb (gpointer k, gpointer v, gpointer ud) } } -gboolean -rspamd_symcache_init (struct rspamd_symcache *cache) -{ - gboolean res = TRUE; - - g_assert (cache != NULL); - - cache->reload_time = cache->cfg->cache_reload_time; - if (cache->cfg->cache_filename != NULL) { - res = rspamd_symcache_load_items (cache, cache->cfg->cache_filename); - } - - rspamd_symcache_post_init (cache); - - /* Connect metric symbols with symcache symbols */ - if (cache->cfg->symbols) { - g_hash_table_foreach (cache->cfg->symbols, - rspamd_symcache_metric_connect_cb, - cache); - } - - return res; -} static void diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx new file mode 100644 index 000000000..7255f9d10 --- /dev/null +++ b/src/libserver/symcache/symcache_c.cxx @@ -0,0 +1,48 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "symcache_internal.hxx" + +/** + * C API for symcache + */ + +#define C_API_SYMCACHE(ptr) (reinterpret_cast<rspamd::symcache::symcache *>(ptr)) +#define C_API_SYMCACHE_ITEM(ptr) (reinterpret_cast<rspamd::symcache::cache_item *>(ptr)) + +void +rspamd_symcache_destroy (struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + delete real_cache; +} + +struct rspamd_symcache* +rspamd_symcache_new (struct rspamd_config *cfg) +{ + auto *ncache = new rspamd::symcache::symcache(cfg); + + return (struct rspamd_symcache*)ncache; +} + +gboolean +rspamd_symcache_init (struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + return real_cache->init(); +} diff --git a/src/libserver/symcache/symcache_impl.cxx b/src/libserver/symcache/symcache_impl.cxx new file mode 100644 index 000000000..aaf8b0cdd --- /dev/null +++ b/src/libserver/symcache/symcache_impl.cxx @@ -0,0 +1,292 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "symcache_internal.hxx" +#include "unix-std.h" +#include "libutil/cxx/locked_file.hxx" + +namespace rspamd::symcache { + +INIT_LOG_MODULE_PUBLIC(symcache) + +auto symcache::init() -> bool +{ + auto res = true; + reload_time = cfg->cache_reload_time; + + if (cfg->cache_filename != NULL) { + res = load_items(); + } + + struct rspamd_symcache_item *it, *vit; + struct cache_dependency *dep; + struct delayed_cache_dependency *ddep; + struct delayed_cache_condition *dcond; + GList *cur; + gint i, j; + + cur = cache->delayed_deps; + while (cur) { + ddep = cur->data; + + vit = rspamd_symcache_find_filter(cache, ddep->from, false); + it = rspamd_symcache_find_filter(cache, ddep->from, true); + + if (it == NULL || vit == NULL) { + msg_err_cache ("cannot register delayed dependency between %s and %s: " + "%s is missing", ddep->from, ddep->to, ddep->from); + } + else { + msg_debug_cache ("delayed between %s(%d:%d) -> %s", ddep->from, + it->id, vit->id, ddep->to); + rspamd_symcache_add_dependency(cache, it->id, ddep->to, vit != it ? + vit->id : -1); + } + + cur = g_list_next (cur); + } + + cur = cache->delayed_conditions; + while (cur) { + dcond = cur->data; + + it = rspamd_symcache_find_filter(cache, dcond->sym, true); + + if (it == NULL) { + msg_err_cache ( + "cannot register delayed condition for %s", + dcond->sym); + luaL_unref(dcond->L, LUA_REGISTRYINDEX, dcond->cbref); + } + else { + struct rspamd_symcache_condition *ncond = rspamd_mempool_alloc0 (cache->static_pool, + sizeof(*ncond)); + ncond->cb = dcond->cbref; + DL_APPEND(it->specific.normal.conditions, ncond); + } + + cur = g_list_next (cur); + } + + PTR_ARRAY_FOREACH (cache->items_by_id, i, it) { + + PTR_ARRAY_FOREACH (it->deps, j, dep) { + rspamd_symcache_process_dep(cache, it, dep); + } + + if (it->deps) { + /* Reversed loop to make removal safe */ + for (j = it->deps->len - 1; j >= 0; j--) { + dep = g_ptr_array_index (it->deps, j); + + if (dep->item == NULL) { + /* Remove useless dep */ + g_ptr_array_remove_index(it->deps, j); + } + } + } + } + + /* Special case for virtual symbols */ + PTR_ARRAY_FOREACH (cache->virtual, i, it) { + + PTR_ARRAY_FOREACH (it->deps, j, dep) { + rspamd_symcache_process_dep(cache, it, dep); + } + } + + g_ptr_array_sort_with_data(cache->connfilters, prefilters_cmp, cache); + g_ptr_array_sort_with_data(cache->prefilters, prefilters_cmp, cache); + g_ptr_array_sort_with_data(cache->postfilters, postfilters_cmp, cache); + g_ptr_array_sort_with_data(cache->idempotent, postfilters_cmp, cache); + + rspamd_symcache_resort(cache); + + /* Connect metric symbols with symcache symbols */ + if (cache->cfg->symbols) { + g_hash_table_foreach(cache->cfg->symbols, + rspamd_symcache_metric_connect_cb, + cache); + } + + return res; +} + +auto symcache::load_items() -> bool +{ + auto cached_map = util::raii_mmaped_locked_file::mmap_shared(cfg->cache_filename, + O_RDONLY, PROT_READ); + + if (!cached_map.has_value()) { + msg_info_cache("%s", cached_map.error().c_str()); + return false; + } + + + if (cached_map->get_size() < (gint) sizeof(symcache_header)) { + msg_info_cache("cannot use file %s, truncated: %z", cfg->cache_filename, , + errno, strerror(errno)); + return false; + } + + const auto *hdr = (struct symcache_header *)cached_map->get_map(); + + if (memcmp(hdr->magic, symcache_magic, + sizeof(symcache_magic)) != 0) { + msg_info_cache("cannot use file %s, bad magic", cfg->cache_filename); + + return false; + } + + auto *parser = ucl_parser_new(0); + const auto *p = (const std::uint8_t *)(hdr + 1); + + if (!ucl_parser_add_chunk(parser, p, cached_map->get_size() - sizeof(*hdr))) { + msg_info_cache ("cannot use file %s, cannot parse: %s", cfg->cache_filename, + ucl_parser_get_error(parser)); + ucl_parser_free(parser); + + return false; + } + + auto *top = ucl_parser_get_object(parser); + ucl_parser_free(parser); + + if (top == nullptr || ucl_object_type(top) != UCL_OBJECT) { + msg_info_cache ("cannot use file %s, bad object", cfg->cache_filename); + ucl_object_unref(top); + + return false; + } + + auto it = ucl_object_iterate_new(top); + const ucl_object_t *cur; + while ((cur = ucl_object_iterate_safe(it, true)) != nullptr) { + auto item_it = items_by_symbol.find(ucl_object_key(cur)); + + if (item_it != items_by_symbol.end()) { + auto item = item_it->second; + /* Copy saved info */ + /* + * XXX: don't save or load weight, it should be obtained from the + * metric + */ +#if 0 + elt = ucl_object_lookup (cur, "weight"); + + if (elt) { + w = ucl_object_todouble (elt); + if (w != 0) { + item->weight = w; + } + } +#endif + const auto *elt = ucl_object_lookup(cur, "time"); + if (elt) { + item->st->avg_time = ucl_object_todouble(elt); + } + + elt = ucl_object_lookup(cur, "count"); + if (elt) { + item->st->total_hits = ucl_object_toint(elt); + item->last_count = item->st->total_hits; + } + + elt = ucl_object_lookup(cur, "frequency"); + if (elt && ucl_object_type(elt) == UCL_OBJECT) { + const ucl_object_t *freq_elt; + + freq_elt = ucl_object_lookup(elt, "avg"); + + if (freq_elt) { + item->st->avg_frequency = ucl_object_todouble(freq_elt); + } + freq_elt = ucl_object_lookup(elt, "stddev"); + + if (freq_elt) { + item->st->stddev_frequency = ucl_object_todouble(freq_elt); + } + } + + if (item->is_virtual() && !(item->type & SYMBOL_TYPE_GHOST)) { + g_assert (item->specific.virtual.parent < (gint)cache->items_by_id->len); + parent = g_ptr_array_index (cache->items_by_id, + item->specific.virtual.parent); + item->specific.virtual.parent_item = parent; + + if (parent->st->weight < item->st->weight) { + parent->st->weight = item->st->weight; + } + + /* + * We maintain avg_time for virtual symbols equal to the + * parent item avg_time + */ + item->st->avg_time = parent->st->avg_time; + } + + cache->total_weight += fabs(item->st->weight); + cache->total_hits += item->st->total_hits; + } + } + + ucl_object_iterate_free(it); + ucl_object_unref(top); + + return true; +} + +auto symcache::get_item_by_id(int id, bool resolve_parent) const -> const cache_item_ptr & +{ + if (id < 0 || id >= items_by_id.size()) { + g_abort(); + } + + auto &ret = items_by_id[id]; + + if (!ret) { + g_abort(); + } + + if (resolve_parent && ret->is_virtual()) { + return ret->get_parent(*this); + } + + return ret; +} + + +auto cache_item::get_parent(const symcache &cache) const -> const cache_item_ptr & +{ + if (is_virtual()) { + const auto &virtual_sp = std::get<virtual_item>(specific); + + return virtual_sp.get_parent() + } + + return cache_item_ptr{nullptr}; +} + +auto virtual_item::get_parent(const symcache &cache) const -> const cache_item_ptr & +{ + if (parent) { + return parent; + } + + return cache.get_item_by_id(parent_id, false); +} + +}
\ No newline at end of file diff --git a/src/libserver/symcache/symcache_internal.hxx b/src/libserver/symcache/symcache_internal.hxx new file mode 100644 index 000000000..a1207fc97 --- /dev/null +++ b/src/libserver/symcache/symcache_internal.hxx @@ -0,0 +1,385 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal C++ structures and classes for symcache + */ + +#ifndef RSPAMD_SYMCACHE_INTERNAL_HXX +#define RSPAMD_SYMCACHE_INTERNAL_HXX +#pragma once + +#include <cmath> +#include <cstdlib> +#include <cstdint> +#include <vector> +#include <string> +#include <string_view> +#include <memory> +#include <variant> +#include "contrib/robin-hood/robin_hood.h" + +#include "cfg_file.h" +#include "lua/lua_common.h" + +#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ + cache->static_pool->tag.tagname, cache->cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ + static_pool->tag.tagname, cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ + static_pool->tag.tagname, cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_symcache_log_id, "symcache", cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) + +namespace rspamd::symcache { + +/* Defined in symcache_impl.cxx */ +extern int rspamd_symcache_log_id; + +static const std::uint8_t symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0}; + +struct symcache_header { + std::uint8_t magic[8]; + unsigned int nitems; + std::uint8_t checksum[64]; + std::uint8_t unused[128]; +}; + +struct cache_item; +using cache_item_ptr = std::shared_ptr<cache_item>; +using cache_item_weak_ptr = std::weak_ptr<cache_item>; + +struct order_generation { + std::vector<cache_item_weak_ptr> d; + unsigned int generation_id; +}; + +using order_generation_ptr = std::shared_ptr<order_generation>; + +/* + * This structure is optimised to store ids list: + * - If the first element is -1 then use dynamic part, else use static part + * There is no std::variant to save space + */ +struct id_list { + union { + std::uint32_t st[4]; + struct { + std::uint32_t e; /* First element */ + std::uint16_t len; + std::uint16_t allocated; + std::uint32_t *n; + } dyn; + } data; + + id_list() { + std::memset((void *)&data, 0, sizeof(data)); + } + /** + * Returns ids from a compressed list, accepting a mutable reference for number of elements + * @param nids output of the number of elements + * @return + */ + auto get_ids(std::size_t &nids) const -> const std::uint32_t * { + if (data.dyn.e == -1) { + /* Dynamic list */ + nids = data.dyn.len; + + return data.dyn.n; + } + else { + auto cnt = 0; + + while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS(data.st)) { + cnt ++; + } + + nids = cnt; + + return data.st; + } + } + + auto add_id(std::uint32_t id, rspamd_mempool_t *pool) -> void { + if (data.st[0] == -1) { + /* Dynamic array */ + if (data.dyn.len < data.dyn.allocated) { + /* Trivial, append + sort */ + data.dyn.n[data.dyn.len++] = id; + } + else { + /* Reallocate */ + g_assert (data.dyn.allocated <= G_MAXINT16); + data.dyn.allocated *= 2; + + auto *new_array = rspamd_mempool_alloc_array_type(pool, + data.dyn.allocated, std::uint32_t); + memcpy(new_array, data.dyn.n, data.dyn.len * sizeof(std::uint32_t)); + data.dyn.n = new_array; + data.dyn.n[data.dyn.len++] = id; + } + + std::sort(data.dyn.n, data.dyn.n + data.dyn.len); + } + else { + /* Static part */ + auto cnt = 0u; + while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS (data.st)) { + cnt ++; + } + + if (cnt < G_N_ELEMENTS (data.st)) { + data.st[cnt] = id; + } + else { + /* Switch to dynamic */ + data.dyn.allocated = G_N_ELEMENTS (data.st) * 2; + auto *new_array = rspamd_mempool_alloc_array_type(pool, + data.dyn.allocated, std::uint32_t); + memcpy (new_array, data.st, sizeof(data.st)); + data.dyn.n = new_array; + data.dyn.e = -1; /* Marker */ + data.dyn.len = G_N_ELEMENTS (data.st); + + /* Recursively jump to dynamic branch that will handle insertion + sorting */ + add_id(id, pool); // tail call + } + } + } +}; + +class symcache; + +struct item_condition { +private: + gint cb; + lua_State *L; +public: + item_condition() { + // TODO + } + virtual ~item_condition() { + // TODO + } +}; + +class normal_item { +private: + symbol_func_t func; + void *user_data; + std::vector<item_condition> conditions; +public: + explicit normal_item() { + // TODO + } + auto add_condition() -> void { + // TODO + } + auto call() -> void { + // TODO + } +}; + +class virtual_item { +private: + int parent_id; + cache_item_ptr parent; +public: + explicit virtual_item() { + // TODO + } + + auto get_parent(const symcache &cache) const -> const cache_item_ptr&; +}; + +struct cache_item { + /* This block is likely shared */ + struct rspamd_symcache_item_stat *st; + struct rspamd_counter_data *cd; + + std::uint64_t last_count; + std::string symbol; + std::string_view type_descr; + int type; + + /* Callback data */ + std::variant<normal_item, virtual_item> specific; + + /* Condition of execution */ + bool enabled; + + /* Priority */ + int priority; + /* Topological order */ + unsigned int order; + /* Unique id - counter */ + int id; + + int frequency_peaks; + /* Settings ids */ + id_list allowed_ids; + /* Allows execution but not symbols insertion */ + id_list exec_only_ids; + id_list forbidden_ids; + + /* Dependencies */ + std::vector<cache_item_ptr> deps; + /* Reverse dependencies */ + std::vector<cache_item_ptr> rdeps; + + auto is_virtual() const -> bool { return std::holds_alternative<virtual_item>(specific); } + auto get_parent(const symcache &cache) const -> const cache_item_ptr &; +}; + +struct delayed_cache_dependency { + std::string from; + std::string to; +}; + +struct delayed_cache_condition { + std::string sym; + int cbref; + lua_State *L; +}; + +class symcache { +private: + /* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */ + robin_hood::unordered_flat_map<std::string_view, cache_item_ptr> items_by_symbol; + std::vector<cache_item_ptr> items_by_id; + + /* Items sorted into some order */ + order_generation_ptr items_by_order; + unsigned int cur_order_gen; + + std::vector<cache_item_ptr> connfilters; + std::vector<cache_item_ptr> prefilters; + std::vector<cache_item_ptr> filters; + std::vector<cache_item_ptr> postfilters; + std::vector<cache_item_ptr> composites; + std::vector<cache_item_ptr> idempotent; + std::vector<cache_item_ptr> virtual_symbols; + + /* These are stored within pointer to clean up after init */ + std::unique_ptr<std::vector<delayed_cache_dependency>> delayed_deps; + std::unique_ptr<std::vector<delayed_cache_condition>> delayed_conditions; + + rspamd_mempool_t *static_pool; + std::uint64_t cksum; + double total_weight; + std::size_t used_items; + std::size_t stats_symbols_count; + std::uint64_t total_hits; + + struct rspamd_config *cfg; + lua_State *L; + double reload_time; + double last_profile; + int peak_cb; + int cache_id; + +private: + /* Internal methods */ + auto load_items() -> bool; + +public: + explicit symcache(struct rspamd_config *cfg) : cfg(cfg) { + /* XXX: do we need a special pool for symcache? I don't think so */ + static_pool = cfg->cfg_pool; + reload_time = cfg->cache_reload_time; + total_hits = 1; + total_weight = 1.0; + cksum = 0xdeadbabe; + peak_cb = -1; + cache_id = rspamd_random_uint64_fast(); + L = (lua_State *)cfg->lua_state; + } + + virtual ~symcache() { + if (peak_cb != -1) { + luaL_unref(L, LUA_REGISTRYINDEX, peak_cb); + } + } + + auto get_item_by_id(int id, bool resolve_parent) const -> const cache_item_ptr &; + + /* + * Initialises the symbols cache, must be called after all symbols are added + * and the config file is loaded + */ + auto init() -> bool; +}; + +/* + * These items are saved within task structure and are used to track + * symbols execution + */ +struct cache_dynamic_item { + std::uint16_t start_msec; /* Relative to task time */ + unsigned started: 1; + unsigned finished: 1; + /* unsigned pad:14; */ + std::uint32_t async_events; +}; + + +struct cache_dependency { + cache_item_ptr item; /* Owning pointer to the real dep */ + std::string_view sym; /* Symbolic dep name */ + int id; /* Real from */ + int vid; /* Virtual from */ +}; + +struct cache_savepoint { + unsigned order_gen; + unsigned items_inflight; + bool profile; + bool has_slow; + + double profile_start; + double lim; + + struct rspamd_scan_result *rs; + + struct cache_item *cur_item; + order_generation_ptr order; + /* Dynamically expanded as needed */ + struct cache_dynamic_item dynamic_items[]; +}; + +struct cache_refresh_cbdata { + double last_resort; + ev_timer resort_ev; + symcache *cache; + struct rspamd_worker *w; + struct ev_loop *event_loop; +}; + +} // namespace rspamd + +#endif //RSPAMD_SYMCACHE_INTERNAL_HXX diff --git a/src/libutil/CMakeLists.txt b/src/libutil/CMakeLists.txt index 5160dfe7b..7b3103720 100644 --- a/src/libutil/CMakeLists.txt +++ b/src/libutil/CMakeLists.txt @@ -17,6 +17,7 @@ SET(LIBRSPAMDUTILSRC ${CMAKE_CURRENT_SOURCE_DIR}/util.c ${CMAKE_CURRENT_SOURCE_DIR}/heap.c ${CMAKE_CURRENT_SOURCE_DIR}/multipattern.c - ${CMAKE_CURRENT_SOURCE_DIR}/cxx/utf8_util.cxx) + ${CMAKE_CURRENT_SOURCE_DIR}/cxx/utf8_util.cxx + ${CMAKE_CURRENT_SOURCE_DIR}/cxx/locked_file.cxx) # Rspamdutil SET(RSPAMD_UTIL ${LIBRSPAMDUTILSRC} PARENT_SCOPE)
\ No newline at end of file |