From 390620fc357bfdb9e7f20835e3c61e857e3a5da2 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 20 Jun 2019 16:11:07 +0100 Subject: [PATCH] [Rework] Rework children operations --- contrib/libev/CMakeLists.txt | 6 +- contrib/libev/ev.c | 16 +- contrib/libev/ev.h | 17 -- src/libserver/worker_util.c | 107 +++++++++- src/libserver/worker_util.h | 15 +- src/rspamd.c | 375 ++++++++++------------------------- src/rspamd.h | 6 + 7 files changed, 246 insertions(+), 296 deletions(-) diff --git a/contrib/libev/CMakeLists.txt b/contrib/libev/CMakeLists.txt index d363c3dbc..f0350050d 100644 --- a/contrib/libev/CMakeLists.txt +++ b/contrib/libev/CMakeLists.txt @@ -55,7 +55,7 @@ ENDIF() CONFIGURE_FILE(config.h.in libev-config.h) -ADD_LIBRARY(rspamd-ev STATIC ${LIBEVSRC}) +ADD_LIBRARY(rspamd-ev SHARED ${LIBEVSRC}) ADD_DEFINITIONS("-DEV_CONFIG_H=\"${CMAKE_CURRENT_BINARY_DIR}/libev-config.h\"" -DEV_MULTIPLICITY=1 -DEV_USE_FLOOR=1 @@ -68,4 +68,6 @@ ENDIF() IF(ENABLE_FULL_DEBUG MATCHES "ON") ADD_DEFINITIONS(-DEV_VERIFY=3) -ENDIF() \ No newline at end of file +ENDIF() + +INSTALL(TARGETS rspamd-ev LIBRARY DESTINATION ${RSPAMD_LIBDIR}) \ No newline at end of file diff --git a/contrib/libev/ev.c b/contrib/libev/ev.c index 4f3f0b3ce..cb8127fc5 100644 --- a/contrib/libev/ev.c +++ b/contrib/libev/ev.c @@ -1840,9 +1840,7 @@ typedef struct #include "ev_wrap.h" static struct ev_loop default_loop_struct; - EV_API_DECL struct ev_loop *ev_default_loop_ptr; /* needs to be initialised to make it a definition despite extern */ - struct ev_loop *ev_default_loop_ptr = 0; - + static struct ev_loop *ev_default_loop_ptr = 0; #else EV_API_DECL ev_tstamp ev_rt_now = 0; /* needs to be initialised to make it a definition despite extern */ @@ -2855,6 +2853,18 @@ ev_set_loop_release_cb (EV_P_ void (*release)(EV_P) EV_NOEXCEPT, void (*acquire) } #endif +EV_INLINE struct ev_loop * +ev_default_loop_uc_ (void) EV_NOEXCEPT +{ + return ev_default_loop_ptr; +} + +EV_INLINE int +ev_is_default_loop (EV_P) EV_NOEXCEPT +{ + return EV_A == EV_DEFAULT_UC; +} + /* initialise a loop structure, must be zero-initialised */ noinline ecb_cold static void diff --git a/contrib/libev/ev.h b/contrib/libev/ev.h index f76c892e4..cb7b2e479 100644 --- a/contrib/libev/ev.h +++ b/contrib/libev/ev.h @@ -557,23 +557,6 @@ EV_API_DECL void ev_set_syserr_cb (void (*cb)(const char *msg) EV_NOEXCEPT) EV_N /* you can call this as often as you like */ EV_API_DECL struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0)) EV_NOEXCEPT; -#ifdef EV_API_STATIC -EV_API_DECL struct ev_loop *ev_default_loop_ptr; -#endif - -EV_INLINE struct ev_loop * -ev_default_loop_uc_ (void) EV_NOEXCEPT -{ - extern struct ev_loop *ev_default_loop_ptr; - - return ev_default_loop_ptr; -} - -EV_INLINE int -ev_is_default_loop (EV_P) EV_NOEXCEPT -{ - return EV_A == EV_DEFAULT_UC; -} /* create and destroy alternative loops that don't handle signals */ EV_API_DECL struct ev_loop *ev_loop_new (unsigned int flags EV_CPP (= 0)) EV_NOEXCEPT; diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index f7b4ee9ab..d849f542e 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -628,11 +628,25 @@ rspamd_worker_set_limits (struct rspamd_main *rspamd_main, } } +static void +rspamd_worker_on_term (EV_P_ ev_child *w, int revents) +{ + struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; + + if (wrk->term_handler) { + wrk->term_handler (EV_A_ w, wrk->srv, wrk); + } + else { + rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus); + } +} + struct rspamd_worker * rspamd_fork_worker (struct rspamd_main *rspamd_main, - struct rspamd_worker_conf *cf, - guint index, - struct ev_loop *ev_base) + struct rspamd_worker_conf *cf, + guint index, + struct ev_loop *ev_base, + rspamd_worker_term_cb term_handler) { struct rspamd_worker *wrk; gint rc; @@ -662,6 +676,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, wrk->ppid = getpid (); wrk->pid = fork (); wrk->cores_throttled = rspamd_main->cores_throttling; + wrk->term_handler = term_handler; switch (wrk->pid) { case 0: @@ -753,6 +768,9 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, rspamd_socket_nonblocking (wrk->control_pipe[0]); rspamd_socket_nonblocking (wrk->srv_pipe[0]); rspamd_srv_start_watching (rspamd_main, wrk, ev_base); + wrk->cld_ev.data = wrk; + ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0); + ev_child_start (rspamd_main->event_loop, &wrk->cld_ev); /* Insert worker into worker's table, pid is index */ g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER ( wrk->pid), wrk); @@ -1146,4 +1164,87 @@ rspamd_worker_throttle_accept_events (gint sock, void *data) throttling, 0.0); ev_timer_start (cur->event_loop, &cur->throttling_ev); } +} + +gboolean +rspamd_check_termination_clause (struct rspamd_main *rspamd_main, + struct rspamd_worker *wrk, + int res) +{ + gboolean need_refork = TRUE; + + if (wrk->wanna_die) { + /* Do not refork workers that are intended to be terminated */ + need_refork = FALSE; + } + + if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { + /* Normal worker termination, do not fork one more */ + msg_info_main ("%s process %P terminated normally", + g_quark_to_string (wrk->type), + wrk->pid); + } + else { + if (WIFSIGNALED (res)) { +#ifdef WCOREDUMP + if (WCOREDUMP (res)) { + msg_warn_main ( + "%s process %P terminated abnormally by signal: %s" + " and created core file", + g_quark_to_string (wrk->type), + wrk->pid, + g_strsignal (WTERMSIG (res))); + } + else { +#ifdef HAVE_SYS_RESOURCE_H + struct rlimit rlmt; + (void) getrlimit (RLIMIT_CORE, &rlmt); + + msg_warn_main ( + "%s process %P terminated abnormally by signal: %s" + " but NOT created core file (throttled=%s); " + "core file limits: %L current, %L max", + g_quark_to_string (wrk->type), + wrk->pid, + g_strsignal (WTERMSIG (res)), + wrk->cores_throttled ? "yes" : "no", + (gint64) rlmt.rlim_cur, + (gint64) rlmt.rlim_max); +#else + msg_warn_main ( + "%s process %P terminated abnormally by signal: %s" + " but NOT created core file (throttled=%s); ", + g_quark_to_string (wrk->type), + wrk->pid, + g_strsignal (WTERMSIG (res)), + wrk->cores_throttled ? "yes" : "no"); +#endif + } +#else + msg_warn_main ( + "%s process %P terminated abnormally by signal: %s", + g_quark_to_string (wrk->type), + wrk->pid, + g_strsignal (WTERMSIG (res))); +#endif + if (WTERMSIG (res) == SIGUSR2) { + /* + * It is actually race condition when not started process + * has been requested to be reloaded. + * + * We shouldn't refork on this + */ + need_refork = FALSE; + } + } + else { + msg_warn_main ("%s process %P terminated abnormally " + "with exit code %d", + g_quark_to_string (wrk->type), + wrk->pid, + WEXITSTATUS (res)); + } + } + + return need_refork; } \ No newline at end of file diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h index 9693aa6ad..4946badcf 100644 --- a/src/libserver/worker_util.h +++ b/src/libserver/worker_util.h @@ -186,7 +186,9 @@ void rspamd_worker_session_cache_remove (void *cache, void *ptr); * Fork new worker with the specified configuration */ struct rspamd_worker *rspamd_fork_worker (struct rspamd_main *, - struct rspamd_worker_conf *, guint idx, struct ev_loop *ev_base); + struct rspamd_worker_conf *, guint idx, + struct ev_loop *ev_base, + rspamd_worker_term_cb term_handler); /** * Sets crash signals handlers if compiled with libunwind @@ -210,6 +212,17 @@ void rspamd_worker_init_monitored (struct rspamd_worker *worker, */ void rspamd_worker_throttle_accept_events (gint sock, void *data); +/** + * Checks (and logs) the worker's termination status. Returns TRUE if a worker + * should be restarted. + * @param rspamd_main + * @param wrk + * @param status waitpid res + * @return TRUE if refork is desired + */ +gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main, + struct rspamd_worker *wrk, int status); + #define msg_err_main(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ rspamd_main->server_pool->tag.tagname, rspamd_main->server_pool->tag.uid, \ G_STRFUNC, \ diff --git a/src/rspamd.c b/src/rspamd.c index d07961757..a1badd635 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -70,13 +70,19 @@ #define TERMINATION_ATTEMPTS 50 static gboolean load_rspamd_config (struct rspamd_main *rspamd_main, - struct rspamd_config *cfg, - gboolean init_modules, - enum rspamd_post_load_options opts, - gboolean reload); + struct rspamd_config *cfg, + gboolean init_modules, + enum rspamd_post_load_options opts, + gboolean reload); +static void rspamd_cld_handler (EV_P_ ev_child *w, + struct rspamd_main *rspamd_main, + struct rspamd_worker *wrk); /* Control socket */ static gint control_fd; +static ev_io control_ev; + +static gboolean valgrind_mode = FALSE; /* Cmdline options */ static gboolean config_test = FALSE; @@ -100,9 +106,6 @@ static gboolean skip_template = FALSE; static gint term_attempts = 0; -/* List of unrelated forked processes */ -static GArray *other_workers = NULL; - /* List of active listen sockets indexed by worker type */ static GHashTable *listen_sockets = NULL; @@ -186,8 +189,7 @@ read_cmd_line (gint *argc, gchar ***argv, struct rspamd_config *cfg) { GError *error = NULL; GOptionContext *context; - guint i, cfg_num; - pid_t r; + guint cfg_num; context = g_option_context_new ("- run rspamd daemon"); #if defined(GIT_VERSION) && GIT_VERSION == 1 @@ -208,30 +210,13 @@ read_cmd_line (gint *argc, gchar ***argv, struct rspamd_config *cfg) cfg->rspamd_user = rspamd_user; cfg->rspamd_group = rspamd_group; cfg_num = cfg_names != NULL ? g_strv_length (cfg_names) : 0; + if (cfg_num == 0) { cfg->cfg_name = FIXED_CONFIG_FILE; } else { cfg->cfg_name = cfg_names[0]; - } - - for (i = 1; i < cfg_num; i++) { - r = fork (); - if (r == 0) { - /* Spawning new main process */ - cfg->cfg_name = cfg_names[i]; - (void)setsid (); - } - else if (r == -1) { - fprintf (stderr, - "fork failed while spawning process for %s configuration file: %s\n", - cfg_names[i], - strerror (errno)); - } - else { - /* Save pid to the list of other main processes, we need it to ignore SIGCHLD from them */ - g_array_append_val (other_workers, r); - } + g_assert (cfg_num == 1); } cfg->pid_file = rspamd_pidfile; @@ -371,7 +356,8 @@ rspamd_fork_delayed_cb (EV_P_ ev_timer *w, int revents) ev_timer_stop (EV_A_ &waiting_worker->wait_ev); rspamd_fork_worker (waiting_worker->rspamd_main, waiting_worker->cf, waiting_worker->oldindex, - waiting_worker->rspamd_main->event_loop); + waiting_worker->rspamd_main->event_loop, + rspamd_cld_handler); REF_RELEASE (waiting_worker->cf); g_free (waiting_worker); } @@ -553,7 +539,7 @@ make_listen_key (struct rspamd_worker_bind_conf *cf) } static void -spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *ev_base, +spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *event_loop, struct rspamd_worker_conf *cf) { gint i; @@ -570,14 +556,15 @@ spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *ev_base, "cannot spawn more than 1 %s worker, so spawn one", cf->worker->name); } - rspamd_fork_worker (rspamd_main, cf, 0, ev_base); + rspamd_fork_worker (rspamd_main, cf, 0, event_loop, rspamd_cld_handler); } else if (cf->worker->flags & RSPAMD_WORKER_THREADED) { - rspamd_fork_worker (rspamd_main, cf, 0, ev_base); + rspamd_fork_worker (rspamd_main, cf, 0, event_loop, rspamd_cld_handler); } else { for (i = 0; i < cf->count; i++) { - rspamd_fork_worker (rspamd_main, cf, i, ev_base); + rspamd_fork_worker (rspamd_main, cf, i, event_loop, + rspamd_cld_handler); } } } @@ -823,19 +810,6 @@ hash_worker_wait_callback (gpointer key, gpointer value, gpointer unused) return rspamd_worker_wait ((struct rspamd_worker *)value); } -static void -rspamd_final_cld_handler (EV_P_ ev_signal *w, int revents) -{ - struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; - g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, - NULL); - - if (g_hash_table_size (rspamd_main->workers) == 0) { - ev_break (rspamd_main->event_loop, EVBREAK_ALL); - } -} - - struct core_check_cbdata { struct rspamd_config *cfg; gsize total_count; @@ -1011,19 +985,52 @@ stop_srv_ev (gpointer key, gpointer value, gpointer ud) ev_io_stop (rspamd_main->event_loop, &cur->srv_ev); } +static void +rspamd_final_timer_handler (EV_P_ ev_timer *w, int revents) +{ + struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; + + term_attempts--; + + g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL); + + if (g_hash_table_size (rspamd_main->workers) == 0) { + ev_break (rspamd_main->event_loop, EVBREAK_ALL); + } +} + /* Signal handlers */ static void rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents) { struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; + static ev_timer ev_finale; - msg_info_main ("catch termination signal, waiting for children"); - rspamd_log_nolock (rspamd_main->logger); - /* Stop srv events to avoid false notifications */ - g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main); - rspamd_pass_signal (rspamd_main->workers, w->signum); + if (!rspamd_main->wanna_die) { + rspamd_main->wanna_die = TRUE; + msg_info_main ("catch termination signal, waiting for children"); + rspamd_log_nolock (rspamd_main->logger); + /* Stop srv events to avoid false notifications */ + g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main); + rspamd_pass_signal (rspamd_main->workers, w->signum); + + if (control_fd != -1) { + ev_io_stop (rspamd_main->event_loop, &control_ev); + close (control_fd); + } - ev_break (rspamd_main->event_loop, EVBREAK_ALL); + if (valgrind_mode) { + /* Special case if we are likely running with valgrind */ + term_attempts = TERMINATION_ATTEMPTS * 10; + } + else { + term_attempts = TERMINATION_ATTEMPTS; + } + + ev_finale.data = rspamd_main; + ev_timer_init (&ev_finale, rspamd_final_timer_handler, 0.2, 0.2); + ev_timer_start (rspamd_main->event_loop, &ev_finale); + } } static void @@ -1031,11 +1038,13 @@ rspamd_usr1_handler (struct ev_loop *loop, ev_signal *w, int revents) { struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; - rspamd_log_reopen_priv (rspamd_main->logger, - rspamd_main->workers_uid, - rspamd_main->workers_gid); - g_hash_table_foreach (rspamd_main->workers, reopen_log_handler, - NULL); + if (!rspamd_main->wanna_die) { + rspamd_log_reopen_priv (rspamd_main->logger, + rspamd_main->workers_uid, + rspamd_main->workers_gid); + g_hash_table_foreach (rspamd_main->workers, reopen_log_handler, + NULL); + } } static void @@ -1043,195 +1052,65 @@ rspamd_hup_handler (struct ev_loop *loop, ev_signal *w, int revents) { struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; - msg_info_main ("rspamd " - RVERSION - " is restarting"); - g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL); - rspamd_log_close_priv (rspamd_main->logger, + if (!rspamd_main->wanna_die) { + msg_info_main ("rspamd " + RVERSION + " is restarting"); + g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL); + rspamd_log_close_priv (rspamd_main->logger, FALSE, rspamd_main->workers_uid, rspamd_main->workers_gid); - reread_config (rspamd_main); - rspamd_check_core_limits (rspamd_main); - spawn_workers (rspamd_main, rspamd_main->event_loop); + reread_config (rspamd_main); + rspamd_check_core_limits (rspamd_main); + spawn_workers (rspamd_main, rspamd_main->event_loop); + } } +/* Called when a dead child has been found */ + static void -rspamd_cld_handler (EV_P_ ev_signal *w, int revents) +rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main, + struct rspamd_worker *wrk) { - struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; - guint i; - gint res = 0; - struct rspamd_worker *cur; - pid_t pid; - gboolean need_refork = TRUE, found_proc = FALSE; + gboolean need_refork; /* Turn off locking for logger */ rspamd_log_nolock (rspamd_main->logger); msg_info_main ("got SIGCHLD signal, finding terminated workers"); /* Remove dead child form children list */ - for (;;) { - pid = waitpid (0, &res, WNOHANG); - - if (pid == -1) { - if (errno != EINTR) { - msg_warn_main ("got unexpected system error when waiting: %s", - strerror (errno)); - break; - } - else { - continue; - } - } - else if (pid == 0) { - /* No more processes to wait */ - break; - } - - found_proc = TRUE; - - if ((cur = - g_hash_table_lookup (rspamd_main->workers, - GSIZE_TO_POINTER (pid))) != NULL) { - /* Unlink dead process from queue and hash table */ - - g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER ( - pid)); - - if (cur->wanna_die) { - /* Do not refork workers that are intended to be terminated */ - need_refork = FALSE; - } - - if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { - /* Normal worker termination, do not fork one more */ - msg_info_main ("%s process %P terminated normally", - g_quark_to_string (cur->type), - cur->pid); - } - else { - if (WIFSIGNALED (res)) { -#ifdef WCOREDUMP - if (WCOREDUMP (res)) { - msg_warn_main ( - "%s process %P terminated abnormally by signal: %s" - " and created core file", - g_quark_to_string (cur->type), - cur->pid, - g_strsignal (WTERMSIG (res))); - } - else { -#ifdef HAVE_SYS_RESOURCE_H - struct rlimit rlmt; - (void)getrlimit (RLIMIT_CORE, &rlmt); - - msg_warn_main ( - "%s process %P terminated abnormally by signal: %s" - " but NOT created core file (throttled=%s); " - "core file limits: %L current, %L max", - g_quark_to_string (cur->type), - cur->pid, - g_strsignal (WTERMSIG (res)), - cur->cores_throttled ? "yes" : "no", - (gint64)rlmt.rlim_cur, - (gint64)rlmt.rlim_max); -#else - msg_warn_main ( - "%s process %P terminated abnormally by signal: %s" - " but NOT created core file (throttled=%s); ", - g_quark_to_string (cur->type), - cur->pid, - g_strsignal (WTERMSIG (res)), - cur->cores_throttled ? "yes" : "no"); -#endif - } -#else - msg_warn_main ( - "%s process %P terminated abnormally by signal: %s", - g_quark_to_string (cur->type), - cur->pid, - g_strsignal (WTERMSIG (res))); -#endif - if (WTERMSIG (res) == SIGUSR2) { - /* - * It is actually race condition when not started process - * has been requested to be reloaded. - * - * We shouldn't refork on this - */ - need_refork = FALSE; - } - } - else { - msg_warn_main ("%s process %P terminated abnormally " - "with exit code %d", - g_quark_to_string (cur->type), - cur->pid, - WEXITSTATUS (res)); - } - - if (need_refork) { - /* Fork another worker in replace of dead one */ - rspamd_check_core_limits (rspamd_main); - - - rspamd_fork_delayed (cur->cf, cur->index, rspamd_main); - } - } - - if (cur->srv_pipe[0] != -1) { - /* Ugly workaround */ - if (cur->tmp_data) { - g_free (cur->tmp_data); - } - ev_io_stop (rspamd_main->event_loop, &cur->srv_ev); - } - - if (cur->control_pipe[0] != -1) { - /* We also need to clean descriptors left */ - close (cur->control_pipe[0]); - close (cur->srv_pipe[0]); - } - - REF_RELEASE (cur->cf); - - if (cur->finish_actions) { - g_ptr_array_free (cur->finish_actions, TRUE); - } - - g_free (cur); - } - else { - for (i = 0; i < other_workers->len; i++) { - if (g_array_index (other_workers, pid_t, i) == pid) { - g_array_remove_index_fast (other_workers, i); - msg_info_main ("related process %P terminated", pid); - } - } + g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (wrk->pid)); + if (wrk->srv_pipe[0] != -1) { + /* Ugly workaround */ + if (wrk->tmp_data) { + g_free (wrk->tmp_data); } + ev_io_stop (rspamd_main->event_loop, &wrk->srv_ev); } - if (!found_proc) { - msg_err_main ("got SIGCHLD but no workers were able to be waited: %s", - strerror (errno)); + if (wrk->control_pipe[0] != -1) { + /* We also need to clean descriptors left */ + close (wrk->control_pipe[0]); + close (wrk->srv_pipe[0]); } - rspamd_log_lock (rspamd_main->logger); -} + REF_RELEASE (wrk->cf); -static void -rspamd_final_term_handler (EV_P_ ev_timer *w, int revents) -{ - struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data; + if (wrk->finish_actions) { + g_ptr_array_free (wrk->finish_actions, TRUE); + } - term_attempts--; + need_refork = rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus); - g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL); - - if (g_hash_table_size (rspamd_main->workers) == 0) { - ev_break (rspamd_main->event_loop, EVBREAK_ALL); + if (need_refork) { + /* Fork another worker in replace of dead one */ + rspamd_check_core_limits (rspamd_main); + rspamd_fork_delayed (wrk->cf, wrk->index, rspamd_main); } + + g_free (wrk); + rspamd_log_lock (rspamd_main->logger); } /* Control socket handler */ @@ -1299,10 +1178,9 @@ main (gint argc, gchar **argv, gchar **env) GQuark type; rspamd_inet_addr_t *control_addr = NULL; struct ev_loop *event_loop; - static ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev; - static ev_io control_ev; + static ev_signal term_ev, int_ev, hup_ev, usr1_ev; struct rspamd_main *rspamd_main; - gboolean skip_pid = FALSE, valgrind_mode = FALSE; + gboolean skip_pid = FALSE; #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) g_thread_init (NULL); @@ -1328,7 +1206,6 @@ main (gint argc, gchar **argv, gchar **env) rspamd_main->cfg->libs_ctx = rspamd_init_libs (); memset (&signals, 0, sizeof (struct sigaction)); - other_workers = g_array_new (FALSE, TRUE, sizeof (pid_t)); read_cmd_line (&argc, &argv, rspamd_main->cfg); @@ -1517,7 +1394,6 @@ main (gint argc, gchar **argv, gchar **env) /* Set title */ setproctitle ("main process"); - /* Flush log */ rspamd_log_flush (rspamd_main->logger); @@ -1576,12 +1452,6 @@ main (gint argc, gchar **argv, gchar **env) usr1_ev.data = rspamd_main; ev_signal_start (event_loop, &usr1_ev); - - /* XXX: deal with children */ - cld_ev.data = rspamd_main; - ev_signal_init (&cld_ev, rspamd_cld_handler, SIGCHLD); - ev_signal_start (event_loop, &cld_ev); - rspamd_check_core_limits (rspamd_main); rspamd_mempool_lock_mutex (rspamd_main->start_mtx); spawn_workers (rspamd_main, event_loop); @@ -1598,41 +1468,6 @@ main (gint argc, gchar **argv, gchar **env) ev_io_start (event_loop, &control_ev); } - ev_loop (event_loop, 0); - /* We need to block signals unless children are waited for */ - rspamd_worker_block_signals (); - ev_signal_stop (event_loop, &term_ev); - ev_signal_stop (event_loop, &int_ev); - ev_signal_stop (event_loop, &hup_ev); - ev_signal_stop (event_loop, &cld_ev); - ev_signal_stop (event_loop, &usr1_ev); - - if (control_fd != -1) { - ev_io_stop (event_loop, &control_ev); - close (control_fd); - } - - if (valgrind_mode) { - /* Special case if we are likely running with valgrind */ - term_attempts = TERMINATION_ATTEMPTS * 10; - } - else { - term_attempts = TERMINATION_ATTEMPTS; - } - - - /* Wait for workers termination */ - ev_signal_init (&cld_ev, rspamd_final_cld_handler, SIGCHLD); - ev_signal_start (event_loop, &cld_ev); - - g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL); - - static ev_timer ev_finale; - - ev_finale.data = rspamd_main; - ev_timer_init (&ev_finale, rspamd_final_term_handler, 0.2, 0.2); - ev_timer_start (event_loop, &ev_finale); - ev_loop (event_loop, 0); /* Maybe save roll history */ diff --git a/src/rspamd.h b/src/rspamd.h index e47271ca3..97e4c4c6a 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -69,6 +69,9 @@ struct rspamd_worker_accept_event { struct rspamd_worker_accept_event *prev, *next; }; +typedef void (*rspamd_worker_term_cb)(EV_P_ ev_child *, struct rspamd_main *, + struct rspamd_worker *); + /** * Worker process structure */ @@ -95,6 +98,8 @@ struct rspamd_worker { gpointer control_data; /**< used by control protocol to handle commands */ gpointer tmp_data; /**< used to avoid race condition to deal with control messages */ GPtrArray *finish_actions; /**< called when worker is terminated */ + ev_child cld_ev; /**< to allow reaping */ + rspamd_worker_term_cb term_handler; /**< custom term handler */ }; struct rspamd_abstract_worker_ctx { @@ -280,6 +285,7 @@ struct rspamd_main { uid_t workers_uid; /**< worker's uid running to */ gid_t workers_gid; /**< worker's gid running to */ gboolean is_privilleged; /**< true if run in privilleged mode */ + gboolean wanna_die; /**< no respawn of processes */ gboolean cores_throttling; /**< turn off cores when limits are exceeded */ struct roll_history *history; /**< rolling history */ struct ev_loop *event_loop; -- 2.39.5