]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Rework children operations
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 20 Jun 2019 15:11:07 +0000 (16:11 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 22 Jun 2019 09:57:29 +0000 (10:57 +0100)
contrib/libev/CMakeLists.txt
contrib/libev/ev.c
contrib/libev/ev.h
src/libserver/worker_util.c
src/libserver/worker_util.h
src/rspamd.c
src/rspamd.h

index d363c3dbc8c32f55396f9b11ea8c4585de1b0d4a..f0350050d9f6f31d333ab3900453f004c132b887 100644 (file)
@@ -55,7 +55,7 @@ ENDIF()
 
 CONFIGURE_FILE(config.h.in libev-config.h)
 
-ADD_LIBRARY(rspamd-ev STATIC ${LIBEVSRC})
+ADD_LIBRARY(rspamd-ev SHARED ${LIBEVSRC})
 ADD_DEFINITIONS("-DEV_CONFIG_H=\"${CMAKE_CURRENT_BINARY_DIR}/libev-config.h\""
                -DEV_MULTIPLICITY=1
                -DEV_USE_FLOOR=1
@@ -68,4 +68,6 @@ ENDIF()
 
 IF(ENABLE_FULL_DEBUG MATCHES "ON")
        ADD_DEFINITIONS(-DEV_VERIFY=3)
-ENDIF()
\ No newline at end of file
+ENDIF()
+
+INSTALL(TARGETS rspamd-ev LIBRARY DESTINATION ${RSPAMD_LIBDIR})
\ No newline at end of file
index 4f3f0b3cec2ef65d5710f0ed228ebd61cc6e37dc..cb8127fc5ba911386a306bcfb107efe010e0495e 100644 (file)
@@ -1840,9 +1840,7 @@ typedef struct
   #include "ev_wrap.h"
 
   static struct ev_loop default_loop_struct;
-  EV_API_DECL struct ev_loop *ev_default_loop_ptr; /* needs to be initialised to make it a definition despite extern */
-  struct ev_loop *ev_default_loop_ptr = 0;
-
+  static struct ev_loop *ev_default_loop_ptr = 0;
 #else
 
   EV_API_DECL ev_tstamp ev_rt_now = 0; /* needs to be initialised to make it a definition despite extern */
@@ -2855,6 +2853,18 @@ ev_set_loop_release_cb (EV_P_ void (*release)(EV_P) EV_NOEXCEPT, void (*acquire)
 }
 #endif
 
+EV_INLINE struct ev_loop *
+ev_default_loop_uc_ (void) EV_NOEXCEPT
+{
+       return ev_default_loop_ptr;
+}
+
+EV_INLINE int
+ev_is_default_loop (EV_P) EV_NOEXCEPT
+{
+       return EV_A == EV_DEFAULT_UC;
+}
+
 /* initialise a loop structure, must be zero-initialised */
 noinline ecb_cold
 static void
index f76c892e430479e747e033f1ffa9c9c285848e25..cb7b2e4798192b963f8f9ca1c6b4dd892cb26fcd 100644 (file)
@@ -557,23 +557,6 @@ EV_API_DECL void ev_set_syserr_cb (void (*cb)(const char *msg) EV_NOEXCEPT) EV_N
 /* you can call this as often as you like */
 EV_API_DECL struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0)) EV_NOEXCEPT;
 
-#ifdef EV_API_STATIC
-EV_API_DECL struct ev_loop *ev_default_loop_ptr;
-#endif
-
-EV_INLINE struct ev_loop *
-ev_default_loop_uc_ (void) EV_NOEXCEPT
-{
-  extern struct ev_loop *ev_default_loop_ptr;
-
-  return ev_default_loop_ptr;
-}
-
-EV_INLINE int
-ev_is_default_loop (EV_P) EV_NOEXCEPT
-{
-  return EV_A == EV_DEFAULT_UC;
-}
 
 /* create and destroy alternative loops that don't handle signals */
 EV_API_DECL struct ev_loop *ev_loop_new (unsigned int flags EV_CPP (= 0)) EV_NOEXCEPT;
