Browse Source

Implement cache resorting.

tags/1.0.0
Vsevolod Stakhov 9 years ago
parent
commit
4e0dec94f6
4 changed files with 97 additions and 12 deletions
  1. 1
    2
      src/controller.c
  2. 87
    9
      src/libserver/symbols_cache.c
  3. 8
    0
      src/libserver/symbols_cache.h
  4. 1
    1
      src/worker.c

+ 1
- 2
src/controller.c View File

@@ -1594,8 +1594,6 @@ rspamd_controller_handle_counters (
{
struct rspamd_controller_session *session = conn_ent->ud;
ucl_object_t *top;
GList *cur;
struct cache_item *item;
struct symbols_cache *cache;

if (!rspamd_controller_check_password (conn_ent, session, msg, FALSE)) {
@@ -1932,6 +1930,7 @@ start_controller_worker (struct rspamd_worker *worker)
rspamd_upstreams_library_config (worker->srv->cfg);
/* Maps events */
rspamd_map_watch (worker->srv->cfg, ctx->ev_base);
rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base);

event_base_loop (ctx->ev_base, 0);


+ 87
- 9
src/libserver/symbols_cache.c View File

@@ -28,16 +28,13 @@
#include "message.h"
#include "symbols_cache.h"
#include "cfg_file.h"
#include "blake2.h"

/* After which number of messages try to resort cache */
#define MAX_USES 100
static const guchar rspamd_symbols_cache_magic[8] = {'r', 's', 'c', 1, 0, 0, 0, 0 };

struct rspamd_symbols_cache_header {
guchar magic[8];
guint nitems;
guchar checksum[BLAKE2B_OUTBYTES];
guchar checksum[64];
guchar unused[128];
};

@@ -50,6 +47,9 @@ struct symbols_cache {
guint used_items;
guint uses;
struct rspamd_config *cfg;
rspamd_mempool_mutex_t *mtx;
gdouble reload_time;
struct event resort_ev;
};

struct counter_data {
@@ -62,6 +62,7 @@ struct cache_item {
gdouble avg_time;
gdouble weight;
guint32 frequency;
guint32 avg_counter;

/* Per process counter */
struct counter_data *cd;
@@ -80,6 +81,8 @@ struct cache_item {
gdouble metric_weight;
};

/* XXX: Maybe make it configurable */
#define CACHE_RELOAD_TIME 60.0
/* weight, frequency, time */
#define TIME_ALPHA (1.0 / 1000000.0)
#define WEIGHT_ALPHA (0.001)
@@ -232,22 +235,25 @@ rspamd_symbols_cache_load_items (struct symbols_cache *cache, const gchar *name)
elt = ucl_object_find_key (cur, "weight");

if (elt) {
w = ucl_object_todouble (cur);
w = ucl_object_todouble (elt);
if (w != 0) {
item->weight = w;
}
}

elt = ucl_object_find_key (cur, "time");
if (elt) {
item->avg_time = ucl_object_todouble (elt);
}

elt = ucl_object_find_key (cur, "count");
if (elt) {
item->avg_time = ucl_object_todouble (cur);
item->avg_counter = ucl_object_toint (elt);
}

elt = ucl_object_find_key (cur, "frequency");

if (elt) {
item->frequency = ucl_object_toint (cur);
item->frequency = ucl_object_toint (elt);
}
}
}
@@ -265,6 +271,7 @@ rspamd_symbols_cache_load_items (struct symbols_cache *cache, const gchar *name)
* parent item avg_time
*/
parent->avg_time = item->avg_time;
parent->avg_counter = item->avg_counter;
}

ucl_object_iterate_free (it);
@@ -315,6 +322,8 @@ rspamd_symbols_cache_save_items (struct symbols_cache *cache, const gchar *name)
"weight", 0, false);
ucl_object_insert_key (elt, ucl_object_fromdouble (item->avg_time),
"time", 0, false);
ucl_object_insert_key (elt, ucl_object_fromdouble (item->avg_counter),
"count", 0, false);
ucl_object_insert_key (elt, ucl_object_fromint (item->frequency),
"frequency", 0, false);

