diff options
-rw-r--r-- | src/fuzzy_storage.c | 2 | ||||
-rw-r--r-- | src/libserver/worker_util.c | 132 | ||||
-rw-r--r-- | src/libserver/worker_util.h | 7 | ||||
-rw-r--r-- | src/libutil/map.c | 6 | ||||
-rw-r--r-- | src/rspamd.c | 8 | ||||
-rw-r--r-- | src/rspamd.h | 11 | ||||
-rw-r--r-- | src/worker.c | 80 |
7 files changed, 128 insertions, 118 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index f7aec3e27..f8b18b78a 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -481,7 +481,7 @@ rspamd_fuzzy_updates_cb (gboolean success, } } - if (ctx->worker->wanna_die) { + if (ctx->worker->state != rspamd_worker_state_running) { /* Plan exit */ ev_break (ctx->event_loop, EVBREAK_ALL); } diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index ddf74136d..63342a72a 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -113,47 +113,102 @@ rspamd_worker_check_finished (EV_P_ ev_timer *w, int revents) } } +static gboolean +rspamd_worker_finalize (gpointer user_data) +{ + struct rspamd_task *task = user_data; + + if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) { + msg_info_task ("finishing actions has been processed, terminating"); + /* ev_break (task->event_loop, EVBREAK_ALL); */ + task->worker->state = rspamd_worker_wanna_die; + rspamd_session_destroy (task->s); + + return TRUE; + } + + return FALSE; +} + +gboolean +rspamd_worker_call_finish_handlers (struct rspamd_worker *worker) +{ + struct rspamd_task *task; + struct rspamd_config *cfg = worker->srv->cfg; + struct rspamd_abstract_worker_ctx *ctx; + struct rspamd_config_cfg_lua_script *sc; + + if (cfg->on_term_scripts) { + ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx; + /* Create a fake task object for async events */ + task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop); + task->resolver = ctx->resolver; + task->flags |= RSPAMD_TASK_FLAG_PROCESSING; + task->s = rspamd_session_create (task->task_pool, + rspamd_worker_finalize, + NULL, + (event_finalizer_t) rspamd_task_free, + task); + + DL_FOREACH (cfg->on_term_scripts, sc) { + lua_call_finish_script (sc, task); + } + + task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING; + + if (rspamd_session_pending (task->s)) { + return TRUE; + } + } + + return FALSE; +} + static void rspamd_worker_terminate_handlers (struct rspamd_worker *w) { - guint i; - gboolean (*cb)(struct rspamd_worker *); struct rspamd_abstract_worker_ctx *actx; - struct ev_loop *final_gift, *orig_loop; - static ev_timer margin_call; - static int nchecks = 0; - - if (w->finish_actions->len == 0) { - /* Nothing to do */ - return; - } actx = (struct rspamd_abstract_worker_ctx *)w->ctx; - /* - * Here are dragons: - * - we create a new loop - * - we set a new ev_loop for worker via injection over rspamd_abstract_worker_ctx - * - then we run finish actions - * - then we create a special timer to kill worker if it fails to finish - */ - final_gift = ev_loop_new (EVBACKEND_ALL); - orig_loop = actx->event_loop; - actx->event_loop = final_gift; - margin_call.data = &nchecks; - ev_timer_init (&margin_call, rspamd_worker_check_finished, 0.1, - 0.1); - ev_timer_start (final_gift, &margin_call); - - for (i = 0; i < w->finish_actions->len; i ++) { - cb = g_ptr_array_index (w->finish_actions, i); - cb (w); + if (w->nconns == 0 && + (!(w->flags & RSPAMD_WORKER_SCANNER) || w->srv->cfg->on_term_scripts == NULL)) { + /* + * We are here either: + * - No active connections are represented + * - No term scripts are registered + * - Worker is not a scanner, so it can die safely + */ + ev_break (actx->event_loop, EVBREAK_ALL); + w->state = rspamd_worker_wanna_die; + + return; + } + else if (w->nconns > 0) { + /* + * Wait until all connections are terminated + */ + w->state = rspamd_worker_wait_connections; } + else { + /* + * Start finish scripts + */ + w->state = rspamd_worker_wait_final_scripts; + msg_info ("performing finishing actions"); - ev_run (final_gift, 0); - ev_loop_destroy (final_gift); - /* Restore original loop */ - actx->event_loop = orig_loop; + if ((w->flags & RSPAMD_WORKER_SCANNER) && + rspamd_worker_call_finish_handlers (w)) { + w->state = rspamd_worker_wait_final_scripts; + } + else { + /* + * We are done now + */ + ev_break (actx->event_loop, EVBREAK_ALL); + w->state = rspamd_worker_wanna_die; + } + } } static void @@ -172,7 +227,7 @@ static gboolean rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg) { /* Do not accept new connections, preparing to end worker's process */ - if (!sigh->worker->wanna_die) { + if (sigh->worker->state == rspamd_worker_state_running) { static ev_timer shutdown_ev; ev_tstamp shutdown_ts; @@ -180,8 +235,8 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg sigh->worker->srv->cfg->task_timeout * 2.0); rspamd_worker_ignore_signal (sigh); - sigh->worker->wanna_die = TRUE; - rspamd_worker_terminate_handlers (sigh->worker); + sigh->worker->state = rspamd_worker_state_terminating; + rspamd_default_log_function (G_LOG_LEVEL_INFO, sigh->worker->srv->server_pool->tag.tagname, sigh->worker->srv->server_pool->tag.uid, @@ -216,10 +271,11 @@ rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg static gboolean rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg) { - if (!sigh->worker->wanna_die) { + if (sigh->worker->state == rspamd_worker_state_running) { static ev_timer shutdown_ev; rspamd_worker_ignore_signal (sigh); + sigh->worker->state = rspamd_worker_state_terminating; rspamd_default_log_function (G_LOG_LEVEL_INFO, sigh->worker->srv->server_pool->tag.tagname, sigh->worker->srv->server_pool->tag.uid, @@ -228,7 +284,6 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg g_strsignal (sigh->signo)); rspamd_worker_terminate_handlers (sigh->worker); - sigh->worker->wanna_die = 1; ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown, 0.0, 0.0); ev_timer_start (sigh->event_loop, &shutdown_ev); @@ -862,7 +917,6 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, REF_RETAIN (cf); wrk->index = index; wrk->ctx = cf->ctx; - wrk->finish_actions = g_ptr_array_new (); wrk->ppid = getpid (); wrk->pid = fork (); wrk->cores_throttled = rspamd_main->cores_throttling; @@ -1377,7 +1431,7 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main, { gboolean need_refork = TRUE; - if (wrk->wanna_die || rspamd_main->wanna_die) { + if (wrk->state != rspamd_worker_state_running || rspamd_main->wanna_die) { /* Do not refork workers that are intended to be terminated */ need_refork = FALSE; } diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h index 6fbda0b4a..15d79df2f 100644 --- a/src/libserver/worker_util.h +++ b/src/libserver/worker_util.h @@ -237,6 +237,13 @@ void rspamd_worker_throttle_accept_events (gint sock, void *data); gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main, struct rspamd_worker *wrk, int status); +/** + * Call for final scripts for a worker + * @param worker + * @return + */ +gboolean rspamd_worker_call_finish_handlers (struct rspamd_worker *worker); + #ifdef WITH_HYPERSCAN struct rspamd_control_command; diff --git a/src/libutil/map.c b/src/libutil/map.c index 441b408ba..8da3c7bd4 100644 --- a/src/libutil/map.c +++ b/src/libutil/map.c @@ -970,7 +970,7 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic) g_atomic_int_set (periodic->map->locked, 0); msg_debug_map ("unlocked map %s", periodic->map->name); - if (!periodic->map->wrk->wanna_die) { + if (periodic->map->wrk->state == rspamd_worker_state_running) { rspamd_map_schedule_periodic (periodic->map, RSPAMD_SYMBOL_RESULT_NORMAL); } @@ -1001,7 +1001,7 @@ rspamd_map_schedule_periodic (struct rspamd_map *map, int how) gdouble timeout; struct map_periodic_cbdata *cbd; - if (map->scheduled_check || (map->wrk && map->wrk->wanna_die)) { + if (map->scheduled_check || (map->wrk && map->wrk->state == rspamd_worker_state_running)) { /* Do not schedule check if some check is already scheduled */ return; } @@ -1897,7 +1897,7 @@ rspamd_map_process_periodic (struct map_periodic_cbdata *cbd) return; } - if (!(cbd->map->wrk && cbd->map->wrk->wanna_die)) { + if (!(cbd->map->wrk && cbd->map->wrk->state == rspamd_worker_state_running)) { bk = g_ptr_array_index (cbd->map->backends, cbd->cur_backend); g_assert (bk != NULL); diff --git a/src/rspamd.c b/src/rspamd.c index 495da45d9..4099e5003 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -691,8 +691,8 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused) rspamd_main = w->srv; - if (!w->wanna_die) { - w->wanna_die = TRUE; + if (w->state == rspamd_worker_state_running) { + w->state = rspamd_worker_state_terminating; kill (w->pid, SIGUSR2); ev_io_stop (rspamd_main->event_loop, &w->srv_ev); msg_info_main ("send signal to worker %P", w->pid); @@ -1095,10 +1095,6 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main, rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid); } - if (wrk->finish_actions) { - g_ptr_array_free (wrk->finish_actions, TRUE); - } - need_refork = rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus); if (need_refork) { diff --git a/src/rspamd.h b/src/rspamd.h index d32681359..773be7c56 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -82,6 +82,14 @@ struct rspamd_worker_heartbeat { gint64 nbeats; /**< positive for beats received, negative for beats missed */ }; +enum rspamd_worker_state { + rspamd_worker_state_running = 0, + rspamd_worker_state_terminating, + rspamd_worker_wait_connections, + rspamd_worker_wait_final_scripts, + rspamd_worker_wanna_die +}; + /** * Worker process structure */ @@ -90,7 +98,7 @@ struct rspamd_worker { pid_t ppid; /**< pid of parent */ guint index; /**< index number */ guint nconns; /**< current connections count */ - gboolean wanna_die; /**< worker is terminating */ + enum rspamd_worker_state state; /**< current worker state */ gboolean cores_throttled; /**< set to true if cores throttling took place */ gdouble start_time; /**< start time */ struct rspamd_main *srv; /**< pointer to server structure */ @@ -108,7 +116,6 @@ struct rspamd_worker { struct rspamd_worker_heartbeat hb; /**< heartbeat data */ gpointer control_data; /**< used by control protocol to handle commands */ gpointer tmp_data; /**< used to avoid race condition to deal with control messages */ - GPtrArray *finish_actions; /**< called when worker is terminated */ ev_child cld_ev; /**< to allow reaping */ rspamd_worker_term_cb term_handler; /**< custom term handler */ }; diff --git a/src/worker.c b/src/worker.c index 04447feea..81ec0904a 100644 --- a/src/worker.c +++ b/src/worker.c @@ -69,56 +69,6 @@ worker_t normal_worker = { G_STRFUNC, \ __VA_ARGS__) -static gboolean -rspamd_worker_finalize (gpointer user_data) -{ - struct rspamd_task *task = user_data; - - if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) { - msg_info_task ("finishing actions has been processed, terminating"); - /* ev_break (task->event_loop, EVBREAK_ALL); */ - rspamd_session_destroy (task->s); - - return TRUE; - } - - return FALSE; -} - -static gboolean -rspamd_worker_call_finish_handlers (struct rspamd_worker *worker) -{ - struct rspamd_task *task; - struct rspamd_config *cfg = worker->srv->cfg; - struct rspamd_abstract_worker_ctx *ctx; - struct rspamd_config_cfg_lua_script *sc; - - if (cfg->on_term_scripts) { - ctx = worker->ctx; - /* Create a fake task object for async events */ - task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop); - task->resolver = ctx->resolver; - task->flags |= RSPAMD_TASK_FLAG_PROCESSING; - task->s = rspamd_session_create (task->task_pool, - rspamd_worker_finalize, - NULL, - (event_finalizer_t) rspamd_task_free, - task); - - DL_FOREACH (cfg->on_term_scripts, sc) { - lua_call_finish_script (sc, task); - } - - task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING; - - if (rspamd_session_pending (task->s)) { - return TRUE; - } - } - - return FALSE; -} - /* * Reduce number of tasks proceeded */ @@ -129,9 +79,20 @@ reduce_tasks_count (gpointer arg) worker->nconns --; - if (worker->wanna_die && worker->nconns == 0) { + if (worker->state == rspamd_worker_wait_connections && worker->nconns == 0) { + + worker->state = rspamd_worker_wait_final_scripts; msg_info ("performing finishing actions"); - rspamd_worker_call_finish_handlers (worker); + + if (rspamd_worker_call_finish_handlers (worker)) { + worker->state = rspamd_worker_wait_final_scripts; + } + else { + worker->state = rspamd_worker_wanna_die; + } + } + else if (worker->state != rspamd_worker_state_running) { + worker->state = rspamd_worker_wait_connections; } } @@ -596,19 +557,6 @@ init_worker (struct rspamd_config *cfg) return ctx; } -static gboolean -rspamd_worker_on_terminate (struct rspamd_worker *worker) -{ - if (worker->nconns == 0) { - msg_info ("performing finishing actions"); - if (rspamd_worker_call_finish_handlers (worker)) { - return TRUE; - } - } - - return FALSE; -} - void rspamd_worker_init_scanner (struct rspamd_worker *worker, struct ev_loop *ev_base, @@ -616,8 +564,6 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker, struct rspamd_lang_detector **plang_det) { rspamd_stat_init (worker->srv->cfg, ev_base); - g_ptr_array_add (worker->finish_actions, - (gpointer) rspamd_worker_on_terminate); #ifdef WITH_HYPERSCAN rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_HYPERSCAN_LOADED, |