index f7b4ee9ab29070c2b41e1840b15f24b718eb655f..d849f542ef02e10b2a650d0db91760b0fc326b2a 100644 (file)
@@ -628,11 +628,25 @@ rspamd_worker_set_limits (struct rspamd_main *rspamd_main,
        }
 }
 
+static void
+rspamd_worker_on_term (EV_P_ ev_child *w, int revents)
+{
+       struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
+
+       if (wrk->term_handler) {
+               wrk->term_handler (EV_A_ w, wrk->srv, wrk);
+       }
+       else {
+               rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
+       }
+}
+
 struct rspamd_worker *
 rspamd_fork_worker (struct rspamd_main *rspamd_main,
-               struct rspamd_worker_conf *cf,
-               guint index,
-               struct ev_loop *ev_base)
+                                       struct rspamd_worker_conf *cf,
+                                       guint index,
+                                       struct ev_loop *ev_base,
+                                       rspamd_worker_term_cb term_handler)
 {
        struct rspamd_worker *wrk;
        gint rc;
@@ -662,6 +676,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
        wrk->ppid = getpid ();
        wrk->pid = fork ();
        wrk->cores_throttled = rspamd_main->cores_throttling;
+       wrk->term_handler = term_handler;
 
        switch (wrk->pid) {
        case 0:
@@ -753,6 +768,9 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
                rspamd_socket_nonblocking (wrk->control_pipe[0]);
                rspamd_socket_nonblocking (wrk->srv_pipe[0]);
                rspamd_srv_start_watching (rspamd_main, wrk, ev_base);
+               wrk->cld_ev.data = wrk;
+               ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0);
+               ev_child_start (rspamd_main->event_loop, &wrk->cld_ev);
                /* Insert worker into worker's table, pid is index */
                g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
                                wrk->pid), wrk);
@@ -1146,4 +1164,87 @@ rspamd_worker_throttle_accept_events (gint sock, void *data)
                                throttling, 0.0);
                ev_timer_start (cur->event_loop, &cur->throttling_ev);
        }
+}
+
+gboolean
+rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
+                                                                struct rspamd_worker *wrk,
+                                                                int res)
+{
+       gboolean need_refork = TRUE;
+
+       if (wrk->wanna_die) {
+               /* Do not refork workers that are intended to be terminated */
+               need_refork = FALSE;
+       }
+
+       if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
+               /* Normal worker termination, do not fork one more */
+               msg_info_main ("%s process %P terminated normally",
+                               g_quark_to_string (wrk->type),
+                               wrk->pid);
+       }
+       else {
+               if (WIFSIGNALED (res)) {
+#ifdef WCOREDUMP
+                       if (WCOREDUMP (res)) {
+                               msg_warn_main (
+                                               "%s process %P terminated abnormally by signal: %s"
+                                               " and created core file",
+                                               g_quark_to_string (wrk->type),
+                                               wrk->pid,
+                                               g_strsignal (WTERMSIG (res)));
+                       }
+                       else {
+#ifdef HAVE_SYS_RESOURCE_H
+                               struct rlimit rlmt;
+                               (void) getrlimit (RLIMIT_CORE, &rlmt);
+
+                               msg_warn_main (
+                                               "%s process %P terminated abnormally by signal: %s"
+                                               " but NOT created core file (throttled=%s); "
+                                               "core file limits: %L current, %L max",
+                                               g_quark_to_string (wrk->type),
+                                               wrk->pid,
+                                               g_strsignal (WTERMSIG (res)),
+                                               wrk->cores_throttled ? "yes" : "no",
+                                               (gint64) rlmt.rlim_cur,
+                                               (gint64) rlmt.rlim_max);
+#else
+                               msg_warn_main (
+                                                               "%s process %P terminated abnormally by signal: %s"
+                                                               " but NOT created core file (throttled=%s); ",
+                                                               g_quark_to_string (wrk->type),
+                                                               wrk->pid,
+                                                               g_strsignal (WTERMSIG (res)),
+                                                               wrk->cores_throttled ? "yes" : "no");
+#endif
+                       }
+#else
+                       msg_warn_main (
+                                                       "%s process %P terminated abnormally by signal: %s",
+                                                       g_quark_to_string (wrk->type),
+                                                       wrk->pid,
+                                                       g_strsignal (WTERMSIG (res)));
+#endif
+                       if (WTERMSIG (res) == SIGUSR2) {
+                               /*
+                                * It is actually race condition when not started process
+                                * has been requested to be reloaded.
+                                *
+                                * We shouldn't refork on this
+                                */
+                               need_refork = FALSE;
+                       }
+               }
+               else {
+                       msg_warn_main ("%s process %P terminated abnormally "
+                                                  "with exit code %d",
+                                       g_quark_to_string (wrk->type),
+                                       wrk->pid,
+                                       WEXITSTATUS (res));
+               }
+       }
+
+       return need_refork;
 }
