diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2022-06-11 13:24:50 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rspamd.com> | 2022-06-11 13:24:50 +0100 |
commit | 57d60a6c7974da4604a2158c407c25251f070d7d (patch) | |
tree | bee8c78ed7b70ad93b820c672a846ca53c293d0c /src/libstat | |
parent | 48297adf9639ed9711c55560a26a09bee3da9dd7 (diff) | |
download | rspamd-57d60a6c7974da4604a2158c407c25251f070d7d.tar.gz rspamd-57d60a6c7974da4604a2158c407c25251f070d7d.zip |
[Project] Add experimental HTTP statistics backend
Diffstat (limited to 'src/libstat')
-rw-r--r-- | src/libstat/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/libstat/backends/backends.h | 5 | ||||
-rw-r--r-- | src/libstat/backends/cdb_backend.cxx | 4 | ||||
-rw-r--r-- | src/libstat/backends/http_backend.cxx | 366 |
4 files changed, 372 insertions, 4 deletions
diff --git a/src/libstat/CMakeLists.txt b/src/libstat/CMakeLists.txt index 19962239d..b1df5c1e6 100644 --- a/src/libstat/CMakeLists.txt +++ b/src/libstat/CMakeLists.txt @@ -11,6 +11,7 @@ SET(CLASSIFIERSSRC ${CMAKE_CURRENT_SOURCE_DIR}/classifiers/bayes.c SET(BACKENDSSRC ${CMAKE_CURRENT_SOURCE_DIR}/backends/mmaped_file.c ${CMAKE_CURRENT_SOURCE_DIR}/backends/sqlite3_backend.c ${CMAKE_CURRENT_SOURCE_DIR}/backends/cdb_backend.cxx + ${CMAKE_CURRENT_SOURCE_DIR}/backends/http_backend.cxx ${CMAKE_CURRENT_SOURCE_DIR}/backends/redis_backend.c) SET(CACHESSRC ${CMAKE_CURRENT_SOURCE_DIR}/learn_cache/sqlite3_cache.c diff --git a/src/libstat/backends/backends.h b/src/libstat/backends/backends.h index 4c0b2276b..afb408a1a 100644 --- a/src/libstat/backends/backends.h +++ b/src/libstat/backends/backends.h @@ -84,13 +84,13 @@ struct rspamd_stat_backend { gboolean learn, gpointer ctx); \ gboolean rspamd_##name##_process_tokens (struct rspamd_task *task, \ GPtrArray *tokens, gint id, \ - gpointer ctx); \ + gpointer runtime); \ gboolean rspamd_##name##_finalize_process (struct rspamd_task *task, \ gpointer runtime, \ gpointer ctx); \ gboolean rspamd_##name##_learn_tokens (struct rspamd_task *task, \ GPtrArray *tokens, gint id, \ - gpointer ctx); \ + gpointer runtime); \ gboolean rspamd_##name##_finalize_learn (struct rspamd_task *task, \ gpointer runtime, \ gpointer ctx, GError **err); \ @@ -116,6 +116,7 @@ RSPAMD_STAT_BACKEND_DEF(mmaped_file); RSPAMD_STAT_BACKEND_DEF(sqlite3); RSPAMD_STAT_BACKEND_DEF(cdb); RSPAMD_STAT_BACKEND_DEF(redis); +RSPAMD_STAT_BACKEND_DEF(http); #ifdef __cplusplus } diff --git a/src/libstat/backends/cdb_backend.cxx b/src/libstat/backends/cdb_backend.cxx index 15c3d3035..590cddafa 100644 --- a/src/libstat/backends/cdb_backend.cxx +++ b/src/libstat/backends/cdb_backend.cxx @@ -377,9 +377,9 @@ gboolean rspamd_cdb_process_tokens(struct rspamd_task* task, GPtrArray* tokens, gint id, - gpointer ctx) + gpointer runtime) { - auto *cdbp = CDB_FROM_RAW(ctx); + auto *cdbp = CDB_FROM_RAW(runtime); bool seen_values = false; for (auto i = 0u; i < tokens->len; i++) { diff --git a/src/libstat/backends/http_backend.cxx b/src/libstat/backends/http_backend.cxx new file mode 100644 index 000000000..bd3fd1d48 --- /dev/null +++ b/src/libstat/backends/http_backend.cxx @@ -0,0 +1,366 @@ +/*- + * Copyright 2022 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "config.h" +#include "stat_internal.h" +#include "libserver/http/http_connection.h" +#include "libserver/mempool_vars_internal.h" +#include "upstream.h" +#include "contrib/robin-hood/robin_hood.h" +#include <vector> + +namespace rspamd::stat::http { + +#define msg_debug_stat_http(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_stat_http_log_id, "stat_http", task->task_pool->tag.uid, \ + RSPAMD_LOG_FUNC, \ + __VA_ARGS__) + +INIT_LOG_MODULE(stat_http) + +/* Represents all http backends defined in some configuration */ +class http_backends_collection { + std::vector<struct rspamd_statfile *> backends; + double timeout = 1.0; /* Default timeout */ + struct upstream_list *read_servers = nullptr; + struct upstream_list *write_servers = nullptr; +public: + static auto get() -> http_backends_collection& { + static http_backends_collection *singleton = nullptr; + + if (singleton == nullptr) { + singleton = new http_backends_collection; + } + + return *singleton; + } + + /** + * Add a new backend and (optionally initialize the basic backend parameters + * @param ctx + * @param cfg + * @param st + * @return + */ + auto add_backend(struct rspamd_stat_ctx *ctx, + struct rspamd_config *cfg, + struct rspamd_statfile *st) -> bool; + /** + * Remove a statfile cleaning things up if the last statfile is removed + * @param st + * @return + */ + auto remove_backend(struct rspamd_statfile *st) -> bool; + + upstream *get_upstream(bool is_learn); + +private: + http_backends_collection() = default; + auto first_init(struct rspamd_stat_ctx *ctx, + struct rspamd_config *cfg, + struct rspamd_statfile *st) -> bool; +}; + +/* + * Created one per each task + */ +class http_backend_runtime final { +public: + static auto create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime *; +private: + http_backends_collection *all_backends; + robin_hood::unordered_flat_map<int, struct rspamd_statfile *> seen_statfiles; + struct upstream *selected; +private: + http_backend_runtime(struct rspamd_task *task, bool is_learn) : + all_backends(&http_backends_collection::get()) { + selected = all_backends->get_upstream(is_learn); + } + ~http_backend_runtime() = default; + static auto dtor(void *p) -> void { + ((http_backend_runtime *)p)->~http_backend_runtime(); + } +}; + +auto http_backend_runtime::create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime * +{ + /* Alloc type provide proper size and alignment */ + auto *allocated_runtime = rspamd_mempool_alloc_type(task->task_pool, http_backend_runtime); + + rspamd_mempool_add_destructor(task->task_pool, http_backend_runtime::dtor, allocated_runtime); + + return new (allocated_runtime) http_backend_runtime{task, is_learn}; +} + +auto +http_backends_collection::add_backend(struct rspamd_stat_ctx *ctx, + struct rspamd_config *cfg, + struct rspamd_statfile *st) -> bool +{ + /* On empty list of backends we know that we need to load backend data actually */ + if (backends.empty()) { + if (!first_init(ctx, cfg, st)) { + return false; + } + } + + backends.push_back(st); + + return true; +} + +auto http_backends_collection::first_init(struct rspamd_stat_ctx *ctx, + struct rspamd_config *cfg, + struct rspamd_statfile *st) -> bool +{ + auto try_load_backend_config = [&](const ucl_object_t *obj) -> bool { + if (!obj || ucl_object_type(obj) != UCL_OBJECT) { + return false; + } + + /* First try to load read servers */ + auto *rs = ucl_object_lookup_any(obj, "read_servers", "servers", nullptr); + if (rs) { + read_servers = rspamd_upstreams_create(cfg->ups_ctx); + + if (read_servers == nullptr) { + return false; + } + + if (!rspamd_upstreams_from_ucl(read_servers, rs, 80, this)) { + rspamd_upstreams_destroy(read_servers); + return false; + } + } + auto *ws = ucl_object_lookup_any(obj, "write_servers", "servers", nullptr); + if (ws) { + write_servers = rspamd_upstreams_create(cfg->ups_ctx); + + if (write_servers == nullptr) { + return false; + } + + if (!rspamd_upstreams_from_ucl(write_servers, rs, 80, this)) { + rspamd_upstreams_destroy(write_servers); + return false; + } + } + + auto *tim = ucl_object_lookup(obj, "timeout"); + + if (tim) { + timeout = ucl_object_todouble(tim); + } + + return true; + }; + + auto ret = false; + auto obj = ucl_object_lookup (st->classifier->cfg->opts, "backend"); + if (obj != nullptr) { + ret = try_load_backend_config(obj); + } + + /* Now try statfiles config */ + if (!ret && st->stcf->opts) { + ret = try_load_backend_config(st->stcf->opts); + } + + /* Now try classifier config */ + if (!ret && st->classifier->cfg->opts) { + ret = try_load_backend_config(st->classifier->cfg->opts); + } + + return ret; +} + +auto http_backends_collection::remove_backend(struct rspamd_statfile *st) -> bool +{ + auto backend_it = std::remove(std::begin(backends), std::end(backends), st); + + if (backend_it != std::end(backends)) { + /* Fast erasure with no order preservation */ + std::swap(*backend_it, backends.back()); + backends.pop_back(); + + if (backends.empty()) { + /* De-init collection - likely config reload */ + if (read_servers) { + rspamd_upstreams_destroy(read_servers); + read_servers = nullptr; + } + + if (write_servers) { + rspamd_upstreams_destroy(write_servers); + write_servers = nullptr; + } + } + + return true; + } + + return false; +} + +upstream *http_backends_collection::get_upstream(bool is_learn) +{ + auto *ups_list = read_servers; + if (is_learn) { + ups_list = write_servers; + } + + return rspamd_upstream_get(ups_list, RSPAMD_UPSTREAM_ROUND_ROBIN, nullptr, 0); +} + +} + +/* C API */ + +gpointer +rspamd_http_init(struct rspamd_stat_ctx* ctx, + struct rspamd_config* cfg, + struct rspamd_statfile* st) +{ + auto &collections = rspamd::stat::http::http_backends_collection::get(); + + if (!collections.add_backend(ctx, cfg, st)) { + msg_err_config("cannot load http backend"); + + return nullptr; + } + + return (void *)&collections; +} +gpointer +rspamd_http_runtime(struct rspamd_task* task, + struct rspamd_statfile_config* stcf, + gboolean learn, + gpointer ctx) +{ + auto maybe_existing = rspamd_mempool_get_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME); + + if (maybe_existing != nullptr) { + return maybe_existing; + } + + auto runtime = rspamd::stat::http::http_backend_runtime::create(task, learn); + + if (runtime) { + rspamd_mempool_set_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME, + (void *)runtime, nullptr); + } + + return (void *)runtime; +} + +gboolean +rspamd_http_process_tokens(struct rspamd_task* task, + GPtrArray* tokens, + gint id, + gpointer runtime) +{ + auto real_runtime = (rspamd::stat::http::http_backend_runtime *)runtime; + + if (real_runtime) { + /* TODO */ + return true; + } + + + return false; + +} +gboolean +rspamd_http_finalize_process(struct rspamd_task* task, + gpointer runtime, + gpointer ctx) +{ + /* Not needed */ + return true; +} + +gboolean +rspamd_http_learn_tokens(struct rspamd_task* task, + GPtrArray* tokens, + gint id, + gpointer runtime) +{ + auto real_runtime = (rspamd::stat::http::http_backend_runtime *)runtime; + + if (real_runtime) { + /* TODO */ + return true; + } + + + return false; +} +gboolean +rspamd_http_finalize_learn(struct rspamd_task* task, + gpointer runtime, + gpointer ctx, + GError** err) +{ + return false; +} + +gulong rspamd_http_total_learns(struct rspamd_task* task, + gpointer runtime, + gpointer ctx) +{ + /* TODO */ + return 0; +} +gulong +rspamd_http_inc_learns(struct rspamd_task* task, + gpointer runtime, + gpointer ctx) +{ + /* TODO */ + return 0; +} +gulong +rspamd_http_dec_learns(struct rspamd_task* task, + gpointer runtime, + gpointer ctx) +{ + /* TODO */ + return (gulong)-1; +} +gulong +rspamd_http_learns(struct rspamd_task* task, + gpointer runtime, + gpointer ctx) +{ + /* TODO */ + return 0; +} +ucl_object_t* +rspamd_http_get_stat(gpointer runtime, gpointer ctx) +{ + /* TODO */ + return nullptr; +} +gpointer +rspamd_http_load_tokenizer_config(gpointer runtime, gsize* len) +{ + return nullptr; +} +void +rspamd_http_close(gpointer ctx) +{ + /* TODO */ +}
\ No newline at end of file |