]> source.dussan.org Git - rspamd.git/commitdiff
[CritFix] Fix issue with inconsistent resorting
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 28 Feb 2016 11:58:14 +0000 (11:58 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 28 Feb 2016 11:58:14 +0000 (11:58 +0000)
This is a long term issue that was in rspamd for many years. Sometimes,
some rules are not scanned or scanned twice. That happened because of
resorting called during pending tasks exist in the wait queue. In this
case, resorting breaks logic of symbols processing, causing unprocessed
rules to be treated as processed and, vice versa, processed rules as
unprocessed.

This commit introduces refcounted approach for resorting keeping state
for each individual task. This allows to resort independently from the
existing tasks waiting in the queue.

src/libserver/symbols_cache.c

index 9e68f2a235dbc3265560f00291a3e51617a10126..e02c251d97f0facc7831aaeff3249446c4c2224a 100644 (file)
@@ -49,10 +49,15 @@ struct rspamd_symbols_cache_header {
        guchar unused[128];
 };
 
+struct symbols_cache_order {
+       GPtrArray *d;
+       ref_entry_t ref;
+};
+
 struct symbols_cache {
        /* Hash table for fast access */
        GHashTable *items_by_symbol;
-       GPtrArray *items_by_order;
+       struct symbols_cache_order *items_by_order;
        GPtrArray *items_by_id;
        GList *delayed_deps;
        GList *delayed_conditions;
@@ -125,6 +130,7 @@ struct cache_savepoint {
        struct metric_result *rs;
        gdouble lim;
        GPtrArray *waitq;
+       struct symbols_cache_order *order;
 };
 
 /* XXX: Maybe make it configurable */
@@ -153,6 +159,35 @@ rspamd_symbols_cache_quark (void)
        return g_quark_from_static_string ("symbols-cache");
 }
 
+static void
+rspamd_symbols_cache_order_dtor (gpointer p)
+{
+       struct symbols_cache_order *ord = p;
+
+       g_ptr_array_free (ord->d, TRUE);
+       g_slice_free1 (sizeof (*ord), ord);
+}
+
+static void
+rspamd_symbols_cache_order_unref (gpointer p)
+{
+       struct symbols_cache_order *ord = p;
+
+       REF_RELEASE (ord);
+}
+
+static struct symbols_cache_order *
+rspamd_symbols_cache_order_new (gsize nelts)
+{
+       struct symbols_cache_order *ord;
+
+       ord = g_slice_alloc (sizeof (*ord));
+       ord->d = g_ptr_array_sized_new (nelts);
+       REF_INIT_RETAIN (ord, rspamd_symbols_cache_order_dtor);
+
+       return ord;
+}
+
 static gint
 cache_logic_cmp (const void *p1, const void *p2, gpointer ud)
 {
@@ -217,9 +252,31 @@ rspamd_set_counter (struct cache_item *item, gdouble value)
        return cd->value;
 }
 
+static void
+rspamd_symbols_cache_resort (struct symbols_cache *cache)
+{
+       struct symbols_cache_order *ord;
+       guint i;
+       gpointer p;
+
+       ord = rspamd_symbols_cache_order_new (cache->used_items);
+
+       for (i = 0; i < cache->used_items; i ++) {
+               p = g_ptr_array_index (cache->items_by_id, i);
+               g_ptr_array_add (ord->d, p);
+       }
+
+       g_ptr_array_sort_with_data (ord->d, cache_logic_cmp, cache);
+
+       if (cache->items_by_order) {
+               REF_RELEASE (cache->items_by_order);
+               cache->items_by_order = ord;
+       }
+}
+
 /* Sort items in logical order */
 static void
-post_cache_init (struct symbols_cache *cache)
+rspamd_symbols_cache_post_init (struct symbols_cache *cache)
 {
        struct cache_item *it, *dit;
        struct cache_dependency *dep, *rdep;
@@ -229,7 +286,7 @@ post_cache_init (struct symbols_cache *cache)
        guint i, j;
        gint id;
 
-       g_ptr_array_sort_with_data (cache->items_by_order, cache_logic_cmp, cache);
+       rspamd_symbols_cache_resort (cache);
 
        cur = cache->delayed_deps;
        while (cur) {
@@ -573,7 +630,6 @@ rspamd_symbols_cache_add_symbol (struct symbols_cache *cache,
        msg_debug_cache ("used items: %d, added symbol: %s", cache->used_items, name);
        rspamd_set_counter (item, 0);
        g_ptr_array_add (cache->items_by_id, item);
-       g_ptr_array_add (cache->items_by_order, item);
        item->deps = g_ptr_array_new ();
        item->rdeps = g_ptr_array_new ();
        rspamd_mempool_add_destructor (cache->static_pool,
@@ -691,7 +747,7 @@ rspamd_symbols_cache_destroy (struct symbols_cache *cache)
                g_hash_table_destroy (cache->items_by_symbol);
                rspamd_mempool_delete (cache->static_pool);
                g_ptr_array_free (cache->items_by_id, TRUE);
-               g_ptr_array_free (cache->items_by_order, TRUE);
+               REF_RELEASE (cache->items_by_order);
                g_slice_free1 (sizeof (*cache), cache);
        }
 }
@@ -706,7 +762,6 @@ rspamd_symbols_cache_new (struct rspamd_config *cfg)
                        rspamd_mempool_new (rspamd_mempool_suggest_size (), "symcache");
        cache->items_by_symbol = g_hash_table_new (rspamd_str_hash,
                        rspamd_str_equal);
-       cache->items_by_order = g_ptr_array_new ();
        cache->items_by_id = g_ptr_array_new ();
        cache->mtx = rspamd_mempool_get_mutex (cache->static_pool);
        cache->reload_time = CACHE_RELOAD_TIME;
@@ -727,12 +782,13 @@ rspamd_symbols_cache_init (struct symbols_cache* cache)
 
        /* Just in-memory cache */
        if (cache->cfg->cache_filename == NULL) {
-               post_cache_init (cache);
+               rspamd_symbols_cache_post_init (cache);
                return TRUE;
        }
 
        /* Copy saved cache entries */
        res = rspamd_symbols_cache_load_items (cache, cache->cfg->cache_filename);
+       rspamd_symbols_cache_post_init (cache);
 
        return res;
 }
@@ -905,8 +961,6 @@ rspamd_symbols_cache_validate (struct symbols_cache *cache,
                }
        }
 
-       post_cache_init (cache);
-
        return ret;
 }
 