\ No newline at end of file
index 9693aa6ad60b63e8da85162610baddcfc39ec4e6..4946badcfceee46e29a080f3468361ad649591d8 100644 (file)
@@ -186,7 +186,9 @@ void rspamd_worker_session_cache_remove (void *cache, void *ptr);
  * Fork new worker with the specified configuration
  */
 struct rspamd_worker *rspamd_fork_worker (struct rspamd_main *,
-               struct rspamd_worker_conf *, guint idx, struct ev_loop *ev_base);
+                                                                                 struct rspamd_worker_conf *, guint idx,
+                                                                                 struct ev_loop *ev_base,
+                                                                                 rspamd_worker_term_cb term_handler);
 
 /**
  * Sets crash signals handlers if compiled with libunwind
@@ -210,6 +212,17 @@ void rspamd_worker_init_monitored (struct rspamd_worker *worker,
  */
 void rspamd_worker_throttle_accept_events (gint sock, void *data);
 
+/**
+ * Checks (and logs) the worker's termination status. Returns TRUE if a worker
+ * should be restarted.
+ * @param rspamd_main
+ * @param wrk
+ * @param status waitpid res
+ * @return TRUE if refork is desired
+ */
+gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
+               struct rspamd_worker *wrk, int status);
+
 #define msg_err_main(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
         rspamd_main->server_pool->tag.tagname, rspamd_main->server_pool->tag.uid, \
         G_STRFUNC, \
index d079617571baea03d1551aa1c41e0fc0cb679669..a1badd635c1241ff3ba15750bfab091934167dda 100644 (file)
 #define TERMINATION_ATTEMPTS 50
 
 static gboolean load_rspamd_config (struct rspamd_main *rspamd_main,
-               struct rspamd_config *cfg,
-               gboolean init_modules,
-               enum rspamd_post_load_options opts,
-               gboolean reload);
+                                                                       struct rspamd_config *cfg,
+                                                                       gboolean init_modules,
+                                                                       enum rspamd_post_load_options opts,
+                                                                       gboolean reload);
+static void rspamd_cld_handler (EV_P_ ev_child *w,
+                                                               struct rspamd_main *rspamd_main,
+                                                               struct rspamd_worker *wrk);
 
 /* Control socket */
 static gint control_fd;
+static ev_io control_ev;
+
+static gboolean valgrind_mode = FALSE;
 
 /* Cmdline options */
 static gboolean config_test = FALSE;
@@ -100,9 +106,6 @@ static gboolean skip_template = FALSE;
 
 static gint term_attempts = 0;
 
-/* List of unrelated forked processes */
-static GArray *other_workers = NULL;
-
 /* List of active listen sockets indexed by worker type */
 static GHashTable *listen_sockets = NULL;
 
