gint id;
};
+struct cache_savepoint {
+ guchar *processed_bits;
+ guint pass;
+ struct metric_result *rs;
+ gdouble lim;
+ GPtrArray *waitq;
+};
+
/* XXX: Maybe make it configurable */
#define CACHE_RELOAD_TIME 60.0
/* weight, frequency, time */
* ((f) > 0 ? (f) : FREQ_ALPHA) \
/ (t > TIME_ALPHA ? t : TIME_ALPHA))
+static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
+ struct symbols_cache *cache,
+ struct cache_item *item,
+ struct cache_savepoint *checkpoint);
+static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task,
+ struct symbols_cache *cache,
+ struct cache_item *item,
+ struct cache_savepoint *checkpoint);
+
gint
cache_logic_cmp (const void *p1, const void *p2, gpointer ud)
{
if (dit != NULL) {
if (dit->parent != -1) {
- dit = g_ptr_array_index (cache->items_by_order, dit->parent);
+ dit = g_ptr_array_index (cache->items_by_id, dit->parent);
}
rdep = rspamd_mempool_alloc (cache->static_pool, sizeof (*rdep));
rdep->id = i;
g_ptr_array_add (dit->rdeps, rdep);
dep->item = dit;
+ dep->id = dit->id;
+
+ msg_debug ("add dependency from %d on %d", it->id, dit->id);
}
else {
- msg_warn ("cannot find dependency on symbol %s", dep->sym);
+ msg_err ("cannot find dependency on symbol %s", dep->sym);
}
}
}
return TRUE;
}
-struct cache_savepoint {
- guchar *processed_bits;
- guint pass;
- struct metric_result *rs;
- gdouble lim;
- GPtrArray *waitq;
-};
-
-
static gboolean
check_metric_settings (struct rspamd_task *task, struct metric *metric,
double *score)
struct rspamd_task *task = sessiond;
struct cache_item *item = ud;
struct cache_savepoint *checkpoint;
+ struct symbols_cache *cache;
+ gint i;
checkpoint = task->checkpoint;
+ cache = task->cfg->cache;
/* Specify that we are done with this item */
setbit (checkpoint->processed_bits, item->id * 2 + 1);
+
+ msg_debug ("finished watcher, %ud symbols waiting", checkpoint->waitq->len);
+
+ for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
+ item = g_ptr_array_index (checkpoint->waitq, i);
+ if (!isset (checkpoint->processed_bits, item->id * 2)) {
+ if (!rspamd_symbols_cache_check_deps (task, cache, item,
+ checkpoint)) {
+ continue;
+ }
+
+ rspamd_symbols_cache_check_symbol (task, cache, item, checkpoint);
+ }
+ }
}
static gboolean
return TRUE;
}
- /* XXX: add watcher function for each symbol */
-
return FALSE;
}
else {
g_assert (dep->item != NULL);
- if (!isset (checkpoint->processed_bits, dep->item->id * 2 + 1)) {
- if (!isset (checkpoint->processed_bits, dep->item->id * 2)) {
+ if (!isset (checkpoint->processed_bits, dep->id * 2 + 1)) {
+ if (!isset (checkpoint->processed_bits, dep->id * 2)) {
/* Not started */
if (!rspamd_symbols_cache_check_deps (task, cache,
dep->item,
checkpoint)) {
/* Now started, but has events pending */
ret = FALSE;
+ msg_debug ("started check of %d symbol as dep for %d",
+ dep->id, item->id);
}
+ msg_debug ("dependency %d for symbol %d is already processed",
+ dep->id, item->id);
}
else {
/* Started but not finished */
ret = FALSE;
}
}
+ else {
+ msg_debug ("dependency %d for symbol %d is already checked",
+ dep->id, item->id);
+ }
}
}
if (!isset (checkpoint->processed_bits, i * 2)) {
if (!rspamd_symbols_cache_check_deps (task, cache, item,
checkpoint)) {
+ msg_debug ("blocked execution of %d unless deps are resolved",
+ item->id);
g_ptr_array_add (checkpoint->waitq, item);
continue;
}
}
else {
/* We just go through the blocked symbols and check if they are ready */
+ for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
+ item = g_ptr_array_index (checkpoint->waitq, i);
+ if (!isset (checkpoint->processed_bits, item->id * 2)) {
+ if (!rspamd_symbols_cache_check_deps (task, cache, item,
+ checkpoint)) {
+ continue;
+ }
+
+ rspamd_symbols_cache_check_symbol (task, cache, item, checkpoint);
+ }
+ }
}
return TRUE;