]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Add experimental HTTP statistics backend
authorVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 11 Jun 2022 12:24:50 +0000 (13:24 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Sat, 11 Jun 2022 12:24:50 +0000 (13:24 +0100)
src/libmime/received.cxx
src/libserver/mempool_vars_internal.h
src/libstat/CMakeLists.txt
src/libstat/backends/backends.h
src/libstat/backends/cdb_backend.cxx
src/libstat/backends/http_backend.cxx [new file with mode: 0644]

index 82ec2dac033becd0a0bf2eb2acdb58150916787e..691d7ca04edf67cb0f92582b74bfbb6f493e7478 100644 (file)
  * limitations under the License.
  */
 
-#include <mempool_vars_internal.h>
 #include "config.h"
 #include "libserver/url.h"
 #include "lua/lua_common.h"
 #include "libserver/cfg_file.h"
+#include "libserver/mempool_vars_internal.h"
 #include "mime_string.hxx"
 #include "smtp_parsers.h"
 #include "message.h"
index 6b68dd5a55d8ae18d344e980bfa95ecc5ec59098..72cf1b0955562f6af6b261167d50d7d3d87d819e 100644 (file)
@@ -41,5 +41,6 @@
 #define RSPAMD_MEMPOOL_SPAM_LEARNS "spam_learns"
 #define RSPAMD_MEMPOOL_HAM_LEARNS "ham_learns"
 #define RSPAMD_MEMPOOL_RE_MAPS_CACHE "re_maps_cache"
+#define RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME "stat_http_runtime"
 
 #endif
index 19962239d14f4e74a866718f996a9344a0cc546f..b1df5c1e621eecdf4b53a69bc026c9e6ba1313ca 100644 (file)
@@ -11,6 +11,7 @@ SET(CLASSIFIERSSRC    ${CMAKE_CURRENT_SOURCE_DIR}/classifiers/bayes.c
 SET(BACKENDSSRC        ${CMAKE_CURRENT_SOURCE_DIR}/backends/mmaped_file.c
                                        ${CMAKE_CURRENT_SOURCE_DIR}/backends/sqlite3_backend.c
                                        ${CMAKE_CURRENT_SOURCE_DIR}/backends/cdb_backend.cxx
+                                       ${CMAKE_CURRENT_SOURCE_DIR}/backends/http_backend.cxx
                                        ${CMAKE_CURRENT_SOURCE_DIR}/backends/redis_backend.c)
 
 SET(CACHESSRC  ${CMAKE_CURRENT_SOURCE_DIR}/learn_cache/sqlite3_cache.c
index 4c0b2276bcb13b76d947aa8927e50b6a42d805d5..afb408a1a02d44901bc7201efe1d9cb2d0127089 100644 (file)
@@ -84,13 +84,13 @@ struct rspamd_stat_backend {
                 gboolean learn, gpointer ctx); \
         gboolean rspamd_##name##_process_tokens (struct rspamd_task *task, \
                 GPtrArray *tokens, gint id, \
-                gpointer ctx); \
+                gpointer runtime); \
         gboolean rspamd_##name##_finalize_process (struct rspamd_task *task, \
                 gpointer runtime, \
                 gpointer ctx); \
         gboolean rspamd_##name##_learn_tokens (struct rspamd_task *task, \
                 GPtrArray *tokens, gint id, \
-                gpointer ctx); \
+                gpointer runtime); \
         gboolean rspamd_##name##_finalize_learn (struct rspamd_task *task, \
                 gpointer runtime, \
                 gpointer ctx, GError **err); \
@@ -116,6 +116,7 @@ RSPAMD_STAT_BACKEND_DEF(mmaped_file);
 RSPAMD_STAT_BACKEND_DEF(sqlite3);
 RSPAMD_STAT_BACKEND_DEF(cdb);
 RSPAMD_STAT_BACKEND_DEF(redis);
+RSPAMD_STAT_BACKEND_DEF(http);
 
 #ifdef  __cplusplus
 }
