Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

rspamd.c 39KB

10 år sedan
16 år sedan
16 år sedan
9 år sedan
9 år sedan
9 år sedan
9 år sedan
12 år sedan
12 år sedan
12 år sedan
12 år sedan
12 år sedan
12 år sedan
16 år sedan
16 år sedan
16 år sedan
16 år sedan
16 år sedan
16 år sedan
16 år sedan
16 år sedan
16 år sedan
16 år sedan

  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "rspamd.h"
  18. #include "libutil/map.h"
  19. #include "lua/lua_common.h"
  20. #include "libserver/worker_util.h"
  21. #include "libserver/rspamd_control.h"
  22. #include "ottery.h"
  23. #include "cryptobox.h"
  24. #include "utlist.h"
  25. #include "unix-std.h"
  26. /* sysexits */
  27. #ifdef HAVE_SYSEXITS_H
  28. #include <sysexits.h>
  29. #endif
  30. /* pwd and grp */
  31. #ifdef HAVE_PWD_H
  32. #include <pwd.h>
  33. #endif
  34. #ifdef HAVE_GRP_H
  35. #include <grp.h>
  36. #endif
  37. #ifdef HAVE_NFTW
  38. #include <ftw.h>
  39. #endif
  40. #include <signal.h>
  41. #ifdef HAVE_SYS_WAIT_H
  42. #include <sys/wait.h>
  43. #endif
  44. #ifdef HAVE_SYS_RESOURCE_H
  45. #include <sys/resource.h>
  46. #endif
  47. #ifdef HAVE_LIBUTIL_H
  48. #include <libutil.h>
  49. #endif
  50. #ifdef WITH_GPERF_TOOLS
  51. #include <gperftools/profiler.h>
  52. #endif
  53. #ifdef HAVE_STROPS_H
  54. #include <stropts.h>
  55. #endif
  56. #ifdef HAVE_OPENSSL
  57. #include <openssl/err.h>
  58. #include <openssl/evp.h>
  59. #endif
  60. #include "sqlite3.h"
  61. #include "contrib/libev/ev.h"
  62. /* 2 seconds to fork new process in place of dead one */
  63. #define SOFT_FORK_TIME 2
  64. /* 10 seconds after getting termination signal to terminate all workers with SIGKILL */
  65. #define TERMINATION_ATTEMPTS 50
  66. static gboolean load_rspamd_config (struct rspamd_main *rspamd_main,
  67. struct rspamd_config *cfg,
  68. gboolean init_modules,
  69. enum rspamd_post_load_options opts,
  70. gboolean reload);
  71. static void rspamd_cld_handler (EV_P_ ev_child *w,
  72. struct rspamd_main *rspamd_main,
  73. struct rspamd_worker *wrk);
  74. /* Control socket */
  75. static gint control_fd;
  76. static ev_io control_ev;
  77. static struct rspamd_stat old_stat;
  78. static ev_timer stat_ev;
  79. static gboolean valgrind_mode = FALSE;
  80. /* Cmdline options */
  81. static gboolean no_fork = FALSE;
  82. static gboolean show_version = FALSE;
  83. static gchar **cfg_names = NULL;
  84. static gchar *rspamd_user = NULL;
  85. static gchar *rspamd_group = NULL;
  86. static gchar *rspamd_pidfile = NULL;
  87. static gboolean is_debug = FALSE;
  88. static gboolean is_insecure = FALSE;
  89. static GHashTable *ucl_vars = NULL;
  90. static gchar **lua_env = NULL;
  91. static gboolean skip_template = FALSE;
  92. static gint term_attempts = 0;
  93. /* List of active listen sockets indexed by worker type */
  94. static GHashTable *listen_sockets = NULL;
  95. /* Defined in modules.c */
  96. extern module_t *modules[];
  97. extern worker_t *workers[];
  98. /* Command line options */
  99. static gboolean rspamd_parse_var (const gchar *option_name,
  100. const gchar *value, gpointer data,
  101. GError **error);
  102. static GOptionEntry entries[] =
  103. {
  104. { "no-fork", 'f', 0, G_OPTION_ARG_NONE, &no_fork,
  105. "Do not daemonize main process", NULL },
  106. { "config", 'c', 0, G_OPTION_ARG_FILENAME_ARRAY, &cfg_names,
  107. "Specify config file(s)", NULL },
  108. { "user", 'u', 0, G_OPTION_ARG_STRING, &rspamd_user,
  109. "User to run rspamd as", NULL },
  110. { "group", 'g', 0, G_OPTION_ARG_STRING, &rspamd_group,
  111. "Group to run rspamd as", NULL },
  112. { "pid", 'p', 0, G_OPTION_ARG_STRING, &rspamd_pidfile, "Path to pidfile",
  113. NULL },
  114. { "debug", 'd', 0, G_OPTION_ARG_NONE, &is_debug, "Force debug output",
  115. NULL },
  116. { "insecure", 'i', 0, G_OPTION_ARG_NONE, &is_insecure,
  117. "Ignore running workers as privileged users (insecure)", NULL },
  118. { "version", 'v', 0, G_OPTION_ARG_NONE, &show_version,
  119. "Show version and exit", NULL },
  120. {"var", 0, 0, G_OPTION_ARG_CALLBACK, (gpointer)&rspamd_parse_var,
  121. "Redefine/define environment variable", NULL},
  122. {"skip-template", 'T', 0, G_OPTION_ARG_NONE, &skip_template,
  123. "Do not apply Jinja templates", NULL},
  124. {"lua-env", '\0', 0, G_OPTION_ARG_FILENAME_ARRAY, &lua_env,
  125. "Load lua environment from the specified files", NULL},
  126. { NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL }
  127. };
  128. static gboolean
  129. rspamd_parse_var (const gchar *option_name,
  130. const gchar *value, gpointer data,
  131. GError **error)
  132. {
  133. gchar *k, *v, *t;
  134. t = strchr (value, '=');
  135. if (t != NULL) {
  136. k = g_strdup (value);
  137. t = k + (t - value);
  138. v = g_strdup (t + 1);
  139. *t = '\0';
  140. g_hash_table_insert (ucl_vars, k, v);
  141. }
  142. else {
  143. g_set_error (error, g_quark_try_string ("main"), EINVAL,
  144. "Bad variable format: %s", value);
  145. return FALSE;
  146. }
  147. return TRUE;
  148. }
  149. static void
  150. read_cmd_line (gint *argc, gchar ***argv, struct rspamd_config *cfg)
  151. {
  152. GError *error = NULL;
  153. GOptionContext *context;
  154. guint cfg_num;
  155. context = g_option_context_new ("- run rspamd daemon");
  156. #if defined(GIT_VERSION) && GIT_VERSION == 1
  157. g_option_context_set_summary (context,
  158. "Summary:\n Rspamd daemon version " RVERSION "-git\n Git id: " RID);
  159. #else
  160. g_option_context_set_summary (context,
  161. "Summary:\n Rspamd daemon version " RVERSION);
  162. #endif
  163. g_option_context_add_main_entries (context, entries, NULL);
  164. if (!g_option_context_parse (context, argc, argv, &error)) {
  165. fprintf (stderr, "option parsing failed: %s\n", error->message);
  166. g_option_context_free (context);
  167. exit (1);
  168. }
  169. cfg->rspamd_user = rspamd_user;
  170. cfg->rspamd_group = rspamd_group;
  171. cfg_num = cfg_names != NULL ? g_strv_length (cfg_names) : 0;
  172. if (cfg_num == 0) {
  173. cfg->cfg_name = FIXED_CONFIG_FILE;
  174. }
  175. else {
  176. cfg->cfg_name = cfg_names[0];
  177. g_assert (cfg_num == 1);
  178. }
  179. cfg->pid_file = rspamd_pidfile;
  180. g_option_context_free (context);
  181. }
  182. /* Detect privilleged mode */
  183. static void
  184. detect_priv (struct rspamd_main *rspamd_main)
  185. {
  186. struct passwd *pwd;
  187. struct group *grp;
  188. uid_t euid;
  189. euid = geteuid ();
  190. if (euid == 0) {
  191. if (!rspamd_main->cfg->rspamd_user && !is_insecure) {
  192. msg_err_main (
  193. "cannot run rspamd workers as root user, please add -u and -g options to select a proper unprivilleged user or specify --insecure flag");
  194. exit (EXIT_FAILURE);
  195. }
  196. else if (is_insecure) {
  197. rspamd_main->is_privilleged = TRUE;
  198. rspamd_main->workers_uid = 0;
  199. rspamd_main->workers_gid = 0;
  200. }
  201. else {
  202. rspamd_main->is_privilleged = TRUE;
  203. pwd = getpwnam (rspamd_main->cfg->rspamd_user);
  204. if (pwd == NULL) {
  205. msg_err_main ("user specified does not exists (%s), aborting",
  206. strerror (errno));
  207. exit (-errno);
  208. }
  209. if (rspamd_main->cfg->rspamd_group) {
  210. grp = getgrnam (rspamd_main->cfg->rspamd_group);
  211. if (grp == NULL) {
  212. msg_err_main ("group specified does not exists (%s), aborting",
  213. strerror (errno));
  214. exit (-errno);
  215. }
  216. rspamd_main->workers_gid = grp->gr_gid;
  217. }
  218. else {
  219. rspamd_main->workers_gid = (gid_t)-1;
  220. }
  221. rspamd_main->workers_uid = pwd->pw_uid;
  222. }
  223. }
  224. else {
  225. rspamd_main->is_privilleged = FALSE;
  226. rspamd_main->workers_uid = (uid_t)-1;
  227. rspamd_main->workers_gid = (gid_t)-1;
  228. }
  229. }
  230. static void
  231. config_logger (rspamd_mempool_t *pool, gpointer ud)
  232. {
  233. struct rspamd_main *rspamd_main = ud;
  234. rspamd_set_logger (rspamd_main->cfg, g_quark_try_string ("main"),
  235. &rspamd_main->logger, rspamd_main->server_pool);
  236. if (rspamd_log_open_priv (rspamd_main->logger,
  237. rspamd_main->workers_uid, rspamd_main->workers_gid) == -1) {
  238. fprintf (stderr, "Fatal error, cannot open logfile, exiting\n");
  239. exit (EXIT_FAILURE);
  240. }
  241. rspamd_logger_configure_modules (rspamd_main->cfg->debug_modules);
  242. }
  243. static void
  244. reread_config (struct rspamd_main *rspamd_main)
  245. {
  246. struct rspamd_config *tmp_cfg, *old_cfg;
  247. gchar *cfg_file;
  248. int load_opts = RSPAMD_CONFIG_INIT_VALIDATE|RSPAMD_CONFIG_INIT_SYMCACHE|
  249. RSPAMD_CONFIG_INIT_LIBS|RSPAMD_CONFIG_INIT_URL;
  250. rspamd_symcache_save (rspamd_main->cfg->cache);
  251. tmp_cfg = rspamd_config_new (RSPAMD_CONFIG_INIT_DEFAULT);
  252. tmp_cfg->libs_ctx = rspamd_main->cfg->libs_ctx;
  253. REF_RETAIN (tmp_cfg->libs_ctx);
  254. cfg_file = rspamd_mempool_strdup (tmp_cfg->cfg_pool,
  255. rspamd_main->cfg->cfg_name);
  256. /* Save some variables */
  257. tmp_cfg->cfg_name = cfg_file;
  258. old_cfg = rspamd_main->cfg;
  259. rspamd_main->cfg = tmp_cfg;
  260. if (!load_rspamd_config (rspamd_main, tmp_cfg, TRUE, load_opts, TRUE)) {
  261. rspamd_main->cfg = old_cfg;
  262. rspamd_log_close_priv (rspamd_main->logger,
  263. FALSE,
  264. rspamd_main->workers_uid,
  265. rspamd_main->workers_gid);
  266. rspamd_set_logger (rspamd_main->cfg, g_quark_try_string ("main"),
  267. &rspamd_main->logger, rspamd_main->server_pool);
  268. rspamd_log_open_priv (rspamd_main->logger,
  269. rspamd_main->workers_uid,
  270. rspamd_main->workers_gid);
  271. msg_err_main ("cannot parse new config file, revert to old one");
  272. REF_RELEASE (tmp_cfg);
  273. }
  274. else {
  275. msg_info_main ("replacing config");
  276. REF_RELEASE (old_cfg);
  277. rspamd_main->cfg->rspamd_user = rspamd_user;
  278. rspamd_main->cfg->rspamd_group = rspamd_group;
  279. /* Here, we can do post actions with the existing config */
  280. /*
  281. * As some rules are defined in lua, we need to process them, then init
  282. * modules and merely afterwards to init modules
  283. */
  284. rspamd_lua_post_load_config (tmp_cfg);
  285. rspamd_init_filters (tmp_cfg, TRUE);
  286. /* Do post-load actions */
  287. rspamd_config_post_load (tmp_cfg,
  288. load_opts|RSPAMD_CONFIG_INIT_POST_LOAD_LUA|RSPAMD_CONFIG_INIT_PRELOAD_MAPS);
  289. msg_info_main ("config has been reread successfully");
  290. }
  291. }
  292. struct waiting_worker {
  293. struct rspamd_main *rspamd_main;
  294. struct ev_timer wait_ev;
  295. struct rspamd_worker_conf *cf;
  296. guint oldindex;
  297. };
  298. static void
  299. rspamd_fork_delayed_cb (EV_P_ ev_timer *w, int revents)
  300. {
  301. struct waiting_worker *waiting_worker = (struct waiting_worker *)w->data;
  302. ev_timer_stop (EV_A_ &waiting_worker->wait_ev);
  303. rspamd_fork_worker (waiting_worker->rspamd_main, waiting_worker->cf,
  304. waiting_worker->oldindex,
  305. waiting_worker->rspamd_main->event_loop,
  306. rspamd_cld_handler);
  307. REF_RELEASE (waiting_worker->cf);
  308. g_free (waiting_worker);
  309. }
  310. static void
  311. rspamd_fork_delayed (struct rspamd_worker_conf *cf,
  312. guint index,
  313. struct rspamd_main *rspamd_main)
  314. {
  315. struct waiting_worker *nw;
  316. struct timeval tv;
  317. nw = g_malloc0 (sizeof (*nw));
  318. nw->cf = cf;
  319. nw->oldindex = index;
  320. nw->rspamd_main = rspamd_main;
  321. tv.tv_sec = SOFT_FORK_TIME;
  322. tv.tv_usec = 0;
  323. REF_RETAIN (cf);
  324. nw->wait_ev.data = nw;
  325. ev_timer_init (&nw->wait_ev, rspamd_fork_delayed_cb, SOFT_FORK_TIME, 0.0);
  326. ev_timer_start (rspamd_main->event_loop, &nw->wait_ev);
  327. }
  328. static GList *
  329. create_listen_socket (GPtrArray *addrs, guint cnt,
  330. enum rspamd_worker_socket_type listen_type)
  331. {
  332. GList *result = NULL;
  333. gint fd;
  334. guint i;
  335. struct rspamd_worker_listen_socket *ls;
  336. g_ptr_array_sort (addrs, rspamd_inet_address_compare_ptr);
  337. for (i = 0; i < cnt; i ++) {
  338. /*
  339. * Copy address to avoid reload issues
  340. */
  341. if (listen_type & RSPAMD_WORKER_SOCKET_TCP) {
  342. fd = rspamd_inet_address_listen (g_ptr_array_index (addrs, i),
  343. SOCK_STREAM, TRUE);
  344. if (fd != -1) {
  345. ls = g_malloc0 (sizeof (*ls));
  346. ls->addr = rspamd_inet_address_copy (g_ptr_array_index (addrs, i));
  347. ls->fd = fd;
  348. ls->type = RSPAMD_WORKER_SOCKET_TCP;
  349. result = g_list_prepend (result, ls);
  350. }
  351. }
  352. if (listen_type & RSPAMD_WORKER_SOCKET_UDP) {
  353. fd = rspamd_inet_address_listen (g_ptr_array_index (addrs, i),
  354. SOCK_DGRAM, TRUE);
  355. if (fd != -1) {
  356. ls = g_malloc0 (sizeof (*ls));
  357. ls->addr = rspamd_inet_address_copy (g_ptr_array_index (addrs, i));
  358. ls->fd = fd;
  359. ls->type = RSPAMD_WORKER_SOCKET_UDP;
  360. result = g_list_prepend (result, ls);
  361. }
  362. }
  363. }
  364. return result;
  365. }
  366. static GList *
  367. systemd_get_socket (struct rspamd_main *rspamd_main, gint number)
  368. {
  369. int sock, num_passed, flags;
  370. GList *result = NULL;
  371. const gchar *e;
  372. gchar *err;
  373. struct stat st;
  374. /* XXX: can we trust the current choice ? */
  375. static const int sd_listen_fds_start = 3;
  376. struct rspamd_worker_listen_socket *ls;
  377. union {
  378. struct sockaddr_storage ss;
  379. struct sockaddr sa;
  380. } addr_storage;
  381. socklen_t slen = sizeof (addr_storage);
  382. gint stype;
  383. e = getenv ("LISTEN_FDS");
  384. if (e != NULL) {
  385. errno = 0;
  386. num_passed = strtoul (e, &err, 10);
  387. if ((err == NULL || *err == '\0') && num_passed > number) {
  388. sock = number + sd_listen_fds_start;
  389. if (fstat (sock, &st) == -1) {
  390. msg_warn_main ("cannot stat systemd descriptor %d", sock);
  391. return NULL;
  392. }
  393. if (!S_ISSOCK (st.st_mode)) {
  394. msg_warn_main ("systemd descriptor %d is not a socket", sock);
  395. errno = EINVAL;
  396. return NULL;
  397. }
  398. flags = fcntl (sock, F_GETFD);
  399. if (flags != -1) {
  400. (void)fcntl (sock, F_SETFD, flags | FD_CLOEXEC);
  401. }
  402. rspamd_socket_nonblocking (sock);
  403. if (getsockname (sock, &addr_storage.sa, &slen) == -1) {
  404. msg_warn_main ("cannot get name for systemd descriptor %d: %s",
  405. sock, strerror (errno));
  406. errno = EINVAL;
  407. return NULL;
  408. }
  409. ls = g_malloc0 (sizeof (*ls));
  410. ls->addr = rspamd_inet_address_from_sa (&addr_storage.sa, slen);
  411. ls->fd = sock;
  412. slen = sizeof (stype);
  413. if (getsockopt (sock, SOL_SOCKET, SO_TYPE, &stype, &slen) != -1) {
  414. if (stype == SOCK_STREAM) {
  415. ls->type = RSPAMD_WORKER_SOCKET_TCP;
  416. }
  417. else {
  418. ls->type = RSPAMD_WORKER_SOCKET_UDP;
  419. }
  420. }
  421. else {
  422. msg_warn_main ("cannot get type for systemd descriptor %d: %s",
  423. sock, strerror (errno));
  424. ls->type = RSPAMD_WORKER_SOCKET_TCP;
  425. }
  426. result = g_list_prepend (result, ls);
  427. }
  428. else if (num_passed <= number) {
  429. msg_err_main ("systemd LISTEN_FDS does not contain the expected fd: %d",
  430. num_passed);
  431. errno = EOVERFLOW;
  432. }
  433. }
  434. else {
  435. msg_err_main ("cannot get systemd variable 'LISTEN_FDS'");
  436. errno = ENOENT;
  437. }
  438. return result;
  439. }
  440. static inline uintptr_t
  441. make_listen_key (struct rspamd_worker_bind_conf *cf)
  442. {
  443. rspamd_cryptobox_fast_hash_state_t st;
  444. guint i, keylen = 0;
  445. guint8 *key;
  446. rspamd_inet_addr_t *addr;
  447. guint16 port;
  448. rspamd_cryptobox_fast_hash_init (&st, rspamd_hash_seed ());
  449. if (cf->is_systemd) {
  450. rspamd_cryptobox_fast_hash_update (&st, "systemd", sizeof ("systemd"));
  451. rspamd_cryptobox_fast_hash_update (&st, &cf->cnt, sizeof (cf->cnt));
  452. }
  453. else {
  454. rspamd_cryptobox_fast_hash_update (&st, cf->name, strlen (cf->name));
  455. for (i = 0; i < cf->cnt; i ++) {
  456. addr = g_ptr_array_index (cf->addrs, i);
  457. key = rspamd_inet_address_get_hash_key (
  458. addr, &keylen);
  459. rspamd_cryptobox_fast_hash_update (&st, key, keylen);
  460. port = rspamd_inet_address_get_port (addr);
  461. rspamd_cryptobox_fast_hash_update (&st, &port, sizeof (port));
  462. }
  463. }
  464. return rspamd_cryptobox_fast_hash_final (&st);
  465. }
  466. static void
  467. spawn_worker_type (struct rspamd_main *rspamd_main, struct ev_loop *event_loop,
  468. struct rspamd_worker_conf *cf)
  469. {
  470. gint i;
  471. if (cf->count < 0) {
  472. msg_info_main ("skip spawning of worker %s: disabled in configuration",
  473. cf->worker->name);
  474. return;
  475. }
  476. if (cf->worker->flags & RSPAMD_WORKER_UNIQUE) {
  477. if (cf->count > 1) {
  478. msg_warn_main (
  479. "cannot spawn more than 1 %s worker, so spawn one",
  480. cf->worker->name);
  481. }
  482. rspamd_fork_worker (rspamd_main, cf, 0, event_loop, rspamd_cld_handler);
  483. }
  484. else if (cf->worker->flags & RSPAMD_WORKER_THREADED) {
  485. rspamd_fork_worker (rspamd_main, cf, 0, event_loop, rspamd_cld_handler);
  486. }
  487. else {
  488. for (i = 0; i < cf->count; i++) {
  489. rspamd_fork_worker (rspamd_main, cf, i, event_loop,
  490. rspamd_cld_handler);
  491. }
  492. }
  493. }
  494. static void
  495. spawn_workers (struct rspamd_main *rspamd_main, struct ev_loop *ev_base)
  496. {
  497. GList *cur, *ls;
  498. struct rspamd_worker_conf *cf;
  499. gpointer p;
  500. guintptr key;
  501. struct rspamd_worker_bind_conf *bcf;
  502. gboolean listen_ok = FALSE;
  503. GPtrArray *seen_mandatory_workers;
  504. worker_t **cw, *wrk;
  505. guint i;
  506. /* Special hack for hs_helper if it's not defined in a config */
  507. seen_mandatory_workers = g_ptr_array_new ();
  508. cur = rspamd_main->cfg->workers;
  509. while (cur) {
  510. cf = cur->data;
  511. listen_ok = FALSE;
  512. if (cf->worker == NULL) {
  513. msg_err_main ("type of worker is unspecified, skip spawning");
  514. }
  515. else {
  516. if (!cf->enabled || cf->count <= 0) {
  517. msg_info_main ("worker of type %s(%s) is disabled in the config, "
  518. "skip spawning", g_quark_to_string (cf->type),
  519. cf->bind_conf ? cf->bind_conf->bind_line : "none");
  520. cur = g_list_next (cur);
  521. continue;
  522. }
  523. if (cf->worker->flags & RSPAMD_WORKER_ALWAYS_START) {
  524. g_ptr_array_add (seen_mandatory_workers, cf->worker);
  525. }
  526. if (cf->worker->flags & RSPAMD_WORKER_HAS_SOCKET) {
  527. LL_FOREACH (cf->bind_conf, bcf) {
  528. key = make_listen_key (bcf);
  529. if ((p =
  530. g_hash_table_lookup (listen_sockets,
  531. GINT_TO_POINTER (key))) == NULL) {
  532. if (!bcf->is_systemd) {
  533. /* Create listen socket */
  534. ls = create_listen_socket (bcf->addrs, bcf->cnt,
  535. cf->worker->listen_type);
  536. }
  537. else {
  538. ls = systemd_get_socket (rspamd_main, bcf->cnt);
  539. }
  540. if (ls == NULL) {
  541. msg_err_main ("cannot listen on %s socket %s: %s",
  542. bcf->is_systemd ? "systemd" : "normal",
  543. bcf->name,
  544. strerror (errno));
  545. }
  546. else {
  547. g_hash_table_insert (listen_sockets, (gpointer)key, ls);
  548. listen_ok = TRUE;
  549. }
  550. }
  551. else {
  552. /* We had socket for this type of worker */
  553. ls = p;
  554. listen_ok = TRUE;
  555. }
  556. /* Do not add existing lists as it causes loops */
  557. if (g_list_position (cf->listen_socks, ls) == -1) {
  558. cf->listen_socks = g_list_concat (cf->listen_socks, ls);
  559. }
  560. }
  561. if (listen_ok) {
  562. spawn_worker_type (rspamd_main, ev_base, cf);
  563. }
  564. else {
  565. msg_err_main ("cannot create listen socket for %s at %s",
  566. g_quark_to_string (cf->type), cf->bind_conf->name);
  567. rspamd_hard_terminate (rspamd_main);
  568. g_assert_not_reached ();
  569. }
  570. }
  571. else {
  572. spawn_worker_type (rspamd_main, ev_base, cf);
  573. }
  574. }
  575. cur = g_list_next (cur);
  576. }
  577. for (cw = workers; *cw != NULL; cw ++) {
  578. gboolean seen = FALSE;
  579. wrk = *cw;
  580. if (wrk->flags & RSPAMD_WORKER_ALWAYS_START) {
  581. for (i = 0; i < seen_mandatory_workers->len; i ++) {
  582. if (wrk == g_ptr_array_index (seen_mandatory_workers, i)) {
  583. seen = TRUE;
  584. break;
  585. }
  586. }
  587. if (!seen) {
  588. cf = rspamd_config_new_worker (rspamd_main->cfg, NULL);
  589. cf->count = 1;
  590. cf->worker = wrk;
  591. cf->type = g_quark_from_static_string (wrk->name);
  592. if (cf->worker->worker_init_func) {
  593. cf->ctx = cf->worker->worker_init_func (rspamd_main->cfg);
  594. }
  595. spawn_worker_type (rspamd_main, ev_base, cf);
  596. }
  597. }
  598. }
  599. g_ptr_array_free (seen_mandatory_workers, TRUE);
  600. }
  601. static void
  602. kill_old_workers (gpointer key, gpointer value, gpointer unused)
  603. {
  604. struct rspamd_worker *w = value;
  605. struct rspamd_main *rspamd_main;
  606. rspamd_main = w->srv;
  607. if (!w->wanna_die) {
  608. w->wanna_die = TRUE;
  609. kill (w->pid, SIGUSR2);
  610. ev_io_stop (rspamd_main->event_loop, &w->srv_ev);
  611. msg_info_main ("send signal to worker %P", w->pid);
  612. }
  613. else {
  614. msg_info_main ("do not send signal to worker %P, already sent", w->pid);
  615. }
  616. }
  617. static void
  618. rspamd_worker_wait (struct rspamd_worker *w)
  619. {
  620. struct rspamd_main *rspamd_main;
  621. rspamd_main = w->srv;
  622. if (term_attempts < 0) {
  623. if (w->cf->worker->flags & RSPAMD_WORKER_KILLABLE) {
  624. msg_warn_main ("terminate worker %s(%P) with SIGKILL",
  625. g_quark_to_string (w->type), w->pid);
  626. if (kill (w->pid, SIGKILL) == -1) {
  627. if (errno == ESRCH) {
  628. /* We have actually killed the process */
  629. return;
  630. }
  631. }
  632. }
  633. else {
  634. if (term_attempts > -(TERMINATION_ATTEMPTS * 2)) {
  635. if (term_attempts % 10 == 0) {
  636. msg_info_main ("waiting for worker %s(%P) to sync, "
  637. "%d seconds remain",
  638. g_quark_to_string (w->type), w->pid,
  639. (TERMINATION_ATTEMPTS * 2 + term_attempts) / 5);
  640. kill (w->pid, SIGTERM);
  641. if (errno == ESRCH) {
  642. /* We have actually killed the process */
  643. return;
  644. }
  645. }
  646. }
  647. else {
  648. msg_err_main ("data corruption warning: terminating "
  649. "special worker %s(%P) with SIGKILL",
  650. g_quark_to_string (w->type), w->pid);
  651. kill (w->pid, SIGKILL);
  652. if (errno == ESRCH) {
  653. /* We have actually killed the process */
  654. return;
  655. }
  656. }
  657. }
  658. }
  659. }
  660. static void
  661. hash_worker_wait_callback (gpointer key, gpointer value, gpointer unused)
  662. {
  663. rspamd_worker_wait ((struct rspamd_worker *)value);
  664. }
  665. struct core_check_cbdata {
  666. struct rspamd_config *cfg;
  667. gsize total_count;
  668. gsize total_size;
  669. };
  670. #ifdef HAVE_NFTW
  671. static struct core_check_cbdata cores_cbdata;
  672. static gint
  673. rspamd_check_core_cb (const gchar *path, const struct stat *st,
  674. gint flag, struct FTW *ft)
  675. {
  676. if (S_ISREG (st->st_mode)) {
  677. cores_cbdata.total_count ++;
  678. /* Use physical size instead of displayed one */
  679. cores_cbdata.total_size += st->st_blocks * 512;
  680. }
  681. return 0;
  682. }
  683. #endif
  684. static void
  685. rspamd_check_core_limits (struct rspamd_main *rspamd_main)
  686. {
  687. #ifdef HAVE_NFTW
  688. struct rspamd_config *cfg = rspamd_main->cfg;
  689. cores_cbdata.cfg = cfg;
  690. cores_cbdata.total_count = 0;
  691. cores_cbdata.total_size = 0;
  692. if (cfg->cores_dir && (cfg->max_cores_count || cfg->max_cores_size)) {
  693. if (nftw (cfg->cores_dir, rspamd_check_core_cb, 1, FTW_MOUNT|FTW_PHYS)
  694. == -1) {
  695. msg_err_main ("nftw failed for path %s: %s", cfg->cores_dir,
  696. strerror (errno));
  697. }
  698. else {
  699. if (!rspamd_main->cores_throttling) {
  700. if (cfg->max_cores_size &&
  701. cores_cbdata.total_size > cfg->max_cores_size) {
  702. msg_warn_main (
  703. "enable cores throttling as size of cores in"
  704. " %s is %Hz, limit is %Hz",
  705. cfg->cores_dir,
  706. cores_cbdata.total_size,
  707. cfg->max_cores_size);
  708. rspamd_main->cores_throttling = TRUE;
  709. }
  710. if (cfg->max_cores_count &&
  711. cores_cbdata.total_count > cfg->max_cores_count) {
  712. msg_warn_main (
  713. "enable cores throttling as count of cores in"
  714. " %s is %z, limit is %z",
  715. cfg->cores_dir,
  716. cores_cbdata.total_count,
  717. cfg->max_cores_count);
  718. rspamd_main->cores_throttling = TRUE;
  719. }
  720. }
  721. else {
  722. if (cfg->max_cores_size &&
  723. cores_cbdata.total_size < cfg->max_cores_size) {
  724. msg_info_main (
  725. "disable cores throttling as size of cores in"
  726. " %s is now %Hz, limit is %Hz",
  727. cfg->cores_dir,
  728. cores_cbdata.total_size,
  729. cfg->max_cores_size);
  730. rspamd_main->cores_throttling = FALSE;
  731. }
  732. if (cfg->max_cores_count &&
  733. cores_cbdata.total_count < cfg->max_cores_count) {
  734. msg_info_main (
  735. "disable cores throttling as count of cores in"
  736. " %s is %z, limit is %z",
  737. cfg->cores_dir,
  738. cores_cbdata.total_count,
  739. cfg->max_cores_count);
  740. rspamd_main->cores_throttling = FALSE;
  741. }
  742. }
  743. }
  744. }
  745. #endif
  746. }
  747. static void
  748. reopen_log_handler (gpointer key, gpointer value, gpointer unused)
  749. {
  750. struct rspamd_worker *w = value;
  751. struct rspamd_main *rspamd_main;
  752. rspamd_main = w->srv;
  753. if (kill (w->pid, SIGUSR1) == -1) {
  754. msg_err_main ("kill failed for pid %P: %s", w->pid, strerror (errno));
  755. }
  756. }
  757. static gboolean
  758. load_rspamd_config (struct rspamd_main *rspamd_main,
  759. struct rspamd_config *cfg, gboolean init_modules,
  760. enum rspamd_post_load_options opts,
  761. gboolean reload)
  762. {
  763. cfg->compiled_modules = modules;
  764. cfg->compiled_workers = workers;
  765. if (!rspamd_config_read (cfg, cfg->cfg_name, config_logger, rspamd_main,
  766. ucl_vars, skip_template, lua_env)) {
  767. return FALSE;
  768. }
  769. /* Strictly set temp dir */
  770. if (!cfg->temp_dir) {
  771. msg_warn_main ("tempdir is not set, trying to use $TMPDIR");
  772. cfg->temp_dir =
  773. rspamd_mempool_strdup (cfg->cfg_pool, getenv ("TMPDIR"));
  774. if (!cfg->temp_dir) {
  775. msg_warn_main ("$TMPDIR is empty too, using /tmp as default");
  776. cfg->temp_dir = rspamd_mempool_strdup (cfg->cfg_pool, "/tmp");
  777. }
  778. }
  779. if (!reload) {
  780. /*
  781. * As some rules are defined in lua, we need to process them, then init
  782. * modules and merely afterwards to init modules
  783. */
  784. rspamd_lua_post_load_config (cfg);
  785. if (init_modules) {
  786. rspamd_init_filters (cfg, reload);
  787. }
  788. /* Do post-load actions */
  789. rspamd_config_post_load (cfg, opts);
  790. }
  791. return TRUE;
  792. }
  793. static void
  794. stop_srv_ev (gpointer key, gpointer value, gpointer ud)
  795. {
  796. struct rspamd_worker *cur = (struct rspamd_worker *)value;
  797. struct rspamd_main *rspamd_main = (struct rspamd_main *)ud;
  798. ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
  799. }
  800. static void
  801. rspamd_final_timer_handler (EV_P_ ev_timer *w, int revents)
  802. {
  803. struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
  804. term_attempts--;
  805. g_hash_table_foreach (rspamd_main->workers, hash_worker_wait_callback, NULL);
  806. if (g_hash_table_size (rspamd_main->workers) == 0) {
  807. ev_break (rspamd_main->event_loop, EVBREAK_ALL);
  808. }
  809. }
  810. /* Signal handlers */
  811. static void
  812. rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents)
  813. {
  814. struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
  815. static ev_timer ev_finale;
  816. if (!rspamd_main->wanna_die) {
  817. rspamd_main->wanna_die = TRUE;
  818. msg_info_main ("catch termination signal, waiting for children");
  819. rspamd_log_nolock (rspamd_main->logger);
  820. /* Stop srv events to avoid false notifications */
  821. g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main);
  822. rspamd_pass_signal (rspamd_main->workers, SIGTERM);
  823. if (control_fd != -1) {
  824. ev_io_stop (rspamd_main->event_loop, &control_ev);
  825. close (control_fd);
  826. }
  827. if (valgrind_mode) {
  828. /* Special case if we are likely running with valgrind */
  829. term_attempts = TERMINATION_ATTEMPTS * 10;
  830. }
  831. else {
  832. term_attempts = TERMINATION_ATTEMPTS;
  833. }
  834. ev_finale.data = rspamd_main;
  835. ev_timer_init (&ev_finale, rspamd_final_timer_handler, 0.2, 0.2);
  836. ev_timer_start (rspamd_main->event_loop, &ev_finale);
  837. }
  838. }
  839. static void
  840. rspamd_usr1_handler (struct ev_loop *loop, ev_signal *w, int revents)
  841. {
  842. struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
  843. if (!rspamd_main->wanna_die) {
  844. rspamd_log_reopen_priv (rspamd_main->logger,
  845. rspamd_main->workers_uid,
  846. rspamd_main->workers_gid);
  847. g_hash_table_foreach (rspamd_main->workers, reopen_log_handler,
  848. NULL);
  849. }
  850. }
  851. static void
  852. rspamd_stat_update_handler (struct ev_loop *loop, ev_timer *w, int revents)
  853. {
  854. struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
  855. struct rspamd_stat cur_stat;
  856. gchar proctitle[128];
  857. memcpy (&cur_stat, rspamd_main->stat, sizeof (cur_stat));
  858. if (old_stat.messages_scanned > 0 &&
  859. cur_stat.messages_scanned > old_stat.messages_scanned) {
  860. gdouble rate = (double)(cur_stat.messages_scanned - old_stat.messages_scanned) /
  861. w->repeat;
  862. gdouble old_spam = old_stat.actions_stat[METRIC_ACTION_REJECT] +
  863. old_stat.actions_stat[METRIC_ACTION_ADD_HEADER] +
  864. old_stat.actions_stat[METRIC_ACTION_REWRITE_SUBJECT];
  865. gdouble old_ham = old_stat.actions_stat[METRIC_ACTION_NOACTION];
  866. gdouble new_spam = cur_stat.actions_stat[METRIC_ACTION_REJECT] +
  867. cur_stat.actions_stat[METRIC_ACTION_ADD_HEADER] +
  868. cur_stat.actions_stat[METRIC_ACTION_REWRITE_SUBJECT];
  869. gdouble new_ham = cur_stat.actions_stat[METRIC_ACTION_NOACTION];
  870. rspamd_snprintf (proctitle, sizeof (proctitle),
  871. "main process; %.1f msg/sec, %.1f msg/sec spam, %.1f msg/sec ham",
  872. rate,
  873. (new_spam - old_spam) / w->repeat,
  874. (new_ham - old_ham) / w->repeat);
  875. setproctitle (proctitle);
  876. }
  877. memcpy (&old_stat, &cur_stat, sizeof (cur_stat));
  878. }
  879. static void
  880. rspamd_hup_handler (struct ev_loop *loop, ev_signal *w, int revents)
  881. {
  882. struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
  883. if (!rspamd_main->wanna_die) {
  884. msg_info_main ("rspamd "
  885. RVERSION
  886. " is restarting");
  887. g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
  888. rspamd_log_close_priv (rspamd_main->logger,
  889. FALSE,
  890. rspamd_main->workers_uid,
  891. rspamd_main->workers_gid);
  892. reread_config (rspamd_main);
  893. rspamd_check_core_limits (rspamd_main);
  894. spawn_workers (rspamd_main, rspamd_main->event_loop);
  895. }
  896. }
  897. /* Called when a dead child has been found */
  898. static void
  899. rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
  900. struct rspamd_worker *wrk)
  901. {
  902. gboolean need_refork;
  903. /* Turn off locking for logger */
  904. ev_child_stop (EV_A_ w);
  905. rspamd_log_nolock (rspamd_main->logger);
  906. /* Remove dead child form children list */
  907. g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (wrk->pid));
  908. if (wrk->srv_pipe[0] != -1) {
  909. /* Ugly workaround */
  910. if (wrk->tmp_data) {
  911. g_free (wrk->tmp_data);
  912. }
  913. ev_io_stop (rspamd_main->event_loop, &wrk->srv_ev);
  914. ev_timer_stop (rspamd_main->event_loop, &wrk->hb.heartbeat_ev);
  915. }
  916. if (wrk->control_pipe[0] != -1) {
  917. /* We also need to clean descriptors left */
  918. close (wrk->control_pipe[0]);
  919. close (wrk->srv_pipe[0]);
  920. }
  921. REF_RELEASE (wrk->cf);
  922. if (wrk->finish_actions) {
  923. g_ptr_array_free (wrk->finish_actions, TRUE);
  924. }
  925. need_refork = rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
  926. if (need_refork) {
  927. /* Fork another worker in replace of dead one */
  928. msg_info_main ("respawn process %s in lieu of terminated process with pid %P",
  929. g_quark_to_string (wrk->type),
  930. wrk->pid);
  931. rspamd_check_core_limits (rspamd_main);
  932. rspamd_fork_delayed (wrk->cf, wrk->index, rspamd_main);
  933. rspamd_log_lock (rspamd_main->logger);
  934. }
  935. else {
  936. msg_info_main ("do not respawn process %s after found terminated process with pid %P",
  937. g_quark_to_string (wrk->type),
  938. wrk->pid);
  939. }
  940. g_free (wrk);
  941. }
  942. /* Control socket handler */
  943. static void
  944. rspamd_control_handler (EV_P_ ev_io *w, int revents)
  945. {
  946. struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
  947. rspamd_inet_addr_t *addr;
  948. gint nfd;
  949. if ((nfd =
  950. rspamd_accept_from_socket (w->fd, &addr, NULL, NULL)) == -1) {
  951. msg_warn_main ("accept failed: %s", strerror (errno));
  952. return;
  953. }
  954. /* Check for EAGAIN */
  955. if (nfd == 0) {
  956. return;
  957. }
  958. msg_info_main ("accepted control connection from %s",
  959. rspamd_inet_address_to_string (addr));
  960. rspamd_control_process_client_socket (rspamd_main, nfd, addr);
  961. }
  962. static guint
  963. rspamd_spair_hash (gconstpointer p)
  964. {
  965. return rspamd_cryptobox_fast_hash (p, PAIR_ID_LEN, rspamd_hash_seed ());
  966. }
  967. static gboolean
  968. rspamd_spair_equal (gconstpointer a, gconstpointer b)
  969. {
  970. return memcmp (a, b, PAIR_ID_LEN) == 0;
  971. }
  972. static void
  973. rspamd_spair_close (gpointer p)
  974. {
  975. gint *fds = p;
  976. close (fds[0]);
  977. close (fds[1]);
  978. g_free (p);
  979. }
  980. static void
  981. version (void)
  982. {
  983. #if defined(GIT_VERSION) && GIT_VERSION == 1
  984. rspamd_printf ("Rspamd daemon version " RVERSION "-git." RID "\n");
  985. #else
  986. rspamd_printf ("Rspamd daemon version " RVERSION "\n");
  987. #endif
  988. }
  989. gint
  990. main (gint argc, gchar **argv, gchar **env)
  991. {
  992. gint i, res = 0;
  993. struct sigaction signals, sigpipe_act;
  994. worker_t **pworker;
  995. GQuark type;
  996. rspamd_inet_addr_t *control_addr = NULL;
  997. struct ev_loop *event_loop;
  998. struct rspamd_main *rspamd_main;
  999. gboolean skip_pid = FALSE;
  1000. rspamd_main = (struct rspamd_main *) g_malloc0 (sizeof (struct rspamd_main));
  1001. rspamd_main->server_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
  1002. "main");
  1003. rspamd_main->stat = rspamd_mempool_alloc0_shared (rspamd_main->server_pool,
  1004. sizeof (struct rspamd_stat));
  1005. rspamd_main->cfg = rspamd_config_new (RSPAMD_CONFIG_INIT_DEFAULT);
  1006. rspamd_main->spairs = g_hash_table_new_full (rspamd_spair_hash,
  1007. rspamd_spair_equal, g_free, rspamd_spair_close);
  1008. rspamd_main->start_mtx = rspamd_mempool_get_mutex (rspamd_main->server_pool);
  1009. if (getenv ("VALGRIND") != NULL) {
  1010. valgrind_mode = TRUE;
  1011. }
  1012. #ifndef HAVE_SETPROCTITLE
  1013. init_title (rspamd_main, argc, argv, env);
  1014. #endif
  1015. rspamd_main->cfg->libs_ctx = rspamd_init_libs ();
  1016. memset (&signals, 0, sizeof (struct sigaction));
  1017. read_cmd_line (&argc, &argv, rspamd_main->cfg);
  1018. if (show_version) {
  1019. version ();
  1020. exit (EXIT_SUCCESS);
  1021. }
  1022. if (argc > 0) {
  1023. /* Parse variables */
  1024. for (i = 0; i < argc; i++) {
  1025. if (strchr (argv[i], '=') != NULL) {
  1026. gchar *k, *v, *t;
  1027. k = g_strdup (argv[i]);
  1028. t = strchr (k, '=');
  1029. v = g_strdup (t + 1);
  1030. *t = '\0';
  1031. if (ucl_vars == NULL) {
  1032. ucl_vars = g_hash_table_new_full (rspamd_strcase_hash,
  1033. rspamd_strcase_equal, g_free, g_free);
  1034. }
  1035. g_hash_table_insert (ucl_vars, k, v);
  1036. }
  1037. }
  1038. }
  1039. if (is_debug) {
  1040. rspamd_main->cfg->log_level = G_LOG_LEVEL_DEBUG;
  1041. }
  1042. else {
  1043. rspamd_main->cfg->log_level = G_LOG_LEVEL_MESSAGE;
  1044. }
  1045. type = g_quark_from_static_string ("main");
  1046. /* First set logger to console logger */
  1047. rspamd_main->cfg->log_type = RSPAMD_LOG_CONSOLE;
  1048. rspamd_set_logger (rspamd_main->cfg, type,
  1049. &rspamd_main->logger, rspamd_main->server_pool);
  1050. (void) rspamd_log_open (rspamd_main->logger);
  1051. g_log_set_default_handler (rspamd_glib_log_function, rspamd_main->logger);
  1052. g_set_printerr_handler (rspamd_glib_printerr_function);
  1053. detect_priv (rspamd_main);
  1054. pworker = &workers[0];
  1055. while (*pworker) {
  1056. /* Init string quarks */
  1057. (void) g_quark_from_static_string ((*pworker)->name);
  1058. pworker++;
  1059. }
  1060. /* Init listen sockets hash */
  1061. listen_sockets = g_hash_table_new (g_direct_hash, g_direct_equal);
  1062. rspamd_log_close_priv (rspamd_main->logger, FALSE,
  1063. rspamd_main->workers_uid, rspamd_main->workers_gid);
  1064. sqlite3_initialize ();
  1065. /* Load config */
  1066. if (!load_rspamd_config (rspamd_main, rspamd_main->cfg, TRUE,
  1067. RSPAMD_CONFIG_LOAD_ALL, FALSE)) {
  1068. exit (EXIT_FAILURE);
  1069. }
  1070. /* Override pidfile from configuration by command line argument */
  1071. if (rspamd_pidfile != NULL) {
  1072. rspamd_main->cfg->pid_file = rspamd_pidfile;
  1073. }
  1074. /* Force debug log */
  1075. if (is_debug) {
  1076. rspamd_main->cfg->log_level = G_LOG_LEVEL_DEBUG;
  1077. }
  1078. /* Create rolling history */
  1079. rspamd_main->history = rspamd_roll_history_new (rspamd_main->server_pool,
  1080. rspamd_main->cfg->history_rows, rspamd_main->cfg);
  1081. gperf_profiler_init (rspamd_main->cfg, "main");
  1082. msg_info_main ("rspamd "
  1083. RVERSION
  1084. " is starting, build id: "
  1085. RID);
  1086. rspamd_main->cfg->cfg_name = rspamd_mempool_strdup (
  1087. rspamd_main->cfg->cfg_pool,
  1088. rspamd_main->cfg->cfg_name);
  1089. msg_info_main ("cpu features: %s",
  1090. rspamd_main->cfg->libs_ctx->crypto_ctx->cpu_extensions);
  1091. msg_info_main ("cryptobox configuration: curve25519(libsodium), "
  1092. "chacha20(%s), poly1305(libsodium), siphash(libsodium), blake2(libsodium), base64(%s)",
  1093. rspamd_main->cfg->libs_ctx->crypto_ctx->chacha20_impl,
  1094. rspamd_main->cfg->libs_ctx->crypto_ctx->base64_impl);
  1095. msg_info_main ("libottery prf: %s", ottery_get_impl_name ());
  1096. /* Daemonize */
  1097. if (!no_fork && daemon (0, 0) == -1) {
  1098. rspamd_fprintf (stderr, "Cannot daemonize\n");
  1099. exit (-errno);
  1100. }
  1101. /* Write info */
  1102. rspamd_main->pid = getpid ();
  1103. rspamd_main->type = type;
  1104. if (!valgrind_mode) {
  1105. rspamd_set_crash_handler (rspamd_main);
  1106. }
  1107. /* Ignore SIGPIPE as we handle write errors manually */
  1108. sigemptyset (&sigpipe_act.sa_mask);
  1109. sigaddset (&sigpipe_act.sa_mask, SIGPIPE);
  1110. sigpipe_act.sa_handler = SIG_IGN;
  1111. sigpipe_act.sa_flags = 0;
  1112. sigaction (SIGPIPE, &sigpipe_act, NULL);
  1113. if (rspamd_main->cfg->pid_file == NULL) {
  1114. msg_info_main ("pid file is not specified, skipping writing it");
  1115. skip_pid = TRUE;
  1116. }
  1117. else if (no_fork) {
  1118. msg_info_main ("skip writing pid in no-fork mode");
  1119. skip_pid = TRUE;
  1120. }
  1121. else if (rspamd_write_pid (rspamd_main) == -1) {
  1122. msg_err_main ("cannot write pid file %s", rspamd_main->cfg->pid_file);
  1123. exit (-errno);
  1124. }
  1125. /* Block signals to use sigsuspend in future */
  1126. sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL);
  1127. /* Set title */
  1128. setproctitle ("main process");
  1129. /* Flush log */
  1130. rspamd_log_flush (rspamd_main->logger);
  1131. /* Open control socket if needed */
  1132. control_fd = -1;
  1133. if (rspamd_main->cfg->control_socket_path) {
  1134. if (!rspamd_parse_inet_address (&control_addr,
  1135. rspamd_main->cfg->control_socket_path,
  1136. 0)) {
  1137. msg_err_main ("cannot parse inet address %s",
  1138. rspamd_main->cfg->control_socket_path);
  1139. }
  1140. else {
  1141. control_fd = rspamd_inet_address_listen (control_addr, SOCK_STREAM,
  1142. TRUE);
  1143. if (control_fd == -1) {
  1144. msg_err_main ("cannot open control socket at path: %s",
  1145. rspamd_main->cfg->control_socket_path);
  1146. }
  1147. }
  1148. }
  1149. /* Maybe read roll history */
  1150. if (rspamd_main->cfg->history_file) {
  1151. rspamd_roll_history_load (rspamd_main->history,
  1152. rspamd_main->cfg->history_file);
  1153. }
  1154. #if defined(WITH_GPERF_TOOLS)
  1155. ProfilerStop ();
  1156. #endif
  1157. /* Spawn workers */
  1158. rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal);
  1159. /* Init event base */
  1160. event_loop = ev_default_loop (EVFLAG_SIGNALFD|EVBACKEND_ALL);
  1161. rspamd_main->event_loop = event_loop;
  1162. if (event_loop) {
  1163. unsigned loop_type = ev_backend (event_loop);
  1164. const gchar *loop_str = "unknown";
  1165. gboolean poor_backend = TRUE;
  1166. switch (loop_type) {
  1167. case EVBACKEND_EPOLL:
  1168. loop_str = "epoll";
  1169. poor_backend = FALSE;
  1170. break;
  1171. case EVBACKEND_POLL:
  1172. loop_str = "poll";
  1173. break;
  1174. case EVBACKEND_SELECT:
  1175. loop_str = "select";
  1176. break;
  1177. case EVBACKEND_KQUEUE:
  1178. loop_str = "kqueue";
  1179. poor_backend = FALSE;
  1180. break;
  1181. case EVBACKEND_PORT:
  1182. loop_str = "port";
  1183. poor_backend = FALSE;
  1184. break;
  1185. case EVBACKEND_DEVPOLL:
  1186. loop_str = "/dev/poll";
  1187. poor_backend = FALSE;
  1188. break;
  1189. default:
  1190. break;
  1191. }
  1192. if (poor_backend) {
  1193. msg_warn_main ("event loop uses non-optimal backend: %s", loop_str);
  1194. }
  1195. else {
  1196. msg_info_main ("event loop initialised with backend: %s", loop_str);
  1197. }
  1198. }
  1199. else {
  1200. msg_err ("cannot init event loop! exiting");
  1201. exit (EXIT_FAILURE);
  1202. }
  1203. /* Unblock signals */
  1204. sigemptyset (&signals.sa_mask);
  1205. sigprocmask (SIG_SETMASK, &signals.sa_mask, NULL);
  1206. /* Set events for signals */
  1207. ev_signal_init (&rspamd_main->term_ev, rspamd_term_handler, SIGTERM);
  1208. rspamd_main->term_ev.data = rspamd_main;
  1209. ev_signal_start (event_loop, &rspamd_main->term_ev);
  1210. ev_signal_init (&rspamd_main->int_ev, rspamd_term_handler, SIGINT);
  1211. rspamd_main->int_ev.data = rspamd_main;
  1212. ev_signal_start (event_loop, &rspamd_main->int_ev);
  1213. ev_signal_init (&rspamd_main->hup_ev, rspamd_hup_handler, SIGHUP);
  1214. rspamd_main->hup_ev.data = rspamd_main;
  1215. ev_signal_start (event_loop, &rspamd_main->hup_ev);
  1216. ev_signal_init (&rspamd_main->usr1_ev, rspamd_usr1_handler, SIGUSR1);
  1217. rspamd_main->usr1_ev.data = rspamd_main;
  1218. ev_signal_start (event_loop, &rspamd_main->usr1_ev);
  1219. /* Update proctitle according to number of messages processed */
  1220. static const ev_tstamp stat_update_time = 10.0;
  1221. memset (&old_stat, 0, sizeof (old_stat));
  1222. stat_ev.data = rspamd_main;
  1223. ev_timer_init (&stat_ev, rspamd_stat_update_handler,
  1224. stat_update_time, stat_update_time);
  1225. ev_timer_start (event_loop, &stat_ev);
  1226. rspamd_check_core_limits (rspamd_main);
  1227. rspamd_mempool_lock_mutex (rspamd_main->start_mtx);
  1228. spawn_workers (rspamd_main, event_loop);
  1229. rspamd_mempool_unlock_mutex (rspamd_main->start_mtx);
  1230. rspamd_main->http_ctx = rspamd_http_context_create (rspamd_main->cfg,
  1231. event_loop, rspamd_main->cfg->ups_ctx);
  1232. if (control_fd != -1) {
  1233. msg_info_main ("listening for control commands on %s",
  1234. rspamd_inet_address_to_string (control_addr));
  1235. ev_io_init (&control_ev, rspamd_control_handler, control_fd, EV_READ);
  1236. control_ev.data = rspamd_main;
  1237. ev_io_start (event_loop, &control_ev);
  1238. }
  1239. ev_loop (event_loop, 0);
  1240. /* Maybe save roll history */
  1241. if (rspamd_main->cfg->history_file) {
  1242. rspamd_roll_history_save (rspamd_main->history,
  1243. rspamd_main->cfg->history_file);
  1244. }
  1245. msg_info_main ("terminating...");
  1246. REF_RELEASE (rspamd_main->cfg);
  1247. rspamd_log_close (rspamd_main->logger, TRUE);
  1248. g_hash_table_unref (rspamd_main->spairs);
  1249. g_hash_table_unref (rspamd_main->workers);
  1250. rspamd_mempool_delete (rspamd_main->server_pool);
  1251. if (!skip_pid) {
  1252. rspamd_pidfile_close (rspamd_main->pfh);
  1253. }
  1254. g_free (rspamd_main);
  1255. ev_unref (event_loop);
  1256. sqlite3_shutdown ();
  1257. if (control_addr) {
  1258. rspamd_inet_address_free (control_addr);
  1259. }
  1260. return (res);
  1261. }