@@ -492,6 +501,8 @@ rspamd_symbols_cache_new (void)
cache->items_by_symbol = g_hash_table_new (rspamd_str_hash,
rspamd_str_equal);
cache->items_by_order = g_ptr_array_new ();
cache->mtx = rspamd_mempool_get_mutex (cache->static_pool);
cache->reload_time = CACHE_RELOAD_TIME;

return cache;
}
@@ -628,7 +639,10 @@ rspamd_symbols_cache_metric_validate_cb (gpointer k, gpointer v, gpointer ud)

if (item) {
item->metric_weight = weight;
item->weight = item->weight * weight;

if (abs (item->weight) < abs (weight) || weight < 0) {
item->weight = weight;
}
}
}

@@ -787,3 +801,67 @@ rspamd_symbols_cache_counters (struct symbols_cache * cache)

return top;
}

static void
rspamd_symbols_cache_resort_cb (gint fd, short what, gpointer ud)
{
struct timeval tv;
gdouble tm;
struct symbols_cache *cache = ud;
struct cache_item *item, *parent;
guint i;

/* Plan new event */
tm = rspamd_time_jitter (cache->reload_time, 0);
msg_info ("resort symbols cache, next reload in %.2f seconds", tm);
g_assert (cache != NULL);
evtimer_set (&cache->resort_ev, rspamd_symbols_cache_resort_cb, cache);
double_to_tv (tm, &tv);
event_add (&cache->resort_ev, &tv);

rspamd_mempool_lock_mutex (cache->mtx);

/* Gather stats from shared execution times */
for (i = 0; i < cache->items_by_order->len; i ++) {
item = g_ptr_array_index (cache->items_by_order, i);

if (item->type == SYMBOL_TYPE_CALLBACK ||
item->type == SYMBOL_TYPE_NORMAL) {
if (item->cd->number > 0) {
item->avg_counter += item->cd->number + 1;
item->avg_time = item->avg_time +
(item->cd->value - item->avg_time) /
item->avg_counter;
item->cd->value = item->avg_time;
item->cd->number = item->avg_counter;
}
}
}
/* Sync virtual symbols */
for (i = 0; i < cache->items_by_order->len; i ++) {
if (item->parent != -1) {
parent = g_ptr_array_index (cache->items_by_order, item->parent);
item->avg_time = parent->avg_time;
item->avg_counter = parent->avg_counter;
}
}

rspamd_mempool_unlock_mutex (cache->mtx);

post_cache_init (cache);
}

void
rspamd_symbols_cache_start_refresh (struct symbols_cache * cache,
struct event_base *ev_base)
{
struct timeval tv;
gdouble tm;

tm = rspamd_time_jitter (cache->reload_time, 0);
g_assert (cache != NULL);
evtimer_set (&cache->resort_ev, rspamd_symbols_cache_resort_cb, cache);
event_base_set (ev_base, &cache->resort_ev);
double_to_tv (tm, &tv);
event_add (&cache->resort_ev, &tv);
}

+ 8
- 0
src/libserver/symbols_cache.h View File

@@ -157,4 +157,12 @@ gboolean rspamd_symbols_cache_validate (struct symbols_cache *cache,
*/
ucl_object_t *rspamd_symbols_cache_counters (struct symbols_cache * cache);

/**
* Start cache reloading
* @param cache
* @param ev_base
*/
void rspamd_symbols_cache_start_refresh (struct symbols_cache * cache,
struct event_base *ev_base);

#endif

+ 1
- 1
src/worker.c View File

@@ -307,7 +307,7 @@ start_worker (struct rspamd_worker *worker)
msec_to_tv (ctx->timeout, &ctx->io_tv);

rspamd_map_watch (worker->srv->cfg, ctx->ev_base);
rspamd_symbols_cache_start_refresh (worker->srv->cfg->cache, ctx->ev_base);

ctx->resolver = dns_resolver_init (worker->srv->logger,
ctx->ev_base,

Loading…
Cancel
Save