@@ -186,8 +189,7 @@ read_cmd_line (gint *argc, gchar ***argv, struct rspamd_config *cfg)
 {
        GError *error = NULL;
        GOptionContext *context;
-       guint i, cfg_num;
-       pid_t r;
+       guint cfg_num;
 
        context = g_option_context_new ("- run rspamd daemon");
 #if defined(GIT_VERSION) && GIT_VERSION == 1
@@ -208,30 +210,13 @@ read_cmd_line (gint *argc, gchar ***argv, struct rspamd_config *cfg)
        cfg->rspamd_user = rspamd_user;
        cfg->rspamd_group = rspamd_group;
        cfg_num = cfg_names != NULL ? g_strv_length (cfg_names) : 0;
+
        if (cfg_num == 0) {
                cfg->cfg_name = FIXED_CONFIG_FILE;
        }
        else {
                cfg->cfg_name = cfg_names[0];
-       }
-
-       for (i = 1; i < cfg_num; i++) {
-               r = fork ();
-               if (r == 0) {
-                       /* Spawning new main process */
-                       cfg->cfg_name = cfg_names[i];
-                       (void)setsid ();
-               }
-               else if (r == -1) {
-                       fprintf (stderr,
-                               "fork failed while spawning process for %s configuration file: %s\n",
-                               cfg_names[i],
-                               strerror (errno));
-               }
-               else {
-                       /* Save pid to the list of other main processes, we need it to ignore SIGCHLD from them */
-                       g_array_append_val (other_workers, r);
-               }
+               g_assert (cfg_num == 1);
        }
 
        cfg->pid_file = rspamd_pidfile;
@@ -371,7 +356,8 @@ rspamd_fork_delayed_cb (EV_P_ ev_timer *w, int revents)
        ev_timer_stop (EV_A_ &waiting_worker->wait_ev);
        rspamd_fork_worker (waiting_worker->rspamd_main, waiting_worker->cf,
                        waiting_worker->oldindex,
-                       waiting_worker->rspamd_main->event_loop);
+                       waiting_worker->rspamd_main->event_loop,
+                       rspamd_cld_handler);
        REF_RELEASE (waiting_worker->cf);
        g_free (waiting_worker);
 }
@@ -553,7 +539,7 @@ make_listen_key (struct rspamd_worker_bind_conf *cf)
 }
 
 static void
-spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *ev_base,
+spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *event_loop,
                struct rspamd_worker_conf *cf)
 {
        gint i;
@@ -570,14 +556,15 @@ spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *ev_base,
                                        "cannot spawn more than 1 %s worker, so spawn one",
                                        cf->worker->name);
                }
-               rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
+               rspamd_fork_worker (rspamd_main, cf, 0, event_loop, rspamd_cld_handler);
        }
        else if (cf->worker->flags & RSPAMD_WORKER_THREADED) {
-               rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
+               rspamd_fork_worker (rspamd_main, cf, 0, event_loop, rspamd_cld_handler);
        }
        else {
                for (i = 0; i < cf->count; i++) {
-                       rspamd_fork_worker (rspamd_main, cf, i, ev_base);
+                       rspamd_fork_worker (rspamd_main, cf, i, event_loop,
+                                       rspamd_cld_handler);
                }
        }
 }
@@ -823,19 +810,6 @@ hash_worker_wait_callback (gpointer key, gpointer value, gpointer unused)
        return rspamd_worker_wait ((struct rspamd_worker *)value);
 }
 
-static void
-rspamd_final_cld_handler (EV_P_ ev_signal *w, int revents)
-{
-       struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
-       g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback,
-                       NULL);
-
-       if (g_hash_table_size (rspamd_main->workers) == 0) {
-               ev_break (rspamd_main->event_loop, EVBREAK_ALL);
-       }
-}
-
-
 struct core_check_cbdata {
        struct rspamd_config *cfg;
        gsize total_count;
@@ -1011,19 +985,52 @@ stop_srv_ev (gpointer key, gpointer value, gpointer ud)
        ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
 }
 
