aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-06-10 11:53:32 -0400
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-06-10 11:53:32 -0400
commit861f1989d3dced229f77f191cc2db85771c3e023 (patch)
treeaee95fbb8b46467aff211f5020d06e1abd538b8a /src
parent5c9bb7ee3f406114bc518d73913766db94b50375 (diff)
downloadrspamd-861f1989d3dced229f77f191cc2db85771c3e023.tar.gz
rspamd-861f1989d3dced229f77f191cc2db85771c3e023.zip
More fixes to dependencies logic.
Diffstat (limited to 'src')
-rw-r--r--src/libserver/symbols_cache.c77
1 files changed, 62 insertions, 15 deletions
diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c
index 3d7ebb92d..c796976f1 100644
--- a/src/libserver/symbols_cache.c
+++ b/src/libserver/symbols_cache.c
@@ -92,6 +92,14 @@ struct cache_dependency {
gint id;
};
+struct cache_savepoint {
+ guchar *processed_bits;
+ guint pass;
+ struct metric_result *rs;
+ gdouble lim;
+ GPtrArray *waitq;
+};
+
/* XXX: Maybe make it configurable */
#define CACHE_RELOAD_TIME 60.0
/* weight, frequency, time */
@@ -102,6 +110,15 @@ struct cache_dependency {
* ((f) > 0 ? (f) : FREQ_ALPHA) \
/ (t > TIME_ALPHA ? t : TIME_ALPHA))
+static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
+ struct symbols_cache *cache,
+ struct cache_item *item,
+ struct cache_savepoint *checkpoint);
+static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task,
+ struct symbols_cache *cache,
+ struct cache_item *item,
+ struct cache_savepoint *checkpoint);
+
gint
cache_logic_cmp (const void *p1, const void *p2, gpointer ud)
{
@@ -185,7 +202,7 @@ post_cache_init (struct symbols_cache *cache)
if (dit != NULL) {
if (dit->parent != -1) {
- dit = g_ptr_array_index (cache->items_by_order, dit->parent);
+ dit = g_ptr_array_index (cache->items_by_id, dit->parent);
}
rdep = rspamd_mempool_alloc (cache->static_pool, sizeof (*rdep));
@@ -194,9 +211,12 @@ post_cache_init (struct symbols_cache *cache)
rdep->id = i;
g_ptr_array_add (dit->rdeps, rdep);
dep->item = dit;
+ dep->id = dit->id;
+
+ msg_debug ("add dependency from %d on %d", it->id, dit->id);
}
else {
- msg_warn ("cannot find dependency on symbol %s", dep->sym);
+ msg_err ("cannot find dependency on symbol %s", dep->sym);
}
}
}
@@ -770,15 +790,6 @@ rspamd_symbols_cache_validate (struct symbols_cache *cache,
return TRUE;
}
-struct cache_savepoint {
- guchar *processed_bits;
- guint pass;
- struct metric_result *rs;
- gdouble lim;
- GPtrArray *waitq;
-};
-
-
static gboolean
check_metric_settings (struct rspamd_task *task, struct metric *metric,
double *score)
@@ -856,11 +867,28 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
struct rspamd_task *task = sessiond;
struct cache_item *item = ud;
struct cache_savepoint *checkpoint;
+ struct symbols_cache *cache;
+ gint i;
checkpoint = task->checkpoint;
+ cache = task->cfg->cache;
/* Specify that we are done with this item */
setbit (checkpoint->processed_bits, item->id * 2 + 1);
+
+ msg_debug ("finished watcher, %ud symbols waiting", checkpoint->waitq->len);
+
+ for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
+ item = g_ptr_array_index (checkpoint->waitq, i);
+ if (!isset (checkpoint->processed_bits, item->id * 2)) {
+ if (!rspamd_symbols_cache_check_deps (task, cache, item,
+ checkpoint)) {
+ continue;
+ }
+
+ rspamd_symbols_cache_check_symbol (task, cache, item, checkpoint);
+ }
+ }
}
static gboolean
@@ -907,8 +935,6 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
return TRUE;
}
- /* XXX: add watcher function for each symbol */
-
return FALSE;
}
else {
@@ -935,8 +961,8 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
g_assert (dep->item != NULL);
- if (!isset (checkpoint->processed_bits, dep->item->id * 2 + 1)) {
- if (!isset (checkpoint->processed_bits, dep->item->id * 2)) {
+ if (!isset (checkpoint->processed_bits, dep->id * 2 + 1)) {
+ if (!isset (checkpoint->processed_bits, dep->id * 2)) {
/* Not started */
if (!rspamd_symbols_cache_check_deps (task, cache,
dep->item,
@@ -949,13 +975,21 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
checkpoint)) {
/* Now started, but has events pending */
ret = FALSE;
+ msg_debug ("started check of %d symbol as dep for %d",
+ dep->id, item->id);
}
+ msg_debug ("dependency %d for symbol %d is already processed",
+ dep->id, item->id);
}
else {
/* Started but not finished */
ret = FALSE;
}
}
+ else {
+ msg_debug ("dependency %d for symbol %d is already checked",
+ dep->id, item->id);
+ }
}
}
@@ -1017,6 +1051,8 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
if (!isset (checkpoint->processed_bits, i * 2)) {
if (!rspamd_symbols_cache_check_deps (task, cache, item,
checkpoint)) {
+ msg_debug ("blocked execution of %d unless deps are resolved",
+ item->id);
g_ptr_array_add (checkpoint->waitq, item);
continue;
}
@@ -1029,6 +1065,17 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
}
else {
/* We just go through the blocked symbols and check if they are ready */
+ for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
+ item = g_ptr_array_index (checkpoint->waitq, i);
+ if (!isset (checkpoint->processed_bits, item->id * 2)) {
+ if (!rspamd_symbols_cache_check_deps (task, cache, item,
+ checkpoint)) {
+ continue;
+ }
+
+ rspamd_symbols_cache_check_symbol (task, cache, item, checkpoint);
+ }
+ }
}
return TRUE;