}
}
+static gboolean
+rspamd_worker_finalize (gpointer user_data)
+{
+ struct rspamd_task *task = user_data;
+
+ if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
+ msg_info_task ("finishing actions has been processed, terminating");
+ /* ev_break (task->event_loop, EVBREAK_ALL); */
+ task->worker->state = rspamd_worker_wanna_die;
+ rspamd_session_destroy (task->s);
+
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+gboolean
+rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
+{
+ struct rspamd_task *task;
+ struct rspamd_config *cfg = worker->srv->cfg;
+ struct rspamd_abstract_worker_ctx *ctx;
+ struct rspamd_config_cfg_lua_script *sc;
+
+ if (cfg->on_term_scripts) {
+ ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
+ /* Create a fake task object for async events */
+ task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
+ task->resolver = ctx->resolver;
+ task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
+ task->s = rspamd_session_create (task->task_pool,
+ rspamd_worker_finalize,
+ NULL,
+ (event_finalizer_t) rspamd_task_free,
+ task);
+
+ DL_FOREACH (cfg->on_term_scripts, sc) {
+ lua_call_finish_script (sc, task);
+ }
+
+ task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;
+
+ if (rspamd_session_pending (task->s)) {
+ return TRUE;
+ }
+ }
+
+ return FALSE;
+}
+
static void
rspamd_worker_terminate_handlers (struct rspamd_worker *w)
{
- guint i;
- gboolean (*cb)(struct rspamd_worker *);
struct rspamd_abstract_worker_ctx *actx;
- struct ev_loop *final_gift, *orig_loop;
- static ev_timer margin_call;
- static int nchecks = 0;
-
- if (w->finish_actions->len == 0) {
- /* Nothing to do */
- return;
- }
actx = (struct rspamd_abstract_worker_ctx *)w->ctx;
- /*
- * Here are dragons:
- * - we create a new loop
- * - we set a new ev_loop for worker via injection over rspamd_abstract_worker_ctx
- * - then we run finish actions
- * - then we create a special timer to kill worker if it fails to finish
- */
- final_gift = ev_loop_new (EVBACKEND_ALL);
- orig_loop = actx->event_loop;
- actx->event_loop = final_gift;
- margin_call.data = &nchecks;
- ev_timer_init (&margin_call, rspamd_worker_check_finished, 0.1,
- 0.1);
- ev_timer_start (final_gift, &margin_call);
-
- for (i = 0; i < w->finish_actions->len; i ++) {
- cb = g_ptr_array_index (w->finish_actions, i);
- cb (w);
+ if (w->nconns == 0 &&
+ (!(w->flags & RSPAMD_WORKER_SCANNER) || w->srv->cfg->on_term_scripts == NULL)) {
+ /*
+ * We are here either:
+ * - No active connections are represented
+ * - No term scripts are registered
+ * - Worker is not a scanner, so it can die safely
+ */
+ ev_break (actx->event_loop, EVBREAK_ALL);
+ w->state = rspamd_worker_wanna_die;
+
+ return;
+ }
+ else if (w->nconns > 0) {
+ /*
+ * Wait until all connections are terminated
+ */
+ w->state = rspamd_worker_wait_connections;
}
+ else {
+ /*
+ * Start finish scripts
+ */
+ w->state = rspamd_worker_wait_final_scripts;
+ msg_info ("performing finishing actions");
- ev_run (final_gift, 0);
- ev_loop_destroy (final_gift);
- /* Restore original loop */
- actx->event_loop = orig_loop;
+ if ((w->flags & RSPAMD_WORKER_SCANNER) &&
+ rspamd_worker_call_finish_handlers (w)) {
+ w->state = rspamd_worker_wait_final_scripts;
+ }
+ else {
+ /*
+ * We are done now
+ */
+ ev_break (actx->event_loop, EVBREAK_ALL);
+ w->state = rspamd_worker_wanna_die;
+ }
+ }
}
static void
rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
{
/* Do not accept new connections, preparing to end worker's process */
- if (!sigh->worker->wanna_die) {
+ if (sigh->worker->state == rspamd_worker_state_running) {
static ev_timer shutdown_ev;
ev_tstamp shutdown_ts;
sigh->worker->srv->cfg->task_timeout * 2.0);
rspamd_worker_ignore_signal (sigh);
- sigh->worker->wanna_die = TRUE;
- rspamd_worker_terminate_handlers (sigh->worker);
+ sigh->worker->state = rspamd_worker_state_terminating;
+
rspamd_default_log_function (G_LOG_LEVEL_INFO,
sigh->worker->srv->server_pool->tag.tagname,
sigh->worker->srv->server_pool->tag.uid,
static gboolean
rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
{
- if (!sigh->worker->wanna_die) {
+ if (sigh->worker->state == rspamd_worker_state_running) {
static ev_timer shutdown_ev;
rspamd_worker_ignore_signal (sigh);
+ sigh->worker->state = rspamd_worker_state_terminating;
rspamd_default_log_function (G_LOG_LEVEL_INFO,
sigh->worker->srv->server_pool->tag.tagname,
sigh->worker->srv->server_pool->tag.uid,
g_strsignal (sigh->signo));
rspamd_worker_terminate_handlers (sigh->worker);
- sigh->worker->wanna_die = 1;
ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
0.0, 0.0);
ev_timer_start (sigh->event_loop, &shutdown_ev);
REF_RETAIN (cf);
wrk->index = index;
wrk->ctx = cf->ctx;
- wrk->finish_actions = g_ptr_array_new ();
wrk->ppid = getpid ();
wrk->pid = fork ();
wrk->cores_throttled = rspamd_main->cores_throttling;
{
gboolean need_refork = TRUE;
- if (wrk->wanna_die || rspamd_main->wanna_die) {
+ if (wrk->state != rspamd_worker_state_running || rspamd_main->wanna_die) {
/* Do not refork workers that are intended to be terminated */
need_refork = FALSE;
}
gint64 nbeats; /**< positive for beats received, negative for beats missed */
};
+enum rspamd_worker_state {
+ rspamd_worker_state_running = 0,
+ rspamd_worker_state_terminating,
+ rspamd_worker_wait_connections,
+ rspamd_worker_wait_final_scripts,
+ rspamd_worker_wanna_die
+};
+
/**
* Worker process structure
*/
pid_t ppid; /**< pid of parent */
guint index; /**< index number */
guint nconns; /**< current connections count */
- gboolean wanna_die; /**< worker is terminating */
+ enum rspamd_worker_state state; /**< current worker state */
gboolean cores_throttled; /**< set to true if cores throttling took place */
gdouble start_time; /**< start time */
struct rspamd_main *srv; /**< pointer to server structure */
struct rspamd_worker_heartbeat hb; /**< heartbeat data */
gpointer control_data; /**< used by control protocol to handle commands */
gpointer tmp_data; /**< used to avoid race condition to deal with control messages */
- GPtrArray *finish_actions; /**< called when worker is terminated */
ev_child cld_ev; /**< to allow reaping */
rspamd_worker_term_cb term_handler; /**< custom term handler */
};
G_STRFUNC, \
__VA_ARGS__)
-static gboolean
-rspamd_worker_finalize (gpointer user_data)
-{
- struct rspamd_task *task = user_data;
-
- if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
- msg_info_task ("finishing actions has been processed, terminating");
- /* ev_break (task->event_loop, EVBREAK_ALL); */
- rspamd_session_destroy (task->s);
-
- return TRUE;
- }
-
- return FALSE;
-}
-
-static gboolean
-rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
-{
- struct rspamd_task *task;
- struct rspamd_config *cfg = worker->srv->cfg;
- struct rspamd_abstract_worker_ctx *ctx;
- struct rspamd_config_cfg_lua_script *sc;
-
- if (cfg->on_term_scripts) {
- ctx = worker->ctx;
- /* Create a fake task object for async events */
- task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
- task->resolver = ctx->resolver;
- task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
- task->s = rspamd_session_create (task->task_pool,
- rspamd_worker_finalize,
- NULL,
- (event_finalizer_t) rspamd_task_free,
- task);
-
- DL_FOREACH (cfg->on_term_scripts, sc) {
- lua_call_finish_script (sc, task);
- }
-
- task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;
-
- if (rspamd_session_pending (task->s)) {
- return TRUE;
- }
- }
-
- return FALSE;
-}
-
/*
* Reduce number of tasks proceeded
*/
worker->nconns --;
- if (worker->wanna_die && worker->nconns == 0) {
+ if (worker->state == rspamd_worker_wait_connections && worker->nconns == 0) {
+
+ worker->state = rspamd_worker_wait_final_scripts;
msg_info ("performing finishing actions");
- rspamd_worker_call_finish_handlers (worker);
+
+ if (rspamd_worker_call_finish_handlers (worker)) {
+ worker->state = rspamd_worker_wait_final_scripts;
+ }
+ else {
+ worker->state = rspamd_worker_wanna_die;
+ }
+ }
+ else if (worker->state != rspamd_worker_state_running) {
+ worker->state = rspamd_worker_wait_connections;
}
}
return ctx;
}
-static gboolean
-rspamd_worker_on_terminate (struct rspamd_worker *worker)
-{
- if (worker->nconns == 0) {
- msg_info ("performing finishing actions");
- if (rspamd_worker_call_finish_handlers (worker)) {
- return TRUE;
- }
- }
-
- return FALSE;
-}
-
void
rspamd_worker_init_scanner (struct rspamd_worker *worker,
struct ev_loop *ev_base,
struct rspamd_lang_detector **plang_det)
{
rspamd_stat_init (worker->srv->cfg, ev_base);
- g_ptr_array_add (worker->finish_actions,
- (gpointer) rspamd_worker_on_terminate);
#ifdef WITH_HYPERSCAN
rspamd_control_worker_add_cmd_handler (worker,
RSPAMD_CONTROL_HYPERSCAN_LOADED,