You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

main.c 9.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. #include <sys/types.h>
  2. #include <sys/time.h>
  3. #include <sys/wait.h>
  4. #include <sys/param.h>
  5. #include <unistd.h>
  6. #include <stdio.h>
  7. #include <stdlib.h>
  8. #include <string.h>
  9. #include <time.h>
  10. #include <errno.h>
  11. #include <signal.h>
  12. #ifdef HAVE_LIBUTIL_H
  13. #include <libutil.h>
  14. #endif
  15. #include <syslog.h>
  16. #include <EXTERN.h> /* from the Perl distribution */
  17. #include <perl.h> /* from the Perl distribution */
  18. #include "main.h"
  19. #include "cfg_file.h"
  20. #include "util.h"
  21. struct config_file *cfg;
  22. static void sig_handler (int );
  23. static struct rspamd_worker * fork_worker (struct rspamd_main *, int, int, enum process_type);
  24. sig_atomic_t do_restart;
  25. sig_atomic_t do_terminate;
  26. sig_atomic_t child_dead;
  27. sig_atomic_t child_ready;
  28. extern int yynerrs;
  29. extern FILE *yyin;
  30. extern void boot_DynaLoader (pTHX_ CV* cv);
  31. extern void boot_Socket (pTHX_ CV* cv);
  32. PerlInterpreter *perl_interpreter;
  33. /* XXX: remove this shit when it would be clear why perl need this line */
  34. PerlInterpreter *my_perl;
  35. static
  36. void sig_handler (int signo)
  37. {
  38. switch (signo) {
  39. case SIGHUP:
  40. do_restart = 1;
  41. break;
  42. case SIGINT:
  43. case SIGTERM:
  44. do_terminate = 1;
  45. break;
  46. case SIGCHLD:
  47. child_dead = 1;
  48. break;
  49. case SIGUSR2:
  50. child_ready = 1;
  51. break;
  52. }
  53. }
  54. void
  55. xs_init(pTHX)
  56. {
  57. dXSUB_SYS;
  58. /* DynaLoader is a special case */
  59. newXS ("DynaLoader::boot_DynaLoader", boot_DynaLoader, __FILE__);
  60. }
  61. static void
  62. init_filters (struct config_file *cfg)
  63. {
  64. struct perl_module *module;
  65. LIST_FOREACH (module, &cfg->perl_modules, next) {
  66. if (module->path) {
  67. require_pv (module->path);
  68. }
  69. }
  70. }
  71. static struct rspamd_worker *
  72. fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum process_type type)
  73. {
  74. struct rspamd_worker *cur;
  75. char *cfg_file;
  76. FILE *f;
  77. struct config_file *tmp_cfg;
  78. /* Starting worker process */
  79. cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
  80. if (cur) {
  81. /* Reconfig needed */
  82. if (reconfig) {
  83. tmp_cfg = (struct config_file *) g_malloc (sizeof (struct config_file));
  84. if (tmp_cfg) {
  85. bzero (tmp_cfg, sizeof (struct config_file));
  86. tmp_cfg->cfg_pool = memory_pool_new (32768);
  87. cfg_file = memory_pool_strdup (tmp_cfg->cfg_pool, rspamd->cfg->cfg_name);
  88. f = fopen (rspamd->cfg->cfg_name , "r");
  89. if (f == NULL) {
  90. msg_warn ("fork_worker: cannot open file: %s", rspamd->cfg->cfg_name );
  91. }
  92. else {
  93. yyin = f;
  94. yyrestart (yyin);
  95. if (yyparse() != 0 || yynerrs > 0) {
  96. msg_warn ("fork_worker: yyparse: cannot parse config file, %d errors", yynerrs);
  97. fclose (f);
  98. }
  99. else {
  100. free_config (rspamd->cfg);
  101. g_free (rspamd->cfg);
  102. rspamd->cfg = tmp_cfg;
  103. rspamd->cfg->cfg_name = cfg_file;
  104. }
  105. }
  106. }
  107. }
  108. bzero (cur, sizeof (struct rspamd_worker));
  109. TAILQ_INSERT_HEAD (&rspamd->workers, cur, next);
  110. cur->srv = rspamd;
  111. cur->pid = fork();
  112. switch (cur->pid) {
  113. case 0:
  114. /* TODO: add worker code */
  115. switch (type) {
  116. case TYPE_WORKER:
  117. default:
  118. setproctitle ("worker process");
  119. pidfile_close (rspamd->pfh);
  120. msg_info ("fork_worker: starting worker process %d", getpid ());
  121. cur->type = TYPE_WORKER;
  122. start_worker (cur, listen_sock);
  123. break;
  124. }
  125. break;
  126. case -1:
  127. msg_err ("fork_worker: cannot fork main process. %m");
  128. pidfile_remove (rspamd->pfh);
  129. exit (-errno);
  130. break;
  131. }
  132. }
  133. return cur;
  134. }
  135. int
  136. main (int argc, char **argv)
  137. {
  138. struct rspamd_main *rspamd;
  139. struct module_ctx *cur_module = NULL;
  140. int res = 0, i, listen_sock;
  141. struct sigaction signals;
  142. struct rspamd_worker *cur, *cur_tmp, *active_worker;
  143. struct sockaddr_un *un_addr;
  144. FILE *f;
  145. pid_t wrk;
  146. char *args[] = { "", "-e", "0", NULL };
  147. rspamd = (struct rspamd_main *)g_malloc (sizeof (struct rspamd_main));
  148. bzero (rspamd, sizeof (struct rspamd_main));
  149. cfg = (struct config_file *)g_malloc (sizeof (struct config_file));
  150. rspamd->cfg = cfg;
  151. if (!rspamd || !rspamd->cfg) {
  152. fprintf(stderr, "Cannot allocate memory\n");
  153. exit(-errno);
  154. }
  155. do_terminate = 0;
  156. do_restart = 0;
  157. child_dead = 0;
  158. child_ready = 0;
  159. active_worker = NULL;
  160. bzero (rspamd->cfg, sizeof (struct config_file));
  161. rspamd->cfg->cfg_pool = memory_pool_new (32768);
  162. init_defaults (rspamd->cfg);
  163. bzero (&signals, sizeof (struct sigaction));
  164. rspamd->cfg->cfg_name = memory_pool_strdup (rspamd->cfg->cfg_pool, FIXED_CONFIG_FILE);
  165. read_cmd_line (argc, argv, rspamd->cfg);
  166. msg_warn ("(main) starting...");
  167. #ifndef HAVE_SETPROCTITLE
  168. init_title (argc, argv, environ);
  169. #endif
  170. f = fopen (rspamd->cfg->cfg_name , "r");
  171. if (f == NULL) {
  172. msg_warn ("cannot open file: %s", rspamd->cfg->cfg_name );
  173. return EBADF;
  174. }
  175. yyin = f;
  176. if (yyparse() != 0 || yynerrs > 0) {
  177. msg_warn ("yyparse: cannot parse config file, %d errors", yynerrs);
  178. return EBADF;
  179. }
  180. fclose (f);
  181. rspamd->cfg->cfg_name = memory_pool_strdup (rspamd->cfg->cfg_pool, rspamd->cfg->cfg_name );
  182. /* Strictly set temp dir */
  183. if (!rspamd->cfg->temp_dir) {
  184. msg_warn ("tempdir is not set, trying to use $TMPDIR");
  185. rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, getenv ("TMPDIR"));
  186. if (!rspamd->cfg->temp_dir) {
  187. rspamd->cfg->temp_dir = memory_pool_strdup (rspamd->cfg->cfg_pool, "/tmp");
  188. }
  189. }
  190. if (!rspamd->cfg->no_fork && daemon (1, 1) == -1) {
  191. fprintf (stderr, "Cannot daemonize\n");
  192. exit (-errno);
  193. }
  194. if (write_pid (rspamd) == -1) {
  195. msg_err ("main: cannot write pid file %s", rspamd->cfg->pid_file);
  196. exit (-errno);
  197. }
  198. /* Init C modules */
  199. for (i = 0; i < MODULES_NUM; i ++) {
  200. cur_module = memory_pool_alloc (rspamd->cfg->cfg_pool, sizeof (struct module_ctx));
  201. if (modules[i].module_init_func(cfg, &cur_module) == 0) {
  202. g_hash_table_insert (cfg->c_modules, (gpointer)modules[i].name, cur_module);
  203. }
  204. }
  205. rspamd->pid = getpid();
  206. rspamd->type = TYPE_MAIN;
  207. init_signals (&signals, sig_handler);
  208. /* Init perl interpreter */
  209. PERL_SYS_INIT3 (&argc, &argv, &env);
  210. perl_interpreter = perl_alloc ();
  211. if (perl_interpreter == NULL) {
  212. msg_err ("main: cannot allocate perl interpreter, %m");
  213. exit (-errno);
  214. }
  215. my_perl = perl_interpreter;
  216. PERL_SET_CONTEXT (perl_interpreter);
  217. perl_construct (perl_interpreter);
  218. PL_exit_flags |= PERL_EXIT_DESTRUCT_END;
  219. perl_parse (perl_interpreter, xs_init, 3, args, NULL);
  220. /* Block signals to use sigsuspend in future */
  221. sigprocmask(SIG_BLOCK, &signals.sa_mask, NULL);
  222. if (rspamd->cfg->bind_family == AF_INET) {
  223. if ((listen_sock = make_socket (rspamd->cfg->bind_host, rspamd->cfg->bind_port)) == -1) {
  224. msg_err ("main: cannot create tcp listen socket. %m");
  225. exit(-errno);
  226. }
  227. }
  228. else {
  229. un_addr = (struct sockaddr_un *) g_malloc (sizeof (struct sockaddr_un));
  230. if (!un_addr || (listen_sock = make_unix_socket (rspamd->cfg->bind_host, un_addr)) == -1) {
  231. msg_err ("main: cannot create unix listen socket. %m");
  232. exit(-errno);
  233. }
  234. }
  235. if (listen (listen_sock, -1) == -1) {
  236. msg_err ("main: cannot listen on socket. %m");
  237. exit(-errno);
  238. }
  239. TAILQ_INIT (&rspamd->workers);
  240. setproctitle ("main process");
  241. for (i = 0; i < cfg->workers_number; i++) {
  242. fork_worker (rspamd, listen_sock, 0, TYPE_WORKER);
  243. }
  244. /* Signal processing cycle */
  245. for (;;) {
  246. msg_debug ("main: calling sigsuspend");
  247. sigemptyset (&signals.sa_mask);
  248. sigsuspend (&signals.sa_mask);
  249. if (do_terminate) {
  250. msg_debug ("main: catch termination signal, waiting for childs");
  251. pass_signal_worker (&rspamd->workers, SIGTERM);
  252. break;
  253. }
  254. if (child_dead) {
  255. child_dead = 0;
  256. msg_debug ("main: catch SIGCHLD signal, finding terminated worker");
  257. /* Remove dead child form childs list */
  258. wrk = waitpid (0, &res, 0);
  259. TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
  260. if (wrk == cur->pid) {
  261. /* Catch situations if active worker is abnormally terminated */
  262. if (cur == active_worker) {
  263. active_worker = NULL;
  264. }
  265. TAILQ_REMOVE(&rspamd->workers, cur, next);
  266. if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
  267. /* Normal worker termination, do not fork one more */
  268. msg_info ("main: worker process %d terminated normally", cur->pid);
  269. }
  270. else {
  271. if (WIFSIGNALED (res)) {
  272. msg_warn ("main: worker process %d terminated abnormally by signal: %d",
  273. cur->pid, WTERMSIG(res));
  274. }
  275. else {
  276. msg_warn ("main: worker process %d terminated abnormally", cur->pid);
  277. }
  278. /* Fork another worker in replace of dead one */
  279. fork_worker (rspamd, listen_sock, 0, cur->type);
  280. }
  281. g_free (cur);
  282. }
  283. }
  284. }
  285. if (do_restart) {
  286. do_restart = 0;
  287. if (active_worker == NULL) {
  288. /* Start new worker that would reread configuration*/
  289. active_worker = fork_worker (rspamd, listen_sock, 1, TYPE_WORKER);
  290. }
  291. /* Do not start new workers untill active worker is not ready for accept */
  292. }
  293. if (child_ready) {
  294. child_ready = 0;
  295. if (active_worker != NULL) {
  296. msg_info ("main: worker process %d has been successfully started", active_worker->pid);
  297. TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
  298. if (cur != active_worker && !cur->is_dying) {
  299. /* Send to old workers SIGUSR2 */
  300. kill (cur->pid, SIGUSR2);
  301. cur->is_dying = 1;
  302. }
  303. }
  304. active_worker = NULL;
  305. }
  306. }
  307. }
  308. /* Wait for workers termination */
  309. while (!TAILQ_EMPTY(&rspamd->workers)) {
  310. cur = TAILQ_FIRST(&rspamd->workers);
  311. waitpid (cur->pid, &res, 0);
  312. msg_debug ("main(cleaning): worker process %d terminated", cur->pid);
  313. TAILQ_REMOVE(&rspamd->workers, cur, next);
  314. g_free(cur);
  315. }
  316. msg_info ("main: terminating...");
  317. if (rspamd->cfg->bind_family == AF_UNIX) {
  318. unlink (rspamd->cfg->bind_host);
  319. }
  320. free_config (rspamd->cfg);
  321. g_free (rspamd->cfg);
  322. g_free (rspamd);
  323. return (res);
  324. }
  325. /*
  326. * vi:ts=4
  327. */