You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

worker_util.c 32KB


  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "rspamd.h"
  18. #include "lua/lua_common.h"
  19. #include "worker_util.h"
  20. #include "unix-std.h"
  21. #include "utlist.h"
  22. #include "ottery.h"
  23. #include "rspamd_control.h"
  24. #include "libutil/map.h"
  25. #include "libutil/map_private.h"
  26. #include "libutil/http_private.h"
  27. #include "libutil/http_router.h"
  28. #ifdef WITH_GPERF_TOOLS
  29. #include <gperftools/profiler.h>
  30. #endif
  31. /* sys/resource.h */
  32. #ifdef HAVE_SYS_RESOURCE_H
  33. #include <sys/resource.h>
  34. #endif
  35. /* pwd and grp */
  36. #ifdef HAVE_PWD_H
  37. #include <pwd.h>
  38. #endif
  39. #ifdef HAVE_GRP_H
  40. #include <grp.h>
  41. #endif
  42. #ifdef HAVE_LIBUTIL_H
  43. #include <libutil.h>
  44. #endif
  45. #include "zlib.h"
  46. #ifdef WITH_LIBUNWIND
  47. #define UNW_LOCAL_ONLY 1
  48. #include <libunwind.h>
  49. #define UNWIND_BACKTRACE_DEPTH 256
  50. #endif
  51. #ifdef HAVE_UCONTEXT_H
  52. #include <ucontext.h>
  53. #elif defined(HAVE_SYS_UCONTEXT_H)
  54. #include <sys/ucontext.h>
  55. #endif
  56. #include "contrib/libev/ev.h"
  57. static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
  58. /**
  59. * Return worker's control structure by its type
  60. * @param type
  61. * @return worker's control structure or NULL
  62. */
  63. worker_t *
  64. rspamd_get_worker_by_type (struct rspamd_config *cfg, GQuark type)
  65. {
  66. worker_t **pwrk;
  67. pwrk = cfg->compiled_workers;
  68. while (pwrk && *pwrk) {
  69. if (rspamd_check_worker (cfg, *pwrk)) {
  70. if (g_quark_from_string ((*pwrk)->name) == type) {
  71. return *pwrk;
  72. }
  73. }
  74. pwrk++;
  75. }
  76. return NULL;
  77. }
  78. static void
  79. rspamd_worker_check_finished (EV_P_ ev_timer *w, int revents)
  80. {
  81. int *pnchecks = (int *)w->data;
  82. if (*pnchecks > SOFT_SHUTDOWN_TIME * 10) {
  83. msg_warn ("terminating worker before finishing of terminate handlers");
  84. ev_break (EV_A_ EVBREAK_ONE);
  85. }
  86. else {
  87. int refcount = ev_active_cnt (EV_A);
  88. if (refcount == 1) {
  89. ev_break (EV_A_ EVBREAK_ONE);
  90. }
  91. }
  92. }
  93. static void
  94. rspamd_worker_terminate_handlers (struct rspamd_worker *w)
  95. {
  96. guint i;
  97. gboolean (*cb)(struct rspamd_worker *);
  98. struct rspamd_abstract_worker_ctx *actx;
  99. struct ev_loop *final_gift, *orig_loop;
  100. static ev_timer margin_call;
  101. static int nchecks = 0;
  102. if (w->finish_actions->len == 0) {
  103. /* Nothing to do */
  104. return;
  105. }
  106. actx = (struct rspamd_abstract_worker_ctx *)w->ctx;
  107. /*
  108. * Here are dragons:
  109. * - we create a new loop
  110. * - we set a new ev_loop for worker via injection over rspamd_abstract_worker_ctx
  111. * - then we run finish actions
  112. * - then we create a special timer to kill worker if it fails to finish
  113. */
  114. final_gift = ev_loop_new (EVBACKEND_ALL);
  115. orig_loop = actx->event_loop;
  116. actx->event_loop = final_gift;
  117. margin_call.data = &nchecks;
  118. ev_timer_init (&margin_call, rspamd_worker_check_finished, 0.1,
  119. 0.1);
  120. ev_timer_start (final_gift, &margin_call);
  121. for (i = 0; i < w->finish_actions->len; i ++) {
  122. cb = g_ptr_array_index (w->finish_actions, i);
  123. cb (w);
  124. }
  125. ev_run (final_gift, 0);
  126. ev_loop_destroy (final_gift);
  127. /* Restore original loop */
  128. actx->event_loop = orig_loop;
  129. }
  130. static void
  131. rspamd_worker_on_delayed_shutdown (EV_P_ ev_timer *w, int revents)
  132. {
  133. ev_break (loop, EVBREAK_ALL);
  134. #ifdef WITH_GPERF_TOOLS
  135. ProfilerStop ();
  136. #endif
  137. }
  138. /*
  139. * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
  140. */
  141. static gboolean
  142. rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
  143. {
  144. /* Do not accept new connections, preparing to end worker's process */
  145. struct timeval tv;
  146. if (!sigh->worker->wanna_die) {
  147. static ev_timer shutdown_ev;
  148. rspamd_worker_ignore_signal (sigh);
  149. tv.tv_sec = SOFT_SHUTDOWN_TIME;
  150. tv.tv_usec = 0;
  151. sigh->worker->wanna_die = TRUE;
  152. rspamd_worker_terminate_handlers (sigh->worker);
  153. rspamd_default_log_function (G_LOG_LEVEL_INFO,
  154. sigh->worker->srv->server_pool->tag.tagname,
  155. sigh->worker->srv->server_pool->tag.uid,
  156. G_STRFUNC,
  157. "worker's shutdown is pending in %d sec",
  158. SOFT_SHUTDOWN_TIME);
  159. ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
  160. SOFT_SHUTDOWN_TIME, 0.0);
  161. ev_timer_start (sigh->event_loop, &shutdown_ev);
  162. rspamd_worker_stop_accept (sigh->worker);
  163. }
  164. /* No more signals */
  165. return FALSE;
  166. }
  167. /*
  168. * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
  169. */
  170. static gboolean
  171. rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
  172. {
  173. rspamd_log_reopen (sigh->worker->srv->logger);
  174. /* Get more signals */
  175. return TRUE;
  176. }
  177. static gboolean
  178. rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
  179. {
  180. if (!sigh->worker->wanna_die) {
  181. static ev_timer shutdown_ev;
  182. rspamd_worker_ignore_signal (sigh);
  183. rspamd_default_log_function (G_LOG_LEVEL_INFO,
  184. sigh->worker->srv->server_pool->tag.tagname,
  185. sigh->worker->srv->server_pool->tag.uid,
  186. G_STRFUNC,
  187. "terminating after receiving signal %s",
  188. g_strsignal (sigh->signo));
  189. rspamd_worker_terminate_handlers (sigh->worker);
  190. sigh->worker->wanna_die = 1;
  191. ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
  192. 0.0, 0.0);
  193. ev_timer_start (sigh->event_loop, &shutdown_ev);
  194. rspamd_worker_stop_accept (sigh->worker);
  195. }
  196. /* Stop reacting on signals */
  197. return FALSE;
  198. }
  199. static void
  200. rspamd_worker_signal_handle (EV_P_ ev_signal *w, int revents)
  201. {
  202. struct rspamd_worker_signal_handler *sigh =
  203. (struct rspamd_worker_signal_handler *)w->data;
  204. struct rspamd_worker_signal_cb *cb, *cbtmp;
  205. /* Call all signal handlers registered */
  206. DL_FOREACH_SAFE (sigh->cb, cb, cbtmp) {
  207. if (!cb->handler (sigh, cb->handler_data)) {
  208. DL_DELETE (sigh->cb, cb);
  209. }
  210. }
  211. }
  212. static void
  213. rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh)
  214. {
  215. sigset_t set;
  216. ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
  217. sigemptyset (&set);
  218. sigaddset (&set, sigh->signo);
  219. sigprocmask (SIG_BLOCK, &set, NULL);
  220. }
  221. static void
  222. rspamd_worker_default_signal (int signo)
  223. {
  224. struct sigaction sig;
  225. sigemptyset (&sig.sa_mask);
  226. sigaddset (&sig.sa_mask, signo);
  227. sig.sa_handler = SIG_DFL;
  228. sig.sa_flags = 0;
  229. sigaction (signo, &sig, NULL);
  230. }
  231. static void
  232. rspamd_sigh_free (void *p)
  233. {
  234. struct rspamd_worker_signal_handler *sigh = p;
  235. struct rspamd_worker_signal_cb *cb, *tmp;
  236. DL_FOREACH_SAFE (sigh->cb, cb, tmp) {
  237. DL_DELETE (sigh->cb, cb);
  238. g_free (cb);
  239. }
  240. ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
  241. rspamd_worker_default_signal (sigh->signo);
  242. g_free (sigh);
  243. }
  244. void
  245. rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker,
  246. struct ev_loop *event_loop,
  247. rspamd_worker_signal_handler handler,
  248. void *handler_data)
  249. {
  250. struct rspamd_worker_signal_handler *sigh;
  251. struct rspamd_worker_signal_cb *cb;
  252. sigh = g_hash_table_lookup (worker->signal_events, GINT_TO_POINTER (signo));
  253. if (sigh == NULL) {
  254. sigh = g_malloc0 (sizeof (*sigh));
  255. sigh->signo = signo;
  256. sigh->worker = worker;
  257. sigh->event_loop = event_loop;
  258. sigh->enabled = TRUE;
  259. sigh->ev_sig.data = sigh;
  260. ev_signal_init (&sigh->ev_sig, rspamd_worker_signal_handle, signo);
  261. ev_signal_start (event_loop, &sigh->ev_sig);
  262. g_hash_table_insert (worker->signal_events,
  263. GINT_TO_POINTER (signo),
  264. sigh);
  265. }
  266. cb = g_malloc0 (sizeof (*cb));
  267. cb->handler = handler;
  268. cb->handler_data = handler_data;
  269. DL_APPEND (sigh->cb, cb);
  270. }
  271. void
  272. rspamd_worker_init_signals (struct rspamd_worker *worker,
  273. struct ev_loop *event_loop)
  274. {
  275. /* A set of terminating signals */
  276. rspamd_worker_set_signal_handler (SIGTERM, worker, event_loop,
  277. rspamd_worker_term_handler, NULL);
  278. rspamd_worker_set_signal_handler (SIGINT, worker, event_loop,
  279. rspamd_worker_term_handler, NULL);
  280. rspamd_worker_set_signal_handler (SIGHUP, worker, event_loop,
  281. rspamd_worker_term_handler, NULL);
  282. /* Special purpose signals */
  283. rspamd_worker_set_signal_handler (SIGUSR1, worker, event_loop,
  284. rspamd_worker_usr1_handler, NULL);
  285. rspamd_worker_set_signal_handler (SIGUSR2, worker, event_loop,
  286. rspamd_worker_usr2_handler, NULL);
  287. }
  288. struct ev_loop *
  289. rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
  290. rspamd_accept_handler hdl)
  291. {
  292. struct ev_loop *event_loop;
  293. GList *cur;
  294. struct rspamd_worker_listen_socket *ls;
  295. struct rspamd_worker_accept_event *accept_ev;
  296. #ifdef WITH_PROFILER
  297. extern void _start (void), etext (void);
  298. monstartup ((u_long) & _start, (u_long) & etext);
  299. #endif
  300. gperf_profiler_init (worker->srv->cfg, name);
  301. worker->signal_events = g_hash_table_new_full (g_direct_hash, g_direct_equal,
  302. NULL, rspamd_sigh_free);
  303. event_loop = ev_loop_new (EVFLAG_SIGNALFD);
  304. worker->srv->event_loop = event_loop;
  305. rspamd_worker_init_signals (worker, event_loop);
  306. rspamd_control_worker_add_default_handler (worker, event_loop);
  307. #ifdef WITH_HIREDIS
  308. rspamd_redis_pool_config (worker->srv->cfg->redis_pool,
  309. worker->srv->cfg, event_loop);
  310. #endif
  311. /* Accept all sockets */
  312. if (hdl) {
  313. cur = worker->cf->listen_socks;
  314. while (cur) {
  315. ls = cur->data;
  316. if (ls->fd != -1) {
  317. accept_ev = g_malloc0 (sizeof (*accept_ev));
  318. accept_ev->event_loop = event_loop;
  319. accept_ev->accept_ev.data = worker;
  320. ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ);
  321. ev_io_start (event_loop, &accept_ev->accept_ev);
  322. DL_APPEND (worker->accept_events, accept_ev);
  323. }
  324. cur = g_list_next (cur);
  325. }
  326. }
  327. return event_loop;
  328. }
  329. void
  330. rspamd_worker_stop_accept (struct rspamd_worker *worker)
  331. {
  332. struct rspamd_worker_accept_event *cur, *tmp;
  333. /* Remove all events */
  334. DL_FOREACH_SAFE (worker->accept_events, cur, tmp) {
  335. if (ev_is_active (&cur->accept_ev) || ev_is_pending (&cur->accept_ev)) {
  336. ev_io_stop (cur->event_loop, &cur->accept_ev);
  337. }
  338. if (ev_is_active (&cur->throttling_ev) || ev_is_pending (&cur->throttling_ev)) {
  339. ev_timer_stop (cur->event_loop, &cur->throttling_ev);
  340. }
  341. g_free (cur);
  342. }
  343. /* XXX: we need to do it much later */
  344. #if 0
  345. g_hash_table_iter_init (&it, worker->signal_events);
  346. while (g_hash_table_iter_next (&it, &k, &v)) {
  347. sigh = (struct rspamd_worker_signal_handler *)v;
  348. g_hash_table_iter_steal (&it);
  349. if (sigh->enabled) {
  350. event_del (&sigh->ev);
  351. }
  352. g_free (sigh);
  353. }
  354. g_hash_table_unref (worker->signal_events);
  355. #endif
  356. }
  357. static rspamd_fstring_t *
  358. rspamd_controller_maybe_compress (struct rspamd_http_connection_entry *entry,
  359. rspamd_fstring_t *buf, struct rspamd_http_message *msg)
  360. {
  361. if (entry->support_gzip) {
  362. if (rspamd_fstring_gzip (&buf)) {
  363. rspamd_http_message_add_header (msg, "Content-Encoding", "gzip");
  364. }
  365. }
  366. return buf;
  367. }
  368. void
  369. rspamd_controller_send_error (struct rspamd_http_connection_entry *entry,
  370. gint code, const gchar *error_msg, ...)
  371. {
  372. struct rspamd_http_message *msg;
  373. va_list args;
  374. rspamd_fstring_t *reply;
  375. msg = rspamd_http_new_message (HTTP_RESPONSE);
  376. va_start (args, error_msg);
  377. msg->status = rspamd_fstring_new ();
  378. rspamd_vprintf_fstring (&msg->status, error_msg, args);
  379. va_end (args);
  380. msg->date = time (NULL);
  381. msg->code = code;
  382. reply = rspamd_fstring_sized_new (msg->status->len + 16);
  383. rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
  384. rspamd_http_message_set_body_from_fstring_steal (msg,
  385. rspamd_controller_maybe_compress (entry, reply, msg));
  386. rspamd_http_connection_reset (entry->conn);
  387. rspamd_http_router_insert_headers (entry->rt, msg);
  388. rspamd_http_connection_write_message (entry->conn,
  389. msg,
  390. NULL,
  391. "application/json",
  392. entry,
  393. entry->rt->timeout);
  394. entry->is_reply = TRUE;
  395. }
  396. void
  397. rspamd_controller_send_string (struct rspamd_http_connection_entry *entry,
  398. const gchar *str)
  399. {
  400. struct rspamd_http_message *msg;
  401. rspamd_fstring_t *reply;
  402. msg = rspamd_http_new_message (HTTP_RESPONSE);
  403. msg->date = time (NULL);
  404. msg->code = 200;
  405. msg->status = rspamd_fstring_new_init ("OK", 2);
  406. if (str) {
  407. reply = rspamd_fstring_new_init (str, strlen (str));
  408. }
  409. else {
  410. reply = rspamd_fstring_new_init ("null", 4);
  411. }
  412. rspamd_http_message_set_body_from_fstring_steal (msg,
  413. rspamd_controller_maybe_compress (entry, reply, msg));
  414. rspamd_http_connection_reset (entry->conn);
  415. rspamd_http_router_insert_headers (entry->rt, msg);
  416. rspamd_http_connection_write_message (entry->conn,
  417. msg,
  418. NULL,
  419. "application/json",
  420. entry,
  421. entry->rt->timeout);
  422. entry->is_reply = TRUE;
  423. }
  424. void
  425. rspamd_controller_send_ucl (struct rspamd_http_connection_entry *entry,
  426. ucl_object_t *obj)
  427. {
  428. struct rspamd_http_message *msg;
  429. rspamd_fstring_t *reply;
  430. msg = rspamd_http_new_message (HTTP_RESPONSE);
  431. msg->date = time (NULL);
  432. msg->code = 200;
  433. msg->status = rspamd_fstring_new_init ("OK", 2);
  434. reply = rspamd_fstring_sized_new (BUFSIZ);
  435. rspamd_ucl_emit_fstring (obj, UCL_EMIT_JSON_COMPACT, &reply);
  436. rspamd_http_message_set_body_from_fstring_steal (msg,
  437. rspamd_controller_maybe_compress (entry, reply, msg));
  438. rspamd_http_connection_reset (entry->conn);
  439. rspamd_http_router_insert_headers (entry->rt, msg);
  440. rspamd_http_connection_write_message (entry->conn,
  441. msg,
  442. NULL,
  443. "application/json",
  444. entry,
  445. entry->rt->timeout);
  446. entry->is_reply = TRUE;
  447. }
  448. static void
  449. rspamd_worker_drop_priv (struct rspamd_main *rspamd_main)
  450. {
  451. if (rspamd_main->is_privilleged) {
  452. if (setgid (rspamd_main->workers_gid) == -1) {
  453. msg_err_main ("cannot setgid to %d (%s), aborting",
  454. (gint) rspamd_main->workers_gid,
  455. strerror (errno));
  456. exit (-errno);
  457. }
  458. if (rspamd_main->cfg->rspamd_user &&
  459. initgroups (rspamd_main->cfg->rspamd_user,
  460. rspamd_main->workers_gid) == -1) {
  461. msg_err_main ("initgroups failed (%s), aborting", strerror (errno));
  462. exit (-errno);
  463. }
  464. if (setuid (rspamd_main->workers_uid) == -1) {
  465. msg_err_main ("cannot setuid to %d (%s), aborting",
  466. (gint) rspamd_main->workers_uid,
  467. strerror (errno));
  468. exit (-errno);
  469. }
  470. }
  471. }
  472. static void
  473. rspamd_worker_set_limits (struct rspamd_main *rspamd_main,
  474. struct rspamd_worker_conf *cf)
  475. {
  476. struct rlimit rlmt;
  477. if (cf->rlimit_nofile != 0) {
  478. rlmt.rlim_cur = (rlim_t) cf->rlimit_nofile;
  479. rlmt.rlim_max = (rlim_t) cf->rlimit_nofile;
  480. if (setrlimit (RLIMIT_NOFILE, &rlmt) == -1) {
  481. msg_warn_main ("cannot set files rlimit: %L, %s",
  482. cf->rlimit_nofile,
  483. strerror (errno));
  484. }
  485. memset (&rlmt, 0, sizeof (rlmt));
  486. if (getrlimit (RLIMIT_NOFILE, &rlmt) == -1) {
  487. msg_warn_main ("cannot get max files rlimit: %HL, %s",
  488. cf->rlimit_maxcore,
  489. strerror (errno));
  490. }
  491. else {
  492. msg_info_main ("set max file descriptors limit: %HL cur and %HL max",
  493. (guint64) rlmt.rlim_cur,
  494. (guint64) rlmt.rlim_max);
  495. }
  496. }
  497. else {
  498. /* Just report */
  499. if (getrlimit (RLIMIT_NOFILE, &rlmt) == -1) {
  500. msg_warn_main ("cannot get max files rlimit: %HL, %s",
  501. cf->rlimit_maxcore,
  502. strerror (errno));
  503. }
  504. else {
  505. msg_info_main ("use system max file descriptors limit: %HL cur and %HL max",
  506. (guint64) rlmt.rlim_cur,
  507. (guint64) rlmt.rlim_max);
  508. }
  509. }
  510. if (rspamd_main->cores_throttling) {
  511. msg_info_main ("disable core files for the new worker as limits are reached");
  512. rlmt.rlim_cur = 0;
  513. rlmt.rlim_max = 0;
  514. if (setrlimit (RLIMIT_CORE, &rlmt) == -1) {
  515. msg_warn_main ("cannot disable core dumps: error when setting limits: %s",
  516. strerror (errno));
  517. }
  518. }
  519. else {
  520. if (cf->rlimit_maxcore != 0) {
  521. rlmt.rlim_cur = (rlim_t) cf->rlimit_maxcore;
  522. rlmt.rlim_max = (rlim_t) cf->rlimit_maxcore;
  523. if (setrlimit (RLIMIT_CORE, &rlmt) == -1) {
  524. msg_warn_main ("cannot set max core size limit: %HL, %s",
  525. cf->rlimit_maxcore,
  526. strerror (errno));
  527. }
  528. /* Ensure that we did it */
  529. memset (&rlmt, 0, sizeof (rlmt));
  530. if (getrlimit (RLIMIT_CORE, &rlmt) == -1) {
  531. msg_warn_main ("cannot get max core size rlimit: %HL, %s",
  532. cf->rlimit_maxcore,
  533. strerror (errno));
  534. }
  535. else {
  536. if (rlmt.rlim_cur != cf->rlimit_maxcore ||
  537. rlmt.rlim_max != cf->rlimit_maxcore) {
  538. msg_warn_main ("setting of core file limits was unsuccessful: "
  539. "%HL was wanted, "
  540. "but we have %HL cur and %HL max",
  541. cf->rlimit_maxcore,
  542. (guint64) rlmt.rlim_cur,
  543. (guint64) rlmt.rlim_max);
  544. }
  545. else {
  546. msg_info_main ("set max core size limit: %HL cur and %HL max",
  547. (guint64) rlmt.rlim_cur,
  548. (guint64) rlmt.rlim_max);
  549. }
  550. }
  551. }
  552. else {
  553. /* Just report */
  554. if (getrlimit (RLIMIT_CORE, &rlmt) == -1) {
  555. msg_warn_main ("cannot get max core size limit: %HL, %s",
  556. cf->rlimit_maxcore,
  557. strerror (errno));
  558. }
  559. else {
  560. msg_info_main ("use system max core size limit: %HL cur and %HL max",
  561. (guint64) rlmt.rlim_cur,
  562. (guint64) rlmt.rlim_max);
  563. }
  564. }
  565. }
  566. }
  567. static void
  568. rspamd_worker_on_term (EV_P_ ev_child *w, int revents)
  569. {
  570. struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
  571. if (wrk->ppid == getpid ()) {
  572. if (wrk->term_handler) {
  573. wrk->term_handler (EV_A_ w, wrk->srv, wrk);
  574. }
  575. else {
  576. rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
  577. }
  578. }
  579. else {
  580. /* Ignore SIGCHLD for not our children... */
  581. }
  582. }
