]> source.dussan.org Git - rspamd.git/commitdiff
More fixes to dependencies logic.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 10 Jun 2015 15:53:32 +0000 (11:53 -0400)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 10 Jun 2015 15:53:32 +0000 (11:53 -0400)
src/libserver/symbols_cache.c

index 3d7ebb92df25a1c064a0085373f2e19ed54306ef..c796976f1b454277646b156ad9159687e2b10cbd 100644 (file)
@@ -92,6 +92,14 @@ struct cache_dependency {
        gint id;
 };
 
+struct cache_savepoint {
+       guchar *processed_bits;
+       guint pass;
+       struct metric_result *rs;
+       gdouble lim;
+       GPtrArray *waitq;
+};
+
 /* XXX: Maybe make it configurable */
 #define CACHE_RELOAD_TIME 60.0
 /* weight, frequency, time */
@@ -102,6 +110,15 @@ struct cache_dependency {
                * ((f) > 0 ? (f) : FREQ_ALPHA) \
                / (t > TIME_ALPHA ? t : TIME_ALPHA))
 
+static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
+               struct symbols_cache *cache,
+               struct cache_item *item,
+               struct cache_savepoint *checkpoint);
+static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task,
+               struct symbols_cache *cache,
+               struct cache_item *item,
+               struct cache_savepoint *checkpoint);
+
 gint
 cache_logic_cmp (const void *p1, const void *p2, gpointer ud)
 {
@@ -185,7 +202,7 @@ post_cache_init (struct symbols_cache *cache)
 
                        if (dit != NULL) {
                                if (dit->parent != -1) {
-                                       dit = g_ptr_array_index (cache->items_by_order, dit->parent);
+                                       dit = g_ptr_array_index (cache->items_by_id, dit->parent);
                                }
 
                                rdep = rspamd_mempool_alloc (cache->static_pool, sizeof (*rdep));
@@ -194,9 +211,12 @@ post_cache_init (struct symbols_cache *cache)
                                rdep->id = i;
                                g_ptr_array_add (dit->rdeps, rdep);
                                dep->item = dit;
+                               dep->id = dit->id;
+
+                               msg_debug ("add dependency from %d on %d", it->id, dit->id);
                        }
                        else {
-                               msg_warn ("cannot find dependency on symbol %s", dep->sym);
+                               msg_err ("cannot find dependency on symbol %s", dep->sym);
                        }
                }
        }
@@ -770,15 +790,6 @@ rspamd_symbols_cache_validate (struct symbols_cache *cache,
        return TRUE;
 }
 
-struct cache_savepoint {
-       guchar *processed_bits;
-       guint pass;
-       struct metric_result *rs;
-       gdouble lim;
-       GPtrArray *waitq;
-};
-
-
 static gboolean
 check_metric_settings (struct rspamd_task *task, struct metric *metric,
        double *score)
@@ -856,11 +867,28 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
        struct rspamd_task *task = sessiond;
        struct cache_item *item = ud;
        struct cache_savepoint *checkpoint;
+       struct symbols_cache *cache;
+       gint i;
 
        checkpoint = task->checkpoint;
+       cache = task->cfg->cache;
 
        /* Specify that we are done with this item */
        setbit (checkpoint->processed_bits, item->id * 2 + 1);
+
+       msg_debug ("finished watcher, %ud symbols waiting", checkpoint->waitq->len);
+
+       for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
+               item = g_ptr_array_index (checkpoint->waitq, i);
+               if (!isset (checkpoint->processed_bits, item->id * 2)) {
+                       if (!rspamd_symbols_cache_check_deps (task, cache, item,
+                                       checkpoint)) {
+                               continue;
+                       }
+
+                       rspamd_symbols_cache_check_symbol (task, cache, item, checkpoint);
+               }
+       }
 }
 
 static gboolean
@@ -907,8 +935,6 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
                        return TRUE;
                }
 
-               /* XXX: add watcher function for each symbol */
-
                return FALSE;
        }
        else {
@@ -935,8 +961,8 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
 
                        g_assert (dep->item != NULL);
 
-                       if (!isset (checkpoint->processed_bits, dep->item->id * 2 + 1)) {
-                               if (!isset (checkpoint->processed_bits, dep->item->id * 2)) {
+                       if (!isset (checkpoint->processed_bits, dep->id * 2 + 1)) {
+                               if (!isset (checkpoint->processed_bits, dep->id * 2)) {
                                        /* Not started */
                                        if (!rspamd_symbols_cache_check_deps (task, cache,
                                                        dep->item,
@@ -949,13 +975,21 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
                                                        checkpoint)) {
                                                /* Now started, but has events pending */
                                                ret = FALSE;
+                                               msg_debug ("started check of %d symbol as dep for %d",
+                                                               dep->id, item->id);
                                        }
+                                       msg_debug ("dependency %d for symbol %d is already processed",
+                                                               dep->id, item->id);
                                }
                                else {
                                        /* Started but not finished */
                                        ret = FALSE;
                                }
                        }
+                       else {
+                               msg_debug ("dependency %d for symbol %d is already checked",
+                                               dep->id, item->id);
+                       }
                }
        }
 
@@ -1017,6 +1051,8 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                        if (!isset (checkpoint->processed_bits, i * 2)) {
                                if (!rspamd_symbols_cache_check_deps (task, cache, item,
                                                checkpoint)) {
+                                       msg_debug ("blocked execution of %d unless deps are resolved",
+                                                       item->id);
                                        g_ptr_array_add (checkpoint->waitq, item);
                                        continue;
                                }
@@ -1029,6 +1065,17 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
        }
        else {
                /* We just go through the blocked symbols and check if they are ready */
+               for (i = 0; i < (gint)checkpoint->waitq->len; i ++) {
+                       item = g_ptr_array_index (checkpoint->waitq, i);
+                       if (!isset (checkpoint->processed_bits, item->id * 2)) {
+                               if (!rspamd_symbols_cache_check_deps (task, cache, item,
+                                               checkpoint)) {
+                                       continue;
+                               }
+
+                               rspamd_symbols_cache_check_symbol (task, cache, item, checkpoint);
+                       }
+               }
        }
 
        return TRUE;