]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Rework final scripts logic
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 7 Nov 2019 14:31:08 +0000 (14:31 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 7 Nov 2019 14:31:08 +0000 (14:31 +0000)
src/fuzzy_storage.c
src/libserver/worker_util.c
src/libserver/worker_util.h
src/libutil/map.c
src/rspamd.c
src/rspamd.h
src/worker.c

index f7aec3e27a74366fb64a5fc03a7372aefac7dfc6..f8b18b78a37610c21ffafa74a6d21ac1cd55956f 100644 (file)
@@ -481,7 +481,7 @@ rspamd_fuzzy_updates_cb (gboolean success,
                }
        }
 
-       if (ctx->worker->wanna_die) {
+       if (ctx->worker->state != rspamd_worker_state_running) {
                /* Plan exit */
                ev_break (ctx->event_loop, EVBREAK_ALL);
        }
index ddf74136d0bb67e6f0def50d0bc06454e524d503..63342a72a1bb0731e7132816d62f1fe821233f8d 100644 (file)
@@ -113,47 +113,102 @@ rspamd_worker_check_finished (EV_P_ ev_timer *w, int revents)
        }
 }
 
+static gboolean
+rspamd_worker_finalize (gpointer user_data)
+{
+       struct rspamd_task *task = user_data;
+
+       if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
+               msg_info_task ("finishing actions has been processed, terminating");
+               /* ev_break (task->event_loop, EVBREAK_ALL); */
+               task->worker->state = rspamd_worker_wanna_die;
+               rspamd_session_destroy (task->s);
+
+               return TRUE;
+       }
+
+       return FALSE;
+}
+
+gboolean
+rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
+{
+       struct rspamd_task *task;
+       struct rspamd_config *cfg = worker->srv->cfg;
+       struct rspamd_abstract_worker_ctx *ctx;
+       struct rspamd_config_cfg_lua_script *sc;
+
+       if (cfg->on_term_scripts) {
+               ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
+               /* Create a fake task object for async events */
+               task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
+               task->resolver = ctx->resolver;
+               task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
+               task->s = rspamd_session_create (task->task_pool,
+                               rspamd_worker_finalize,
+                               NULL,
+                               (event_finalizer_t) rspamd_task_free,
+                               task);
+
+               DL_FOREACH (cfg->on_term_scripts, sc) {
+                       lua_call_finish_script (sc, task);
+               }
+
+               task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;
+
+               if (rspamd_session_pending (task->s)) {
+                       return TRUE;
+               }
+       }
+
+       return FALSE;
+}
+
 static void
 rspamd_worker_terminate_handlers (struct rspamd_worker *w)
 {
-       guint i;
-       gboolean (*cb)(struct rspamd_worker *);
        struct rspamd_abstract_worker_ctx *actx;
-       struct ev_loop *final_gift, *orig_loop;
-       static ev_timer margin_call;
-       static int nchecks = 0;
-
-       if (w->finish_actions->len == 0) {
-               /* Nothing to do */
-               return;
-       }
 
        actx = (struct rspamd_abstract_worker_ctx *)w->ctx;
 
-       /*
-        * Here are dragons:
-        * - we create a new loop
-        * - we set a new ev_loop for worker via injection over rspamd_abstract_worker_ctx
-        * - then we run finish actions
-        * - then we create a special timer to kill worker if it fails to finish
-        */
-       final_gift = ev_loop_new (EVBACKEND_ALL);
-       orig_loop = actx->event_loop;
-       actx->event_loop = final_gift;
-       margin_call.data = &nchecks;
-       ev_timer_init (&margin_call, rspamd_worker_check_finished, 0.1,
-                       0.1);
-       ev_timer_start (final_gift, &margin_call);
-
-       for (i = 0; i < w->finish_actions->len; i ++) {
-               cb = g_ptr_array_index (w->finish_actions, i);
-               cb (w);
+       if (w->nconns == 0 &&
+               (!(w->flags & RSPAMD_WORKER_SCANNER) || w->srv->cfg->on_term_scripts == NULL)) {
+               /*
+                * We are here either:
+                * - No active connections are represented
+                * - No term scripts are registered
+                * - Worker is not a scanner, so it can die safely
+                */
+               ev_break (actx->event_loop, EVBREAK_ALL);
+               w->state = rspamd_worker_wanna_die;
+
+               return;
+       }
+       else if (w->nconns > 0) {
+               /*
+                * Wait until all connections are terminated
+                */
+               w->state = rspamd_worker_wait_connections;
        }
+       else {
+               /*
+                * Start finish scripts
+                */
+               w->state = rspamd_worker_wait_final_scripts;
+               msg_info ("performing finishing actions");
 
-       ev_run (final_gift, 0);
-       ev_loop_destroy (final_gift);
-       /* Restore original loop */
-       actx->event_loop = orig_loop;
+               if ((w->flags & RSPAMD_WORKER_SCANNER) &&
+                       rspamd_worker_call_finish_handlers (w)) {
+                       w->state = rspamd_worker_wait_final_scripts;
+               }
+               else {
+                       /*
+                        * We are done now
+                        */
+                       ev_break (actx->event_loop, EVBREAK_ALL);
+                       w->state = rspamd_worker_wanna_die;
+               }
+       }
 }
 
 static void
@@ -172,7 +227,7 @@ static gboolean
 rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
 {
        /* Do not accept new connections, preparing to end worker's process */
-       if (!sigh->worker->wanna_die) {
+       if (sigh->worker->state == rspamd_worker_state_running) {
                static ev_timer shutdown_ev;
                ev_tstamp shutdown_ts;
 
@@ -180,8 +235,8 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg
                                sigh->worker->srv->cfg->task_timeout * 2.0);
 
                rspamd_worker_ignore_signal (sigh);
-               sigh->worker->wanna_die = TRUE;
-               rspamd_worker_terminate_handlers (sigh->worker);
+               sigh->worker->state = rspamd_worker_state_terminating;
+
                rspamd_default_log_function (G_LOG_LEVEL_INFO,
                                sigh->worker->srv->server_pool->tag.tagname,
                                sigh->worker->srv->server_pool->tag.uid,
@@ -216,10 +271,11 @@ rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg
 static gboolean
 rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
 {
-       if (!sigh->worker->wanna_die) {
+       if (sigh->worker->state == rspamd_worker_state_running) {
                static ev_timer shutdown_ev;
 
                rspamd_worker_ignore_signal (sigh);
+               sigh->worker->state = rspamd_worker_state_terminating;
                rspamd_default_log_function (G_LOG_LEVEL_INFO,
                                sigh->worker->srv->server_pool->tag.tagname,
                                sigh->worker->srv->server_pool->tag.uid,
@@ -228,7 +284,6 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg
                                g_strsignal (sigh->signo));
 
                rspamd_worker_terminate_handlers (sigh->worker);
-               sigh->worker->wanna_die = 1;
                ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
                                0.0, 0.0);
                ev_timer_start (sigh->event_loop, &shutdown_ev);
@@ -862,7 +917,6 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
        REF_RETAIN (cf);
        wrk->index = index;
        wrk->ctx = cf->ctx;
-       wrk->finish_actions = g_ptr_array_new ();
        wrk->ppid = getpid ();
        wrk->pid = fork ();
        wrk->cores_throttled = rspamd_main->cores_throttling;
@@ -1377,7 +1431,7 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
 {
        gboolean need_refork = TRUE;
 
-       if (wrk->wanna_die || rspamd_main->wanna_die) {
+       if (wrk->state != rspamd_worker_state_running || rspamd_main->wanna_die) {
                /* Do not refork workers that are intended to be terminated */
                need_refork = FALSE;
        }
index 6fbda0b4ace835372c5a7642013a8396e4cc9021..15d79df2fa96e91e5421cb947f3857c7c83a751e 100644 (file)
@@ -237,6 +237,13 @@ void rspamd_worker_throttle_accept_events (gint sock, void *data);
 gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
                                                                                  struct rspamd_worker *wrk, int status);
 
+/**
+ * Call for final scripts for a worker
+ * @param worker
+ * @return
+ */
+gboolean rspamd_worker_call_finish_handlers (struct rspamd_worker *worker);
+
 #ifdef WITH_HYPERSCAN
 struct rspamd_control_command;
 
index 441b408ba9248efe748efb75e4356f817ee4ce28..8da3c7bd4cb80e404b8131c21a85d4aea6435403 100644 (file)
@@ -970,7 +970,7 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
                g_atomic_int_set (periodic->map->locked, 0);
                msg_debug_map ("unlocked map %s", periodic->map->name);
 
-               if (!periodic->map->wrk->wanna_die) {
+               if (periodic->map->wrk->state == rspamd_worker_state_running) {
                        rspamd_map_schedule_periodic (periodic->map,
                                        RSPAMD_SYMBOL_RESULT_NORMAL);
                }
@@ -1001,7 +1001,7 @@ rspamd_map_schedule_periodic (struct rspamd_map *map, int how)
        gdouble timeout;
        struct map_periodic_cbdata *cbd;
 
-       if (map->scheduled_check || (map->wrk && map->wrk->wanna_die)) {
+       if (map->scheduled_check || (map->wrk && map->wrk->state == rspamd_worker_state_running)) {
                /* Do not schedule check if some check is already scheduled */
                return;
        }
@@ -1897,7 +1897,7 @@ rspamd_map_process_periodic (struct map_periodic_cbdata *cbd)
                return;
        }
 
-       if (!(cbd->map->wrk && cbd->map->wrk->wanna_die)) {
+       if (!(cbd->map->wrk && cbd->map->wrk->state == rspamd_worker_state_running)) {
                bk = g_ptr_array_index (cbd->map->backends, cbd->cur_backend);
                g_assert (bk != NULL);
 
index 495da45d9b615f79ffcc345601f6fff1fc646d4b..4099e500347b09ac53ae6bb91709b10c63559bf8 100644 (file)
@@ -691,8 +691,8 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused)
 
        rspamd_main = w->srv;
 
-       if (!w->wanna_die) {
-               w->wanna_die = TRUE;
+       if (w->state == rspamd_worker_state_running) {
+               w->state = rspamd_worker_state_terminating;
                kill (w->pid, SIGUSR2);
                ev_io_stop (rspamd_main->event_loop, &w->srv_ev);
                msg_info_main ("send signal to worker %P", w->pid);
@@ -1095,10 +1095,6 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
                rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid);
        }
 
-       if (wrk->finish_actions) {
-               g_ptr_array_free (wrk->finish_actions, TRUE);
-       }
-
        need_refork = rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
 
        if (need_refork) {
index d32681359c611e734ce0739038cfac13a2fc6094..773be7c567c21da9b9e2667249e2ccd545743804 100644 (file)
@@ -82,6 +82,14 @@ struct rspamd_worker_heartbeat {
        gint64 nbeats;                  /**< positive for beats received, negative for beats missed */
 };
 
+enum rspamd_worker_state {
+       rspamd_worker_state_running = 0,
+       rspamd_worker_state_terminating,
+       rspamd_worker_wait_connections,
+       rspamd_worker_wait_final_scripts,
+       rspamd_worker_wanna_die
+};
+
 /**
  * Worker process structure
  */
@@ -90,7 +98,7 @@ struct rspamd_worker {
        pid_t ppid;                     /**< pid of parent                                                                      */
        guint index;                    /**< index number                                                                       */
        guint nconns;                   /**< current connections count                                          */
-       gboolean wanna_die;             /**< worker is terminating                                                      */
+       enum rspamd_worker_state state; /**< current worker state                                                       */
        gboolean cores_throttled;       /**< set to true if cores throttling took place         */
        gdouble start_time;             /**< start time                                                                         */
        struct rspamd_main *srv;        /**< pointer to server structure                                        */
@@ -108,7 +116,6 @@ struct rspamd_worker {
        struct rspamd_worker_heartbeat hb; /**< heartbeat data */
        gpointer control_data;          /**< used by control protocol to handle commands        */
        gpointer tmp_data;              /**< used to avoid race condition to deal with control messages */
-       GPtrArray *finish_actions;      /**< called when worker is terminated                           */
        ev_child cld_ev;                /**< to allow reaping                                                           */
        rspamd_worker_term_cb term_handler; /**< custom term handler                                            */
 };
index 04447feea7a3fe428984c9961591643bdaacc887..81ec0904ae8743cee80e2c73f2f4fb0b0979f590 100644 (file)
@@ -69,56 +69,6 @@ worker_t normal_worker = {
         G_STRFUNC, \
         __VA_ARGS__)
 
-static gboolean
-rspamd_worker_finalize (gpointer user_data)
-{
-       struct rspamd_task *task = user_data;
-
-       if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
-               msg_info_task ("finishing actions has been processed, terminating");
-               /* ev_break (task->event_loop, EVBREAK_ALL); */
-               rspamd_session_destroy (task->s);
-
-               return TRUE;
-       }
-
-       return FALSE;
-}
-
-static gboolean
-rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
-{
-       struct rspamd_task *task;
-       struct rspamd_config *cfg = worker->srv->cfg;
-       struct rspamd_abstract_worker_ctx *ctx;
-       struct rspamd_config_cfg_lua_script *sc;
-
-       if (cfg->on_term_scripts) {
-               ctx = worker->ctx;
-               /* Create a fake task object for async events */
-               task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
-               task->resolver = ctx->resolver;
-               task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
-               task->s = rspamd_session_create (task->task_pool,
-                               rspamd_worker_finalize,
-                               NULL,
-                               (event_finalizer_t) rspamd_task_free,
-                               task);
-
-               DL_FOREACH (cfg->on_term_scripts, sc) {
-                       lua_call_finish_script (sc, task);
-               }
-
-               task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;
-
-               if (rspamd_session_pending (task->s)) {
-                       return TRUE;
-               }
-       }
-
-       return FALSE;
-}
-
 /*
  * Reduce number of tasks proceeded
  */
@@ -129,9 +79,20 @@ reduce_tasks_count (gpointer arg)
 
        worker->nconns --;
 
-       if (worker->wanna_die && worker->nconns == 0) {
+       if (worker->state == rspamd_worker_wait_connections && worker->nconns == 0) {
+
+               worker->state = rspamd_worker_wait_final_scripts;
                msg_info ("performing finishing actions");
-               rspamd_worker_call_finish_handlers (worker);
+
+               if (rspamd_worker_call_finish_handlers (worker)) {
+                       worker->state = rspamd_worker_wait_final_scripts;
+               }
+               else {
+                       worker->state = rspamd_worker_wanna_die;
+               }
+       }
+       else if (worker->state != rspamd_worker_state_running) {
+               worker->state = rspamd_worker_wait_connections;
        }
 }
 
@@ -596,19 +557,6 @@ init_worker (struct rspamd_config *cfg)
        return ctx;
 }
 
-static gboolean
-rspamd_worker_on_terminate (struct rspamd_worker *worker)
-{
-       if (worker->nconns == 0) {
-               msg_info ("performing finishing actions");
-               if (rspamd_worker_call_finish_handlers (worker)) {
-                       return TRUE;
-               }
-       }
-
-       return FALSE;
-}
-
 void
 rspamd_worker_init_scanner (struct rspamd_worker *worker,
                struct ev_loop *ev_base,
@@ -616,8 +564,6 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker,
                struct rspamd_lang_detector **plang_det)
 {
        rspamd_stat_init (worker->srv->cfg, ev_base);
-       g_ptr_array_add (worker->finish_actions,
-                       (gpointer) rspamd_worker_on_terminate);
 #ifdef WITH_HYPERSCAN
        rspamd_control_worker_add_cmd_handler (worker,
                        RSPAMD_CONTROL_HYPERSCAN_LOADED,