lua_State *L;
};
-enum rspamd_cache_savepoint_stage {
- RSPAMD_CACHE_PASS_INIT = 0,
- RSPAMD_CACHE_PASS_PREFILTERS_EMPTY,
- RSPAMD_CACHE_PASS_PREFILTERS,
- RSPAMD_CACHE_PASS_FILTERS,
- RSPAMD_CACHE_PASS_POSTFILTERS,
- RSPAMD_CACHE_PASS_IDEMPOTENT,
- RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT,
- RSPAMD_CACHE_PASS_DONE,
-};
-
struct cache_savepoint {
- enum rspamd_cache_savepoint_stage pass;
guint version;
guint items_inflight;
gboolean profile;
cache->last_profile = now;
}
- checkpoint->pass = RSPAMD_CACHE_PASS_INIT;
task->checkpoint = checkpoint;
-
return checkpoint;
}
struct rspamd_symcache_item *item = NULL;
struct rspamd_symcache_dynamic_item *dyn_item;
struct cache_savepoint *checkpoint;
- GPtrArray *sel;
gint i;
- gboolean all_done;
+ gboolean all_done = TRUE;
gint saved_priority;
- enum rspamd_cache_savepoint_stage next;
- gint next_task_stage;
guint start_events_pending;
g_assert (cache != NULL);
checkpoint = task->checkpoint;
}
- if (stage == RSPAMD_TASK_STAGE_POST_FILTERS && checkpoint->pass <
- RSPAMD_CACHE_PASS_POSTFILTERS) {
- checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
- }
-
- if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT && checkpoint->pass <
- RSPAMD_CACHE_PASS_IDEMPOTENT) {
- checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
- }
-
- msg_debug_cache_task ("symbols processing stage at pass: %d, %d stage requested",
- checkpoint->pass, stage);
+ msg_debug_cache_task ("symbols processing stage at pass: %d");
start_events_pending = rspamd_session_events_pending (task->s);
- switch (checkpoint->pass) {
- case RSPAMD_CACHE_PASS_INIT:
- case RSPAMD_CACHE_PASS_PREFILTERS:
- case RSPAMD_CACHE_PASS_PREFILTERS_EMPTY:
+ switch (stage) {
+ case RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY:
/* Check for prefilters */
saved_priority = G_MININT;
all_done = TRUE;
- if (checkpoint->pass != RSPAMD_CACHE_PASS_PREFILTERS) {
- sel = cache->prefilters_empty;
- next = RSPAMD_CACHE_PASS_PREFILTERS;
- next_task_stage = RSPAMD_TASK_STAGE_PRE_FILTERS;
- checkpoint->pass = RSPAMD_CACHE_PASS_PREFILTERS_EMPTY;
- }
- else {
- sel = cache->prefilters;
- next = RSPAMD_CACHE_PASS_FILTERS;
- checkpoint->pass = RSPAMD_CACHE_PASS_PREFILTERS;
- next_task_stage = RSPAMD_TASK_STAGE_FILTERS;
- }
-
-
- for (i = 0; i < (gint)sel->len; i ++) {
- item = g_ptr_array_index (sel, i);
+ for (i = 0; i < (gint) cache->prefilters_empty->len; i++) {
+ item = g_ptr_array_index (cache->prefilters_empty, i);
dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
if (RSPAMD_TASK_IS_SKIPPED (task)) {
}
if (!CHECK_START_BIT (checkpoint, dyn_item) &&
- !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+ !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
/* Check priorities */
if (saved_priority == G_MININT) {
saved_priority = item->priority;
}
else {
if (item->priority < saved_priority &&
- rspamd_session_events_pending (task->s) > start_events_pending) {
+ rspamd_session_events_pending (task->s) > start_events_pending) {
/*
* Delay further checks as we have higher
* priority filters to be processed
*/
- return TRUE;
+ return FALSE;
}
}
}
}
- if (all_done || stage == next_task_stage) {
- checkpoint->pass = next;
- }
+ break;
+
+ case RSPAMD_TASK_STAGE_PRE_FILTERS:
+ /* Check for prefilters */
+ saved_priority = G_MININT;
+ all_done = TRUE;
+
+ for (i = 0; i < (gint) cache->prefilters->len; i++) {
+ item = g_ptr_array_index (cache->prefilters, i);
+ dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
+
+ if (RSPAMD_TASK_IS_SKIPPED (task)) {
+ return TRUE;
+ }
+
+ if (!CHECK_START_BIT (checkpoint, dyn_item) &&
+ !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+ /* Check priorities */
+ if (saved_priority == G_MININT) {
+ saved_priority = item->priority;
+ }
+ else {
+ if (item->priority < saved_priority &&
+ rspamd_session_events_pending (task->s) > start_events_pending) {
+ /*
+ * Delay further checks as we have higher
+ * priority filters to be processed
+ */
+ return FALSE;
+ }
+ }
- if (stage == next_task_stage) {
- return rspamd_symcache_process_symbols (task, cache, stage);
+ rspamd_symcache_check_symbol (task, cache, item,
+ checkpoint);
+ all_done = FALSE;
+ }
}
break;
- case RSPAMD_CACHE_PASS_FILTERS:
- /*
- * On the first pass we check symbols that do not have dependencies
- * If we figure out symbol that has no dependencies satisfied, then
- * we just save it for another pass
- */
+ case RSPAMD_TASK_STAGE_FILTERS:
all_done = TRUE;
- for (i = 0; i < (gint)checkpoint->version; i ++) {
+ for (i = 0; i < (gint) checkpoint->version; i++) {
if (RSPAMD_TASK_IS_SKIPPED (task)) {
return TRUE;
}
checkpoint, 0, FALSE)) {
msg_debug_cache_task ("blocked execution of %d(%s) unless deps are "
- "resolved",
+ "resolved",
item->id, item->symbol);
continue;
}
}
- if (all_done || stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
- checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
- }
-
- if (stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
-
- return rspamd_symcache_process_symbols (task, cache, stage);
- }
-
break;
- case RSPAMD_CACHE_PASS_POSTFILTERS:
+ case RSPAMD_TASK_STAGE_POST_FILTERS:
/* Check for postfilters */
saved_priority = G_MININT;
all_done = TRUE;
- for (i = 0; i < (gint)cache->postfilters->len; i ++) {
+ for (i = 0; i < (gint) cache->postfilters->len; i++) {
if (RSPAMD_TASK_IS_SKIPPED (task)) {
return TRUE;
}
dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
if (!CHECK_START_BIT (checkpoint, dyn_item) &&
- !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+ !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
/* Check priorities */
all_done = FALSE;
}
else {
if (item->priority > saved_priority &&
- rspamd_session_events_pending (task->s) > start_events_pending) {
+ rspamd_session_events_pending (task->s) > start_events_pending) {
/*
* Delay further checks as we have higher
* priority filters to be processed
*/
- checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
- return TRUE;
+ return FALSE;
}
}
}
}
- if (all_done) {
- checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
- }
-
- if (checkpoint->items_inflight == 0 ||
- stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
- checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
- }
-
- if (stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
- return rspamd_symcache_process_symbols (task, cache, stage);
- }
-
break;
- case RSPAMD_CACHE_PASS_IDEMPOTENT:
+ case RSPAMD_TASK_STAGE_IDEMPOTENT:
/* Check for postfilters */
saved_priority = G_MININT;
- for (i = 0; i < (gint)cache->idempotent->len; i ++) {
+ for (i = 0; i < (gint) cache->idempotent->len; i++) {
item = g_ptr_array_index (cache->idempotent, i);
dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
if (!CHECK_START_BIT (checkpoint, dyn_item) &&
- !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
+ !CHECK_FINISH_BIT (checkpoint, dyn_item)) {
/* Check priorities */
if (saved_priority == G_MININT) {
saved_priority = item->priority;
}
else {
if (item->priority > saved_priority &&
- rspamd_session_events_pending (task->s) > start_events_pending) {
+ rspamd_session_events_pending (task->s) > start_events_pending) {
/*
* Delay further checks as we have higher
* priority filters to be processed
*/
- checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
-
- return TRUE;
+ return FALSE;
}
}
rspamd_symcache_check_symbol (task, cache, item,
checkpoint);
}
}
- checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT;
- break;
-
- case RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT:
- all_done = TRUE;
-
- for (i = 0; i < (gint)cache->idempotent->len; i ++) {
- item = g_ptr_array_index (cache->idempotent, i);
- dyn_item = rspamd_symcache_get_dynamic (checkpoint, item);
-
- if (!CHECK_FINISH_BIT (checkpoint, dyn_item)) {
- all_done = FALSE;
- break;
- }
- }
-
- if (all_done) {
- checkpoint->pass = RSPAMD_CACHE_PASS_DONE;
-
- return TRUE;
- }
- break;
-
- case RSPAMD_CACHE_PASS_DONE:
- return TRUE;
break;
+ default:
+ g_assert_not_reached ();
}
- return FALSE;
+ return all_done;
}
struct counters_cbdata {
rspamd_task_process (struct rspamd_task *task, guint stages)
{
gint st;
- gboolean ret = TRUE;
+ gboolean ret = TRUE, all_done = TRUE;
GError *stat_error = NULL;
/* Avoid nested calls */
break;
case RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY:
- rspamd_symcache_process_symbols (task, task->cfg->cache,
- RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY);
+ case RSPAMD_TASK_STAGE_PRE_FILTERS:
+ case RSPAMD_TASK_STAGE_FILTERS:
+ case RSPAMD_TASK_STAGE_IDEMPOTENT:
+ all_done = rspamd_symcache_process_symbols (task, task->cfg->cache, st);
break;
case RSPAMD_TASK_STAGE_PROCESS_MESSAGE:
}
break;
- case RSPAMD_TASK_STAGE_PRE_FILTERS:
- rspamd_symcache_process_symbols (task, task->cfg->cache,
- RSPAMD_TASK_STAGE_PRE_FILTERS);
- break;
-
- case RSPAMD_TASK_STAGE_FILTERS:
- rspamd_symcache_process_symbols (task, task->cfg->cache,
- RSPAMD_TASK_STAGE_FILTERS);
- break;
-
case RSPAMD_TASK_STAGE_CLASSIFIERS:
case RSPAMD_TASK_STAGE_CLASSIFIERS_PRE:
case RSPAMD_TASK_STAGE_CLASSIFIERS_POST:
break;
case RSPAMD_TASK_STAGE_POST_FILTERS:
- rspamd_symcache_process_symbols (task, task->cfg->cache,
- RSPAMD_TASK_STAGE_POST_FILTERS);
+ all_done = rspamd_symcache_process_symbols (task, task->cfg->cache,
+ st);
- if ((task->flags & RSPAMD_TASK_FLAG_LEARN_AUTO) &&
+ if (all_done && (task->flags & RSPAMD_TASK_FLAG_LEARN_AUTO) &&
!RSPAMD_TASK_IS_EMPTY (task) &&
!(task->flags & (RSPAMD_TASK_FLAG_LEARN_SPAM|RSPAMD_TASK_FLAG_LEARN_HAM))) {
rspamd_stat_check_autolearn (task);
/* Second run of composites processing before idempotent filters */
rspamd_make_composites (task);
break;
- case RSPAMD_TASK_STAGE_IDEMPOTENT:
- rspamd_symcache_process_symbols (task, task->cfg->cache,
- RSPAMD_TASK_STAGE_IDEMPOTENT);
- break;
case RSPAMD_TASK_STAGE_DONE:
task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
return ret;
}
- if (rspamd_session_events_pending (task->s) != 0) {
- /* We have events pending, so we consider this stage as incomplete */
- msg_debug_task ("need more work on stage %d", st);
- }
- else {
- /* Mark the current stage as done and go to the next stage */
- msg_debug_task ("completed stage %d", st);
- task->processed_stages |= st;
+ if (ret) {
+ if (rspamd_session_events_pending (task->s) != 0) {
+ /* We have events pending, so we consider this stage as incomplete */
+ msg_debug_task ("need more work on stage %d", st);
+ }
+ else {
+ if (all_done) {
+ /* Mark the current stage as done and go to the next stage */
+ msg_debug_task ("completed stage %d", st);
+ task->processed_stages |= st;
+ }
+ else {
+ msg_debug_task ("need more processing on stage %d", st);
+ }
- /* Tail recursion */
- return rspamd_task_process (task, stages);
+ /* Tail recursion */
+ return rspamd_task_process (task, stages);
+ }
}
return ret;