@@ -1225,7 +1279,12 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                checkpoint->processed_bits = rspamd_mempool_alloc0 (task->task_pool,
                                NBYTES (cache->used_items) * 2);
                checkpoint->waitq = g_ptr_array_new ();
-               checkpoint->version = cache->used_items;
+               g_assert (cache->items_by_order != NULL);
+               checkpoint->version = cache->items_by_order->d->len;
+               checkpoint->order = cache->items_by_order;
+               REF_RETAIN (checkpoint->order);
+               rspamd_mempool_add_destructor (task->task_pool,
+                               rspamd_symbols_cache_order_unref, checkpoint->order);
                rspamd_mempool_add_destructor (task->task_pool,
                                rspamd_ptr_array_free_hard, checkpoint->waitq);
                task->checkpoint = checkpoint;
@@ -1265,7 +1324,8 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                return TRUE;
                        }
 
-                       item = g_ptr_array_index (cache->items_by_order, i);
+                       item = g_ptr_array_index (checkpoint->order->d, i);
+
                        if (!isset (checkpoint->processed_bits, item->id * 2)) {
                                if (!rspamd_symbols_cache_check_deps (task, cache, item,
                                                checkpoint)) {
@@ -1395,7 +1455,7 @@ rspamd_symbols_cache_counters (struct symbols_cache * cache)
        top = ucl_object_typed_new (UCL_ARRAY);
        cbd.top = top;
        cbd.cache = cache;
-       g_ptr_array_foreach (cache->items_by_order,
+       g_ptr_array_foreach (cache->items_by_order->d,
                        rspamd_symbols_cache_counters_cb, &cbd);
 
        return top;
@@ -1421,8 +1481,8 @@ rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud)
        rspamd_mempool_lock_mutex (cache->mtx);
 
        /* Gather stats from shared execution times */
-       for (i = 0; i < cache->items_by_order->len; i ++) {
-               item = g_ptr_array_index (cache->items_by_order, i);
+       for (i = 0; i < cache->items_by_id->len; i ++) {
+               item = g_ptr_array_index (cache->items_by_id, i);
 
                if (item->type & (SYMBOL_TYPE_CALLBACK|SYMBOL_TYPE_NORMAL)) {
                        if (item->cd->number > 0) {
@@ -1451,7 +1511,7 @@ rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud)
 
        rspamd_mempool_unlock_mutex (cache->mtx);
 
-       g_ptr_array_sort_with_data (cache->items_by_order, cache_logic_cmp, cache);
+       rspamd_symbols_cache_resort (cache);
 }
 
 void