}
}
+static void
+rspamd_worker_on_term (EV_P_ ev_child *w, int revents)
+{
+ struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
+
+ if (wrk->term_handler) {
+ wrk->term_handler (EV_A_ w, wrk->srv, wrk);
+ }
+ else {
+ rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
+ }
+}
+
struct rspamd_worker *
rspamd_fork_worker (struct rspamd_main *rspamd_main,
- struct rspamd_worker_conf *cf,
- guint index,
- struct ev_loop *ev_base)
+ struct rspamd_worker_conf *cf,
+ guint index,
+ struct ev_loop *ev_base,
+ rspamd_worker_term_cb term_handler)
{
struct rspamd_worker *wrk;
gint rc;
wrk->ppid = getpid ();
wrk->pid = fork ();
wrk->cores_throttled = rspamd_main->cores_throttling;
+ wrk->term_handler = term_handler;
switch (wrk->pid) {
case 0:
rspamd_socket_nonblocking (wrk->control_pipe[0]);
rspamd_socket_nonblocking (wrk->srv_pipe[0]);
rspamd_srv_start_watching (rspamd_main, wrk, ev_base);
+ wrk->cld_ev.data = wrk;
+ ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0);
+ ev_child_start (rspamd_main->event_loop, &wrk->cld_ev);
/* Insert worker into worker's table, pid is index */
g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
wrk->pid), wrk);
throttling, 0.0);
ev_timer_start (cur->event_loop, &cur->throttling_ev);
}
+}
+
+gboolean
+rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *wrk,
+ int res)
+{
+ gboolean need_refork = TRUE;
+
+ if (wrk->wanna_die) {
+ /* Do not refork workers that are intended to be terminated */
+ need_refork = FALSE;
+ }
+
+ if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
+ /* Normal worker termination, do not fork one more */
+ msg_info_main ("%s process %P terminated normally",
+ g_quark_to_string (wrk->type),
+ wrk->pid);
+ }
+ else {
+ if (WIFSIGNALED (res)) {
+#ifdef WCOREDUMP
+ if (WCOREDUMP (res)) {
+ msg_warn_main (
+ "%s process %P terminated abnormally by signal: %s"
+ " and created core file",
+ g_quark_to_string (wrk->type),
+ wrk->pid,
+ g_strsignal (WTERMSIG (res)));
+ }
+ else {
+#ifdef HAVE_SYS_RESOURCE_H
+ struct rlimit rlmt;
+ (void) getrlimit (RLIMIT_CORE, &rlmt);
+
+ msg_warn_main (
+ "%s process %P terminated abnormally by signal: %s"
+ " but NOT created core file (throttled=%s); "
+ "core file limits: %L current, %L max",
+ g_quark_to_string (wrk->type),
+ wrk->pid,
+ g_strsignal (WTERMSIG (res)),
+ wrk->cores_throttled ? "yes" : "no",
+ (gint64) rlmt.rlim_cur,
+ (gint64) rlmt.rlim_max);
+#else
+ msg_warn_main (
+ "%s process %P terminated abnormally by signal: %s"
+ " but NOT created core file (throttled=%s); ",
+ g_quark_to_string (wrk->type),
+ wrk->pid,
+ g_strsignal (WTERMSIG (res)),
+ wrk->cores_throttled ? "yes" : "no");
+#endif
+ }
+#else
+ msg_warn_main (
+ "%s process %P terminated abnormally by signal: %s",
+ g_quark_to_string (wrk->type),
+ wrk->pid,
+ g_strsignal (WTERMSIG (res)));
+#endif
+ if (WTERMSIG (res) == SIGUSR2) {
+ /*
+ * It is actually race condition when not started process
+ * has been requested to be reloaded.
+ *
+ * We shouldn't refork on this
+ */
+ need_refork = FALSE;
+ }
+ }
+ else {
+ msg_warn_main ("%s process %P terminated abnormally "
+ "with exit code %d",
+ g_quark_to_string (wrk->type),
+ wrk->pid,
+ WEXITSTATUS (res));
+ }
+ }
+
+ return need_refork;
}
\ No newline at end of file
#define TERMINATION_ATTEMPTS 50
static gboolean load_rspamd_config (struct rspamd_main *rspamd_main,
- struct rspamd_config *cfg,
- gboolean init_modules,
- enum rspamd_post_load_options opts,
- gboolean reload);
+ struct rspamd_config *cfg,
+ gboolean init_modules,
+ enum rspamd_post_load_options opts,
+ gboolean reload);
+static void rspamd_cld_handler (EV_P_ ev_child *w,
+ struct rspamd_main *rspamd_main,
+ struct rspamd_worker *wrk);
/* Control socket */
static gint control_fd;
+static ev_io control_ev;
+
+static gboolean valgrind_mode = FALSE;
/* Cmdline options */
static gboolean config_test = FALSE;
static gint term_attempts = 0;
-/* List of unrelated forked processes */
-static GArray *other_workers = NULL;
-
/* List of active listen sockets indexed by worker type */
static GHashTable *listen_sockets = NULL;
{
GError *error = NULL;
GOptionContext *context;
- guint i, cfg_num;
- pid_t r;
+ guint cfg_num;
context = g_option_context_new ("- run rspamd daemon");
#if defined(GIT_VERSION) && GIT_VERSION == 1
cfg->rspamd_user = rspamd_user;
cfg->rspamd_group = rspamd_group;
cfg_num = cfg_names != NULL ? g_strv_length (cfg_names) : 0;
+
if (cfg_num == 0) {
cfg->cfg_name = FIXED_CONFIG_FILE;
}
else {
cfg->cfg_name = cfg_names[0];
- }
-
- for (i = 1; i < cfg_num; i++) {
- r = fork ();
- if (r == 0) {
- /* Spawning new main process */
- cfg->cfg_name = cfg_names[i];
- (void)setsid ();
- }
- else if (r == -1) {
- fprintf (stderr,
- "fork failed while spawning process for %s configuration file: %s\n",
- cfg_names[i],
- strerror (errno));
- }
- else {
- /* Save pid to the list of other main processes, we need it to ignore SIGCHLD from them */
- g_array_append_val (other_workers, r);
- }
+ g_assert (cfg_num == 1);
}
cfg->pid_file = rspamd_pidfile;
ev_timer_stop (EV_A_ &waiting_worker->wait_ev);
rspamd_fork_worker (waiting_worker->rspamd_main, waiting_worker->cf,
waiting_worker->oldindex,
- waiting_worker->rspamd_main->event_loop);
+ waiting_worker->rspamd_main->event_loop,
+ rspamd_cld_handler);
REF_RELEASE (waiting_worker->cf);
g_free (waiting_worker);
}
}
static void
-spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *ev_base,
+spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *event_loop,
struct rspamd_worker_conf *cf)
{
gint i;
"cannot spawn more than 1 %s worker, so spawn one",
cf->worker->name);
}
- rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
+ rspamd_fork_worker (rspamd_main, cf, 0, event_loop, rspamd_cld_handler);
}
else if (cf->worker->flags & RSPAMD_WORKER_THREADED) {
- rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
+ rspamd_fork_worker (rspamd_main, cf, 0, event_loop, rspamd_cld_handler);
}
else {
for (i = 0; i < cf->count; i++) {
- rspamd_fork_worker (rspamd_main, cf, i, ev_base);
+ rspamd_fork_worker (rspamd_main, cf, i, event_loop,
+ rspamd_cld_handler);
}
}
}
return rspamd_worker_wait ((struct rspamd_worker *)value);
}
-static void
-rspamd_final_cld_handler (EV_P_ ev_signal *w, int revents)
-{
- struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
- g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback,
- NULL);
-
- if (g_hash_table_size (rspamd_main->workers) == 0) {
- ev_break (rspamd_main->event_loop, EVBREAK_ALL);
- }
-}
-
-
struct core_check_cbdata {
struct rspamd_config *cfg;
gsize total_count;
ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
}
+static void
+rspamd_final_timer_handler (EV_P_ ev_timer *w, int revents)
+{
+ struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+
+ term_attempts--;
+
+ g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
+
+ if (g_hash_table_size (rspamd_main->workers) == 0) {
+ ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+ }
+}
+
/* Signal handlers */
static void
rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents)
{
struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+ static ev_timer ev_finale;
- msg_info_main ("catch termination signal, waiting for children");
- rspamd_log_nolock (rspamd_main->logger);
- /* Stop srv events to avoid false notifications */
- g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main);
- rspamd_pass_signal (rspamd_main->workers, w->signum);
+ if (!rspamd_main->wanna_die) {
+ rspamd_main->wanna_die = TRUE;
+ msg_info_main ("catch termination signal, waiting for children");
+ rspamd_log_nolock (rspamd_main->logger);
+ /* Stop srv events to avoid false notifications */
+ g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main);
+ rspamd_pass_signal (rspamd_main->workers, w->signum);
+
+ if (control_fd != -1) {
+ ev_io_stop (rspamd_main->event_loop, &control_ev);
+ close (control_fd);
+ }
- ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+ if (valgrind_mode) {
+ /* Special case if we are likely running with valgrind */
+ term_attempts = TERMINATION_ATTEMPTS * 10;
+ }
+ else {
+ term_attempts = TERMINATION_ATTEMPTS;
+ }
+
+ ev_finale.data = rspamd_main;
+ ev_timer_init (&ev_finale, rspamd_final_timer_handler, 0.2, 0.2);
+ ev_timer_start (rspamd_main->event_loop, &ev_finale);
+ }
}
static void
{
struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
- rspamd_log_reopen_priv (rspamd_main->logger,
- rspamd_main->workers_uid,
- rspamd_main->workers_gid);
- g_hash_table_foreach (rspamd_main->workers, reopen_log_handler,
- NULL);
+ if (!rspamd_main->wanna_die) {
+ rspamd_log_reopen_priv (rspamd_main->logger,
+ rspamd_main->workers_uid,
+ rspamd_main->workers_gid);
+ g_hash_table_foreach (rspamd_main->workers, reopen_log_handler,
+ NULL);
+ }
}
static void
{
struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
- msg_info_main ("rspamd "
- RVERSION
- " is restarting");
- g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
- rspamd_log_close_priv (rspamd_main->logger,
+ if (!rspamd_main->wanna_die) {
+ msg_info_main ("rspamd "
+ RVERSION
+ " is restarting");
+ g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
+ rspamd_log_close_priv (rspamd_main->logger,
FALSE,
rspamd_main->workers_uid,
rspamd_main->workers_gid);
- reread_config (rspamd_main);
- rspamd_check_core_limits (rspamd_main);
- spawn_workers (rspamd_main, rspamd_main->event_loop);
+ reread_config (rspamd_main);
+ rspamd_check_core_limits (rspamd_main);
+ spawn_workers (rspamd_main, rspamd_main->event_loop);
+ }
}
+/* Called when a dead child has been found */
+
static void
-rspamd_cld_handler (EV_P_ ev_signal *w, int revents)
+rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
+ struct rspamd_worker *wrk)
{
- struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
- guint i;
- gint res = 0;
- struct rspamd_worker *cur;
- pid_t pid;
- gboolean need_refork = TRUE, found_proc = FALSE;
+ gboolean need_refork;
/* Turn off locking for logger */
rspamd_log_nolock (rspamd_main->logger);
msg_info_main ("got SIGCHLD signal, finding terminated workers");
/* Remove dead child form children list */
- for (;;) {
- pid = waitpid (0, &res, WNOHANG);
-
- if (pid == -1) {
- if (errno != EINTR) {
- msg_warn_main ("got unexpected system error when waiting: %s",
- strerror (errno));
- break;
- }
- else {
- continue;
- }
- }
- else if (pid == 0) {
- /* No more processes to wait */
- break;
- }
-
- found_proc = TRUE;
-
- if ((cur =
- g_hash_table_lookup (rspamd_main->workers,
- GSIZE_TO_POINTER (pid))) != NULL) {
- /* Unlink dead process from queue and hash table */
-
- g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (
- pid));
-
- if (cur->wanna_die) {
- /* Do not refork workers that are intended to be terminated */
- need_refork = FALSE;
- }
-
- if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
- /* Normal worker termination, do not fork one more */
- msg_info_main ("%s process %P terminated normally",
- g_quark_to_string (cur->type),
- cur->pid);
- }
- else {
- if (WIFSIGNALED (res)) {
-#ifdef WCOREDUMP
- if (WCOREDUMP (res)) {
- msg_warn_main (
- "%s process %P terminated abnormally by signal: %s"
- " and created core file",
- g_quark_to_string (cur->type),
- cur->pid,
- g_strsignal (WTERMSIG (res)));
- }
- else {
-#ifdef HAVE_SYS_RESOURCE_H
- struct rlimit rlmt;
- (void)getrlimit (RLIMIT_CORE, &rlmt);
-
- msg_warn_main (
- "%s process %P terminated abnormally by signal: %s"
- " but NOT created core file (throttled=%s); "
- "core file limits: %L current, %L max",
- g_quark_to_string (cur->type),
- cur->pid,
- g_strsignal (WTERMSIG (res)),
- cur->cores_throttled ? "yes" : "no",
- (gint64)rlmt.rlim_cur,
- (gint64)rlmt.rlim_max);
-#else
- msg_warn_main (
- "%s process %P terminated abnormally by signal: %s"
- " but NOT created core file (throttled=%s); ",
- g_quark_to_string (cur->type),
- cur->pid,
- g_strsignal (WTERMSIG (res)),
- cur->cores_throttled ? "yes" : "no");
-#endif
- }
-#else
- msg_warn_main (
- "%s process %P terminated abnormally by signal: %s",
- g_quark_to_string (cur->type),
- cur->pid,
- g_strsignal (WTERMSIG (res)));
-#endif
- if (WTERMSIG (res) == SIGUSR2) {
- /*
- * It is actually race condition when not started process
- * has been requested to be reloaded.
- *
- * We shouldn't refork on this
- */
- need_refork = FALSE;
- }
- }
- else {
- msg_warn_main ("%s process %P terminated abnormally "
- "with exit code %d",
- g_quark_to_string (cur->type),
- cur->pid,
- WEXITSTATUS (res));
- }
-
- if (need_refork) {
- /* Fork another worker in replace of dead one */
- rspamd_check_core_limits (rspamd_main);
-
-
- rspamd_fork_delayed (cur->cf, cur->index, rspamd_main);
- }
- }
-
- if (cur->srv_pipe[0] != -1) {
- /* Ugly workaround */
- if (cur->tmp_data) {
- g_free (cur->tmp_data);
- }
- ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
- }
-
- if (cur->control_pipe[0] != -1) {
- /* We also need to clean descriptors left */
- close (cur->control_pipe[0]);
- close (cur->srv_pipe[0]);
- }
-
- REF_RELEASE (cur->cf);
-
- if (cur->finish_actions) {
- g_ptr_array_free (cur->finish_actions, TRUE);
- }
-
- g_free (cur);
- }
- else {
- for (i = 0; i < other_workers->len; i++) {
- if (g_array_index (other_workers, pid_t, i) == pid) {
- g_array_remove_index_fast (other_workers, i);
- msg_info_main ("related process %P terminated", pid);
- }
- }
+ g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (wrk->pid));
+ if (wrk->srv_pipe[0] != -1) {
+ /* Ugly workaround */
+ if (wrk->tmp_data) {
+ g_free (wrk->tmp_data);
}
+ ev_io_stop (rspamd_main->event_loop, &wrk->srv_ev);
}
- if (!found_proc) {
- msg_err_main ("got SIGCHLD but no workers were able to be waited: %s",
- strerror (errno));
+ if (wrk->control_pipe[0] != -1) {
+ /* We also need to clean descriptors left */
+ close (wrk->control_pipe[0]);
+ close (wrk->srv_pipe[0]);
}
- rspamd_log_lock (rspamd_main->logger);
-}
+ REF_RELEASE (wrk->cf);
-static void
-rspamd_final_term_handler (EV_P_ ev_timer *w, int revents)
-{
- struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+ if (wrk->finish_actions) {
+ g_ptr_array_free (wrk->finish_actions, TRUE);
+ }
- term_attempts--;
+ need_refork = rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
- g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
-
- if (g_hash_table_size (rspamd_main->workers) == 0) {
- ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+ if (need_refork) {
+ /* Fork another worker in replace of dead one */
+ rspamd_check_core_limits (rspamd_main);
+ rspamd_fork_delayed (wrk->cf, wrk->index, rspamd_main);
}
+
+ g_free (wrk);
+ rspamd_log_lock (rspamd_main->logger);
}
/* Control socket handler */
GQuark type;
rspamd_inet_addr_t *control_addr = NULL;
struct ev_loop *event_loop;
- static ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
- static ev_io control_ev;
+ static ev_signal term_ev, int_ev, hup_ev, usr1_ev;
struct rspamd_main *rspamd_main;
- gboolean skip_pid = FALSE, valgrind_mode = FALSE;
+ gboolean skip_pid = FALSE;
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
g_thread_init (NULL);
rspamd_main->cfg->libs_ctx = rspamd_init_libs ();
memset (&signals, 0, sizeof (struct sigaction));
- other_workers = g_array_new (FALSE, TRUE, sizeof (pid_t));
read_cmd_line (&argc, &argv, rspamd_main->cfg);
/* Set title */
setproctitle ("main process");
-
/* Flush log */
rspamd_log_flush (rspamd_main->logger);
usr1_ev.data = rspamd_main;
ev_signal_start (event_loop, &usr1_ev);
-
- /* XXX: deal with children */
- cld_ev.data = rspamd_main;
- ev_signal_init (&cld_ev, rspamd_cld_handler, SIGCHLD);
- ev_signal_start (event_loop, &cld_ev);
-
rspamd_check_core_limits (rspamd_main);
rspamd_mempool_lock_mutex (rspamd_main->start_mtx);
spawn_workers (rspamd_main, event_loop);
ev_io_start (event_loop, &control_ev);
}
- ev_loop (event_loop, 0);
- /* We need to block signals unless children are waited for */
- rspamd_worker_block_signals ();
- ev_signal_stop (event_loop, &term_ev);
- ev_signal_stop (event_loop, &int_ev);
- ev_signal_stop (event_loop, &hup_ev);
- ev_signal_stop (event_loop, &cld_ev);
- ev_signal_stop (event_loop, &usr1_ev);
-
- if (control_fd != -1) {
- ev_io_stop (event_loop, &control_ev);
- close (control_fd);
- }
-
- if (valgrind_mode) {
- /* Special case if we are likely running with valgrind */
- term_attempts = TERMINATION_ATTEMPTS * 10;
- }
- else {
- term_attempts = TERMINATION_ATTEMPTS;
- }
-
-
- /* Wait for workers termination */
- ev_signal_init (&cld_ev, rspamd_final_cld_handler, SIGCHLD);
- ev_signal_start (event_loop, &cld_ev);
-
- g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
-
- static ev_timer ev_finale;
-
- ev_finale.data = rspamd_main;
- ev_timer_init (&ev_finale, rspamd_final_term_handler, 0.2, 0.2);
- ev_timer_start (event_loop, &ev_finale);
-
ev_loop (event_loop, 0);
/* Maybe save roll history */