/*
 * Fork a new worker process for configuration cf.
 *
 * Before forking: allocates the rspamd_worker descriptor, creates the
 * control_pipe and srv_pipe socketpairs (a failure is fatal via
 * rspamd_hard_terminate) and fills the descriptor from cf, retaining cf.
 *
 * In the child: refreshes the logger pid and the PRNG, detaches and destroys
 * the inherited main event loop, drops privileges, applies resource limits,
 * sets the process title, closes the pidfile, reopens the log, keeps only the
 * worker ends ([1]) of the socketpairs and runs cf->worker->worker_start_func.
 * The child never returns from this function: it exits with EXIT_FAILURE if
 * worker_start_func returns.
 *
 * In the parent: keeps the main ends ([0]) of the socketpairs, starts
 * watching the worker via rspamd_srv_start_watching on ev_base, installs a
 * child watcher (rspamd_worker_on_term) on rspamd_main->event_loop and
 * registers the worker in rspamd_main->workers keyed by pid.
 * A fork() failure is fatal.
 *
 * @param index worker index passed to the descriptor and logged by the child
 * @param term_handler optional callback for child termination; when NULL,
 *        rspamd_check_termination_clause is used instead
 * @return the worker descriptor (only ever returned in the parent)
 */
struct rspamd_worker *
rspamd_fork_worker (struct rspamd_main *rspamd_main,
		struct rspamd_worker_conf *cf,
		guint index,
		struct ev_loop *ev_base,
		rspamd_worker_term_cb term_handler)
{
	struct rspamd_worker *wrk;
	gint rc;
	struct rlimit rlim;
	/* Starting worker process */
	wrk = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker));
	/* [0] is kept by the parent, [1] by the child (see the switch below) */
	if (!rspamd_socketpair (wrk->control_pipe, 0)) {
		msg_err ("socketpair failure: %s", strerror (errno));
		rspamd_hard_terminate (rspamd_main);
	}
	if (!rspamd_socketpair (wrk->srv_pipe, 0)) {
		msg_err ("socketpair failure: %s", strerror (errno));
		rspamd_hard_terminate (rspamd_main);
	}
	wrk->srv = rspamd_main;
	wrk->type = cf->type;
	wrk->cf = cf;
	wrk->flags = cf->worker->flags;
	/* wrk->cf holds a reference to the worker config */
	REF_RETAIN (cf);
	wrk->index = index;
	wrk->ctx = cf->ctx;
	wrk->finish_actions = g_ptr_array_new ();
	/* Forking process pid: rspamd_worker_on_term ignores events elsewhere */
	wrk->ppid = getpid ();
	wrk->pid = fork ();
	/* The two assignments below run in both parent and child */
	wrk->cores_throttled = rspamd_main->cores_throttling;
	wrk->term_handler = term_handler;
	switch (wrk->pid) {
	case 0:
		/* Child process */
		/* Update pid for logging */
		rspamd_log_update_pid (cf->type, rspamd_main->logger);
		wrk->pid = getpid ();
		/* Init PRNG after fork */
		rc = ottery_init (rspamd_main->cfg->libs_ctx->ottery_cfg);
		if (rc != OTTERY_ERR_NONE) {
			msg_err_main ("cannot initialize PRNG: %d", rc);
			abort ();
		}
		rspamd_random_seed_fast ();
#ifdef HAVE_EVUTIL_RNG_INIT
		evutil_secure_rng_init ();
#endif
		/*
		 * Libev stores all signals in a global table, so
		 * previous handlers must be explicitly detached and forgotten
		 * before starting a new loop
		 */
		ev_signal_stop (rspamd_main->event_loop, &rspamd_main->int_ev);
		ev_signal_stop (rspamd_main->event_loop, &rspamd_main->term_ev);
		ev_signal_stop (rspamd_main->event_loop, &rspamd_main->hup_ev);
		ev_signal_stop (rspamd_main->event_loop, &rspamd_main->usr1_ev);
		/* Remove the inherited event base */
		ev_loop_destroy (rspamd_main->event_loop);
		rspamd_main->event_loop = NULL;
		/* Drop privileges */
		rspamd_worker_drop_priv (rspamd_main);
		/* Set limits */
		rspamd_worker_set_limits (rspamd_main, cf);
		/*
		 * Re-set stack limit to 100 MiB (soft and hard).
		 * NOTE(review): return values are ignored here; after privileges
		 * are dropped, raising the hard limit may fail silently.
		 */
		getrlimit (RLIMIT_STACK, &rlim);
		rlim.rlim_cur = 100 * 1024 * 1024;
		rlim.rlim_max = rlim.rlim_cur;
		setrlimit (RLIMIT_STACK, &rlim);
		if (cf->bind_conf) {
			setproctitle ("%s process (%s)", cf->worker->name,
					cf->bind_conf->bind_line);
		}
		else {
			setproctitle ("%s process", cf->worker->name);
		}
		/* The pidfile belongs to the main process */
		if (rspamd_main->pfh) {
			rspamd_pidfile_close (rspamd_main->pfh);
		}
		/* Do silent log reopen to avoid collisions */
		rspamd_log_close (rspamd_main->logger, FALSE);
		if (rspamd_main->cfg->log_silent_workers) {
			rspamd_main->cfg->log_level = G_LOG_LEVEL_MESSAGE;
			rspamd_set_logger (rspamd_main->cfg, cf->type,
					&rspamd_main->logger, rspamd_main->server_pool);
		}
		rspamd_log_open (rspamd_main->logger);
		wrk->start_time = rspamd_get_calendar_ticks ();
		if (cf->bind_conf) {
			msg_info_main ("starting %s process %P (%d); listen on: %s",
					cf->worker->name,
					getpid (), index, cf->bind_conf->bind_line);
		}
		else {
			msg_info_main ("starting %s process %P (%d)", cf->worker->name,
					getpid (), index);
		}
		/* Close parent part of socketpair */
		close (wrk->control_pipe[0]);
		close (wrk->srv_pipe[0]);
		rspamd_socket_nonblocking (wrk->control_pipe[1]);
		rspamd_socket_nonblocking (wrk->srv_pipe[1]);
		/* Execute worker */
		cf->worker->worker_start_func (wrk);
		/* worker_start_func is not expected to return */
		exit (EXIT_FAILURE);
		break;
	case -1:
		msg_err_main ("cannot fork main process. %s", strerror (errno));
		if (rspamd_main->pfh) {
			rspamd_pidfile_remove (rspamd_main->pfh);
		}
		rspamd_hard_terminate (rspamd_main);
		break;
	default:
		/* Parent process */
		/* Close worker part of socketpair */
		close (wrk->control_pipe[1]);
		close (wrk->srv_pipe[1]);
		rspamd_socket_nonblocking (wrk->control_pipe[0]);
		rspamd_socket_nonblocking (wrk->srv_pipe[0]);
		rspamd_srv_start_watching (rspamd_main, wrk, ev_base);
		/* Notified on child exit via rspamd_worker_on_term */
		wrk->cld_ev.data = wrk;
		ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0);
		ev_child_start (rspamd_main->event_loop, &wrk->cld_ev);
		/* Insert worker into worker's table, pid is index */
		g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
				wrk->pid), wrk);
		break;
	}
	return wrk;
}
  712. void
  713. rspamd_worker_block_signals (void)
  714. {
  715. sigset_t set;
  716. sigemptyset (&set);
  717. sigaddset (&set, SIGTERM);
  718. sigaddset (&set, SIGINT);
  719. sigaddset (&set, SIGHUP);
  720. sigaddset (&set, SIGUSR1);
  721. sigaddset (&set, SIGUSR2);
  722. sigprocmask (SIG_BLOCK, &set, NULL);
  723. }
  724. void
  725. rspamd_worker_unblock_signals (void)
  726. {
  727. sigset_t set;
  728. sigemptyset (&set);
  729. sigaddset (&set, SIGTERM);
  730. sigaddset (&set, SIGINT);
  731. sigaddset (&set, SIGHUP);
  732. sigaddset (&set, SIGUSR1);
  733. sigaddset (&set, SIGUSR2);
  734. sigprocmask (SIG_UNBLOCK, &set, NULL);
  735. }
  736. void
  737. rspamd_hard_terminate (struct rspamd_main *rspamd_main)
  738. {
  739. GHashTableIter it;
  740. gpointer k, v;
  741. struct rspamd_worker *w;
  742. sigset_t set;
  743. /* Block all signals */
  744. sigemptyset (&set);
  745. sigaddset (&set, SIGTERM);
  746. sigaddset (&set, SIGINT);
  747. sigaddset (&set, SIGHUP);
  748. sigaddset (&set, SIGUSR1);
  749. sigaddset (&set, SIGUSR2);
  750. sigaddset (&set, SIGCHLD);
  751. sigprocmask (SIG_BLOCK, &set, NULL);
  752. /* We need to terminate all workers that might be already spawned */
  753. rspamd_worker_block_signals ();
  754. g_hash_table_iter_init (&it, rspamd_main->workers);
  755. while (g_hash_table_iter_next (&it, &k, &v)) {
  756. w = v;
  757. msg_err_main ("kill worker %P as Rspamd is terminating due to "
  758. "an unrecoverable error", w->pid);
  759. kill (w->pid, SIGKILL);
  760. }
  761. msg_err_main ("shutting down Rspamd due to fatal error");
  762. rspamd_log_close (rspamd_main->logger, TRUE);
  763. exit (EXIT_FAILURE);
  764. }
  765. gboolean
  766. rspamd_worker_is_scanner (struct rspamd_worker *w)
  767. {
  768. if (w) {
  769. return !!(w->flags & RSPAMD_WORKER_SCANNER);
  770. }
  771. return FALSE;
  772. }
  773. gboolean
  774. rspamd_worker_is_primary_controller (struct rspamd_worker *w)
  775. {
  776. if (w) {
  777. return !!(w->flags & RSPAMD_WORKER_CONTROLLER) && w->index == 0;
  778. }
  779. return FALSE;
  780. }
