Browse Source

[Rework] Rework files structure

tags/3.3
Vsevolod Stakhov 2 years ago
parent
commit
fd9693073d
No account linked to committer's email address

+ 3
- 2
src/libserver/CMakeLists.txt View File

@@ -16,11 +16,12 @@ SET(LIBRSPAMDSERVERSRC
${CMAKE_CURRENT_SOURCE_DIR}/monitored.c
${CMAKE_CURRENT_SOURCE_DIR}/protocol.c
${CMAKE_CURRENT_SOURCE_DIR}/re_cache.c
${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.cxx
${CMAKE_CURRENT_SOURCE_DIR}/redis_pool.cxx
${CMAKE_CURRENT_SOURCE_DIR}/roll_history.c
${CMAKE_CURRENT_SOURCE_DIR}/spf.c
${CMAKE_CURRENT_SOURCE_DIR}/ssl_util.c
${CMAKE_CURRENT_SOURCE_DIR}/rspamd_symcache.cxx
${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_impl.cxx
${CMAKE_CURRENT_SOURCE_DIR}/symcache/symcache_c.cxx
${CMAKE_CURRENT_SOURCE_DIR}/task.c
${CMAKE_CURRENT_SOURCE_DIR}/url.c
${CMAKE_CURRENT_SOURCE_DIR}/worker_util.c

+ 0
- 616
src/libserver/rspamd_symcache.cxx View File

@@ -36,26 +36,6 @@

#include "contrib/robin-hood/robin_hood.h"

#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
cache->static_pool->tag.tagname, cache->cfg->checksum, \
G_STRFUNC, \
__VA_ARGS__)
#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
cache->static_pool->tag.tagname, cache->cfg->checksum, \
G_STRFUNC, \
__VA_ARGS__)
#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
cache->static_pool->tag.tagname, cache->cfg->checksum, \
G_STRFUNC, \
__VA_ARGS__)
#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \
rspamd_symcache_log_id, "symcache", cache->cfg->checksum, \
G_STRFUNC, \
__VA_ARGS__)
#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \
rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \
G_STRFUNC, \
__VA_ARGS__)

INIT_LOG_MODULE(symcache)

@@ -73,311 +53,7 @@ INIT_LOG_MODULE(symcache)
#define CLR_FINISH_BIT(checkpoint, dyn_item) \
(dyn_item)->finished = 0

namespace rspamd::symcache {

/*
 * Bytes expected at the very start of a saved cache file: the "rsc" tag, the
 * byte 2 (presumably a format version - TODO confirm) and zero padding.
 */
static const std::uint8_t rspamd_symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0};

/*
 * Fixed-size prefix of the saved cache file. The loader in this file casts the
 * start of the mapped file to this struct, compares `magic` with
 * rspamd_symcache_magic and parses everything after the header as UCL.
 * The field order defines the on-disk layout, so it must not be changed.
 */
struct rspamd_symcache_header {
std::uint8_t magic[8];
/* NOTE(review): nitems, checksum and unused are not read by the code visible here */
unsigned int nitems;
std::uint8_t checksum[64];
std::uint8_t unused[128];
};

struct cache_item;
using cache_item_ptr = std::shared_ptr<cache_item>;
using cache_item_weak_ptr = std::weak_ptr<cache_item>;

/*
 * One snapshot of the items order: non-owning references to cache items plus
 * the number of the generation this snapshot belongs to.
 */
struct order_generation {
/* Weak references: this list does not keep the items alive */
std::vector<cache_item_weak_ptr> d;
/* NOTE(review): has no initializer, must be set by whoever creates the object */
unsigned int generation_id;
};

using order_generation_ptr = std::shared_ptr<order_generation>;

/*
 * Compact list of numeric ids that avoids any allocation for short lists.
 * Two representations share the same storage:
 * - static: up to G_N_ELEMENTS(st) ids stored inline in insertion order; a zero
 *   marks a free slot, so the id 0 cannot be stored;
 * - dynamic: the first element equals `dynamic_marker`, the ids live in the
 *   pool allocated array `dyn.n` and are kept sorted.
 * There is no std::variant to save space.
 */
struct id_list {
	/* Value of the first element that selects the dynamic representation */
	static constexpr std::uint32_t dynamic_marker = static_cast<std::uint32_t>(-1);

	union {
		std::uint32_t st[4];
		struct {
			std::uint32_t e; /* First element */
			std::uint16_t len;
			std::uint16_t allocated;
			std::uint32_t *n;
		} dyn;
	} data;

	id_list() {
		std::memset((void *)&data, 0, sizeof(data));
	}

	/**
	 * Counts the used slots of the static representation
	 * @return number of leading non-zero elements in data.st
	 */
	auto static_count() const -> std::size_t {
		std::size_t cnt = 0;

		/* The bound is checked first: when every slot is used there is no terminating zero */
		while (cnt < G_N_ELEMENTS(data.st) && data.st[cnt] != 0) {
			cnt++;
		}

		return cnt;
	}

	/**
	 * Returns ids from a compressed list, accepting a mutable reference for number of elements
	 * @param nids output of the number of elements
	 * @return pointer to the first id (inline storage or the pool allocated array)
	 */
	auto get_ids(std::size_t &nids) const -> const std::uint32_t * {
		if (data.dyn.e == dynamic_marker) {
			/* Dynamic list */
			nids = data.dyn.len;

			return data.dyn.n;
		}

		nids = static_count();

		return data.st;
	}

	/**
	 * Appends an id, switching to the dynamic representation when the inline
	 * storage is exhausted
	 * @param id id to add
	 * @param pool memory pool used for the dynamic array (old arrays are left to the pool)
	 */
	auto add_id(std::uint32_t id, rspamd_mempool_t *pool) -> void {
		if (data.dyn.e == dynamic_marker) {
			/* Dynamic array */
			if (data.dyn.len >= data.dyn.allocated) {
				/* Reallocate: capacity is doubled and must still fit 16 bits */
				g_assert (data.dyn.allocated <= G_MAXINT16);
				const auto new_allocated = static_cast<std::uint16_t>(data.dyn.allocated * 2);

				auto *new_array = rspamd_mempool_alloc_array_type(pool,
						new_allocated, std::uint32_t);
				memcpy(new_array, data.dyn.n, data.dyn.len * sizeof(std::uint32_t));
				data.dyn.n = new_array;
				data.dyn.allocated = new_allocated;
			}

			data.dyn.n[data.dyn.len++] = id;
			std::sort(data.dyn.n, data.dyn.n + data.dyn.len);
		}
		else {
			/* Static part */
			const auto cnt = static_count();

			if (cnt < G_N_ELEMENTS (data.st)) {
				data.st[cnt] = id;
			}
			else {
				/* Switch to dynamic */
				const auto new_allocated = static_cast<std::uint16_t>(G_N_ELEMENTS (data.st) * 2);
				auto *new_array = rspamd_mempool_alloc_array_type(pool,
						new_allocated, std::uint32_t);

				/*
				 * Copy the inline ids before touching any dyn field: len and
				 * allocated share their bytes with st[1], n with the tail of st
				 */
				memcpy (new_array, data.st, sizeof(data.st));
				data.dyn.n = new_array;
				data.dyn.e = dynamic_marker;
				data.dyn.len = G_N_ELEMENTS (data.st);
				data.dyn.allocated = new_allocated;

				/* Recursively jump to dynamic branch that will handle insertion + sorting */
				add_id(id, pool); // tail call
			}
		}
	}
};

