]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Start rework of symbols cache updates
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 24 Jan 2017 14:12:20 +0000 (14:12 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 24 Jan 2017 14:12:20 +0000 (14:12 +0000)
src/controller.c
src/libserver/symbols_cache.c
src/libserver/symbols_cache.h
src/libserver/worker_util.c
src/libserver/worker_util.h
src/worker.c

index 44899632e049880659e6995be3594cb11fea00c3..599cea35c1ceb7b844b87bed9261fde036f036e7 100644 (file)
@@ -3598,7 +3598,8 @@ start_controller_worker (struct rspamd_worker *worker)
                        ctx->ev_base, ctx->resolver->r);
        /* Maps events */
        rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver);
-       rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base);
+       rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
+                       worker);
        rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
 
        event_base_loop (ctx->ev_base, 0);
index 5fa5be37e3ed8343ca962169e78cf0f7e06eaf13..9292df7c0cd5f21b06eee814e6e851d5c0b54333 100644 (file)
@@ -22,6 +22,7 @@
 #include "lua/lua_common.h"
 #include "unix-std.h"
 #include "contrib/t1ha/t1ha.h"
+#include "libserver/worker_util.h"
 #include <math.h>
 
 #define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
@@ -76,26 +77,27 @@ struct symbols_cache {
        struct rspamd_config *cfg;
        rspamd_mempool_mutex_t *mtx;
        gdouble reload_time;
-       struct event resort_ev;
 };
 
 struct counter_data {
        gdouble mean;
        gdouble stddev;
-       gint number;
+       guint64 number;
 };
 
 struct item_stat {
        gdouble avg_time;
        gdouble weight;
-       guint32 frequency;
-       guint32 avg_counter;
+       guint64 frequency;
+       guint64 avg_counter;
 };
 
 struct cache_item {
        /* This block is likely shared */
        struct item_stat *st;
 
+       guint64 last_count;
+
        /* Per process counter */
        struct counter_data *cd;
        gchar *symbol;
@@ -156,6 +158,14 @@ struct cache_savepoint {
        struct symbols_cache_order *order;
 };
 
+struct rspamd_cache_refresh_cbdata {
+       gdouble last_resort;
+       struct event resort_ev;
+       struct symbols_cache *cache;
+       struct rspamd_worker *w;
+       struct event_base *ev_base;
+};
+
 /* XXX: Maybe make it configurable */
 #define CACHE_RELOAD_TIME 60.0
 /* weight, frequency, time */
@@ -170,8 +180,7 @@ static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
                struct symbols_cache *cache,
                struct cache_item *item,
                struct cache_savepoint *checkpoint,
-               gdouble *total_diff,
-               gdouble pr);
+               gdouble *total_diff);
 static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task,
                struct symbols_cache *cache,
                struct cache_item *item,
@@ -1163,7 +1172,6 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
        struct cache_savepoint *checkpoint;
        struct symbols_cache *cache;
        gint i, remain = 0;
-       gdouble pr = rspamd_random_double_fast ();
 
        checkpoint = task->checkpoint;
        cache = task->cfg->cache;
@@ -1183,7 +1191,7 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
                                }
 
                                rspamd_symbols_cache_check_symbol (task, cache, it, checkpoint,
-                                               NULL, pr);
+                                               NULL);
                        }
                }
        }
@@ -1196,8 +1204,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
                struct symbols_cache *cache,
                struct cache_item *item,
                struct cache_savepoint *checkpoint,
-               gdouble *total_diff,
-               gdouble pr)
+               gdouble *total_diff)
 {
        guint pending_before, pending_after;
        double t1 = 0, t2 = 0;
@@ -1237,31 +1244,26 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
                }
 
                if (check) {
-                       if (pr > 0.9) {
-                               t1 = rspamd_get_ticks ();
-                       }
-
+                       t1 = rspamd_get_ticks ();
                        pending_before = rspamd_session_events_pending (task->s);
                        /* Watch for events appeared */
                        rspamd_session_watch_start (task->s, rspamd_symbols_cache_watcher_cb,
                                        item);
-
                        msg_debug_task ("execute %s, %d", item->symbol, item->id);
                        item->func (task, item->user_data);
+                       t2 = rspamd_get_ticks ();
+                       diff = (t2 - t1) * 1e6;
 
-                       if (pr > 0.9) {
-                               t2 = rspamd_get_ticks ();
-                               diff = (t2 - t1) * 1e6;
-
-                               if (total_diff) {
-                                       *total_diff += diff;
-                               }
+                       if (total_diff) {
+                               *total_diff += diff;
+                       }
 
-                               if (diff > slow_diff_limit) {
-                                       msg_info_task ("slow rule: %s: %d ms", item->symbol,
-                                                       (gint)(diff / 1000.));
-                               }
+                       if (diff > slow_diff_limit) {
+                               msg_info_task ("slow rule: %s: %d ms", item->symbol,
+                                               (gint)(diff / 1000.));
+                       }
 
+                       if (rspamd_worker_is_normal (task->worker)) {
                                rspamd_set_counter (item, diff);
                        }
 
@@ -1303,7 +1305,6 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
        struct cache_dependency *dep;
        guint i;
        gboolean ret = TRUE;
-       gdouble pr = rspamd_random_double_fast ();
        static const guint max_recursion = 20;
 
        if (recursion > max_recursion) {
@@ -1339,8 +1340,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
                                        else if (!rspamd_symbols_cache_check_symbol (task, cache,
                                                        dep->item,
                                                        checkpoint,
-                                                       NULL,
-                                                       pr)) {
+                                                       NULL)) {
                                                /* Now started, but has events pending */
                                                ret = FALSE;
                                                msg_debug_task ("started check of %d symbol as dep for "
@@ -1532,7 +1532,6 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
        gint saved_priority;
        const gdouble max_microseconds = 3e5;
        guint start_events_pending;
-       gdouble pr = rspamd_random_double_fast ();
 
        g_assert (cache != NULL);
 
@@ -1578,8 +1577,9 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                                return TRUE;
                                        }
                                }
+
                                rspamd_symbols_cache_check_symbol (task, cache, item,
-                                               checkpoint, &total_microseconds, pr);
+                                               checkpoint, &total_microseconds);
                        }
                }
 
