aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/controller.c3
-rw-r--r--src/libserver/symbols_cache.c148
-rw-r--r--src/libserver/symbols_cache.h3
-rw-r--r--src/libserver/worker_util.c19
-rw-r--r--src/libserver/worker_util.h7
-rw-r--r--src/worker.c3
6 files changed, 113 insertions, 70 deletions
diff --git a/src/controller.c b/src/controller.c
index 44899632e..599cea35c 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -3598,7 +3598,8 @@ start_controller_worker (struct rspamd_worker *worker)
ctx->ev_base, ctx->resolver->r);
/* Maps events */
rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver);
- rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base);
+ rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
+ worker);
rspamd_stat_init (worker->srv->cfg, ctx->ev_base);
event_base_loop (ctx->ev_base, 0);
diff --git a/src/libserver/symbols_cache.c b/src/libserver/symbols_cache.c
index 5fa5be37e..9292df7c0 100644
--- a/src/libserver/symbols_cache.c
+++ b/src/libserver/symbols_cache.c
@@ -22,6 +22,7 @@
#include "lua/lua_common.h"
#include "unix-std.h"
#include "contrib/t1ha/t1ha.h"
+#include "libserver/worker_util.h"
#include <math.h>
#define msg_err_cache(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
@@ -76,26 +77,27 @@ struct symbols_cache {
struct rspamd_config *cfg;
rspamd_mempool_mutex_t *mtx;
gdouble reload_time;
- struct event resort_ev;
};
struct counter_data {
gdouble mean;
gdouble stddev;
- gint number;
+ guint64 number;
};
struct item_stat {
gdouble avg_time;
gdouble weight;
- guint32 frequency;
- guint32 avg_counter;
+ guint64 frequency;
+ guint64 avg_counter;
};
struct cache_item {
/* This block is likely shared */
struct item_stat *st;
+ guint64 last_count;
+
/* Per process counter */
struct counter_data *cd;
gchar *symbol;
@@ -156,6 +158,14 @@ struct cache_savepoint {
struct symbols_cache_order *order;
};
+struct rspamd_cache_refresh_cbdata {
+ gdouble last_resort;
+ struct event resort_ev;
+ struct symbols_cache *cache;
+ struct rspamd_worker *w;
+ struct event_base *ev_base;
+};
+
/* XXX: Maybe make it configurable */
#define CACHE_RELOAD_TIME 60.0
/* weight, frequency, time */
@@ -170,8 +180,7 @@ static gboolean rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
struct symbols_cache *cache,
struct cache_item *item,
struct cache_savepoint *checkpoint,
- gdouble *total_diff,
- gdouble pr);
+ gdouble *total_diff);
static gboolean rspamd_symbols_cache_check_deps (struct rspamd_task *task,
struct symbols_cache *cache,
struct cache_item *item,
@@ -1163,7 +1172,6 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
struct cache_savepoint *checkpoint;
struct symbols_cache *cache;
gint i, remain = 0;
- gdouble pr = rspamd_random_double_fast ();
checkpoint = task->checkpoint;
cache = task->cfg->cache;
@@ -1183,7 +1191,7 @@ rspamd_symbols_cache_watcher_cb (gpointer sessiond, gpointer ud)
}
rspamd_symbols_cache_check_symbol (task, cache, it, checkpoint,
- NULL, pr);
+ NULL);
}
}
}
@@ -1196,8 +1204,7 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
struct symbols_cache *cache,
struct cache_item *item,
struct cache_savepoint *checkpoint,
- gdouble *total_diff,
- gdouble pr)
+ gdouble *total_diff)
{
guint pending_before, pending_after;
double t1 = 0, t2 = 0;
@@ -1237,31 +1244,26 @@ rspamd_symbols_cache_check_symbol (struct rspamd_task *task,
}
if (check) {
- if (pr > 0.9) {
- t1 = rspamd_get_ticks ();
- }
-
+ t1 = rspamd_get_ticks ();
pending_before = rspamd_session_events_pending (task->s);
/* Watch for events appeared */
rspamd_session_watch_start (task->s, rspamd_symbols_cache_watcher_cb,
item);
-
msg_debug_task ("execute %s, %d", item->symbol, item->id);
item->func (task, item->user_data);
+ t2 = rspamd_get_ticks ();
+ diff = (t2 - t1) * 1e6;
- if (pr > 0.9) {
- t2 = rspamd_get_ticks ();
- diff = (t2 - t1) * 1e6;
-
- if (total_diff) {
- *total_diff += diff;
- }
+ if (total_diff) {
+ *total_diff += diff;
+ }
- if (diff > slow_diff_limit) {
- msg_info_task ("slow rule: %s: %d ms", item->symbol,
- (gint)(diff / 1000.));
- }
+ if (diff > slow_diff_limit) {
+ msg_info_task ("slow rule: %s: %d ms", item->symbol,
+ (gint)(diff / 1000.));
+ }
+ if (rspamd_worker_is_normal (task->worker)) {
rspamd_set_counter (item, diff);
}
@@ -1303,7 +1305,6 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
struct cache_dependency *dep;
guint i;
gboolean ret = TRUE;
- gdouble pr = rspamd_random_double_fast ();
static const guint max_recursion = 20;
if (recursion > max_recursion) {
@@ -1339,8 +1340,7 @@ rspamd_symbols_cache_check_deps (struct rspamd_task *task,
else if (!rspamd_symbols_cache_check_symbol (task, cache,
dep->item,
checkpoint,
- NULL,
- pr)) {
+ NULL)) {
/* Now started, but has events pending */
ret = FALSE;
msg_debug_task ("started check of %d symbol as dep for "
@@ -1532,7 +1532,6 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
gint saved_priority;
const gdouble max_microseconds = 3e5;
guint start_events_pending;
- gdouble pr = rspamd_random_double_fast ();
g_assert (cache != NULL);
@@ -1578,8 +1577,9 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
return TRUE;
}
}
+
rspamd_symbols_cache_check_symbol (task, cache, item,
- checkpoint, &total_microseconds, pr);
+ checkpoint, &total_microseconds);
}
}
@@ -1634,7 +1634,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
}
rspamd_symbols_cache_check_symbol (task, cache, item,
- checkpoint, &total_microseconds, pr);
+ checkpoint, &total_microseconds);
}
if (total_microseconds > max_microseconds) {
@@ -1680,7 +1680,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
}
rspamd_symbols_cache_check_symbol (task, cache, item,
- checkpoint, &total_microseconds, pr);
+ checkpoint, &total_microseconds);
}
if (total_microseconds > max_microseconds) {
@@ -1731,7 +1731,7 @@ rspamd_symbols_cache_process_symbols (struct rspamd_task * task,
}
}
rspamd_symbols_cache_check_symbol (task, cache, item,
- checkpoint, &total_microseconds, pr);
+ checkpoint, &total_microseconds);
}
}
checkpoint->pass = RSPAMD_CACHE_PASS_WAIT_POSTFILTERS;
@@ -1828,67 +1828,81 @@ rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud)
{
struct timeval tv;
gdouble tm;
- struct symbols_cache *cache = ud;
+ struct rspamd_cache_refresh_cbdata *cbdata = ud;
+ struct symbols_cache *cache;
struct cache_item *item, *parent;
guint i;
+ gdouble cur_ticks;
+ cache = cbdata->cache;
/* Plan new event */
tm = rspamd_time_jitter (cache->reload_time, 0);
+ cur_ticks = rspamd_get_ticks ();
msg_debug_cache ("resort symbols cache, next reload in %.2f seconds", tm);
g_assert (cache != NULL);
- evtimer_set (&cache->resort_ev, rspamd_symbols_cache_resort_cb, cache);
+ evtimer_set (&cbdata->resort_ev, rspamd_symbols_cache_resort_cb, cbdata);
+ event_base_set (cbdata->ev_base, &cbdata->resort_ev);
double_to_tv (tm, &tv);
- event_add (&cache->resort_ev, &tv);
-
- rspamd_mempool_lock_mutex (cache->mtx);
-
- /* Gather stats from shared execution times */
- for (i = 0; i < cache->items_by_id->len; i ++) {
- item = g_ptr_array_index (cache->items_by_id, i);
-
- if (item->type & (SYMBOL_TYPE_CALLBACK|SYMBOL_TYPE_NORMAL)) {
- if (item->cd->number > 0) {
- item->st->avg_counter += item->cd->number + 1;
- item->st->avg_time = item->st->avg_time +
- (item->cd->mean - item->st->avg_time) /
- (gdouble)item->st->avg_counter;
- item->cd->mean = item->st->avg_time;
- item->cd->number = item->st->avg_counter;
+ event_add (&cbdata->resort_ev, &tv);
+
+ if (rspamd_worker_is_normal (cbdata->w)) {
+ rspamd_mempool_lock_mutex (cache->mtx);
+
+ /* Gather stats from shared execution times */
+ for (i = 0; i < cache->items_by_id->len; i ++) {
+ item = g_ptr_array_index (cache->items_by_id, i);
+
+ if (item->type & (SYMBOL_TYPE_CALLBACK|SYMBOL_TYPE_NORMAL)) {
+ if (item->cd->number > 0) {
+ item->st->avg_counter += item->cd->number + 1;
+ item->st->avg_time = item->st->avg_time +
+ (item->cd->mean - item->st->avg_time) /
+ (gdouble)item->st->avg_counter;
+ item->cd->mean = item->st->avg_time;
+ item->cd->number = item->st->avg_counter;
+ }
}
}
- }
- /* Sync virtual symbols */
- for (i = 0; i < cache->items_by_id->len; i ++) {
- item = g_ptr_array_index (cache->items_by_id, i);
+ /* Sync virtual symbols */
+ for (i = 0; i < cache->items_by_id->len; i ++) {
+ item = g_ptr_array_index (cache->items_by_id, i);
- if (item->parent != -1) {
- parent = g_ptr_array_index (cache->items_by_id, item->parent);
+ if (item->parent != -1) {
+ parent = g_ptr_array_index (cache->items_by_id, item->parent);
- if (parent) {
- item->st->avg_time = parent->st->avg_time;
- item->st->avg_counter = parent->st->avg_counter;
+ if (parent) {
+ item->st->avg_time = parent->st->avg_time;
+ item->st->avg_counter = parent->st->avg_counter;
+ }
}
}
- }
- rspamd_mempool_unlock_mutex (cache->mtx);
+ rspamd_mempool_unlock_mutex (cache->mtx);
+ }
+ cbdata->last_resort = cur_ticks;
rspamd_symbols_cache_resort (cache);
}
void
rspamd_symbols_cache_start_refresh (struct symbols_cache * cache,
- struct event_base *ev_base)
+ struct event_base *ev_base, struct rspamd_worker *w)
{
struct timeval tv;
gdouble tm;
+ struct rspamd_cache_refresh_cbdata *cbdata;
+ cbdata = rspamd_mempool_alloc0 (cache->static_pool, sizeof (*cbdata));
+ cbdata->last_resort = rspamd_get_ticks ();
+ cbdata->ev_base = ev_base;
+ cbdata->w = w;
+ cbdata->cache = cache;
tm = rspamd_time_jitter (cache->reload_time, 0);
g_assert (cache != NULL);
- evtimer_set (&cache->resort_ev, rspamd_symbols_cache_resort_cb, cache);
- event_base_set (ev_base, &cache->resort_ev);
+ evtimer_set (&cbdata->resort_ev, rspamd_symbols_cache_resort_cb, cbdata);
+ event_base_set (ev_base, &cbdata->resort_ev);
double_to_tv (tm, &tv);
- event_add (&cache->resort_ev, &tv);
+ event_add (&cbdata->resort_ev, &tv);
}
void
diff --git a/src/libserver/symbols_cache.h b/src/libserver/symbols_cache.h
index f6015c2ef..daecfaa24 100644
--- a/src/libserver/symbols_cache.h
+++ b/src/libserver/symbols_cache.h
@@ -24,6 +24,7 @@
struct rspamd_task;
struct rspamd_config;
struct symbols_cache;
+struct rspamd_worker;
typedef void (*symbol_func_t)(struct rspamd_task *task, gpointer user_data);
@@ -185,7 +186,7 @@ ucl_object_t *rspamd_symbols_cache_counters (struct symbols_cache * cache);
* @param ev_base
*/
void rspamd_symbols_cache_start_refresh (struct symbols_cache * cache,
- struct event_base *ev_base);
+ struct event_base *ev_base, struct rspamd_worker *w);
/**
* Increases counter for a specific symbol
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index ac87f2870..f9d777e31 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -676,3 +676,22 @@ rspamd_hard_terminate (struct rspamd_main *rspamd_main)
rspamd_log_close (rspamd_main->logger);
exit (EXIT_FAILURE);
}
+
+gboolean
+rspamd_worker_is_normal (struct rspamd_worker *w)
+{
+ static GQuark normal_quark = (GQuark)0;
+
+ if (w) {
+ if (normal_quark) {
+ return w->type == normal_quark;
+ }
+ else {
+ normal_quark = g_quark_from_static_string ("normal");
+ }
+
+ return w->type == normal_quark;
+ }
+
+ return FALSE;
+}
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 634ffe67b..305c7f455 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -130,6 +130,13 @@ void rspamd_worker_block_signals (void);
void rspamd_hard_terminate (struct rspamd_main *rspamd_main) G_GNUC_NORETURN;
/**
+ * Returns TRUE if a specific worker is normal worker
+ * @param w
+ * @return
+ */
+gboolean rspamd_worker_is_normal (struct rspamd_worker *w);
+
+/**
* Fork new worker with the specified configuration
*/
struct rspamd_worker *rspamd_fork_worker (struct rspamd_main *,
diff --git a/src/worker.c b/src/worker.c
index 94ef2c9d1..12a6acf2c 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -603,7 +603,8 @@ start_worker (struct rspamd_worker *worker)
ctx->cfg = worker->srv->cfg;
ctx->ev_base = rspamd_prepare_worker (worker, "normal", accept_socket, TRUE);
msec_to_tv (ctx->timeout, &ctx->io_tv);
- rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base);
+ rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base,
+ worker);
ctx->resolver = dns_resolver_init (worker->srv->logger,
ctx->ev_base,