/*
 * Placeholder for a condition attached to a cache item; the behaviour is
 * not implemented yet (see the TODO markers).
 */
struct item_condition {
private:
	/* Callback reference; -1 mirrors the "no reference" value used for peak_cb in this file */
	gint cb = -1;
	/* Lua state the callback belongs to, unset until the constructor is implemented */
	lua_State *L = nullptr;
public:
	item_condition() {
		// TODO
	}
	virtual ~item_condition() {
		// TODO
	}
};

/*
 * Callback part of a regular (non virtual) cache item. Only the data
 * members exist so far, all methods are stubs.
 */
class normal_item {
private:
	/* Value-initialised so that a stub object never carries garbage */
	symbol_func_t func{};
	void *user_data = nullptr;
	std::vector<item_condition> conditions;
public:
	explicit normal_item() {
		// TODO
	}
	auto add_condition() -> void {
		// TODO
	}
	auto call() -> void {
		// TODO
	}
};

/*
 * Part of a cache item that only refers to another (parent) item.
 */
class virtual_item {
private:
	/* Id of the parent item; negative until a real id is assigned */
	int parent_id = -1;
	/* Owning pointer to the parent, empty until it is resolved */
	cache_item_ptr parent;
public:
	explicit virtual_item() {
		// TODO
	}
};

/*
 * A single symbol registered in the cache.
 * NOTE(review): scalar and pointer members have no initializers, so their
 * values are indeterminate unless the object is value-initialised.
 */
struct cache_item {
/* This block is likely shared */
struct rspamd_symcache_item_stat *st;
struct rspamd_counter_data *cd;

/* The loader in this file sets it to st->total_hits after restoring the counter */
std::uint64_t last_count;
std::string symbol;
std::string_view type_descr;
/* Bit flags: the loader tests it against SYMBOL_TYPE_GHOST */
int type;

/* Callback data */
std::variant<normal_item, virtual_item> specific;

/* Condition of execution */
bool enabled;

/* Priority */
int priority;
/* Topological order */
unsigned int order;
/* Unique id - counter */
int id;

int frequency_peaks;
/* Settings ids */
id_list allowed_ids;
/* Allows execution but not symbols insertion */
id_list exec_only_ids;
id_list forbidden_ids;

/* Dependencies */
std::vector<cache_item_ptr> deps;
/* Reverse dependencies */
std::vector<cache_item_ptr> rdeps;
};

/*
 * Dependency between two symbols stored by name; post-init looks up `from`
 * and registers a dependency on `to`.
 */
struct delayed_cache_dependency {
std::string from;
std::string to;
};

/*
 * Condition stored by symbol name until post-init: `cbref` is attached to the
 * symbol when it is found, otherwise it is released with luaL_unref on `L`.
 */
struct delayed_cache_condition {
std::string sym;
int cbref;
lua_State *L;
};

struct symcache {
/* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */
robin_hood::unordered_flat_map<std::string_view, cache_item_ptr> items_by_symbol;
std::vector<cache_item_weak_ptr> items_by_id;

/* Items sorted into some order */
order_generation_ptr items_by_order;
unsigned int cur_order_gen;

std::vector<cache_item_weak_ptr> connfilters;
std::vector<cache_item_weak_ptr> prefilters;
std::vector<cache_item_weak_ptr> filters;
std::vector<cache_item_weak_ptr> postfilters;
std::vector<cache_item_weak_ptr> composites;
std::vector<cache_item_weak_ptr> idempotent;
std::vector<cache_item_weak_ptr> virtual_symbols;

/* These are stored within pointer to clean up after init */
std::unique_ptr<std::vector<delayed_cache_dependency>> delayed_deps;
std::unique_ptr<std::vector<delayed_cache_condition>> delayed_conditions;

rspamd_mempool_t *static_pool;
std::uint64_t cksum;
double total_weight;
std::size_t used_items;
std::size_t stats_symbols_count;
std::uint64_t total_hits;

struct rspamd_config *cfg;
lua_State *L;
double reload_time;
double last_profile;
int peak_cb;
int id;

public:
explicit symcache(struct rspamd_config *cfg) : cfg(cfg) {
/* XXX: do we need a special pool for symcache? I don't think so */
static_pool = cfg->cfg_pool;
reload_time = cfg->cache_reload_time;
total_hits = 1;
total_weight = 1.0;
cksum = 0xdeadbabe;
peak_cb = -1;
id = rspamd_random_uint64_fast();
L = (lua_State *)cfg->lua_state;
}

virtual ~symcache() {
if (peak_cb != -1) {
luaL_unref(L, LUA_REGISTRYINDEX, peak_cb);
}
}
};


/*
 * These items are saved within task structure and are used to track
 * symbols execution
 */
struct cache_dynamic_item {
std::uint16_t start_msec; /* Relative to task time */
/* One-bit flags; CLR_FINISH_BIT in this file resets `finished` to 0 */
unsigned started: 1;
unsigned finished: 1;
/* unsigned pad:14; */
std::uint32_t async_events;
};


/*
 * One dependency edge of a cache item. Post-init drops the edges whose
 * `item` is still NULL after rspamd_symcache_process_dep has run.
 */
struct cache_dependency {
cache_item_ptr item; /* Owning pointer to the real dep */
std::string_view sym; /* Symbolic dep name */
int id; /* Real from */
int vid; /* Virtual from */
};

