enum rspamd_cache_savepoint_stage {
RSPAMD_CACHE_PASS_INIT = 0,
RSPAMD_CACHE_PASS_PREFILTERS,
- RSPAMD_CACHE_PASS_WAIT_PREFILTERS,
RSPAMD_CACHE_PASS_FILTERS,
- RSPAMD_CACHE_PASS_WAIT_FILTERS,
RSPAMD_CACHE_PASS_POSTFILTERS,
- RSPAMD_CACHE_PASS_WAIT_POSTFILTERS,
RSPAMD_CACHE_PASS_IDEMPOTENT,
RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT,
RSPAMD_CACHE_PASS_DONE,
if (item->func) {
g_assert (item->func != NULL);
+ g_assert (!isset (checkpoint->processed_bits, item->id * 2));
/* Check has been started */
setbit (checkpoint->processed_bits, item->id * 2);
checkpoint->items_inflight ++;
/* Callback now must finalize itself */
item->func (task, item, item->user_data);
+ checkpoint->cur_item = NULL;
if (checkpoint->items_inflight == 0) {
+
return TRUE;
}
case RSPAMD_CACHE_PASS_PREFILTERS:
/* Check for prefilters */
saved_priority = G_MININT;
+ all_done = TRUE;
for (i = 0; i < (gint)cache->prefilters->len; i ++) {
item = g_ptr_array_index (cache->prefilters, i);
rspamd_symbols_cache_check_symbol (task, cache, item,
checkpoint);
- }
- }
-
- checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_PREFILTERS;
- break;
-
- case RSPAMD_CACHE_PASS_WAIT_PREFILTERS:
- all_done = TRUE;
-
- for (i = 0; i < (gint)cache->prefilters->len; i ++) {
- item = g_ptr_array_index (cache->prefilters, i);
-
- if (!isset (checkpoint->processed_bits, item->id * 2 + 1)) {
all_done = FALSE;
- break;
}
}
if (stage == RSPAMD_TASK_STAGE_FILTERS) {
return rspamd_symbols_cache_process_symbols (task, cache, stage);
}
+
break;
+
case RSPAMD_CACHE_PASS_FILTERS:
/*
* On the first pass we check symbols that do not have dependencies
* If we figure out symbol that has no dependencies satisfied, then
* we just save it for another pass
*/
+ all_done = TRUE;
+
for (i = 0; i < (gint)checkpoint->version; i ++) {
if (RSPAMD_TASK_IS_SKIPPED (task)) {
return TRUE;
}
if (!isset (checkpoint->processed_bits, item->id * 2)) {
+ all_done = FALSE;
+
if (!rspamd_symbols_cache_check_deps (task, cache, item,
checkpoint, 0, FALSE)) {
- gboolean found = FALSE;
- guint j;
- struct rspamd_symcache_item *tmp_it;
msg_debug_cache_task ("blocked execution of %d(%s) unless deps are "
"resolved",
}
}
- checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_FILTERS;
- break;
-
- case RSPAMD_CACHE_PASS_WAIT_FILTERS:
- if (checkpoint->items_inflight == 0 ||
- stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
+ if (all_done || stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
}
case RSPAMD_CACHE_PASS_POSTFILTERS:
/* Check for postfilters */
saved_priority = G_MININT;
+ all_done = TRUE;
for (i = 0; i < (gint)cache->postfilters->len; i ++) {
if (RSPAMD_TASK_IS_SKIPPED (task)) {
if (!isset (checkpoint->processed_bits, item->id * 2) &&
!isset (checkpoint->processed_bits, item->id * 2 + 1)) {
/* Check priorities */
+ all_done = FALSE;
+
if (saved_priority == G_MININT) {
saved_priority = item->priority;
}
checkpoint);
}
}
- checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_POSTFILTERS;
- break;
-
- case RSPAMD_CACHE_PASS_WAIT_POSTFILTERS:
- all_done = TRUE;
-
- for (i = 0; i < (gint)cache->postfilters->len; i ++) {
- item = g_ptr_array_index (cache->postfilters, i);
-
- if (!isset (checkpoint->processed_bits, item->id * 2 + 1)) {
- all_done = FALSE;
- break;
- }
- }
if (all_done) {
checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
struct rspamd_symcache_item *item)
{
struct cache_savepoint *checkpoint = task->checkpoint;
- struct rspamd_symcache_item *rdep;
+ struct cache_dependency *rdep;
gdouble total_ticks = 0, t2, diff;
guint i;
struct timeval tv;
g_assert (item->async_events == 0);
msg_debug_cache_task ("process finalize for item %s", item->symbol);
- setbit (checkpoint->processed_bits, item->id + 1);
+ setbit (checkpoint->processed_bits, item->id * 2 + 1);
checkpoint->items_inflight --;
checkpoint->cur_item = NULL;
/* Process all reverse dependencies */
PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
- rspamd_symbols_cache_check_symbol (task, task->cfg->cache,
- rdep,
- checkpoint);
+ if (rdep->item) {
+ if (!isset (checkpoint->processed_bits, rdep->item->id * 2)) {
+ rspamd_symbols_cache_check_symbol (task, task->cfg->cache,
+ rdep->item,
+ checkpoint);
+ }
+ }
}
}