From: Vsevolod Stakhov Date: Tue, 24 Dec 2019 16:26:12 +0000 (+0000) Subject: [Project] Add logic to break execution when processing symbols X-Git-Tag: 2.3~174 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=f827a09afb7785c97010e7116dc63b61efde6a25;p=rspamd.git [Project] Add logic to break execution when processing symbols --- diff --git a/src/controller.c b/src/controller.c index d36431702..adbc2b848 100644 --- a/src/controller.c +++ b/src/controller.c @@ -2126,6 +2126,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent, ev_timer_init (&task->timeout_ev, rspamd_task_timeout, ctx->task_timeout, ctx->task_timeout); ev_timer_start (task->event_loop, &task->timeout_ev); + ev_set_priority (&task->timeout_ev, EV_MAXPRI); } end: diff --git a/src/libserver/rspamd_symcache.c b/src/libserver/rspamd_symcache.c index f232a6a24..31bba1386 100644 --- a/src/libserver/rspamd_symcache.c +++ b/src/libserver/rspamd_symcache.c @@ -204,6 +204,7 @@ struct cache_savepoint { guint version; guint items_inflight; gboolean profile; + gboolean has_slow; gdouble profile_start; struct rspamd_scan_result *rs; @@ -1761,7 +1762,8 @@ rspamd_symcache_check_symbol (struct rspamd_task *task, msg_debug_cache_task ("execute %s, %d", item->symbol, item->id); if (checkpoint->profile) { - dyn_item->start_msec = (rspamd_get_virtual_ticks () - + ev_now_update_if_cheap (task->event_loop); + dyn_item->start_msec = (ev_now (task->event_loop) - checkpoint->profile_start) * 1e3; } @@ -1914,14 +1916,15 @@ rspamd_symcache_make_checkpoint (struct rspamd_task *task, rspamd_symcache_order_unref, checkpoint->order); /* Calculate profile probability */ + ev_now_update_if_cheap (task->event_loop); ev_tstamp now = ev_now (task->event_loop); + checkpoint->profile_start = now; if ((cache->last_profile == 0.0 || now > cache->last_profile + PROFILE_MAX_TIME) || (task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) || (rspamd_random_double_fast () >= (1 - PROFILE_PROBABILITY))) { msg_debug_cache_task ("enable profiling of symbols for task"); checkpoint->profile = TRUE; - checkpoint->profile_start = rspamd_get_virtual_ticks (); cache->last_profile = now; } @@ -2029,7 +2032,8 @@ rspamd_symcache_process_settings (struct rspamd_task *task, gboolean rspamd_symcache_process_symbols (struct rspamd_task *task, - struct rspamd_symcache *cache, gint stage) + struct rspamd_symcache *cache, + gint stage) { struct rspamd_symcache_item *item = NULL; struct rspamd_symcache_dynamic_item *dyn_item; @@ -2986,6 +2990,63 @@ rspamd_symcache_set_cur_item (struct rspamd_task *task, return ex; } +struct rspamd_symcache_delayed_cbdata { + struct rspamd_symcache_item *item; + struct rspamd_task *task; + struct ev_timer tm; +}; + +static void +rspamd_symcache_delayed_item_cb (EV_P_ ev_timer *w, int what) +{ + struct rspamd_symcache_delayed_cbdata *cbd = + (struct rspamd_symcache_delayed_cbdata *)w->data; + struct rspamd_symcache_item *item; + struct rspamd_task *task; + struct cache_dependency *rdep; + struct cache_savepoint *checkpoint; + struct rspamd_symcache_dynamic_item *dyn_item; + guint i; + + item = cbd->item; + task = cbd->task; + checkpoint = task->checkpoint; + checkpoint->has_slow = FALSE; + ev_timer_stop (EV_A_ w); + + /* Process all reverse dependencies */ + PTR_ARRAY_FOREACH (item->rdeps, i, rdep) { + if (rdep->item) { + dyn_item = rspamd_symcache_get_dynamic (checkpoint, rdep->item); + if (!CHECK_START_BIT (checkpoint, dyn_item)) { + msg_debug_cache_task ("check item %d(%s) rdep of %s ", + rdep->item->id, rdep->item->symbol, item->symbol); + + if (!rspamd_symcache_check_deps (task, task->cfg->cache, + rdep->item, + checkpoint, 0, FALSE)) { + msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s " + "unless deps are resolved", + rdep->item->id, rdep->item->symbol, item->symbol); + } + else { + rspamd_symcache_check_symbol (task, task->cfg->cache, + rdep->item, + checkpoint); + } + } + } + } +} + +static void +rspamd_delayed_timer_dtor (gpointer d) +{ + struct rspamd_symcache_delayed_cbdata *cbd = + (struct rspamd_symcache_delayed_cbdata *)d; + + ev_timer_stop (cbd->task->event_loop, &cbd->tm); +} /** * Finalize the current async element potentially calling its deps @@ -3027,11 +3088,14 @@ rspamd_symcache_finalize_item (struct rspamd_task *task, checkpoint->cur_item = NULL; if (checkpoint->profile) { - diff = ((rspamd_get_virtual_ticks () - checkpoint->profile_start) * 1e3 - + ev_now_update_if_cheap (task->event_loop); + diff = ((ev_now (task->event_loop) - checkpoint->profile_start) * 1e3 - dyn_item->start_msec); + if (diff > slow_diff_limit) { msg_info_task ("slow rule: %s(%d): %.2f ms", item->symbol, item->id, diff); + checkpoint->has_slow = TRUE; } if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) { @@ -3043,6 +3107,24 @@ rspamd_symcache_finalize_item (struct rspamd_task *task, } } + if (checkpoint->has_slow) { + struct rspamd_symcache_delayed_cbdata *cbd = rspamd_mempool_alloc (task->task_pool, + sizeof (*cbd)); + /* Add timer to allow something else to be executed */ + ev_timer *tm = &cbd->tm; + + ev_timer_init (tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0); + ev_set_priority (tm, EV_MINPRI); + rspamd_mempool_add_destructor (task->task_pool, + rspamd_delayed_timer_dtor, cbd); + cbd->task = task; + cbd->item = item; + tm->data = cbd; + ev_timer_start (task->event_loop, tm); + + return; + } + /* Process all reverse dependencies */ PTR_ARRAY_FOREACH (item->rdeps, i, rdep) { if (rdep->item) { @@ -3598,4 +3680,19 @@ rspamd_symcache_item_get_rdeps (struct rspamd_symcache_item *item) } return NULL; +} + +void +rspamd_symcache_enable_profile (struct rspamd_task *task) +{ + struct cache_savepoint *checkpoint = task->checkpoint; + + if (checkpoint && !checkpoint->profile) { + ev_now_update_if_cheap (task->event_loop); + ev_tstamp now = ev_now (task->event_loop); + checkpoint->profile_start = now; + + msg_debug_cache_task ("enable profiling of symbols for task"); + checkpoint->profile = TRUE; + } } \ No newline at end of file diff --git a/src/libserver/rspamd_symcache.h b/src/libserver/rspamd_symcache.h index 0eab043fd..c220b1ccc 100644 --- a/src/libserver/rspamd_symcache.h +++ b/src/libserver/rspamd_symcache.h @@ -569,6 +569,11 @@ const GPtrArray* rspamd_symcache_item_get_rdeps ( struct rspamd_symcache_item *item); +/** + * Enable profiling for task (e.g. when a slow rule has been found) + * @param task + */ +void rspamd_symcache_enable_profile (struct rspamd_task *task); #ifdef __cplusplus } #endif