// symcache_runtime.cxx (25KB)
  1. /*
  2. * Copyright 2024 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "symcache_internal.hxx"
  17. #include "symcache_item.hxx"
  18. #include "symcache_runtime.hxx"
  19. #include "libutil/cxx/util.hxx"
  20. #include "libserver/task.h"
  21. #include "libmime/scan_result.h"
  22. #include "utlist.h"
  23. #include "libserver/worker_util.h"
  24. #include <limits>
  25. #include <cmath>
  26. namespace rspamd::symcache {
  /* Force profiling when the previous profiled task is older than this (one minute) */
  static constexpr auto PROFILE_MAX_TIME = 60.0;
  /* Messages of at least this size (2Mb) always get profiled */
  static constexpr auto PROFILE_MESSAGE_SIZE_THRESHOLD = 2ul * 1024 * 1024;
  /* Chance of profiling any given task regardless of the other criteria */
  static constexpr auto PROFILE_PROBABILITY = 0.01;
  33. auto symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symcache_runtime *
  34. {
  35. cache.maybe_resort();
  36. auto cur_order = cache.get_cache_order();
  37. auto allocated_size = sizeof(symcache_runtime) +
  38. sizeof(struct cache_dynamic_item) * cur_order->size();
  39. auto *checkpoint = (symcache_runtime *) rspamd_mempool_alloc0(task->task_pool,
  40. allocated_size);
  41. msg_debug_cache_task("create symcache runtime for task: %d bytes, %d items", (int) allocated_size, (int) cur_order->size());
  42. checkpoint->order = std::move(cur_order);
  43. checkpoint->slow_status = slow_status::none;
  44. /* Calculate profile probability */
  45. ev_now_update_if_cheap(task->event_loop);
  46. ev_tstamp now = ev_now(task->event_loop);
  47. checkpoint->profile_start = now;
  48. checkpoint->lim = rspamd_task_get_required_score(task, task->result);
  49. /*
  50. * We enable profiling if the following conditions are met:
  51. * - we have not profiled for a long time
  52. * - message is large
  53. * - random probability
  54. */
  55. if ((cache.get_last_profile() == 0.0 || now > cache.get_last_profile() + PROFILE_MAX_TIME) ||
  56. (task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) ||
  57. (rspamd_random_double_fast() >= (1 - PROFILE_PROBABILITY))) {
  58. msg_debug_cache_task("enable profiling of symbols for task");
  59. checkpoint->profile = true;
  60. cache.set_last_profile(now);
  61. }
  62. task->symcache_runtime = (void *) checkpoint;
  63. return checkpoint;
  64. }
  65. auto symcache_runtime::process_settings(struct rspamd_task *task, const symcache &cache) -> bool
  66. {
  67. if (!task->settings) {
  68. msg_err_task("`process_settings` is called with no settings");
  69. return false;
  70. }
  71. const auto *wl = ucl_object_lookup(task->settings, "whitelist");
  72. if (wl != nullptr) {
  73. msg_info_task("task is whitelisted");
  74. task->flags |= RSPAMD_TASK_FLAG_SKIP;
  75. return true;
  76. }
  77. auto already_disabled = false;
  78. auto process_group = [&](const ucl_object_t *gr_obj, auto functor) -> void {
  79. ucl_object_iter_t it = nullptr;
  80. const ucl_object_t *cur;
  81. if (gr_obj) {
  82. while ((cur = ucl_iterate_object(gr_obj, &it, true)) != nullptr) {
  83. if (ucl_object_type(cur) == UCL_STRING) {
  84. auto *gr = (struct rspamd_symbols_group *)
  85. g_hash_table_lookup(task->cfg->groups,
  86. ucl_object_tostring(cur));
  87. if (gr) {
  88. GHashTableIter gr_it;
  89. void *k, *v;
  90. g_hash_table_iter_init(&gr_it, gr->symbols);
  91. while (g_hash_table_iter_next(&gr_it, &k, &v)) {
  92. functor((const char *) k);
  93. }
  94. }
  95. }
  96. }
  97. }
  98. };
  99. ucl_object_iter_t it = nullptr;
  100. const ucl_object_t *cur;
  101. const auto *enabled = ucl_object_lookup(task->settings, "symbols_enabled");
  102. if (enabled) {
  103. msg_debug_cache_task("disable all symbols as `symbols_enabled` is found");
  104. /* Disable all symbols but selected */
  105. disable_all_symbols(SYMBOL_TYPE_EXPLICIT_DISABLE);
  106. already_disabled = true;
  107. it = nullptr;
  108. while ((cur = ucl_iterate_object(enabled, &it, true)) != nullptr) {
  109. enable_symbol(task, cache, ucl_object_tostring(cur));
  110. }
  111. }
  112. /* Enable groups of symbols */
  113. enabled = ucl_object_lookup(task->settings, "groups_enabled");
  114. if (enabled && !already_disabled) {
  115. disable_all_symbols(SYMBOL_TYPE_EXPLICIT_DISABLE);
  116. }
  117. process_group(enabled, [&](const char *sym) {
  118. enable_symbol(task, cache, sym);
  119. });
  120. const auto *disabled = ucl_object_lookup(task->settings, "symbols_disabled");
  121. if (disabled) {
  122. it = nullptr;
  123. while ((cur = ucl_iterate_object(disabled, &it, true)) != nullptr) {
  124. disable_symbol(task, cache, ucl_object_tostring(cur));
  125. }
  126. }
  127. /* Disable groups of symbols */
  128. disabled = ucl_object_lookup(task->settings, "groups_disabled");
  129. process_group(disabled, [&](const char *sym) {
  130. disable_symbol(task, cache, sym);
  131. });
  132. /* Update required limit */
  133. lim = rspamd_task_get_required_score(task, task->result);
  134. return false;
  135. }
  136. auto symcache_runtime::savepoint_dtor(struct rspamd_task *task) -> void
  137. {
  138. msg_debug_cache_task("destroying savepoint");
  139. /* Drop shared ownership */
  140. order.reset();
  141. }
  142. auto symcache_runtime::disable_all_symbols(int skip_mask) -> void
  143. {
  144. for (auto [i, item]: rspamd::enumerate(order->d)) {
  145. auto *dyn_item = &dynamic_items[i];
  146. if (!(item->get_flags() & skip_mask)) {
  147. dyn_item->status = cache_item_status::finished;
  148. }
  149. }
  150. }
  151. auto symcache_runtime::disable_symbol(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool
  152. {
  153. const auto *item = cache.get_item_by_name(name, true);
  154. if (item != nullptr) {
  155. auto *dyn_item = get_dynamic_item(item->id);
  156. if (dyn_item) {
  157. dyn_item->status = cache_item_status::finished;
  158. msg_debug_cache_task("disable execution of %s", name.data());
  159. return true;
  160. }
  161. else {
  162. msg_debug_cache_task("cannot disable %s: id not found %d", name.data(), item->id);
  163. }
  164. }
  165. else {
  166. msg_debug_cache_task("cannot disable %s: symbol not found", name.data());
  167. }
  168. return false;
  169. }
  170. auto symcache_runtime::enable_symbol(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool
  171. {
  172. const auto *item = cache.get_item_by_name(name, true);
  173. if (item != nullptr) {
  174. auto *dyn_item = get_dynamic_item(item->id);
  175. if (dyn_item) {
  176. dyn_item->status = cache_item_status::not_started;
  177. msg_debug_cache_task("enable execution of %s", name.data());
  178. return true;
  179. }
  180. else {
  181. msg_debug_cache_task("cannot enable %s: id not found %d", name.data(), item->id);
  182. }
  183. }
  184. else {
  185. msg_debug_cache_task("cannot enable %s: symbol not found", name.data());
  186. }
  187. return false;
  188. }
  189. auto symcache_runtime::is_symbol_checked(const symcache &cache, std::string_view name) -> bool
  190. {
  191. const auto *item = cache.get_item_by_name(name, true);
  192. if (item != nullptr) {
  193. auto *dyn_item = get_dynamic_item(item->id);
  194. if (dyn_item) {
  195. return dyn_item->status != cache_item_status::not_started;
  196. }
  197. }
  198. return false;
  199. }
  200. auto symcache_runtime::is_symbol_enabled(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool
  201. {
  202. const auto *item = cache.get_item_by_name(name, true);
  203. if (item) {
  204. if (!item->is_allowed(task, true)) {
  205. return false;
  206. }
  207. else {
  208. auto *dyn_item = get_dynamic_item(item->id);
  209. if (dyn_item) {
  210. if (dyn_item->status != cache_item_status::not_started) {
  211. /* Already started */
  212. return false;
  213. }
  214. if (!item->is_virtual()) {
  215. return std::get<normal_item>(item->specific).check_conditions(item->symbol, task);
  216. }
  217. }
  218. else {
  219. /* Unknown item */
  220. msg_debug_cache_task("cannot enable %s: symbol not found", name.data());
  221. }
  222. }
  223. }
  224. return true;
  225. }
  226. auto symcache_runtime::get_dynamic_item(int id) const -> cache_dynamic_item *
  227. {
  228. /* Not found in the cache, do a hash lookup */
  229. auto our_id_maybe = rspamd::find_map(order->by_cache_id, id);
  230. if (our_id_maybe) {
  231. return &dynamic_items[our_id_maybe.value()];
  232. }
  233. return nullptr;
  234. }
  235. auto symcache_runtime::process_symbols(struct rspamd_task *task, symcache &cache, unsigned int stage) -> bool
  236. {
  237. msg_debug_cache_task("symbols processing stage at pass: %d", stage);
  238. if (RSPAMD_TASK_IS_SKIPPED(task)) {
  239. return true;
  240. }
  241. switch (stage) {
  242. case RSPAMD_TASK_STAGE_CONNFILTERS:
  243. case RSPAMD_TASK_STAGE_PRE_FILTERS:
  244. case RSPAMD_TASK_STAGE_POST_FILTERS:
  245. case RSPAMD_TASK_STAGE_IDEMPOTENT:
  246. return process_pre_postfilters(task, cache,
  247. rspamd_session_events_pending(task->s), stage);
  248. break;
  249. case RSPAMD_TASK_STAGE_FILTERS:
  250. return process_filters(task, cache, rspamd_session_events_pending(task->s));
  251. break;
  252. default:
  253. g_assert_not_reached();
  254. }
  255. }
  256. auto symcache_runtime::process_pre_postfilters(struct rspamd_task *task,
  257. symcache &cache,
  258. int start_events,
  259. unsigned int stage) -> bool
  260. {
  261. auto saved_priority = std::numeric_limits<int>::min();
  262. auto all_done = true;
  263. auto log_func = RSPAMD_LOG_FUNC;
  264. auto compare_functor = +[](int a, int b) { return a < b; };
  265. auto proc_func = [&](cache_item *item) {
  266. /*
  267. * We can safely ignore all pre/postfilters except idempotent ones and
  268. * those that are marked as ignore passthrough result
  269. */
  270. if (stage != RSPAMD_TASK_STAGE_IDEMPOTENT &&
  271. !(item->flags & SYMBOL_TYPE_IGNORE_PASSTHROUGH)) {
  272. if (check_metric_limit(task)) {
  273. msg_debug_cache_task_lambda("task has already the result being set, ignore further checks");
  274. return true;
  275. }
  276. }
  277. auto dyn_item = get_dynamic_item(item->id);
  278. if (dyn_item->status == cache_item_status::not_started) {
  279. if (slow_status == slow_status::enabled) {
  280. return false;
  281. }
  282. if (saved_priority == std::numeric_limits<int>::min()) {
  283. saved_priority = item->priority;
  284. }
  285. else {
  286. if (compare_functor(item->priority, saved_priority) &&
  287. rspamd_session_events_pending(task->s) > start_events) {
  288. /*
  289. * Delay further checks as we have higher
  290. * priority filters to be processed
  291. */
  292. return false;
  293. }
  294. }
  295. return process_symbol(task, cache, item, dyn_item);
  296. }
  297. /* Continue processing */
  298. return true;
  299. };
  300. switch (stage) {
  301. case RSPAMD_TASK_STAGE_CONNFILTERS:
  302. all_done = cache.connfilters_foreach(proc_func);
  303. break;
  304. case RSPAMD_TASK_STAGE_PRE_FILTERS:
  305. all_done = cache.prefilters_foreach(proc_func);
  306. break;
  307. case RSPAMD_TASK_STAGE_POST_FILTERS:
  308. compare_functor = +[](int a, int b) { return a > b; };
  309. all_done = cache.postfilters_foreach(proc_func);
  310. break;
  311. case RSPAMD_TASK_STAGE_IDEMPOTENT:
  312. compare_functor = +[](int a, int b) { return a > b; };
  313. all_done = cache.idempotent_foreach(proc_func);
  314. break;
  315. default:
  316. g_error("invalid invocation");
  317. break;
  318. }
  319. return all_done;
  320. }
  321. auto symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool
  322. {
  323. auto all_done = true;
  324. auto log_func = RSPAMD_LOG_FUNC;
  325. auto has_passtrough = false;
  326. for (const auto [idx, item]: rspamd::enumerate(order->d)) {
  327. /* Exclude all non filters */
  328. if (item->type != symcache_item_type::FILTER) {
  329. /*
  330. * We use breaking the loop as we append non-filters to the end of the list
  331. * so, it is safe to stop processing immediately
  332. */
  333. break;
  334. }
  335. if (!(item->flags & (SYMBOL_TYPE_FINE | SYMBOL_TYPE_IGNORE_PASSTHROUGH))) {
  336. if (has_passtrough || check_metric_limit(task)) {
  337. msg_debug_cache_task_lambda("task has already the result being set, ignore further checks");
  338. has_passtrough = true;
  339. /* Skip this item */
  340. continue;
  341. }
  342. }
  343. auto dyn_item = &dynamic_items[idx];
  344. if (dyn_item->status == cache_item_status::not_started) {
  345. all_done = false;
  346. if (!check_item_deps(task, cache, item.get(),
  347. dyn_item, false)) {
  348. msg_debug_cache_task("blocked execution of %d(%s) unless deps are "
  349. "resolved",
  350. item->id, item->symbol.c_str());
  351. continue;
  352. }
  353. process_symbol(task, cache, item.get(), dyn_item);
  354. if (slow_status == slow_status::enabled) {
  355. return false;
  356. }
  357. }
  358. }
  359. return all_done;
  360. }
  361. auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item,
  362. cache_dynamic_item *dyn_item) -> bool
  363. {
  364. if (item->type == symcache_item_type::CLASSIFIER || item->type == symcache_item_type::COMPOSITE) {
  365. /* Classifiers are special :( */
  366. return true;
  367. }
  368. if (rspamd_session_blocked(task->s)) {
  369. /*
  370. * We cannot add new events as session is either destroyed or
  371. * being cleaned up.
  372. */
  373. return true;
  374. }
  375. g_assert(!item->is_virtual());
  376. if (dyn_item->status != cache_item_status::not_started) {
  377. /*
  378. * This can actually happen when deps span over different layers
  379. */
  380. msg_debug_cache_task("skip already started %s(%d) symbol", item->symbol.c_str(), item->id);
  381. return dyn_item->status == cache_item_status::finished;
  382. }
  383. /* Check has been started */
  384. auto check = true;
  385. if (!item->is_allowed(task, true) || !item->check_conditions(task)) {
  386. check = false;
  387. }
  388. if (check) {
  389. dyn_item->status = cache_item_status::started;
  390. msg_debug_cache_task("execute %s, %d; symbol type = %s", item->symbol.data(),
  391. item->id, item_type_to_str(item->type));
  392. if (profile) {
  393. ev_now_update_if_cheap(task->event_loop);
  394. dyn_item->start_msec = (ev_now(task->event_loop) -
  395. profile_start) *
  396. 1e3;
  397. }
  398. dyn_item->async_events = 0;
  399. cur_item = dyn_item;
  400. items_inflight++;
  401. /* Callback now must finalize itself */
  402. if (item->call(task, dyn_item)) {
  403. cur_item = nullptr;
  404. if (items_inflight == 0) {
  405. msg_debug_cache_task("item %s, %d is now finished (no async events)", item->symbol.data(),
  406. item->id);
  407. dyn_item->status = cache_item_status::finished;
  408. return true;
  409. }
  410. if (dyn_item->async_events == 0 && dyn_item->status != cache_item_status::finished) {
  411. msg_err_cache_task("critical error: item %s has no async events pending, "
  412. "but it is not finalised",
  413. item->symbol.data());
  414. g_assert_not_reached();
  415. }
  416. else if (dyn_item->async_events > 0) {
  417. msg_debug_cache_task("item %s, %d is now pending with %d async events", item->symbol.data(),
  418. item->id, dyn_item->async_events);
  419. }
  420. return false;
  421. }
  422. else {
  423. /* We were not able to call item, so we assume it is not callable */
  424. msg_debug_cache_task("cannot call %s, %d; symbol type = %s", item->symbol.data(),
  425. item->id, item_type_to_str(item->type));
  426. dyn_item->status = cache_item_status::finished;
  427. return true;
  428. }
  429. }
  430. else {
  431. msg_debug_cache_task("do not check %s, %d", item->symbol.data(),
  432. item->id);
  433. dyn_item->status = cache_item_status::finished;
  434. }
  435. return true;
  436. }
  437. auto symcache_runtime::check_metric_limit(struct rspamd_task *task) -> bool
  438. {
  439. if (task->flags & RSPAMD_TASK_FLAG_PASS_ALL) {
  440. return false;
  441. }
  442. /* Check score limit */
  443. if (!std::isnan(lim)) {
  444. if (task->result->score > lim) {
  445. return true;
  446. }
  447. }
  448. if (task->result->passthrough_result != nullptr) {
  449. /* We also need to check passthrough results */
  450. auto *pr = task->result->passthrough_result;
  451. DL_FOREACH(task->result->passthrough_result, pr)
  452. {
  453. struct rspamd_action_config *act_config =
  454. rspamd_find_action_config_for_action(task->result, pr->action);
  455. /* Skip least results */
  456. if (pr->flags & RSPAMD_PASSTHROUGH_LEAST) {
  457. continue;
  458. }
  459. /* Skip disabled actions */
  460. if (act_config && (act_config->flags & RSPAMD_ACTION_RESULT_DISABLED)) {
  461. continue;
  462. }
  463. /* Immediately stop on non least passthrough action */
  464. return true;
  465. }
  466. }
  467. return false;
  468. }
  469. auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache, cache_item *item,
  470. cache_dynamic_item *dyn_item, bool check_only) -> bool
  471. {
  472. constexpr const auto max_recursion = 20;
  473. auto log_func = RSPAMD_LOG_FUNC;
  474. auto inner_functor = [&](int recursion, cache_item *item, cache_dynamic_item *dyn_item, auto rec_functor) -> bool {
  475. msg_debug_cache_task_lambda("recursively (%d) check dependencies for %s(%d)", recursion,
  476. item->symbol.c_str(), item->id);
  477. if (recursion > max_recursion) {
  478. msg_err_task_lambda("cyclic dependencies: maximum check level %ud exceed when "
  479. "checking dependencies for %s",
  480. max_recursion, item->symbol.c_str());
  481. return true;
  482. }
  483. auto ret = true;
  484. for (const auto &[dest_id, dep]: item->deps) {
  485. if (!dep.item) {
  486. /* Assume invalid deps as done */
  487. msg_debug_cache_task_lambda("symbol %d(%s) has invalid dependencies on %d(%s)",
  488. item->id, item->symbol.c_str(), dest_id, dep.sym.c_str());
  489. continue;
  490. }
  491. auto *dep_dyn_item = get_dynamic_item(dep.item->id);
  492. if (dep_dyn_item->status != cache_item_status::finished) {
  493. if (dep_dyn_item->status == cache_item_status::not_started) {
  494. /* Not started */
  495. if (!check_only) {
  496. if (!rec_functor(recursion + 1,
  497. dep.item,
  498. dep_dyn_item,
  499. rec_functor)) {
  500. ret = false;
  501. msg_debug_cache_task_lambda("delayed dependency %d(%s) for "
  502. "symbol %d(%s)",
  503. dest_id, dep.sym.c_str(), item->id, item->symbol.c_str());
  504. }
  505. else if (!process_symbol(task, cache, dep.item, dep_dyn_item)) {
  506. /* Now started, but has events pending */
  507. ret = false;
  508. msg_debug_cache_task_lambda("started check of %d(%s) symbol "
  509. "as dep for "
  510. "%d(%s)",
  511. dest_id, dep.sym.c_str(), item->id, item->symbol.c_str());
  512. }
  513. else {
  514. msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is "
  515. "already processed",
  516. dest_id, dep.sym.c_str(), item->id, item->symbol.c_str());
  517. }
  518. }
  519. else {
  520. msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) "
  521. "cannot be started now",
  522. dest_id, dep.sym.c_str(), item->id, item->symbol.c_str());
  523. ret = false;
  524. }
  525. }
  526. else {
  527. /* Started but not finished */
  528. msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is "
  529. "still executing (%d events pending)",
  530. dest_id, dep.sym.c_str(),
  531. item->id, item->symbol.c_str(),
  532. dep_dyn_item->async_events);
  533. g_assert(dep_dyn_item->async_events > 0);
  534. ret = false;
  535. }
  536. }
  537. else {
  538. msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is already "
  539. "finished",
  540. dest_id, dep.sym.c_str(), item->id, item->symbol.c_str());
  541. }
  542. }
  543. return ret;
  544. };
  545. return inner_functor(0, item, dyn_item, inner_functor);
  546. }
  547. struct rspamd_symcache_delayed_cbdata {
  548. cache_item *item;
  549. struct rspamd_task *task;
  550. symcache_runtime *runtime;
  551. struct rspamd_async_event *event;
  552. struct ev_timer tm;
  553. };
  554. static void
  555. rspamd_symcache_delayed_item_fin(gpointer ud)
  556. {
  557. auto *cbd = (struct rspamd_symcache_delayed_cbdata *) ud;
  558. cbd->event = nullptr;
  559. cbd->runtime->unset_slow();
  560. ev_timer_stop(cbd->task->event_loop, &cbd->tm);
  561. }
  562. static void
  563. rspamd_symcache_delayed_item_cb(EV_P_ ev_timer *w, int what)
  564. {
  565. auto *cbd = (struct rspamd_symcache_delayed_cbdata *) w->data;
  566. if (cbd->event) {
  567. cbd->event = nullptr;
  568. /* Timer will be stopped here; `has_slow` is also reset there */
  569. rspamd_session_remove_event(cbd->task->s,
  570. rspamd_symcache_delayed_item_fin, cbd);
  571. cbd->runtime->process_item_rdeps(cbd->task, cbd->item);
  572. }
  573. }
  574. static void
  575. rspamd_delayed_timer_dtor(gpointer d)
  576. {
  577. auto *cbd = (struct rspamd_symcache_delayed_cbdata *) d;
  578. if (cbd->event) {
  579. /* Event has not been executed, this will also stop a timer */
  580. rspamd_session_remove_event(cbd->task->s,
  581. rspamd_symcache_delayed_item_fin, cbd);
  582. cbd->event = nullptr;
  583. }
  584. }
  585. auto symcache_runtime::finalize_item(struct rspamd_task *task, cache_dynamic_item *dyn_item) -> void
  586. {
  587. /* Limit to consider a rule as slow (in milliseconds) */
  588. constexpr const double slow_diff_limit = 300;
  589. auto *item = get_item_by_dynamic_item(dyn_item);
  590. /* Sanity checks */
  591. g_assert(items_inflight > 0);
  592. g_assert(item != nullptr);
  593. if (dyn_item->async_events > 0) {
  594. /*
  595. * XXX: Race condition
  596. *
  597. * It is possible that some async event is still in flight, but we
  598. * already know its result, however, it is the responsibility of that
  599. * event to decrease async events count and call this function
  600. * one more time
  601. */
  602. msg_debug_cache_task("postpone finalisation of %s(%d) as there are %d "
  603. "async events pending",
  604. item->symbol.c_str(), item->id, dyn_item->async_events);
  605. return;
  606. }
  607. msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id);
  608. dyn_item->status = cache_item_status::finished;
  609. items_inflight--;
  610. cur_item = nullptr;
  611. auto enable_slow_timer = [&]() -> bool {
  612. auto *cbd = rspamd_mempool_alloc0_type(task->task_pool, rspamd_symcache_delayed_cbdata);
  613. /* Add timer to allow something else to be executed */
  614. ev_timer *tm = &cbd->tm;
  615. cbd->event = rspamd_session_add_event(task->s,
  616. rspamd_symcache_delayed_item_fin, cbd,
  617. "symcache");
  618. cbd->runtime = this;
  619. /*
  620. * If no event could be added, then we are already in the destruction
  621. * phase. So the main issue is to deal with has slow here
  622. */
  623. if (cbd->event) {
  624. ev_timer_init(tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0);
  625. ev_set_priority(tm, EV_MINPRI);
  626. rspamd_mempool_add_destructor(task->task_pool,
  627. rspamd_delayed_timer_dtor, cbd);
  628. cbd->task = task;
  629. cbd->item = item;
  630. tm->data = cbd;
  631. ev_timer_start(task->event_loop, tm);
  632. }
  633. else {
  634. /* Just reset as no timer is added */
  635. slow_status = slow_status::none;
  636. return false;
  637. }
  638. return true;
  639. };
  640. /* Check if we need to profile symbol (always profile when we have seen this item to be slow */
  641. if (profile || item->flags & cache_item::bit_slow) {
  642. ev_now_update_if_cheap(task->event_loop);
  643. auto diff = ((ev_now(task->event_loop) - profile_start) * 1e3 -
  644. dyn_item->start_msec);
  645. if (G_UNLIKELY(RSPAMD_TASK_IS_PROFILING(task))) {
  646. rspamd_task_profile_set(task, item->symbol.c_str(), diff);
  647. }
  648. if (rspamd_worker_is_scanner(task->worker)) {
  649. rspamd_set_counter(item->cd, diff);
  650. }
  651. if (diff > slow_diff_limit) {
  652. item->internal_flags |= cache_item::bit_slow;
  653. if (item->internal_flags & cache_item::bit_sync) {
  654. /*
  655. * We also need to adjust start timer for all async rules that
  656. * are started before this rule, as this rule could delay them
  657. * on its own. Hence, we need to make some corrections for all
  658. * rules pending
  659. */
  660. bool need_slow = false;
  661. for (const auto &[i, other_item]: rspamd::enumerate(order->d)) {
  662. auto *other_dyn_item = &dynamic_items[i];
  663. if (other_dyn_item->status == cache_item_status::pending && other_dyn_item->start_msec <= dyn_item->start_msec) {
  664. other_dyn_item->start_msec += diff;
  665. msg_debug_cache_task("slow sync rule %s(%d); adjust start time for pending rule %s(%d) by %.2fms to %dms",
  666. item->symbol.c_str(), item->id,
  667. other_item->symbol.c_str(),
  668. other_item->id,
  669. diff,
  670. (int) other_dyn_item->start_msec);
  671. /* We have something pending, so we need to enable slow timer */
  672. need_slow = true;
  673. }
  674. }
  675. if (need_slow && slow_status != slow_status::enabled) {
  676. slow_status = slow_status::enabled;
  677. msg_info_task("slow synchronous rule: %s(%d): %.2f ms; enable 100ms idle timer to allow other rules to be finished",
  678. item->symbol.c_str(), item->id,
  679. diff);
  680. if (enable_slow_timer()) {
  681. return;
  682. }
  683. }
  684. else {
  685. msg_info_task("slow synchronous rule: %s(%d): %.2f ms; idle timer has already been activated for this scan",
  686. item->symbol.c_str(), item->id,
  687. diff);
  688. }
  689. }
  690. else {
  691. msg_notice_task("slow asynchronous rule: %s(%d): %.2f ms; no idle timer is needed",
  692. item->symbol.c_str(), item->id,
  693. diff);
  694. }
  695. }
  696. else {
  697. item->internal_flags &= ~cache_item::bit_slow;
  698. }
  699. }
  700. process_item_rdeps(task, item);
  701. }
  702. auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item *item) -> void
  703. {
  704. auto *cache_ptr = reinterpret_cast<symcache *>(task->cfg->cache);
  705. // Avoid race condition with the runtime destruction and the delay timer
  706. if (!order) {
  707. return;
  708. }
  709. for (const auto &[id, rdep]: item->rdeps.values()) {
  710. if (rdep.item) {
  711. auto *dyn_item = get_dynamic_item(rdep.item->id);
  712. if (dyn_item->status == cache_item_status::not_started) {
  713. msg_debug_cache_task("check item %d(%s) rdep of %s ",
  714. rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());
  715. if (!check_item_deps(task, *cache_ptr, rdep.item, dyn_item, false)) {
  716. msg_debug_cache_task("blocked execution of %d(%s) rdep of %s "
  717. "unless deps are resolved",
  718. rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());
  719. }
  720. else {
  721. process_symbol(task, *cache_ptr, rdep.item,
  722. dyn_item);
  723. }
  724. }
  725. }
  726. }
  727. }
  728. auto symcache_runtime::get_item_by_dynamic_item(cache_dynamic_item *dyn_item) const -> cache_item *
  729. {
  730. auto idx = dyn_item - dynamic_items;
  731. if (idx >= 0 && idx < order->size()) {
  732. return order->d[idx].get();
  733. }
  734. msg_err("internal error: invalid index to get: %d", (int) idx);
  735. return nullptr;
  736. }
  737. }// namespace rspamd::symcache