aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2022-04-02 16:45:41 +0100
committerVsevolod Stakhov <vsevolod@rspamd.com>2022-04-02 16:45:41 +0100
commitfd9693073dc162c43710bd977830a3b400f2b43b (patch)
tree5e5831dfdc3d3c95a52f8cc75478d25149d412fd
parent4b6706a6955b111fb9366fdea2f87c81a9d2edd1 (diff)
downloadrspamd-fd9693073dc162c43710bd977830a3b400f2b43b.tar.gz
rspamd-fd9693073dc162c43710bd977830a3b400f2b43b.zip
[Rework] Rework files structure
-rw-r--r--src/libserver/CMakeLists.txt5
-rw-r--r--src/libserver/rspamd_symcache.cxx616
-rw-r--r--src/libserver/symcache/symcache_c.cxx48
-rw-r--r--src/libserver/symcache/symcache_impl.cxx292
-rw-r--r--src/libserver/symcache/symcache_internal.hxx385
-rw-r--r--src/libutil/CMakeLists.txt3
6 files changed, 730 insertions, 619 deletions
diff --git a/src/libserver/CMakeLists.txt b/src/libserver/CMakeLists.txt
index 7371e8ade..17f5ca751 100644
--- a/src/libserver/CMakeLists.txt
+++ b/src/libserver/CMakeLists.txt
@@ -16,11 +16,12 @@ SET(LIBRSPAMDSERVERSRC
${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
${CMAKE_CURRENT_SOURCE_DIR}/re_cache.c
- ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.cxx
+ ${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.cxx
${CMAKE_CURRENT_SOURCE_DIR}/roll_history.c
${CMAKE_CURRENT_SOURCE_DIR}/spf.c
${CMAKE_CURRENT_SOURCE_DIR}/ssl_util.c
- ${CMAKE_CURRENT_SOURCE_DIR}/rspamd_symcache.cxx
+ ${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_impl.cxx
+ ${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_c.cxx
${CMAKE_CURRENT_SOURCE_DIR}/task.c
${CMAKE_CURRENT_SOURCE_DIR}/url.c
${CMAKE_CURRENT_SOURCE_DIR}/worker_util.c
diff --git a/src/libserver/rspamd_symcache.cxx b/src/libserver/rspamd_symcache.cxx
index cd44c2b84..e1e9ade4e 100644
--- a/src/libserver/rspamd_symcache.cxx
+++ b/src/libserver/rspamd_symcache.cxx
@@ -36,26 +36,6 @@
#include "contrib/robin-hood/robin_hood.h"
-#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
- cache->static_pool->tag.tagname, cache->cfg->checksum, \
- G_STRFUNC, \
- __VA_ARGS__)
-#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
- cache->static_pool->tag.tagname, cache->cfg->checksum, \
- G_STRFUNC, \
- __VA_ARGS__)
-#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
- cache->static_pool->tag.tagname, cache->cfg->checksum, \
- G_STRFUNC, \
- __VA_ARGS__)
-#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \
- rspamd_symcache_log_id, "symcache", cache->cfg->checksum, \
- G_STRFUNC, \
- __VA_ARGS__)
-#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \
- rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \
- G_STRFUNC, \
- __VA_ARGS__)
INIT_LOG_MODULE(symcache)
@@ -73,311 +53,7 @@ INIT_LOG_MODULE(symcache)
#define CLR_FINISH_BIT(checkpoint, dyn_item) \
(dyn_item)->finished = 0
-namespace rspamd::symcache {
-static const std::uint8_t rspamd_symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0};
-
-struct rspamd_symcache_header {
- std::uint8_t magic[8];
- unsigned int nitems;
- std::uint8_t checksum[64];
- std::uint8_t unused[128];
-};
-
-struct cache_item;
-using cache_item_ptr = std::shared_ptr<cache_item>;
-using cache_item_weak_ptr = std::weak_ptr<cache_item>;
-
-struct order_generation {
- std::vector<cache_item_weak_ptr> d;
- unsigned int generation_id;
-};
-
-using order_generation_ptr = std::shared_ptr<order_generation>;
-
-/*
- * This structure is optimised to store ids list:
- * - If the first element is -1 then use dynamic part, else use static part
- * There is no std::variant to save space
- */
-struct id_list {
- union {
- std::uint32_t st[4];
- struct {
- std::uint32_t e; /* First element */
- std::uint16_t len;
- std::uint16_t allocated;
- std::uint32_t *n;
- } dyn;
- } data;
-
- id_list() {
- std::memset((void *)&data, 0, sizeof(data));
- }
- /**
- * Returns ids from a compressed list, accepting a mutable reference for number of elements
- * @param nids output of the number of elements
- * @return
- */
- auto get_ids(std::size_t &nids) const -> const std::uint32_t * {
- if (data.dyn.e == -1) {
- /* Dynamic list */
- nids = data.dyn.len;
-
- return data.dyn.n;
- }
- else {
- auto cnt = 0;
-
- while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS(data.st)) {
- cnt ++;
- }
-
- nids = cnt;
-
- return data.st;
- }
- }
-
- auto add_id(std::uint32_t id, rspamd_mempool_t *pool) -> void {
- if (data.st[0] == -1) {
- /* Dynamic array */
- if (data.dyn.len < data.dyn.allocated) {
- /* Trivial, append + sort */
- data.dyn.n[data.dyn.len++] = id;
- }
- else {
- /* Reallocate */
- g_assert (data.dyn.allocated <= G_MAXINT16);
- data.dyn.allocated *= 2;
-
- auto *new_array = rspamd_mempool_alloc_array_type(pool,
- data.dyn.allocated, std::uint32_t);
- memcpy(new_array, data.dyn.n, data.dyn.len * sizeof(std::uint32_t));
- data.dyn.n = new_array;
- data.dyn.n[data.dyn.len++] = id;
- }
-
- std::sort(data.dyn.n, data.dyn.n + data.dyn.len);
- }
- else {
- /* Static part */
- auto cnt = 0u;
- while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS (data.st)) {
- cnt ++;
- }
-
- if (cnt < G_N_ELEMENTS (data.st)) {
- data.st[cnt] = id;
- }
- else {
- /* Switch to dynamic */
- data.dyn.allocated = G_N_ELEMENTS (data.st) * 2;
- auto *new_array = rspamd_mempool_alloc_array_type(pool,
- data.dyn.allocated, std::uint32_t);
- memcpy (new_array, data.st, sizeof(data.st));
- data.dyn.n = new_array;
- data.dyn.e = -1; /* Marker */
- data.dyn.len = G_N_ELEMENTS (data.st);
-
- /* Recursively jump to dynamic branch that will handle insertion + sorting */
- add_id(id, pool); // tail call
- }
- }
- }
-};
-
-struct item_condition {
-private:
- gint cb;
- lua_State *L;
-public:
- item_condition() {
- // TODO
- }
- virtual ~item_condition() {
- // TODO
- }
-};
-
-class normal_item {
-private:
- symbol_func_t func;
- void *user_data;
- std::vector<item_condition> conditions;
-public:
- explicit normal_item() {
- // TODO
- }
- auto add_condition() -> void {
- // TODO
- }
- auto call() -> void {
- // TODO
- }
-};
-
-class virtual_item {
-private:
- int parent_id;
- cache_item_ptr parent;
-public:
- explicit virtual_item() {
- // TODO
- }
-};
-
-struct cache_item {
- /* This block is likely shared */
- struct rspamd_symcache_item_stat *st;
- struct rspamd_counter_data *cd;
-
- std::uint64_t last_count;
- std::string symbol;
- std::string_view type_descr;
- int type;
-
- /* Callback data */
- std::variant<normal_item, virtual_item> specific;
-
- /* Condition of execution */
- bool enabled;
-
- /* Priority */
- int priority;
- /* Topological order */
- unsigned int order;
- /* Unique id - counter */
- int id;
-
- int frequency_peaks;
- /* Settings ids */
- id_list allowed_ids;
- /* Allows execution but not symbols insertion */
- id_list exec_only_ids;
- id_list forbidden_ids;
-
- /* Dependencies */
- std::vector<cache_item_ptr> deps;
- /* Reverse dependencies */
- std::vector<cache_item_ptr> rdeps;
-};
-
-struct delayed_cache_dependency {
- std::string from;
- std::string to;
-};
-
-struct delayed_cache_condition {
- std::string sym;
- int cbref;
- lua_State *L;
-};
-
-struct symcache {
- /* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */
- robin_hood::unordered_flat_map<std::string_view, cache_item_ptr> items_by_symbol;
- std::vector<cache_item_weak_ptr> items_by_id;
-
- /* Items sorted into some order */
- order_generation_ptr items_by_order;
- unsigned int cur_order_gen;
-
- std::vector<cache_item_weak_ptr> connfilters;
- std::vector<cache_item_weak_ptr> prefilters;
- std::vector<cache_item_weak_ptr> filters;
- std::vector<cache_item_weak_ptr> postfilters;
- std::vector<cache_item_weak_ptr> composites;
- std::vector<cache_item_weak_ptr> idempotent;
- std::vector<cache_item_weak_ptr> virtual_symbols;
-
- /* These are stored within pointer to clean up after init */
- std::unique_ptr<std::vector<delayed_cache_dependency>> delayed_deps;
- std::unique_ptr<std::vector<delayed_cache_condition>> delayed_conditions;
-
- rspamd_mempool_t *static_pool;
- std::uint64_t cksum;
- double total_weight;
- std::size_t used_items;
- std::size_t stats_symbols_count;
- std::uint64_t total_hits;
-
- struct rspamd_config *cfg;
- lua_State *L;
- double reload_time;
- double last_profile;
- int peak_cb;
- int id;
-
-public:
- explicit symcache(struct rspamd_config *cfg) : cfg(cfg) {
- /* XXX: do we need a special pool for symcache? I don't think so */
- static_pool = cfg->cfg_pool;
- reload_time = cfg->cache_reload_time;
- total_hits = 1;
- total_weight = 1.0;
- cksum = 0xdeadbabe;
- peak_cb = -1;
- id = rspamd_random_uint64_fast();
- L = (lua_State *)cfg->lua_state;
- }
-
- virtual ~symcache() {
- if (peak_cb != -1) {
- luaL_unref(L, LUA_REGISTRYINDEX, peak_cb);
- }
- }
-};
-
-
-/*
- * These items are saved within task structure and are used to track
- * symbols execution
- */
-struct cache_dynamic_item {
- std::uint16_t start_msec; /* Relative to task time */
- unsigned started: 1;
- unsigned finished: 1;
- /* unsigned pad:14; */
- std::uint32_t async_events;
-};
-
-
-struct cache_dependency {
- cache_item_ptr item; /* Owning pointer to the real dep */
- std::string_view sym; /* Symbolic dep name */
- int id; /* Real from */
- int vid; /* Virtual from */
-};
-
-struct cache_savepoint {
- unsigned order_gen;
- unsigned items_inflight;
- bool profile;
- bool has_slow;
-
- double profile_start;
- double lim;
-
- struct rspamd_scan_result *rs;
-
- struct cache_item *cur_item;
- order_generation_ptr order;
- /* Dynamically expanded as needed */
- struct cache_dynamic_item dynamic_items[];
-};
-
-struct cache_refresh_cbdata {
- double last_resort;
- ev_timer resort_ev;
- struct symcache *cache;
- struct rspamd_worker *w;
- struct ev_loop *event_loop;
-};
-
-} // namespace rspamd
-
-#define C_API_SYMCACHE(ptr) (reinterpret_cast<rspamd::symcache::symcache *>(ptr))
/* At least once per minute */
#define PROFILE_MAX_TIME (60.0)
@@ -831,260 +507,6 @@ rspamd_symcache_process_dep (struct rspamd_symcache *cache,
}
}
-/* Sort items in logical order */
-static void
-rspamd_symcache_post_init (struct rspamd_symcache *cache)
-{
- struct rspamd_symcache_item *it, *vit;
- struct cache_dependency *dep;
- struct delayed_cache_dependency *ddep;
- struct delayed_cache_condition *dcond;
- GList *cur;
- gint i, j;
-
- cur = cache->delayed_deps;
- while (cur) {
- ddep = cur->data;
-
- vit = rspamd_symcache_find_filter (cache, ddep->from, false);
- it = rspamd_symcache_find_filter (cache, ddep->from, true);
-
- if (it == NULL || vit == NULL) {
- msg_err_cache ("cannot register delayed dependency between %s and %s: "
- "%s is missing", ddep->from, ddep->to, ddep->from);
- }
- else {
- msg_debug_cache ("delayed between %s(%d:%d) -> %s", ddep->from,
- it->id, vit->id, ddep->to);
- rspamd_symcache_add_dependency (cache, it->id, ddep->to, vit != it ?
- vit->id : -1);
- }
-
- cur = g_list_next (cur);
- }
-
- cur = cache->delayed_conditions;
- while (cur) {
- dcond = cur->data;
-
- it = rspamd_symcache_find_filter (cache, dcond->sym, true);
-
- if (it == NULL) {
- msg_err_cache (
- "cannot register delayed condition for %s",
- dcond->sym);
- luaL_unref (dcond->L, LUA_REGISTRYINDEX, dcond->cbref);
- }
- else {
- struct rspamd_symcache_condition *ncond = rspamd_mempool_alloc0 (cache->static_pool,
- sizeof (*ncond));
- ncond->cb = dcond->cbref;
- DL_APPEND (it->specific.normal.conditions, ncond);
- }
-
- cur = g_list_next (cur);
- }
-
- PTR_ARRAY_FOREACH (cache->items_by_id, i, it) {
-
- PTR_ARRAY_FOREACH (it->deps, j, dep) {
- rspamd_symcache_process_dep (cache, it, dep);
- }
-
- if (it->deps) {
- /* Reversed loop to make removal safe */
- for (j = it->deps->len - 1; j >= 0; j--) {
- dep = g_ptr_array_index (it->deps, j);
-
- if (dep->item == NULL) {
- /* Remove useless dep */
- g_ptr_array_remove_index (it->deps, j);
- }
- }
- }
- }
-
- /* Special case for virtual symbols */
- PTR_ARRAY_FOREACH (cache->virtual, i, it) {
-
- PTR_ARRAY_FOREACH (it->deps, j, dep) {
- rspamd_symcache_process_dep (cache, it, dep);
- }
- }
-
- g_ptr_array_sort_with_data (cache->connfilters, prefilters_cmp, cache);
- g_ptr_array_sort_with_data (cache->prefilters, prefilters_cmp, cache);
- g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache);
- g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache);
-
- rspamd_symcache_resort (cache);
-}
-
-static gboolean
-rspamd_symcache_load_items (struct rspamd_symcache *cache, const gchar *name)
-{
- struct rspamd_symcache_header *hdr;
- struct stat st;
- struct ucl_parser *parser;
- ucl_object_t *top;
- const ucl_object_t *cur, *elt;
- ucl_object_iter_t it;
- struct rspamd_symcache_item *item, *parent;
- const guchar *p;
- gint fd;
- gpointer map;
-
- fd = open (name, O_RDONLY);
-
- if (fd == -1) {
- msg_info_cache ("cannot open file %s, error %d, %s", name,
- errno, strerror (errno));
- return FALSE;
- }
-
- rspamd_file_lock (fd, FALSE);
-
- if (fstat (fd, &st) == -1) {
- rspamd_file_unlock (fd, FALSE);
- close (fd);
- msg_info_cache ("cannot stat file %s, error %d, %s", name,
- errno, strerror (errno));
- return FALSE;
- }
-
- if (st.st_size < (gint)sizeof (*hdr)) {
- rspamd_file_unlock (fd, FALSE);
- close (fd);
- errno = EINVAL;
- msg_info_cache ("cannot use file %s, error %d, %s", name,
- errno, strerror (errno));
- return FALSE;
- }
-
- map = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, fd, 0);
-
- if (map == MAP_FAILED) {
- rspamd_file_unlock (fd, FALSE);
- close (fd);
- msg_info_cache ("cannot mmap file %s, error %d, %s", name,
- errno, strerror (errno));
- return FALSE;
- }
-
- hdr = map;
-
- if (memcmp (hdr->magic, rspamd_symcache_magic,
- sizeof (rspamd_symcache_magic)) != 0) {
- msg_info_cache ("cannot use file %s, bad magic", name);
- munmap (map, st.st_size);
- rspamd_file_unlock (fd, FALSE);
- close (fd);
-
- return FALSE;
- }
-
- parser = ucl_parser_new (0);
- p = (const guchar *)(hdr + 1);
-
- if (!ucl_parser_add_chunk (parser, p, st.st_size - sizeof (*hdr))) {
- msg_info_cache ("cannot use file %s, cannot parse: %s", name,
- ucl_parser_get_error (parser));
- munmap (map, st.st_size);
- ucl_parser_free (parser);
- rspamd_file_unlock (fd, FALSE);
- close (fd);
-
- return FALSE;
- }
-
- top = ucl_parser_get_object (parser);
- munmap (map, st.st_size);
- rspamd_file_unlock (fd, FALSE);
- close (fd);
- ucl_parser_free (parser);
-
- if (top == NULL || ucl_object_type (top) != UCL_OBJECT) {
- msg_info_cache ("cannot use file %s, bad object", name);
- ucl_object_unref (top);
- return FALSE;
- }
-
- it = ucl_object_iterate_new (top);
-
- while ((cur = ucl_object_iterate_safe (it, true))) {
- item = g_hash_table_lookup (cache->items_by_symbol, ucl_object_key (cur));
-
- if (item) {
- /* Copy saved info */
- /*
- * XXX: don't save or load weight, it should be obtained from the
- * metric
- */
-#if 0
- elt = ucl_object_lookup (cur, "weight");
-
- if (elt) {
- w = ucl_object_todouble (elt);
- if (w != 0) {
- item->weight = w;
- }
- }
-#endif
- elt = ucl_object_lookup (cur, "time");
- if (elt) {
- item->st->avg_time = ucl_object_todouble (elt);
- }
-
- elt = ucl_object_lookup (cur, "count");
- if (elt) {
- item->st->total_hits = ucl_object_toint (elt);
- item->last_count = item->st->total_hits;
- }
-
- elt = ucl_object_lookup (cur, "frequency");
- if (elt && ucl_object_type (elt) == UCL_OBJECT) {
- const ucl_object_t *freq_elt;
-
- freq_elt = ucl_object_lookup (elt, "avg");
-
- if (freq_elt) {
- item->st->avg_frequency = ucl_object_todouble (freq_elt);
- }
- freq_elt = ucl_object_lookup (elt, "stddev");
-
- if (freq_elt) {
- item->st->stddev_frequency = ucl_object_todouble (freq_elt);
- }
- }
-
- if (item->is_virtual && !(item->type & SYMBOL_TYPE_GHOST)) {
- g_assert (item->specific.virtual.parent < (gint)cache->items_by_id->len);
- parent = g_ptr_array_index (cache->items_by_id,
- item->specific.virtual.parent);
- item->specific.virtual.parent_item = parent;
-
- if (parent->st->weight < item->st->weight) {
- parent->st->weight = item->st->weight;
- }
-
- /*
- * We maintain avg_time for virtual symbols equal to the
- * parent item avg_time
- */
- item->st->avg_time = parent->st->avg_time;
- }
-
- cache->total_weight += fabs (item->st->weight);
- cache->total_hits += item->st->total_hits;
- }
- }
-
- ucl_object_iterate_free (it);
- ucl_object_unref (top);
-
- return TRUE;
-}
-
#define ROUND_DOUBLE(x) (floor((x) * 100.0) / 100.0)
static gboolean
@@ -1438,21 +860,6 @@ rspamd_symcache_save (struct rspamd_symcache *cache)
}
}
-void
-rspamd_symcache_destroy (struct rspamd_symcache *cache)
-{
- auto *real_cache = C_API_SYMCACHE(cache);
-
- delete real_cache;
-}
-
-struct rspamd_symcache*
-rspamd_symcache_new (struct rspamd_config *cfg)
-{
- auto *ncache = new rspamd::symcache::symcache(cfg);
-
- return (struct rspamd_symcache*)ncache;
-}
static void
rspamd_symcache_metric_connect_cb (gpointer k, gpointer v, gpointer ud)
@@ -1472,30 +879,7 @@ rspamd_symcache_metric_connect_cb (gpointer k, gpointer v, gpointer ud)
}
}
-gboolean
-rspamd_symcache_init (struct rspamd_symcache *cache)
-{
- gboolean res = TRUE;
-
- g_assert (cache != NULL);
-
- cache->reload_time = cache->cfg->cache_reload_time;
- if (cache->cfg->cache_filename != NULL) {
- res = rspamd_symcache_load_items (cache, cache->cfg->cache_filename);
- }
-
- rspamd_symcache_post_init (cache);
-
- /* Connect metric symbols with symcache symbols */
- if (cache->cfg->symbols) {
- g_hash_table_foreach (cache->cfg->symbols,
- rspamd_symcache_metric_connect_cb,
- cache);
- }
-
- return res;
-}
static void
diff --git a/src/libserver/symcache/symcache_c.cxx b/src/libserver/symcache/symcache_c.cxx
new file mode 100644
index 000000000..7255f9d10
--- /dev/null
+++ b/src/libserver/symcache/symcache_c.cxx
@@ -0,0 +1,48 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "symcache_internal.hxx"
+
+/**
+ * C API for symcache
+ */
+
+#define C_API_SYMCACHE(ptr) (reinterpret_cast<rspamd::symcache::symcache *>(ptr))
+#define C_API_SYMCACHE_ITEM(ptr) (reinterpret_cast<rspamd::symcache::cache_item *>(ptr))
+
+void
+rspamd_symcache_destroy (struct rspamd_symcache *cache)
+{
+ auto *real_cache = C_API_SYMCACHE(cache);
+
+ delete real_cache;
+}
+
+struct rspamd_symcache*
+rspamd_symcache_new (struct rspamd_config *cfg)
+{
+ auto *ncache = new rspamd::symcache::symcache(cfg);
+
+ return (struct rspamd_symcache*)ncache;
+}
+
+gboolean
+rspamd_symcache_init (struct rspamd_symcache *cache)
+{
+ auto *real_cache = C_API_SYMCACHE(cache);
+
+ return real_cache->init();
+}
diff --git a/src/libserver/symcache/symcache_impl.cxx b/src/libserver/symcache/symcache_impl.cxx
new file mode 100644
index 000000000..aaf8b0cdd
--- /dev/null
+++ b/src/libserver/symcache/symcache_impl.cxx
@@ -0,0 +1,292 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "symcache_internal.hxx"
+#include "unix-std.h"
+#include "libutil/cxx/locked_file.hxx"
+
+namespace rspamd::symcache {
+
+INIT_LOG_MODULE_PUBLIC(symcache)
+
+auto symcache::init() -> bool
+{
+ auto res = true;
+ reload_time = cfg->cache_reload_time;
+
+ if (cfg->cache_filename != NULL) {
+ res = load_items();
+ }
+
+ struct rspamd_symcache_item *it, *vit;
+ struct cache_dependency *dep;
+ struct delayed_cache_dependency *ddep;
+ struct delayed_cache_condition *dcond;
+ GList *cur;
+ gint i, j;
+
+ cur = cache->delayed_deps;
+ while (cur) {
+ ddep = cur->data;
+
+ vit = rspamd_symcache_find_filter(cache, ddep->from, false);
+ it = rspamd_symcache_find_filter(cache, ddep->from, true);
+
+ if (it == NULL || vit == NULL) {
+ msg_err_cache ("cannot register delayed dependency between %s and %s: "
+ "%s is missing", ddep->from, ddep->to, ddep->from);
+ }
+ else {
+ msg_debug_cache ("delayed between %s(%d:%d) -> %s", ddep->from,
+ it->id, vit->id, ddep->to);
+ rspamd_symcache_add_dependency(cache, it->id, ddep->to, vit != it ?
+ vit->id : -1);
+ }
+
+ cur = g_list_next (cur);
+ }
+
+ cur = cache->delayed_conditions;
+ while (cur) {
+ dcond = cur->data;
+
+ it = rspamd_symcache_find_filter(cache, dcond->sym, true);
+
+ if (it == NULL) {
+ msg_err_cache (
+ "cannot register delayed condition for %s",
+ dcond->sym);
+ luaL_unref(dcond->L, LUA_REGISTRYINDEX, dcond->cbref);
+ }
+ else {
+ struct rspamd_symcache_condition *ncond = rspamd_mempool_alloc0 (cache->static_pool,
+ sizeof(*ncond));
+ ncond->cb = dcond->cbref;
+ DL_APPEND(it->specific.normal.conditions, ncond);
+ }
+
+ cur = g_list_next (cur);
+ }
+
+ PTR_ARRAY_FOREACH (cache->items_by_id, i, it) {
+
+ PTR_ARRAY_FOREACH (it->deps, j, dep) {
+ rspamd_symcache_process_dep(cache, it, dep);
+ }
+
+ if (it->deps) {
+ /* Reversed loop to make removal safe */
+ for (j = it->deps->len - 1; j >= 0; j--) {
+ dep = g_ptr_array_index (it->deps, j);
+
+ if (dep->item == NULL) {
+ /* Remove useless dep */
+ g_ptr_array_remove_index(it->deps, j);
+ }
+ }
+ }
+ }
+
+ /* Special case for virtual symbols */
+ PTR_ARRAY_FOREACH (cache->virtual, i, it) {
+
+ PTR_ARRAY_FOREACH (it->deps, j, dep) {
+ rspamd_symcache_process_dep(cache, it, dep);
+ }
+ }
+
+ g_ptr_array_sort_with_data(cache->connfilters, prefilters_cmp, cache);
+ g_ptr_array_sort_with_data(cache->prefilters, prefilters_cmp, cache);
+ g_ptr_array_sort_with_data(cache->postfilters, postfilters_cmp, cache);
+ g_ptr_array_sort_with_data(cache->idempotent, postfilters_cmp, cache);
+
+ rspamd_symcache_resort(cache);
+
+ /* Connect metric symbols with symcache symbols */
+ if (cache->cfg->symbols) {
+ g_hash_table_foreach(cache->cfg->symbols,
+ rspamd_symcache_metric_connect_cb,
+ cache);
+ }
+
+ return res;
+}
+
+auto symcache::load_items() -> bool
+{
+ auto cached_map = util::raii_mmaped_locked_file::mmap_shared(cfg->cache_filename,
+ O_RDONLY, PROT_READ);
+
+ if (!cached_map.has_value()) {
+ msg_info_cache("%s", cached_map.error().c_str());
+ return false;
+ }
+
+
+ if (cached_map->get_size() < (gint) sizeof(symcache_header)) {
+ msg_info_cache("cannot use file %s, truncated: %z", cfg->cache_filename, ,
+ errno, strerror(errno));
+ return false;
+ }
+
+ const auto *hdr = (struct symcache_header *)cached_map->get_map();
+
+ if (memcmp(hdr->magic, symcache_magic,
+ sizeof(symcache_magic)) != 0) {
+ msg_info_cache("cannot use file %s, bad magic", cfg->cache_filename);
+
+ return false;
+ }
+
+ auto *parser = ucl_parser_new(0);
+ const auto *p = (const std::uint8_t *)(hdr + 1);
+
+ if (!ucl_parser_add_chunk(parser, p, cached_map->get_size() - sizeof(*hdr))) {
+ msg_info_cache ("cannot use file %s, cannot parse: %s", cfg->cache_filename,
+ ucl_parser_get_error(parser));
+ ucl_parser_free(parser);
+
+ return false;
+ }
+
+ auto *top = ucl_parser_get_object(parser);
+ ucl_parser_free(parser);
+
+ if (top == nullptr || ucl_object_type(top) != UCL_OBJECT) {
+ msg_info_cache ("cannot use file %s, bad object", cfg->cache_filename);
+ ucl_object_unref(top);
+
+ return false;
+ }
+
+ auto it = ucl_object_iterate_new(top);
+ const ucl_object_t *cur;
+ while ((cur = ucl_object_iterate_safe(it, true)) != nullptr) {
+ auto item_it = items_by_symbol.find(ucl_object_key(cur));
+
+ if (item_it != items_by_symbol.end()) {
+ auto item = item_it->second;
+ /* Copy saved info */
+ /*
+ * XXX: don't save or load weight, it should be obtained from the
+ * metric
+ */
+#if 0
+ elt = ucl_object_lookup (cur, "weight");
+
+ if (elt) {
+ w = ucl_object_todouble (elt);
+ if (w != 0) {
+ item->weight = w;
+ }
+ }
+#endif
+ const auto *elt = ucl_object_lookup(cur, "time");
+ if (elt) {
+ item->st->avg_time = ucl_object_todouble(elt);
+ }
+
+ elt = ucl_object_lookup(cur, "count");
+ if (elt) {
+ item->st->total_hits = ucl_object_toint(elt);
+ item->last_count = item->st->total_hits;
+ }
+
+ elt = ucl_object_lookup(cur, "frequency");
+ if (elt && ucl_object_type(elt) == UCL_OBJECT) {
+ const ucl_object_t *freq_elt;
+
+ freq_elt = ucl_object_lookup(elt, "avg");
+
+ if (freq_elt) {
+ item->st->avg_frequency = ucl_object_todouble(freq_elt);
+ }
+ freq_elt = ucl_object_lookup(elt, "stddev");
+
+ if (freq_elt) {
+ item->st->stddev_frequency = ucl_object_todouble(freq_elt);
+ }
+ }
+
+ if (item->is_virtual() && !(item->type & SYMBOL_TYPE_GHOST)) {
+ g_assert (item->specific.virtual.parent < (gint)cache->items_by_id->len);
+ parent = g_ptr_array_index (cache->items_by_id,
+ item->specific.virtual.parent);
+ item->specific.virtual.parent_item = parent;
+
+ if (parent->st->weight < item->st->weight) {
+ parent->st->weight = item->st->weight;
+ }
+
+ /*
+ * We maintain avg_time for virtual symbols equal to the
+ * parent item avg_time
+ */
+ item->st->avg_time = parent->st->avg_time;
+ }
+
+ cache->total_weight += fabs(item->st->weight);
+ cache->total_hits += item->st->total_hits;
+ }
+ }
+
+ ucl_object_iterate_free(it);
+ ucl_object_unref(top);
+
+ return true;
+}
+
+auto symcache::get_item_by_id(int id, bool resolve_parent) const -> const cache_item_ptr &
+{
+ if (id < 0 || id >= items_by_id.size()) {
+ g_abort();
+ }
+
+ auto &ret = items_by_id[id];
+
+ if (!ret) {
+ g_abort();
+ }
+
+ if (resolve_parent && ret->is_virtual()) {
+ return ret->get_parent(*this);
+ }
+
+ return ret;
+}
+
+
+auto cache_item::get_parent(const symcache &cache) const -> const cache_item_ptr &
+{
+ if (is_virtual()) {
+ const auto &virtual_sp = std::get<virtual_item>(specific);
+
+ return virtual_sp.get_parent()
+ }
+
+ return cache_item_ptr{nullptr};
+}
+
+auto virtual_item::get_parent(const symcache &cache) const -> const cache_item_ptr &
+{
+ if (parent) {
+ return parent;
+ }
+
+ return cache.get_item_by_id(parent_id, false);
+}
+
+} \ No newline at end of file
diff --git a/src/libserver/symcache/symcache_internal.hxx b/src/libserver/symcache/symcache_internal.hxx
new file mode 100644
index 000000000..a1207fc97
--- /dev/null
+++ b/src/libserver/symcache/symcache_internal.hxx
@@ -0,0 +1,385 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal C++ structures and classes for symcache
+ */
+
+#ifndef RSPAMD_SYMCACHE_INTERNAL_HXX
+#define RSPAMD_SYMCACHE_INTERNAL_HXX
+#pragma once
+
+#include <cmath>
+#include <cstdlib>
+#include <cstdint>
+#include <vector>
+#include <string>
+#include <string_view>
+#include <memory>
+#include <variant>
+#include "contrib/robin-hood/robin_hood.h"
+
+#include "cfg_file.h"
+#include "lua/lua_common.h"
+
+#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
+ cache->static_pool->tag.tagname, cache->cfg->checksum, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
+ static_pool->tag.tagname, cfg->checksum, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
+ static_pool->tag.tagname, cfg->checksum, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \
+ rspamd_symcache_log_id, "symcache", cfg->checksum, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \
+ rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+
+namespace rspamd::symcache {
+
+/* Defined in symcache_impl.cxx */
+extern int rspamd_symcache_log_id;
+
+static const std::uint8_t symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0};
+
+struct symcache_header {
+ std::uint8_t magic[8];
+ unsigned int nitems;
+ std::uint8_t checksum[64];
+ std::uint8_t unused[128];
+};
+
+struct cache_item;
+using cache_item_ptr = std::shared_ptr<cache_item>;
+using cache_item_weak_ptr = std::weak_ptr<cache_item>;
+
+struct order_generation {
+ std::vector<cache_item_weak_ptr> d;
+ unsigned int generation_id;
+};
+
+using order_generation_ptr = std::shared_ptr<order_generation>;
+
+/*
+ * This structure is optimised to store ids list:
+ * - If the first element is -1 then use dynamic part, else use static part
+ * There is no std::variant to save space
+ */
+struct id_list {
+ union {
+ std::uint32_t st[4];
+ struct {
+ std::uint32_t e; /* First element */
+ std::uint16_t len;
+ std::uint16_t allocated;
+ std::uint32_t *n;
+ } dyn;
+ } data;
+
+ id_list() {
+ std::memset((void *)&data, 0, sizeof(data));
+ }
+ /**
+ * Returns ids from a compressed list, accepting a mutable reference for number of elements
+ * @param nids output of the number of elements
+ * @return
+ */
+ auto get_ids(std::size_t &nids) const -> const std::uint32_t * {
+ if (data.dyn.e == -1) {
+ /* Dynamic list */
+ nids = data.dyn.len;
+
+ return data.dyn.n;
+ }
+ else {
+ auto cnt = 0;
+
+ while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS(data.st)) {
+ cnt ++;
+ }
+
+ nids = cnt;
+
+ return data.st;
+ }
+ }
+
+ auto add_id(std::uint32_t id, rspamd_mempool_t *pool) -> void {
+ if (data.st[0] == -1) {
+ /* Dynamic array */
+ if (data.dyn.len < data.dyn.allocated) {
+ /* Trivial, append + sort */
+ data.dyn.n[data.dyn.len++] = id;
+ }
+ else {
+ /* Reallocate */
+ g_assert (data.dyn.allocated <= G_MAXINT16);
+ data.dyn.allocated *= 2;
+
+ auto *new_array = rspamd_mempool_alloc_array_type(pool,
+ data.dyn.allocated, std::uint32_t);
+ memcpy(new_array, data.dyn.n, data.dyn.len * sizeof(std::uint32_t));
+ data.dyn.n = new_array;
+ data.dyn.n[data.dyn.len++] = id;
+ }
+
+ std::sort(data.dyn.n, data.dyn.n + data.dyn.len);
+ }
+ else {
+ /* Static part */
+ auto cnt = 0u;
+ while (data.st[cnt] != 0 && cnt < G_N_ELEMENTS (data.st)) {
+ cnt ++;
+ }
+
+ if (cnt < G_N_ELEMENTS (data.st)) {
+ data.st[cnt] = id;
+ }
+ else {
+ /* Switch to dynamic */
+ data.dyn.allocated = G_N_ELEMENTS (data.st) * 2;
+ auto *new_array = rspamd_mempool_alloc_array_type(pool,
+ data.dyn.allocated, std::uint32_t);
+ memcpy (new_array, data.st, sizeof(data.st));
+ data.dyn.n = new_array;
+ data.dyn.e = -1; /* Marker */
+ data.dyn.len = G_N_ELEMENTS (data.st);
+
+ /* Recursively jump to dynamic branch that will handle insertion + sorting */
+ add_id(id, pool); // tail call
+ }
+ }
+ }
+};
+
+class symcache;
+
+struct item_condition {
+private:
+ gint cb;
+ lua_State *L;
+public:
+ item_condition() {
+ // TODO
+ }
+ virtual ~item_condition() {
+ // TODO
+ }
+};
+
+class normal_item {
+private:
+ symbol_func_t func;
+ void *user_data;
+ std::vector<item_condition> conditions;
+public:
+ explicit normal_item() {
+ // TODO
+ }
+ auto add_condition() -> void {
+ // TODO
+ }
+ auto call() -> void {
+ // TODO
+ }
+};
+
+class virtual_item {
+private:
+ int parent_id;
+ cache_item_ptr parent;
+public:
+ explicit virtual_item() {
+ // TODO
+ }
+
+ auto get_parent(const symcache &cache) const -> const cache_item_ptr&;
+};
+
+struct cache_item {
+ /* This block is likely shared */
+ struct rspamd_symcache_item_stat *st;
+ struct rspamd_counter_data *cd;
+
+ std::uint64_t last_count;
+ std::string symbol;
+ std::string_view type_descr;
+ int type;
+
+ /* Callback data */
+ std::variant<normal_item, virtual_item> specific;
+
+ /* Condition of execution */
+ bool enabled;
+
+ /* Priority */
+ int priority;
+ /* Topological order */
+ unsigned int order;
+ /* Unique id - counter */
+ int id;
+
+ int frequency_peaks;
+ /* Settings ids */
+ id_list allowed_ids;
+ /* Allows execution but not symbols insertion */
+ id_list exec_only_ids;
+ id_list forbidden_ids;
+
+ /* Dependencies */
+ std::vector<cache_item_ptr> deps;
+ /* Reverse dependencies */
+ std::vector<cache_item_ptr> rdeps;
+
+ auto is_virtual() const -> bool { return std::holds_alternative<virtual_item>(specific); }
+ auto get_parent(const symcache &cache) const -> const cache_item_ptr &;
+};
+
+struct delayed_cache_dependency {
+ std::string from;
+ std::string to;
+};
+
+struct delayed_cache_condition {
+ std::string sym;
+ int cbref;
+ lua_State *L;
+};
+
+class symcache {
+private:
+ /* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */
+ robin_hood::unordered_flat_map<std::string_view, cache_item_ptr> items_by_symbol;
+ std::vector<cache_item_ptr> items_by_id;
+
+ /* Items sorted into some order */
+ order_generation_ptr items_by_order;
+ unsigned int cur_order_gen;
+
+ std::vector<cache_item_ptr> connfilters;
+ std::vector<cache_item_ptr> prefilters;
+ std::vector<cache_item_ptr> filters;
+ std::vector<cache_item_ptr> postfilters;
+ std::vector<cache_item_ptr> composites;
+ std::vector<cache_item_ptr> idempotent;
+ std::vector<cache_item_ptr> virtual_symbols;
+
+ /* These are stored within pointer to clean up after init */
+ std::unique_ptr<std::vector<delayed_cache_dependency>> delayed_deps;
+ std::unique_ptr<std::vector<delayed_cache_condition>> delayed_conditions;
+
+ rspamd_mempool_t *static_pool;
+ std::uint64_t cksum;
+ double total_weight;
+ std::size_t used_items;
+ std::size_t stats_symbols_count;
+ std::uint64_t total_hits;
+
+ struct rspamd_config *cfg;
+ lua_State *L;
+ double reload_time;
+ double last_profile;
+ int peak_cb;
+ int cache_id;
+
+private:
+ /* Internal methods */
+ auto load_items() -> bool;
+
+public:
+ explicit symcache(struct rspamd_config *cfg) : cfg(cfg) {
+ /* XXX: do we need a special pool for symcache? I don't think so */
+ static_pool = cfg->cfg_pool;
+ reload_time = cfg->cache_reload_time;
+ total_hits = 1;
+ total_weight = 1.0;
+ cksum = 0xdeadbabe;
+ peak_cb = -1;
+ cache_id = rspamd_random_uint64_fast();
+ L = (lua_State *)cfg->lua_state;
+ }
+
+ virtual ~symcache() {
+ if (peak_cb != -1) {
+ luaL_unref(L, LUA_REGISTRYINDEX, peak_cb);
+ }
+ }
+
+ auto get_item_by_id(int id, bool resolve_parent) const -> const cache_item_ptr &;
+
+ /*
+ * Initialises the symbols cache, must be called after all symbols are added
+ * and the config file is loaded
+ */
+ auto init() -> bool;
+};
+
+/*
+ * These items are saved within task structure and are used to track
+ * symbols execution
+ */
+struct cache_dynamic_item {
+ std::uint16_t start_msec; /* Relative to task time */
+ unsigned started: 1;
+ unsigned finished: 1;
+ /* unsigned pad:14; */
+ std::uint32_t async_events;
+};
+
+
+struct cache_dependency {
+ cache_item_ptr item; /* Owning pointer to the real dep */
+ std::string_view sym; /* Symbolic dep name */
+ int id; /* Real from */
+ int vid; /* Virtual from */
+};
+
+struct cache_savepoint {
+ unsigned order_gen;
+ unsigned items_inflight;
+ bool profile;
+ bool has_slow;
+
+ double profile_start;
+ double lim;
+
+ struct rspamd_scan_result *rs;
+
+ struct cache_item *cur_item;
+ order_generation_ptr order;
+ /* Dynamically expanded as needed */
+ struct cache_dynamic_item dynamic_items[];
+};
+
+struct cache_refresh_cbdata {
+ double last_resort;
+ ev_timer resort_ev;
+ symcache *cache;
+ struct rspamd_worker *w;
+ struct ev_loop *event_loop;
+};
+
+} // namespace rspamd
+
+#endif //RSPAMD_SYMCACHE_INTERNAL_HXX
diff --git a/src/libutil/CMakeLists.txt b/src/libutil/CMakeLists.txt
index 5160dfe7b..7b3103720 100644
--- a/src/libutil/CMakeLists.txt
+++ b/src/libutil/CMakeLists.txt
@@ -17,6 +17,7 @@ SET(LIBRSPAMDUTILSRC
${CMAKE_CURRENT_SOURCE_DIR}/util.c
${CMAKE_CURRENT_SOURCE_DIR}/heap.c
${CMAKE_CURRENT_SOURCE_DIR}/multipattern.c
- ${CMAKE_CURRENT_SOURCE_DIR}/cxx/utf8_util.cxx)
+ ${CMAKE_CURRENT_SOURCE_DIR}/cxx/utf8_util.cxx
+ ${CMAKE_CURRENT_SOURCE_DIR}/cxx/locked_file.cxx)
# Rspamdutil
SET(RSPAMD_UTIL ${LIBRSPAMDUTILSRC} PARENT_SCOPE) \ No newline at end of file