From 861f1989d3dced229f77f191cc2db85771c3e023 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 10 Jun 2015 11:53:32 -0400 Subject: [PATCH] More fixes to dependencies logic. --- src/libserver/symbols_cache.c | 77 ++++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 15 deletions(-) diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c index 3d7ebb92d..c796976f1 100644 --- a/src/libserver/symbols_cache.c +++ b/src/libserver/symbols_cache.c @@ -92,6 +92,14 @@ struct cache_dependency { gint id; }; +struct cache_savepoint { + guchar *processed_bits; + guint pass; + struct metric_result *rs; + gdouble lim; + GPtrArray *waitq; +}; + /* XXX: Maybe make it configurable */ #define CACHE_RELOAD_TIME 60.0 /* weight, frequency, time */ @@ -102,6 +110,15 @@ struct cache_dependency { * ((f) > 0 ? (f) : FREQ_ALPHA) \ / (t > TIME_ALPHA ? t : TIME_ALPHA)) +static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task, + struct symbols_cache *cache, + struct cache_item *item, + struct cache_savepoint *checkpoint); +static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task, + struct symbols_cache *cache, + struct cache_item *item, + struct cache_savepoint *checkpoint); + gint cache_logic_cmp (const void *p1, const void *p2, gpointer ud) { @@ -185,7 +202,7 @@ post_cache_init (struct symbols_cache *cache) if (dit != NULL) { if (dit->parent != -1) { - dit = g_ptr_array_index (cache->items_by_order, dit->parent); + dit = g_ptr_array_index (cache->items_by_id, dit->parent); } rdep = rspamd_mempool_alloc (cache->static_pool, sizeof (*rdep)); @@ -194,9 +211,12 @@ post_cache_init (struct symbols_cache *cache) rdep->id = i; g_ptr_array_add (dit->rdeps, rdep); dep->item = dit; + dep->id = dit->id; + + msg_debug ("add dependency from %d on %d", it->id, dit->id); } else { - msg_warn ("cannot find dependency on symbol %s", dep->sym); + msg_err ("cannot find dependency on symbol %s", dep->sym); } } } @@ -770,15 +790,6 @@ rspamd_symbols_cache_validate (struct symbols_cache *cache, return TRUE; } -struct cache_savepoint { - guchar *processed_bits; - guint pass; - struct metric_result *rs; - gdouble lim; - GPtrArray *waitq; -}; - - static gboolean check_metric_settings (struct rspamd_task *task, struct metric *metric, double *score) @@ -856,11 +867,28 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud) struct rspamd_task *task = sessiond; struct cache_item *item = ud; struct cache_savepoint *checkpoint; + struct symbols_cache *cache; + gint i; checkpoint = task->checkpoint; + cache = task->cfg->cache; /* Specify that we are done with this item */ setbit (checkpoint->processed_bits, item->id * 2 + 1); + + msg_debug ("finished watcher, %ud symbols waiting", checkpoint->waitq->len); + + for (i = 0; i < (gint)checkpoint->waitq->len; i ++) { + item = g_ptr_array_index (checkpoint->waitq, i); + if (!isset (checkpoint->processed_bits, item->id * 2)) { + if (!rspamd_symbols_cache_check_deps (task, cache, item, + checkpoint)) { + continue; + } + + rspamd_symbols_cache_check_symbol (task, cache, item, checkpoint); + } + } } static gboolean @@ -907,8 +935,6 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, return TRUE; } - /* XXX: add watcher function for each symbol */ - return FALSE; } else { @@ -935,8 +961,8 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, g_assert (dep->item != NULL); - if (!isset (checkpoint->processed_bits, dep->item->id * 2 + 1)) { - if (!isset (checkpoint->processed_bits, dep->item->id * 2)) { + if (!isset (checkpoint->processed_bits, dep->id * 2 + 1)) { + if (!isset (checkpoint->processed_bits, dep->id * 2)) { /* Not started */ if (!rspamd_symbols_cache_check_deps (task, cache, dep->item, @@ -949,13 +975,21 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, checkpoint)) { /* Now started, but has events pending */ ret = FALSE; + msg_debug ("started check of %d symbol as dep for %d", + dep->id, item->id); } + msg_debug ("dependency %d for symbol %d is already processed", + dep->id, item->id); } else { /* Started but not finished */ ret = FALSE; } } + else { + msg_debug ("dependency %d for symbol %d is already checked", + dep->id, item->id); + } } } @@ -1017,6 +1051,8 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, if (!isset (checkpoint->processed_bits, i * 2)) { if (!rspamd_symbols_cache_check_deps (task, cache, item, checkpoint)) { + msg_debug ("blocked execution of %d unless deps are resolved", + item->id); g_ptr_array_add (checkpoint->waitq, item); continue; } @@ -1029,6 +1065,17 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, } else { /* We just go through the blocked symbols and check if they are ready */ + for (i = 0; i < (gint)checkpoint->waitq->len; i ++) { + item = g_ptr_array_index (checkpoint->waitq, i); + if (!isset (checkpoint->processed_bits, item->id * 2)) { + if (!rspamd_symbols_cache_check_deps (task, cache, item, + checkpoint)) { + continue; + } + + rspamd_symbols_cache_check_symbol (task, cache, item, checkpoint); + } + } } return TRUE; -- 2.39.5