]> source.dussan.org Git - rspamd.git/commitdiff
Implement cache resorting.
author Vsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 28 May 2015 15:40:29 +0000 (16:40 +0100)
committer Vsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 28 May 2015 15:40:29 +0000 (16:40 +0100)
src/controller.c
src/libserver/symbols_cache.c
src/libserver/symbols_cache.h
src/worker.c

index 8c3b4909177662425cb2007d983fc3f91f85e655..73a563340f7103cfe3c5467a135d67c293d3657c 100644 (file)
@@ -1594,8 +1594,6 @@ rspamd_controller_handle_counters (
 {
        struct rspamd_controller_session *session = conn_ent->ud;
        ucl_object_t *top;
-       GList *cur;
-       struct cache_item *item;
        struct symbols_cache *cache;
 
        if (!rspamd_controller_check_password (conn_ent, session, msg, FALSE)) {
@@ -1932,6 +1930,7 @@ start_controller_worker (struct rspamd_worker *worker)
        rspamd_upstreams_library_config (worker->srv->cfg);
        /* Maps events */
        rspamd_map_watch (worker->srv->cfg, ctx->ev_base);
+       rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base);
 
        event_base_loop (ctx->ev_base, 0);
 
index 232700447e669d2855e86905efd4a2262e185ceb..2ee753fdb50a7318833e31ef533c4153e8984ab9 100644 (file)
 #include "message.h"
 #include "symbols_cache.h"
 #include "cfg_file.h"
-#include "blake2.h"
 
-/* After which number of messages try to resort cache */
-#define MAX_USES 100
 static const guchar rspamd_symbols_cache_magic[8] = {'r', 's', 'c', 1, 0, 0, 0, 0 };
 
 struct rspamd_symbols_cache_header {
        guchar magic[8];
        guint nitems;
-       guchar checksum[BLAKE2B_OUTBYTES];
+       guchar checksum[64];
        guchar unused[128];
 };
 
@@ -50,6 +47,9 @@ struct symbols_cache {
        guint used_items;
        guint uses;
        struct rspamd_config *cfg;
+       rspamd_mempool_mutex_t *mtx;
+       gdouble reload_time;
+       struct event resort_ev;
 };
 
 struct counter_data {
@@ -62,6 +62,7 @@ struct cache_item {
        gdouble avg_time;
        gdouble weight;
        guint32 frequency;
+       guint32 avg_counter;
 
        /* Per process counter */
        struct counter_data *cd;
@@ -80,6 +81,8 @@ struct cache_item {
        gdouble metric_weight;
 };
 
+/* XXX: Maybe make it configurable */
+#define CACHE_RELOAD_TIME 60.0
 /* weight, frequency, time */
 #define TIME_ALPHA (1.0 / 1000000.0)
 #define WEIGHT_ALPHA (0.001)
@@ -232,22 +235,25 @@ rspamd_symbols_cache_load_items (struct symbols_cache *cache, const gchar *name)
                        elt = ucl_object_find_key (cur, "weight");
 
                        if (elt) {
-                               w = ucl_object_todouble (cur);
+                               w = ucl_object_todouble (elt);
                                if (w != 0) {
                                        item->weight = w;
                                }
                        }
 
                        elt = ucl_object_find_key (cur, "time");
+                       if (elt) {
+                               item->avg_time = ucl_object_todouble (elt);
+                       }
 
+                       elt = ucl_object_find_key (cur, "count");
                        if (elt) {
-                               item->avg_time = ucl_object_todouble (cur);
+                               item->avg_counter = ucl_object_toint (elt);
                        }
 
                        elt = ucl_object_find_key (cur, "frequency");
-
                        if (elt) {
-                               item->frequency = ucl_object_toint (cur);
+                               item->frequency = ucl_object_toint (elt);
                        }
                }
        }
@@ -265,6 +271,7 @@ rspamd_symbols_cache_load_items (struct symbols_cache *cache, const gchar *name)
                 * parent item avg_time
                 */
                parent->avg_time = item->avg_time;
+               parent->avg_counter = item->avg_counter;
        }
 
        ucl_object_iterate_free (it);
@@ -315,6 +322,8 @@ rspamd_symbols_cache_save_items (struct symbols_cache *cache, const gchar *name)
                                "weight", 0, false);
                ucl_object_insert_key (elt, ucl_object_fromdouble (item->avg_time),
                                "time", 0, false);
+               ucl_object_insert_key (elt, ucl_object_fromdouble (item->avg_counter),
+                               "count", 0, false);
                ucl_object_insert_key (elt, ucl_object_fromint (item->frequency),
                                "frequency", 0, false);
 
@@ -492,6 +501,8 @@ rspamd_symbols_cache_new (void)
        cache->items_by_symbol = g_hash_table_new (rspamd_str_hash,
                        rspamd_str_equal);
        cache->items_by_order = g_ptr_array_new ();
+       cache->mtx = rspamd_mempool_get_mutex (cache->static_pool);
+       cache->reload_time = CACHE_RELOAD_TIME;
 
        return cache;
 }
@@ -628,7 +639,10 @@ rspamd_symbols_cache_metric_validate_cb (gpointer k, gpointer v, gpointer ud)
 
        if (item) {
                item->metric_weight = weight;
-               item->weight = item->weight * weight;
+
+               if (abs (item->weight) < abs (weight) || weight < 0) {
+                       item->weight = weight;
+               }
        }
 }
 