+static void
+rspamd_final_timer_handler (EV_P_ ev_timer *w, int revents)
+{
+       struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+
+       term_attempts--;
+
+       g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
+
+       if (g_hash_table_size (rspamd_main->workers) == 0) {
+               ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+       }
+}
+
 /* Signal handlers */
 static void
 rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents)
 {
        struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+       static ev_timer ev_finale;
 
-       msg_info_main ("catch termination signal, waiting for children");
-       rspamd_log_nolock (rspamd_main->logger);
-       /* Stop srv events to avoid false notifications */
-       g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main);
-       rspamd_pass_signal (rspamd_main->workers, w->signum);
+       if (!rspamd_main->wanna_die) {
+               rspamd_main->wanna_die = TRUE;
+               msg_info_main ("catch termination signal, waiting for children");
+               rspamd_log_nolock (rspamd_main->logger);
+               /* Stop srv events to avoid false notifications */
+               g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main);
+               rspamd_pass_signal (rspamd_main->workers, w->signum);
+
+               if (control_fd != -1) {
+                       ev_io_stop (rspamd_main->event_loop, &control_ev);
+                       close (control_fd);
+               }
 
-       ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+               if (valgrind_mode) {
+                       /* Special case if we are likely running with valgrind */
+                       term_attempts = TERMINATION_ATTEMPTS * 10;
+               }
+               else {
+                       term_attempts = TERMINATION_ATTEMPTS;
+               }
+
+               ev_finale.data = rspamd_main;
+               ev_timer_init (&ev_finale, rspamd_final_timer_handler, 0.2, 0.2);
+               ev_timer_start (rspamd_main->event_loop, &ev_finale);
+       }
 }
 
 static void
@@ -1031,11 +1038,13 @@ rspamd_usr1_handler (struct ev_loop *loop, ev_signal *w, int revents)
 {
        struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
 
-       rspamd_log_reopen_priv (rspamd_main->logger,
-                       rspamd_main->workers_uid,
-                       rspamd_main->workers_gid);
-       g_hash_table_foreach (rspamd_main->workers, reopen_log_handler,
-                       NULL);
+       if (!rspamd_main->wanna_die) {
+               rspamd_log_reopen_priv (rspamd_main->logger,
+                               rspamd_main->workers_uid,
+                               rspamd_main->workers_gid);
+               g_hash_table_foreach (rspamd_main->workers, reopen_log_handler,
+                               NULL);
+       }
 }
 
 static void
