summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--contrib/libev/CMakeLists.txt6
-rw-r--r--contrib/libev/ev.c16
-rw-r--r--contrib/libev/ev.h17
-rw-r--r--src/libserver/worker_util.c107
-rw-r--r--src/libserver/worker_util.h15
-rw-r--r--src/rspamd.c375
-rw-r--r--src/rspamd.h6
7 files changed, 246 insertions, 296 deletions
diff --git a/contrib/libev/CMakeLists.txt b/contrib/libev/CMakeLists.txt
index d363c3dbc..f0350050d 100644
--- a/contrib/libev/CMakeLists.txt
+++ b/contrib/libev/CMakeLists.txt
@@ -55,7 +55,7 @@ ENDIF()
CONFIGURE_FILE(config.h.in libev-config.h)
-ADD_LIBRARY(rspamd-ev STATIC ${LIBEVSRC})
+ADD_LIBRARY(rspamd-ev SHARED ${LIBEVSRC})
ADD_DEFINITIONS("-DEV_CONFIG_H=\"${CMAKE_CURRENT_BINARY_DIR}/libev-config.h\""
-DEV_MULTIPLICITY=1
-DEV_USE_FLOOR=1
@@ -68,4 +68,6 @@ ENDIF()
IF(ENABLE_FULL_DEBUG MATCHES "ON")
ADD_DEFINITIONS(-DEV_VERIFY=3)
-ENDIF() \ No newline at end of file
+ENDIF()
+
+INSTALL(TARGETS rspamd-ev LIBRARY DESTINATION ${RSPAMD_LIBDIR}) \ No newline at end of file
diff --git a/contrib/libev/ev.c b/contrib/libev/ev.c
index 4f3f0b3ce..cb8127fc5 100644
--- a/contrib/libev/ev.c
+++ b/contrib/libev/ev.c
@@ -1840,9 +1840,7 @@ typedef struct
#include "ev_wrap.h"
static struct ev_loop default_loop_struct;
- EV_API_DECL struct ev_loop *ev_default_loop_ptr; /* needs to be initialised to make it a definition despite extern */
- struct ev_loop *ev_default_loop_ptr = 0;
-
+ static struct ev_loop *ev_default_loop_ptr = 0;
#else
EV_API_DECL ev_tstamp ev_rt_now = 0; /* needs to be initialised to make it a definition despite extern */
@@ -2855,6 +2853,18 @@ ev_set_loop_release_cb (EV_P_ void (*release)(EV_P) EV_NOEXCEPT, void (*acquire)
}
#endif
+EV_INLINE struct ev_loop *
+ev_default_loop_uc_ (void) EV_NOEXCEPT
+{
+ return ev_default_loop_ptr;
+}
+
+EV_INLINE int
+ev_is_default_loop (EV_P) EV_NOEXCEPT
+{
+ return EV_A == EV_DEFAULT_UC;
+}
+
/* initialise a loop structure, must be zero-initialised */
noinline ecb_cold
static void
diff --git a/contrib/libev/ev.h b/contrib/libev/ev.h
index f76c892e4..cb7b2e479 100644
--- a/contrib/libev/ev.h
+++ b/contrib/libev/ev.h
@@ -557,23 +557,6 @@ EV_API_DECL void ev_set_syserr_cb (void (*cb)(const char *msg) EV_NOEXCEPT) EV_N
/* you can call this as often as you like */
EV_API_DECL struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0)) EV_NOEXCEPT;
-#ifdef EV_API_STATIC
-EV_API_DECL struct ev_loop *ev_default_loop_ptr;
-#endif
-
-EV_INLINE struct ev_loop *
-ev_default_loop_uc_ (void) EV_NOEXCEPT
-{
- extern struct ev_loop *ev_default_loop_ptr;
-
- return ev_default_loop_ptr;
-}
-
-EV_INLINE int
-ev_is_default_loop (EV_P) EV_NOEXCEPT
-{
- return EV_A == EV_DEFAULT_UC;
-}
/* create and destroy alternative loops that don't handle signals */
EV_API_DECL struct ev_loop *ev_loop_new (unsigned int flags EV_CPP (= 0)) EV_NOEXCEPT;
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index f7b4ee9ab..d849f542e 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -628,11 +628,25 @@ rspamd_worker_set_limits (struct rspamd_main *rspamd_main,
}
}
+static void
+rspamd_worker_on_term (EV_P_ ev_child *w, int revents)
+{
+ struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
+
+ if (wrk->term_handler) {
+ wrk->term_handler (EV_A_ w, wrk->srv, wrk);
+ }
+ else {
+ rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
+ }
+}
+
struct rspamd_worker *
rspamd_fork_worker (struct rspamd_main *rspamd_main,
- struct rspamd_worker_conf *cf,
- guint index,
- struct ev_loop *ev_base)
+ struct rspamd_worker_conf *cf,
+ guint index,
+ struct ev_loop *ev_base,
+ rspamd_worker_term_cb term_handler)
{
struct rspamd_worker *wrk;
gint rc;
@@ -662,6 +676,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
wrk->ppid = getpid ();
wrk->pid = fork ();
wrk->cores_throttled = rspamd_main->cores_throttling;
+ wrk->term_handler = term_handler;
switch (wrk->pid) {
case 0:
@@ -753,6 +768,9 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
rspamd_socket_nonblocking (wrk->control_pipe[0]);
rspamd_socket_nonblocking (wrk->srv_pipe[0]);
rspamd_srv_start_watching (rspamd_main, wrk, ev_base);
+ wrk->cld_ev.data = wrk;
+ ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0);
+ ev_child_start (rspamd_main->event_loop, &wrk->cld_ev);
/* Insert worker into worker's table, pid is index */
g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
wrk->pid), wrk);
@@ -1146,4 +1164,87 @@ rspamd_worker_throttle_accept_events (gint sock, void *data)
throttling, 0.0);
ev_timer_start (cur->event_loop, &cur->throttling_ev);
}
+}
+
+gboolean
+rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *wrk,
+ int res)
+{
+ gboolean need_refork = TRUE;
+
+ if (wrk->wanna_die) {
+ /* Do not refork workers that are intended to be terminated */
+ need_refork = FALSE;
+ }
+
+ if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
+ /* Normal worker termination, do not fork one more */
+ msg_info_main ("%s process %P terminated normally",
+ g_quark_to_string (wrk->type),
+ wrk->pid);
+ }
+ else {
+ if (WIFSIGNALED (res)) {
+#ifdef WCOREDUMP
+ if (WCOREDUMP (res)) {
+ msg_warn_main (
+ "%s process %P terminated abnormally by signal: %s"
+ " and created core file",
+ g_quark_to_string (wrk->type),
+ wrk->pid,
+ g_strsignal (WTERMSIG (res)));
+ }
+ else {
+#ifdef HAVE_SYS_RESOURCE_H
+ struct rlimit rlmt;
+ (void) getrlimit (RLIMIT_CORE, &rlmt);
+
+ msg_warn_main (
+ "%s process %P terminated abnormally by signal: %s"
+ " but NOT created core file (throttled=%s); "
+ "core file limits: %L current, %L max",
+ g_quark_to_string (wrk->type),
+ wrk->pid,
+ g_strsignal (WTERMSIG (res)),
+ wrk->cores_throttled ? "yes" : "no",
+ (gint64) rlmt.rlim_cur,
+ (gint64) rlmt.rlim_max);
+#else
+ msg_warn_main (
+ "%s process %P terminated abnormally by signal: %s"
+ " but NOT created core file (throttled=%s); ",
+ g_quark_to_string (wrk->type),
+ wrk->pid,
+ g_strsignal (WTERMSIG (res)),
+ wrk->cores_throttled ? "yes" : "no");
+#endif
+ }
+#else
+ msg_warn_main (
+ "%s process %P terminated abnormally by signal: %s",
+ g_quark_to_string (wrk->type),
+ wrk->pid,
+ g_strsignal (WTERMSIG (res)));
+#endif
+ if (WTERMSIG (res) == SIGUSR2) {
+ /*
+ * It is actually race condition when not started process
+ * has been requested to be reloaded.
+ *
+ * We shouldn't refork on this
+ */
+ need_refork = FALSE;
+ }
+ }
+ else {
+ msg_warn_main ("%s process %P terminated abnormally "
+ "with exit code %d",
+ g_quark_to_string (wrk->type),
+ wrk->pid,
+ WEXITSTATUS (res));
+ }
+ }
+
+ return need_refork;
} \ No newline at end of file
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 9693aa6ad..4946badcf 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -186,7 +186,9 @@ void rspamd_worker_session_cache_remove (void *cache, void *ptr);
* Fork new worker with the specified configuration
*/
struct rspamd_worker *rspamd_fork_worker (struct rspamd_main *,
- struct rspamd_worker_conf *, guint idx, struct ev_loop *ev_base);
+ struct rspamd_worker_conf *, guint idx,
+ struct ev_loop *ev_base,
+ rspamd_worker_term_cb term_handler);
/**
* Sets crash signals handlers if compiled with libunwind
@@ -210,6 +212,17 @@ void rspamd_worker_init_monitored (struct rspamd_worker *worker,
*/
void rspamd_worker_throttle_accept_events (gint sock, void *data);
+/**
+ * Checks (and logs) the worker's termination status. Returns TRUE if a worker
+ * should be restarted.
+ * @param rspamd_main
+ * @param wrk
+ * @param status waitpid res
+ * @return TRUE if refork is desired
+ */
+gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *wrk, int status);
+
#define msg_err_main(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
rspamd_main->server_pool->tag.tagname, rspamd_main->server_pool->tag.uid, \
G_STRFUNC, \
diff --git a/src/rspamd.c b/src/rspamd.c
index d07961757..a1badd635 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -70,13 +70,19 @@
#define TERMINATION_ATTEMPTS 50
static gboolean load_rspamd_config (struct rspamd_main *rspamd_main,
- struct rspamd_config *cfg,
- gboolean init_modules,
- enum rspamd_post_load_options opts,
- gboolean reload);
+ struct rspamd_config *cfg,
+ gboolean init_modules,
+ enum rspamd_post_load_options opts,
+ gboolean reload);
+static void rspamd_cld_handler (EV_P_ ev_child *w,
+ struct rspamd_main *rspamd_main,
+ struct rspamd_worker *wrk);
/* Control socket */
static gint control_fd;
+static ev_io control_ev;
+
+static gboolean valgrind_mode = FALSE;
/* Cmdline options */
static gboolean config_test = FALSE;
@@ -100,9 +106,6 @@ static gboolean skip_template = FALSE;
static gint term_attempts = 0;
-/* List of unrelated forked processes */
-static GArray *other_workers = NULL;
-
/* List of active listen sockets indexed by worker type */
static GHashTable *listen_sockets = NULL;
@@ -186,8 +189,7 @@ read_cmd_line (gint *argc, gchar ***argv, struct rspamd_config *cfg)
{
GError *error = NULL;
GOptionContext *context;
- guint i, cfg_num;
- pid_t r;
+ guint cfg_num;
context = g_option_context_new ("- run rspamd daemon");
#if defined(GIT_VERSION) && GIT_VERSION == 1
@@ -208,30 +210,13 @@ read_cmd_line (gint *argc, gchar ***argv, struct rspamd_config *cfg)
cfg->rspamd_user = rspamd_user;
cfg->rspamd_group = rspamd_group;
cfg_num = cfg_names != NULL ? g_strv_length (cfg_names) : 0;
+
if (cfg_num == 0) {
cfg->cfg_name = FIXED_CONFIG_FILE;
}
else {
cfg->cfg_name = cfg_names[0];
- }
-
- for (i = 1; i < cfg_num; i++) {
- r = fork ();
- if (r == 0) {
- /* Spawning new main process */
- cfg->cfg_name = cfg_names[i];
- (void)setsid ();
- }
- else if (r == -1) {
- fprintf (stderr,
- "fork failed while spawning process for %s configuration file: %s\n",
- cfg_names[i],
- strerror (errno));
- }
- else {
- /* Save pid to the list of other main processes, we need it to ignore SIGCHLD from them */
- g_array_append_val (other_workers, r);
- }
+ g_assert (cfg_num == 1);
}
cfg->pid_file = rspamd_pidfile;
@@ -371,7 +356,8 @@ rspamd_fork_delayed_cb (EV_P_ ev_timer *w, int revents)
ev_timer_stop (EV_A_ &waiting_worker->wait_ev);
rspamd_fork_worker (waiting_worker->rspamd_main, waiting_worker->cf,
waiting_worker->oldindex,
- waiting_worker->rspamd_main->event_loop);
+ waiting_worker->rspamd_main->event_loop,
+ rspamd_cld_handler);
REF_RELEASE (waiting_worker->cf);
g_free (waiting_worker);
}
@@ -553,7 +539,7 @@ make_listen_key (struct rspamd_worker_bind_conf *cf)
}
static void
-spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *ev_base,
+spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *event_loop,
struct rspamd_worker_conf *cf)
{
gint i;
@@ -570,14 +556,15 @@ spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *ev_base,
"cannot spawn more than 1 %s worker, so spawn one",
cf->worker->name);
}
- rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
+ rspamd_fork_worker (rspamd_main, cf, 0, event_loop, rspamd_cld_handler);
}
else if (cf->worker->flags & RSPAMD_WORKER_THREADED) {
- rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
+ rspamd_fork_worker (rspamd_main, cf, 0, event_loop, rspamd_cld_handler);
}
else {
for (i = 0; i < cf->count; i++) {
- rspamd_fork_worker (rspamd_main, cf, i, ev_base);
+ rspamd_fork_worker (rspamd_main, cf, i, event_loop,
+ rspamd_cld_handler);
}
}
}
@@ -823,19 +810,6 @@ hash_worker_wait_callback (gpointer key, gpointer value, gpointer unused)
return rspamd_worker_wait ((struct rspamd_worker *)value);
}
-static void
-rspamd_final_cld_handler (EV_P_ ev_signal *w, int revents)
-{
- struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
- g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback,
- NULL);
-
- if (g_hash_table_size (rspamd_main->workers) == 0) {
- ev_break (rspamd_main->event_loop, EVBREAK_ALL);
- }
-}
-
-
struct core_check_cbdata {
struct rspamd_config *cfg;
gsize total_count;
@@ -1011,19 +985,52 @@ stop_srv_ev (gpointer key, gpointer value, gpointer ud)
ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
}
+static void
+rspamd_final_timer_handler (EV_P_ ev_timer *w, int revents)
+{
+ struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+
+ term_attempts--;
+
+ g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
+
+ if (g_hash_table_size (rspamd_main->workers) == 0) {
+ ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+ }
+}
+
/* Signal handlers */
static void
rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents)
{
struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+ static ev_timer ev_finale;
- msg_info_main ("catch termination signal, waiting for children");
- rspamd_log_nolock (rspamd_main->logger);
- /* Stop srv events to avoid false notifications */
- g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main);
- rspamd_pass_signal (rspamd_main->workers, w->signum);
+ if (!rspamd_main->wanna_die) {
+ rspamd_main->wanna_die = TRUE;
+ msg_info_main ("catch termination signal, waiting for children");
+ rspamd_log_nolock (rspamd_main->logger);
+ /* Stop srv events to avoid false notifications */
+ g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main);
+ rspamd_pass_signal (rspamd_main->workers, w->signum);
+
+ if (control_fd != -1) {
+ ev_io_stop (rspamd_main->event_loop, &control_ev);
+ close (control_fd);
+ }
- ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+ if (valgrind_mode) {
+ /* Special case if we are likely running with valgrind */
+ term_attempts = TERMINATION_ATTEMPTS * 10;
+ }
+ else {
+ term_attempts = TERMINATION_ATTEMPTS;
+ }
+
+ ev_finale.data = rspamd_main;
+ ev_timer_init (&ev_finale, rspamd_final_timer_handler, 0.2, 0.2);
+ ev_timer_start (rspamd_main->event_loop, &ev_finale);
+ }
}
static void
@@ -1031,11 +1038,13 @@ rspamd_usr1_handler (struct ev_loop *loop, ev_signal *w, int revents)
{
struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
- rspamd_log_reopen_priv (rspamd_main->logger,
- rspamd_main->workers_uid,
- rspamd_main->workers_gid);
- g_hash_table_foreach (rspamd_main->workers, reopen_log_handler,
- NULL);
+ if (!rspamd_main->wanna_die) {
+ rspamd_log_reopen_priv (rspamd_main->logger,
+ rspamd_main->workers_uid,
+ rspamd_main->workers_gid);
+ g_hash_table_foreach (rspamd_main->workers, reopen_log_handler,
+ NULL);
+ }
}
static void
@@ -1043,195 +1052,65 @@ rspamd_hup_handler (struct ev_loop *loop, ev_signal *w, int revents)
{
struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
- msg_info_main ("rspamd "
- RVERSION
- " is restarting");
- g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
- rspamd_log_close_priv (rspamd_main->logger,
+ if (!rspamd_main->wanna_die) {
+ msg_info_main ("rspamd "
+ RVERSION
+ " is restarting");
+ g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
+ rspamd_log_close_priv (rspamd_main->logger,
FALSE,
rspamd_main->workers_uid,
rspamd_main->workers_gid);
- reread_config (rspamd_main);
- rspamd_check_core_limits (rspamd_main);
- spawn_workers (rspamd_main, rspamd_main->event_loop);
+ reread_config (rspamd_main);
+ rspamd_check_core_limits (rspamd_main);
+ spawn_workers (rspamd_main, rspamd_main->event_loop);
+ }
}
+/* Called when a dead child has been found */
+
static void
-rspamd_cld_handler (EV_P_ ev_signal *w, int revents)
+rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
+ struct rspamd_worker *wrk)
{
- struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
- guint i;
- gint res = 0;
- struct rspamd_worker *cur;
- pid_t pid;
- gboolean need_refork = TRUE, found_proc = FALSE;
+ gboolean need_refork;
/* Turn off locking for logger */
rspamd_log_nolock (rspamd_main->logger);
msg_info_main ("got SIGCHLD signal, finding terminated workers");
/* Remove dead child form children list */
- for (;;) {
- pid = waitpid (0, &res, WNOHANG);
-
- if (pid == -1) {
- if (errno != EINTR) {
- msg_warn_main ("got unexpected system error when waiting: %s",
- strerror (errno));
- break;
- }
- else {
- continue;
- }
- }
- else if (pid == 0) {
- /* No more processes to wait */
- break;
- }
-
- found_proc = TRUE;
-
- if ((cur =
- g_hash_table_lookup (rspamd_main->workers,
- GSIZE_TO_POINTER (pid))) != NULL) {
- /* Unlink dead process from queue and hash table */
-
- g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (
- pid));
-
- if (cur->wanna_die) {
- /* Do not refork workers that are intended to be terminated */
- need_refork = FALSE;
- }
-
- if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
- /* Normal worker termination, do not fork one more */
- msg_info_main ("%s process %P terminated normally",
- g_quark_to_string (cur->type),
- cur->pid);
- }
- else {
- if (WIFSIGNALED (res)) {
-#ifdef WCOREDUMP
- if (WCOREDUMP (res)) {
- msg_warn_main (
- "%s process %P terminated abnormally by signal: %s"
- " and created core file",
- g_quark_to_string (cur->type),
- cur->pid,
- g_strsignal (WTERMSIG (res)));
- }
- else {
-#ifdef HAVE_SYS_RESOURCE_H
- struct rlimit rlmt;
- (void)getrlimit (RLIMIT_CORE, &rlmt);
-
- msg_warn_main (
- "%s process %P terminated abnormally by signal: %s"
- " but NOT created core file (throttled=%s); "
- "core file limits: %L current, %L max",
- g_quark_to_string (cur->type),
- cur->pid,
- g_strsignal (WTERMSIG (res)),
- cur->cores_throttled ? "yes" : "no",
- (gint64)rlmt.rlim_cur,
- (gint64)rlmt.rlim_max);
-#else
- msg_warn_main (
- "%s process %P terminated abnormally by signal: %s"
- " but NOT created core file (throttled=%s); ",
- g_quark_to_string (cur->type),
- cur->pid,
- g_strsignal (WTERMSIG (res)),
- cur->cores_throttled ? "yes" : "no");
-#endif
- }
-#else
- msg_warn_main (
- "%s process %P terminated abnormally by signal: %s",
- g_quark_to_string (cur->type),
- cur->pid,
- g_strsignal (WTERMSIG (res)));
-#endif
- if (WTERMSIG (res) == SIGUSR2) {
- /*
- * It is actually race condition when not started process
- * has been requested to be reloaded.
- *
- * We shouldn't refork on this
- */
- need_refork = FALSE;
- }
- }
- else {
- msg_warn_main ("%s process %P terminated abnormally "
- "with exit code %d",
- g_quark_to_string (cur->type),
- cur->pid,
- WEXITSTATUS (res));
- }
-
- if (need_refork) {
- /* Fork another worker in replace of dead one */
- rspamd_check_core_limits (rspamd_main);
-
-
- rspamd_fork_delayed (cur->cf, cur->index, rspamd_main);
- }
- }
-
- if (cur->srv_pipe[0] != -1) {
- /* Ugly workaround */
- if (cur->tmp_data) {
- g_free (cur->tmp_data);
- }
- ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
- }
-
- if (cur->control_pipe[0] != -1) {
- /* We also need to clean descriptors left */
- close (cur->control_pipe[0]);
- close (cur->srv_pipe[0]);
- }
-
- REF_RELEASE (cur->cf);
-
- if (cur->finish_actions) {
- g_ptr_array_free (cur->finish_actions, TRUE);
- }
-
- g_free (cur);
- }
- else {
- for (i = 0; i < other_workers->len; i++) {
- if (g_array_index (other_workers, pid_t, i) == pid) {
- g_array_remove_index_fast (other_workers, i);
- msg_info_main ("related process %P terminated", pid);
- }
- }
+ g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (wrk->pid));
+ if (wrk->srv_pipe[0] != -1) {
+ /* Ugly workaround */
+ if (wrk->tmp_data) {
+ g_free (wrk->tmp_data);
}
+ ev_io_stop (rspamd_main->event_loop, &wrk->srv_ev);
}
- if (!found_proc) {
- msg_err_main ("got SIGCHLD but no workers were able to be waited: %s",
- strerror (errno));
+ if (wrk->control_pipe[0] != -1) {
+ /* We also need to clean descriptors left */
+ close (wrk->control_pipe[0]);
+ close (wrk->srv_pipe[0]);
}
- rspamd_log_lock (rspamd_main->logger);
-}
+ REF_RELEASE (wrk->cf);
-static void
-rspamd_final_term_handler (EV_P_ ev_timer *w, int revents)
-{
- struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+ if (wrk->finish_actions) {
+ g_ptr_array_free (wrk->finish_actions, TRUE);
+ }
- term_attempts--;
+ need_refork = rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
- g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
-
- if (g_hash_table_size (rspamd_main->workers) == 0) {
- ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+ if (need_refork) {
+ /* Fork another worker in replace of dead one */
+ rspamd_check_core_limits (rspamd_main);
+ rspamd_fork_delayed (wrk->cf, wrk->index, rspamd_main);
}
+
+ g_free (wrk);
+ rspamd_log_lock (rspamd_main->logger);
}
/* Control socket handler */
@@ -1299,10 +1178,9 @@ main (gint argc, gchar **argv, gchar **env)
GQuark type;
rspamd_inet_addr_t *control_addr = NULL;
struct ev_loop *event_loop;
- static ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
- static ev_io control_ev;
+ static ev_signal term_ev, int_ev, hup_ev, usr1_ev;
struct rspamd_main *rspamd_main;
- gboolean skip_pid = FALSE, valgrind_mode = FALSE;
+ gboolean skip_pid = FALSE;
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
g_thread_init (NULL);
@@ -1328,7 +1206,6 @@ main (gint argc, gchar **argv, gchar **env)
rspamd_main->cfg->libs_ctx = rspamd_init_libs ();
memset (&signals, 0, sizeof (struct sigaction));
- other_workers = g_array_new (FALSE, TRUE, sizeof (pid_t));
read_cmd_line (&argc, &argv, rspamd_main->cfg);
@@ -1517,7 +1394,6 @@ main (gint argc, gchar **argv, gchar **env)
/* Set title */
setproctitle ("main process");
-
/* Flush log */
rspamd_log_flush (rspamd_main->logger);
@@ -1576,12 +1452,6 @@ main (gint argc, gchar **argv, gchar **env)
usr1_ev.data = rspamd_main;
ev_signal_start (event_loop, &usr1_ev);
-
- /* XXX: deal with children */
- cld_ev.data = rspamd_main;
- ev_signal_init (&cld_ev, rspamd_cld_handler, SIGCHLD);
- ev_signal_start (event_loop, &cld_ev);
-
rspamd_check_core_limits (rspamd_main);
rspamd_mempool_lock_mutex (rspamd_main->start_mtx);
spawn_workers (rspamd_main, event_loop);
@@ -1599,41 +1469,6 @@ main (gint argc, gchar **argv, gchar **env)
}
ev_loop (event_loop, 0);
- /* We need to block signals unless children are waited for */
- rspamd_worker_block_signals ();
- ev_signal_stop (event_loop, &term_ev);
- ev_signal_stop (event_loop, &int_ev);
- ev_signal_stop (event_loop, &hup_ev);
- ev_signal_stop (event_loop, &cld_ev);
- ev_signal_stop (event_loop, &usr1_ev);
-
- if (control_fd != -1) {
- ev_io_stop (event_loop, &control_ev);
- close (control_fd);
- }
-
- if (valgrind_mode) {
- /* Special case if we are likely running with valgrind */
- term_attempts = TERMINATION_ATTEMPTS * 10;
- }
- else {
- term_attempts = TERMINATION_ATTEMPTS;
- }
-
-
- /* Wait for workers termination */
- ev_signal_init (&cld_ev, rspamd_final_cld_handler, SIGCHLD);
- ev_signal_start (event_loop, &cld_ev);
-
- g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
-
- static ev_timer ev_finale;
-
- ev_finale.data = rspamd_main;
- ev_timer_init (&ev_finale, rspamd_final_term_handler, 0.2, 0.2);
- ev_timer_start (event_loop, &ev_finale);
-
- ev_loop (event_loop, 0);
/* Maybe save roll history */
if (rspamd_main->cfg->history_file) {
diff --git a/src/rspamd.h b/src/rspamd.h
index e47271ca3..97e4c4c6a 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -69,6 +69,9 @@ struct rspamd_worker_accept_event {
struct rspamd_worker_accept_event *prev, *next;
};
+typedef void (*rspamd_worker_term_cb)(EV_P_ ev_child *, struct rspamd_main *,
+ struct rspamd_worker *);
+
/**
* Worker process structure
*/
@@ -95,6 +98,8 @@ struct rspamd_worker {
gpointer control_data; /**< used by control protocol to handle commands */
gpointer tmp_data; /**< used to avoid race condition to deal with control messages */
GPtrArray *finish_actions; /**< called when worker is terminated */
+ ev_child cld_ev; /**< to allow reaping */
+ rspamd_worker_term_cb term_handler; /**< custom term handler */
};
struct rspamd_abstract_worker_ctx {
@@ -280,6 +285,7 @@ struct rspamd_main {
uid_t workers_uid; /**< worker's uid running to */
gid_t workers_gid; /**< worker's gid running to */
gboolean is_privilleged; /**< true if run in privilleged mode */
+ gboolean wanna_die; /**< no respawn of processes */
gboolean cores_throttling; /**< turn off cores when limits are exceeded */
struct roll_history *history; /**< rolling history */
struct ev_loop *event_loop;