index 15c3d30350dc1463f66e08edd02ac07fc1f1e993..590cddafa9c0a56747ec634e5dc2bd23da69cf4b 100644 (file)
@@ -377,9 +377,9 @@ gboolean
 rspamd_cdb_process_tokens(struct rspamd_task* task,
                                                                   GPtrArray* tokens,
                                                                   gint id,
-                                                                  gpointer ctx)
+                                                                  gpointer runtime)
 {
-       auto *cdbp = CDB_FROM_RAW(ctx);
+       auto *cdbp = CDB_FROM_RAW(runtime);
        bool seen_values = false;
 
        for (auto i = 0u; i < tokens->len; i++) {
diff --git a/src/libstat/backends/http_backend.cxx b/src/libstat/backends/http_backend.cxx
new file mode 100644 (file)
index 0000000..bd3fd1d
--- /dev/null
@@ -0,0 +1,366 @@
+/*-
+ * Copyright 2022 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "config.h"
+#include "stat_internal.h"
+#include "libserver/http/http_connection.h"
+#include "libserver/mempool_vars_internal.h"
+#include "upstream.h"
+#include "contrib/robin-hood/robin_hood.h"
+#include <vector>
+
+namespace rspamd::stat::http {
+
+#define msg_debug_stat_http(...)  rspamd_conditional_debug_fast (NULL, NULL, \
+        rspamd_stat_http_log_id, "stat_http", task->task_pool->tag.uid, \
+        RSPAMD_LOG_FUNC, \
+        __VA_ARGS__)
+
+INIT_LOG_MODULE(stat_http)
+
+/* Represents all http backends defined in some configuration */
+class http_backends_collection {
+       std::vector<struct rspamd_statfile *> backends;
+       double timeout = 1.0; /* Default timeout */
+       struct upstream_list *read_servers = nullptr;
+       struct upstream_list *write_servers = nullptr;
+public:
+       static auto get() -> http_backends_collection& {
+               static http_backends_collection *singleton = nullptr;
+
+               if (singleton == nullptr) {
+                       singleton = new http_backends_collection;
+               }
+
+               return *singleton;
+       }
+
+       /**
+        * Add a new backend and (optionally initialize the basic backend parameters
+        * @param ctx
+        * @param cfg
+        * @param st
+        * @return
+        */
+       auto add_backend(struct rspamd_stat_ctx *ctx,
+                                        struct rspamd_config *cfg,
+                                        struct rspamd_statfile *st) -> bool;
+       /**
+        * Remove a statfile cleaning things up if the last statfile is removed
+        * @param st
+        * @return
+        */
+       auto remove_backend(struct rspamd_statfile *st) -> bool;
+
+       upstream *get_upstream(bool is_learn);
+
+private:
+       http_backends_collection() = default;
+       auto first_init(struct rspamd_stat_ctx *ctx,
+                                       struct rspamd_config *cfg,
+                                       struct rspamd_statfile *st) -> bool;
+};
+
+/*
+ * Created one per each task
+ */
+class http_backend_runtime final {
+public:
+       static auto create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime *;
+private:
+       http_backends_collection *all_backends;
+       robin_hood::unordered_flat_map<int, struct rspamd_statfile *> seen_statfiles;
+       struct upstream *selected;
+private:
+       http_backend_runtime(struct rspamd_task *task, bool is_learn) :
+                       all_backends(&http_backends_collection::get()) {
+               selected = all_backends->get_upstream(is_learn);
+       }
+       ~http_backend_runtime() = default;
+       static auto dtor(void *p) -> void {
+               ((http_backend_runtime *)p)->~http_backend_runtime();
+       }
+};
+
+auto http_backend_runtime::create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime *
+{
+       /* Alloc type provide proper size and alignment */
+       auto *allocated_runtime = rspamd_mempool_alloc_type(task->task_pool, http_backend_runtime);
+
+       rspamd_mempool_add_destructor(task->task_pool, http_backend_runtime::dtor, allocated_runtime);
+
+       return new (allocated_runtime) http_backend_runtime{task, is_learn};
+}
+
+auto
+http_backends_collection::add_backend(struct rspamd_stat_ctx *ctx,
+                                                                         struct rspamd_config *cfg,
+                                                                         struct rspamd_statfile *st) -> bool
+{
+       /* On empty list of backends we know that we need to load backend data actually */
+       if (backends.empty()) {
+               if (!first_init(ctx, cfg, st)) {
+                       return false;
+               }
+       }
+
+       backends.push_back(st);
+
+       return true;
+}
+
+auto http_backends_collection::first_init(struct rspamd_stat_ctx *ctx,
+                                                                                 struct rspamd_config *cfg,
+                                                                                 struct rspamd_statfile *st) -> bool
+{
+       auto try_load_backend_config = [&](const ucl_object_t *obj) -> bool {
+               if (!obj || ucl_object_type(obj) != UCL_OBJECT) {
+                       return false;
+               }
+
+               /* First try to load read servers */
+               auto *rs = ucl_object_lookup_any(obj, "read_servers", "servers", nullptr);
+               if (rs) {
+                       read_servers = rspamd_upstreams_create(cfg->ups_ctx);
+
+                       if (read_servers == nullptr) {
+                               return false;
+                       }
+
+                       if (!rspamd_upstreams_from_ucl(read_servers, rs, 80, this)) {
+                               rspamd_upstreams_destroy(read_servers);
+                               return false;
+                       }
+               }
+               auto *ws = ucl_object_lookup_any(obj, "write_servers", "servers", nullptr);
+               if (ws) {
+                       write_servers = rspamd_upstreams_create(cfg->ups_ctx);
+
+                       if (write_servers == nullptr) {
+                               return false;
+                       }
+
+                       if (!rspamd_upstreams_from_ucl(write_servers, rs, 80, this)) {
+                               rspamd_upstreams_destroy(write_servers);
+                               return false;
+                       }
+               }
+
+               auto *tim = ucl_object_lookup(obj, "timeout");
+
+               if (tim) {
+                       timeout = ucl_object_todouble(tim);
+               }
+
+               return true;
+       };
+
+       auto ret = false;
+       auto obj = ucl_object_lookup (st->classifier->cfg->opts, "backend");
+       if (obj != nullptr) {
+               ret = try_load_backend_config(obj);
+       }
+
+       /* Now try statfiles config */
+       if (!ret && st->stcf->opts) {
+               ret = try_load_backend_config(st->stcf->opts);
+       }
+
+       /* Now try classifier config */
+       if (!ret && st->classifier->cfg->opts) {
+               ret = try_load_backend_config(st->classifier->cfg->opts);
+       }
+
+       return ret;
+}
+
+auto http_backends_collection::remove_backend(struct rspamd_statfile *st) -> bool
+{
+       auto backend_it = std::remove(std::begin(backends), std::end(backends), st);
+
+       if (backend_it != std::end(backends)) {
+               /* Fast erasure with no order preservation */
+               std::swap(*backend_it, backends.back());
+               backends.pop_back();
+
+               if (backends.empty()) {
+                       /* De-init collection - likely config reload */
+                       if (read_servers) {
+                               rspamd_upstreams_destroy(read_servers);
+                               read_servers = nullptr;
+                       }
+
+                       if (write_servers) {
+                               rspamd_upstreams_destroy(write_servers);
+                               write_servers = nullptr;
+                       }
+               }
+
+               return true;
+       }
+
+       return false;
+}
+
+upstream *http_backends_collection::get_upstream(bool is_learn)
+{
+       auto *ups_list = read_servers;
+       if (is_learn) {
+               ups_list = write_servers;
+       }
+
+       return rspamd_upstream_get(ups_list, RSPAMD_UPSTREAM_ROUND_ROBIN, nullptr, 0);
+}
+
+}
+
+/* C API */
+
+gpointer
+rspamd_http_init(struct rspamd_stat_ctx* ctx,
+                               struct rspamd_config* cfg,
+                               struct rspamd_statfile* st)
+{
+       auto &collections = rspamd::stat::http::http_backends_collection::get();
+
+       if (!collections.add_backend(ctx, cfg, st)) {
+               msg_err_config("cannot load http backend");
+
+               return nullptr;
+       }
+
+       return (void *)&collections;
+}
+gpointer
+rspamd_http_runtime(struct rspamd_task* task,
+                                  struct rspamd_statfile_config* stcf,
+                                  gboolean learn,
+                                  gpointer ctx)
+{
+       auto maybe_existing = rspamd_mempool_get_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME);
+
+       if (maybe_existing != nullptr) {
+               return maybe_existing;
+       }
+
+       auto runtime = rspamd::stat::http::http_backend_runtime::create(task, learn);
+
+       if (runtime) {
+               rspamd_mempool_set_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME,
+                               (void *)runtime, nullptr);
+       }
+
+       return (void *)runtime;
+}
+
+gboolean
+rspamd_http_process_tokens(struct rspamd_task* task,
+                                                 GPtrArray* tokens,
+                                                 gint id,
+                                                 gpointer runtime)
+{
+       auto real_runtime = (rspamd::stat::http::http_backend_runtime *)runtime;
+
+       if (real_runtime) {
+               /* TODO */
+               return true;
+       }
+
+
+       return false;
+
+}
+gboolean
+rspamd_http_finalize_process(struct rspamd_task* task,
+                                                       gpointer runtime,
+                                                       gpointer ctx)
+{
+       /* Not needed */
+       return true;
+}
+
+gboolean
+rspamd_http_learn_tokens(struct rspamd_task* task,
+                                               GPtrArray* tokens,
+                                               gint id,
+                                               gpointer runtime)
+{
+       auto real_runtime = (rspamd::stat::http::http_backend_runtime *)runtime;
+
+       if (real_runtime) {
+               /* TODO */
+               return true;
+       }
+
+
+       return false;
+}
+gboolean
+rspamd_http_finalize_learn(struct rspamd_task* task,
+                                                 gpointer runtime,
+                                                 gpointer ctx,
+                                                 GError** err)
+{
+       return false;
+}
+
+gulong rspamd_http_total_learns(struct rspamd_task* task,
+                                                          gpointer runtime,
+                                                          gpointer ctx)
+{
+       /* TODO */
+       return 0;
+}
+gulong
+rspamd_http_inc_learns(struct rspamd_task* task,
+                                         gpointer runtime,
+                                         gpointer ctx)
+{
+       /* TODO */
+       return 0;
+}
+gulong
+rspamd_http_dec_learns(struct rspamd_task* task,
+                                         gpointer runtime,
+                                         gpointer ctx)
+{
+       /* TODO */
+       return (gulong)-1;
+}
+gulong
+rspamd_http_learns(struct rspamd_task* task,
+                                 gpointer runtime,
+                                 gpointer ctx)
+{
+       /* TODO */
+       return 0;
+}
+ucl_object_t*
+rspamd_http_get_stat(gpointer runtime, gpointer ctx)
+{
+       /* TODO */
+       return nullptr;
+}
+gpointer
+rspamd_http_load_tokenizer_config(gpointer runtime, gsize* len)
+{
+       return nullptr;
+}
+void
+rspamd_http_close(gpointer ctx)
+{
+       /* TODO */
+}
\ No newline at end of file