aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/fuzzy_storage.c2
-rw-r--r--src/libserver/worker_util.c132
-rw-r--r--src/libserver/worker_util.h7
-rw-r--r--src/libutil/map.c6
-rw-r--r--src/rspamd.c8
-rw-r--r--src/rspamd.h11
-rw-r--r--src/worker.c80
7 files changed, 128 insertions, 118 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index f7aec3e27..f8b18b78a 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -481,7 +481,7 @@ rspamd_fuzzy_updates_cb (gboolean success,
}
}
- if (ctx->worker->wanna_die) {
+ if (ctx->worker->state != rspamd_worker_state_running) {
/* Plan exit */
ev_break (ctx->event_loop, EVBREAK_ALL);
}
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index ddf74136d..63342a72a 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -113,47 +113,102 @@ rspamd_worker_check_finished (EV_P_ ev_timer *w, int revents)
}
}
+static gboolean
+rspamd_worker_finalize (gpointer user_data)
+{
+ struct rspamd_task *task = user_data;
+
+ if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
+ msg_info_task ("finishing actions has been processed, terminating");
+ /* ev_break (task->event_loop, EVBREAK_ALL); */
+ task->worker->state = rspamd_worker_wanna_die;
+ rspamd_session_destroy (task->s);
+
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+gboolean
+rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
+{
+ struct rspamd_task *task;
+ struct rspamd_config *cfg = worker->srv->cfg;
+ struct rspamd_abstract_worker_ctx *ctx;
+ struct rspamd_config_cfg_lua_script *sc;
+
+ if (cfg->on_term_scripts) {
+ ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
+ /* Create a fake task object for async events */
+ task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
+ task->resolver = ctx->resolver;
+ task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
+ task->s = rspamd_session_create (task->task_pool,
+ rspamd_worker_finalize,
+ NULL,
+ (event_finalizer_t) rspamd_task_free,
+ task);
+
+ DL_FOREACH (cfg->on_term_scripts, sc) {
+ lua_call_finish_script (sc, task);
+ }
+
+ task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;
+
+ if (rspamd_session_pending (task->s)) {
+ return TRUE;
+ }
+ }
+
+ return FALSE;
+}
+
static void
rspamd_worker_terminate_handlers (struct rspamd_worker *w)
{
- guint i;
- gboolean (*cb)(struct rspamd_worker *);
struct rspamd_abstract_worker_ctx *actx;
- struct ev_loop *final_gift, *orig_loop;
- static ev_timer margin_call;
- static int nchecks = 0;
-
- if (w->finish_actions->len == 0) {
- /* Nothing to do */
- return;
- }
actx = (struct rspamd_abstract_worker_ctx *)w->ctx;
- /*
- * Here are dragons:
- * - we create a new loop
- * - we set a new ev_loop for worker via injection over rspamd_abstract_worker_ctx
- * - then we run finish actions
- * - then we create a special timer to kill worker if it fails to finish
- */
- final_gift = ev_loop_new (EVBACKEND_ALL);
- orig_loop = actx->event_loop;
- actx->event_loop = final_gift;
- margin_call.data = &nchecks;
- ev_timer_init (&margin_call, rspamd_worker_check_finished, 0.1,
- 0.1);
- ev_timer_start (final_gift, &margin_call);
-
- for (i = 0; i < w->finish_actions->len; i ++) {
- cb = g_ptr_array_index (w->finish_actions, i);
- cb (w);
+ if (w->nconns == 0 &&
+ (!(w->flags & RSPAMD_WORKER_SCANNER) || w->srv->cfg->on_term_scripts == NULL)) {
+ /*
+ * We are here either:
+ * - No active connections are represented
+ * - No term scripts are registered
+ * - Worker is not a scanner, so it can die safely
+ */
+ ev_break (actx->event_loop, EVBREAK_ALL);
+ w->state = rspamd_worker_wanna_die;
+
+ return;
+ }
+ else if (w->nconns > 0) {
+ /*
+ * Wait until all connections are terminated
+ */
+ w->state = rspamd_worker_wait_connections;
}
+ else {
+ /*
+ * Start finish scripts
+ */
+ w->state = rspamd_worker_wait_final_scripts;
+ msg_info ("performing finishing actions");
- ev_run (final_gift, 0);
- ev_loop_destroy (final_gift);
- /* Restore original loop */
- actx->event_loop = orig_loop;
+ if ((w->flags & RSPAMD_WORKER_SCANNER) &&
+ rspamd_worker_call_finish_handlers (w)) {
+ w->state = rspamd_worker_wait_final_scripts;
+ }
+ else {
+ /*
+ * We are done now
+ */
+ ev_break (actx->event_loop, EVBREAK_ALL);
+ w->state = rspamd_worker_wanna_die;
+ }
+ }
}
static void
@@ -172,7 +227,7 @@ static gboolean
rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
{
/* Do not accept new connections, preparing to end worker's process */
- if (!sigh->worker->wanna_die) {
+ if (sigh->worker->state == rspamd_worker_state_running) {
static ev_timer shutdown_ev;
ev_tstamp shutdown_ts;
@@ -180,8 +235,8 @@ rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg
sigh->worker->srv->cfg->task_timeout * 2.0);
rspamd_worker_ignore_signal (sigh);
- sigh->worker->wanna_die = TRUE;
- rspamd_worker_terminate_handlers (sigh->worker);
+ sigh->worker->state = rspamd_worker_state_terminating;
+
rspamd_default_log_function (G_LOG_LEVEL_INFO,
sigh->worker->srv->server_pool->tag.tagname,
sigh->worker->srv->server_pool->tag.uid,
@@ -216,10 +271,11 @@ rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg
static gboolean
rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
{
- if (!sigh->worker->wanna_die) {
+ if (sigh->worker->state == rspamd_worker_state_running) {
static ev_timer shutdown_ev;
rspamd_worker_ignore_signal (sigh);
+ sigh->worker->state = rspamd_worker_state_terminating;
rspamd_default_log_function (G_LOG_LEVEL_INFO,
sigh->worker->srv->server_pool->tag.tagname,
sigh->worker->srv->server_pool->tag.uid,
@@ -228,7 +284,6 @@ rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg
g_strsignal (sigh->signo));
rspamd_worker_terminate_handlers (sigh->worker);
- sigh->worker->wanna_die = 1;
ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
0.0, 0.0);
ev_timer_start (sigh->event_loop, &shutdown_ev);
@@ -862,7 +917,6 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
REF_RETAIN (cf);
wrk->index = index;
wrk->ctx = cf->ctx;
- wrk->finish_actions = g_ptr_array_new ();
wrk->ppid = getpid ();
wrk->pid = fork ();
wrk->cores_throttled = rspamd_main->cores_throttling;
@@ -1377,7 +1431,7 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
{
gboolean need_refork = TRUE;
- if (wrk->wanna_die || rspamd_main->wanna_die) {
+ if (wrk->state != rspamd_worker_state_running || rspamd_main->wanna_die) {
/* Do not refork workers that are intended to be terminated */
need_refork = FALSE;
}
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 6fbda0b4a..15d79df2f 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -237,6 +237,13 @@ void rspamd_worker_throttle_accept_events (gint sock, void *data);
gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
struct rspamd_worker *wrk, int status);
+/**
+ * Call for final scripts for a worker
+ * @param worker
+ * @return
+ */
+gboolean rspamd_worker_call_finish_handlers (struct rspamd_worker *worker);
+
#ifdef WITH_HYPERSCAN
struct rspamd_control_command;
diff --git a/src/libutil/map.c b/src/libutil/map.c
index 441b408ba..8da3c7bd4 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -970,7 +970,7 @@ rspamd_map_periodic_dtor (struct map_periodic_cbdata *periodic)
g_atomic_int_set (periodic->map->locked, 0);
msg_debug_map ("unlocked map %s", periodic->map->name);
- if (!periodic->map->wrk->wanna_die) {
+ if (periodic->map->wrk->state == rspamd_worker_state_running) {
rspamd_map_schedule_periodic (periodic->map,
RSPAMD_SYMBOL_RESULT_NORMAL);
}
@@ -1001,7 +1001,7 @@ rspamd_map_schedule_periodic (struct rspamd_map *map, int how)
gdouble timeout;
struct map_periodic_cbdata *cbd;
- if (map->scheduled_check || (map->wrk && map->wrk->wanna_die)) {
+ if (map->scheduled_check || (map->wrk && map->wrk->state == rspamd_worker_state_running)) {
/* Do not schedule check if some check is already scheduled */
return;
}
@@ -1897,7 +1897,7 @@ rspamd_map_process_periodic (struct map_periodic_cbdata *cbd)
return;
}
- if (!(cbd->map->wrk && cbd->map->wrk->wanna_die)) {
+ if (!(cbd->map->wrk && cbd->map->wrk->state == rspamd_worker_state_running)) {
bk = g_ptr_array_index (cbd->map->backends, cbd->cur_backend);
g_assert (bk != NULL);
diff --git a/src/rspamd.c b/src/rspamd.c
index 495da45d9..4099e5003 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -691,8 +691,8 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused)
rspamd_main = w->srv;
- if (!w->wanna_die) {
- w->wanna_die = TRUE;
+ if (w->state == rspamd_worker_state_running) {
+ w->state = rspamd_worker_state_terminating;
kill (w->pid, SIGUSR2);
ev_io_stop (rspamd_main->event_loop, &w->srv_ev);
msg_info_main ("send signal to worker %P", w->pid);
@@ -1095,10 +1095,6 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid);
}
- if (wrk->finish_actions) {
- g_ptr_array_free (wrk->finish_actions, TRUE);
- }
-
need_refork = rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
if (need_refork) {
diff --git a/src/rspamd.h b/src/rspamd.h
index d32681359..773be7c56 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -82,6 +82,14 @@ struct rspamd_worker_heartbeat {
gint64 nbeats; /**< positive for beats received, negative for beats missed */
};
+enum rspamd_worker_state {
+ rspamd_worker_state_running = 0,
+ rspamd_worker_state_terminating,
+ rspamd_worker_wait_connections,
+ rspamd_worker_wait_final_scripts,
+ rspamd_worker_wanna_die
+};
+
/**
* Worker process structure
*/
@@ -90,7 +98,7 @@ struct rspamd_worker {
pid_t ppid; /**< pid of parent */
guint index; /**< index number */
guint nconns; /**< current connections count */
- gboolean wanna_die; /**< worker is terminating */
+ enum rspamd_worker_state state; /**< current worker state */
gboolean cores_throttled; /**< set to true if cores throttling took place */
gdouble start_time; /**< start time */
struct rspamd_main *srv; /**< pointer to server structure */
@@ -108,7 +116,6 @@ struct rspamd_worker {
struct rspamd_worker_heartbeat hb; /**< heartbeat data */
gpointer control_data; /**< used by control protocol to handle commands */
gpointer tmp_data; /**< used to avoid race condition to deal with control messages */
- GPtrArray *finish_actions; /**< called when worker is terminated */
ev_child cld_ev; /**< to allow reaping */
rspamd_worker_term_cb term_handler; /**< custom term handler */
};
diff --git a/src/worker.c b/src/worker.c
index 04447feea..81ec0904a 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -69,56 +69,6 @@ worker_t normal_worker = {
G_STRFUNC, \
__VA_ARGS__)
-static gboolean
-rspamd_worker_finalize (gpointer user_data)
-{
- struct rspamd_task *task = user_data;
-
- if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
- msg_info_task ("finishing actions has been processed, terminating");
- /* ev_break (task->event_loop, EVBREAK_ALL); */
- rspamd_session_destroy (task->s);
-
- return TRUE;
- }
-
- return FALSE;
-}
-
-static gboolean
-rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
-{
- struct rspamd_task *task;
- struct rspamd_config *cfg = worker->srv->cfg;
- struct rspamd_abstract_worker_ctx *ctx;
- struct rspamd_config_cfg_lua_script *sc;
-
- if (cfg->on_term_scripts) {
- ctx = worker->ctx;
- /* Create a fake task object for async events */
- task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
- task->resolver = ctx->resolver;
- task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
- task->s = rspamd_session_create (task->task_pool,
- rspamd_worker_finalize,
- NULL,
- (event_finalizer_t) rspamd_task_free,
- task);
-
- DL_FOREACH (cfg->on_term_scripts, sc) {
- lua_call_finish_script (sc, task);
- }
-
- task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;
-
- if (rspamd_session_pending (task->s)) {
- return TRUE;
- }
- }
-
- return FALSE;
-}
-
/*
* Reduce number of tasks proceeded
*/
@@ -129,9 +79,20 @@ reduce_tasks_count (gpointer arg)
worker->nconns --;
- if (worker->wanna_die && worker->nconns == 0) {
+ if (worker->state == rspamd_worker_wait_connections && worker->nconns == 0) {
+
+ worker->state = rspamd_worker_wait_final_scripts;
msg_info ("performing finishing actions");
- rspamd_worker_call_finish_handlers (worker);
+
+ if (rspamd_worker_call_finish_handlers (worker)) {
+ worker->state = rspamd_worker_wait_final_scripts;
+ }
+ else {
+ worker->state = rspamd_worker_wanna_die;
+ }
+ }
+ else if (worker->state != rspamd_worker_state_running) {
+ worker->state = rspamd_worker_wait_connections;
}
}
@@ -596,19 +557,6 @@ init_worker (struct rspamd_config *cfg)
return ctx;
}
-static gboolean
-rspamd_worker_on_terminate (struct rspamd_worker *worker)
-{
- if (worker->nconns == 0) {
- msg_info ("performing finishing actions");
- if (rspamd_worker_call_finish_handlers (worker)) {
- return TRUE;
- }
- }
-
- return FALSE;
-}
-
void
rspamd_worker_init_scanner (struct rspamd_worker *worker,
struct ev_loop *ev_base,
@@ -616,8 +564,6 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker,
struct rspamd_lang_detector **plang_det)
{
rspamd_stat_init (worker->srv->cfg, ev_base);
- g_ptr_array_add (worker->finish_actions,
- (gpointer) rspamd_worker_on_terminate);
#ifdef WITH_HYPERSCAN
rspamd_control_worker_add_cmd_handler (worker,
RSPAMD_CONTROL_HYPERSCAN_LOADED,