summaryrefslogtreecommitdiffstats
path: root/src/libserver/symcache
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2022-04-02 16:45:41 +0100
committerVsevolod Stakhov <vsevolod@rspamd.com>2022-04-02 16:45:41 +0100
commitfd9693073dc162c43710bd977830a3b400f2b43b (patch)
tree5e5831dfdc3d3c95a52f8cc75478d25149d412fd /src/libserver/symcache
parent4b6706a6955b111fb9366fdea2f87c81a9d2edd1 (diff)
downloadrspamd-fd9693073dc162c43710bd977830a3b400f2b43b.tar.gz
rspamd-fd9693073dc162c43710bd977830a3b400f2b43b.zip
[Rework] Rework files structure
Diffstat (limited to 'src/libserver/symcache')
-rw-r--r--src/libserver/symcache/symcache_c.cxx48
-rw-r--r--src/libserver/symcache/symcache_impl.cxx292
-rw-r--r--src/libserver/symcache/symcache_internal.hxx385
3 files changed, 725 insertions, 0 deletions
diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx
new file mode 100644
index 000000000..7255f9d10
--- /dev/null
+++ b/src/libserver/symcache/symcache_c.cxx
@@ -0,0 +1,48 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "symcache_internal.hxx"
+
+/**
+ * C API for symcache
+ */
+
+#define C_API_SYMCACHE(ptr) (reinterpret_cast<rspamd::symcache::symcache *>(ptr))
+#define C_API_SYMCACHE_ITEM(ptr) (reinterpret_cast<rspamd::symcache::cache_item *>(ptr))
+
+void
+rspamd_symcache_destroy (struct rspamd_symcache *cache)
+{
+ auto *real_cache = C_API_SYMCACHE(cache);
+
+ delete real_cache;
+}
+
+struct rspamd_symcache*
+rspamd_symcache_new (struct rspamd_config *cfg)
+{
+ auto *ncache = new rspamd::symcache::symcache(cfg);
+
+ return (struct rspamd_symcache*)ncache;
+}
+
+gboolean
+rspamd_symcache_init (struct rspamd_symcache *cache)
+{
+ auto *real_cache = C_API_SYMCACHE(cache);
+
+ return real_cache->init();
+}
diff --git a/src/libserver/symcache/symcache_impl.cxx b/src/libserver/symcache/symcache_impl.cxx
new file mode 100644
index 000000000..aaf8b0cdd
--- /dev/null
+++ b/src/libserver/symcache/symcache_impl.cxx
@@ -0,0 +1,292 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "symcache_internal.hxx"
+#include "unix-std.h"
+#include "libutil/cxx/locked_file.hxx"
+
+namespace rspamd::symcache {
+
+INIT_LOG_MODULE_PUBLIC(symcache)
+
+auto symcache::init() -> bool
+{
+ auto res = true;
+ reload_time = cfg->cache_reload_time;
+
+ if (cfg->cache_filename != NULL) {
+ res = load_items();
+ }
+
+ struct rspamd_symcache_item *it, *vit;
+ struct cache_dependency *dep;
+ struct delayed_cache_dependency *ddep;
+ struct delayed_cache_condition *dcond;
+ GList *cur;
+ gint i, j;
+
+ cur = cache->delayed_deps;
+ while (cur) {
+ ddep = cur->data;
+
+ vit = rspamd_symcache_find_filter(cache, ddep->from, false);
+ it = rspamd_symcache_find_filter(cache, ddep->from, true);
+
+ if (it == NULL || vit == NULL) {
+ msg_err_cache ("cannot register delayed dependency between %s and %s: "
+ "%s is missing", ddep->from, ddep->to, ddep->from);
+ }
+ else {
+ msg_debug_cache ("delayed between %s(%d:%d) -> %s", ddep->from,
+ it->id, vit->id, ddep->to);
+ rspamd_symcache_add_dependency(cache, it->id, ddep->to, vit != it ?
+ vit->id : -1);
+ }
+
+ cur = g_list_next (cur);
+ }
+
+ cur = cache->delayed_conditions;
+ while (cur) {
+ dcond = cur->data;
+
+ it = rspamd_symcache_find_filter(cache, dcond->sym, true);
+
+ if (it == NULL) {
+ msg_err_cache (
+ "cannot register delayed condition for %s",
+ dcond->sym);
+ luaL_unref(dcond->L, LUA_REGISTRYINDEX, dcond->cbref);
+ }
+ else {
+ struct rspamd_symcache_condition *ncond = rspamd_mempool_alloc0 (cache->static_pool,
+ sizeof(*ncond));
+ ncond->cb = dcond->cbref;
+ DL_APPEND(it->specific.normal.conditions, ncond);
+ }
+
+ cur = g_list_next (cur);
+ }
+
+ PTR_ARRAY_FOREACH (cache->items_by_id, i, it) {
+
+ PTR_ARRAY_FOREACH (it->deps, j, dep) {
+ rspamd_symcache_process_dep(cache, it, dep);
+ }
+
+ if (it->deps) {
+ /* Reversed loop to make removal safe */
+ for (j = it->deps->len - 1; j >= 0; j--) {
+ dep = g_ptr_array_index (it->deps, j);
+
+ if (dep->item == NULL) {
+ /* Remove useless dep */
+ g_ptr_array_remove_index(it->deps, j);
+ }
+ }
+ }
+ }
+
+ /* Special case for virtual symbols */
+ PTR_ARRAY_FOREACH (cache->virtual, i, it) {
+
+ PTR_ARRAY_FOREACH (it->deps, j, dep) {
+ rspamd_symcache_process_dep(cache, it, dep);
+ }
+ }
+
+ g_ptr_array_sort_with_data(cache->connfilters, prefilters_cmp, cache);
+ g_ptr_array_sort_with_data(cache->prefilters, prefilters_cmp, cache);
+ g_ptr_array_sort_with_data(cache->postfilters, postfilters_cmp, cache);
+ g_ptr_array_sort_with_data(cache->idempotent, postfilters_cmp, cache);
+
+ rspamd_symcache_resort(cache);
+
+ /* Connect metric symbols with symcache symbols */
+ if (cache->cfg->symbols) {
+ g_hash_table_foreach(cache->cfg->symbols,
+ rspamd_symcache_metric_connect_cb,
+ cache);
+ }
+
+ return res;
+}
+
+auto symcache::load_items() -> bool
+{
+ auto cached_map = util::raii_mmaped_locked_file::mmap_shared(cfg->cache_filename,
+ O_RDONLY, PROT_READ);
+
+ if (!cached_map.has_value()) {
+ msg_info_cache("%s", cached_map.error().c_str());
+ return false;
+ }
+
+
+ if (cached_map->get_size() < (gint) sizeof(symcache_header)) {
+ msg_info_cache("cannot use file %s, truncated: %z", cfg->cache_filename, ,
+ errno, strerror(errno));
+ return false;
+ }
+
+ const auto *hdr = (struct symcache_header *)cached_map->get_map();
+
+ if (memcmp(hdr->magic, symcache_magic,
+ sizeof(symcache_magic)) != 0) {
+ msg_info_cache("cannot use file %s, bad magic", cfg->cache_filename);
+
+ return false;
+ }
+
+ auto *parser = ucl_parser_new(0);
+ const auto *p = (const std::uint8_t *)(hdr + 1);
+
+ if (!ucl_parser_add_chunk(parser, p, cached_map->get_size() - sizeof(*hdr))) {
+ msg_info_cache ("cannot use file %s, cannot parse: %s", cfg->cache_filename,
+ ucl_parser_get_error(parser));
+ ucl_parser_free(parser);
+
+ return false;
+ }
+
+ auto *top = ucl_parser_get_object(parser);
+ ucl_parser_free(parser);
+
+ if (top == nullptr || ucl_object_type(top) != UCL_OBJECT) {
+ msg_info_cache ("cannot use file %s, bad object", cfg->cache_filename);
+ ucl_object_unref(top);
+
+ return false;
+ }
+
+ auto it = ucl_object_iterate_new(top);
+ const ucl_object_t *cur;
+ while ((cur = ucl_object_iterate_safe(it, true)) != nullptr) {
+ auto item_it = items_by_symbol.find(ucl_object_key(cur));
+
+ if (item_it != items_by_symbol.end()) {
+ auto item = item_it->second;
+ /* Copy saved info */
+ /*
+ * XXX: don't save or load weight, it should be obtained from the
+ * metric
+ */
+#if 0
+ elt = ucl_object_lookup (cur, "weight");
+
+ if (elt) {
+ w = ucl_object_todouble (elt);
+ if (w != 0) {
+ item->weight = w;
+ }
+ }
+#endif
+ const auto *elt = ucl_object_lookup(cur, "time");
+ if (elt) {
+ item->st->avg_time = ucl_object_todouble(elt);
+ }
+
+ elt = ucl_object_lookup(cur, "count");
+ if (elt) {
+ item->st->total_hits = ucl_object_toint(elt);
+ item->last_count = item->st->total_hits;
+ }
+
+ elt = ucl_object_lookup(cur, "frequency");
+ if (elt && ucl_object_type(elt) == UCL_OBJECT) {
+ const ucl_object_t *freq_elt;
+
+ freq_elt = ucl_object_lookup(elt, "avg");
+
+ if (freq_elt) {
+ item->st->avg_frequency = ucl_object_todouble(freq_elt);
+ }
+ freq_elt = ucl_object_lookup(elt, "stddev");
+
+ if (freq_elt) {
+ item->st->stddev_frequency = ucl_object_todouble(freq_elt);
+ }
+ }
+
+ if (item->is_virtual() && !(item->type & SYMBOL_TYPE_GHOST)) {
+ g_assert (item->specific.virtual.parent < (gint)cache->items_by_id->len);
+ parent = g_ptr_array_index (cache->items_by_id,
+ item->specific.virtual.parent);
+ item->specific.virtual.parent_item = parent;
+
+ if (parent->st->weight < item->st->weight) {
+ parent->st->weight = item->st->weight;
+ }
+
+ /*
+ * We maintain avg_time for virtual symbols equal to the
+ * parent item avg_time
+ */
+ item->st->avg_time = parent->st->avg_time;
+ }
+
+ cache->total_weight += fabs(item->st->weight);
+ cache->total_hits += item->st->total_hits;
+ }
+ }
+
+ ucl_object_iterate_free(it);
+ ucl_object_unref(top);
+
+ return true;
+}
+
+auto symcache::get_item_by_id(int id, bool resolve_parent) const -> const cache_item_ptr &
+{
+ if (id < 0 || id >= items_by_id.size()) {
+ g_abort();
+ }
+
+ auto &ret = items_by_id[id];
+
+ if (!ret) {
+ g_abort();
+ }
+
+ if (resolve_parent && ret->is_virtual()) {
+ return ret->get_parent(*this);
+ }
+
+ return ret;
+}
+
+
+auto cache_item::get_parent(const symcache &cache) const -> const cache_item_ptr &
+{
+ if (is_virtual()) {
+ const auto &virtual_sp = std::get<virtual_item>(specific);
+
+ return virtual_sp.get_parent()
+ }
+
+ return cache_item_ptr{nullptr};
+}
+
+auto virtual_item::get_parent(const symcache &cache) const -> const cache_item_ptr &
+{
+ if (parent) {
+ return parent;
+ }
+
+ return cache.get_item_by_id(parent_id, false);
+}
+
+} \ No newline at end of file
diff --git a/src/libserver/symcache/symcache_internal.hxx b/src/libserver/symcache/symcache_internal.hxx
new file mode 100644
index 000000000..a1207fc97
--- /dev/null
+++ b/src/libserver/symcache/symcache_internal.hxx
@@ -0,0 +1,385 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal C++ structures and classes for symcache
+ */
+
+#ifndef RSPAMD_SYMCACHE_INTERNAL_HXX
+#define RSPAMD_SYMCACHE_INTERNAL_HXX
+#pragma once
+
+#include <cmath>
+#include <cstdlib>
+#include <cstdint>
+#include <vector>
+#include <string>
+#include <string_view>
+#include <memory>
+#include <variant>
+#include "contrib/robin-hood/robin_hood.h"
+
+#include "cfg_file.h"
+#include "lua/lua_common.h"
+
+#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+ cache->static_pool->tag.tagname, cache->cfg->checksum, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
+ static_pool->tag.tagname, cfg->checksum, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
+ static_pool->tag.tagname, cfg->checksum, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \
+ rspamd_symcache_log_id, "symcache", cfg->checksum, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \
+ rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+
+namespace rspamd::symcache {
+
+/* Defined in symcache_impl.cxx */
+extern int rspamd_symcache_log_id;
+
+static const std::uint8_t symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0};
+
+struct symcache_header {
+ std::uint8_t magic[8];
+ unsigned int nitems;
+ std::uint8_t checksum[64];
+ std::uint8_t unused[128];
+};
+
+struct cache_item;
+using cache_item_ptr = std::shared_ptr<cache_item>;
+using cache_item_weak_ptr = std::weak_ptr<cache_item>;
+
+struct order_generation {
+ std::vector<cache_item_weak_ptr> d;
+ unsigned int generation_id;
+};
+
+using order_generation_ptr = std::shared_ptr<order_generation>;
+
+/*
+ * This structure is optimised to store ids list:
+ * - If the first element is -1 then use dynamic part, else use static part
+ * There is no std::variant to save space
+ */
+struct id_list {
+ union {
+ std::uint32_t st[4];
+ struct {
+ std::uint32_t e; /* First element */
+ std::uint16_t len;
+ std::uint16_t allocated;
+ std::uint32_t *n;
+ } dyn;
+ } data;
+
+ id_list() {
+ std::memset((void *)&data, 0, sizeof(data));
+ }
+ /**
+ * Returns ids from a compressed list, accepting a mutable reference for number of elements
+ * @param nids output of the number of elements
+ * @return
+ */
+ auto get_ids(std::size_t &nids) const -> const std::uint32_t * {
+ if (data.dyn.e == -1) {
+ /* Dynamic list */
+ nids = data.dyn.len;
+
+ return data.dyn.n;
+ }
+ else {
+ auto cnt = 0;
+
+ while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS(data.st)) {
+ cnt ++;
+ }
+
+ nids = cnt;
+
+ return data.st;
+ }
+ }
+
+ auto add_id(std::uint32_t id, rspamd_mempool_t *pool) -> void {
+ if (data.st[0] == -1) {
+ /* Dynamic array */
+ if (data.dyn.len < data.dyn.allocated) {
+ /* Trivial, append + sort */
+ data.dyn.n[data.dyn.len++] = id;
+ }
+ else {
+ /* Reallocate */
+ g_assert (data.dyn.allocated <= G_MAXINT16);
+ data.dyn.allocated *= 2;
+
+ auto *new_array = rspamd_mempool_alloc_array_type(pool,
+ data.dyn.allocated, std::uint32_t);
+ memcpy(new_array, data.dyn.n, data.dyn.len * sizeof(std::uint32_t));
+ data.dyn.n = new_array;
+ data.dyn.n[data.dyn.len++] = id;
+ }
+
+ std::sort(data.dyn.n, data.dyn.n + data.dyn.len);
+ }
+ else {
+ /* Static part */
+ auto cnt = 0u;
+ while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS (data.st)) {
+ cnt ++;
+ }
+
+ if (cnt < G_N_ELEMENTS (data.st)) {
+ data.st[cnt] = id;
+ }
+ else {
+ /* Switch to dynamic */
+ data.dyn.allocated = G_N_ELEMENTS (data.st) * 2;
+ auto *new_array = rspamd_mempool_alloc_array_type(pool,
+ data.dyn.allocated, std::uint32_t);
+ memcpy (new_array, data.st, sizeof(data.st));
+ data.dyn.n = new_array;
+ data.dyn.e = -1; /* Marker */
+ data.dyn.len = G_N_ELEMENTS (data.st);
+
+ /* Recursively jump to dynamic branch that will handle insertion + sorting */
+ add_id(id, pool); // tail call
+ }
+ }
+ }
+};
+
+class symcache;
+
+struct item_condition {
+private:
+ gint cb;
+ lua_State *L;
+public:
+ item_condition() {
+ // TODO
+ }
+ virtual ~item_condition() {
+ // TODO
+ }
+};
+
+class normal_item {
+private:
+ symbol_func_t func;
+ void *user_data;
+ std::vector<item_condition> conditions;
+public:
+ explicit normal_item() {
+ // TODO
+ }
+ auto add_condition() -> void {
+ // TODO
+ }
+ auto call() -> void {
+ // TODO
+ }
+};
+
+class virtual_item {
+private:
+ int parent_id;
+ cache_item_ptr parent;
+public:
+ explicit virtual_item() {
+ // TODO
+ }
+
+ auto get_parent(const symcache &cache) const -> const cache_item_ptr&;
+};
+
+struct cache_item {
+ /* This block is likely shared */
+ struct rspamd_symcache_item_stat *st;
+ struct rspamd_counter_data *cd;
+
+ std::uint64_t last_count;
+ std::string symbol;
+ std::string_view type_descr;
+ int type;
+
+ /* Callback data */
+ std::variant<normal_item, virtual_item> specific;
+
+ /* Condition of execution */
+ bool enabled;
+
+ /* Priority */
+ int priority;
+ /* Topological order */
+ unsigned int order;
+ /* Unique id - counter */
+ int id;
+
+ int frequency_peaks;
+ /* Settings ids */
+ id_list allowed_ids;
+ /* Allows execution but not symbols insertion */
+ id_list exec_only_ids;
+ id_list forbidden_ids;
+
+ /* Dependencies */
+ std::vector<cache_item_ptr> deps;
+ /* Reverse dependencies */
+ std::vector<cache_item_ptr> rdeps;
+
+ auto is_virtual() const -> bool { return std::holds_alternative<virtual_item>(specific); }
+ auto get_parent(const symcache &cache) const -> const cache_item_ptr &;
+};
+
+struct delayed_cache_dependency {
+ std::string from;
+ std::string to;
+};
+
+struct delayed_cache_condition {
+ std::string sym;
+ int cbref;
+ lua_State *L;
+};
+
+class symcache {
+private:
+ /* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */
+ robin_hood::unordered_flat_map<std::string_view, cache_item_ptr> items_by_symbol;
+ std::vector<cache_item_ptr> items_by_id;
+
+ /* Items sorted into some order */
+ order_generation_ptr items_by_order;
+ unsigned int cur_order_gen;
+
+ std::vector<cache_item_ptr> connfilters;
+ std::vector<cache_item_ptr> prefilters;
+ std::vector<cache_item_ptr> filters;
+ std::vector<cache_item_ptr> postfilters;
+ std::vector<cache_item_ptr> composites;
+ std::vector<cache_item_ptr> idempotent;
+ std::vector<cache_item_ptr> virtual_symbols;
+
+ /* These are stored within pointer to clean up after init */
+ std::unique_ptr<std::vector<delayed_cache_dependency>> delayed_deps;
+ std::unique_ptr<std::vector<delayed_cache_condition>> delayed_conditions;
+
+ rspamd_mempool_t *static_pool;
+ std::uint64_t cksum;
+ double total_weight;
+ std::size_t used_items;
+ std::size_t stats_symbols_count;
+ std::uint64_t total_hits;
+
+ struct rspamd_config *cfg;
+ lua_State *L;
+ double reload_time;
+ double last_profile;
+ int peak_cb;
+ int cache_id;
+
+private:
+ /* Internal methods */
+ auto load_items() -> bool;
+
+public:
+ explicit symcache(struct rspamd_config *cfg) : cfg(cfg) {
+ /* XXX: do we need a special pool for symcache? I don't think so */
+ static_pool = cfg->cfg_pool;
+ reload_time = cfg->cache_reload_time;
+ total_hits = 1;
+ total_weight = 1.0;
+ cksum = 0xdeadbabe;
+ peak_cb = -1;
+ cache_id = rspamd_random_uint64_fast();
+ L = (lua_State *)cfg->lua_state;
+ }
+
+ virtual ~symcache() {
+ if (peak_cb != -1) {
+ luaL_unref(L, LUA_REGISTRYINDEX, peak_cb);
+ }
+ }
+
+ auto get_item_by_id(int id, bool resolve_parent) const -> const cache_item_ptr &;
+
+ /*
+ * Initialises the symbols cache, must be called after all symbols are added
+ * and the config file is loaded
+ */
+ auto init() -> bool;
+};
+
+/*
+ * These items are saved within task structure and are used to track
+ * symbols execution
+ */
+struct cache_dynamic_item {
+ std::uint16_t start_msec; /* Relative to task time */
+ unsigned started: 1;
+ unsigned finished: 1;
+ /* unsigned pad:14; */
+ std::uint32_t async_events;
+};
+
+
+struct cache_dependency {
+ cache_item_ptr item; /* Owning pointer to the real dep */
+ std::string_view sym; /* Symbolic dep name */
+ int id; /* Real from */
+ int vid; /* Virtual from */
+};
+
+struct cache_savepoint {
+ unsigned order_gen;
+ unsigned items_inflight;
+ bool profile;
+ bool has_slow;
+
+ double profile_start;
+ double lim;
+
+ struct rspamd_scan_result *rs;
+
+ struct cache_item *cur_item;
+ order_generation_ptr order;
+ /* Dynamically expanded as needed */
+ struct cache_dynamic_item dynamic_items[];
+};
+
+struct cache_refresh_cbdata {
+ double last_resort;
+ ev_timer resort_ev;
+ symcache *cache;
+ struct rspamd_worker *w;
+ struct ev_loop *event_loop;
+};
+
+} // namespace rspamd
+
+#endif //RSPAMD_SYMCACHE_INTERNAL_HXX