/* One session tracked by the worker session cache (keyed by ptr) */
struct rspamd_worker_session_elt {
	void *ptr; /* session pointer; also the hash table key */
	guint *pref; /* session refcount shown in overflow reports; may be NULL */
	const gchar *tag; /* caller-supplied tag, stored without copying; may be NULL */
	time_t when; /* time() at which the session was added */
};
/* Per-worker registry of live sessions with a periodic overflow check */
struct rspamd_worker_session_cache {
	struct ev_loop *ev_base; /* loop the periodic timer is started on */
	GHashTable *cache; /* ptr -> struct rspamd_worker_session_elt (values g_free'd) */
	struct rspamd_config *cfg; /* source of max_sessions_cache */
	struct ev_timer periodic; /* repeating rspamd_sessions_cache_periodic timer */
};
  793. static gint
  794. rspamd_session_cache_sort_cmp (gconstpointer pa, gconstpointer pb)
  795. {
  796. const struct rspamd_worker_session_elt
  797. *e1 = *(const struct rspamd_worker_session_elt **)pa,
  798. *e2 = *(const struct rspamd_worker_session_elt **)pb;
  799. return e2->when < e1->when;
  800. }
  801. static void
  802. rspamd_sessions_cache_periodic (EV_P_ ev_timer *w, int revents)
  803. {
  804. struct rspamd_worker_session_cache *c =
  805. (struct rspamd_worker_session_cache *)w->data;
  806. GHashTableIter it;
  807. gchar timebuf[32];
  808. gpointer k, v;
  809. struct rspamd_worker_session_elt *elt;
  810. struct tm tms;
  811. GPtrArray *res;
  812. guint i;
  813. if (g_hash_table_size (c->cache) > c->cfg->max_sessions_cache) {
  814. res = g_ptr_array_sized_new (g_hash_table_size (c->cache));
  815. g_hash_table_iter_init (&it, c->cache);
  816. while (g_hash_table_iter_next (&it, &k, &v)) {
  817. g_ptr_array_add (res, v);
  818. }
  819. msg_err ("sessions cache is overflowed %d elements where %d is limit",
  820. (gint)res->len, (gint)c->cfg->max_sessions_cache);
  821. g_ptr_array_sort (res, rspamd_session_cache_sort_cmp);
  822. PTR_ARRAY_FOREACH (res, i, elt) {
  823. rspamd_localtime (elt->when, &tms);
  824. strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tms);
  825. msg_warn ("redundant session; ptr: %p, "
  826. "tag: %s, refcount: %d, time: %s",
  827. elt->ptr, elt->tag ? elt->tag : "unknown",
  828. elt->pref ? *elt->pref : 0,
  829. timebuf);
  830. }
  831. }
  832. ev_timer_again (EV_A_ w);
  833. }
  834. void *
  835. rspamd_worker_session_cache_new (struct rspamd_worker *w,
  836. struct ev_loop *ev_base)
  837. {
  838. struct rspamd_worker_session_cache *c;
  839. static const gdouble periodic_interval = 60.0;
  840. c = g_malloc0 (sizeof (*c));
  841. c->ev_base = ev_base;
  842. c->cache = g_hash_table_new_full (g_direct_hash, g_direct_equal,
  843. NULL, g_free);
  844. c->cfg = w->srv->cfg;
  845. c->periodic.data = c;
  846. ev_timer_init (&c->periodic, rspamd_sessions_cache_periodic, periodic_interval,
  847. periodic_interval);
  848. ev_timer_start (ev_base, &c->periodic);
  849. return c;
  850. }
  851. void
  852. rspamd_worker_session_cache_add (void *cache, const gchar *tag,
  853. guint *pref, void *ptr)
  854. {
  855. struct rspamd_worker_session_cache *c = cache;
  856. struct rspamd_worker_session_elt *elt;
  857. elt = g_malloc0 (sizeof (*elt));
  858. elt->pref = pref;
  859. elt->ptr = ptr;
  860. elt->tag = tag;
  861. elt->when = time (NULL);
  862. g_hash_table_insert (c->cache, elt->ptr, elt);
  863. }
  864. void
  865. rspamd_worker_session_cache_remove (void *cache, void *ptr)
  866. {
  867. struct rspamd_worker_session_cache *c = cache;
  868. g_hash_table_remove (c->cache, ptr);
  869. }
  870. static void
  871. rspamd_worker_monitored_on_change (struct rspamd_monitored_ctx *ctx,
  872. struct rspamd_monitored *m, gboolean alive,
  873. void *ud)
  874. {
  875. struct rspamd_worker *worker = ud;
  876. struct rspamd_config *cfg = worker->srv->cfg;
  877. struct ev_loop *ev_base;
  878. guchar tag[RSPAMD_MONITORED_TAG_LEN];
  879. static struct rspamd_srv_command srv_cmd;
  880. rspamd_monitored_get_tag (m, tag);
  881. ev_base = rspamd_monitored_ctx_get_ev_base (ctx);
  882. memset (&srv_cmd, 0, sizeof (srv_cmd));
  883. srv_cmd.type = RSPAMD_SRV_MONITORED_CHANGE;
  884. rspamd_strlcpy (srv_cmd.cmd.monitored_change.tag, tag,
  885. sizeof (srv_cmd.cmd.monitored_change.tag));
  886. srv_cmd.cmd.monitored_change.alive = alive;
  887. srv_cmd.cmd.monitored_change.sender = getpid ();
  888. msg_info_config ("broadcast monitored update for %s: %s",
  889. srv_cmd.cmd.monitored_change.tag, alive ? "alive" : "dead");
  890. rspamd_srv_send_command (worker, ev_base, &srv_cmd, -1, NULL, NULL);
  891. }
  892. void
  893. rspamd_worker_init_monitored (struct rspamd_worker *worker,
  894. struct ev_loop *ev_base,
  895. struct rspamd_dns_resolver *resolver)
  896. {
  897. rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx,
  898. worker->srv->cfg, ev_base, resolver->r,
  899. rspamd_worker_monitored_on_change, worker);
  900. }
  901. #ifdef HAVE_SA_SIGINFO
  902. #ifdef WITH_LIBUNWIND
  903. static void
  904. rspamd_print_crash (ucontext_t *uap)
  905. {
  906. unw_cursor_t cursor;
  907. unw_word_t ip, off;
  908. guint level;
  909. gint ret;
  910. if ((ret = unw_init_local (&cursor, uap)) != 0) {
  911. msg_err ("unw_init_local: %d", ret);
  912. return;
  913. }
  914. level = 0;
  915. ret = 0;
  916. for (;;) {
  917. char name[128];
  918. if (level >= UNWIND_BACKTRACE_DEPTH) {
  919. break;
  920. }
  921. unw_get_reg (&cursor, UNW_REG_IP, &ip);
  922. ret = unw_get_proc_name(&cursor, name, sizeof (name), &off);
  923. if (ret == 0) {
  924. msg_err ("%d: %p: %s()+0x%xl",
  925. level, ip, name, (uintptr_t)off);
  926. } else {
  927. msg_err ("%d: %p: <unknown>", level, ip);
  928. }
  929. level++;
  930. ret = unw_step (&cursor);
  931. if (ret <= 0) {
  932. break;
  933. }
  934. }
  935. if (ret < 0) {
  936. msg_err ("unw_step_ptr: %d", ret);
  937. }
  938. }
  939. #endif
