diff options
Diffstat (limited to 'src/main.c')
-rw-r--r-- | src/main.c | 427 |
1 files changed, 427 insertions, 0 deletions
diff --git a/src/main.c b/src/main.c new file mode 100644 index 000000000..d2dec7e47 --- /dev/null +++ b/src/main.c @@ -0,0 +1,427 @@ + +#include <sys/types.h> +#include <sys/time.h> +#include <sys/wait.h> +#include <sys/param.h> + +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <errno.h> +#include <signal.h> +#ifdef HAVE_LIBUTIL_H +#include <libutil.h> +#endif +#include <syslog.h> + +#include <EXTERN.h> /* from the Perl distribution */ +#include <perl.h> /* from the Perl distribution */ + +#include "main.h" +#include "cfg_file.h" +#include "util.h" + +struct config_file *cfg; + +static void sig_handler (int ); +static struct rspamd_worker * fork_worker (struct rspamd_main *, int, int, enum process_type); + +sig_atomic_t do_restart; +sig_atomic_t do_terminate; +sig_atomic_t child_dead; +sig_atomic_t child_ready; + +extern int yynerrs; +extern FILE *yyin; +extern void boot_DynaLoader (pTHX_ CV* cv); +extern void boot_Socket (pTHX_ CV* cv); + +PerlInterpreter *perl_interpreter; +/* XXX: remove this shit when it would be clear why perl need this line */ +PerlInterpreter *my_perl; + +static +void sig_handler (int signo) +{ + switch (signo) { + case SIGHUP: + do_restart = 1; + do_reopen_log = 1; + break; + case SIGINT: + case SIGTERM: + do_terminate = 1; + break; + case SIGCHLD: + child_dead = 1; + break; + case SIGUSR2: + child_ready = 1; + break; + } +} + +void +xs_init(pTHX) +{ + dXSUB_SYS; + /* DynaLoader is a special case */ + newXS ("DynaLoader::boot_DynaLoader", boot_DynaLoader, __FILE__); +} + +static void +init_filters (struct config_file *cfg) +{ + struct perl_module *module; + + LIST_FOREACH (module, &cfg->perl_modules, next) { + if (module->path) { + require_pv (module->path); + } + } +} + +static struct rspamd_worker * +fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum process_type type) +{ + struct rspamd_worker *cur; + char *cfg_file; + FILE *f; + struct config_file *tmp_cfg; + /* Starting worker process */ + cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker)); + if (cur) { + /* Reconfig needed */ + if (reconfig) { + tmp_cfg = (struct config_file *) g_malloc (sizeof (struct config_file)); + if (tmp_cfg) { + bzero (tmp_cfg, sizeof (struct config_file)); + tmp_cfg->cfg_pool = memory_pool_new (32768); + cfg_file = memory_pool_strdup (tmp_cfg->cfg_pool, rspamd->cfg->cfg_name); + f = fopen (rspamd->cfg->cfg_name , "r"); + if (f == NULL) { + msg_warn ("fork_worker: cannot open file: %s", rspamd->cfg->cfg_name ); + } + else { + yyin = f; + yyrestart (yyin); + + if (yyparse() != 0 || yynerrs > 0) { + msg_warn ("fork_worker: yyparse: cannot parse config file, %d errors", yynerrs); + fclose (f); + } + else { + free_config (rspamd->cfg); + g_free (rspamd->cfg); + rspamd->cfg = tmp_cfg; + rspamd->cfg->cfg_name = cfg_file; + } + } + } + } + bzero (cur, sizeof (struct rspamd_worker)); + TAILQ_INSERT_HEAD (&rspamd->workers, cur, next); + cur->srv = rspamd; + cur->pid = fork(); + switch (cur->pid) { + case 0: + /* TODO: add worker code */ + switch (type) { + case TYPE_CONTROLLER: + setproctitle ("controller process"); + pidfile_close (rspamd->pfh); + msg_info ("fork_worker: starting controller process %d", getpid ()); + cur->type = TYPE_CONTROLLER; + start_controller (cur); + break; + case TYPE_WORKER: + default: + setproctitle ("worker process"); + pidfile_close (rspamd->pfh); + msg_info ("fork_worker: starting worker process %d", getpid ()); + cur->type = TYPE_WORKER; + start_worker (cur, listen_sock); + break; + } + break; + case -1: + msg_err ("fork_worker: cannot fork main process. %m"); + pidfile_remove (rspamd->pfh); + exit (-errno); + break; + } + } + + return cur; +} + +int +main (int argc, char **argv) +{ + struct rspamd_main *rspamd; + struct module_ctx *cur_module = NULL; + int res = 0, i, listen_sock; + struct sigaction signals; + struct rspamd_worker *cur, *cur_tmp, *active_worker; + struct sockaddr_un *un_addr; + FILE *f; + pid_t wrk; + char *args[] = { "", "-e", "0", NULL }; + + rspamd = (struct rspamd_main *)g_malloc (sizeof (struct rspamd_main)); + bzero (rspamd, sizeof (struct rspamd_main)); + rspamd->server_pool = memory_pool_new (memory_pool_get_size ()); + cfg = (struct config_file *)g_malloc (sizeof (struct config_file)); + rspamd->cfg = cfg; + if (!rspamd || !rspamd->cfg) { + fprintf(stderr, "Cannot allocate memory\n"); + exit(-errno); + } + + do_terminate = 0; + do_restart = 0; + child_dead = 0; + child_ready = 0; + do_reopen_log = 0; + active_worker = NULL; + + rspamd->stat = memory_pool_alloc_shared (rspamd->server_pool, sizeof (struct rspamd_stat)); + bzero (rspamd->stat, sizeof (struct rspamd_stat)); + + bzero (rspamd->cfg, sizeof (struct config_file)); + rspamd->cfg->cfg_pool = memory_pool_new (memory_pool_get_size ()); + init_defaults (rspamd->cfg); + + bzero (&signals, sizeof (struct sigaction)); + + rspamd->cfg->cfg_name = memory_pool_strdup (rspamd->cfg->cfg_pool, FIXED_CONFIG_FILE); + read_cmd_line (argc, argv, rspamd->cfg); + + msg_warn ("(main) starting..."); + + #ifndef HAVE_SETPROCTITLE + init_title (argc, argv, environ); + #endif + + f = fopen (rspamd->cfg->cfg_name , "r"); + if (f == NULL) { + msg_warn ("cannot open file: %s", rspamd->cfg->cfg_name ); + return EBADF; + } + yyin = f; + + if (yyparse() != 0 || yynerrs > 0) { + msg_warn ("yyparse: cannot parse config file, %d errors", yynerrs); + return EBADF; + } + + fclose (f); + rspamd->cfg->cfg_name = memory_pool_strdup (rspamd->cfg->cfg_pool, rspamd->cfg->cfg_name ); + + /* Strictly set temp dir */ + if (!rspamd->cfg->temp_dir) { + msg_warn ("tempdir is not set, trying to use $TMPDIR"); + rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, getenv ("TMPDIR")); + + if (!rspamd->cfg->temp_dir) { + msg_warn ("$TMPDIR is empty too, using /tmp as default"); + rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, "/tmp"); + } + } + + switch (cfg->log_type) { + case RSPAMD_LOG_CONSOLE: + if (!rspamd->cfg->no_fork) { + fprintf (stderr, "Cannot log to console while daemonized, disable logging"); + cfg->log_fd = -1; + } + else { + cfg->log_fd = 2; + } + g_log_set_default_handler (file_log_function, cfg); + break; + case RSPAMD_LOG_FILE: + if (cfg->log_file == NULL || open_log (cfg) == -1) { + fprintf (stderr, "Fatal error, cannot open logfile, exiting"); + exit (EXIT_FAILURE); + } + g_log_set_default_handler (file_log_function, cfg); + break; + case RSPAMD_LOG_SYSLOG: + if (open_log (cfg) == -1) { + fprintf (stderr, "Fatal error, cannot open syslog facility, exiting"); + exit (EXIT_FAILURE); + } + g_log_set_default_handler (syslog_log_function, cfg); + break; + } + + if (!rspamd->cfg->no_fork && daemon (1, 1) == -1) { + fprintf (stderr, "Cannot daemonize\n"); + exit (-errno); + } + + if (write_pid (rspamd) == -1) { + msg_err ("main: cannot write pid file %s", rspamd->cfg->pid_file); + exit (-errno); + } + + /* Init C modules */ + for (i = 0; i < MODULES_NUM; i ++) { + cur_module = memory_pool_alloc (rspamd->cfg->cfg_pool, sizeof (struct module_ctx)); + if (modules[i].module_init_func(cfg, &cur_module) == 0) { + g_hash_table_insert (cfg->c_modules, (gpointer)modules[i].name, cur_module); + } + } + + rspamd->pid = getpid(); + rspamd->type = TYPE_MAIN; + + init_signals (&signals, sig_handler); + /* Init perl interpreter */ + PERL_SYS_INIT3 (&argc, &argv, &env); + perl_interpreter = perl_alloc (); + if (perl_interpreter == NULL) { + msg_err ("main: cannot allocate perl interpreter, %m"); + exit (-errno); + } + + my_perl = perl_interpreter; + PERL_SET_CONTEXT (perl_interpreter); + perl_construct (perl_interpreter); + PL_exit_flags |= PERL_EXIT_DESTRUCT_END; + perl_parse (perl_interpreter, xs_init, 3, args, NULL); + /* Block signals to use sigsuspend in future */ + sigprocmask(SIG_BLOCK, &signals.sa_mask, NULL); + + if (rspamd->cfg->bind_family == AF_INET) { + if ((listen_sock = make_socket (rspamd->cfg->bind_host, rspamd->cfg->bind_port)) == -1) { + msg_err ("main: cannot create tcp listen socket. %m"); + exit(-errno); + } + } + else { + un_addr = (struct sockaddr_un *) g_malloc (sizeof (struct sockaddr_un)); + if (!un_addr || (listen_sock = make_unix_socket (rspamd->cfg->bind_host, un_addr)) == -1) { + msg_err ("main: cannot create unix listen socket. %m"); + exit(-errno); + } + } + + if (listen (listen_sock, -1) == -1) { + msg_err ("main: cannot listen on socket. %m"); + exit(-errno); + } + + TAILQ_INIT (&rspamd->workers); + + setproctitle ("main process"); + + for (i = 0; i < cfg->workers_number; i++) { + fork_worker (rspamd, listen_sock, 0, TYPE_WORKER); + } + /* Start controller if enabled */ + if (cfg->controller_enabled) { + fork_worker (rspamd, listen_sock, 0, TYPE_CONTROLLER); + } + + /* Signal processing cycle */ + for (;;) { + msg_debug ("main: calling sigsuspend"); + sigemptyset (&signals.sa_mask); + sigsuspend (&signals.sa_mask); + if (do_terminate) { + msg_debug ("main: catch termination signal, waiting for childs"); + pass_signal_worker (&rspamd->workers, SIGTERM); + break; + } + if (child_dead) { + child_dead = 0; + msg_debug ("main: catch SIGCHLD signal, finding terminated worker"); + /* Remove dead child form childs list */ + wrk = waitpid (0, &res, 0); + TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { + if (wrk == cur->pid) { + /* Catch situations if active worker is abnormally terminated */ + if (cur == active_worker) { + active_worker = NULL; + } + TAILQ_REMOVE(&rspamd->workers, cur, next); + if (cur->type == TYPE_CONTROLLER) { + msg_info ("main: do not restart dead controller"); + g_free (cur); + break; + } + if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { + /* Normal worker termination, do not fork one more */ + msg_info ("main: worker process %d terminated normally", cur->pid); + } + else { + if (WIFSIGNALED (res)) { + msg_warn ("main: worker process %d terminated abnormally by signal: %d", + cur->pid, WTERMSIG(res)); + } + else { + msg_warn ("main: worker process %d terminated abnormally", cur->pid); + } + /* Fork another worker in replace of dead one */ + fork_worker (rspamd, listen_sock, 0, cur->type); + } + g_free (cur); + } + } + } + if (do_restart) { + do_restart = 0; + + if (active_worker == NULL) { + /* Start new worker that would reread configuration*/ + active_worker = fork_worker (rspamd, listen_sock, 1, TYPE_WORKER); + } + /* Do not start new workers untill active worker is not ready for accept */ + } + if (child_ready) { + child_ready = 0; + + if (active_worker != NULL) { + msg_info ("main: worker process %d has been successfully started", active_worker->pid); + TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { + if (cur != active_worker && !cur->is_dying) { + /* Send to old workers SIGUSR2 */ + kill (cur->pid, SIGUSR2); + cur->is_dying = 1; + } + } + active_worker = NULL; + } + } + } + + /* Wait for workers termination */ + while (!TAILQ_EMPTY(&rspamd->workers)) { + cur = TAILQ_FIRST(&rspamd->workers); + waitpid (cur->pid, &res, 0); + msg_debug ("main(cleaning): worker process %d terminated", cur->pid); + TAILQ_REMOVE(&rspamd->workers, cur, next); + g_free(cur); + } + + msg_info ("main: terminating..."); + + + if (rspamd->cfg->bind_family == AF_UNIX) { + unlink (rspamd->cfg->bind_host); + } + + free_config (rspamd->cfg); + g_free (rspamd->cfg); + g_free (rspamd); + + return (res); +} + +/* + * vi:ts=4 + */ |