/*
 * Per task execution state of the cache; ends with a flexible array holding
 * one cache_dynamic_item per tracked symbol.
 * NOTE(review): `order` is a shared_ptr, so the struct needs proper
 * construction and destruction even if its memory comes from a raw
 * allocation - confirm at the allocation site.
 */
struct cache_savepoint {
unsigned order_gen;
unsigned items_inflight;
bool profile;
bool has_slow;

double profile_start;
double lim;

struct rspamd_scan_result *rs;

struct cache_item *cur_item;
order_generation_ptr order;
/* Dynamically expanded as needed */
struct cache_dynamic_item dynamic_items[];
};

/*
 * State bundled for the `resort_ev` timer: the cache, the worker and the
 * event loop, plus a `last_resort` timestamp.
 * NOTE(review): presumably drives the periodic re-sort of the cache - the
 * callback itself is not visible here.
 */
struct cache_refresh_cbdata {
double last_resort;
ev_timer resort_ev;
struct symcache *cache;
struct rspamd_worker *w;
struct ev_loop *event_loop;
};

} // namespace rspamd

#define C_API_SYMCACHE(ptr) (reinterpret_cast<rspamd::symcache::symcache *>(ptr))

/* At least once per minute */
#define PROFILE_MAX_TIME (60.0)
@@ -831,260 +507,6 @@ rspamd_symcache_process_dep (struct rspamd_symcache *cache,
}
}

/*
 * Finishes cache initialisation: registers the delayed dependencies and
 * conditions, resolves the dependencies of every item, sorts the special
 * item lists and finally re-sorts the whole cache.
 * NOTE(review): legacy C style code - it relies on implicit conversions from
 * void pointers and on members named `virtual`, neither of which is valid C++.
 */
static void
rspamd_symcache_post_init (struct rspamd_symcache *cache)
{
struct rspamd_symcache_item *it, *vit;
struct cache_dependency *dep;
struct delayed_cache_dependency *ddep;
struct delayed_cache_condition *dcond;
GList *cur;
gint i, j;

/* Step 1: dependencies that were registered by symbol name */
cur = cache->delayed_deps;
while (cur) {
ddep = cur->data;

/*
 * The same name is looked up twice with a different last argument
 * (presumably "resolve parent" - TODO confirm); the results differ
 * when the two lookups return different items
 */
vit = rspamd_symcache_find_filter (cache, ddep->from, false);
it = rspamd_symcache_find_filter (cache, ddep->from, true);

if (it == NULL || vit == NULL) {
msg_err_cache ("cannot register delayed dependency between %s and %s: "
"%s is missing", ddep->from, ddep->to, ddep->from);
}
else {
msg_debug_cache ("delayed between %s(%d:%d) -> %s", ddep->from,
it->id, vit->id, ddep->to);
/* -1 is passed as the last id when both lookups returned the same item */
rspamd_symcache_add_dependency (cache, it->id, ddep->to, vit != it ?
vit->id : -1);
}

cur = g_list_next (cur);
}

/* Step 2: conditions that were registered by symbol name */
cur = cache->delayed_conditions;
while (cur) {
dcond = cur->data;

it = rspamd_symcache_find_filter (cache, dcond->sym, true);

if (it == NULL) {
msg_err_cache (
"cannot register delayed condition for %s",
dcond->sym);
/* The callback reference is released as nothing will ever use it */
luaL_unref (dcond->L, LUA_REGISTRYINDEX, dcond->cbref);
}
else {
struct rspamd_symcache_condition *ncond = rspamd_mempool_alloc0 (cache->static_pool,
sizeof (*ncond));
ncond->cb = dcond->cbref;
DL_APPEND (it->specific.normal.conditions, ncond);
}

cur = g_list_next (cur);
}

/* Step 3: resolve the dependencies of every item */
PTR_ARRAY_FOREACH (cache->items_by_id, i, it) {

PTR_ARRAY_FOREACH (it->deps, j, dep) {
rspamd_symcache_process_dep (cache, it, dep);
}

if (it->deps) {
/* Reversed loop to make removal safe */
for (j = it->deps->len - 1; j >= 0; j--) {
dep = g_ptr_array_index (it->deps, j);

if (dep->item == NULL) {
/* Remove useless dep */
g_ptr_array_remove_index (it->deps, j);
}
}
}
}

/* Special case for virtual symbols */
PTR_ARRAY_FOREACH (cache->virtual, i, it) {

PTR_ARRAY_FOREACH (it->deps, j, dep) {
rspamd_symcache_process_dep (cache, it, dep);
}
}

/* Step 4: connfilters share the prefilters comparator, idempotent items the postfilters one */
g_ptr_array_sort_with_data (cache->connfilters, prefilters_cmp, cache);
g_ptr_array_sort_with_data (cache->prefilters, prefilters_cmp, cache);
g_ptr_array_sort_with_data (cache->postfilters, postfilters_cmp, cache);
g_ptr_array_sort_with_data (cache->idempotent, postfilters_cmp, cache);

rspamd_symcache_resort (cache);
}

/*
 * Restores the per symbol statistics saved in the cache file `name`.
 *
 * The file is opened read only, locked and mapped; it must start with a
 * rspamd_symcache_header carrying the expected magic, the rest is parsed as
 * UCL. For every top level key that matches a registered symbol the saved
 * average time, hits count and frequency data are copied into the item
 * statistics, and the cache totals are updated.
 *
 * @param cache cache whose items are updated
 * @param name path of the cache file
 * @return TRUE when the file has been read and parsed, FALSE otherwise
 * (the reason is logged at the info level)
 */