/*
 * rspamd_main passed to rspamd_set_crash_handler(). The crash signal handler
 * compares its pid with getpid() to detect a crash of the main process, and
 * in that case walks its workers table. NULL until the handler is installed.
 */
static struct rspamd_main *saved_main = NULL;
  941. static gboolean
  942. rspamd_crash_propagate (gpointer key, gpointer value, gpointer unused)
  943. {
  944. struct rspamd_worker *w = value;
  945. /* Kill children softly */
  946. kill (w->pid, SIGTERM);
  947. return TRUE;
  948. }
  949. static void
  950. rspamd_crash_sig_handler (int sig, siginfo_t *info, void *ctx)
  951. {
  952. struct sigaction sa;
  953. ucontext_t *uap = ctx;
  954. pid_t pid;
  955. pid = getpid ();
  956. msg_err ("caught fatal signal %d(%s), "
  957. "pid: %P, trace: ",
  958. sig, strsignal (sig), pid);
  959. (void)uap;
  960. #ifdef WITH_LIBUNWIND
  961. rspamd_print_crash (uap);
  962. #endif
  963. if (saved_main) {
  964. if (pid == saved_main->pid) {
  965. /*
  966. * Main process has crashed, propagate crash further to trigger
  967. * monitoring alerts and mass panic
  968. */
  969. g_hash_table_foreach_remove (saved_main->workers,
  970. rspamd_crash_propagate, NULL);
  971. }
  972. }
  973. /*
  974. * Invoke signal with the default handler
  975. */
  976. sigemptyset (&sa.sa_mask);
  977. sa.sa_handler = SIG_DFL;
  978. sa.sa_flags = 0;
  979. sigaction (sig, &sa, NULL);
  980. kill (pid, sig);
  981. }
  982. #endif
  983. void
  984. rspamd_set_crash_handler (struct rspamd_main *rspamd_main)
  985. {
  986. #ifdef HAVE_SA_SIGINFO
  987. struct sigaction sa;
  988. #ifdef HAVE_SIGALTSTACK
  989. stack_t ss;
  990. memset (&ss, 0, sizeof ss);
  991. /* Allocate special stack, NOT freed at the end so far */
  992. ss.ss_size = MAX (SIGSTKSZ, 8192 * 4);
  993. ss.ss_sp = g_malloc0 (ss.ss_size);
  994. sigaltstack (&ss, NULL);
  995. #endif
  996. saved_main = rspamd_main;
  997. sigemptyset (&sa.sa_mask);
  998. sa.sa_sigaction = &rspamd_crash_sig_handler;
  999. sa.sa_flags = SA_RESTART | SA_SIGINFO | SA_ONSTACK;
  1000. sigaction (SIGSEGV, &sa, NULL);
  1001. sigaction (SIGBUS, &sa, NULL);
  1002. sigaction (SIGABRT, &sa, NULL);
  1003. sigaction (SIGFPE, &sa, NULL);
  1004. sigaction (SIGSYS, &sa, NULL);
  1005. #endif
  1006. }
  1007. static void
  1008. rspamd_enable_accept_event (EV_P_ ev_timer *w, int revents)
  1009. {
  1010. struct rspamd_worker_accept_event *ac_ev =
  1011. (struct rspamd_worker_accept_event *)w->data;
  1012. ev_timer_stop (EV_A_ w);
  1013. ev_io_start (EV_A_ &ac_ev->accept_ev);
  1014. }
  1015. void
  1016. rspamd_worker_throttle_accept_events (gint sock, void *data)
  1017. {
  1018. struct rspamd_worker_accept_event *head, *cur;
  1019. const gdouble throttling = 0.5;
  1020. head = (struct rspamd_worker_accept_event *)data;
  1021. DL_FOREACH (head, cur) {
  1022. ev_io_stop (cur->event_loop, &cur->accept_ev);
  1023. cur->throttling_ev.data = cur;
  1024. ev_timer_init (&cur->throttling_ev, rspamd_enable_accept_event,
  1025. throttling, 0.0);
  1026. ev_timer_start (cur->event_loop, &cur->throttling_ev);
  1027. }
  1028. }
  1029. gboolean
  1030. rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
  1031. struct rspamd_worker *wrk,
  1032. int res)
  1033. {
  1034. gboolean need_refork = TRUE;
  1035. if (wrk->wanna_die || rspamd_main->wanna_die) {
  1036. /* Do not refork workers that are intended to be terminated */
  1037. need_refork = FALSE;
  1038. }
  1039. if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
  1040. /* Normal worker termination, do not fork one more */
  1041. msg_info_main ("%s process %P terminated normally",
  1042. g_quark_to_string (wrk->type),
  1043. wrk->pid);
  1044. need_refork = FALSE;
  1045. }
  1046. else {
  1047. if (WIFSIGNALED (res)) {
  1048. #ifdef WCOREDUMP
  1049. if (WCOREDUMP (res)) {
  1050. msg_warn_main (
  1051. "%s process %P terminated abnormally by signal: %s"
  1052. " and created core file",
  1053. g_quark_to_string (wrk->type),
  1054. wrk->pid,
  1055. g_strsignal (WTERMSIG (res)));
  1056. }
  1057. else {
  1058. #ifdef HAVE_SYS_RESOURCE_H
  1059. struct rlimit rlmt;
  1060. (void) getrlimit (RLIMIT_CORE, &rlmt);
  1061. msg_warn_main (
  1062. "%s process %P terminated abnormally by signal: %s"
  1063. " but NOT created core file (throttled=%s); "
  1064. "core file limits: %L current, %L max",
  1065. g_quark_to_string (wrk->type),
  1066. wrk->pid,
  1067. g_strsignal (WTERMSIG (res)),
  1068. wrk->cores_throttled ? "yes" : "no",
  1069. (gint64) rlmt.rlim_cur,
  1070. (gint64) rlmt.rlim_max);
  1071. #else
  1072. msg_warn_main (
  1073. "%s process %P terminated abnormally by signal: %s"
  1074. " but NOT created core file (throttled=%s); ",
  1075. g_quark_to_string (wrk->type),
  1076. wrk->pid,
  1077. g_strsignal (WTERMSIG (res)),
  1078. wrk->cores_throttled ? "yes" : "no");
  1079. #endif
  1080. }
  1081. #else
  1082. msg_warn_main (
  1083. "%s process %P terminated abnormally by signal: %s",
  1084. g_quark_to_string (wrk->type),
  1085. wrk->pid,
  1086. g_strsignal (WTERMSIG (res)));
  1087. #endif
  1088. if (WTERMSIG (res) == SIGUSR2) {
  1089. /*
  1090. * It is actually race condition when not started process
  1091. * has been requested to be reloaded.
  1092. *
  1093. * We shouldn't refork on this
  1094. */
  1095. need_refork = FALSE;
  1096. }
  1097. }
  1098. else {
  1099. msg_warn_main ("%s process %P terminated abnormally "
  1100. "with exit code %d",
  1101. g_quark_to_string (wrk->type),
  1102. wrk->pid,
  1103. WEXITSTATUS (res));
  1104. }
  1105. }
  1106. return need_refork;
  1107. }