@@ -1043,195 +1052,65 @@ rspamd_hup_handler (struct ev_loop *loop, ev_signal *w, int revents)
 {
        struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
 
-       msg_info_main ("rspamd "
-                       RVERSION
-                       " is restarting");
-       g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
-       rspamd_log_close_priv (rspamd_main->logger,
+       if (!rspamd_main->wanna_die) {
+               msg_info_main ("rspamd "
+                               RVERSION
+                               " is restarting");
+               g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
+               rspamd_log_close_priv (rspamd_main->logger,
                                FALSE,
                                rspamd_main->workers_uid,
                                rspamd_main->workers_gid);
-       reread_config (rspamd_main);
-       rspamd_check_core_limits (rspamd_main);
-       spawn_workers (rspamd_main, rspamd_main->event_loop);
+               reread_config (rspamd_main);
+               rspamd_check_core_limits (rspamd_main);
+               spawn_workers (rspamd_main, rspamd_main->event_loop);
+       }
 }
 
+/* Called when a dead child has been found */
+
 static void
-rspamd_cld_handler (EV_P_ ev_signal *w, int revents)
+rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
+                                       struct rspamd_worker *wrk)
 {
-       struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
-       guint i;
-       gint res = 0;
-       struct rspamd_worker *cur;
-       pid_t pid;
-       gboolean need_refork = TRUE, found_proc = FALSE;
+       gboolean need_refork;
 
        /* Turn off locking for logger */
        rspamd_log_nolock (rspamd_main->logger);
 
        msg_info_main ("got SIGCHLD signal, finding terminated workers");
        /* Remove dead child form children list */
-       for (;;) {
-               pid = waitpid (0, &res, WNOHANG);
-
-               if (pid == -1) {
-                       if (errno != EINTR) {
-                               msg_warn_main ("got unexpected system error when waiting: %s",
-                                               strerror (errno));
-                               break;
-                       }
-                       else {
-                               continue;
-                       }
-               }
-               else if (pid == 0) {
-                       /* No more processes to wait */
-                       break;
-               }
-
-               found_proc = TRUE;
-
-               if ((cur =
-                               g_hash_table_lookup (rspamd_main->workers,
-                                               GSIZE_TO_POINTER (pid))) != NULL) {
-                       /* Unlink dead process from queue and hash table */
-
-                       g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (
-                                       pid));
-
-                       if (cur->wanna_die) {
-                               /* Do not refork workers that are intended to be terminated */
-                               need_refork = FALSE;
-                       }
-
-                       if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
-                               /* Normal worker termination, do not fork one more */
-                               msg_info_main ("%s process %P terminated normally",
-                                               g_quark_to_string (cur->type),
-                                               cur->pid);
-                       }
-                       else {
-                               if (WIFSIGNALED (res)) {
-#ifdef WCOREDUMP
-                                       if (WCOREDUMP (res)) {
-                                               msg_warn_main (
-                                                               "%s process %P terminated abnormally by signal: %s"
-                                                               " and created core file",
-                                                               g_quark_to_string (cur->type),
-                                                               cur->pid,
-                                                               g_strsignal (WTERMSIG (res)));
-                                       }
-                                       else {
-#ifdef HAVE_SYS_RESOURCE_H
-                                               struct rlimit rlmt;
-                                               (void)getrlimit (RLIMIT_CORE, &rlmt);
-
-                                               msg_warn_main (
-                                                               "%s process %P terminated abnormally by signal: %s"
-                                                               " but NOT created core file (throttled=%s); "
-                                                               "core file limits: %L current, %L max",
-                                                               g_quark_to_string (cur->type),
-                                                               cur->pid,
-                                                               g_strsignal (WTERMSIG (res)),
-                                                               cur->cores_throttled ? "yes" : "no",
-                                                               (gint64)rlmt.rlim_cur,
-                                                               (gint64)rlmt.rlim_max);
-#else
-                                               msg_warn_main (
-                                                               "%s process %P terminated abnormally by signal: %s"
-                                                               " but NOT created core file (throttled=%s); ",
-                                                               g_quark_to_string (cur->type),
-                                                               cur->pid,
-                                                               g_strsignal (WTERMSIG (res)),
-                                                               cur->cores_throttled ? "yes" : "no");
-#endif
-                                       }
-#else
-                                       msg_warn_main (
-                                                       "%s process %P terminated abnormally by signal: %s",
-                                                       g_quark_to_string (cur->type),
-                                                       cur->pid,
-                                                       g_strsignal (WTERMSIG (res)));
-#endif
-                                       if (WTERMSIG (res) == SIGUSR2) {
-                                               /*
-                                                * It is actually race condition when not started process
-                                                * has been requested to be reloaded.
-                                                *
-                                                * We shouldn't refork on this
-                                                */
-                                               need_refork = FALSE;
-                                       }
-                               }
-                               else {
-                                       msg_warn_main ("%s process %P terminated abnormally "
-                                                       "with exit code %d",
-                                                       g_quark_to_string (cur->type),
-                                                       cur->pid,
-                                                       WEXITSTATUS (res));
-                               }
-
-                               if (need_refork) {
-                                       /* Fork another worker in replace of dead one */
-                                       rspamd_check_core_limits (rspamd_main);
-
-
-                                       rspamd_fork_delayed (cur->cf, cur->index, rspamd_main);
-                               }
-                       }
-
-                       if (cur->srv_pipe[0] != -1) {
-                               /* Ugly workaround */
-                               if (cur->tmp_data) {
-                                       g_free (cur->tmp_data);
-                               }
-                               ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
-                       }
-
-                       if (cur->control_pipe[0] != -1) {
-                               /* We also need to clean descriptors left */
-                               close (cur->control_pipe[0]);
-                               close (cur->srv_pipe[0]);
-                       }
-
-                       REF_RELEASE (cur->cf);
-
-                       if (cur->finish_actions) {
-                               g_ptr_array_free (cur->finish_actions, TRUE);
-                       }
-
-                       g_free (cur);
-               }
-               else {
-                       for (i = 0; i < other_workers->len; i++) {
-                               if (g_array_index (other_workers, pid_t, i) == pid) {
-                                       g_array_remove_index_fast (other_workers, i);
-                                       msg_info_main ("related process %P terminated", pid);
-                               }
-                       }
+       g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (wrk->pid));
+       if (wrk->srv_pipe[0] != -1) {
+               /* Ugly workaround */
+               if (wrk->tmp_data) {
+                       g_free (wrk->tmp_data);
                }
+               ev_io_stop (rspamd_main->event_loop, &wrk->srv_ev);
        }
 
