diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2022-04-02 16:45:41 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rspamd.com> | 2022-04-02 16:45:41 +0100 |
commit | fd9693073dc162c43710bd977830a3b400f2b43b (patch) | |
tree | 5e5831dfdc3d3c95a52f8cc75478d25149d412fd /src/libserver/symcache | |
parent | 4b6706a6955b111fb9366fdea2f87c81a9d2edd1 (diff) | |
download | rspamd-fd9693073dc162c43710bd977830a3b400f2b43b.tar.gz rspamd-fd9693073dc162c43710bd977830a3b400f2b43b.zip |
[Rework] Rework files structure
Diffstat (limited to 'src/libserver/symcache')
-rw-r--r-- | src/libserver/symcache/symcache_c.cxx | 48 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_impl.cxx | 292 | ||||
-rw-r--r-- | src/libserver/symcache/symcache_internal.hxx | 385 |
3 files changed, 725 insertions, 0 deletions
diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx new file mode 100644 index 000000000..7255f9d10 --- /dev/null +++ b/src/libserver/symcache/symcache_c.cxx @@ -0,0 +1,48 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "symcache_internal.hxx" + +/** + * C API for symcache + */ + +#define C_API_SYMCACHE(ptr) (reinterpret_cast<rspamd::symcache::symcache *>(ptr)) +#define C_API_SYMCACHE_ITEM(ptr) (reinterpret_cast<rspamd::symcache::cache_item *>(ptr)) + +void +rspamd_symcache_destroy (struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + delete real_cache; +} + +struct rspamd_symcache* +rspamd_symcache_new (struct rspamd_config *cfg) +{ + auto *ncache = new rspamd::symcache::symcache(cfg); + + return (struct rspamd_symcache*)ncache; +} + +gboolean +rspamd_symcache_init (struct rspamd_symcache *cache) +{ + auto *real_cache = C_API_SYMCACHE(cache); + + return real_cache->init(); +} diff --git a/src/libserver/symcache/symcache_impl.cxx b/src/libserver/symcache/symcache_impl.cxx new file mode 100644 index 000000000..aaf8b0cdd --- /dev/null +++ b/src/libserver/symcache/symcache_impl.cxx @@ -0,0 +1,292 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "symcache_internal.hxx" +#include "unix-std.h" +#include "libutil/cxx/locked_file.hxx" + +namespace rspamd::symcache { + +INIT_LOG_MODULE_PUBLIC(symcache) + +auto symcache::init() -> bool +{ + auto res = true; + reload_time = cfg->cache_reload_time; + + if (cfg->cache_filename != NULL) { + res = load_items(); + } + + struct rspamd_symcache_item *it, *vit; + struct cache_dependency *dep; + struct delayed_cache_dependency *ddep; + struct delayed_cache_condition *dcond; + GList *cur; + gint i, j; + + cur = cache->delayed_deps; + while (cur) { + ddep = cur->data; + + vit = rspamd_symcache_find_filter(cache, ddep->from, false); + it = rspamd_symcache_find_filter(cache, ddep->from, true); + + if (it == NULL || vit == NULL) { + msg_err_cache ("cannot register delayed dependency between %s and %s: " + "%s is missing", ddep->from, ddep->to, ddep->from); + } + else { + msg_debug_cache ("delayed between %s(%d:%d) -> %s", ddep->from, + it->id, vit->id, ddep->to); + rspamd_symcache_add_dependency(cache, it->id, ddep->to, vit != it ? + vit->id : -1); + } + + cur = g_list_next (cur); + } + + cur = cache->delayed_conditions; + while (cur) { + dcond = cur->data; + + it = rspamd_symcache_find_filter(cache, dcond->sym, true); + + if (it == NULL) { + msg_err_cache ( + "cannot register delayed condition for %s", + dcond->sym); + luaL_unref(dcond->L, LUA_REGISTRYINDEX, dcond->cbref); + } + else { + struct rspamd_symcache_condition *ncond = rspamd_mempool_alloc0 (cache->static_pool, + sizeof(*ncond)); + ncond->cb = dcond->cbref; + DL_APPEND(it->specific.normal.conditions, ncond); + } + + cur = g_list_next (cur); + } + + PTR_ARRAY_FOREACH (cache->items_by_id, i, it) { + + PTR_ARRAY_FOREACH (it->deps, j, dep) { + rspamd_symcache_process_dep(cache, it, dep); + } + + if (it->deps) { + /* Reversed loop to make removal safe */ + for (j = it->deps->len - 1; j >= 0; j--) { + dep = g_ptr_array_index (it->deps, j); + + if (dep->item == NULL) { + /* Remove useless dep */ + g_ptr_array_remove_index(it->deps, j); + } + } + } + } + + /* Special case for virtual symbols */ + PTR_ARRAY_FOREACH (cache->virtual, i, it) { + + PTR_ARRAY_FOREACH (it->deps, j, dep) { + rspamd_symcache_process_dep(cache, it, dep); + } + } + + g_ptr_array_sort_with_data(cache->connfilters, prefilters_cmp, cache); + g_ptr_array_sort_with_data(cache->prefilters, prefilters_cmp, cache); + g_ptr_array_sort_with_data(cache->postfilters, postfilters_cmp, cache); + g_ptr_array_sort_with_data(cache->idempotent, postfilters_cmp, cache); + + rspamd_symcache_resort(cache); + + /* Connect metric symbols with symcache symbols */ + if (cache->cfg->symbols) { + g_hash_table_foreach(cache->cfg->symbols, + rspamd_symcache_metric_connect_cb, + cache); + } + + return res; +} + +auto symcache::load_items() -> bool +{ + auto cached_map = util::raii_mmaped_locked_file::mmap_shared(cfg->cache_filename, + O_RDONLY, PROT_READ); + + if (!cached_map.has_value()) { + msg_info_cache("%s", cached_map.error().c_str()); + return false; + } + + + if (cached_map->get_size() < (gint) sizeof(symcache_header)) { + msg_info_cache("cannot use file %s, truncated: %z", cfg->cache_filename, , + errno, strerror(errno)); + return false; + } + + const auto *hdr = (struct symcache_header *)cached_map->get_map(); + + if (memcmp(hdr->magic, symcache_magic, + sizeof(symcache_magic)) != 0) { + msg_info_cache("cannot use file %s, bad magic", cfg->cache_filename); + + return false; + } + + auto *parser = ucl_parser_new(0); + const auto *p = (const std::uint8_t *)(hdr + 1); + + if (!ucl_parser_add_chunk(parser, p, cached_map->get_size() - sizeof(*hdr))) { + msg_info_cache ("cannot use file %s, cannot parse: %s", cfg->cache_filename, + ucl_parser_get_error(parser)); + ucl_parser_free(parser); + + return false; + } + + auto *top = ucl_parser_get_object(parser); + ucl_parser_free(parser); + + if (top == nullptr || ucl_object_type(top) != UCL_OBJECT) { + msg_info_cache ("cannot use file %s, bad object", cfg->cache_filename); + ucl_object_unref(top); + + return false; + } + + auto it = ucl_object_iterate_new(top); + const ucl_object_t *cur; + while ((cur = ucl_object_iterate_safe(it, true)) != nullptr) { + auto item_it = items_by_symbol.find(ucl_object_key(cur)); + + if (item_it != items_by_symbol.end()) { + auto item = item_it->second; + /* Copy saved info */ + /* + * XXX: don't save or load weight, it should be obtained from the + * metric + */ +#if 0 + elt = ucl_object_lookup (cur, "weight"); + + if (elt) { + w = ucl_object_todouble (elt); + if (w != 0) { + item->weight = w; + } + } +#endif + const auto *elt = ucl_object_lookup(cur, "time"); + if (elt) { + item->st->avg_time = ucl_object_todouble(elt); + } + + elt = ucl_object_lookup(cur, "count"); + if (elt) { + item->st->total_hits = ucl_object_toint(elt); + item->last_count = item->st->total_hits; + } + + elt = ucl_object_lookup(cur, "frequency"); + if (elt && ucl_object_type(elt) == UCL_OBJECT) { + const ucl_object_t *freq_elt; + + freq_elt = ucl_object_lookup(elt, "avg"); + + if (freq_elt) { + item->st->avg_frequency = ucl_object_todouble(freq_elt); + } + freq_elt = ucl_object_lookup(elt, "stddev"); + + if (freq_elt) { + item->st->stddev_frequency = ucl_object_todouble(freq_elt); + } + } + + if (item->is_virtual() && !(item->type & SYMBOL_TYPE_GHOST)) { + g_assert (item->specific.virtual.parent < (gint)cache->items_by_id->len); + parent = g_ptr_array_index (cache->items_by_id, + item->specific.virtual.parent); + item->specific.virtual.parent_item = parent; + + if (parent->st->weight < item->st->weight) { + parent->st->weight = item->st->weight; + } + + /* + * We maintain avg_time for virtual symbols equal to the + * parent item avg_time + */ + item->st->avg_time = parent->st->avg_time; + } + + cache->total_weight += fabs(item->st->weight); + cache->total_hits += item->st->total_hits; + } + } + + ucl_object_iterate_free(it); + ucl_object_unref(top); + + return true; +} + +auto symcache::get_item_by_id(int id, bool resolve_parent) const -> const cache_item_ptr & +{ + if (id < 0 || id >= items_by_id.size()) { + g_abort(); + } + + auto &ret = items_by_id[id]; + + if (!ret) { + g_abort(); + } + + if (resolve_parent && ret->is_virtual()) { + return ret->get_parent(*this); + } + + return ret; +} + + +auto cache_item::get_parent(const symcache &cache) const -> const cache_item_ptr & +{ + if (is_virtual()) { + const auto &virtual_sp = std::get<virtual_item>(specific); + + return virtual_sp.get_parent() + } + + return cache_item_ptr{nullptr}; +} + +auto virtual_item::get_parent(const symcache &cache) const -> const cache_item_ptr & +{ + if (parent) { + return parent; + } + + return cache.get_item_by_id(parent_id, false); +} + +}
\ No newline at end of file diff --git a/src/libserver/symcache/symcache_internal.hxx b/src/libserver/symcache/symcache_internal.hxx new file mode 100644 index 000000000..a1207fc97 --- /dev/null +++ b/src/libserver/symcache/symcache_internal.hxx @@ -0,0 +1,385 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal C++ structures and classes for symcache + */ + +#ifndef RSPAMD_SYMCACHE_INTERNAL_HXX +#define RSPAMD_SYMCACHE_INTERNAL_HXX +#pragma once + +#include <cmath> +#include <cstdlib> +#include <cstdint> +#include <vector> +#include <string> +#include <string_view> +#include <memory> +#include <variant> +#include "contrib/robin-hood/robin_hood.h" + +#include "cfg_file.h" +#include "lua/lua_common.h" + +#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ + cache->static_pool->tag.tagname, cache->cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ + static_pool->tag.tagname, cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ + static_pool->tag.tagname, cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_symcache_log_id, "symcache", cfg->checksum, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) + +namespace rspamd::symcache { + +/* Defined in symcache_impl.cxx */ +extern int rspamd_symcache_log_id; + +static const std::uint8_t symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0}; + +struct symcache_header { + std::uint8_t magic[8]; + unsigned int nitems; + std::uint8_t checksum[64]; + std::uint8_t unused[128]; +}; + +struct cache_item; +using cache_item_ptr = std::shared_ptr<cache_item>; +using cache_item_weak_ptr = std::weak_ptr<cache_item>; + +struct order_generation { + std::vector<cache_item_weak_ptr> d; + unsigned int generation_id; +}; + +using order_generation_ptr = std::shared_ptr<order_generation>; + +/* + * This structure is optimised to store ids list: + * - If the first element is -1 then use dynamic part, else use static part + * There is no std::variant to save space + */ +struct id_list { + union { + std::uint32_t st[4]; + struct { + std::uint32_t e; /* First element */ + std::uint16_t len; + std::uint16_t allocated; + std::uint32_t *n; + } dyn; + } data; + + id_list() { + std::memset((void *)&data, 0, sizeof(data)); + } + /** + * Returns ids from a compressed list, accepting a mutable reference for number of elements + * @param nids output of the number of elements + * @return + */ + auto get_ids(std::size_t &nids) const -> const std::uint32_t * { + if (data.dyn.e == -1) { + /* Dynamic list */ + nids = data.dyn.len; + + return data.dyn.n; + } + else { + auto cnt = 0; + + while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS(data.st)) { + cnt ++; + } + + nids = cnt; + + return data.st; + } + } + + auto add_id(std::uint32_t id, rspamd_mempool_t *pool) -> void { + if (data.st[0] == -1) { + /* Dynamic array */ + if (data.dyn.len < data.dyn.allocated) { + /* Trivial, append + sort */ + data.dyn.n[data.dyn.len++] = id; + } + else { + /* Reallocate */ + g_assert (data.dyn.allocated <= G_MAXINT16); + data.dyn.allocated *= 2; + + auto *new_array = rspamd_mempool_alloc_array_type(pool, + data.dyn.allocated, std::uint32_t); + memcpy(new_array, data.dyn.n, data.dyn.len * sizeof(std::uint32_t)); + data.dyn.n = new_array; + data.dyn.n[data.dyn.len++] = id; + } + + std::sort(data.dyn.n, data.dyn.n + data.dyn.len); + } + else { + /* Static part */ + auto cnt = 0u; + while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS (data.st)) { + cnt ++; + } + + if (cnt < G_N_ELEMENTS (data.st)) { + data.st[cnt] = id; + } + else { + /* Switch to dynamic */ + data.dyn.allocated = G_N_ELEMENTS (data.st) * 2; + auto *new_array = rspamd_mempool_alloc_array_type(pool, + data.dyn.allocated, std::uint32_t); + memcpy (new_array, data.st, sizeof(data.st)); + data.dyn.n = new_array; + data.dyn.e = -1; /* Marker */ + data.dyn.len = G_N_ELEMENTS (data.st); + + /* Recursively jump to dynamic branch that will handle insertion + sorting */ + add_id(id, pool); // tail call + } + } + } +}; + +class symcache; + +struct item_condition { +private: + gint cb; + lua_State *L; +public: + item_condition() { + // TODO + } + virtual ~item_condition() { + // TODO + } +}; + +class normal_item { +private: + symbol_func_t func; + void *user_data; + std::vector<item_condition> conditions; +public: + explicit normal_item() { + // TODO + } + auto add_condition() -> void { + // TODO + } + auto call() -> void { + // TODO + } +}; + +class virtual_item { +private: + int parent_id; + cache_item_ptr parent; +public: + explicit virtual_item() { + // TODO + } + + auto get_parent(const symcache &cache) const -> const cache_item_ptr&; +}; + +struct cache_item { + /* This block is likely shared */ + struct rspamd_symcache_item_stat *st; + struct rspamd_counter_data *cd; + + std::uint64_t last_count; + std::string symbol; + std::string_view type_descr; + int type; + + /* Callback data */ + std::variant<normal_item, virtual_item> specific; + + /* Condition of execution */ + bool enabled; + + /* Priority */ + int priority; + /* Topological order */ + unsigned int order; + /* Unique id - counter */ + int id; + + int frequency_peaks; + /* Settings ids */ + id_list allowed_ids; + /* Allows execution but not symbols insertion */ + id_list exec_only_ids; + id_list forbidden_ids; + + /* Dependencies */ + std::vector<cache_item_ptr> deps; + /* Reverse dependencies */ + std::vector<cache_item_ptr> rdeps; + + auto is_virtual() const -> bool { return std::holds_alternative<virtual_item>(specific); } + auto get_parent(const symcache &cache) const -> const cache_item_ptr &; +}; + +struct delayed_cache_dependency { + std::string from; + std::string to; +}; + +struct delayed_cache_condition { + std::string sym; + int cbref; + lua_State *L; +}; + +class symcache { +private: + /* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */ + robin_hood::unordered_flat_map<std::string_view, cache_item_ptr> items_by_symbol; + std::vector<cache_item_ptr> items_by_id; + + /* Items sorted into some order */ + order_generation_ptr items_by_order; + unsigned int cur_order_gen; + + std::vector<cache_item_ptr> connfilters; + std::vector<cache_item_ptr> prefilters; + std::vector<cache_item_ptr> filters; + std::vector<cache_item_ptr> postfilters; + std::vector<cache_item_ptr> composites; + std::vector<cache_item_ptr> idempotent; + std::vector<cache_item_ptr> virtual_symbols; + + /* These are stored within pointer to clean up after init */ + std::unique_ptr<std::vector<delayed_cache_dependency>> delayed_deps; + std::unique_ptr<std::vector<delayed_cache_condition>> delayed_conditions; + + rspamd_mempool_t *static_pool; + std::uint64_t cksum; + double total_weight; + std::size_t used_items; + std::size_t stats_symbols_count; + std::uint64_t total_hits; + + struct rspamd_config *cfg; + lua_State *L; + double reload_time; + double last_profile; + int peak_cb; + int cache_id; + +private: + /* Internal methods */ + auto load_items() -> bool; + +public: + explicit symcache(struct rspamd_config *cfg) : cfg(cfg) { + /* XXX: do we need a special pool for symcache? I don't think so */ + static_pool = cfg->cfg_pool; + reload_time = cfg->cache_reload_time; + total_hits = 1; + total_weight = 1.0; + cksum = 0xdeadbabe; + peak_cb = -1; + cache_id = rspamd_random_uint64_fast(); + L = (lua_State *)cfg->lua_state; + } + + virtual ~symcache() { + if (peak_cb != -1) { + luaL_unref(L, LUA_REGISTRYINDEX, peak_cb); + } + } + + auto get_item_by_id(int id, bool resolve_parent) const -> const cache_item_ptr &; + + /* + * Initialises the symbols cache, must be called after all symbols are added + * and the config file is loaded + */ + auto init() -> bool; +}; + +/* + * These items are saved within task structure and are used to track + * symbols execution + */ +struct cache_dynamic_item { + std::uint16_t start_msec; /* Relative to task time */ + unsigned started: 1; + unsigned finished: 1; + /* unsigned pad:14; */ + std::uint32_t async_events; +}; + + +struct cache_dependency { + cache_item_ptr item; /* Owning pointer to the real dep */ + std::string_view sym; /* Symbolic dep name */ + int id; /* Real from */ + int vid; /* Virtual from */ +}; + +struct cache_savepoint { + unsigned order_gen; + unsigned items_inflight; + bool profile; + bool has_slow; + + double profile_start; + double lim; + + struct rspamd_scan_result *rs; + + struct cache_item *cur_item; + order_generation_ptr order; + /* Dynamically expanded as needed */ + struct cache_dynamic_item dynamic_items[]; +}; + +struct cache_refresh_cbdata { + double last_resort; + ev_timer resort_ev; + symcache *cache; + struct rspamd_worker *w; + struct ev_loop *event_loop; +}; + +} // namespace rspamd + +#endif //RSPAMD_SYMCACHE_INTERNAL_HXX |