static gboolean
rspamd_symcache_load_items (struct rspamd_symcache *cache, const gchar *name)
{
	struct rspamd_symcache_header *hdr;
	struct stat st;
	struct ucl_parser *parser;
	ucl_object_t *top;
	const ucl_object_t *cur, *elt;
	ucl_object_iter_t it;
	struct rspamd_symcache_item *item, *parent;
	const guchar *p;
	gint fd, saved_errno;
	gpointer map;

	fd = open (name, O_RDONLY);

	if (fd == -1) {
		msg_info_cache ("cannot open file %s, error %d, %s", name,
				errno, strerror (errno));
		return FALSE;
	}

	rspamd_file_lock (fd, FALSE);

	if (fstat (fd, &st) == -1) {
		/* Keep the failure reason: the cleanup calls below may change errno */
		saved_errno = errno;
		rspamd_file_unlock (fd, FALSE);
		close (fd);
		msg_info_cache ("cannot stat file %s, error %d, %s", name,
				saved_errno, strerror (saved_errno));
		return FALSE;
	}

	if (st.st_size < (gint)sizeof (*hdr)) {
		rspamd_file_unlock (fd, FALSE);
		close (fd);
		/* Not a system error: the file is too short to hold the header */
		errno = EINVAL;
		msg_info_cache ("cannot use file %s, error %d, %s", name,
				errno, strerror (errno));
		return FALSE;
	}

	map = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, fd, 0);

	if (map == MAP_FAILED) {
		saved_errno = errno;
		rspamd_file_unlock (fd, FALSE);
		close (fd);
		msg_info_cache ("cannot mmap file %s, error %d, %s", name,
				saved_errno, strerror (saved_errno));
		return FALSE;
	}

	hdr = (struct rspamd_symcache_header *)map;

	if (memcmp (hdr->magic, rspamd_symcache_magic,
			sizeof (rspamd_symcache_magic)) != 0) {
		msg_info_cache ("cannot use file %s, bad magic", name);
		munmap (map, st.st_size);
		rspamd_file_unlock (fd, FALSE);
		close (fd);

		return FALSE;
	}

	/* The UCL payload starts right after the fixed size header */
	parser = ucl_parser_new (0);
	p = (const guchar *)(hdr + 1);

	if (!ucl_parser_add_chunk (parser, p, st.st_size - sizeof (*hdr))) {
		msg_info_cache ("cannot use file %s, cannot parse: %s", name,
				ucl_parser_get_error (parser));
		munmap (map, st.st_size);
		ucl_parser_free (parser);
		rspamd_file_unlock (fd, FALSE);
		close (fd);

		return FALSE;
	}

	/* Once the object is obtained, the mapping, the lock and the parser are released */
	top = ucl_parser_get_object (parser);
	munmap (map, st.st_size);
	rspamd_file_unlock (fd, FALSE);
	close (fd);
	ucl_parser_free (parser);

	if (top == NULL || ucl_object_type (top) != UCL_OBJECT) {
		msg_info_cache ("cannot use file %s, bad object", name);
		ucl_object_unref (top);
		return FALSE;
	}

	it = ucl_object_iterate_new (top);

	while ((cur = ucl_object_iterate_safe (it, true))) {
		item = (struct rspamd_symcache_item *)g_hash_table_lookup (
				cache->items_by_symbol, ucl_object_key (cur));

		if (item == NULL) {
			/* Saved symbol that is not registered any more */
			continue;
		}

		/* Copy saved info */
		/*
		 * XXX: don't save or load weight, it should be obtained from the
		 * metric
		 */
		elt = ucl_object_lookup (cur, "time");
		if (elt) {
			item->st->avg_time = ucl_object_todouble (elt);
		}

		elt = ucl_object_lookup (cur, "count");
		if (elt) {
			item->st->total_hits = ucl_object_toint (elt);
			item->last_count = item->st->total_hits;
		}

		elt = ucl_object_lookup (cur, "frequency");
		if (elt && ucl_object_type (elt) == UCL_OBJECT) {
			const ucl_object_t *freq_elt;

			freq_elt = ucl_object_lookup (elt, "avg");

			if (freq_elt) {
				item->st->avg_frequency = ucl_object_todouble (freq_elt);
			}
			freq_elt = ucl_object_lookup (elt, "stddev");

			if (freq_elt) {
				item->st->stddev_frequency = ucl_object_todouble (freq_elt);
			}
		}

		if (item->is_virtual && !(item->type & SYMBOL_TYPE_GHOST)) {
			g_assert (item->specific.virtual.parent < (gint)cache->items_by_id->len);
			parent = (struct rspamd_symcache_item *)g_ptr_array_index (cache->items_by_id,
					item->specific.virtual.parent);
			item->specific.virtual.parent_item = parent;

			/* The parent weight is raised to the weight of this child when it is lower */
			if (parent->st->weight < item->st->weight) {
				parent->st->weight = item->st->weight;
			}

			/*
			 * We maintain avg_time for virtual symbols equal to the
			 * parent item avg_time
			 */
			item->st->avg_time = parent->st->avg_time;
		}

		cache->total_weight += fabs (item->st->weight);
		cache->total_hits += item->st->total_hits;
	}

	ucl_object_iterate_free (it);
	ucl_object_unref (top);

	return TRUE;
}

#define ROUND_DOUBLE(x) (floor((x) * 100.0) / 100.0)

static gboolean
@@ -1438,21 +860,6 @@ rspamd_symcache_save (struct rspamd_symcache *cache)
}
}

/*
 * C API: destroys a cache created by rspamd_symcache_new. The opaque handle
 * is the C++ symcache object itself.
 */
void
rspamd_symcache_destroy (struct rspamd_symcache *cache)
{
	delete C_API_SYMCACHE(cache);
}

struct rspamd_symcache*
rspamd_symcache_new (struct rspamd_config *cfg)
{
auto *ncache = new rspamd::symcache::symcache(cfg);

return (struct rspamd_symcache*)ncache;
}

static void
rspamd_symcache_metric_connect_cb (gpointer k, gpointer v, gpointer ud)
@@ -1472,30 +879,7 @@ rspamd_symcache_metric_connect_cb (gpointer k, gpointer v, gpointer ud)
}
}

/*
 * Initialises the cache once all symbols are registered: loads the saved
 * statistics (when a cache file is configured), runs post-init and connects
 * the metric symbols from the configuration.
 * Returns the result of loading the statistics, or TRUE when no cache file
 * is configured.
 */
gboolean
rspamd_symcache_init (struct rspamd_symcache *cache)
{
	g_assert (cache != NULL);

	cache->reload_time = cache->cfg->cache_reload_time;

	/* Only the loading step can make the result FALSE */
	gboolean loaded = (cache->cfg->cache_filename == NULL) ? TRUE :
			rspamd_symcache_load_items (cache, cache->cfg->cache_filename);

	rspamd_symcache_post_init (cache);

	/* Connect metric symbols with symcache symbols */
	if (cache->cfg->symbols != NULL) {
		g_hash_table_foreach (cache->cfg->symbols,
				rspamd_symcache_metric_connect_cb,
				cache);
	}

	return loaded;
}


static void

+ 48
- 0
src/libserver/symcache/symcache_c.cxx View File

@@ -0,0 +1,48 @@
/*-
* Copyright 2022 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "symcache_internal.hxx"

/**
* C API for symcache
*/

