From: Vsevolod Stakhov Date: Mon, 19 Aug 2019 11:39:32 +0000 (+0100) Subject: [Rework] Kill old dragons in symcache processing stages X-Git-Tag: 2.0~392 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=b3c005cfcb5bcdd66ec9640afa620a30fa2e0718;p=rspamd.git [Rework] Kill old dragons in symcache processing stages --- diff --git a/src/libserver/rspamd_symcache.c b/src/libserver/rspamd_symcache.c index 71b8a4c6a..a1c3d8d28 100644 --- a/src/libserver/rspamd_symcache.c +++ b/src/libserver/rspamd_symcache.c @@ -209,19 +209,7 @@ struct delayed_cache_condition { lua_State *L; }; -enum rspamd_cache_savepoint_stage { - RSPAMD_CACHE_PASS_INIT = 0, - RSPAMD_CACHE_PASS_PREFILTERS_EMPTY, - RSPAMD_CACHE_PASS_PREFILTERS, - RSPAMD_CACHE_PASS_FILTERS, - RSPAMD_CACHE_PASS_POSTFILTERS, - RSPAMD_CACHE_PASS_IDEMPOTENT, - RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT, - RSPAMD_CACHE_PASS_DONE, -}; - struct cache_savepoint { - enum rspamd_cache_savepoint_stage pass; guint version; guint items_inflight; gboolean profile; @@ -1824,10 +1812,8 @@ rspamd_symcache_make_checkpoint (struct rspamd_task *task, cache->last_profile = now; } - checkpoint->pass = RSPAMD_CACHE_PASS_INIT; task->checkpoint = checkpoint; - return checkpoint; } @@ -1935,12 +1921,9 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, struct rspamd_symcache_item *item = NULL; struct rspamd_symcache_dynamic_item *dyn_item; struct cache_savepoint *checkpoint; - GPtrArray *sel; gint i; - gboolean all_done; + gboolean all_done = TRUE; gint saved_priority; - enum rspamd_cache_savepoint_stage next; - gint next_task_stage; guint start_events_pending; g_assert (cache != NULL); @@ -1953,44 +1936,17 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, checkpoint = task->checkpoint; } - if (stage == RSPAMD_TASK_STAGE_POST_FILTERS && checkpoint->pass < - RSPAMD_CACHE_PASS_POSTFILTERS) { - checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS; - } - - if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT && checkpoint->pass < - RSPAMD_CACHE_PASS_IDEMPOTENT) { - checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; - } - - msg_debug_cache_task ("symbols processing stage at pass: %d, %d stage requested", - checkpoint->pass, stage); + msg_debug_cache_task ("symbols processing stage at pass: %d"); start_events_pending = rspamd_session_events_pending (task->s); - switch (checkpoint->pass) { - case RSPAMD_CACHE_PASS_INIT: - case RSPAMD_CACHE_PASS_PREFILTERS: - case RSPAMD_CACHE_PASS_PREFILTERS_EMPTY: + switch (stage) { + case RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY: /* Check for prefilters */ saved_priority = G_MININT; all_done = TRUE; - if (checkpoint->pass != RSPAMD_CACHE_PASS_PREFILTERS) { - sel = cache->prefilters_empty; - next = RSPAMD_CACHE_PASS_PREFILTERS; - next_task_stage = RSPAMD_TASK_STAGE_PRE_FILTERS; - checkpoint->pass = RSPAMD_CACHE_PASS_PREFILTERS_EMPTY; - } - else { - sel = cache->prefilters; - next = RSPAMD_CACHE_PASS_FILTERS; - checkpoint->pass = RSPAMD_CACHE_PASS_PREFILTERS; - next_task_stage = RSPAMD_TASK_STAGE_FILTERS; - } - - - for (i = 0; i < (gint)sel->len; i ++) { - item = g_ptr_array_index (sel, i); + for (i = 0; i < (gint) cache->prefilters_empty->len; i++) { + item = g_ptr_array_index (cache->prefilters_empty, i); dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); if (RSPAMD_TASK_IS_SKIPPED (task)) { @@ -1998,19 +1954,19 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, } if (!CHECK_START_BIT (checkpoint, dyn_item) && - !CHECK_FINISH_BIT (checkpoint, dyn_item)) { + !CHECK_FINISH_BIT (checkpoint, dyn_item)) { /* Check priorities */ if (saved_priority == G_MININT) { saved_priority = item->priority; } else { if (item->priority < saved_priority && - rspamd_session_events_pending (task->s) > start_events_pending) { + rspamd_session_events_pending (task->s) > start_events_pending) { /* * Delay further checks as we have higher * priority filters to be processed */ - return TRUE; + return FALSE; } } @@ -2020,25 +1976,50 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, } } - if (all_done || stage == next_task_stage) { - checkpoint->pass = next; - } + break; + + case RSPAMD_TASK_STAGE_PRE_FILTERS: + /* Check for prefilters */ + saved_priority = G_MININT; + all_done = TRUE; + + for (i = 0; i < (gint) cache->prefilters->len; i++) { + item = g_ptr_array_index (cache->prefilters, i); + dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); + + if (RSPAMD_TASK_IS_SKIPPED (task)) { + return TRUE; + } + + if (!CHECK_START_BIT (checkpoint, dyn_item) && + !CHECK_FINISH_BIT (checkpoint, dyn_item)) { + /* Check priorities */ + if (saved_priority == G_MININT) { + saved_priority = item->priority; + } + else { + if (item->priority < saved_priority && + rspamd_session_events_pending (task->s) > start_events_pending) { + /* + * Delay further checks as we have higher + * priority filters to be processed + */ + return FALSE; + } + } - if (stage == next_task_stage) { - return rspamd_symcache_process_symbols (task, cache, stage); + rspamd_symcache_check_symbol (task, cache, item, + checkpoint); + all_done = FALSE; + } } break; - case RSPAMD_CACHE_PASS_FILTERS: - /* - * On the first pass we check symbols that do not have dependencies - * If we figure out symbol that has no dependencies satisfied, then - * we just save it for another pass - */ + case RSPAMD_TASK_STAGE_FILTERS: all_done = TRUE; - for (i = 0; i < (gint)checkpoint->version; i ++) { + for (i = 0; i < (gint) checkpoint->version; i++) { if (RSPAMD_TASK_IS_SKIPPED (task)) { return TRUE; } @@ -2057,7 +2038,7 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, checkpoint, 0, FALSE)) { msg_debug_cache_task ("blocked execution of %d(%s) unless deps are " - "resolved", + "resolved", item->id, item->symbol); continue; @@ -2079,23 +2060,14 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, } } - if (all_done || stage == RSPAMD_TASK_STAGE_POST_FILTERS) { - checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS; - } - - if (stage == RSPAMD_TASK_STAGE_POST_FILTERS) { - - return rspamd_symcache_process_symbols (task, cache, stage); - } - break; - case RSPAMD_CACHE_PASS_POSTFILTERS: + case RSPAMD_TASK_STAGE_POST_FILTERS: /* Check for postfilters */ saved_priority = G_MININT; all_done = TRUE; - for (i = 0; i < (gint)cache->postfilters->len; i ++) { + for (i = 0; i < (gint) cache->postfilters->len; i++) { if (RSPAMD_TASK_IS_SKIPPED (task)) { return TRUE; } @@ -2104,7 +2076,7 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); if (!CHECK_START_BIT (checkpoint, dyn_item) && - !CHECK_FINISH_BIT (checkpoint, dyn_item)) { + !CHECK_FINISH_BIT (checkpoint, dyn_item)) { /* Check priorities */ all_done = FALSE; @@ -2113,14 +2085,13 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, } else { if (item->priority > saved_priority && - rspamd_session_events_pending (task->s) > start_events_pending) { + rspamd_session_events_pending (task->s) > start_events_pending) { /* * Delay further checks as we have higher * priority filters to be processed */ - checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS; - return TRUE; + return FALSE; } } @@ -2129,80 +2100,42 @@ rspamd_symcache_process_symbols (struct rspamd_task *task, } } - if (all_done) { - checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; - } - - if (checkpoint->items_inflight == 0 || - stage == RSPAMD_TASK_STAGE_IDEMPOTENT) { - checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; - } - - if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT) { - return rspamd_symcache_process_symbols (task, cache, stage); - } - break; - case RSPAMD_CACHE_PASS_IDEMPOTENT: + case RSPAMD_TASK_STAGE_IDEMPOTENT: /* Check for postfilters */ saved_priority = G_MININT; - for (i = 0; i < (gint)cache->idempotent->len; i ++) { + for (i = 0; i < (gint) cache->idempotent->len; i++) { item = g_ptr_array_index (cache->idempotent, i); dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); if (!CHECK_START_BIT (checkpoint, dyn_item) && - !CHECK_FINISH_BIT (checkpoint, dyn_item)) { + !CHECK_FINISH_BIT (checkpoint, dyn_item)) { /* Check priorities */ if (saved_priority == G_MININT) { saved_priority = item->priority; } else { if (item->priority > saved_priority && - rspamd_session_events_pending (task->s) > start_events_pending) { + rspamd_session_events_pending (task->s) > start_events_pending) { /* * Delay further checks as we have higher * priority filters to be processed */ - checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT; - - return TRUE; + return FALSE; } } rspamd_symcache_check_symbol (task, cache, item, checkpoint); } } - checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT; - break; - - case RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT: - all_done = TRUE; - - for (i = 0; i < (gint)cache->idempotent->len; i ++) { - item = g_ptr_array_index (cache->idempotent, i); - dyn_item = rspamd_symcache_get_dynamic (checkpoint, item); - - if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) { - all_done = FALSE; - break; - } - } - - if (all_done) { - checkpoint->pass = RSPAMD_CACHE_PASS_DONE; - - return TRUE; - } - break; - - case RSPAMD_CACHE_PASS_DONE: - return TRUE; break; + default: + g_assert_not_reached (); } - return FALSE; + return all_done; } struct counters_cbdata { diff --git a/src/libserver/task.c b/src/libserver/task.c index e9a63cbad..b8e178a7b 100644 --- a/src/libserver/task.c +++ b/src/libserver/task.c @@ -693,7 +693,7 @@ gboolean rspamd_task_process (struct rspamd_task *task, guint stages) { gint st; - gboolean ret = TRUE; + gboolean ret = TRUE, all_done = TRUE; GError *stat_error = NULL; /* Avoid nested calls */ @@ -717,8 +717,10 @@ rspamd_task_process (struct rspamd_task *task, guint stages) break; case RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY: - rspamd_symcache_process_symbols (task, task->cfg->cache, - RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY); + case RSPAMD_TASK_STAGE_PRE_FILTERS: + case RSPAMD_TASK_STAGE_FILTERS: + case RSPAMD_TASK_STAGE_IDEMPOTENT: + all_done = rspamd_symcache_process_symbols (task, task->cfg->cache, st); break; case RSPAMD_TASK_STAGE_PROCESS_MESSAGE: @@ -727,16 +729,6 @@ rspamd_task_process (struct rspamd_task *task, guint stages) } break; - case RSPAMD_TASK_STAGE_PRE_FILTERS: - rspamd_symcache_process_symbols (task, task->cfg->cache, - RSPAMD_TASK_STAGE_PRE_FILTERS); - break; - - case RSPAMD_TASK_STAGE_FILTERS: - rspamd_symcache_process_symbols (task, task->cfg->cache, - RSPAMD_TASK_STAGE_FILTERS); - break; - case RSPAMD_TASK_STAGE_CLASSIFIERS: case RSPAMD_TASK_STAGE_CLASSIFIERS_PRE: case RSPAMD_TASK_STAGE_CLASSIFIERS_POST: @@ -754,10 +746,10 @@ rspamd_task_process (struct rspamd_task *task, guint stages) break; case RSPAMD_TASK_STAGE_POST_FILTERS: - rspamd_symcache_process_symbols (task, task->cfg->cache, - RSPAMD_TASK_STAGE_POST_FILTERS); + all_done = rspamd_symcache_process_symbols (task, task->cfg->cache, + st); - if ((task->flags & RSPAMD_TASK_FLAG_LEARN_AUTO) && + if (all_done && (task->flags & RSPAMD_TASK_FLAG_LEARN_AUTO) && !RSPAMD_TASK_IS_EMPTY (task) && !(task->flags & (RSPAMD_TASK_FLAG_LEARN_SPAM|RSPAMD_TASK_FLAG_LEARN_HAM))) { rspamd_stat_check_autolearn (task); @@ -811,10 +803,6 @@ rspamd_task_process (struct rspamd_task *task, guint stages) /* Second run of composites processing before idempotent filters */ rspamd_make_composites (task); break; - case RSPAMD_TASK_STAGE_IDEMPOTENT: - rspamd_symcache_process_symbols (task, task->cfg->cache, - RSPAMD_TASK_STAGE_IDEMPOTENT); - break; case RSPAMD_TASK_STAGE_DONE: task->processed_stages |= RSPAMD_TASK_STAGE_DONE; @@ -843,17 +831,24 @@ rspamd_task_process (struct rspamd_task *task, guint stages) return ret; } - if (rspamd_session_events_pending (task->s) != 0) { - /* We have events pending, so we consider this stage as incomplete */ - msg_debug_task ("need more work on stage %d", st); - } - else { - /* Mark the current stage as done and go to the next stage */ - msg_debug_task ("completed stage %d", st); - task->processed_stages |= st; + if (ret) { + if (rspamd_session_events_pending (task->s) != 0) { + /* We have events pending, so we consider this stage as incomplete */ + msg_debug_task ("need more work on stage %d", st); + } + else { + if (all_done) { + /* Mark the current stage as done and go to the next stage */ + msg_debug_task ("completed stage %d", st); + task->processed_stages |= st; + } + else { + msg_debug_task ("need more processing on stage %d", st); + } - /* Tail recursion */ - return rspamd_task_process (task, stages); + /* Tail recursion */ + return rspamd_task_process (task, stages); + } } return ret;