You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

symcache_runtime.cxx 21KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823
  1. /*
  2. * Copyright 2023 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "symcache_internal.hxx"
  17. #include "symcache_item.hxx"
  18. #include "symcache_runtime.hxx"
  19. #include "libutil/cxx/util.hxx"
  20. #include "libserver/task.h"
  21. #include "libmime/scan_result.h"
  22. #include "utlist.h"
  23. #include "libserver/worker_util.h"
  24. #include <limits>
  25. #include <cmath>
  26. namespace rspamd::symcache {
  27. /* At least once per minute */
  28. constexpr static const auto PROFILE_MAX_TIME = 60.0;
  29. /* For messages larger than 2Mb enable profiling */
  30. constexpr static const auto PROFILE_MESSAGE_SIZE_THRESHOLD = 1024ul * 1024 * 2;
  31. /* Enable profile at least once per this amount of messages processed */
  32. constexpr static const auto PROFILE_PROBABILITY = 0.01;
  33. auto symcache_runtime::create(struct rspamd_task *task, symcache &cache) -> symcache_runtime *
  34. {
  35. cache.maybe_resort();
  36. auto &&cur_order = cache.get_cache_order();
  37. auto *checkpoint = (symcache_runtime *) rspamd_mempool_alloc0(task->task_pool,
  38. sizeof(symcache_runtime) +
  39. sizeof(struct cache_dynamic_item) * cur_order->size());
  40. checkpoint->order = cache.get_cache_order();
  41. /* Calculate profile probability */
  42. ev_now_update_if_cheap(task->event_loop);
  43. ev_tstamp now = ev_now(task->event_loop);
  44. checkpoint->profile_start = now;
  45. checkpoint->lim = rspamd_task_get_required_score(task, task->result);
  46. if ((cache.get_last_profile() == 0.0 || now > cache.get_last_profile() + PROFILE_MAX_TIME) ||
  47. (task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) ||
  48. (rspamd_random_double_fast() >= (1 - PROFILE_PROBABILITY))) {
  49. msg_debug_cache_task("enable profiling of symbols for task");
  50. checkpoint->profile = true;
  51. cache.set_last_profile(now);
  52. }
  53. task->symcache_runtime = (void *) checkpoint;
  54. return checkpoint;
  55. }
  56. auto symcache_runtime::process_settings(struct rspamd_task *task, const symcache &cache) -> bool
  57. {
  58. if (!task->settings) {
  59. msg_err_task("`process_settings` is called with no settings");
  60. return false;
  61. }
  62. const auto *wl = ucl_object_lookup(task->settings, "whitelist");
  63. if (wl != nullptr) {
  64. msg_info_task("task is whitelisted");
  65. task->flags |= RSPAMD_TASK_FLAG_SKIP;
  66. return true;
  67. }
  68. auto already_disabled = false;
  69. auto process_group = [&](const ucl_object_t *gr_obj, auto functor) -> void {
  70. ucl_object_iter_t it = nullptr;
  71. const ucl_object_t *cur;
  72. if (gr_obj) {
  73. while ((cur = ucl_iterate_object(gr_obj, &it, true)) != nullptr) {
  74. if (ucl_object_type(cur) == UCL_STRING) {
  75. auto *gr = (struct rspamd_symbols_group *)
  76. g_hash_table_lookup(task->cfg->groups,
  77. ucl_object_tostring(cur));
  78. if (gr) {
  79. GHashTableIter gr_it;
  80. void *k, *v;
  81. g_hash_table_iter_init(&gr_it, gr->symbols);
  82. while (g_hash_table_iter_next(&gr_it, &k, &v)) {
  83. functor((const char *) k);
  84. }
  85. }
  86. }
  87. }
  88. }
  89. };
  90. ucl_object_iter_t it = nullptr;
  91. const ucl_object_t *cur;
  92. const auto *enabled = ucl_object_lookup(task->settings, "symbols_enabled");
  93. if (enabled) {
  94. msg_debug_cache_task("disable all symbols as `symbols_enabled` is found");
  95. /* Disable all symbols but selected */
  96. disable_all_symbols(SYMBOL_TYPE_EXPLICIT_DISABLE);
  97. already_disabled = true;
  98. it = nullptr;
  99. while ((cur = ucl_iterate_object(enabled, &it, true)) != nullptr) {
  100. enable_symbol(task, cache, ucl_object_tostring(cur));
  101. }
  102. }
  103. /* Enable groups of symbols */
  104. enabled = ucl_object_lookup(task->settings, "groups_enabled");
  105. if (enabled && !already_disabled) {
  106. disable_all_symbols(SYMBOL_TYPE_EXPLICIT_DISABLE);
  107. }
  108. process_group(enabled, [&](const char *sym) {
  109. enable_symbol(task, cache, sym);
  110. });
  111. const auto *disabled = ucl_object_lookup(task->settings, "symbols_disabled");
  112. if (disabled) {
  113. it = nullptr;
  114. while ((cur = ucl_iterate_object(disabled, &it, true)) != nullptr) {
  115. disable_symbol(task, cache, ucl_object_tostring(cur));
  116. }
  117. }
  118. /* Disable groups of symbols */
  119. disabled = ucl_object_lookup(task->settings, "groups_disabled");
  120. process_group(disabled, [&](const char *sym) {
  121. disable_symbol(task, cache, sym);
  122. });
  123. /* Update required limit */
  124. lim = rspamd_task_get_required_score(task, task->result);
  125. return false;
  126. }
  127. auto symcache_runtime::disable_all_symbols(int skip_mask) -> void
  128. {
  129. for (auto [i, item]: rspamd::enumerate(order->d)) {
  130. auto *dyn_item = &dynamic_items[i];
  131. if (!(item->get_flags() & skip_mask)) {
  132. dyn_item->finished = true;
  133. dyn_item->started = true;
  134. }
  135. }
  136. }
  137. auto symcache_runtime::disable_symbol(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool
  138. {
  139. const auto *item = cache.get_item_by_name(name, true);
  140. if (item != nullptr) {
  141. auto *dyn_item = get_dynamic_item(item->id);
  142. if (dyn_item) {
  143. dyn_item->finished = true;
  144. dyn_item->started = true;
  145. msg_debug_cache_task("disable execution of %s", name.data());
  146. return true;
  147. }
  148. else {
  149. msg_debug_cache_task("cannot disable %s: id not found %d", name.data(), item->id);
  150. }
  151. }
  152. else {
  153. msg_debug_cache_task("cannot disable %s: symbol not found", name.data());
  154. }
  155. return false;
  156. }
  157. auto symcache_runtime::enable_symbol(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool
  158. {
  159. const auto *item = cache.get_item_by_name(name, true);
  160. if (item != nullptr) {
  161. auto *dyn_item = get_dynamic_item(item->id);
  162. if (dyn_item) {
  163. dyn_item->finished = false;
  164. dyn_item->started = false;
  165. msg_debug_cache_task("enable execution of %s", name.data());
  166. return true;
  167. }
  168. else {
  169. msg_debug_cache_task("cannot enable %s: id not found %d", name.data(), item->id);
  170. }
  171. }
  172. else {
  173. msg_debug_cache_task("cannot enable %s: symbol not found", name.data());
  174. }
  175. return false;
  176. }
  177. auto symcache_runtime::is_symbol_checked(const symcache &cache, std::string_view name) -> bool
  178. {
  179. const auto *item = cache.get_item_by_name(name, true);
  180. if (item != nullptr) {
  181. auto *dyn_item = get_dynamic_item(item->id);
  182. if (dyn_item) {
  183. return dyn_item->started;
  184. }
  185. }
  186. return false;
  187. }
  188. auto symcache_runtime::is_symbol_enabled(struct rspamd_task *task, const symcache &cache, std::string_view name) -> bool
  189. {
  190. const auto *item = cache.get_item_by_name(name, true);
  191. if (item) {
  192. if (!item->is_allowed(task, true)) {
  193. return false;
  194. }
  195. else {
  196. auto *dyn_item = get_dynamic_item(item->id);
  197. if (dyn_item) {
  198. if (dyn_item->started) {
  199. /* Already started */
  200. return false;
  201. }
  202. if (!item->is_virtual()) {
  203. return std::get<normal_item>(item->specific).check_conditions(item->symbol, task);
  204. }
  205. }
  206. else {
  207. /* Unknown item */
  208. msg_debug_cache_task("cannot enable %s: symbol not found", name.data());
  209. }
  210. }
  211. }
  212. return true;
  213. }
  214. auto symcache_runtime::get_dynamic_item(int id) const -> cache_dynamic_item *
  215. {
  216. /* Not found in the cache, do a hash lookup */
  217. auto our_id_maybe = rspamd::find_map(order->by_cache_id, id);
  218. if (our_id_maybe) {
  219. return &dynamic_items[our_id_maybe.value()];
  220. }
  221. return nullptr;
  222. }
  223. auto symcache_runtime::process_symbols(struct rspamd_task *task, symcache &cache, unsigned int stage) -> bool
  224. {
  225. msg_debug_cache_task("symbols processing stage at pass: %d", stage);
  226. if (RSPAMD_TASK_IS_SKIPPED(task)) {
  227. return true;
  228. }
  229. switch (stage) {
  230. case RSPAMD_TASK_STAGE_CONNFILTERS:
  231. case RSPAMD_TASK_STAGE_PRE_FILTERS:
  232. case RSPAMD_TASK_STAGE_POST_FILTERS:
  233. case RSPAMD_TASK_STAGE_IDEMPOTENT:
  234. return process_pre_postfilters(task, cache,
  235. rspamd_session_events_pending(task->s), stage);
  236. break;
  237. case RSPAMD_TASK_STAGE_FILTERS:
  238. return process_filters(task, cache, rspamd_session_events_pending(task->s));
  239. break;
  240. default:
  241. g_assert_not_reached();
  242. }
  243. }
  244. auto symcache_runtime::process_pre_postfilters(struct rspamd_task *task,
  245. symcache &cache,
  246. int start_events,
  247. unsigned int stage) -> bool
  248. {
  249. auto saved_priority = std::numeric_limits<int>::min();
  250. auto all_done = true;
  251. auto log_func = RSPAMD_LOG_FUNC;
  252. auto compare_functor = +[](int a, int b) { return a < b; };
  253. auto proc_func = [&](cache_item *item) {
  254. /*
  255. * We can safely ignore all pre/postfilters except idempotent ones and
  256. * those that are marked as ignore passthrough result
  257. */
  258. if (stage != RSPAMD_TASK_STAGE_IDEMPOTENT &&
  259. !(item->flags & SYMBOL_TYPE_IGNORE_PASSTHROUGH)) {
  260. if (check_metric_limit(task)) {
  261. msg_debug_cache_task_lambda("task has already the result being set, ignore further checks");
  262. return true;
  263. }
  264. }
  265. auto dyn_item = get_dynamic_item(item->id);
  266. if (!dyn_item->started && !dyn_item->finished) {
  267. if (has_slow) {
  268. /* Delay */
  269. has_slow = false;
  270. return false;
  271. }
  272. if (saved_priority == std::numeric_limits<int>::min()) {
  273. saved_priority = item->priority;
  274. }
  275. else {
  276. if (compare_functor(item->priority, saved_priority) &&
  277. rspamd_session_events_pending(task->s) > start_events) {
  278. /*
  279. * Delay further checks as we have higher
  280. * priority filters to be processed
  281. */
  282. return false;
  283. }
  284. }
  285. return process_symbol(task, cache, item, dyn_item);
  286. }
  287. /* Continue processing */
  288. return true;
  289. };
  290. switch (stage) {
  291. case RSPAMD_TASK_STAGE_CONNFILTERS:
  292. all_done = cache.connfilters_foreach(proc_func);
  293. break;
  294. case RSPAMD_TASK_STAGE_PRE_FILTERS:
  295. all_done = cache.prefilters_foreach(proc_func);
  296. break;
  297. case RSPAMD_TASK_STAGE_POST_FILTERS:
  298. compare_functor = +[](int a, int b) { return a > b; };
  299. all_done = cache.postfilters_foreach(proc_func);
  300. break;
  301. case RSPAMD_TASK_STAGE_IDEMPOTENT:
  302. compare_functor = +[](int a, int b) { return a > b; };
  303. all_done = cache.idempotent_foreach(proc_func);
  304. break;
  305. default:
  306. g_error("invalid invocation");
  307. break;
  308. }
  309. return all_done;
  310. }
  311. auto symcache_runtime::process_filters(struct rspamd_task *task, symcache &cache, int start_events) -> bool
  312. {
  313. auto all_done = true;
  314. auto log_func = RSPAMD_LOG_FUNC;
  315. auto has_passtrough = false;
  316. for (const auto [idx, item]: rspamd::enumerate(order->d)) {
  317. /* Exclude all non filters */
  318. if (item->type != symcache_item_type::FILTER) {
  319. /*
  320. * We use breaking the loop as we append non-filters to the end of the list
  321. * so, it is safe to stop processing immediately
  322. */
  323. break;
  324. }
  325. if (!(item->flags & (SYMBOL_TYPE_FINE | SYMBOL_TYPE_IGNORE_PASSTHROUGH))) {
  326. if (has_passtrough || check_metric_limit(task)) {
  327. msg_debug_cache_task_lambda("task has already the result being set, ignore further checks");
  328. has_passtrough = true;
  329. /* Skip this item */
  330. continue;
  331. }
  332. }
  333. auto dyn_item = &dynamic_items[idx];
  334. if (!dyn_item->started) {
  335. all_done = false;
  336. if (!check_item_deps(task, cache, item.get(),
  337. dyn_item, false)) {
  338. msg_debug_cache_task("blocked execution of %d(%s) unless deps are "
  339. "resolved",
  340. item->id, item->symbol.c_str());
  341. continue;
  342. }
  343. process_symbol(task, cache, item.get(), dyn_item);
  344. if (has_slow) {
  345. /* Delay */
  346. has_slow = false;
  347. return false;
  348. }
  349. }
  350. }
  351. return all_done;
  352. }
  353. auto symcache_runtime::process_symbol(struct rspamd_task *task, symcache &cache, cache_item *item,
  354. cache_dynamic_item *dyn_item) -> bool
  355. {
  356. if (item->type == symcache_item_type::CLASSIFIER || item->type == symcache_item_type::COMPOSITE) {
  357. /* Classifiers are special :( */
  358. return true;
  359. }
  360. if (rspamd_session_blocked(task->s)) {
  361. /*
  362. * We cannot add new events as session is either destroyed or
  363. * being cleaned up.
  364. */
  365. return true;
  366. }
  367. g_assert(!item->is_virtual());
  368. if (dyn_item->started) {
  369. /*
  370. * This can actually happen when deps span over different layers
  371. */
  372. return dyn_item->finished;
  373. }
  374. /* Check has been started */
  375. dyn_item->started = true;
  376. auto check = true;
  377. if (!item->is_allowed(task, true) || !item->check_conditions(task)) {
  378. check = false;
  379. }
  380. if (check) {
  381. msg_debug_cache_task("execute %s, %d; symbol type = %s", item->symbol.data(),
  382. item->id, item_type_to_str(item->type));
  383. if (profile) {
  384. ev_now_update_if_cheap(task->event_loop);
  385. dyn_item->start_msec = (ev_now(task->event_loop) -
  386. profile_start) *
  387. 1e3;
  388. }
  389. dyn_item->async_events = 0;
  390. cur_item = dyn_item;
  391. items_inflight++;
  392. /* Callback now must finalize itself */
  393. item->call(task, dyn_item);
  394. cur_item = nullptr;
  395. if (items_inflight == 0) {
  396. return true;
  397. }
  398. if (dyn_item->async_events == 0 && !dyn_item->finished) {
  399. msg_err_cache_task("critical error: item %s has no async events pending, "
  400. "but it is not finalised",
  401. item->symbol.data());
  402. g_assert_not_reached();
  403. }
  404. return false;
  405. }
  406. else {
  407. dyn_item->finished = true;
  408. }
  409. return true;
  410. }
  411. auto symcache_runtime::check_metric_limit(struct rspamd_task *task) -> bool
  412. {
  413. if (task->flags & RSPAMD_TASK_FLAG_PASS_ALL) {
  414. return false;
  415. }
  416. /* Check score limit */
  417. if (!std::isnan(lim)) {
  418. if (task->result->score > lim) {
  419. return true;
  420. }
  421. }
  422. if (task->result->passthrough_result != nullptr) {
  423. /* We also need to check passthrough results */
  424. auto *pr = task->result->passthrough_result;
  425. DL_FOREACH(task->result->passthrough_result, pr)
  426. {
  427. struct rspamd_action_config *act_config =
  428. rspamd_find_action_config_for_action(task->result, pr->action);
  429. /* Skip least results */
  430. if (pr->flags & RSPAMD_PASSTHROUGH_LEAST) {
  431. continue;
  432. }
  433. /* Skip disabled actions */
  434. if (act_config && (act_config->flags & RSPAMD_ACTION_RESULT_DISABLED)) {
  435. continue;
  436. }
  437. /* Immediately stop on non least passthrough action */
  438. return true;
  439. }
  440. }
  441. return false;
  442. }
  443. auto symcache_runtime::check_item_deps(struct rspamd_task *task, symcache &cache, cache_item *item,
  444. cache_dynamic_item *dyn_item, bool check_only) -> bool
  445. {
  446. constexpr const auto max_recursion = 20;
  447. auto log_func = RSPAMD_LOG_FUNC;
  448. auto inner_functor = [&](int recursion, cache_item *item, cache_dynamic_item *dyn_item, auto rec_functor) -> bool {
  449. if (recursion > max_recursion) {
  450. msg_err_task_lambda("cyclic dependencies: maximum check level %ud exceed when "
  451. "checking dependencies for %s",
  452. max_recursion, item->symbol.c_str());
  453. return true;
  454. }
  455. auto ret = true;
  456. for (const auto &dep: item->deps) {
  457. if (!dep.item) {
  458. /* Assume invalid deps as done */
  459. msg_debug_cache_task_lambda("symbol %d(%s) has invalid dependencies on %d(%s)",
  460. item->id, item->symbol.c_str(), dep.id, dep.sym.c_str());
  461. continue;
  462. }
  463. auto *dep_dyn_item = get_dynamic_item(dep.item->id);
  464. if (!dep_dyn_item->finished) {
  465. if (!dep_dyn_item->started) {
  466. /* Not started */
  467. if (!check_only) {
  468. if (!rec_functor(recursion + 1,
  469. dep.item,
  470. dep_dyn_item,
  471. rec_functor)) {
  472. ret = false;
  473. msg_debug_cache_task_lambda("delayed dependency %d(%s) for "
  474. "symbol %d(%s)",
  475. dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
  476. }
  477. else if (!process_symbol(task, cache, dep.item, dep_dyn_item)) {
  478. /* Now started, but has events pending */
  479. ret = false;
  480. msg_debug_cache_task_lambda("started check of %d(%s) symbol "
  481. "as dep for "
  482. "%d(%s)",
  483. dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
  484. }
  485. else {
  486. msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is "
  487. "already processed",
  488. dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
  489. }
  490. }
  491. else {
  492. msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) "
  493. "cannot be started now",
  494. dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
  495. ret = false;
  496. }
  497. }
  498. else {
  499. /* Started but not finished */
  500. msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is "
  501. "still executing",
  502. dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
  503. ret = false;
  504. }
  505. }
  506. else {
  507. msg_debug_cache_task_lambda("dependency %d(%s) for symbol %d(%s) is already "
  508. "checked",
  509. dep.id, dep.sym.c_str(), item->id, item->symbol.c_str());
  510. }
  511. }
  512. return ret;
  513. };
  514. return inner_functor(0, item, dyn_item, inner_functor);
  515. }
  516. struct rspamd_symcache_delayed_cbdata {
  517. cache_item *item;
  518. struct rspamd_task *task;
  519. symcache_runtime *runtime;
  520. struct rspamd_async_event *event;
  521. struct ev_timer tm;
  522. };
  523. static void
  524. rspamd_symcache_delayed_item_fin(gpointer ud)
  525. {
  526. auto *cbd = (struct rspamd_symcache_delayed_cbdata *) ud;
  527. cbd->event = nullptr;
  528. cbd->runtime->unset_slow();
  529. ev_timer_stop(cbd->task->event_loop, &cbd->tm);
  530. }
  531. static void
  532. rspamd_symcache_delayed_item_cb(EV_P_ ev_timer *w, int what)
  533. {
  534. auto *cbd = (struct rspamd_symcache_delayed_cbdata *) w->data;
  535. if (cbd->event) {
  536. cbd->event = nullptr;
  537. /* Timer will be stopped here */
  538. rspamd_session_remove_event(cbd->task->s,
  539. rspamd_symcache_delayed_item_fin, cbd);
  540. cbd->runtime->process_item_rdeps(cbd->task, cbd->item);
  541. }
  542. }
  543. static void
  544. rspamd_delayed_timer_dtor(gpointer d)
  545. {
  546. auto *cbd = (struct rspamd_symcache_delayed_cbdata *) d;
  547. if (cbd->event) {
  548. /* Event has not been executed, this will also stop a timer */
  549. rspamd_session_remove_event(cbd->task->s,
  550. rspamd_symcache_delayed_item_fin, cbd);
  551. cbd->event = nullptr;
  552. }
  553. }
  554. auto symcache_runtime::finalize_item(struct rspamd_task *task, cache_dynamic_item *dyn_item) -> void
  555. {
  556. /* Limit to consider a rule as slow (in milliseconds) */
  557. constexpr const double slow_diff_limit = 300;
  558. auto *item = get_item_by_dynamic_item(dyn_item);
  559. /* Sanity checks */
  560. g_assert(items_inflight > 0);
  561. g_assert(item != nullptr);
  562. if (dyn_item->async_events > 0) {
  563. /*
  564. * XXX: Race condition
  565. *
  566. * It is possible that some async event is still in flight, but we
  567. * already know its result, however, it is the responsibility of that
  568. * event to decrease async events count and call this function
  569. * one more time
  570. */
  571. msg_debug_cache_task("postpone finalisation of %s(%d) as there are %d "
  572. "async events pending",
  573. item->symbol.c_str(), item->id, dyn_item->async_events);
  574. return;
  575. }
  576. msg_debug_cache_task("process finalize for item %s(%d)", item->symbol.c_str(), item->id);
  577. dyn_item->finished = true;
  578. items_inflight--;
  579. cur_item = nullptr;
  580. auto enable_slow_timer = [&]() -> bool {
  581. auto *cbd = rspamd_mempool_alloc0_type(task->task_pool, rspamd_symcache_delayed_cbdata);
  582. /* Add timer to allow something else to be executed */
  583. ev_timer *tm = &cbd->tm;
  584. cbd->event = rspamd_session_add_event(task->s,
  585. rspamd_symcache_delayed_item_fin, cbd,
  586. "symcache");
  587. cbd->runtime = this;
  588. /*
  589. * If no event could be added, then we are already in the destruction
  590. * phase. So the main issue is to deal with has slow here
  591. */
  592. if (cbd->event) {
  593. ev_timer_init(tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0);
  594. ev_set_priority(tm, EV_MINPRI);
  595. rspamd_mempool_add_destructor(task->task_pool,
  596. rspamd_delayed_timer_dtor, cbd);
  597. cbd->task = task;
  598. cbd->item = item;
  599. tm->data = cbd;
  600. ev_timer_start(task->event_loop, tm);
  601. }
  602. else {
  603. /* Just reset as no timer is added */
  604. has_slow = FALSE;
  605. return false;
  606. }
  607. return true;
  608. };
  609. if (profile) {
  610. ev_now_update_if_cheap(task->event_loop);
  611. auto diff = ((ev_now(task->event_loop) - profile_start) * 1e3 -
  612. dyn_item->start_msec);
  613. if (diff > slow_diff_limit) {
  614. if (!has_slow) {
  615. has_slow = true;
  616. msg_info_task("slow rule: %s(%d): %.2f ms; enable slow timer delay",
  617. item->symbol.c_str(), item->id,
  618. diff);
  619. if (enable_slow_timer()) {
  620. /* Allow network execution */
  621. return;
  622. }
  623. }
  624. else {
  625. msg_info_task("slow rule: %s(%d): %.2f ms",
  626. item->symbol.c_str(), item->id,
  627. diff);
  628. }
  629. }
  630. if (G_UNLIKELY(RSPAMD_TASK_IS_PROFILING(task))) {
  631. rspamd_task_profile_set(task, item->symbol.c_str(), diff);
  632. }
  633. if (rspamd_worker_is_scanner(task->worker)) {
  634. rspamd_set_counter(item->cd, diff);
  635. }
  636. }
  637. process_item_rdeps(task, item);
  638. }
  639. auto symcache_runtime::process_item_rdeps(struct rspamd_task *task, cache_item *item) -> void
  640. {
  641. auto *cache_ptr = reinterpret_cast<symcache *>(task->cfg->cache);
  642. // Avoid race condition with the runtime destruction and the delay timer
  643. if (!order) {
  644. return;
  645. }
  646. for (const auto &rdep: item->rdeps) {
  647. if (rdep.item) {
  648. auto *dyn_item = get_dynamic_item(rdep.item->id);
  649. if (!dyn_item->started) {
  650. msg_debug_cache_task("check item %d(%s) rdep of %s ",
  651. rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());
  652. if (!check_item_deps(task, *cache_ptr, rdep.item, dyn_item, false)) {
  653. msg_debug_cache_task("blocked execution of %d(%s) rdep of %s "
  654. "unless deps are resolved",
  655. rdep.item->id, rdep.item->symbol.c_str(), item->symbol.c_str());
  656. }
  657. else {
  658. process_symbol(task, *cache_ptr, rdep.item,
  659. dyn_item);
  660. }
  661. }
  662. }
  663. }
  664. }
  665. auto symcache_runtime::get_item_by_dynamic_item(cache_dynamic_item *dyn_item) const -> cache_item *
  666. {
  667. auto idx = dyn_item - dynamic_items;
  668. if (idx >= 0 && idx < order->size()) {
  669. return order->d[idx].get();
  670. }
  671. msg_err("internal error: invalid index to get: %d", (int) idx);
  672. return nullptr;
  673. }
  674. }// namespace rspamd::symcache