aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/libutil/util.c6
-rw-r--r--src/rspamd.c471
2 files changed, 250 insertions, 227 deletions
diff --git a/src/libutil/util.c b/src/libutil/util.c
index b0fe7e766..5e8951f65 100644
--- a/src/libutil/util.c
+++ b/src/libutil/util.c
@@ -691,6 +691,12 @@ rspamd_signals_init (struct sigaction *signals, void (*sig_handler)(gint))
sigaction (SIGUSR1, signals, NULL);
sigaction (SIGUSR2, signals, NULL);
sigaction (SIGALRM, signals, NULL);
+#ifdef SIGPOLL
+ sigaction (SIGPOLL, signals, NULL);
+#endif
+#ifdef SIGIO
+ sigaction (SIGIO, signals, NULL);
+#endif
/* Ignore SIGPIPE as we handle write errors manually */
sigemptyset (&sigpipe_act.sa_mask);
diff --git a/src/rspamd.c b/src/rspamd.c
index 5218b62e0..5b4b3e2c3 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -62,6 +62,13 @@
#include <google/profiler.h>
#endif
+#ifdef HAVE_SYS_IOCTL_H
+#include <sys/ioctl.h>
+#endif
+#ifdef HAVE_STROPS_H
+#include <stropts.h>
+#endif
+
#ifdef HAVE_OPENSSL
#include <openssl/rand.h>
#include <openssl/err.h>
@@ -98,16 +105,13 @@ static struct rspamd_worker * fork_worker (struct rspamd_main *,
static gboolean load_rspamd_config (struct rspamd_config *cfg,
gboolean init_modules);
-sig_atomic_t do_restart = 0;
-sig_atomic_t do_reopen_log = 0;
-sig_atomic_t do_terminate = 0;
-sig_atomic_t child_dead = 0;
-sig_atomic_t got_alarm = 0;
+/* Signals */
+static sig_atomic_t got_alarm = 0;
-#ifdef HAVE_SA_SIGINFO
-GQueue *signals_info = NULL;
-#endif
+/* Control socket */
+static gint control_fd;
+/* Cmdline options */
static gboolean config_test = FALSE;
static gboolean no_fork = FALSE;
static gchar **cfg_names = NULL;
@@ -126,11 +130,6 @@ static gboolean encrypt_password = FALSE;
static GList *workers_pending = NULL;
static GHashTable *ucl_vars = NULL;
-#ifdef HAVE_SA_SIGINFO
-static siginfo_t static_sg[64];
-static sig_atomic_t cur_sg = 0;
-#endif
-
/* List of unrelated forked processes */
static GArray *other_workers = NULL;
@@ -177,83 +176,6 @@ static GOptionEntry entries[] =
{ NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL }
};
-#ifndef HAVE_SA_SIGINFO
-static void
-sig_handler (gint signo)
-#else
-static void
-sig_handler (gint signo, siginfo_t *info, void *unused)
-#endif
-{
-#ifdef HAVE_SA_SIGINFO
- if (cur_sg < (sig_atomic_t)G_N_ELEMENTS (static_sg)) {
- memcpy (&static_sg[cur_sg++], info, sizeof (siginfo_t));
- }
- /* XXX: discard more than 64 simultaneous signals */
-#endif
-
- switch (signo) {
- case SIGHUP:
- do_restart = 1;
- break;
- case SIGINT:
- case SIGTERM:
- do_terminate = 1;
- break;
- case SIGCHLD:
- child_dead = 1;
- break;
- case SIGUSR1:
- do_reopen_log = 1;
- break;
- case SIGUSR2:
- /* Do nothing */
- break;
- case SIGALRM:
- got_alarm = 1;
- break;
- }
-}
-
-#ifdef HAVE_SA_SIGINFO
-
-static const gchar *
-chldsigcode (gint code) {
- switch (code) {
-#ifdef CLD_EXITED
- case CLD_EXITED:
- return "Child exited normally";
- case CLD_KILLED:
- return "Child has terminated abnormally but did not create a core file";
- case CLD_DUMPED:
- return "Child has terminated abnormally and created a core file";
- case CLD_TRAPPED:
- return "Traced child has trapped";
-#endif
- default:
- return "Unknown reason";
- }
-}
-
-/* Prints info about incoming signals by parsing siginfo structures */
-static void
-print_signals_info (void)
-{
- siginfo_t *inf;
-
- while ((inf = g_queue_pop_head (signals_info))) {
- if (inf->si_signo == SIGCHLD) {
- msg_info_main ("got SIGCHLD from child: %P; reason: '%s'",
- inf->si_pid, chldsigcode (inf->si_code));
- }
- else {
- msg_info_main ("got signal: '%s'; received from pid: %P; uid: %ul",
- g_strsignal (inf->si_signo), inf->si_pid, (gulong)inf->si_uid);
- }
- }
-}
-#endif
-
static void
read_cmd_line (gint *argc, gchar ***argv, struct rspamd_config *cfg)
@@ -345,15 +267,15 @@ detect_priv (struct rspamd_main *rspamd)
rspamd->workers_gid = grp->gr_gid;
}
else {
- rspamd->workers_gid = -1;
+ rspamd->workers_gid = (gid_t)-1;
}
rspamd->workers_uid = pwd->pw_uid;
}
}
else {
rspamd->is_privilleged = FALSE;
- rspamd->workers_uid = -1;
- rspamd->workers_gid = -1;
+ rspamd->workers_uid = (uid_t)-1;
+ rspamd->workers_gid = (gid_t)-1;
}
}
@@ -1088,51 +1010,155 @@ do_encrypt_password (void)
rspamd_explicit_memzero (password, sizeof (password));
}
+/* Signal handlers */
static void
-rspamd_init_main (struct rspamd_main *rspamd)
+rspamd_term_handler (gint signo, short what, gpointer arg)
{
- rspamd->server_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
- "main");
- rspamd_main->stat = rspamd_mempool_alloc0_shared (rspamd_main->server_pool,
- sizeof (struct rspamd_stat));
- /* Create rolling history */
- rspamd_main->history = rspamd_roll_history_new (rspamd_main->server_pool);
+ struct event_base *ev_base = arg;
+
+ msg_info_main ("catch termination signal, waiting for children");
+ rspamd_pass_signal (rspamd_main->workers, SIGTERM);
+
+ event_base_loopexit (ev_base, NULL);
}
-gint
-main (gint argc, gchar **argv, gchar **env)
+static void
+rspamd_usr1_handler (gint signo, short what, gpointer arg)
+{
+ struct event_base *ev_base = arg;
+
+ rspamd_log_reopen_priv (rspamd_main->logger,
+ rspamd_main->workers_uid,
+ rspamd_main->workers_gid);
+ g_hash_table_foreach (rspamd_main->workers, reopen_log_handler,
+ NULL);
+}
+
+static void
+rspamd_hup_handler (gint signo, short what, gpointer arg)
+{
+ struct event_base *ev_base = arg;
+
+ rspamd_log_reopen_priv (rspamd_main->logger,
+ rspamd_main->workers_uid,
+ rspamd_main->workers_gid);
+ msg_info_main ("rspamd "
+ RVERSION
+ " is restarting");
+ g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
+ rspamd_map_remove_all (rspamd_main->cfg);
+ reread_config (rspamd_main);
+ spawn_workers (rspamd_main);
+}
+
+static void
+rspamd_cld_handler (gint signo, short what, gpointer arg)
{
- gint res = 0, i;
- struct sigaction signals;
+ struct event_base *ev_base = arg;
+ guint i;
+ gint res = 0;
struct rspamd_worker *cur;
pid_t wrk;
+
+ msg_debug_main ("catch SIGCHLD signal, finding terminated worker");
+ /* Remove dead child form children list */
+ wrk = waitpid (0, &res, 0);
+ if ((cur =
+ g_hash_table_lookup (rspamd_main->workers,
+ GSIZE_TO_POINTER (wrk))) != NULL) {
+ /* Unlink dead process from queue and hash table */
+
+ g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (
+ wrk));
+
+ if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
+ /* Normal worker termination, do not fork one more */
+ msg_info_main ("%s process %P terminated normally",
+ g_quark_to_string (cur->type),
+ cur->pid);
+ }
+ else {
+ if (WIFSIGNALED (res)) {
+#ifdef WCOREDUMP
+ if (WCOREDUMP (res)) {
+ msg_warn_main (
+ "%s process %P terminated abnormally by signal: %d"
+ " and created core file",
+ g_quark_to_string (cur->type),
+ cur->pid,
+ WTERMSIG (res));
+ }
+ else {
+ msg_warn_main (
+ "%s process %P terminated abnormally by signal: %d"
+ " but NOT created core file",
+ g_quark_to_string (cur->type),
+ cur->pid,
+ WTERMSIG (res));
+ }
+#else
+ msg_warn_main (
+ "%s process %P terminated abnormally by signal: %d",
+ g_quark_to_string (cur->type),
+ cur->pid,
+ WTERMSIG (res));
+#endif
+ }
+ else {
+ msg_warn_main ("%s process %P terminated abnormally "
+ "with exit code %d",
+ g_quark_to_string (cur->type),
+ cur->pid,
+ WEXITSTATUS (res));
+ }
+ /* Fork another worker in replace of dead one */
+ delay_fork (cur->cf, cur->index);
+ }
+
+ g_free (cur);
+ }
+ else {
+ for (i = 0; i < (gint) other_workers->len; i++) {
+ if (g_array_index (other_workers, pid_t, i) == wrk) {
+ g_array_remove_index_fast (other_workers, i);
+ msg_info_main ("related process %P terminated", wrk);
+ }
+ }
+ }
+}
+
+gint
+main (gint argc, gchar **argv, gchar **env)
+{
+ gint i, res = 0;
+ struct sigaction signals, sigpipe_act;
worker_t **pworker;
GQuark type;
gpointer keypair;
GString *keypair_out;
+ rspamd_inet_addr_t *control_addr = NULL;
+ struct event_base *ev_base;
+ struct event term_ev, int_ev, cld_ev, hup_ev, usr1_ev, control_ev;
-#ifdef HAVE_SA_SIGINFO
- signals_info = g_queue_new ();
-#endif
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
g_thread_init (NULL);
#endif
- rspamd_main = (struct rspamd_main *)g_malloc0 (sizeof (struct rspamd_main));
+ rspamd_main = (struct rspamd_main *) g_malloc0 (sizeof (struct rspamd_main));
+ rspamd_main->server_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
+ "main");
+ rspamd_main->stat = rspamd_mempool_alloc0_shared (rspamd_main->server_pool,
+ sizeof (struct rspamd_stat));
+ /* Create rolling history */
+ rspamd_main->history = rspamd_roll_history_new (rspamd_main->server_pool);
rspamd_main->cfg =
- (struct rspamd_config *)g_malloc0 (sizeof (struct rspamd_config));
-
- if (!rspamd_main || !rspamd_main->cfg) {
- fprintf (stderr, "Cannot allocate memory\n");
- exit (-errno);
- }
+ (struct rspamd_config *) g_malloc0 (sizeof (struct rspamd_config));
#ifndef HAVE_SETPROCTITLE
init_title (argc, argv, env);
#endif
rspamd_init_libs ();
- rspamd_init_main (rspamd_main);
rspamd_init_cfg (rspamd_main->cfg, TRUE);
memset (&signals, 0, sizeof (struct sigaction));
@@ -1143,7 +1169,7 @@ main (gint argc, gchar **argv, gchar **env)
if (argc > 0) {
/* Parse variables */
- for (i = 0; i < argc; i ++) {
+ for (i = 0; i < argc; i++) {
if (strchr (argv[i], '=') != NULL) {
gchar *k, *v, *t;
@@ -1174,7 +1200,7 @@ main (gint argc, gchar **argv, gchar **env)
/* First set logger to console logger */
rspamd_main->cfg->log_type = RSPAMD_LOG_CONSOLE;
rspamd_set_logger (rspamd_main->cfg, type, rspamd_main);
- (void)rspamd_log_open (rspamd_main->logger);
+ (void) rspamd_log_open (rspamd_main->logger);
g_log_set_default_handler (rspamd_glib_log_function, rspamd_main->logger);
g_set_printerr_handler (rspamd_glib_printerr_function);
@@ -1183,7 +1209,7 @@ main (gint argc, gchar **argv, gchar **env)
pworker = &workers[0];
while (*pworker) {
/* Init string quarks */
- (void)g_quark_from_static_string ((*pworker)->name);
+ (void) g_quark_from_static_string ((*pworker)->name);
pworker++;
}
@@ -1207,8 +1233,9 @@ main (gint argc, gchar **argv, gchar **env)
exit (EXIT_FAILURE);
}
keypair_out = rspamd_http_connection_print_key (keypair,
- RSPAMD_KEYPAIR_PUBKEY|RSPAMD_KEYPAIR_PRIVKEY|RSPAMD_KEYPAIR_ID|
- RSPAMD_KEYPAIR_BASE32|RSPAMD_KEYPAIR_HUMAN);
+ RSPAMD_KEYPAIR_PUBKEY | RSPAMD_KEYPAIR_PRIVKEY |
+ RSPAMD_KEYPAIR_ID |
+ RSPAMD_KEYPAIR_BASE32 | RSPAMD_KEYPAIR_HUMAN);
rspamd_printf ("%v", keypair_out);
exit (EXIT_SUCCESS);
}
@@ -1232,10 +1259,11 @@ main (gint argc, gchar **argv, gchar **env)
}
/* Insert classifiers symbols */
- (void)rspamd_config_insert_classify_symbols (rspamd_main->cfg);
+ rspamd_config_insert_classify_symbols (rspamd_main->cfg);
- if (!rspamd_symbols_cache_validate (rspamd_main->cfg->cache, rspamd_main->cfg,
- FALSE)) {
+ if (!rspamd_symbols_cache_validate (rspamd_main->cfg->cache,
+ rspamd_main->cfg,
+ FALSE)) {
res = FALSE;
}
if (dump_cache) {
@@ -1263,10 +1291,13 @@ main (gint argc, gchar **argv, gchar **env)
gperf_profiler_init (rspamd_main->cfg, "main");
- msg_info_main ("rspamd " RVERSION " is starting, build id: " RID);
+ msg_info_main ("rspamd "
+ RVERSION
+ " is starting, build id: "
+ RID);
rspamd_main->cfg->cfg_name = rspamd_mempool_strdup (
- rspamd_main->cfg->cfg_pool,
- rspamd_main->cfg->cfg_name);
+ rspamd_main->cfg->cfg_pool,
+ rspamd_main->cfg->cfg_name);
/* Daemonize */
if (!rspamd_main->cfg->no_fork && daemon (0, 0) == -1) {
@@ -1278,11 +1309,17 @@ main (gint argc, gchar **argv, gchar **env)
rspamd_main->pid = getpid ();
rspamd_main->type = type;
- rspamd_signals_init (&signals, sig_handler);
+ /* Ignore SIGPIPE as we handle write errors manually */
+ sigemptyset (&sigpipe_act.sa_mask);
+ sigaddset (&sigpipe_act.sa_mask, SIGPIPE);
+ sigpipe_act.sa_handler = SIG_IGN;
+ sigpipe_act.sa_flags = 0;
+ sigaction (SIGPIPE, &sigpipe_act, NULL);
if (rspamd_main->cfg->pid_file == NULL) {
msg_info("pid file is not specified, skipping writing it");
- } else if (rspamd_write_pid (rspamd_main) == -1) {
+ }
+ else if (rspamd_write_pid (rspamd_main) == -1) {
msg_err_main ("cannot write pid file %s", rspamd_main->cfg->pid_file);
exit (-errno);
}
@@ -1290,17 +1327,66 @@ main (gint argc, gchar **argv, gchar **env)
/* Block signals to use sigsuspend in future */
sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL);
+ /* Set title */
setproctitle ("main process");
/* Init config cache */
rspamd_symbols_cache_init (rspamd_main->cfg->cache);
/* Validate cache */
- (void)rspamd_symbols_cache_validate (rspamd_main->cfg->cache, rspamd_main->cfg, FALSE);
+ (void) rspamd_symbols_cache_validate (rspamd_main->cfg->cache,
+ rspamd_main->cfg,
+ FALSE);
/* Flush log */
rspamd_log_flush (rspamd_main->logger);
+ /* Open control socket if needed */
+ control_fd = -1;
+ if (rspamd_main->cfg->control_socket_path) {
+ if (!rspamd_parse_inet_address (&control_addr,
+ rspamd_main->cfg->control_socket_path)) {
+ msg_err_main ("cannot parse inet address %s",
+ rspamd_main->cfg->control_socket_path);
+ }
+ else {
+ control_fd = rspamd_inet_address_listen (control_addr, SOCK_STREAM,
+ FALSE);
+ if (control_fd == -1) {
+ msg_err_main ("cannot open control socket at path: %s",
+ rspamd_main->cfg->control_socket_path);
+ }
+ else {
+ /* Generate SIGIO on reads from this socket */
+ if (fcntl (control_fd, F_SETOWN, getpid ()) == -1) {
+ msg_err_main ("fnctl to set F_SETOWN failed: %s",
+ strerror (errno));
+ }
+#ifdef HAVE_SETSIG
+ if (ioctl (control_fd, I_SETSIG, S_INPUT) == -1) {
+ msg_err_main ("ioctl I_SETSIG to set S_INPUT failed: %s",
+ strerror (errno));
+ }
+#elif defined(HAVE_OASYNC)
+ gint flags = fcntl (control_fd, F_GETFL);
+
+ if (flags == -1 || fcntl (control_fd, F_SETFL,
+ flags | O_ASYNC | O_NONBLOCK) == -1) {
+ msg_err_main ("fnctl F_SETFL to set O_ASYNC failed: %s",
+ strerror (errno));
+ }
+#else
+ msg_err_main ("cannot get notifications about the control socket");
+#endif
+ }
+ }
+ }
+
+ if (control_fd != -1) {
+ msg_info_main ("listening for control commands on %s",
+ rspamd_inet_address_to_string (control_addr));
+ }
+
/* Maybe read roll history */
if (rspamd_main->cfg->history_file) {
rspamd_roll_history_load (rspamd_main->history,
@@ -1314,105 +1400,31 @@ main (gint argc, gchar **argv, gchar **env)
rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal);
spawn_workers (rspamd_main);
- /* Signal processing cycle */
- for (;; ) {
- msg_debug_main ("calling sigsuspend");
- sigemptyset (&signals.sa_mask);
- sigsuspend (&signals.sa_mask);
-#ifdef HAVE_SA_SIGINFO
- for (i = 0; i < cur_sg; i ++) {
- g_queue_push_head (signals_info, &static_sg[i]);
- }
- cur_sg = 0;
- print_signals_info ();
-#endif
- if (do_terminate) {
- do_terminate = 0;
- msg_info_main ("catch termination signal, waiting for children");
- rspamd_pass_signal (rspamd_main->workers, SIGTERM);
- break;
- }
- if (child_dead) {
- child_dead = 0;
- msg_debug_main ("catch SIGCHLD signal, finding terminated worker");
- /* Remove dead child form children list */
- wrk = waitpid (0, &res, 0);
- if ((cur =
- g_hash_table_lookup (rspamd_main->workers,
- GSIZE_TO_POINTER (wrk))) != NULL) {
- /* Unlink dead process from queue and hash table */
-
- g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (
- wrk));
-
- if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
- /* Normal worker termination, do not fork one more */
- msg_info_main ("%s process %P terminated normally",
- g_quark_to_string (cur->type),
- cur->pid);
- }
- else {
- if (WIFSIGNALED (res)) {
- msg_warn_main (
- "%s process %P terminated abnormally by signal: %d",
- g_quark_to_string (cur->type),
- cur->pid,
- WTERMSIG (res));
- }
- else {
- msg_warn_main ("%s process %P terminated abnormally",
- g_quark_to_string (cur->type),
- cur->pid);
- }
- /* Fork another worker in replace of dead one */
- delay_fork (cur->cf, cur->index);
- }
-
- g_free (cur);
- }
- else {
- for (i = 0; i < (gint)other_workers->len; i++) {
- if (g_array_index (other_workers, pid_t, i) == wrk) {
- g_array_remove_index_fast (other_workers, i);
- msg_info_main ("related process %P terminated", wrk);
- }
- }
- }
- }
- if (do_restart) {
- do_restart = 0;
- rspamd_log_reopen_priv (rspamd_main->logger,
- rspamd_main->workers_uid,
- rspamd_main->workers_gid);
- msg_info_main ("rspamd " RVERSION " is restarting");
- g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
- rspamd_map_remove_all (rspamd_main->cfg);
- reread_config (rspamd_main);
- spawn_workers (rspamd_main);
- }
- if (do_reopen_log) {
- do_reopen_log = 0;
- rspamd_log_reopen_priv (rspamd_main->logger,
- rspamd_main->workers_uid,
- rspamd_main->workers_gid);
- g_hash_table_foreach (rspamd_main->workers, reopen_log_handler,
- NULL);
- }
- if (got_alarm) {
- got_alarm = 0;
- fork_delayed (rspamd_main);
- }
- }
-
- /* Restore some signals */
+ /* Init event base */
+ ev_base = event_init ();
+ /* Unblock signals */
sigemptyset (&signals.sa_mask);
- sigaddset (&signals.sa_mask, SIGALRM);
- sigaddset (&signals.sa_mask, SIGINT);
- sigaddset (&signals.sa_mask, SIGTERM);
- sigaction (SIGALRM, &signals, NULL);
- sigaction (SIGTERM, &signals, NULL);
- sigaction (SIGINT, &signals, NULL);
- sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+ sigprocmask (SIG_SETMASK, &signals.sa_mask, NULL);
+
+ /* Set events for signals */
+ evsignal_set (&term_ev, SIGTERM, rspamd_term_handler, ev_base);
+ event_base_set (ev_base, &term_ev);
+ event_add (&term_ev, NULL);
+ evsignal_set (&int_ev, SIGINT, rspamd_term_handler, ev_base);
+ event_base_set (ev_base, &int_ev);
+ event_add (&int_ev, NULL);
+ evsignal_set (&hup_ev, SIGHUP, rspamd_hup_handler, ev_base);
+ event_base_set (ev_base, &hup_ev);
+ event_add (&hup_ev, NULL);
+ evsignal_set (&cld_ev, SIGCHLD, rspamd_cld_handler, ev_base);
+ event_base_set (ev_base, &cld_ev);
+ event_add (&cld_ev, NULL);
+ evsignal_set (&usr1_ev, SIGUSR1, rspamd_usr1_handler, ev_base);
+ event_base_set (ev_base, &usr1_ev);
+ event_add (&usr1_ev, NULL);
+
+ event_base_loop (ev_base, 0);
+
/* Set alarm for hard termination */
if (getenv ("G_SLICE") != NULL) {
/* Special case if we are likely running with valgrind */
@@ -1432,6 +1444,11 @@ main (gint argc, gchar **argv, gchar **env)
}
msg_info_main ("terminating...");
+
+ if (control_fd != -1) {
+ close (control_fd);
+ }
+
rspamd_symbols_cache_destroy (rspamd_main->cfg->cache);
rspamd_log_close (rspamd_main->logger);
rspamd_config_free (rspamd_main->cfg);