#define C_API_SYMCACHE(ptr) (reinterpret_cast<rspamd::symcache::symcache *>(ptr))
#define C_API_SYMCACHE_ITEM(ptr) (reinterpret_cast<rspamd::symcache::cache_item *>(ptr))

/*
 * Destroys a cache created by rspamd_symcache_new. The opaque handle is the
 * C++ symcache object itself.
 */
void
rspamd_symcache_destroy (struct rspamd_symcache *cache)
{
	delete C_API_SYMCACHE(cache);
}

struct rspamd_symcache*
rspamd_symcache_new (struct rspamd_config *cfg)
{
auto *ncache = new rspamd::symcache::symcache(cfg);

return (struct rspamd_symcache*)ncache;
}

/*
 * Forwards to symcache::init() of the object behind the opaque handle and
 * returns its result.
 */
gboolean
rspamd_symcache_init (struct rspamd_symcache *cache)
{
	return C_API_SYMCACHE(cache)->init();
}

+ 292
- 0
src/libserver/symcache/symcache_impl.cxx View File

@@ -0,0 +1,292 @@
/*-
* Copyright 2022 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "symcache_internal.hxx"
#include "unix-std.h"
#include "libutil/cxx/locked_file.hxx"

namespace rspamd::symcache {

INIT_LOG_MODULE_PUBLIC(symcache)

/*
 * Initialises the cache: loads the saved statistics when a cache file is
 * configured, then registers delayed dependencies and conditions, resolves
 * the dependencies, sorts the special lists and connects the metric symbols.
 * Returns the result of load_items(), or true when no cache file is set.
 *
 * NOTE(review): everything after the load_items() call is the legacy C
 * implementation pasted as is. It refers to a `cache` variable that is not
 * declared in this member function and treats members as GList/GPtrArray,
 * so it still has to be ported to the members of this class.
 */
auto symcache::init() -> bool
{
auto res = true;
reload_time = cfg->cache_reload_time;

if (cfg->cache_filename != NULL) {
res = load_items();
}

struct rspamd_symcache_item *it, *vit;
struct cache_dependency *dep;
struct delayed_cache_dependency *ddep;
struct delayed_cache_condition *dcond;
GList *cur;
gint i, j;

/* Dependencies that were registered by symbol name */
cur = cache->delayed_deps;
while (cur) {
ddep = cur->data;

/* The same name is looked up twice with a different last argument */
vit = rspamd_symcache_find_filter(cache, ddep->from, false);
it = rspamd_symcache_find_filter(cache, ddep->from, true);

if (it == NULL || vit == NULL) {
msg_err_cache ("cannot register delayed dependency between %s and %s: "
"%s is missing", ddep->from, ddep->to, ddep->from);
}
else {
msg_debug_cache ("delayed between %s(%d:%d) -> %s", ddep->from,
it->id, vit->id, ddep->to);
/* -1 is passed as the last id when both lookups returned the same item */
rspamd_symcache_add_dependency(cache, it->id, ddep->to, vit != it ?
vit->id : -1);
}

cur = g_list_next (cur);
}

/* Conditions that were registered by symbol name */
cur = cache->delayed_conditions;
while (cur) {
dcond = cur->data;

it = rspamd_symcache_find_filter(cache, dcond->sym, true);

if (it == NULL) {
msg_err_cache (
"cannot register delayed condition for %s",
dcond->sym);
/* The callback reference is released as nothing will ever use it */
luaL_unref(dcond->L, LUA_REGISTRYINDEX, dcond->cbref);
}
else {
struct rspamd_symcache_condition *ncond = rspamd_mempool_alloc0 (cache->static_pool,
sizeof(*ncond));
ncond->cb = dcond->cbref;
DL_APPEND(it->specific.normal.conditions, ncond);
}

cur = g_list_next (cur);
}

/* Resolve the dependencies of every item */
PTR_ARRAY_FOREACH (cache->items_by_id, i, it) {

PTR_ARRAY_FOREACH (it->deps, j, dep) {
rspamd_symcache_process_dep(cache, it, dep);
}

if (it->deps) {
/* Reversed loop to make removal safe */
for (j = it->deps->len - 1; j >= 0; j--) {
dep = g_ptr_array_index (it->deps, j);

if (dep->item == NULL) {
/* Remove useless dep */
g_ptr_array_remove_index(it->deps, j);
}
}
}
}

/* Special case for virtual symbols */
PTR_ARRAY_FOREACH (cache->virtual, i, it) {

PTR_ARRAY_FOREACH (it->deps, j, dep) {
rspamd_symcache_process_dep(cache, it, dep);
}
}

/* connfilters share the prefilters comparator, idempotent items the postfilters one */
g_ptr_array_sort_with_data(cache->connfilters, prefilters_cmp, cache);
g_ptr_array_sort_with_data(cache->prefilters, prefilters_cmp, cache);
g_ptr_array_sort_with_data(cache->postfilters, postfilters_cmp, cache);
g_ptr_array_sort_with_data(cache->idempotent, postfilters_cmp, cache);

rspamd_symcache_resort(cache);

/* Connect metric symbols with symcache symbols */
if (cache->cfg->symbols) {
g_hash_table_foreach(cache->cfg->symbols,
rspamd_symcache_metric_connect_cb,
cache);
}

return res;
}

