summaryrefslogtreecommitdiffstats
path: root/src/libserver/symbols_cache.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-10-20 14:48:06 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-10-20 19:43:32 +0100
commit1332f06a95c874480cfd155befe8ba840871b28d (patch)
treeb1265930c015a5d3bcf58d6ccb1c16b9916ad4ba /src/libserver/symbols_cache.c
parentb849942bc86cf57d4ae4e57a676ed0f6a057cad2 (diff)
downloadrspamd-1332f06a95c874480cfd155befe8ba840871b28d.tar.gz
rspamd-1332f06a95c874480cfd155befe8ba840871b28d.zip
[Project] Further cleanup from the watchers
Diffstat (limited to 'src/libserver/symbols_cache.c')
-rw-r--r--src/libserver/symbols_cache.c79
1 files changed, 5 insertions, 74 deletions
diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c
index 216f8aabf..5d30f46dc 100644
--- a/src/libserver/symbols_cache.c
+++ b/src/libserver/symbols_cache.c
@@ -1293,51 +1293,6 @@ rspamd_symbols_cache_metric_limit (struct rspamd_task *task,
return FALSE;
}
-static void
-rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
-{
- struct rspamd_task *task = sessiond;
- struct rspamd_symcache_item *item = ud, *it;
- struct cache_savepoint *checkpoint;
- struct symbols_cache *cache;
- gint i, remain = 0;
-
- checkpoint = task->checkpoint;
- cache = task->cfg->cache;
-
- /* Specify that we are done with this item */
- setbit (checkpoint->processed_bits, item->id * 2 + 1);
-
- if (checkpoint->pass > 0) {
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
- event_base_update_cache_time (task->ev_base);
-#endif
- for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
- it = g_ptr_array_index (checkpoint->waitq, i);
-
- if (!isset (checkpoint->processed_bits, it->id * 2)) {
- if (!rspamd_symbols_cache_check_deps (task, cache, it,
- checkpoint, 0, TRUE)) {
- remain ++;
- }
- else {
- msg_debug_cache_task ("watcher for %d(%s), unblocked item %d(%s)",
- item->id,
- item->symbol,
- it->id,
- it->symbol);
- rspamd_symbols_cache_check_symbol (task, cache, it,
- checkpoint);
- }
- }
- }
- }
-
- msg_debug_cache_task ("finished watcher for %d(%s), %ud symbols waiting",
- item->id, item->symbol,
- remain);
-}
-
static gboolean
rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
struct symbols_cache *cache,
@@ -1391,10 +1346,11 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
#endif
item->start_ticks = t1;
item->async_events = 0;
+ g_assert (checkpoint->cur_item == NULL);
+ checkpoint->cur_item = item;
checkpoint->items_inflight ++;
/* Callback now must finalize itself */
item->func (task, item, item->user_data);
- rspamd_session_watch_stop (task->s);
if (checkpoint->items_inflight == 0) {
return TRUE;
@@ -1778,17 +1734,6 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
"resolved",
item->id, item->symbol);
- PTR_ARRAY_FOREACH (checkpoint->waitq, j, tmp_it) {
- if (item->id == tmp_it->id) {
- found = TRUE;
- break;
- }
- }
-
- if (!found) {
- g_ptr_array_add (checkpoint->waitq, item);
- }
-
continue;
}
@@ -1801,22 +1746,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
break;
case RSPAMD_CACHE_PASS_WAIT_FILTERS:
- /* We just go through the blocked symbols and check if they are ready */
- for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
- item = g_ptr_array_index (checkpoint->waitq, i);
-
- if (!isset (checkpoint->processed_bits, item->id * 2)) {
- if (!rspamd_symbols_cache_check_deps (task, cache, item,
- checkpoint, 0, FALSE)) {
- break;
- }
-
- rspamd_symbols_cache_check_symbol (task, cache, item,
- checkpoint);
- }
- }
-
- if (checkpoint->waitq->len == 0 ||
+ if (checkpoint->items_inflight == 0 ||
stage == RSPAMD_TASK_STAGE_POST_FILTERS) {
checkpoint->pass = RSPAMD_CACHE_PASS_POSTFILTERS;
}
@@ -1879,7 +1809,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
}
- if (checkpoint->waitq->len == 0 ||
+ if (checkpoint->items_inflight == 0 ||
stage == RSPAMD_TASK_STAGE_IDEMPOTENT) {
checkpoint->pass = RSPAMD_CACHE_PASS_IDEMPOTENT;
}
@@ -2644,6 +2574,7 @@ rspamd_symbols_cache_finalize_item (struct rspamd_task *task,
msg_debug_cache_task ("process finalize for item %s", item->symbol);
setbit (checkpoint->processed_bits, item->id + 1);
checkpoint->items_inflight --;
+ checkpoint->cur_item = NULL;
#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
event_base_update_cache_time (task->ev_base);