diff options
-rw-r--r-- | src/controller.c | 3 | ||||
-rw-r--r-- | src/libserver/symbols_cache.c | 148 | ||||
-rw-r--r-- | src/libserver/symbols_cache.h | 3 | ||||
-rw-r--r-- | src/libserver/worker_util.c | 19 | ||||
-rw-r--r-- | src/libserver/worker_util.h | 7 | ||||
-rw-r--r-- | src/worker.c | 3 |
6 files changed, 113 insertions, 70 deletions
diff --git a/src/controller.c b/src/controller.c index 44899632e..599cea35c 100644 --- a/src/controller.c +++ b/src/controller.c @@ -3598,7 +3598,8 @@ start_controller_worker (struct rspamd_worker *worker) ctx->ev_base, ctx->resolver->r); /* Maps events */ rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver); - rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base); + rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base, + worker); rspamd_stat_init (worker->srv->cfg, ctx->ev_base); event_base_loop (ctx->ev_base, 0); diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c index 5fa5be37e..9292df7c0 100644 --- a/src/libserver/symbols_cache.c +++ b/src/libserver/symbols_cache.c @@ -22,6 +22,7 @@ #include "lua/lua_common.h" #include "unix-std.h" #include "contrib/t1ha/t1ha.h" +#include "libserver/worker_util.h" #include <math.h> #define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ @@ -76,26 +77,27 @@ struct symbols_cache { struct rspamd_config *cfg; rspamd_mempool_mutex_t *mtx; gdouble reload_time; - struct event resort_ev; }; struct counter_data { gdouble mean; gdouble stddev; - gint number; + guint64 number; }; struct item_stat { gdouble avg_time; gdouble weight; - guint32 frequency; - guint32 avg_counter; + guint64 frequency; + guint64 avg_counter; }; struct cache_item { /* This block is likely shared */ struct item_stat *st; + guint64 last_count; + /* Per process counter */ struct counter_data *cd; gchar *symbol; @@ -156,6 +158,14 @@ struct cache_savepoint { struct symbols_cache_order *order; }; +struct rspamd_cache_refresh_cbdata { + gdouble last_resort; + struct event resort_ev; + struct symbols_cache *cache; + struct rspamd_worker *w; + struct event_base *ev_base; +}; + /* XXX: Maybe make it configurable */ #define CACHE_RELOAD_TIME 60.0 /* weight, frequency, time */ @@ -170,8 +180,7 @@ static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task, struct symbols_cache *cache, struct cache_item *item, struct cache_savepoint *checkpoint, - gdouble *total_diff, - gdouble pr); + gdouble *total_diff); static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task, struct symbols_cache *cache, struct cache_item *item, @@ -1163,7 +1172,6 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud) struct cache_savepoint *checkpoint; struct symbols_cache *cache; gint i, remain = 0; - gdouble pr = rspamd_random_double_fast (); checkpoint = task->checkpoint; cache = task->cfg->cache; @@ -1183,7 +1191,7 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud) } rspamd_symbols_cache_check_symbol (task, cache, it, checkpoint, - NULL, pr); + NULL); } } } @@ -1196,8 +1204,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, struct symbols_cache *cache, struct cache_item *item, struct cache_savepoint *checkpoint, - gdouble *total_diff, - gdouble pr) + gdouble *total_diff) { guint pending_before, pending_after; double t1 = 0, t2 = 0; @@ -1237,31 +1244,26 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task, } if (check) { - if (pr > 0.9) { - t1 = rspamd_get_ticks (); - } - + t1 = rspamd_get_ticks (); pending_before = rspamd_session_events_pending (task->s); /* Watch for events appeared */ rspamd_session_watch_start (task->s, rspamd_symbols_cache_watcher_cb, item); - msg_debug_task ("execute %s, %d", item->symbol, item->id); item->func (task, item->user_data); + t2 = rspamd_get_ticks (); + diff = (t2 - t1) * 1e6; - if (pr > 0.9) { - t2 = rspamd_get_ticks (); - diff = (t2 - t1) * 1e6; - - if (total_diff) { - *total_diff += diff; - } + if (total_diff) { + *total_diff += diff; + } - if (diff > slow_diff_limit) { - msg_info_task ("slow rule: %s: %d ms", item->symbol, - (gint)(diff / 1000.)); - } + if (diff > slow_diff_limit) { + msg_info_task ("slow rule: %s: %d ms", item->symbol, + (gint)(diff / 1000.)); + } + if (rspamd_worker_is_normal (task->worker)) { rspamd_set_counter (item, diff); } @@ -1303,7 +1305,6 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, struct cache_dependency *dep; guint i; gboolean ret = TRUE; - gdouble pr = rspamd_random_double_fast (); static const guint max_recursion = 20; if (recursion > max_recursion) { @@ -1339,8 +1340,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task, else if (!rspamd_symbols_cache_check_symbol (task, cache, dep->item, checkpoint, - NULL, - pr)) { + NULL)) { /* Now started, but has events pending */ ret = FALSE; msg_debug_task ("started check of %d symbol as dep for " @@ -1532,7 +1532,6 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, gint saved_priority; const gdouble max_microseconds = 3e5; guint start_events_pending; - gdouble pr = rspamd_random_double_fast (); g_assert (cache != NULL); @@ -1578,8 +1577,9 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, return TRUE; } } + rspamd_symbols_cache_check_symbol (task, cache, item, - checkpoint, &total_microseconds, pr); + checkpoint, &total_microseconds); } } @@ -1634,7 +1634,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, } rspamd_symbols_cache_check_symbol (task, cache, item, - checkpoint, &total_microseconds, pr); + checkpoint, &total_microseconds); } if (total_microseconds > max_microseconds) { @@ -1680,7 +1680,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, } rspamd_symbols_cache_check_symbol (task, cache, item, - checkpoint, &total_microseconds, pr); + checkpoint, &total_microseconds); } if (total_microseconds > max_microseconds) { @@ -1731,7 +1731,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task, } } rspamd_symbols_cache_check_symbol (task, cache, item, - checkpoint, &total_microseconds, pr); + checkpoint, &total_microseconds); } } checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_POSTFILTERS; @@ -1828,67 +1828,81 @@ rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud) { struct timeval tv; gdouble tm; - struct symbols_cache *cache = ud; + struct rspamd_cache_refresh_cbdata *cbdata = ud; + struct symbols_cache *cache; struct cache_item *item, *parent; guint i; + gdouble cur_ticks; + cache = cbdata->cache; /* Plan new event */ tm = rspamd_time_jitter (cache->reload_time, 0); + cur_ticks = rspamd_get_ticks (); msg_debug_cache ("resort symbols cache, next reload in %.2f seconds", tm); g_assert (cache != NULL); - evtimer_set (&cache->resort_ev, rspamd_symbols_cache_resort_cb, cache); + evtimer_set (&cbdata->resort_ev, rspamd_symbols_cache_resort_cb, cbdata); + event_base_set (cbdata->ev_base, &cbdata->resort_ev); double_to_tv (tm, &tv); - event_add (&cache->resort_ev, &tv); - - rspamd_mempool_lock_mutex (cache->mtx); - - /* Gather stats from shared execution times */ - for (i = 0; i < cache->items_by_id->len; i ++) { - item = g_ptr_array_index (cache->items_by_id, i); - - if (item->type & (SYMBOL_TYPE_CALLBACK|SYMBOL_TYPE_NORMAL)) { - if (item->cd->number > 0) { - item->st->avg_counter += item->cd->number + 1; - item->st->avg_time = item->st->avg_time + - (item->cd->mean - item->st->avg_time) / - (gdouble)item->st->avg_counter; - item->cd->mean = item->st->avg_time; - item->cd->number = item->st->avg_counter; + event_add (&cbdata->resort_ev, &tv); + + if (rspamd_worker_is_normal (cbdata->w)) { + rspamd_mempool_lock_mutex (cache->mtx); + + /* Gather stats from shared execution times */ + for (i = 0; i < cache->items_by_id->len; i ++) { + item = g_ptr_array_index (cache->items_by_id, i); + + if (item->type & (SYMBOL_TYPE_CALLBACK|SYMBOL_TYPE_NORMAL)) { + if (item->cd->number > 0) { + item->st->avg_counter += item->cd->number + 1; + item->st->avg_time = item->st->avg_time + + (item->cd->mean - item->st->avg_time) / + (gdouble)item->st->avg_counter; + item->cd->mean = item->st->avg_time; + item->cd->number = item->st->avg_counter; + } } } - } - /* Sync virtual symbols */ - for (i = 0; i < cache->items_by_id->len; i ++) { - item = g_ptr_array_index (cache->items_by_id, i); + /* Sync virtual symbols */ + for (i = 0; i < cache->items_by_id->len; i ++) { + item = g_ptr_array_index (cache->items_by_id, i); - if (item->parent != -1) { - parent = g_ptr_array_index (cache->items_by_id, item->parent); + if (item->parent != -1) { + parent = g_ptr_array_index (cache->items_by_id, item->parent); - if (parent) { - item->st->avg_time = parent->st->avg_time; - item->st->avg_counter = parent->st->avg_counter; + if (parent) { + item->st->avg_time = parent->st->avg_time; + item->st->avg_counter = parent->st->avg_counter; + } } } - } - rspamd_mempool_unlock_mutex (cache->mtx); + rspamd_mempool_unlock_mutex (cache->mtx); + } + cbdata->last_resort = cur_ticks; rspamd_symbols_cache_resort (cache); } void rspamd_symbols_cache_start_refresh (struct symbols_cache * cache, - struct event_base *ev_base) + struct event_base *ev_base, struct rspamd_worker *w) { struct timeval tv; gdouble tm; + struct rspamd_cache_refresh_cbdata *cbdata; + cbdata = rspamd_mempool_alloc0 (cache->static_pool, sizeof (*cbdata)); + cbdata->last_resort = rspamd_get_ticks (); + cbdata->ev_base = ev_base; + cbdata->w = w; + cbdata->cache = cache; tm = rspamd_time_jitter (cache->reload_time, 0); g_assert (cache != NULL); - evtimer_set (&cache->resort_ev, rspamd_symbols_cache_resort_cb, cache); - event_base_set (ev_base, &cache->resort_ev); + evtimer_set (&cbdata->resort_ev, rspamd_symbols_cache_resort_cb, cbdata); + event_base_set (ev_base, &cbdata->resort_ev); double_to_tv (tm, &tv); - event_add (&cache->resort_ev, &tv); + event_add (&cbdata->resort_ev, &tv); } void diff --git a/src/libserver/symbols_cache.h b/src/libserver/symbols_cache.h index f6015c2ef..daecfaa24 100644 --- a/src/libserver/symbols_cache.h +++ b/src/libserver/symbols_cache.h @@ -24,6 +24,7 @@ struct rspamd_task; struct rspamd_config; struct symbols_cache; +struct rspamd_worker; typedef void (*symbol_func_t)(struct rspamd_task *task, gpointer user_data); @@ -185,7 +186,7 @@ ucl_object_t *rspamd_symbols_cache_counters (struct symbols_cache * cache); * @param ev_base */ void rspamd_symbols_cache_start_refresh (struct symbols_cache * cache, - struct event_base *ev_base); + struct event_base *ev_base, struct rspamd_worker *w); /** * Increases counter for a specific symbol diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index ac87f2870..f9d777e31 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -676,3 +676,22 @@ rspamd_hard_terminate (struct rspamd_main *rspamd_main) rspamd_log_close (rspamd_main->logger); exit (EXIT_FAILURE); } + +gboolean +rspamd_worker_is_normal (struct rspamd_worker *w) +{ + static GQuark normal_quark = (GQuark)0; + + if (w) { + if (normal_quark) { + return w->type == normal_quark; + } + else { + normal_quark = g_quark_from_static_string ("normal"); + } + + return w->type == normal_quark; + } + + return FALSE; +} diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h index 634ffe67b..305c7f455 100644 --- a/src/libserver/worker_util.h +++ b/src/libserver/worker_util.h @@ -130,6 +130,13 @@ void rspamd_worker_block_signals (void); void rspamd_hard_terminate (struct rspamd_main *rspamd_main) G_GNUC_NORETURN; /** + * Returns TRUE if a specific worker is normal worker + * @param w + * @return + */ +gboolean rspamd_worker_is_normal (struct rspamd_worker *w); + +/** * Fork new worker with the specified configuration */ struct rspamd_worker *rspamd_fork_worker (struct rspamd_main *, diff --git a/src/worker.c b/src/worker.c index 94ef2c9d1..12a6acf2c 100644 --- a/src/worker.c +++ b/src/worker.c @@ -603,7 +603,8 @@ start_worker (struct rspamd_worker *worker) ctx->cfg = worker->srv->cfg; ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket, TRUE); msec_to_tv (ctx->timeout, &ctx->io_tv); - rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base); + rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base, + worker); ctx->resolver = dns_resolver_init (worker->srv->logger, ctx->ev_base, |