auto symcache::load_items() -> bool
{
auto cached_map = util::raii_mmaped_locked_file::mmap_shared(cfg->cache_filename,
O_RDONLY, PROT_READ);

if (!cached_map.has_value()) {
msg_info_cache("%s", cached_map.error().c_str());
return false;
}


if (cached_map->get_size() < (gint) sizeof(symcache_header)) {
msg_info_cache("cannot use file %s, truncated: %z", cfg->cache_filename, ,
errno, strerror(errno));
return false;
}

const auto *hdr = (struct symcache_header *)cached_map->get_map();

if (memcmp(hdr->magic, symcache_magic,
sizeof(symcache_magic)) != 0) {
msg_info_cache("cannot use file %s, bad magic", cfg->cache_filename);

return false;
}

auto *parser = ucl_parser_new(0);
const auto *p = (const std::uint8_t *)(hdr + 1);

if (!ucl_parser_add_chunk(parser, p, cached_map->get_size() - sizeof(*hdr))) {
msg_info_cache ("cannot use file %s, cannot parse: %s", cfg->cache_filename,
ucl_parser_get_error(parser));
ucl_parser_free(parser);

return false;
}

auto *top = ucl_parser_get_object(parser);
ucl_parser_free(parser);

if (top == nullptr || ucl_object_type(top) != UCL_OBJECT) {
msg_info_cache ("cannot use file %s, bad object", cfg->cache_filename);
ucl_object_unref(top);

return false;
}

auto it = ucl_object_iterate_new(top);
const ucl_object_t *cur;
while ((cur = ucl_object_iterate_safe(it, true)) != nullptr) {
auto item_it = items_by_symbol.find(ucl_object_key(cur));

if (item_it != items_by_symbol.end()) {
auto item = item_it->second;
/* Copy saved info */
/*
* XXX: don't save or load weight, it should be obtained from the
* metric
*/
#if 0
elt = ucl_object_lookup (cur, "weight");

if (elt) {
w = ucl_object_todouble (elt);
if (w != 0) {
item->weight = w;
}
}
#endif
const auto *elt = ucl_object_lookup(cur, "time");
if (elt) {
item->st->avg_time = ucl_object_todouble(elt);
}

elt = ucl_object_lookup(cur, "count");
if (elt) {
item->st->total_hits = ucl_object_toint(elt);
item->last_count = item->st->total_hits;
}

elt = ucl_object_lookup(cur, "frequency");
if (elt && ucl_object_type(elt) == UCL_OBJECT) {
const ucl_object_t *freq_elt;

freq_elt = ucl_object_lookup(elt, "avg");

if (freq_elt) {
item->st->avg_frequency = ucl_object_todouble(freq_elt);
}
freq_elt = ucl_object_lookup(elt, "stddev");

if (freq_elt) {
item->st->stddev_frequency = ucl_object_todouble(freq_elt);
}
}

if (item->is_virtual() && !(item->type & SYMBOL_TYPE_GHOST)) {
g_assert (item->specific.virtual.parent < (gint)cache->items_by_id->len);
parent = g_ptr_array_index (cache->items_by_id,
item->specific.virtual.parent);
item->specific.virtual.parent_item = parent;

if (parent->st->weight < item->st->weight) {
parent->st->weight = item->st->weight;
}

/*
* We maintain avg_time for virtual symbols equal to the
* parent item avg_time
*/
item->st->avg_time = parent->st->avg_time;
}

cache->total_weight += fabs(item->st->weight);
cache->total_hits += item->st->total_hits;
}
}

ucl_object_iterate_free(it);
ucl_object_unref(top);

return true;
}

/*
 * Returns the item registered under `id`, which is an index into items_by_id.
 * An id out of range or an empty slot aborts the process. With
 * `resolve_parent` set, a virtual item is replaced by its parent.
 */
auto symcache::get_item_by_id(int id, bool resolve_parent) const -> const cache_item_ptr &
{
	const auto valid_id = id >= 0 && static_cast<std::size_t>(id) < items_by_id.size();

	if (!valid_id) {
		g_abort();
	}

	const auto &found = items_by_id[id];

	if (found == nullptr) {
		g_abort();
	}

	if (!resolve_parent || !found->is_virtual()) {
		return found;
	}

	return found->get_parent(*this);
}


/*
 * Returns the parent of a virtual item, resolved through `cache` when it is
 * not cached in the item yet. For non virtual items a reference to an empty
 * pointer is returned.
 */
auto cache_item::get_parent(const symcache &cache) const -> const cache_item_ptr &
{
	if (is_virtual()) {
		const auto &virtual_sp = std::get<virtual_item>(specific);

		return virtual_sp.get_parent(cache);
	}

	/* A reference is returned, so the empty result must outlive the call */
	static const cache_item_ptr no_parent{};

	return no_parent;
}

/*
 * Returns the stored parent pointer when it is set, otherwise looks the
 * parent up in `cache` by parent_id (without further parent resolution).
 */
auto virtual_item::get_parent(const symcache &cache) const -> const cache_item_ptr &
{
	if (!parent) {
		return cache.get_item_by_id(parent_id, false);
	}

	return parent;
}

}

+ 385
- 0
src/libserver/symcache/symcache_internal.hxx View File

@@ -0,0 +1,385 @@
/*-
* Copyright 2022 Vsevolod Stakhov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Internal C++ structures and classes for symcache
*/

#ifndef RSPAMD_SYMCACHE_INTERNAL_HXX
#define RSPAMD_SYMCACHE_INTERNAL_HXX
#pragma once

#include <cmath>
#include <cstdlib>
#include <cstdint>
#include <vector>
#include <string>
#include <string_view>
#include <memory>
#include <variant>
#include "contrib/robin-hood/robin_hood.h"

#include "cfg_file.h"
#include "lua/lua_common.h"

/*
 * Logging shortcuts for the symbols cache. The msg_*_cache macros read the
 * `static_pool` and `cfg` members directly, so they are meant to be used
 * inside symcache member functions; msg_debug_cache_task needs a `task`
 * pointer in scope.
 */
/* Critical level; tagged with the static pool tag name and the config checksum */
#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
		static_pool->tag.tagname, cfg->checksum, \
		G_STRFUNC, \
		__VA_ARGS__)
/* Warning level, same tagging as msg_err_cache */
#define msg_warn_cache(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
		static_pool->tag.tagname, cfg->checksum, \
		G_STRFUNC, \
		__VA_ARGS__)
/* Info level, same tagging as msg_err_cache */
#define msg_info_cache(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
		static_pool->tag.tagname, cfg->checksum, \
		G_STRFUNC, \
		__VA_ARGS__)
/* Conditional debug output for the "symcache" log module, keyed by the config checksum */
#define msg_debug_cache(...) rspamd_conditional_debug_fast (NULL, NULL, \
		rspamd_symcache_log_id, "symcache", cfg->checksum, \
		G_STRFUNC, \
		__VA_ARGS__)
/* Same as msg_debug_cache but keyed by the uid of the task memory pool */
#define msg_debug_cache_task(...) rspamd_conditional_debug_fast (NULL, NULL, \
		rspamd_symcache_log_id, "symcache", task->task_pool->tag.uid, \
		G_STRFUNC, \
		__VA_ARGS__)