@@ -1634,7 +1634,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                }
 
                                rspamd_symbols_cache_check_symbol (task, cache, item,
-                                               checkpoint, &total_microseconds, pr);
+                                               checkpoint, &total_microseconds);
                        }
 
                        if (total_microseconds > max_microseconds) {
@@ -1680,7 +1680,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                }
 
                                rspamd_symbols_cache_check_symbol (task, cache, item,
-                                               checkpoint, &total_microseconds, pr);
+                                               checkpoint, &total_microseconds);
                        }
 
                        if (total_microseconds > max_microseconds) {
@@ -1731,7 +1731,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
                                        }
                                }
                                rspamd_symbols_cache_check_symbol (task, cache, item,
-                                               checkpoint, &total_microseconds, pr);
+                                               checkpoint, &total_microseconds);
                        }
                }
                checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_POSTFILTERS;
@@ -1828,67 +1828,81 @@ rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud)
 {
        struct timeval tv;
        gdouble tm;
-       struct symbols_cache *cache = ud;
+       struct rspamd_cache_refresh_cbdata *cbdata = ud;
+       struct symbols_cache *cache;
        struct cache_item *item, *parent;
        guint i;
+       gdouble cur_ticks;
 
+       cache = cbdata->cache;
        /* Plan new event */
        tm = rspamd_time_jitter (cache->reload_time, 0);
+       cur_ticks = rspamd_get_ticks ();
        msg_debug_cache ("resort symbols cache, next reload in %.2f seconds", tm);
        g_assert (cache != NULL);
-       evtimer_set (&cache->resort_ev, rspamd_symbols_cache_resort_cb, cache);
+       evtimer_set (&cbdata->resort_ev, rspamd_symbols_cache_resort_cb, cbdata);
+       event_base_set (cbdata->ev_base, &cbdata->resort_ev);
        double_to_tv (tm, &tv);
-       event_add (&cache->resort_ev, &tv);
-
-       rspamd_mempool_lock_mutex (cache->mtx);
-
-       /* Gather stats from shared execution times */
-       for (i = 0; i < cache->items_by_id->len; i ++) {
-               item = g_ptr_array_index (cache->items_by_id, i);
-
-               if (item->type & (SYMBOL_TYPE_CALLBACK|SYMBOL_TYPE_NORMAL)) {
-                       if (item->cd->number > 0) {
-                               item->st->avg_counter += item->cd->number + 1;
-                               item->st->avg_time = item->st->avg_time +
-                                               (item->cd->mean - item->st->avg_time) /
-                                                               (gdouble)item->st->avg_counter;
-                               item->cd->mean = item->st->avg_time;
-                               item->cd->number = item->st->avg_counter;
+       event_add (&cbdata->resort_ev, &tv);
+
+       if (rspamd_worker_is_normal (cbdata->w)) {
+               rspamd_mempool_lock_mutex (cache->mtx);
+
+               /* Gather stats from shared execution times */
+               for (i = 0; i < cache->items_by_id->len; i ++) {
+                       item = g_ptr_array_index (cache->items_by_id, i);
+
+                       if (item->type & (SYMBOL_TYPE_CALLBACK|SYMBOL_TYPE_NORMAL)) {
+                               if (item->cd->number > 0) {
+                                       item->st->avg_counter += item->cd->number + 1;
+                                       item->st->avg_time = item->st->avg_time +
+                                                       (item->cd->mean - item->st->avg_time) /
+                                                       (gdouble)item->st->avg_counter;
+                                       item->cd->mean = item->st->avg_time;
+                                       item->cd->number = item->st->avg_counter;
+                               }
                        }
                }
-       }
-       /* Sync virtual symbols */
-       for (i = 0; i < cache->items_by_id->len; i ++) {
-               item = g_ptr_array_index (cache->items_by_id, i);
+               /* Sync virtual symbols */
+               for (i = 0; i < cache->items_by_id->len; i ++) {
+                       item = g_ptr_array_index (cache->items_by_id, i);
 
-               if (item->parent != -1) {
-                       parent = g_ptr_array_index (cache->items_by_id, item->parent);
+                       if (item->parent != -1) {
+                               parent = g_ptr_array_index (cache->items_by_id, item->parent);
 
-                       if (parent) {
-                               item->st->avg_time = parent->st->avg_time;
-                               item->st->avg_counter = parent->st->avg_counter;
+                               if (parent) {
+                                       item->st->avg_time = parent->st->avg_time;
+                                       item->st->avg_counter = parent->st->avg_counter;
+                               }
                        }
                }
-       }
 
-       rspamd_mempool_unlock_mutex (cache->mtx);
+               rspamd_mempool_unlock_mutex (cache->mtx);
+       }
 
+       cbdata->last_resort = cur_ticks;
        rspamd_symbols_cache_resort (cache);
 }
 
 void
 rspamd_symbols_cache_start_refresh (struct symbols_cache * cache,
-               struct event_base *ev_base)
+               struct event_base *ev_base, struct rspamd_worker *w)
 {
        struct timeval tv;
        gdouble tm;
+       struct rspamd_cache_refresh_cbdata *cbdata;
 
+       cbdata = rspamd_mempool_alloc0 (cache->static_pool, sizeof (*cbdata));
+       cbdata->last_resort = rspamd_get_ticks ();
+       cbdata->ev_base = ev_base;
+       cbdata->w = w;
+       cbdata->cache = cache;
        tm = rspamd_time_jitter (cache->reload_time, 0);
        g_assert (cache != NULL);
-       evtimer_set (&cache->resort_ev, rspamd_symbols_cache_resort_cb, cache);
-       event_base_set (ev_base, &cache->resort_ev);
+       evtimer_set (&cbdata->resort_ev, rspamd_symbols_cache_resort_cb, cbdata);
+       event_base_set (ev_base, &cbdata->resort_ev);
        double_to_tv (tm, &tv);
-       event_add (&cache->resort_ev, &tv);
+       event_add (&cbdata->resort_ev, &tv);
 }
 
 void
index f6015c2ef712c8a24c5c600e01885d7b57d78d7f..daecfaa2450f3c068b0d62c5115ec59d11f7244a 100644 (file)
@@ -24,6 +24,7 @@
 struct rspamd_task;
 struct rspamd_config;
 struct symbols_cache;
+struct rspamd_worker;
 
 typedef void (*symbol_func_t)(struct rspamd_task *task, gpointer user_data);
 
@@ -185,7 +186,7 @@ ucl_object_t *rspamd_symbols_cache_counters (struct symbols_cache * cache);
  * @param ev_base
  */
 void rspamd_symbols_cache_start_refresh (struct symbols_cache * cache,
-               struct event_base *ev_base);
+               struct event_base *ev_base, struct rspamd_worker *w);
 
 /**
  * Increases counter for a specific symbol
index ac87f2870a628d3e041ed645d0d7deafde8e2d74..f9d777e31073bd6f1a54fc7f5fe59a2261d5fa65 100644 (file)
@@ -676,3 +676,22 @@ rspamd_hard_terminate (struct rspamd_main *rspamd_main)
        rspamd_log_close (rspamd_main->logger);
        exit (EXIT_FAILURE);
 }
+
+gboolean
+rspamd_worker_is_normal (struct rspamd_worker *w)
+{
+       static GQuark normal_quark = (GQuark)0;
+
+       if (w) {
+               if (normal_quark) {
+                       return w->type == normal_quark;
+               }
+               else {
+                       normal_quark = g_quark_from_static_string ("normal");
+               }
+
+               return w->type == normal_quark;
+       }
+
+       return FALSE;
+}
index 634ffe67b9cc0f0c2822ddd3dbaabf7a0e72d469..305c7f4559ca8d93f10272191d07528be4a2244c 100644 (file)
@@ -129,6 +129,13 @@ void rspamd_worker_block_signals (void);
  */
 void rspamd_hard_terminate (struct rspamd_main *rspamd_main) G_GNUC_NORETURN;
 
+/**
+ * Returns TRUE if a specific worker is normal worker
+ * @param w
+ * @return
+ */
+gboolean rspamd_worker_is_normal (struct rspamd_worker *w);
+
 /**
  * Fork new worker with the specified configuration
  */
index 94ef2c9d10e0df2654d78ba5c62a6ea0fda5a4a2..12a6acf2cd9b2773917e6f914467b337128f9a5e 100644 (file)
@@ -603,7 +603,8 @@ start_worker (struct rspamd_worker *worker)
        ctx->cfg = worker->srv->cfg;
        ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket, TRUE);
        msec_to_tv (ctx->timeout, &ctx->io_tv);
-       rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base);
+       rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
+                       worker);
 
        ctx->resolver = dns_resolver_init (worker->srv->logger,
                        ctx->ev_base,