]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Add logic to break execution when processing symbols
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 24 Dec 2019 16:26:12 +0000 (16:26 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 24 Dec 2019 16:26:12 +0000 (16:26 +0000)
src/controller.c
src/libserver/rspamd_symcache.c
src/libserver/rspamd_symcache.h

index d36431702cd1ecb9745b6d2e4389839993918774..adbc2b8485cb1df62614eab6148c4be5a56d8ff3 100644 (file)
@@ -2126,6 +2126,7 @@ rspamd_controller_handle_scan (struct rspamd_http_connection_entry *conn_ent,
                ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
                                ctx->task_timeout, ctx->task_timeout);
                ev_timer_start (task->event_loop, &task->timeout_ev);
+               ev_set_priority (&task->timeout_ev, EV_MAXPRI);
        }
 
 end:
index f232a6a242be22c554496fab3484be81a540e07c..31bba13860cbabcde1cd1e864fd72e1488a84c3f 100644 (file)
@@ -204,6 +204,7 @@ struct cache_savepoint {
        guint version;
        guint items_inflight;
        gboolean profile;
+       gboolean has_slow;
        gdouble profile_start;
 
        struct rspamd_scan_result *rs;
@@ -1761,7 +1762,8 @@ rspamd_symcache_check_symbol (struct rspamd_task *task,
                msg_debug_cache_task ("execute %s, %d", item->symbol, item->id);
 
                if (checkpoint->profile) {
-                       dyn_item->start_msec = (rspamd_get_virtual_ticks () -
+                       ev_now_update_if_cheap (task->event_loop);
+                       dyn_item->start_msec = (ev_now (task->event_loop) -
                                        checkpoint->profile_start) * 1e3;
                }
 
@@ -1914,14 +1916,15 @@ rspamd_symcache_make_checkpoint (struct rspamd_task *task,
                        rspamd_symcache_order_unref, checkpoint->order);
 
        /* Calculate profile probability */
+       ev_now_update_if_cheap (task->event_loop);
        ev_tstamp now = ev_now (task->event_loop);
+       checkpoint->profile_start = now;
 
        if ((cache->last_profile == 0.0 || now > cache->last_profile + PROFILE_MAX_TIME) ||
                        (task->msg.len >= PROFILE_MESSAGE_SIZE_THRESHOLD) ||
                        (rspamd_random_double_fast () >= (1 - PROFILE_PROBABILITY))) {
                msg_debug_cache_task ("enable profiling of symbols for task");
                checkpoint->profile = TRUE;
-               checkpoint->profile_start = rspamd_get_virtual_ticks ();
                cache->last_profile = now;
        }
 
@@ -2029,7 +2032,8 @@ rspamd_symcache_process_settings (struct rspamd_task *task,
 
 gboolean
 rspamd_symcache_process_symbols (struct rspamd_task *task,
-                                                                struct rspamd_symcache *cache, gint stage)
+                                                                struct rspamd_symcache *cache,
+                                                                gint stage)
 {
        struct rspamd_symcache_item *item = NULL;
        struct rspamd_symcache_dynamic_item *dyn_item;
@@ -2986,6 +2990,63 @@ rspamd_symcache_set_cur_item (struct rspamd_task *task,
        return ex;
 }
 
+struct rspamd_symcache_delayed_cbdata {
+       struct rspamd_symcache_item *item;
+       struct rspamd_task *task;
+       struct ev_timer tm;
+};
+
+static void
+rspamd_symcache_delayed_item_cb (EV_P_ ev_timer *w, int what)
+{
+       struct rspamd_symcache_delayed_cbdata *cbd =
+                       (struct rspamd_symcache_delayed_cbdata *)w->data;
+       struct rspamd_symcache_item *item;
+       struct rspamd_task *task;
+       struct cache_dependency *rdep;
+       struct cache_savepoint *checkpoint;
+       struct rspamd_symcache_dynamic_item *dyn_item;
+       guint i;
+
+       item = cbd->item;
+       task = cbd->task;
+       checkpoint = task->checkpoint;
+       checkpoint->has_slow = FALSE;
+       ev_timer_stop (EV_A_ w);
+
+       /* Process all reverse dependencies */
+       PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
+               if (rdep->item) {
+                       dyn_item = rspamd_symcache_get_dynamic (checkpoint, rdep->item);
+                       if (!CHECK_START_BIT (checkpoint, dyn_item)) {
+                               msg_debug_cache_task ("check item %d(%s) rdep of %s ",
+                                               rdep->item->id, rdep->item->symbol, item->symbol);
+
+                               if (!rspamd_symcache_check_deps (task, task->cfg->cache,
+                                               rdep->item,
+                                               checkpoint, 0, FALSE)) {
+                                       msg_debug_cache_task ("blocked execution of %d(%s) rdep of %s "
+                                                                                 "unless deps are resolved",
+                                                       rdep->item->id, rdep->item->symbol, item->symbol);
+                               }
+                               else {
+                                       rspamd_symcache_check_symbol (task, task->cfg->cache,
+                                                       rdep->item,
+                                                       checkpoint);
+                               }
+                       }
+               }
+       }
+}
+
+static void
+rspamd_delayed_timer_dtor (gpointer d)
+{
+       struct rspamd_symcache_delayed_cbdata *cbd =
+                       (struct rspamd_symcache_delayed_cbdata *)d;
+
+       ev_timer_stop (cbd->task->event_loop, &cbd->tm);
+}
 
 /**
  * Finalize the current async element potentially calling its deps
@@ -3027,11 +3088,14 @@ rspamd_symcache_finalize_item (struct rspamd_task *task,
        checkpoint->cur_item = NULL;
 
        if (checkpoint->profile) {
-               diff = ((rspamd_get_virtual_ticks () - checkpoint->profile_start) * 1e3 -
+               ev_now_update_if_cheap (task->event_loop);
+               diff = ((ev_now (task->event_loop) - checkpoint->profile_start) * 1e3 -
                                dyn_item->start_msec);
+
                if (diff > slow_diff_limit) {
                        msg_info_task ("slow rule: %s(%d): %.2f ms", item->symbol, item->id,
                                        diff);
+                       checkpoint->has_slow = TRUE;
                }
 
                if (G_UNLIKELY (RSPAMD_TASK_IS_PROFILING (task))) {
@@ -3043,6 +3107,24 @@ rspamd_symcache_finalize_item (struct rspamd_task *task,
                }
        }
 
+       if (checkpoint->has_slow) {
+               struct rspamd_symcache_delayed_cbdata *cbd = rspamd_mempool_alloc (task->task_pool,
+                               sizeof (*cbd));
+               /* Add timer to allow something else to be executed */
+               ev_timer *tm = &cbd->tm;
+
+               ev_timer_init (tm, rspamd_symcache_delayed_item_cb, 0.1, 0.0);
+               ev_set_priority (tm, EV_MINPRI);
+               rspamd_mempool_add_destructor (task->task_pool,
+                               rspamd_delayed_timer_dtor, cbd);
+               cbd->task = task;
+               cbd->item = item;
+               tm->data = cbd;
+               ev_timer_start (task->event_loop, tm);
+
+               return;
+       }
+
        /* Process all reverse dependencies */
        PTR_ARRAY_FOREACH (item->rdeps, i, rdep) {
                if (rdep->item) {
@@ -3598,4 +3680,19 @@ rspamd_symcache_item_get_rdeps (struct rspamd_symcache_item *item)
        }
 
        return NULL;
+}
+
+void
+rspamd_symcache_enable_profile (struct rspamd_task *task)
+{
+       struct cache_savepoint *checkpoint = task->checkpoint;
+
+       if (checkpoint && !checkpoint->profile) {
+               ev_now_update_if_cheap (task->event_loop);
+               ev_tstamp now = ev_now (task->event_loop);
+               checkpoint->profile_start = now;
+
+               msg_debug_cache_task ("enable profiling of symbols for task");
+               checkpoint->profile = TRUE;
+       }
 }
\ No newline at end of file
index 0eab043fdac208fa347af112dc61ae6be65e0fac..c220b1cccff7b26d4a3328478ec289c15d1ce0e6 100644 (file)
@@ -569,6 +569,11 @@ const GPtrArray* rspamd_symcache_item_get_rdeps (
                struct rspamd_symcache_item *item);
 
 
+/**
+ * Enable profiling for task (e.g. when a slow rule has been found)
+ * @param task
+ */
+void rspamd_symcache_enable_profile (struct rspamd_task *task);
 #ifdef  __cplusplus
 }
 #endif