-       if (!found_proc) {
-               msg_err_main ("got SIGCHLD but no workers were able to be waited: %s",
-                               strerror (errno));
+       if (wrk->control_pipe[0] != -1) {
+               /* We also need to clean descriptors left */
+               close (wrk->control_pipe[0]);
+               close (wrk->srv_pipe[0]);
        }
 
-       rspamd_log_lock (rspamd_main->logger);
-}
+       REF_RELEASE (wrk->cf);
 
-static void
-rspamd_final_term_handler (EV_P_ ev_timer *w, int revents)
-{
-       struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+       if (wrk->finish_actions) {
+               g_ptr_array_free (wrk->finish_actions, TRUE);
+       }
 
-       term_attempts--;
+       need_refork = rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
 
-       g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
-
-       if (g_hash_table_size (rspamd_main->workers) == 0) {
-               ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+       if (need_refork) {
+               /* Fork another worker in replace of dead one */
+               rspamd_check_core_limits (rspamd_main);
+               rspamd_fork_delayed (wrk->cf, wrk->index, rspamd_main);
        }
+
+       g_free (wrk);
+       rspamd_log_lock (rspamd_main->logger);
 }
 
 /* Control socket handler */
@@ -1299,10 +1178,9 @@ main (gint argc, gchar **argv, gchar **env)
        GQuark type;
        rspamd_inet_addr_t *control_addr = NULL;
        struct ev_loop *event_loop;
-       static ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
-       static ev_io control_ev;
+       static ev_signal term_ev, int_ev, hup_ev, usr1_ev;
        struct rspamd_main *rspamd_main;
-       gboolean skip_pid = FALSE, valgrind_mode = FALSE;
+       gboolean skip_pid = FALSE;
 
 #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
        g_thread_init (NULL);
@@ -1328,7 +1206,6 @@ main (gint argc, gchar **argv, gchar **env)
 
        rspamd_main->cfg->libs_ctx = rspamd_init_libs ();
        memset (&signals, 0, sizeof (struct sigaction));
-       other_workers = g_array_new (FALSE, TRUE, sizeof (pid_t));
 
        read_cmd_line (&argc, &argv, rspamd_main->cfg);
 
@@ -1517,7 +1394,6 @@ main (gint argc, gchar **argv, gchar **env)
        /* Set title */
        setproctitle ("main process");
 
-
        /* Flush log */
        rspamd_log_flush (rspamd_main->logger);
 
@@ -1576,12 +1452,6 @@ main (gint argc, gchar **argv, gchar **env)
        usr1_ev.data = rspamd_main;
        ev_signal_start (event_loop, &usr1_ev);
 
-
-       /* XXX: deal with children */
-       cld_ev.data = rspamd_main;
-       ev_signal_init (&cld_ev, rspamd_cld_handler, SIGCHLD);
-       ev_signal_start (event_loop, &cld_ev);
-
        rspamd_check_core_limits (rspamd_main);
        rspamd_mempool_lock_mutex (rspamd_main->start_mtx);
        spawn_workers (rspamd_main, event_loop);
@@ -1598,41 +1468,6 @@ main (gint argc, gchar **argv, gchar **env)
                ev_io_start (event_loop, &control_ev);
        }
 
