Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

http_backend.cxx 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. /*-
  2. * Copyright 2022 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "stat_internal.h"
  18. #include "libserver/http/http_connection.h"
  19. #include "libserver/mempool_vars_internal.h"
  20. #include "upstream.h"
  21. #include "contrib/ankerl/unordered_dense.h"
  22. #include <algorithm>
  23. #include <vector>
  24. namespace rspamd::stat::http {
  25. #define msg_debug_stat_http(...) rspamd_conditional_debug_fast(NULL, NULL, \
  26. rspamd_stat_http_log_id, "stat_http", task->task_pool->tag.uid, \
  27. RSPAMD_LOG_FUNC, \
  28. __VA_ARGS__)
  29. INIT_LOG_MODULE(stat_http)
  30. /* Represents all http backends defined in some configuration */
  31. class http_backends_collection {
  32. std::vector<struct rspamd_statfile *> backends;
  33. double timeout = 1.0; /* Default timeout */
  34. struct upstream_list *read_servers = nullptr;
  35. struct upstream_list *write_servers = nullptr;
  36. public:
  37. static auto get() -> http_backends_collection &
  38. {
  39. static http_backends_collection *singleton = nullptr;
  40. if (singleton == nullptr) {
  41. singleton = new http_backends_collection;
  42. }
  43. return *singleton;
  44. }
  45. /**
  46. * Add a new backend and (optionally initialize the basic backend parameters
  47. * @param ctx
  48. * @param cfg
  49. * @param st
  50. * @return
  51. */
  52. auto add_backend(struct rspamd_stat_ctx *ctx,
  53. struct rspamd_config *cfg,
  54. struct rspamd_statfile *st) -> bool;
  55. /**
  56. * Remove a statfile cleaning things up if the last statfile is removed
  57. * @param st
  58. * @return
  59. */
  60. auto remove_backend(struct rspamd_statfile *st) -> bool;
  61. upstream *get_upstream(bool is_learn);
  62. private:
  63. http_backends_collection() = default;
  64. auto first_init(struct rspamd_stat_ctx *ctx,
  65. struct rspamd_config *cfg,
  66. struct rspamd_statfile *st) -> bool;
  67. };
  68. /*
  69. * Created one per each task
  70. */
  71. class http_backend_runtime final {
  72. public:
  73. static auto create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime *;
  74. /* Add a new statfile with a specific id to the list of statfiles */
  75. auto notice_statfile(int id, const struct rspamd_statfile_config *st) -> void
  76. {
  77. seen_statfiles[id] = st;
  78. }
  79. auto process_tokens(struct rspamd_task *task,
  80. GPtrArray *tokens,
  81. int id,
  82. bool learn) -> bool;
  83. private:
  84. http_backends_collection *all_backends;
  85. ankerl::unordered_dense::map<int, const struct rspamd_statfile_config *> seen_statfiles;
  86. struct upstream *selected;
  87. private:
  88. http_backend_runtime(struct rspamd_task *task, bool is_learn)
  89. : all_backends(&http_backends_collection::get())
  90. {
  91. selected = all_backends->get_upstream(is_learn);
  92. }
  93. ~http_backend_runtime() = default;
  94. static auto dtor(void *p) -> void
  95. {
  96. ((http_backend_runtime *) p)->~http_backend_runtime();
  97. }
  98. };
  99. /*
  100. * Efficient way to make a messagepack payload from stat tokens,
  101. * avoiding any intermediate libraries, as we would send many tokens
  102. * all together
  103. */
  104. static auto
  105. stat_tokens_to_msgpack(GPtrArray *tokens) -> std::vector<std::uint8_t>
  106. {
  107. std::vector<std::uint8_t> ret;
  108. rspamd_token_t *cur;
  109. int i;
  110. /*
  111. * We define array, it's size and N elements each is uint64_t
  112. * Layout:
  113. * 0xdd - array marker
  114. * [4 bytes be] - size of the array
  115. * [ 0xcf + <8 bytes BE integer>] * N - array elements
  116. */
  117. ret.resize(tokens->len * (sizeof(std::uint64_t) + 1) + 5);
  118. ret.push_back('\xdd');
  119. std::uint32_t ulen = GUINT32_TO_BE(tokens->len);
  120. std::copy((const std::uint8_t *) &ulen,
  121. ((const std::uint8_t *) &ulen) + sizeof(ulen), std::back_inserter(ret));
  122. PTR_ARRAY_FOREACH(tokens, i, cur)
  123. {
  124. ret.push_back('\xcf');
  125. std::uint64_t val = GUINT64_TO_BE(cur->data);
  126. std::copy((const std::uint8_t *) &val,
  127. ((const std::uint8_t *) &val) + sizeof(val), std::back_inserter(ret));
  128. }
  129. return ret;
  130. }
  131. auto http_backend_runtime::create(struct rspamd_task *task, bool is_learn) -> http_backend_runtime *
  132. {
  133. /* Alloc type provide proper size and alignment */
  134. auto *allocated_runtime = rspamd_mempool_alloc_type(task->task_pool, http_backend_runtime);
  135. rspamd_mempool_add_destructor(task->task_pool, http_backend_runtime::dtor, allocated_runtime);
  136. return new (allocated_runtime) http_backend_runtime{task, is_learn};
  137. }
  138. auto http_backend_runtime::process_tokens(struct rspamd_task *task, GPtrArray *tokens, int id, bool learn) -> bool
  139. {
  140. if (!learn) {
  141. if (id == seen_statfiles.size() - 1) {
  142. /* Emit http request on the last statfile */
  143. }
  144. }
  145. else {
  146. /* On learn we need to learn all statfiles that we were requested to learn */
  147. if (seen_statfiles.empty()) {
  148. /* Request has been already set, or nothing to learn */
  149. return true;
  150. }
  151. else {
  152. seen_statfiles.clear();
  153. }
  154. }
  155. return true;
  156. }
  157. auto http_backends_collection::add_backend(struct rspamd_stat_ctx *ctx,
  158. struct rspamd_config *cfg,
  159. struct rspamd_statfile *st) -> bool
  160. {
  161. /* On empty list of backends we know that we need to load backend data actually */
  162. if (backends.empty()) {
  163. if (!first_init(ctx, cfg, st)) {
  164. return false;
  165. }
  166. }
  167. backends.push_back(st);
  168. return true;
  169. }
  170. auto http_backends_collection::first_init(struct rspamd_stat_ctx *ctx,
  171. struct rspamd_config *cfg,
  172. struct rspamd_statfile *st) -> bool
  173. {
  174. auto try_load_backend_config = [&](const ucl_object_t *obj) -> bool {
  175. if (!obj || ucl_object_type(obj) != UCL_OBJECT) {
  176. return false;
  177. }
  178. /* First try to load read servers */
  179. auto *rs = ucl_object_lookup_any(obj, "read_servers", "servers", nullptr);
  180. if (rs) {
  181. read_servers = rspamd_upstreams_create(cfg->ups_ctx);
  182. if (read_servers == nullptr) {
  183. return false;
  184. }
  185. if (!rspamd_upstreams_from_ucl(read_servers, rs, 80, this)) {
  186. rspamd_upstreams_destroy(read_servers);
  187. return false;
  188. }
  189. }
  190. auto *ws = ucl_object_lookup_any(obj, "write_servers", "servers", nullptr);
  191. if (ws) {
  192. write_servers = rspamd_upstreams_create(cfg->ups_ctx);
  193. if (write_servers == nullptr) {
  194. return false;
  195. }
  196. if (!rspamd_upstreams_from_ucl(write_servers, rs, 80, this)) {
  197. rspamd_upstreams_destroy(write_servers);
  198. return false;
  199. }
  200. }
  201. auto *tim = ucl_object_lookup(obj, "timeout");
  202. if (tim) {
  203. timeout = ucl_object_todouble(tim);
  204. }
  205. return true;
  206. };
  207. auto ret = false;
  208. auto obj = ucl_object_lookup(st->classifier->cfg->opts, "backend");
  209. if (obj != nullptr) {
  210. ret = try_load_backend_config(obj);
  211. }
  212. /* Now try statfiles config */
  213. if (!ret && st->stcf->opts) {
  214. ret = try_load_backend_config(st->stcf->opts);
  215. }
  216. /* Now try classifier config */
  217. if (!ret && st->classifier->cfg->opts) {
  218. ret = try_load_backend_config(st->classifier->cfg->opts);
  219. }
  220. return ret;
  221. }
  222. auto http_backends_collection::remove_backend(struct rspamd_statfile *st) -> bool
  223. {
  224. auto backend_it = std::remove(std::begin(backends), std::end(backends), st);
  225. if (backend_it != std::end(backends)) {
  226. /* Fast erasure with no order preservation */
  227. std::swap(*backend_it, backends.back());
  228. backends.pop_back();
  229. if (backends.empty()) {
  230. /* De-init collection - likely config reload */
  231. if (read_servers) {
  232. rspamd_upstreams_destroy(read_servers);
  233. read_servers = nullptr;
  234. }
  235. if (write_servers) {
  236. rspamd_upstreams_destroy(write_servers);
  237. write_servers = nullptr;
  238. }
  239. }
  240. return true;
  241. }
  242. return false;
  243. }
  244. upstream *http_backends_collection::get_upstream(bool is_learn)
  245. {
  246. auto *ups_list = read_servers;
  247. if (is_learn) {
  248. ups_list = write_servers;
  249. }
  250. return rspamd_upstream_get(ups_list, RSPAMD_UPSTREAM_ROUND_ROBIN, nullptr, 0);
  251. }
  252. }// namespace rspamd::stat::http
  253. /* C API */
  254. gpointer
  255. rspamd_http_init(struct rspamd_stat_ctx *ctx,
  256. struct rspamd_config *cfg,
  257. struct rspamd_statfile *st)
  258. {
  259. auto &collections = rspamd::stat::http::http_backends_collection::get();
  260. if (!collections.add_backend(ctx, cfg, st)) {
  261. msg_err_config("cannot load http backend");
  262. return nullptr;
  263. }
  264. return (void *) &collections;
  265. }
  266. gpointer
  267. rspamd_http_runtime(struct rspamd_task *task,
  268. struct rspamd_statfile_config *stcf,
  269. gboolean learn,
  270. gpointer ctx,
  271. int id)
  272. {
  273. auto maybe_existing = rspamd_mempool_get_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME);
  274. if (maybe_existing != nullptr) {
  275. auto real_runtime = (rspamd::stat::http::http_backend_runtime *) maybe_existing;
  276. real_runtime->notice_statfile(id, stcf);
  277. return maybe_existing;
  278. }
  279. auto runtime = rspamd::stat::http::http_backend_runtime::create(task, learn);
  280. if (runtime) {
  281. runtime->notice_statfile(id, stcf);
  282. rspamd_mempool_set_variable(task->task_pool, RSPAMD_MEMPOOL_HTTP_STAT_BACKEND_RUNTIME,
  283. (void *) runtime, nullptr);
  284. }
  285. return (void *) runtime;
  286. }
  287. gboolean
  288. rspamd_http_process_tokens(struct rspamd_task *task,
  289. GPtrArray *tokens,
  290. int id,
  291. gpointer runtime)
  292. {
  293. auto real_runtime = (rspamd::stat::http::http_backend_runtime *) runtime;
  294. if (real_runtime) {
  295. return real_runtime->process_tokens(task, tokens, id, false);
  296. }
  297. return false;
  298. }
  299. gboolean
  300. rspamd_http_finalize_process(struct rspamd_task *task,
  301. gpointer runtime,
  302. gpointer ctx)
  303. {
  304. /* Not needed */
  305. return true;
  306. }
  307. gboolean
  308. rspamd_http_learn_tokens(struct rspamd_task *task,
  309. GPtrArray *tokens,
  310. int id,
  311. gpointer runtime)
  312. {
  313. auto real_runtime = (rspamd::stat::http::http_backend_runtime *) runtime;
  314. if (real_runtime) {
  315. return real_runtime->process_tokens(task, tokens, id, true);
  316. }
  317. return false;
  318. }
  319. gboolean
  320. rspamd_http_finalize_learn(struct rspamd_task *task,
  321. gpointer runtime,
  322. gpointer ctx,
  323. GError **err)
  324. {
  325. return false;
  326. }
  327. gulong rspamd_http_total_learns(struct rspamd_task *task,
  328. gpointer runtime,
  329. gpointer ctx)
  330. {
  331. /* TODO */
  332. return 0;
  333. }
  334. gulong
  335. rspamd_http_inc_learns(struct rspamd_task *task,
  336. gpointer runtime,
  337. gpointer ctx)
  338. {
  339. /* TODO */
  340. return 0;
  341. }
  342. gulong
  343. rspamd_http_dec_learns(struct rspamd_task *task,
  344. gpointer runtime,
  345. gpointer ctx)
  346. {
  347. /* TODO */
  348. return (gulong) -1;
  349. }
  350. gulong
  351. rspamd_http_learns(struct rspamd_task *task,
  352. gpointer runtime,
  353. gpointer ctx)
  354. {
  355. /* TODO */
  356. return 0;
  357. }
  358. ucl_object_t *
  359. rspamd_http_get_stat(gpointer runtime, gpointer ctx)
  360. {
  361. /* TODO */
  362. return nullptr;
  363. }
  364. gpointer
  365. rspamd_http_load_tokenizer_config(gpointer runtime, gsize *len)
  366. {
  367. return nullptr;
  368. }
  369. void rspamd_http_close(gpointer ctx)
  370. {
  371. /* TODO */
  372. }