namespace rspamd::symcache {

/* Defined in symcache_impl.cxx */
extern int rspamd_symcache_log_id;

/*
 * Bytes expected at the very start of a saved cache file: the "rsc" tag, the
 * byte 2 (presumably a format version - TODO confirm) and zero padding.
 */
static const std::uint8_t symcache_magic[8] = {'r', 's', 'c', 2, 0, 0, 0, 0};

/*
 * Fixed-size prefix of the saved cache file. symcache::load_items() casts the
 * start of the mapped file to this struct, compares `magic` with
 * symcache_magic and parses everything after the header as UCL.
 * The field order defines the on-disk layout, so it must not be changed.
 */
struct symcache_header {
std::uint8_t magic[8];
/* NOTE(review): nitems, checksum and unused are not read by the code visible here */
unsigned int nitems;
std::uint8_t checksum[64];
std::uint8_t unused[128];
};

struct cache_item;
using cache_item_ptr = std::shared_ptr<cache_item>;
using cache_item_weak_ptr = std::weak_ptr<cache_item>;

/*
 * One snapshot of the items order: non-owning references to cache items plus
 * the number of the generation this snapshot belongs to.
 */
struct order_generation {
/* Weak references: this list does not keep the items alive */
std::vector<cache_item_weak_ptr> d;
/* NOTE(review): has no initializer, must be set by whoever creates the object */
unsigned int generation_id;
};

using order_generation_ptr = std::shared_ptr<order_generation>;

/*
 * Compact list of numeric ids that avoids any allocation for short lists.
 * Two representations share the same storage:
 * - static: up to G_N_ELEMENTS(st) ids stored inline in insertion order; a zero
 *   marks a free slot, so the id 0 cannot be stored;
 * - dynamic: the first element equals `dynamic_marker`, the ids live in the
 *   pool allocated array `dyn.n` and are kept sorted.
 * There is no std::variant to save space.
 */
struct id_list {
	/* Value of the first element that selects the dynamic representation */
	static constexpr std::uint32_t dynamic_marker = static_cast<std::uint32_t>(-1);

	union {
		std::uint32_t st[4];
		struct {
			std::uint32_t e; /* First element */
			std::uint16_t len;
			std::uint16_t allocated;
			std::uint32_t *n;
		} dyn;
	} data;

	id_list() {
		std::memset((void *)&data, 0, sizeof(data));
	}

	/**
	 * Counts the used slots of the static representation
	 * @return number of leading non-zero elements in data.st
	 */
	auto static_count() const -> std::size_t {
		std::size_t cnt = 0;

		/* The bound is checked first: when every slot is used there is no terminating zero */
		while (cnt < G_N_ELEMENTS(data.st) && data.st[cnt] != 0) {
			cnt++;
		}

		return cnt;
	}

	/**
	 * Returns ids from a compressed list, accepting a mutable reference for number of elements
	 * @param nids output of the number of elements
	 * @return pointer to the first id (inline storage or the pool allocated array)
	 */
	auto get_ids(std::size_t &nids) const -> const std::uint32_t * {
		if (data.dyn.e == dynamic_marker) {
			/* Dynamic list */
			nids = data.dyn.len;

			return data.dyn.n;
		}

		nids = static_count();

		return data.st;
	}

	/**
	 * Appends an id, switching to the dynamic representation when the inline
	 * storage is exhausted
	 * @param id id to add
	 * @param pool memory pool used for the dynamic array (old arrays are left to the pool)
	 */
	auto add_id(std::uint32_t id, rspamd_mempool_t *pool) -> void {
		if (data.dyn.e == dynamic_marker) {
			/* Dynamic array */
			if (data.dyn.len >= data.dyn.allocated) {
				/* Reallocate: capacity is doubled and must still fit 16 bits */
				g_assert (data.dyn.allocated <= G_MAXINT16);
				const auto new_allocated = static_cast<std::uint16_t>(data.dyn.allocated * 2);

				auto *new_array = rspamd_mempool_alloc_array_type(pool,
						new_allocated, std::uint32_t);
				memcpy(new_array, data.dyn.n, data.dyn.len * sizeof(std::uint32_t));
				data.dyn.n = new_array;
				data.dyn.allocated = new_allocated;
			}

			data.dyn.n[data.dyn.len++] = id;
			std::sort(data.dyn.n, data.dyn.n + data.dyn.len);
		}
		else {
			/* Static part */
			const auto cnt = static_count();

			if (cnt < G_N_ELEMENTS (data.st)) {
				data.st[cnt] = id;
			}
			else {
				/* Switch to dynamic */
				const auto new_allocated = static_cast<std::uint16_t>(G_N_ELEMENTS (data.st) * 2);
				auto *new_array = rspamd_mempool_alloc_array_type(pool,
						new_allocated, std::uint32_t);

				/*
				 * Copy the inline ids before touching any dyn field: len and
				 * allocated share their bytes with st[1], n with the tail of st
				 */
				memcpy (new_array, data.st, sizeof(data.st));
				data.dyn.n = new_array;
				data.dyn.e = dynamic_marker;
				data.dyn.len = G_N_ELEMENTS (data.st);
				data.dyn.allocated = new_allocated;

				/* Recursively jump to dynamic branch that will handle insertion + sorting */
				add_id(id, pool); // tail call
			}
		}
	}
};

class symcache;

/*
 * Placeholder for a condition attached to a cache item; the behaviour is
 * not implemented yet (see the TODO markers).
 */
struct item_condition {
private:
	/* Callback reference; -1 mirrors the "no reference" value used for peak_cb in this file */
	gint cb = -1;
	/* Lua state the callback belongs to, unset until the constructor is implemented */
	lua_State *L = nullptr;
public:
	item_condition() {
		// TODO
	}
	virtual ~item_condition() {
		// TODO
	}
};

/*
 * Callback part of a regular (non virtual) cache item. Only the data
 * members exist so far, all methods are stubs.
 */
class normal_item {
private:
	/* Value-initialised so that a stub object never carries garbage */
	symbol_func_t func{};
	void *user_data = nullptr;
	std::vector<item_condition> conditions;
public:
	explicit normal_item() {
		// TODO
	}
	auto add_condition() -> void {
		// TODO
	}
	auto call() -> void {
		// TODO
	}
};

/*
 * Part of a cache item that only refers to another (parent) item.
 */
class virtual_item {
private:
	/*
	 * Id of the parent item. Negative until a real id is assigned, so that a
	 * lookup of an unset parent fails the range check in get_item_by_id()
	 * instead of reading an indeterminate value
	 */
	int parent_id = -1;
	/* Owning pointer to the parent, empty until it is resolved */
	cache_item_ptr parent;
public:
	explicit virtual_item() {
		// TODO
	}

