From ce6035a9ecaf26c8db6ca51e118c3bf2a9947810 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 9 Oct 2015 14:46:04 +0100 Subject: Rework signals processing in main. --- src/rspamd.c | 471 +++++++++++++++++++++++++++++++---------------------------- 1 file changed, 244 insertions(+), 227 deletions(-) (limited to 'src/rspamd.c') diff --git a/src/rspamd.c b/src/rspamd.c index 5218b62e0..5b4b3e2c3 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -62,6 +62,13 @@ #include #endif +#ifdef HAVE_SYS_IOCTL_H +#include +#endif +#ifdef HAVE_STROPS_H +#include +#endif + #ifdef HAVE_OPENSSL #include #include @@ -98,16 +105,13 @@ static struct rspamd_worker * fork_worker (struct rspamd_main *, static gboolean load_rspamd_config (struct rspamd_config *cfg, gboolean init_modules); -sig_atomic_t do_restart = 0; -sig_atomic_t do_reopen_log = 0; -sig_atomic_t do_terminate = 0; -sig_atomic_t child_dead = 0; -sig_atomic_t got_alarm = 0; +/* Signals */ +static sig_atomic_t got_alarm = 0; -#ifdef HAVE_SA_SIGINFO -GQueue *signals_info = NULL; -#endif +/* Control socket */ +static gint control_fd; +/* Cmdline options */ static gboolean config_test = FALSE; static gboolean no_fork = FALSE; static gchar **cfg_names = NULL; @@ -126,11 +130,6 @@ static gboolean encrypt_password = FALSE; static GList *workers_pending = NULL; static GHashTable *ucl_vars = NULL; -#ifdef HAVE_SA_SIGINFO -static siginfo_t static_sg[64]; -static sig_atomic_t cur_sg = 0; -#endif - /* List of unrelated forked processes */ static GArray *other_workers = NULL; @@ -177,83 +176,6 @@ static GOptionEntry entries[] = { NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL } }; -#ifndef HAVE_SA_SIGINFO -static void -sig_handler (gint signo) -#else -static void -sig_handler (gint signo, siginfo_t *info, void *unused) -#endif -{ -#ifdef HAVE_SA_SIGINFO - if (cur_sg < (sig_atomic_t)G_N_ELEMENTS (static_sg)) { - memcpy (&static_sg[cur_sg++], info, sizeof (siginfo_t)); - } - /* XXX: discard more than 64 simultaneous signals */ -#endif - - switch (signo) { - case SIGHUP: - do_restart = 1; - break; - case SIGINT: - case SIGTERM: - do_terminate = 1; - break; - case SIGCHLD: - child_dead = 1; - break; - case SIGUSR1: - do_reopen_log = 1; - break; - case SIGUSR2: - /* Do nothing */ - break; - case SIGALRM: - got_alarm = 1; - break; - } -} - -#ifdef HAVE_SA_SIGINFO - -static const gchar * -chldsigcode (gint code) { - switch (code) { -#ifdef CLD_EXITED - case CLD_EXITED: - return "Child exited normally"; - case CLD_KILLED: - return "Child has terminated abnormally but did not create a core file"; - case CLD_DUMPED: - return "Child has terminated abnormally and created a core file"; - case CLD_TRAPPED: - return "Traced child has trapped"; -#endif - default: - return "Unknown reason"; - } -} - -/* Prints info about incoming signals by parsing siginfo structures */ -static void -print_signals_info (void) -{ - siginfo_t *inf; - - while ((inf = g_queue_pop_head (signals_info))) { - if (inf->si_signo == SIGCHLD) { - msg_info_main ("got SIGCHLD from child: %P; reason: '%s'", - inf->si_pid, chldsigcode (inf->si_code)); - } - else { - msg_info_main ("got signal: '%s'; received from pid: %P; uid: %ul", - g_strsignal (inf->si_signo), inf->si_pid, (gulong)inf->si_uid); - } - } -} -#endif - static void read_cmd_line (gint *argc, gchar ***argv, struct rspamd_config *cfg) @@ -345,15 +267,15 @@ detect_priv (struct rspamd_main *rspamd) rspamd->workers_gid = grp->gr_gid; } else { - rspamd->workers_gid = -1; + rspamd->workers_gid = (gid_t)-1; } rspamd->workers_uid = pwd->pw_uid; } } else { rspamd->is_privilleged = FALSE; - rspamd->workers_uid = -1; - rspamd->workers_gid = -1; + rspamd->workers_uid = (uid_t)-1; + rspamd->workers_gid = (gid_t)-1; } } @@ -1088,51 +1010,155 @@ do_encrypt_password (void) rspamd_explicit_memzero (password, sizeof (password)); } +/* Signal handlers */ static void -rspamd_init_main (struct rspamd_main *rspamd) +rspamd_term_handler (gint signo, short what, gpointer arg) { - rspamd->server_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), - "main"); - rspamd_main->stat = rspamd_mempool_alloc0_shared (rspamd_main->server_pool, - sizeof (struct rspamd_stat)); - /* Create rolling history */ - rspamd_main->history = rspamd_roll_history_new (rspamd_main->server_pool); + struct event_base *ev_base = arg; + + msg_info_main ("catch termination signal, waiting for children"); + rspamd_pass_signal (rspamd_main->workers, SIGTERM); + + event_base_loopexit (ev_base, NULL); } -gint -main (gint argc, gchar **argv, gchar **env) +static void +rspamd_usr1_handler (gint signo, short what, gpointer arg) +{ + struct event_base *ev_base = arg; + + rspamd_log_reopen_priv (rspamd_main->logger, + rspamd_main->workers_uid, + rspamd_main->workers_gid); + g_hash_table_foreach (rspamd_main->workers, reopen_log_handler, + NULL); +} + +static void +rspamd_hup_handler (gint signo, short what, gpointer arg) +{ + struct event_base *ev_base = arg; + + rspamd_log_reopen_priv (rspamd_main->logger, + rspamd_main->workers_uid, + rspamd_main->workers_gid); + msg_info_main ("rspamd " + RVERSION + " is restarting"); + g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL); + rspamd_map_remove_all (rspamd_main->cfg); + reread_config (rspamd_main); + spawn_workers (rspamd_main); +} + +static void +rspamd_cld_handler (gint signo, short what, gpointer arg) { - gint res = 0, i; - struct sigaction signals; + struct event_base *ev_base = arg; + guint i; + gint res = 0; struct rspamd_worker *cur; pid_t wrk; + + msg_debug_main ("catch SIGCHLD signal, finding terminated worker"); + /* Remove dead child form children list */ + wrk = waitpid (0, &res, 0); + if ((cur = + g_hash_table_lookup (rspamd_main->workers, + GSIZE_TO_POINTER (wrk))) != NULL) { + /* Unlink dead process from queue and hash table */ + + g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER ( + wrk)); + + if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { + /* Normal worker termination, do not fork one more */ + msg_info_main ("%s process %P terminated normally", + g_quark_to_string (cur->type), + cur->pid); + } + else { + if (WIFSIGNALED (res)) { +#ifdef WCOREDUMP + if (WCOREDUMP (res)) { + msg_warn_main ( + "%s process %P terminated abnormally by signal: %d" + " and created core file", + g_quark_to_string (cur->type), + cur->pid, + WTERMSIG (res)); + } + else { + msg_warn_main ( + "%s process %P terminated abnormally by signal: %d" + " but NOT created core file", + g_quark_to_string (cur->type), + cur->pid, + WTERMSIG (res)); + } +#else + msg_warn_main ( + "%s process %P terminated abnormally by signal: %d", + g_quark_to_string (cur->type), + cur->pid, + WTERMSIG (res)); +#endif + } + else { + msg_warn_main ("%s process %P terminated abnormally " + "with exit code %d", + g_quark_to_string (cur->type), + cur->pid, + WEXITSTATUS (res)); + } + /* Fork another worker in replace of dead one */ + delay_fork (cur->cf, cur->index); + } + + g_free (cur); + } + else { + for (i = 0; i < (gint) other_workers->len; i++) { + if (g_array_index (other_workers, pid_t, i) == wrk) { + g_array_remove_index_fast (other_workers, i); + msg_info_main ("related process %P terminated", wrk); + } + } + } +} + +gint +main (gint argc, gchar **argv, gchar **env) +{ + gint i, res = 0; + struct sigaction signals, sigpipe_act; worker_t **pworker; GQuark type; gpointer keypair; GString *keypair_out; + rspamd_inet_addr_t *control_addr = NULL; + struct event_base *ev_base; + struct event term_ev, int_ev, cld_ev, hup_ev, usr1_ev, control_ev; -#ifdef HAVE_SA_SIGINFO - signals_info = g_queue_new (); -#endif #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) g_thread_init (NULL); #endif - rspamd_main = (struct rspamd_main *)g_malloc0 (sizeof (struct rspamd_main)); + rspamd_main = (struct rspamd_main *) g_malloc0 (sizeof (struct rspamd_main)); + rspamd_main->server_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), + "main"); + rspamd_main->stat = rspamd_mempool_alloc0_shared (rspamd_main->server_pool, + sizeof (struct rspamd_stat)); + /* Create rolling history */ + rspamd_main->history = rspamd_roll_history_new (rspamd_main->server_pool); rspamd_main->cfg = - (struct rspamd_config *)g_malloc0 (sizeof (struct rspamd_config)); - - if (!rspamd_main || !rspamd_main->cfg) { - fprintf (stderr, "Cannot allocate memory\n"); - exit (-errno); - } + (struct rspamd_config *) g_malloc0 (sizeof (struct rspamd_config)); #ifndef HAVE_SETPROCTITLE init_title (argc, argv, env); #endif rspamd_init_libs (); - rspamd_init_main (rspamd_main); rspamd_init_cfg (rspamd_main->cfg, TRUE); memset (&signals, 0, sizeof (struct sigaction)); @@ -1143,7 +1169,7 @@ main (gint argc, gchar **argv, gchar **env) if (argc > 0) { /* Parse variables */ - for (i = 0; i < argc; i ++) { + for (i = 0; i < argc; i++) { if (strchr (argv[i], '=') != NULL) { gchar *k, *v, *t; @@ -1174,7 +1200,7 @@ main (gint argc, gchar **argv, gchar **env) /* First set logger to console logger */ rspamd_main->cfg->log_type = RSPAMD_LOG_CONSOLE; rspamd_set_logger (rspamd_main->cfg, type, rspamd_main); - (void)rspamd_log_open (rspamd_main->logger); + (void) rspamd_log_open (rspamd_main->logger); g_log_set_default_handler (rspamd_glib_log_function, rspamd_main->logger); g_set_printerr_handler (rspamd_glib_printerr_function); @@ -1183,7 +1209,7 @@ main (gint argc, gchar **argv, gchar **env) pworker = &workers[0]; while (*pworker) { /* Init string quarks */ - (void)g_quark_from_static_string ((*pworker)->name); + (void) g_quark_from_static_string ((*pworker)->name); pworker++; } @@ -1207,8 +1233,9 @@ main (gint argc, gchar **argv, gchar **env) exit (EXIT_FAILURE); } keypair_out = rspamd_http_connection_print_key (keypair, - RSPAMD_KEYPAIR_PUBKEY|RSPAMD_KEYPAIR_PRIVKEY|RSPAMD_KEYPAIR_ID| - RSPAMD_KEYPAIR_BASE32|RSPAMD_KEYPAIR_HUMAN); + RSPAMD_KEYPAIR_PUBKEY | RSPAMD_KEYPAIR_PRIVKEY | + RSPAMD_KEYPAIR_ID | + RSPAMD_KEYPAIR_BASE32 | RSPAMD_KEYPAIR_HUMAN); rspamd_printf ("%v", keypair_out); exit (EXIT_SUCCESS); } @@ -1232,10 +1259,11 @@ main (gint argc, gchar **argv, gchar **env) } /* Insert classifiers symbols */ - (void)rspamd_config_insert_classify_symbols (rspamd_main->cfg); + rspamd_config_insert_classify_symbols (rspamd_main->cfg); - if (!rspamd_symbols_cache_validate (rspamd_main->cfg->cache, rspamd_main->cfg, - FALSE)) { + if (!rspamd_symbols_cache_validate (rspamd_main->cfg->cache, + rspamd_main->cfg, + FALSE)) { res = FALSE; } if (dump_cache) { @@ -1263,10 +1291,13 @@ main (gint argc, gchar **argv, gchar **env) gperf_profiler_init (rspamd_main->cfg, "main"); - msg_info_main ("rspamd " RVERSION " is starting, build id: " RID); + msg_info_main ("rspamd " + RVERSION + " is starting, build id: " + RID); rspamd_main->cfg->cfg_name = rspamd_mempool_strdup ( - rspamd_main->cfg->cfg_pool, - rspamd_main->cfg->cfg_name); + rspamd_main->cfg->cfg_pool, + rspamd_main->cfg->cfg_name); /* Daemonize */ if (!rspamd_main->cfg->no_fork && daemon (0, 0) == -1) { @@ -1278,11 +1309,17 @@ main (gint argc, gchar **argv, gchar **env) rspamd_main->pid = getpid (); rspamd_main->type = type; - rspamd_signals_init (&signals, sig_handler); + /* Ignore SIGPIPE as we handle write errors manually */ + sigemptyset (&sigpipe_act.sa_mask); + sigaddset (&sigpipe_act.sa_mask, SIGPIPE); + sigpipe_act.sa_handler = SIG_IGN; + sigpipe_act.sa_flags = 0; + sigaction (SIGPIPE, &sigpipe_act, NULL); if (rspamd_main->cfg->pid_file == NULL) { msg_info("pid file is not specified, skipping writing it"); - } else if (rspamd_write_pid (rspamd_main) == -1) { + } + else if (rspamd_write_pid (rspamd_main) == -1) { msg_err_main ("cannot write pid file %s", rspamd_main->cfg->pid_file); exit (-errno); } @@ -1290,17 +1327,66 @@ main (gint argc, gchar **argv, gchar **env) /* Block signals to use sigsuspend in future */ sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL); + /* Set title */ setproctitle ("main process"); /* Init config cache */ rspamd_symbols_cache_init (rspamd_main->cfg->cache); /* Validate cache */ - (void)rspamd_symbols_cache_validate (rspamd_main->cfg->cache, rspamd_main->cfg, FALSE); + (void) rspamd_symbols_cache_validate (rspamd_main->cfg->cache, + rspamd_main->cfg, + FALSE); /* Flush log */ rspamd_log_flush (rspamd_main->logger); + /* Open control socket if needed */ + control_fd = -1; + if (rspamd_main->cfg->control_socket_path) { + if (!rspamd_parse_inet_address (&control_addr, + rspamd_main->cfg->control_socket_path)) { + msg_err_main ("cannot parse inet address %s", + rspamd_main->cfg->control_socket_path); + } + else { + control_fd = rspamd_inet_address_listen (control_addr, SOCK_STREAM, + FALSE); + if (control_fd == -1) { + msg_err_main ("cannot open control socket at path: %s", + rspamd_main->cfg->control_socket_path); + } + else { + /* Generate SIGIO on reads from this socket */ + if (fcntl (control_fd, F_SETOWN, getpid ()) == -1) { + msg_err_main ("fnctl to set F_SETOWN failed: %s", + strerror (errno)); + } +#ifdef HAVE_SETSIG + if (ioctl (control_fd, I_SETSIG, S_INPUT) == -1) { + msg_err_main ("ioctl I_SETSIG to set S_INPUT failed: %s", + strerror (errno)); + } +#elif defined(HAVE_OASYNC) + gint flags = fcntl (control_fd, F_GETFL); + + if (flags == -1 || fcntl (control_fd, F_SETFL, + flags | O_ASYNC | O_NONBLOCK) == -1) { + msg_err_main ("fnctl F_SETFL to set O_ASYNC failed: %s", + strerror (errno)); + } +#else + msg_err_main ("cannot get notifications about the control socket"); +#endif + } + } + } + + if (control_fd != -1) { + msg_info_main ("listening for control commands on %s", + rspamd_inet_address_to_string (control_addr)); + } + /* Maybe read roll history */ if (rspamd_main->cfg->history_file) { rspamd_roll_history_load (rspamd_main->history, @@ -1314,105 +1400,31 @@ main (gint argc, gchar **argv, gchar **env) rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal); spawn_workers (rspamd_main); - /* Signal processing cycle */ - for (;; ) { - msg_debug_main ("calling sigsuspend"); - sigemptyset (&signals.sa_mask); - sigsuspend (&signals.sa_mask); -#ifdef HAVE_SA_SIGINFO - for (i = 0; i < cur_sg; i ++) { - g_queue_push_head (signals_info, &static_sg[i]); - } - cur_sg = 0; - print_signals_info (); -#endif - if (do_terminate) { - do_terminate = 0; - msg_info_main ("catch termination signal, waiting for children"); - rspamd_pass_signal (rspamd_main->workers, SIGTERM); - break; - } - if (child_dead) { - child_dead = 0; - msg_debug_main ("catch SIGCHLD signal, finding terminated worker"); - /* Remove dead child form children list */ - wrk = waitpid (0, &res, 0); - if ((cur = - g_hash_table_lookup (rspamd_main->workers, - GSIZE_TO_POINTER (wrk))) != NULL) { - /* Unlink dead process from queue and hash table */ - - g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER ( - wrk)); - - if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { - /* Normal worker termination, do not fork one more */ - msg_info_main ("%s process %P terminated normally", - g_quark_to_string (cur->type), - cur->pid); - } - else { - if (WIFSIGNALED (res)) { - msg_warn_main ( - "%s process %P terminated abnormally by signal: %d", - g_quark_to_string (cur->type), - cur->pid, - WTERMSIG (res)); - } - else { - msg_warn_main ("%s process %P terminated abnormally", - g_quark_to_string (cur->type), - cur->pid); - } - /* Fork another worker in replace of dead one */ - delay_fork (cur->cf, cur->index); - } - - g_free (cur); - } - else { - for (i = 0; i < (gint)other_workers->len; i++) { - if (g_array_index (other_workers, pid_t, i) == wrk) { - g_array_remove_index_fast (other_workers, i); - msg_info_main ("related process %P terminated", wrk); - } - } - } - } - if (do_restart) { - do_restart = 0; - rspamd_log_reopen_priv (rspamd_main->logger, - rspamd_main->workers_uid, - rspamd_main->workers_gid); - msg_info_main ("rspamd " RVERSION " is restarting"); - g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL); - rspamd_map_remove_all (rspamd_main->cfg); - reread_config (rspamd_main); - spawn_workers (rspamd_main); - } - if (do_reopen_log) { - do_reopen_log = 0; - rspamd_log_reopen_priv (rspamd_main->logger, - rspamd_main->workers_uid, - rspamd_main->workers_gid); - g_hash_table_foreach (rspamd_main->workers, reopen_log_handler, - NULL); - } - if (got_alarm) { - got_alarm = 0; - fork_delayed (rspamd_main); - } - } - - /* Restore some signals */ + /* Init event base */ + ev_base = event_init (); + /* Unblock signals */ sigemptyset (&signals.sa_mask); - sigaddset (&signals.sa_mask, SIGALRM); - sigaddset (&signals.sa_mask, SIGINT); - sigaddset (&signals.sa_mask, SIGTERM); - sigaction (SIGALRM, &signals, NULL); - sigaction (SIGTERM, &signals, NULL); - sigaction (SIGINT, &signals, NULL); - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + sigprocmask (SIG_SETMASK, &signals.sa_mask, NULL); + + /* Set events for signals */ + evsignal_set (&term_ev, SIGTERM, rspamd_term_handler, ev_base); + event_base_set (ev_base, &term_ev); + event_add (&term_ev, NULL); + evsignal_set (&int_ev, SIGINT, rspamd_term_handler, ev_base); + event_base_set (ev_base, &int_ev); + event_add (&int_ev, NULL); + evsignal_set (&hup_ev, SIGHUP, rspamd_hup_handler, ev_base); + event_base_set (ev_base, &hup_ev); + event_add (&hup_ev, NULL); + evsignal_set (&cld_ev, SIGCHLD, rspamd_cld_handler, ev_base); + event_base_set (ev_base, &cld_ev); + event_add (&cld_ev, NULL); + evsignal_set (&usr1_ev, SIGUSR1, rspamd_usr1_handler, ev_base); + event_base_set (ev_base, &usr1_ev); + event_add (&usr1_ev, NULL); + + event_base_loop (ev_base, 0); + /* Set alarm for hard termination */ if (getenv ("G_SLICE") != NULL) { /* Special case if we are likely running with valgrind */ @@ -1432,6 +1444,11 @@ main (gint argc, gchar **argv, gchar **env) } msg_info_main ("terminating..."); + + if (control_fd != -1) { + close (control_fd); + } + rspamd_symbols_cache_destroy (rspamd_main->cfg->cache); rspamd_log_close (rspamd_main->logger); rspamd_config_free (rspamd_main->cfg); -- cgit v1.2.3