guint version;
guint items_inflight;
gboolean profile;
+ gboolean has_slow;
gdouble profile_start;
struct rspamd_scan_result *rs;
msg_debug_cache_task ("execute %s, %d", item->symbol, item->id);
if (checkpoint->profile) {
- dyn_item->start_msec = (rspamd_get_virtual_ticks () -
+ ev_now_update_if_cheap (task->event_loop);
+ dyn_item->start_msec = (ev_now (task->event_loop) -
checkpoint->profile_start) * 1e3;
}
rspamd_symcache_order_unref, checkpoint->order);
/* Calculate profile probability */
+ ev_now_update_if_cheap (task->event_loop);
ev_tstamp now = ev_now (task->event_loop);
+ checkpoint->profile_start = now;
if ((cache->last_profile == 0.0 || now > cache->last_profile + PROFILE_MAX_TIME) ||
(task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) ||
(rspamd_random_double_fast () >= (1 - PROFILE_PROBABILITY))) {
msg_debug_cache_task ("enable profiling of symbols for task");
checkpoint->profile = TRUE;
- checkpoint->profile_start = rspamd_get_virtual_ticks ();
cache->last_profile = now;
}
gboolean
rspamd_symcache_process_symbols (struct rspamd_task *task,
- struct rspamd_symcache *cache, gint stage)
+ struct rspamd_symcache *cache,
+ gint stage)
{
struct rspamd_symcache_item *item = NULL;
struct rspamd_symcache_dynamic_item *dyn_item;
return ex;
}
+struct rspamd_symcache_delayed_cbdata {
+ struct rspamd_symcache_item *item;
+ struct rspamd_task *task;
+ struct ev_timer tm;
+};
+
+static void
+rspamd_symcache_delayed_item_cb (EV_P_ ev_timer *w, int what)
+{
+ struct rspamd_symcache_delayed_cbdata *cbd =
+ (struct rspamd_symcache_delayed_cbdata *)w->data;
+ struct rspamd_symcache_item *item;
+ struct rspamd_task *task;
+ struct cache_dependency *rdep;
+ struct cache_savepoint *checkpoint;
+ struct rspamd_symcache_dynamic_item *dyn_item;
+ guint i;
+
+ item = cbd->item;
+ task = cbd->task;
+ checkpoint = task->checkpoint;
+ checkpoint->has_slow = FALSE;
+ ev_timer_stop (EV_A_ w);
+
+ /* Process all reverse dependencies */
+ PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
+ if (rdep->item) {
+ dyn_item = rspamd_symcache_get_dynamic (checkpoint, rdep->item);
+ if (!CHECK_START_BIT (checkpoint, dyn_item)) {
+ msg_debug_cache_task ("check item %d(%s) rdep of %s ",
+ rdep->item->id, rdep->item->symbol, item->symbol);
+
+ if (!rspamd_symcache_check_deps (task, task->cfg->cache,
+ rdep->item,
+ checkpoint, 0, FALSE)) {
+ msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s "
+ "unless deps are resolved",
+ rdep->item->id, rdep->item->symbol, item->symbol);
+ }
+ else {
+ rspamd_symcache_check_symbol (task, task->cfg->cache,
+ rdep->item,
+ checkpoint);
+ }
+ }
+ }
+ }
+}
+
+static void
+rspamd_delayed_timer_dtor (gpointer d)
+{
+ struct rspamd_symcache_delayed_cbdata *cbd =
+ (struct rspamd_symcache_delayed_cbdata *)d;
+
+ ev_timer_stop (cbd->task->event_loop, &cbd->tm);
+}
/**
* Finalize the current async element potentially calling its deps
checkpoint->cur_item = NULL;
if (checkpoint->profile) {
- diff = ((rspamd_get_virtual_ticks () - checkpoint->profile_start) * 1e3 -
+ ev_now_update_if_cheap (task->event_loop);
+ diff = ((ev_now (task->event_loop) - checkpoint->profile_start) * 1e3 -
dyn_item->start_msec);
+
if (diff > slow_diff_limit) {
msg_info_task ("slow rule: %s(%d): %.2f ms", item->symbol, item->id,
diff);
+ checkpoint->has_slow = TRUE;
}
if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) {
}
}
+ if (checkpoint->has_slow) {
+ struct rspamd_symcache_delayed_cbdata *cbd = rspamd_mempool_alloc (task->task_pool,
+ sizeof (*cbd));
+ /* Add timer to allow something else to be executed */
+ ev_timer *tm = &cbd->tm;
+
+ ev_timer_init (tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0);
+ ev_set_priority (tm, EV_MINPRI);
+ rspamd_mempool_add_destructor (task->task_pool,
+ rspamd_delayed_timer_dtor, cbd);
+ cbd->task = task;
+ cbd->item = item;
+ tm->data = cbd;
+ ev_timer_start (task->event_loop, tm);
+
+ return;
+ }
+
/* Process all reverse dependencies */
PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
if (rdep->item) {
}
return NULL;
+}
+
+void
+rspamd_symcache_enable_profile (struct rspamd_task *task)
+{
+ struct cache_savepoint *checkpoint = task->checkpoint;
+
+ if (checkpoint && !checkpoint->profile) {
+ ev_now_update_if_cheap (task->event_loop);
+ ev_tstamp now = ev_now (task->event_loop);
+ checkpoint->profile_start = now;
+
+ msg_debug_cache_task ("enable profiling of symbols for task");
+ checkpoint->profile = TRUE;
+ }
}
\ No newline at end of file