	/**
	 * Returns the stored parent or looks it up in the cache by parent_id
	 * @param cache cache used for the lookup
	 */
	auto get_parent(const symcache &cache) const -> const cache_item_ptr&;
};

/*
 * A single symbol registered in the cache.
 * NOTE(review): scalar and pointer members have no initializers, so their
 * values are indeterminate unless the object is value-initialised.
 */
struct cache_item {
/* This block is likely shared */
struct rspamd_symcache_item_stat *st;
struct rspamd_counter_data *cd;

/* load_items() sets it to st->total_hits after restoring the counter */
std::uint64_t last_count;
std::string symbol;
std::string_view type_descr;
/* Bit flags: load_items() tests it against SYMBOL_TYPE_GHOST */
int type;

/* Callback data */
std::variant<normal_item, virtual_item> specific;

/* Condition of execution */
bool enabled;

/* Priority */
int priority;
/* Topological order */
unsigned int order;
/* Unique id - counter */
int id;

int frequency_peaks;
/* Settings ids */
id_list allowed_ids;
/* Allows execution but not symbols insertion */
id_list exec_only_ids;
id_list forbidden_ids;

/* Dependencies */
std::vector<cache_item_ptr> deps;
/* Reverse dependencies */
std::vector<cache_item_ptr> rdeps;

/* True when `specific` currently holds the virtual_item alternative */
auto is_virtual() const -> bool { return std::holds_alternative<virtual_item>(specific); }
/* Parent lookup for virtual items; defined in symcache_impl.cxx */
auto get_parent(const symcache &cache) const -> const cache_item_ptr &;
};

/*
 * Dependency between two symbols stored by name: `from` is the symbol looked
 * up at init time, `to` the symbol it depends on.
 */
struct delayed_cache_dependency {
std::string from;
std::string to;
};

/*
 * Condition stored by symbol name until init: `cbref` is attached to the
 * symbol when it is found, otherwise it is released with luaL_unref on `L`.
 */
struct delayed_cache_condition {
std::string sym;
int cbref;
lua_State *L;
};

class symcache {
private:
/* Map indexed by symbol name: all symbols must have unique names, so this map holds ownership */
robin_hood::unordered_flat_map<std::string_view, cache_item_ptr> items_by_symbol;
std::vector<cache_item_ptr> items_by_id;

/* Items sorted into some order */
order_generation_ptr items_by_order;
unsigned int cur_order_gen;

std::vector<cache_item_ptr> connfilters;
std::vector<cache_item_ptr> prefilters;
std::vector<cache_item_ptr> filters;
std::vector<cache_item_ptr> postfilters;
std::vector<cache_item_ptr> composites;
std::vector<cache_item_ptr> idempotent;
std::vector<cache_item_ptr> virtual_symbols;

/* These are stored within pointer to clean up after init */
std::unique_ptr<std::vector<delayed_cache_dependency>> delayed_deps;
std::unique_ptr<std::vector<delayed_cache_condition>> delayed_conditions;

rspamd_mempool_t *static_pool;
std::uint64_t cksum;
double total_weight;
std::size_t used_items;
std::size_t stats_symbols_count;
std::uint64_t total_hits;

struct rspamd_config *cfg;
lua_State *L;
double reload_time;
double last_profile;
int peak_cb;
int cache_id;

private:
/* Internal methods */
auto load_items() -> bool;

public:
explicit symcache(struct rspamd_config *cfg) : cfg(cfg) {
/* XXX: do we need a special pool for symcache? I don't think so */
static_pool = cfg->cfg_pool;
reload_time = cfg->cache_reload_time;
total_hits = 1;
total_weight = 1.0;
cksum = 0xdeadbabe;
peak_cb = -1;
cache_id = rspamd_random_uint64_fast();
L = (lua_State *)cfg->lua_state;
}

virtual ~symcache() {
if (peak_cb != -1) {
luaL_unref(L, LUA_REGISTRYINDEX, peak_cb);
}
}

auto get_item_by_id(int id, bool resolve_parent) const -> const cache_item_ptr &;

/*
* Initialises the symbols cache, must be called after all symbols are added
* and the config file is loaded
*/
auto init() -> bool;
};

/*
 * These items are saved within task structure and are used to track
 * symbols execution
 */
struct cache_dynamic_item {
std::uint16_t start_msec; /* Relative to task time */
/* One-bit state flags */
unsigned started: 1;
unsigned finished: 1;
/* unsigned pad:14; */
std::uint32_t async_events;
};


/*
 * One dependency edge of a cache item: the resolved item (when known), the
 * symbolic name and the ids of the source item.
 */
struct cache_dependency {
cache_item_ptr item; /* Owning pointer to the real dep */
std::string_view sym; /* Symbolic dep name */
int id; /* Real from */
int vid; /* Virtual from */
};

/*
 * Per task execution state of the cache; ends with a flexible array holding
 * one cache_dynamic_item per tracked symbol.
 * NOTE(review): `order` is a shared_ptr, so the struct needs proper
 * construction and destruction even if its memory comes from a raw
 * allocation - confirm at the allocation site.
 */
struct cache_savepoint {
unsigned order_gen;
unsigned items_inflight;
bool profile;
bool has_slow;

double profile_start;
double lim;

struct rspamd_scan_result *rs;

struct cache_item *cur_item;
order_generation_ptr order;
/* Dynamically expanded as needed */
struct cache_dynamic_item dynamic_items[];
};

/*
 * State bundled for the `resort_ev` timer: the cache, the worker and the
 * event loop, plus a `last_resort` timestamp.
 * NOTE(review): presumably drives the periodic re-sort of the cache - the
 * callback itself is not visible here.
 */
struct cache_refresh_cbdata {
double last_resort;
ev_timer resort_ev;
symcache *cache;
struct rspamd_worker *w;
struct ev_loop *event_loop;
};

} // namespace rspamd

#endif //RSPAMD_SYMCACHE_INTERNAL_HXX

+ 2
- 1
src/libutil/CMakeLists.txt View File

@@ -17,6 +17,7 @@ SET(LIBRSPAMDUTILSRC
${CMAKE_CURRENT_SOURCE_DIR}/util.c
${CMAKE_CURRENT_SOURCE_DIR}/heap.c
${CMAKE_CURRENT_SOURCE_DIR}/multipattern.c
${CMAKE_CURRENT_SOURCE_DIR}/cxx/utf8_util.cxx)
${CMAKE_CURRENT_SOURCE_DIR}/cxx/utf8_util.cxx
${CMAKE_CURRENT_SOURCE_DIR}/cxx/locked_file.cxx)
# Rspamdutil
SET(RSPAMD_UTIL ${LIBRSPAMDUTILSRC} PARENT_SCOPE)

Loading…
Cancel
Save