@@ -787,3 +801,67 @@ rspamd_symbols_cache_counters (struct symbols_cache * cache)
 
        return top;
 }
+
+/**
+ * Timer callback that refreshes per-symbol timing statistics and resorts
+ * the cache.
+ *
+ * Re-arms its own timer, folds the per-process counters (item->cd) into each
+ * item's running average under cache->mtx, copies the parent's averages to
+ * every item that has a parent, and finally calls post_cache_init ().
+ *
+ * @param fd unused (timer event)
+ * @param what unused (timer event)
+ * @param ud the struct symbols_cache being refreshed; must not be NULL
+ */
+static void
+rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud)
+{
+       struct timeval tv;
+       gdouble tm;
+       struct symbols_cache *cache = ud;
+       struct cache_item *item, *parent;
+       guint i;
+
+       /* Check the pointer before the first dereference below */
+       g_assert (cache != NULL);
+
+       /* Plan new event */
+       tm = rspamd_time_jitter (cache->reload_time, 0);
+       msg_info ("resort symbols cache, next reload in %.2f seconds", tm);
+       /*
+        * The timer is one-shot, so it only needs to be re-added here.
+        * evtimer_set is deliberately not repeated: it would rebind the event
+        * to libevent's current default base and drop the base selected by
+        * rspamd_symbols_cache_start_refresh.
+        */
+       double_to_tv (tm, &tv);
+       event_add (&cache->resort_ev, &tv);
+
+       rspamd_mempool_lock_mutex (cache->mtx);
+
+       /* Gather stats from shared execution times */
+       for (i = 0; i < cache->items_by_order->len; i ++) {
+               item = g_ptr_array_index (cache->items_by_order, i);
+
+               if (item->type == SYMBOL_TYPE_CALLBACK ||
+                               item->type == SYMBOL_TYPE_NORMAL) {
+                       if (item->cd->number > 0) {
+                               /*
+                                * NOTE(review): cd->number is overwritten with
+                                * avg_counter below, so the next pass adds
+                                * avg_counter + 1 again; confirm this growth
+                                * rate is intended.
+                                */
+                               item->avg_counter += item->cd->number + 1;
+                               /* Incremental update of the running average */
+                               item->avg_time = item->avg_time +
+                                               (item->cd->value - item->avg_time) /
+                                               item->avg_counter;
+                               /* Publish the merged values back to the counter */
+                               item->cd->value = item->avg_time;
+                               item->cd->number = item->avg_counter;
+                       }
+               }
+       }
+       /* Sync virtual symbols */
+       for (i = 0; i < cache->items_by_order->len; i ++) {
+               /* Reload the item on every pass instead of reusing a stale one */
+               item = g_ptr_array_index (cache->items_by_order, i);
+
+               if (item->parent != -1) {
+                       parent = g_ptr_array_index (cache->items_by_order, item->parent);
+                       item->avg_time = parent->avg_time;
+                       item->avg_counter = parent->avg_counter;
+               }
+       }
+
+       rspamd_mempool_unlock_mutex (cache->mtx);
+
+       post_cache_init (cache);
+}
+
+/*
+ * Arm the cache refresh timer on ev_base: the first run of
+ * rspamd_symbols_cache_resort_cb is scheduled after an interval computed by
+ * rspamd_time_jitter () from cache->reload_time.
+ */
+void
+rspamd_symbols_cache_start_refresh (struct symbols_cache * cache,
+               struct event_base *ev_base)
+{
+       struct timeval tv;
+       gdouble tm;
+
+       /* Check the pointer before cache->reload_time is read */
+       g_assert (cache != NULL);
+       tm = rspamd_time_jitter (cache->reload_time, 0);
+       evtimer_set (&cache->resort_ev, rspamd_symbols_cache_resort_cb, cache);
+       /* Bind the timer to the caller's event base before adding it */
+       event_base_set (ev_base, &cache->resort_ev);
+       double_to_tv (tm, &tv);
+       event_add (&cache->resort_ev, &tv);
+}
index e9690c1d6e4a6cfae1362ca6861d8b08c65886a7..013dc91ed216b4e9465f463ae51ca85c10576f7b 100644 (file)
@@ -157,4 +157,12 @@ gboolean rspamd_symbols_cache_validate (struct symbols_cache *cache,
  */
 ucl_object_t *rspamd_symbols_cache_counters (struct symbols_cache * cache);
 
+/**
+ * Start cache reloading: arms a timer on the given event base that runs the
+ * cache resort callback after an interval derived from the cache reload time
+ * @param cache symbols cache to refresh, must not be NULL
+ * @param ev_base event base on which the refresh timer is scheduled
+ */
+void rspamd_symbols_cache_start_refresh (struct symbols_cache * cache,
+               struct event_base *ev_base);
+
 #endif
index f142da22c26997caf68a45172490413fc1724abe..f723c2f47f5c07196add51f4f377faa49b3f308b 100644 (file)
@@ -307,7 +307,7 @@ start_worker (struct rspamd_worker *worker)
        msec_to_tv (ctx->timeout, &ctx->io_tv);
 
        rspamd_map_watch (worker->srv->cfg, ctx->ev_base);
-
+       rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base);
 
        ctx->resolver = dns_resolver_init (worker->srv->logger,
                        ctx->ev_base,