-       ev_loop (event_loop, 0);
-       /* We need to block signals unless children are waited for */
-       rspamd_worker_block_signals ();
-       ev_signal_stop (event_loop, &term_ev);
-       ev_signal_stop (event_loop, &int_ev);
-       ev_signal_stop (event_loop, &hup_ev);
-       ev_signal_stop (event_loop, &cld_ev);
-       ev_signal_stop (event_loop, &usr1_ev);
-
-       if (control_fd != -1) {
-               ev_io_stop (event_loop, &control_ev);
-               close (control_fd);
-       }
-
-       if (valgrind_mode) {
-               /* Special case if we are likely running with valgrind */
-               term_attempts = TERMINATION_ATTEMPTS * 10;
-       }
-       else {
-               term_attempts = TERMINATION_ATTEMPTS;
-       }
-
-
-       /* Wait for workers termination */
-       ev_signal_init (&cld_ev, rspamd_final_cld_handler, SIGCHLD);
-       ev_signal_start (event_loop, &cld_ev);
-
-       g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
-
-       static ev_timer ev_finale;
-
-       ev_finale.data = rspamd_main;
-       ev_timer_init (&ev_finale, rspamd_final_term_handler, 0.2, 0.2);
-       ev_timer_start (event_loop, &ev_finale);
-
        ev_loop (event_loop, 0);
 
        /* Maybe save roll history */
index e47271ca35398a4b46c770376a6f81dd5bfe901b..97e4c4c6a9cc679a89a2451a7a5552787b53a0a0 100644 (file)
@@ -69,6 +69,9 @@ struct rspamd_worker_accept_event {
        struct rspamd_worker_accept_event *prev, *next;
 };
 
+typedef void (*rspamd_worker_term_cb)(EV_P_ ev_child *, struct rspamd_main *,
+               struct rspamd_worker *);
+
 /**
  * Worker process structure
  */
@@ -95,6 +98,8 @@ struct rspamd_worker {
        gpointer control_data;          /**< used by control protocol to handle commands        */
        gpointer tmp_data;              /**< used to avoid race condition to deal with control messages */
        GPtrArray *finish_actions;      /**< called when worker is terminated                           */
+       ev_child cld_ev;                /**< to allow reaping                                                           */
+       rspamd_worker_term_cb term_handler; /**< custom term handler                                            */
 };
 
 struct rspamd_abstract_worker_ctx {
@@ -280,6 +285,7 @@ struct rspamd_main {
        uid_t workers_uid;                                          /**< worker's uid running to                        */
        gid_t workers_gid;                                          /**< worker's gid running to                                                */
        gboolean is_privilleged;                                    /**< true if run in privilleged mode                */
+       gboolean wanna_die;                                         /**< no respawn of processes                                                */
        gboolean cores_throttling;                                  /**< turn off cores when limits are exceeded                */
        struct roll_history *history;                               /**< rolling history                                                                */
        struct ev_loop *event_loop;