Browse Source

[Project] Remove unneeded phases, add safe guards, fix issues

tags/1.8.2
Vsevolod Stakhov 5 years ago
parent
commit
42a4525a65
1 changed files with 23 additions and 45 deletions
  1. 23
    45
      src/libserver/symbols_cache.c

+ 23
- 45
src/libserver/symbols_cache.c View File

@@ -155,11 +155,8 @@ struct delayed_cache_condition {
enum rspamd_cache_savepoint_stage {
RSPAMD_CACHE_PASS_INIT = 0,
RSPAMD_CACHE_PASS_PREFILTERS,
RSPAMD_CACHE_PASS_WAIT_PREFILTERS,
RSPAMD_CACHE_PASS_FILTERS,
RSPAMD_CACHE_PASS_WAIT_FILTERS,
RSPAMD_CACHE_PASS_POSTFILTERS,
RSPAMD_CACHE_PASS_WAIT_POSTFILTERS,
RSPAMD_CACHE_PASS_IDEMPOTENT,
RSPAMD_CACHE_PASS_WAIT_IDEMPOTENT,
RSPAMD_CACHE_PASS_DONE,
@@ -1307,6 +1304,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
if (item->func) {

g_assert (item->func != NULL);
g_assert (!isset (checkpoint->processed_bits, item->id * 2));
/* Check has been started */
setbit (checkpoint->processed_bits, item->id * 2);

@@ -1351,8 +1349,10 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
checkpoint->items_inflight ++;
/* Callback now must finalize itself */
item->func (task, item, item->user_data);
checkpoint->cur_item = NULL;

if (checkpoint->items_inflight == 0) {

return TRUE;
}

@@ -1641,6 +1641,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
case RSPAMD_CACHE_PASS_PREFILTERS:
/* Check for prefilters */
saved_priority = G_MININT;
all_done = TRUE;

for (i = 0; i < (gint)cache->prefilters->len; i ++) {
item = g_ptr_array_index (cache->prefilters, i);
@@ -1669,21 +1670,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,

rspamd_symbols_cache_check_symbol (task, cache, item,
checkpoint);
}
}

checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_PREFILTERS;
break;

case RSPAMD_CACHE_PASS_WAIT_PREFILTERS:
all_done = TRUE;

for (i = 0; i < (gint)cache->prefilters->len; i ++) {
item = g_ptr_array_index (cache->prefilters, i);

if (!isset (checkpoint->processed_bits, item->id * 2 + 1)) {
all_done = FALSE;
break;
}
}

@@ -1694,13 +1681,17 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
if (stage == RSPAMD_TASK_STAGE_FILTERS) {
return rspamd_symbols_cache_process_symbols (task, cache, stage);
}

break;

case RSPAMD_CACHE_PASS_FILTERS:
/*
* On the first pass we check symbols that do not have dependencies
* If we figure out symbol that has no dependencies satisfied, then
* we just save it for another pass
*/
all_done = TRUE;

for (i = 0; i < (gint)checkpoint->version; i ++) {
if (RSPAMD_TASK_IS_SKIPPED (task)) {
return TRUE;
@@ -1724,11 +1715,10 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
}

if (!isset (checkpoint->processed_bits, item->id * 2)) {
all_done = FALSE;

if (!rspamd_symbols_cache_check_deps (task, cache, item,
checkpoint, 0, FALSE)) {
gboolean found = FALSE;
guint j;
struct rspamd_symcache_item *tmp_it;

msg_debug_cache_task ("blocked execution of %d(%s) unless deps are "
"resolved",
@@ -1742,12 +1732,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
}
}

checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_FILTERS;
break;

case RSPAMD_CACHE_PASS_WAIT_FILTERS:
if (checkpoint->items_inflight == 0 ||
stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
if (all_done || stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
}

@@ -1760,6 +1745,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
case RSPAMD_CACHE_PASS_POSTFILTERS:
/* Check for postfilters */
saved_priority = G_MININT;
all_done = TRUE;

for (i = 0; i < (gint)cache->postfilters->len; i ++) {
if (RSPAMD_TASK_IS_SKIPPED (task)) {
@@ -1771,6 +1757,8 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
if (!isset (checkpoint->processed_bits, item->id * 2) &&
!isset (checkpoint->processed_bits, item->id * 2 + 1)) {
/* Check priorities */
all_done = FALSE;

if (saved_priority == G_MININT) {
saved_priority = item->priority;
}
@@ -1790,20 +1778,6 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
checkpoint);
}
}
checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_POSTFILTERS;
break;

case RSPAMD_CACHE_PASS_WAIT_POSTFILTERS:
all_done = TRUE;

for (i = 0; i < (gint)cache->postfilters->len; i ++) {
item = g_ptr_array_index (cache->postfilters, i);

if (!isset (checkpoint->processed_bits, item->id * 2 + 1)) {
all_done = FALSE;
break;
}
}

if (all_done) {
checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
@@ -2561,7 +2535,7 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
struct rspamd_symcache_item *item)
{
struct cache_savepoint *checkpoint = task->checkpoint;
struct rspamd_symcache_item *rdep;
struct cache_dependency *rdep;
gdouble total_ticks = 0, t2, diff;
guint i;
struct timeval tv;
@@ -2572,7 +2546,7 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
g_assert (item->async_events == 0);

msg_debug_cache_task ("process finalize for item %s", item->symbol);
setbit (checkpoint->processed_bits, item->id + 1);
setbit (checkpoint->processed_bits, item->id * 2 + 1);
checkpoint->items_inflight --;
checkpoint->cur_item = NULL;

@@ -2603,9 +2577,13 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,

/* Process all reverse dependencies */
PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
rspamd_symbols_cache_check_symbol (task, task->cfg->cache,
rdep,
checkpoint);
if (rdep->item) {
if (!isset (checkpoint->processed_bits, rdep->item->id * 2)) {
rspamd_symbols_cache_check_symbol (task, task->cfg->cache,
rdep->item,
checkpoint);
}
}
}
}


Loading…
Cancel
Save