aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-02-28 11:58:14 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-02-28 11:58:14 +0000
commitf309a84d17bce0456f369b693ae270288e72892f (patch)
tree4046a179935d8982b58ad14eec927ed40640dfd1 /src/libserver
parent512d64b8f33e09908a574b48fcce4c06bb97e4bd (diff)
downloadrspamd-f309a84d17bce0456f369b693ae270288e72892f.tar.gz
rspamd-f309a84d17bce0456f369b693ae270288e72892f.zip
[CritFix] Fix issue with inconsistent resorting
This is a long term issue that was in rspamd for many years. Sometimes, some rules are not scanned or scanned twice. That happened because of resorting called during pending tasks exist in the wait queue. In this case, resorting breaks logic of symbols processing, causing unprocessed rules to be treated as processed and, vice versa, processed rules as unprocessed. This commit introduces refcounted approach for resorting keeping state for each individual task. This allows to resort independently from the existing tasks waiting in the queue.
Diffstat (limited to 'src/libserver')
-rw-r--r--src/libserver/symbols_cache.c90
1 files changed, 75 insertions, 15 deletions
diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c
index 9e68f2a23..e02c251d9 100644
--- a/src/libserver/symbols_cache.c
+++ b/src/libserver/symbols_cache.c
@@ -49,10 +49,15 @@ struct rspamd_symbols_cache_header {
guchar unused[128];
};
+struct symbols_cache_order {
+ GPtrArray *d;
+ ref_entry_t ref;
+};
+
struct symbols_cache {
/* Hash table for fast access */
GHashTable *items_by_symbol;
- GPtrArray *items_by_order;
+ struct symbols_cache_order *items_by_order;
GPtrArray *items_by_id;
GList *delayed_deps;
GList *delayed_conditions;
@@ -125,6 +130,7 @@ struct cache_savepoint {
struct metric_result *rs;
gdouble lim;
GPtrArray *waitq;
+ struct symbols_cache_order *order;
};
/* XXX: Maybe make it configurable */
@@ -153,6 +159,35 @@ rspamd_symbols_cache_quark (void)
return g_quark_from_static_string ("symbols-cache");
}
+static void
+rspamd_symbols_cache_order_dtor (gpointer p)
+{
+ struct symbols_cache_order *ord = p;
+
+ g_ptr_array_free (ord->d, TRUE);
+ g_slice_free1 (sizeof (*ord), ord);
+}
+
+static void
+rspamd_symbols_cache_order_unref (gpointer p)
+{
+ struct symbols_cache_order *ord = p;
+
+ REF_RELEASE (ord);
+}
+
+static struct symbols_cache_order *
+rspamd_symbols_cache_order_new (gsize nelts)
+{
+ struct symbols_cache_order *ord;
+
+ ord = g_slice_alloc (sizeof (*ord));
+ ord->d = g_ptr_array_sized_new (nelts);
+ REF_INIT_RETAIN (ord, rspamd_symbols_cache_order_dtor);
+
+ return ord;
+}
+
static gint
cache_logic_cmp (const void *p1, const void *p2, gpointer ud)
{
@@ -217,9 +252,31 @@ rspamd_set_counter (struct cache_item *item, gdouble value)
return cd->value;
}
+static void
+rspamd_symbols_cache_resort (struct symbols_cache *cache)
+{
+ struct symbols_cache_order *ord;
+ guint i;
+ gpointer p;
+
+ ord = rspamd_symbols_cache_order_new (cache->used_items);
+
+ for (i = 0; i < cache->used_items; i ++) {
+ p = g_ptr_array_index (cache->items_by_id, i);
+ g_ptr_array_add (ord->d, p);
+ }
+
+ g_ptr_array_sort_with_data (ord->d, cache_logic_cmp, cache);
+
+ if (cache->items_by_order) {
+ REF_RELEASE (cache->items_by_order);
+ cache->items_by_order = ord;
+ }
+}
+
/* Sort items in logical order */
static void
-post_cache_init (struct symbols_cache *cache)
+rspamd_symbols_cache_post_init (struct symbols_cache *cache)
{
struct cache_item *it, *dit;
struct cache_dependency *dep, *rdep;
@@ -229,7 +286,7 @@ post_cache_init (struct symbols_cache *cache)
guint i, j;
gint id;
- g_ptr_array_sort_with_data (cache->items_by_order, cache_logic_cmp, cache);
+ rspamd_symbols_cache_resort (cache);
cur = cache->delayed_deps;
while (cur) {
@@ -573,7 +630,6 @@ rspamd_symbols_cache_add_symbol (struct symbols_cache *cache,
msg_debug_cache ("used items: %d, added symbol: %s", cache->used_items, name);
rspamd_set_counter (item, 0);
g_ptr_array_add (cache->items_by_id, item);
- g_ptr_array_add (cache->items_by_order, item);
item->deps = g_ptr_array_new ();
item->rdeps = g_ptr_array_new ();
rspamd_mempool_add_destructor (cache->static_pool,
@@ -691,7 +747,7 @@ rspamd_symbols_cache_destroy (struct symbols_cache *cache)
g_hash_table_destroy (cache->items_by_symbol);
rspamd_mempool_delete (cache->static_pool);
g_ptr_array_free (cache->items_by_id, TRUE);
- g_ptr_array_free (cache->items_by_order, TRUE);
+ REF_RELEASE (cache->items_by_order);
g_slice_free1 (sizeof (*cache), cache);
}
}
@@ -706,7 +762,6 @@ rspamd_symbols_cache_new (struct rspamd_config *cfg)
rspamd_mempool_new (rspamd_mempool_suggest_size (), "symcache");
cache->items_by_symbol = g_hash_table_new (rspamd_str_hash,
rspamd_str_equal);
- cache->items_by_order = g_ptr_array_new ();
cache->items_by_id = g_ptr_array_new ();
cache->mtx = rspamd_mempool_get_mutex (cache->static_pool);
cache->reload_time = CACHE_RELOAD_TIME;
@@ -727,12 +782,13 @@ rspamd_symbols_cache_init (struct symbols_cache* cache)
/* Just in-memory cache */
if (cache->cfg->cache_filename == NULL) {
- post_cache_init (cache);
+ rspamd_symbols_cache_post_init (cache);
return TRUE;
}
/* Copy saved cache entries */
res = rspamd_symbols_cache_load_items (cache, cache->cfg->cache_filename);
+ rspamd_symbols_cache_post_init (cache);
return res;
}
@@ -905,8 +961,6 @@ rspamd_symbols_cache_validate (struct symbols_cache *cache,
}
}
- post_cache_init (cache);
-
return ret;
}
@@ -1225,7 +1279,12 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
checkpoint->processed_bits = rspamd_mempool_alloc0 (task->task_pool,
NBYTES (cache->used_items) * 2);
checkpoint->waitq = g_ptr_array_new ();
- checkpoint->version = cache->used_items;
+ g_assert (cache->items_by_order != NULL);
+ checkpoint->version = cache->items_by_order->d->len;
+ checkpoint->order = cache->items_by_order;
+ REF_RETAIN (checkpoint->order);
+ rspamd_mempool_add_destructor (task->task_pool,
+ rspamd_symbols_cache_order_unref, checkpoint->order);
rspamd_mempool_add_destructor (task->task_pool,
rspamd_ptr_array_free_hard, checkpoint->waitq);
task->checkpoint = checkpoint;
@@ -1265,7 +1324,8 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
return TRUE;
}
- item = g_ptr_array_index (cache->items_by_order, i);
+ item = g_ptr_array_index (checkpoint->order->d, i);
+
if (!isset (checkpoint->processed_bits, item->id * 2)) {
if (!rspamd_symbols_cache_check_deps (task, cache, item,
checkpoint)) {
@@ -1395,7 +1455,7 @@ rspamd_symbols_cache_counters (struct symbols_cache * cache)
top = ucl_object_typed_new (UCL_ARRAY);
cbd.top = top;
cbd.cache = cache;
- g_ptr_array_foreach (cache->items_by_order,
+ g_ptr_array_foreach (cache->items_by_order->d,
rspamd_symbols_cache_counters_cb, &cbd);
return top;
@@ -1421,8 +1481,8 @@ rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud)
rspamd_mempool_lock_mutex (cache->mtx);
/* Gather stats from shared execution times */
- for (i = 0; i < cache->items_by_order->len; i ++) {
- item = g_ptr_array_index (cache->items_by_order, i);
+ for (i = 0; i < cache->items_by_id->len; i ++) {
+ item = g_ptr_array_index (cache->items_by_id, i);
if (item->type & (SYMBOL_TYPE_CALLBACK|SYMBOL_TYPE_NORMAL)) {
if (item->cd->number > 0) {
@@ -1451,7 +1511,7 @@ rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud)
rspamd_mempool_unlock_mutex (cache->mtx);
- g_ptr_array_sort_with_data (cache->items_by_order, cache_logic_cmp, cache);
+ rspamd_symbols_cache_resort (cache);
}
void