You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

worker_util.c 51KB


  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "rspamd.h"
  18. #include "lua/lua_common.h"
  19. #include "worker_util.h"
  20. #include "unix-std.h"
  21. #include "utlist.h"
  22. #include "ottery.h"
  23. #include "rspamd_control.h"
  24. #include "libserver/maps/map.h"
  25. #include "libserver/maps/map_private.h"
  26. #include "libserver/http/http_private.h"
  27. #include "libserver/http/http_router.h"
  28. #include "libutil/rrd.h"
  29. /* sys/resource.h */
  30. #ifdef HAVE_SYS_RESOURCE_H
  31. #include <sys/resource.h>
  32. #endif
  33. /* pwd and grp */
  34. #ifdef HAVE_PWD_H
  35. #include <pwd.h>
  36. #endif
  37. #ifdef HAVE_GRP_H
  38. #include <grp.h>
  39. #endif
  40. #ifdef HAVE_LIBUTIL_H
  41. #include <libutil.h>
  42. #endif
  43. #include "zlib.h"
  44. #ifdef WITH_LIBUNWIND
  45. #define UNW_LOCAL_ONLY 1
  46. #include <libunwind.h>
  47. #define UNWIND_BACKTRACE_DEPTH 256
  48. #endif
  49. #ifdef HAVE_UCONTEXT_H
  50. #include <ucontext.h>
  51. #elif defined(HAVE_SYS_UCONTEXT_H)
  52. #include <sys/ucontext.h>
  53. #endif
  54. #ifdef HAVE_SYS_WAIT_H
  55. #include <sys/wait.h>
  56. #endif
  57. #include "contrib/libev/ev.h"
  58. #include "libstat/stat_api.h"
  59. /* Forward declaration */
  60. static void rspamd_worker_heartbeat_start (struct rspamd_worker *,
  61. struct ev_loop *);
  62. static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
  63. /**
  64. * Return worker's control structure by its type
  65. * @param type
  66. * @return worker's control structure or NULL
  67. */
  68. worker_t *
  69. rspamd_get_worker_by_type (struct rspamd_config *cfg, GQuark type)
  70. {
  71. worker_t **pwrk;
  72. pwrk = cfg->compiled_workers;
  73. while (pwrk && *pwrk) {
  74. if (rspamd_check_worker (cfg, *pwrk)) {
  75. if (g_quark_from_string ((*pwrk)->name) == type) {
  76. return *pwrk;
  77. }
  78. }
  79. pwrk++;
  80. }
  81. return NULL;
  82. }
  83. static void
  84. rspamd_worker_check_finished (EV_P_ ev_timer *w, int revents)
  85. {
  86. int *pnchecks = (int *)w->data;
  87. if (*pnchecks > SOFT_SHUTDOWN_TIME * 10) {
  88. msg_warn ("terminating worker before finishing of terminate handlers");
  89. ev_break (EV_A_ EVBREAK_ONE);
  90. }
  91. else {
  92. int refcount = ev_active_cnt (EV_A);
  93. if (refcount == 1) {
  94. ev_break (EV_A_ EVBREAK_ONE);
  95. }
  96. else {
  97. ev_timer_again (EV_A_ w);
  98. }
  99. }
  100. }
  101. static gboolean
  102. rspamd_worker_finalize (gpointer user_data)
  103. {
  104. struct rspamd_task *task = user_data;
  105. if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
  106. msg_info_task ("finishing actions has been processed, terminating");
  107. /* ev_break (task->event_loop, EVBREAK_ALL); */
  108. task->worker->state = rspamd_worker_wanna_die;
  109. rspamd_session_destroy (task->s);
  110. return TRUE;
  111. }
  112. return FALSE;
  113. }
  114. gboolean
  115. rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
  116. {
  117. struct rspamd_task *task;
  118. struct rspamd_config *cfg = worker->srv->cfg;
  119. struct rspamd_abstract_worker_ctx *ctx;
  120. struct rspamd_config_cfg_lua_script *sc;
  121. if (cfg->on_term_scripts) {
  122. ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
  123. /* Create a fake task object for async events */
  124. task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop, FALSE);
  125. task->resolver = ctx->resolver;
  126. task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
  127. task->s = rspamd_session_create (task->task_pool,
  128. rspamd_worker_finalize,
  129. NULL,
  130. (event_finalizer_t) rspamd_task_free,
  131. task);
  132. DL_FOREACH (cfg->on_term_scripts, sc) {
  133. lua_call_finish_script (sc, task);
  134. }
  135. task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;
  136. if (rspamd_session_pending (task->s)) {
  137. return TRUE;
  138. }
  139. }
  140. return FALSE;
  141. }
  142. static void
  143. rspamd_worker_terminate_handlers (struct rspamd_worker *w)
  144. {
  145. if (w->nconns == 0 &&
  146. (!(w->flags & RSPAMD_WORKER_SCANNER) || w->srv->cfg->on_term_scripts == NULL)) {
  147. /*
  148. * We are here either:
  149. * - No active connections are represented
  150. * - No term scripts are registered
  151. * - Worker is not a scanner, so it can die safely
  152. */
  153. w->state = rspamd_worker_wanna_die;
  154. }
  155. else {
  156. if (w->nconns > 0) {
  157. /*
  158. * Wait until all connections are terminated
  159. */
  160. w->state = rspamd_worker_wait_connections;
  161. }
  162. else {
  163. /*
  164. * Start finish scripts
  165. */
  166. if (w->state != rspamd_worker_wait_final_scripts) {
  167. w->state = rspamd_worker_wait_final_scripts;
  168. if ((w->flags & RSPAMD_WORKER_SCANNER) &&
  169. rspamd_worker_call_finish_handlers (w)) {
  170. msg_info ("performing async finishing actions");
  171. w->state = rspamd_worker_wait_final_scripts;
  172. }
  173. else {
  174. /*
  175. * We are done now
  176. */
  177. msg_info ("no async finishing actions, terminating");
  178. w->state = rspamd_worker_wanna_die;
  179. }
  180. }
  181. }
  182. }
  183. }
  184. static void
  185. rspamd_worker_on_delayed_shutdown (EV_P_ ev_timer *w, int revents)
  186. {
  187. struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
  188. worker->state = rspamd_worker_wanna_die;
  189. ev_timer_stop (EV_A_ w);
  190. ev_break (loop, EVBREAK_ALL);
  191. }
  192. static void
  193. rspamd_worker_shutdown_check (EV_P_ ev_timer *w, int revents)
  194. {
  195. struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
  196. if (worker->state != rspamd_worker_wanna_die) {
  197. rspamd_worker_terminate_handlers (worker);
  198. if (worker->state == rspamd_worker_wanna_die) {
  199. /* We are done, kill event loop */
  200. ev_timer_stop (EV_A_ w);
  201. ev_break (EV_A_ EVBREAK_ALL);
  202. }
  203. else {
  204. /* Try again later */
  205. ev_timer_again (EV_A_ w);
  206. }
  207. }
  208. else {
  209. ev_timer_stop (EV_A_ w);
  210. ev_break (EV_A_ EVBREAK_ALL);
  211. }
  212. }
  213. /*
  214. * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
  215. */
  216. static gboolean
  217. rspamd_worker_usr2_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
  218. {
  219. /* Do not accept new connections, preparing to end worker's process */
  220. if (sigh->worker->state == rspamd_worker_state_running) {
  221. static ev_timer shutdown_ev, shutdown_check_ev;
  222. ev_tstamp shutdown_ts;
  223. if (sigh->worker->flags & RSPAMD_WORKER_NO_TERMINATE_DELAY) {
  224. shutdown_ts = 0.0;
  225. }
  226. else {
  227. shutdown_ts = MAX (SOFT_SHUTDOWN_TIME,
  228. sigh->worker->srv->cfg->task_timeout * 2.0);
  229. }
  230. rspamd_worker_ignore_signal (sigh);
  231. sigh->worker->state = rspamd_worker_state_terminating;
  232. rspamd_default_log_function (G_LOG_LEVEL_INFO,
  233. sigh->worker->srv->server_pool->tag.tagname,
  234. sigh->worker->srv->server_pool->tag.uid,
  235. G_STRFUNC,
  236. "worker's shutdown is pending in %.2f sec",
  237. shutdown_ts);
  238. /* Soft shutdown timer */
  239. shutdown_ev.data = sigh->worker;
  240. ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
  241. shutdown_ts, 0.0);
  242. ev_timer_start (sigh->event_loop, &shutdown_ev);
  243. if (!(sigh->worker->flags & RSPAMD_WORKER_NO_TERMINATE_DELAY)) {
  244. /* This timer checks if we are ready to die and is called frequently */
  245. shutdown_check_ev.data = sigh->worker;
  246. ev_timer_init (&shutdown_check_ev, rspamd_worker_shutdown_check,
  247. 0.5, 0.5);
  248. ev_timer_start (sigh->event_loop, &shutdown_check_ev);
  249. }
  250. rspamd_worker_stop_accept (sigh->worker);
  251. }
  252. /* No more signals */
  253. return FALSE;
  254. }
  255. /*
  256. * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
  257. */
  258. static gboolean
  259. rspamd_worker_usr1_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
  260. {
  261. struct rspamd_main *rspamd_main = sigh->worker->srv;
  262. rspamd_log_reopen (sigh->worker->srv->logger, rspamd_main->cfg, -1, -1);
  263. msg_info_main ("logging reinitialised");
  264. /* Get more signals */
  265. return TRUE;
  266. }
  267. static gboolean
  268. rspamd_worker_term_handler (struct rspamd_worker_signal_handler *sigh, void *arg)
  269. {
  270. if (sigh->worker->state == rspamd_worker_state_running) {
  271. static ev_timer shutdown_ev, shutdown_check_ev;
  272. ev_tstamp shutdown_ts;
  273. if (sigh->worker->flags & RSPAMD_WORKER_NO_TERMINATE_DELAY) {
  274. shutdown_ts = 0.0;
  275. }
  276. else {
  277. shutdown_ts = MAX (SOFT_SHUTDOWN_TIME,
  278. sigh->worker->srv->cfg->task_timeout * 2.0);
  279. }
  280. rspamd_worker_ignore_signal (sigh);
  281. sigh->worker->state = rspamd_worker_state_terminating;
  282. rspamd_default_log_function (G_LOG_LEVEL_INFO,
  283. sigh->worker->srv->server_pool->tag.tagname,
  284. sigh->worker->srv->server_pool->tag.uid,
  285. G_STRFUNC,
  286. "terminating after receiving signal %s",
  287. g_strsignal (sigh->signo));
  288. rspamd_worker_stop_accept (sigh->worker);
  289. rspamd_worker_terminate_handlers (sigh->worker);
  290. /* Check if we are ready to die */
  291. if (sigh->worker->state != rspamd_worker_wanna_die) {
  292. /* This timer is called when we have no choices but to die */
  293. shutdown_ev.data = sigh->worker;
  294. ev_timer_init (&shutdown_ev, rspamd_worker_on_delayed_shutdown,
  295. shutdown_ts, 0.0);
  296. ev_timer_start (sigh->event_loop, &shutdown_ev);
  297. if (!(sigh->worker->flags & RSPAMD_WORKER_NO_TERMINATE_DELAY)) {
  298. /* This timer checks if we are ready to die and is called frequently */
  299. shutdown_check_ev.data = sigh->worker;
  300. ev_timer_init (&shutdown_check_ev, rspamd_worker_shutdown_check,
  301. 0.5, 0.5);
  302. ev_timer_start (sigh->event_loop, &shutdown_check_ev);
  303. }
  304. }
  305. else {
  306. /* Flag to die has been already set */
  307. ev_break (sigh->event_loop, EVBREAK_ALL);
  308. }
  309. }
  310. /* Stop reacting on signals */
  311. return FALSE;
  312. }
  313. static void
  314. rspamd_worker_signal_handle (EV_P_ ev_signal *w, int revents)
  315. {
  316. struct rspamd_worker_signal_handler *sigh =
  317. (struct rspamd_worker_signal_handler *)w->data;
  318. struct rspamd_worker_signal_handler_elt *cb, *cbtmp;
  319. /* Call all signal handlers registered */
  320. DL_FOREACH_SAFE (sigh->cb, cb, cbtmp) {
  321. if (!cb->handler (sigh, cb->handler_data)) {
  322. DL_DELETE (sigh->cb, cb);
  323. g_free (cb);
  324. }
  325. }
  326. }
  327. static void
  328. rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh)
  329. {
  330. sigset_t set;
  331. ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
  332. sigemptyset (&set);
  333. sigaddset (&set, sigh->signo);
  334. sigprocmask (SIG_BLOCK, &set, NULL);
  335. }
  336. static void
  337. rspamd_worker_default_signal (int signo)
  338. {
  339. struct sigaction sig;
  340. sigemptyset (&sig.sa_mask);
  341. sigaddset (&sig.sa_mask, signo);
  342. sig.sa_handler = SIG_DFL;
  343. sig.sa_flags = 0;
  344. sigaction (signo, &sig, NULL);
  345. }
  346. static void
  347. rspamd_sigh_free (void *p)
  348. {
  349. struct rspamd_worker_signal_handler *sigh = p;
  350. struct rspamd_worker_signal_handler_elt *cb, *tmp;
  351. DL_FOREACH_SAFE (sigh->cb, cb, tmp) {
  352. DL_DELETE (sigh->cb, cb);
  353. g_free (cb);
  354. }
  355. ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
  356. rspamd_worker_default_signal (sigh->signo);
  357. g_free (sigh);
  358. }
  359. void
  360. rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker,
  361. struct ev_loop *event_loop,
  362. rspamd_worker_signal_cb_t handler,
  363. void *handler_data)
  364. {
  365. struct rspamd_worker_signal_handler *sigh;
  366. struct rspamd_worker_signal_handler_elt *cb;
  367. sigh = g_hash_table_lookup (worker->signal_events, GINT_TO_POINTER (signo));
  368. if (sigh == NULL) {
  369. sigh = g_malloc0 (sizeof (*sigh));
  370. sigh->signo = signo;
  371. sigh->worker = worker;
  372. sigh->event_loop = event_loop;
  373. sigh->enabled = TRUE;
  374. sigh->ev_sig.data = sigh;
  375. ev_signal_init (&sigh->ev_sig, rspamd_worker_signal_handle, signo);
  376. ev_signal_start (event_loop, &sigh->ev_sig);
  377. g_hash_table_insert (worker->signal_events,
  378. GINT_TO_POINTER (signo),
  379. sigh);
  380. }
  381. cb = g_malloc0 (sizeof (*cb));
  382. cb->handler = handler;
  383. cb->handler_data = handler_data;
  384. DL_APPEND (sigh->cb, cb);
  385. }
  386. void
  387. rspamd_worker_init_signals (struct rspamd_worker *worker,
  388. struct ev_loop *event_loop)
  389. {
  390. /* A set of terminating signals */
  391. rspamd_worker_set_signal_handler (SIGTERM, worker, event_loop,
  392. rspamd_worker_term_handler, NULL);
  393. rspamd_worker_set_signal_handler (SIGINT, worker, event_loop,
  394. rspamd_worker_term_handler, NULL);
  395. rspamd_worker_set_signal_handler (SIGHUP, worker, event_loop,
  396. rspamd_worker_term_handler, NULL);
  397. /* Special purpose signals */
  398. rspamd_worker_set_signal_handler (SIGUSR1, worker, event_loop,
  399. rspamd_worker_usr1_handler, NULL);
  400. rspamd_worker_set_signal_handler (SIGUSR2, worker, event_loop,
  401. rspamd_worker_usr2_handler, NULL);
  402. }
  403. struct ev_loop *
  404. rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
  405. rspamd_accept_handler hdl)
  406. {
  407. struct ev_loop *event_loop;
  408. GList *cur;
  409. struct rspamd_worker_listen_socket *ls;
  410. struct rspamd_worker_accept_event *accept_ev;
  411. worker->signal_events = g_hash_table_new_full (g_direct_hash, g_direct_equal,
  412. NULL, rspamd_sigh_free);
  413. event_loop = ev_loop_new (rspamd_config_ev_backend_get (worker->srv->cfg) |
  414. EVFLAG_SIGNALFD);
  415. worker->srv->event_loop = event_loop;
  416. rspamd_worker_init_signals (worker, event_loop);
  417. rspamd_control_worker_add_default_cmd_handlers (worker, event_loop);
  418. rspamd_worker_heartbeat_start (worker, event_loop);
  419. #ifdef WITH_HIREDIS
  420. rspamd_redis_pool_config (worker->srv->cfg->redis_pool,
  421. worker->srv->cfg, event_loop);
  422. #endif
  423. /* Accept all sockets */
  424. if (hdl) {
  425. cur = worker->cf->listen_socks;
  426. while (cur) {
  427. ls = cur->data;
  428. if (ls->fd != -1) {
  429. accept_ev = g_malloc0 (sizeof (*accept_ev));
  430. accept_ev->event_loop = event_loop;
  431. accept_ev->accept_ev.data = worker;
  432. ev_io_init (&accept_ev->accept_ev, hdl, ls->fd, EV_READ);
  433. ev_io_start (event_loop, &accept_ev->accept_ev);
  434. DL_APPEND (worker->accept_events, accept_ev);
  435. }
  436. cur = g_list_next (cur);
  437. }
  438. }
  439. return event_loop;
  440. }
  441. void
  442. rspamd_worker_stop_accept (struct rspamd_worker *worker)
  443. {
  444. struct rspamd_worker_accept_event *cur, *tmp;
  445. /* Remove all events */
  446. DL_FOREACH_SAFE (worker->accept_events, cur, tmp) {
  447. if (ev_can_stop (&cur->accept_ev)) {
  448. ev_io_stop (cur->event_loop, &cur->accept_ev);
  449. }
  450. if (ev_can_stop (&cur->throttling_ev)) {
  451. ev_timer_stop (cur->event_loop, &cur->throttling_ev);
  452. }
  453. g_free (cur);
  454. }
  455. /* XXX: we need to do it much later */
  456. #if 0
  457. g_hash_table_iter_init (&it, worker->signal_events);
  458. while (g_hash_table_iter_next (&it, &k, &v)) {
  459. sigh = (struct rspamd_worker_signal_handler *)v;
  460. g_hash_table_iter_steal (&it);
  461. if (sigh->enabled) {
  462. event_del (&sigh->ev);
  463. }
  464. g_free (sigh);
  465. }
  466. g_hash_table_unref (worker->signal_events);
  467. #endif
  468. }
  469. static rspamd_fstring_t *
  470. rspamd_controller_maybe_compress (struct rspamd_http_connection_entry *entry,
  471. rspamd_fstring_t *buf, struct rspamd_http_message *msg)
  472. {
  473. if (entry->support_gzip) {
  474. if (rspamd_fstring_gzip (&buf)) {
  475. rspamd_http_message_add_header (msg, "Content-Encoding", "gzip");
  476. }
  477. }
  478. return buf;
  479. }
  480. void
  481. rspamd_controller_send_error (struct rspamd_http_connection_entry *entry,
  482. gint code, const gchar *error_msg, ...)
  483. {
  484. struct rspamd_http_message *msg;
  485. va_list args;
  486. rspamd_fstring_t *reply;
  487. msg = rspamd_http_new_message (HTTP_RESPONSE);
  488. va_start (args, error_msg);
  489. msg->status = rspamd_fstring_new ();
  490. rspamd_vprintf_fstring (&msg->status, error_msg, args);
  491. va_end (args);
  492. msg->date = time (NULL);
  493. msg->code = code;
  494. reply = rspamd_fstring_sized_new (msg->status->len + 16);
  495. rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
  496. rspamd_http_message_set_body_from_fstring_steal (msg,
  497. rspamd_controller_maybe_compress (entry, reply, msg));
  498. rspamd_http_connection_reset (entry->conn);
  499. rspamd_http_router_insert_headers (entry->rt, msg);
  500. rspamd_http_connection_write_message (entry->conn,
  501. msg,
  502. NULL,
  503. "application/json",
  504. entry,
  505. entry->rt->timeout);
  506. entry->is_reply = TRUE;
  507. }
  508. void
  509. rspamd_controller_send_string (struct rspamd_http_connection_entry *entry,
  510. const gchar *str)
  511. {
  512. struct rspamd_http_message *msg;
  513. rspamd_fstring_t *reply;
  514. msg = rspamd_http_new_message (HTTP_RESPONSE);
  515. msg->date = time (NULL);
  516. msg->code = 200;
  517. msg->status = rspamd_fstring_new_init ("OK", 2);
  518. if (str) {
  519. reply = rspamd_fstring_new_init (str, strlen (str));
  520. }
  521. else {
  522. reply = rspamd_fstring_new_init ("null", 4);
  523. }
  524. rspamd_http_message_set_body_from_fstring_steal (msg,
  525. rspamd_controller_maybe_compress (entry, reply, msg));
  526. rspamd_http_connection_reset (entry->conn);
  527. rspamd_http_router_insert_headers (entry->rt, msg);
  528. rspamd_http_connection_write_message (entry->conn,
  529. msg,
  530. NULL,
  531. "application/json",
  532. entry,
  533. entry->rt->timeout);
  534. entry->is_reply = TRUE;
  535. }
  536. void
  537. rspamd_controller_send_ucl (struct rspamd_http_connection_entry *entry,
  538. ucl_object_t *obj)
  539. {
  540. struct rspamd_http_message *msg;
  541. rspamd_fstring_t *reply;
  542. msg = rspamd_http_new_message (HTTP_RESPONSE);
  543. msg->date = time (NULL);
  544. msg->code = 200;
  545. msg->status = rspamd_fstring_new_init ("OK", 2);
  546. reply = rspamd_fstring_sized_new (BUFSIZ);
  547. rspamd_ucl_emit_fstring (obj, UCL_EMIT_JSON_COMPACT, &reply);
  548. rspamd_http_message_set_body_from_fstring_steal (msg,
  549. rspamd_controller_maybe_compress (entry, reply, msg));
  550. rspamd_http_connection_reset (entry->conn);
  551. rspamd_http_router_insert_headers (entry->rt, msg);
  552. rspamd_http_connection_write_message (entry->conn,
  553. msg,
  554. NULL,
  555. "application/json",
  556. entry,
  557. entry->rt->timeout);
  558. entry->is_reply = TRUE;
  559. }
  560. static void
  561. rspamd_worker_drop_priv (struct rspamd_main *rspamd_main)
  562. {
  563. if (rspamd_main->is_privilleged) {
  564. if (setgid (rspamd_main->workers_gid) == -1) {
  565. msg_err_main ("cannot setgid to %d (%s), aborting",
  566. (gint) rspamd_main->workers_gid,
  567. strerror (errno));
  568. exit (-errno);
  569. }
  570. if (rspamd_main->cfg->rspamd_user &&
  571. initgroups (rspamd_main->cfg->rspamd_user,
  572. rspamd_main->workers_gid) == -1) {
  573. msg_err_main ("initgroups failed (%s), aborting", strerror (errno));
  574. exit (-errno);
  575. }
  576. if (setuid (rspamd_main->workers_uid) == -1) {
  577. msg_err_main ("cannot setuid to %d (%s), aborting",
  578. (gint) rspamd_main->workers_uid,
  579. strerror (errno));
  580. exit (-errno);
  581. }
  582. }
  583. }
  584. static void
  585. rspamd_worker_set_limits (struct rspamd_main *rspamd_main,
  586. struct rspamd_worker_conf *cf)
  587. {
  588. struct rlimit rlmt;
  589. if (cf->rlimit_nofile != 0) {
  590. rlmt.rlim_cur = (rlim_t) cf->rlimit_nofile;
  591. rlmt.rlim_max = (rlim_t) cf->rlimit_nofile;
  592. if (setrlimit (RLIMIT_NOFILE, &rlmt) == -1) {
  593. msg_warn_main ("cannot set files rlimit: %L, %s",
  594. cf->rlimit_nofile,
  595. strerror (errno));
  596. }
  597. memset (&rlmt, 0, sizeof (rlmt));
  598. if (getrlimit (RLIMIT_NOFILE, &rlmt) == -1) {
  599. msg_warn_main ("cannot get max files rlimit: %HL, %s",
  600. cf->rlimit_maxcore,
  601. strerror (errno));
  602. }
  603. else {
  604. msg_info_main ("set max file descriptors limit: %HL cur and %HL max",
  605. (guint64) rlmt.rlim_cur,
  606. (guint64) rlmt.rlim_max);
  607. }
  608. }
  609. else {
  610. /* Just report */
  611. if (getrlimit (RLIMIT_NOFILE, &rlmt) == -1) {
  612. msg_warn_main ("cannot get max files rlimit: %HL, %s",
  613. cf->rlimit_maxcore,
  614. strerror (errno));
  615. }
  616. else {
  617. msg_info_main ("use system max file descriptors limit: %HL cur and %HL max",
  618. (guint64) rlmt.rlim_cur,
  619. (guint64) rlmt.rlim_max);
  620. }
  621. }
  622. if (rspamd_main->cores_throttling) {
  623. msg_info_main ("disable core files for the new worker as limits are reached");
  624. rlmt.rlim_cur = 0;
  625. rlmt.rlim_max = 0;
  626. if (setrlimit (RLIMIT_CORE, &rlmt) == -1) {
  627. msg_warn_main ("cannot disable core dumps: error when setting limits: %s",
  628. strerror (errno));
  629. }
  630. }
  631. else {
  632. if (cf->rlimit_maxcore != 0) {
  633. rlmt.rlim_cur = (rlim_t) cf->rlimit_maxcore;
  634. rlmt.rlim_max = (rlim_t) cf->rlimit_maxcore;
  635. if (setrlimit (RLIMIT_CORE, &rlmt) == -1) {
  636. msg_warn_main ("cannot set max core size limit: %HL, %s",
  637. cf->rlimit_maxcore,
  638. strerror (errno));
  639. }
  640. /* Ensure that we did it */
  641. memset (&rlmt, 0, sizeof (rlmt));
  642. if (getrlimit (RLIMIT_CORE, &rlmt) == -1) {
  643. msg_warn_main ("cannot get max core size rlimit: %HL, %s",
  644. cf->rlimit_maxcore,
  645. strerror (errno));
  646. }
  647. else {
  648. if (rlmt.rlim_cur != cf->rlimit_maxcore ||
  649. rlmt.rlim_max != cf->rlimit_maxcore) {
  650. msg_warn_main ("setting of core file limits was unsuccessful: "
  651. "%HL was wanted, "
  652. "but we have %HL cur and %HL max",
  653. cf->rlimit_maxcore,
  654. (guint64) rlmt.rlim_cur,
  655. (guint64) rlmt.rlim_max);
  656. }
  657. else {
  658. msg_info_main ("set max core size limit: %HL cur and %HL max",
  659. (guint64) rlmt.rlim_cur,
  660. (guint64) rlmt.rlim_max);
  661. }
  662. }
  663. }
  664. else {
  665. /* Just report */
  666. if (getrlimit (RLIMIT_CORE, &rlmt) == -1) {
  667. msg_warn_main ("cannot get max core size limit: %HL, %s",
  668. cf->rlimit_maxcore,
  669. strerror (errno));
  670. }
  671. else {
  672. msg_info_main ("use system max core size limit: %HL cur and %HL max",
  673. (guint64) rlmt.rlim_cur,
  674. (guint64) rlmt.rlim_max);
  675. }
  676. }
  677. }
  678. }
  679. static void
  680. rspamd_worker_on_term (EV_P_ ev_child *w, int revents)
  681. {
  682. struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
  683. if (wrk->ppid == getpid ()) {
  684. if (wrk->term_handler) {
  685. wrk->term_handler (EV_A_ w, wrk->srv, wrk);
  686. }
  687. else {
  688. rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
  689. }
  690. }
  691. else {
  692. /* Ignore SIGCHLD for not our children... */
  693. }
  694. }
  695. static void
  696. rspamd_worker_heartbeat_cb (EV_P_ ev_timer *w, int revents)
  697. {
  698. struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
  699. struct rspamd_srv_command cmd;
  700. memset (&cmd, 0, sizeof (cmd));
  701. cmd.type = RSPAMD_SRV_HEARTBEAT;
  702. rspamd_srv_send_command (wrk, EV_A, &cmd, -1, NULL, NULL);
  703. }
  704. static void
  705. rspamd_worker_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop)
  706. {
  707. wrk->hb.heartbeat_ev.data = (void *)wrk;
  708. ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_worker_heartbeat_cb,
  709. 0.0, wrk->srv->cfg->heartbeat_interval);
  710. ev_timer_start (event_loop, &wrk->hb.heartbeat_ev);
  711. }
  712. static void
  713. rspamd_main_heartbeat_cb (EV_P_ ev_timer *w, int revents)
  714. {
  715. struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
  716. gdouble time_from_last = ev_time ();
  717. struct rspamd_main *rspamd_main;
  718. static struct rspamd_control_command cmd;
  719. struct tm tm;
  720. gchar timebuf[64];
  721. gchar usec_buf[16];
  722. gint r;
  723. time_from_last -= wrk->hb.last_event;
  724. rspamd_main = wrk->srv;
  725. if (wrk->hb.last_event > 0 &&
  726. time_from_last > 0 &&
  727. time_from_last >= rspamd_main->cfg->heartbeat_interval * 2) {
  728. rspamd_localtime (wrk->hb.last_event, &tm);
  729. r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm);
  730. rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f",
  731. wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event);
  732. rspamd_snprintf (timebuf + r, sizeof (timebuf) - r,
  733. "%s", usec_buf + 1);
  734. if (wrk->hb.nbeats > 0) {
  735. /* First time lost event */
  736. cmd.type = RSPAMD_CONTROL_CHILD_CHANGE;
  737. cmd.cmd.child_change.what = rspamd_child_offline;
  738. cmd.cmd.child_change.pid = wrk->pid;
  739. rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid);
  740. msg_warn_main ("lost heartbeat from worker type %s with pid %P, "
  741. "last beat on: %s (%L beats received previously)",
  742. g_quark_to_string (wrk->type), wrk->pid,
  743. timebuf,
  744. wrk->hb.nbeats);
  745. wrk->hb.nbeats = -1;
  746. /* TODO: send notify about worker problem */
  747. }
  748. else {
  749. wrk->hb.nbeats --;
  750. msg_warn_main ("lost %L heartbeat from worker type %s with pid %P, "
  751. "last beat on: %s",
  752. -(wrk->hb.nbeats),
  753. g_quark_to_string (wrk->type),
  754. wrk->pid,
  755. timebuf);
  756. if (rspamd_main->cfg->heartbeats_loss_max > 0 &&
  757. -(wrk->hb.nbeats) >= rspamd_main->cfg->heartbeats_loss_max) {
  758. if (-(wrk->hb.nbeats) > rspamd_main->cfg->heartbeats_loss_max + 1) {
  759. msg_err_main ("force kill worker type %s with pid %P, "
  760. "last beat on: %s; %L heartbeat lost",
  761. g_quark_to_string (wrk->type),
  762. wrk->pid,
  763. timebuf,
  764. -(wrk->hb.nbeats));
  765. kill (wrk->pid, SIGKILL);
  766. }
  767. else {
  768. msg_err_main ("terminate worker type %s with pid %P, "
  769. "last beat on: %s; %L heartbeat lost",
  770. g_quark_to_string (wrk->type),
  771. wrk->pid,
  772. timebuf,
  773. -(wrk->hb.nbeats));
  774. kill (wrk->pid, SIGTERM);
  775. }
  776. }
  777. }
  778. }
  779. else if (wrk->hb.nbeats < 0) {
  780. rspamd_localtime (wrk->hb.last_event, &tm);
  781. r = strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tm);
  782. rspamd_snprintf (usec_buf, sizeof (usec_buf), "%.5f",
  783. wrk->hb.last_event - (gdouble)(time_t)wrk->hb.last_event);
  784. rspamd_snprintf (timebuf + r, sizeof (timebuf) - r,
  785. "%s", usec_buf + 1);
  786. cmd.type = RSPAMD_CONTROL_CHILD_CHANGE;
  787. cmd.cmd.child_change.what = rspamd_child_online;
  788. cmd.cmd.child_change.pid = wrk->pid;
  789. rspamd_control_broadcast_srv_cmd (rspamd_main, &cmd, wrk->pid);
  790. msg_info_main ("received heartbeat from worker type %s with pid %P, "
  791. "last beat on: %s (%L beats lost previously)",
  792. g_quark_to_string (wrk->type), wrk->pid,
  793. timebuf,
  794. -(wrk->hb.nbeats));
  795. wrk->hb.nbeats = 1;
  796. /* TODO: send notify about worker restoration */
  797. }
  798. }
  799. static void
  800. rspamd_main_heartbeat_start (struct rspamd_worker *wrk, struct ev_loop *event_loop)
  801. {
  802. wrk->hb.heartbeat_ev.data = (void *)wrk;
  803. ev_timer_init (&wrk->hb.heartbeat_ev, rspamd_main_heartbeat_cb,
  804. 0.0, wrk->srv->cfg->heartbeat_interval * 2);
  805. ev_timer_start (event_loop, &wrk->hb.heartbeat_ev);
  806. }
  807. struct rspamd_worker *
  808. rspamd_fork_worker (struct rspamd_main *rspamd_main,
  809. struct rspamd_worker_conf *cf,
  810. guint index,
  811. struct ev_loop *ev_base,
  812. rspamd_worker_term_cb term_handler)
  813. {
  814. struct rspamd_worker *wrk;
  815. gint rc;
  816. struct rlimit rlim;
  817. /* Starting worker process */
  818. wrk = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker));
  819. if (!rspamd_socketpair (wrk->control_pipe, 0)) {
  820. msg_err ("socketpair failure: %s", strerror (errno));
  821. rspamd_hard_terminate (rspamd_main);
  822. }
  823. if (!rspamd_socketpair (wrk->srv_pipe, 0)) {
  824. msg_err ("socketpair failure: %s", strerror (errno));
  825. rspamd_hard_terminate (rspamd_main);
  826. }
  827. if (cf->bind_conf) {
  828. msg_info_main ("prepare to fork process %s (%d); listen on: %s",
  829. cf->worker->name,
  830. index, cf->bind_conf->name);
  831. }
  832. else {
  833. msg_info_main ("prepare to fork process %s (%d), no bind socket",
  834. cf->worker->name,
  835. index);
  836. }
  837. wrk->srv = rspamd_main;
  838. wrk->type = cf->type;
  839. wrk->cf = cf;
  840. wrk->flags = cf->worker->flags;
  841. REF_RETAIN (cf);
  842. wrk->index = index;
  843. wrk->ctx = cf->ctx;
  844. wrk->ppid = getpid ();
  845. wrk->pid = fork ();
  846. wrk->cores_throttled = rspamd_main->cores_throttling;
  847. wrk->term_handler = term_handler;
  848. switch (wrk->pid) {
  849. case 0:
  850. /* Update pid for logging */
  851. rspamd_log_on_fork (cf->type, rspamd_main->cfg, rspamd_main->logger);
  852. wrk->pid = getpid ();
  853. /* Init PRNG after fork */
  854. rc = ottery_init (rspamd_main->cfg->libs_ctx->ottery_cfg);
  855. if (rc != OTTERY_ERR_NONE) {
  856. msg_err_main ("cannot initialize PRNG: %d", rc);
  857. abort ();
  858. }
  859. rspamd_random_seed_fast ();
  860. #ifdef HAVE_EVUTIL_RNG_INIT
  861. evutil_secure_rng_init ();
  862. #endif
  863. /*
  864. * Libev stores all signals in a global table, so
  865. * previous handlers must be explicitly detached and forgotten
  866. * before starting a new loop
  867. */
  868. ev_signal_stop (rspamd_main->event_loop, &rspamd_main->int_ev);
  869. ev_signal_stop (rspamd_main->event_loop, &rspamd_main->term_ev);
  870. ev_signal_stop (rspamd_main->event_loop, &rspamd_main->hup_ev);
  871. ev_signal_stop (rspamd_main->event_loop, &rspamd_main->usr1_ev);
  872. /* Remove the inherited event base */
  873. ev_loop_destroy (rspamd_main->event_loop);
  874. rspamd_main->event_loop = NULL;
  875. /* Drop privileges */
  876. rspamd_worker_drop_priv (rspamd_main);
  877. /* Set limits */
  878. rspamd_worker_set_limits (rspamd_main, cf);
  879. /* Re-set stack limit */
  880. getrlimit (RLIMIT_STACK, &rlim);
  881. rlim.rlim_cur = 100 * 1024 * 1024;
  882. rlim.rlim_max = rlim.rlim_cur;
  883. setrlimit (RLIMIT_STACK, &rlim);
  884. if (cf->bind_conf) {
  885. setproctitle ("%s process (%s)", cf->worker->name,
  886. cf->bind_conf->bind_line);
  887. }
  888. else {
  889. setproctitle ("%s process", cf->worker->name);
  890. }
  891. if (rspamd_main->pfh) {
  892. rspamd_pidfile_close (rspamd_main->pfh);
  893. }
  894. if (rspamd_main->cfg->log_silent_workers) {
  895. rspamd_log_set_log_level (rspamd_main->logger, G_LOG_LEVEL_MESSAGE);
  896. }
  897. wrk->start_time = rspamd_get_calendar_ticks ();
  898. if (cf->bind_conf) {
  899. msg_info_main ("starting %s process %P (%d); listen on: %s",
  900. cf->worker->name,
  901. getpid (), index, cf->bind_conf->bind_line);
  902. }
  903. else {
  904. msg_info_main ("starting %s process %P (%d)", cf->worker->name,
  905. getpid (), index);
  906. }
  907. /* Close parent part of socketpair */
  908. close (wrk->control_pipe[0]);
  909. close (wrk->srv_pipe[0]);
  910. rspamd_socket_nonblocking (wrk->control_pipe[1]);
  911. rspamd_socket_nonblocking (wrk->srv_pipe[1]);
  912. rspamd_main->cfg->cur_worker = wrk;
  913. /* Execute worker (this function should not return normally!) */
  914. cf->worker->worker_start_func (wrk);
  915. /* To distinguish from normal termination */
  916. exit (EXIT_FAILURE);
  917. break;
  918. case -1:
  919. msg_err_main ("cannot fork main process: %s", strerror (errno));
  920. if (rspamd_main->pfh) {
  921. rspamd_pidfile_remove (rspamd_main->pfh);
  922. }
  923. rspamd_hard_terminate (rspamd_main);
  924. break;
  925. default:
  926. /* Close worker part of socketpair */
  927. close (wrk->control_pipe[1]);
  928. close (wrk->srv_pipe[1]);
  929. rspamd_socket_nonblocking (wrk->control_pipe[0]);
  930. rspamd_socket_nonblocking (wrk->srv_pipe[0]);
  931. rspamd_srv_start_watching (rspamd_main, wrk, ev_base);
  932. wrk->cld_ev.data = wrk;
  933. ev_child_init (&wrk->cld_ev, rspamd_worker_on_term, wrk->pid, 0);
  934. ev_child_start (rspamd_main->event_loop, &wrk->cld_ev);
  935. rspamd_main_heartbeat_start (wrk, rspamd_main->event_loop);
  936. /* Insert worker into worker's table, pid is index */
  937. g_hash_table_insert (rspamd_main->workers,
  938. GSIZE_TO_POINTER (wrk->pid), wrk);
  939. break;
  940. }
  941. return wrk;
  942. }
  943. void
  944. rspamd_worker_block_signals (void)
  945. {
  946. sigset_t set;
  947. sigemptyset (&set);
  948. sigaddset (&set, SIGTERM);
  949. sigaddset (&set, SIGINT);
  950. sigaddset (&set, SIGHUP);
  951. sigaddset (&set, SIGUSR1);
  952. sigaddset (&set, SIGUSR2);
  953. sigprocmask (SIG_BLOCK, &set, NULL);
  954. }
  955. void
  956. rspamd_worker_unblock_signals (void)
  957. {
  958. sigset_t set;
  959. sigemptyset (&set);
  960. sigaddset (&set, SIGTERM);
  961. sigaddset (&set, SIGINT);
  962. sigaddset (&set, SIGHUP);
  963. sigaddset (&set, SIGUSR1);
  964. sigaddset (&set, SIGUSR2);
  965. sigprocmask (SIG_UNBLOCK, &set, NULL);
  966. }
  967. void
  968. rspamd_hard_terminate (struct rspamd_main *rspamd_main)
  969. {
  970. GHashTableIter it;
  971. gpointer k, v;
  972. struct rspamd_worker *w;
  973. sigset_t set;
  974. /* Block all signals */
  975. sigemptyset (&set);
  976. sigaddset (&set, SIGTERM);
  977. sigaddset (&set, SIGINT);
  978. sigaddset (&set, SIGHUP);
  979. sigaddset (&set, SIGUSR1);
  980. sigaddset (&set, SIGUSR2);
  981. sigaddset (&set, SIGCHLD);
  982. sigprocmask (SIG_BLOCK, &set, NULL);
  983. /* We need to terminate all workers that might be already spawned */
  984. rspamd_worker_block_signals ();
  985. g_hash_table_iter_init (&it, rspamd_main->workers);
  986. while (g_hash_table_iter_next (&it, &k, &v)) {
  987. w = v;
  988. msg_err_main ("kill worker %P as Rspamd is terminating due to "
  989. "an unrecoverable error", w->pid);
  990. kill (w->pid, SIGKILL);
  991. }
  992. msg_err_main ("shutting down Rspamd due to fatal error");
  993. rspamd_log_close (rspamd_main->logger);
  994. exit (EXIT_FAILURE);
  995. }
  996. gboolean
  997. rspamd_worker_is_scanner (struct rspamd_worker *w)
  998. {
  999. if (w) {
  1000. return !!(w->flags & RSPAMD_WORKER_SCANNER);
  1001. }
  1002. return FALSE;
  1003. }
  1004. gboolean
  1005. rspamd_worker_is_primary_controller (struct rspamd_worker *w)
  1006. {
  1007. if (w) {
  1008. return !!(w->flags & RSPAMD_WORKER_CONTROLLER) && w->index == 0;
  1009. }
  1010. return FALSE;
  1011. }
  1012. struct rspamd_worker_session_elt {
  1013. void *ptr;
  1014. guint *pref;
  1015. const gchar *tag;
  1016. time_t when;
  1017. };
  1018. struct rspamd_worker_session_cache {
  1019. struct ev_loop *ev_base;
  1020. GHashTable *cache;
  1021. struct rspamd_config *cfg;
  1022. struct ev_timer periodic;
  1023. };
  1024. static gint
  1025. rspamd_session_cache_sort_cmp (gconstpointer pa, gconstpointer pb)
  1026. {
  1027. const struct rspamd_worker_session_elt
  1028. *e1 = *(const struct rspamd_worker_session_elt **)pa,
  1029. *e2 = *(const struct rspamd_worker_session_elt **)pb;
  1030. return e2->when < e1->when;
  1031. }
  1032. static void
  1033. rspamd_sessions_cache_periodic (EV_P_ ev_timer *w, int revents)
  1034. {
  1035. struct rspamd_worker_session_cache *c =
  1036. (struct rspamd_worker_session_cache *)w->data;
  1037. GHashTableIter it;
  1038. gchar timebuf[32];
  1039. gpointer k, v;
  1040. struct rspamd_worker_session_elt *elt;
  1041. struct tm tms;
  1042. GPtrArray *res;
  1043. guint i;
  1044. if (g_hash_table_size (c->cache) > c->cfg->max_sessions_cache) {
  1045. res = g_ptr_array_sized_new (g_hash_table_size (c->cache));
  1046. g_hash_table_iter_init (&it, c->cache);
  1047. while (g_hash_table_iter_next (&it, &k, &v)) {
  1048. g_ptr_array_add (res, v);
  1049. }
  1050. msg_err ("sessions cache is overflowed %d elements where %d is limit",
  1051. (gint)res->len, (gint)c->cfg->max_sessions_cache);
  1052. g_ptr_array_sort (res, rspamd_session_cache_sort_cmp);
  1053. PTR_ARRAY_FOREACH (res, i, elt) {
  1054. rspamd_localtime (elt->when, &tms);
  1055. strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", &tms);
  1056. msg_warn ("redundant session; ptr: %p, "
  1057. "tag: %s, refcount: %d, time: %s",
  1058. elt->ptr, elt->tag ? elt->tag : "unknown",
  1059. elt->pref ? *elt->pref : 0,
  1060. timebuf);
  1061. }
  1062. }
  1063. ev_timer_again (EV_A_ w);
  1064. }
  1065. void *
  1066. rspamd_worker_session_cache_new (struct rspamd_worker *w,
  1067. struct ev_loop *ev_base)
  1068. {
  1069. struct rspamd_worker_session_cache *c;
  1070. static const gdouble periodic_interval = 60.0;
  1071. c = g_malloc0 (sizeof (*c));
  1072. c->ev_base = ev_base;
  1073. c->cache = g_hash_table_new_full (g_direct_hash, g_direct_equal,
  1074. NULL, g_free);
  1075. c->cfg = w->srv->cfg;
  1076. c->periodic.data = c;
  1077. ev_timer_init (&c->periodic, rspamd_sessions_cache_periodic, periodic_interval,
  1078. periodic_interval);
  1079. ev_timer_start (ev_base, &c->periodic);
  1080. return c;
  1081. }
  1082. void
  1083. rspamd_worker_session_cache_add (void *cache, const gchar *tag,
  1084. guint *pref, void *ptr)
  1085. {
  1086. struct rspamd_worker_session_cache *c = cache;
  1087. struct rspamd_worker_session_elt *elt;
  1088. elt = g_malloc0 (sizeof (*elt));
  1089. elt->pref = pref;
  1090. elt->ptr = ptr;
  1091. elt->tag = tag;
  1092. elt->when = time (NULL);
  1093. g_hash_table_insert (c->cache, elt->ptr, elt);
  1094. }
  1095. void
  1096. rspamd_worker_session_cache_remove (void *cache, void *ptr)
  1097. {
  1098. struct rspamd_worker_session_cache *c = cache;
  1099. g_hash_table_remove (c->cache, ptr);
  1100. }
  1101. static void
  1102. rspamd_worker_monitored_on_change (struct rspamd_monitored_ctx *ctx,
  1103. struct rspamd_monitored *m, gboolean alive,
  1104. void *ud)
  1105. {
  1106. struct rspamd_worker *worker = ud;
  1107. struct rspamd_config *cfg = worker->srv->cfg;
  1108. struct ev_loop *ev_base;
  1109. guchar tag[RSPAMD_MONITORED_TAG_LEN];
  1110. static struct rspamd_srv_command srv_cmd;
  1111. rspamd_monitored_get_tag (m, tag);
  1112. ev_base = rspamd_monitored_ctx_get_ev_base (ctx);
  1113. memset (&srv_cmd, 0, sizeof (srv_cmd));
  1114. srv_cmd.type = RSPAMD_SRV_MONITORED_CHANGE;
  1115. rspamd_strlcpy (srv_cmd.cmd.monitored_change.tag, tag,
  1116. sizeof (srv_cmd.cmd.monitored_change.tag));
  1117. srv_cmd.cmd.monitored_change.alive = alive;
  1118. srv_cmd.cmd.monitored_change.sender = getpid ();
  1119. msg_info_config ("broadcast monitored update for %s: %s",
  1120. srv_cmd.cmd.monitored_change.tag, alive ? "alive" : "dead");
  1121. rspamd_srv_send_command (worker, ev_base, &srv_cmd, -1, NULL, NULL);
  1122. }
  1123. void
  1124. rspamd_worker_init_monitored (struct rspamd_worker *worker,
  1125. struct ev_loop *ev_base,
  1126. struct rspamd_dns_resolver *resolver)
  1127. {
  1128. rspamd_monitored_ctx_config (worker->srv->cfg->monitored_ctx,
  1129. worker->srv->cfg, ev_base, resolver->r,
  1130. rspamd_worker_monitored_on_change, worker);
  1131. }
  1132. #ifdef HAVE_SA_SIGINFO
  1133. #ifdef WITH_LIBUNWIND
  1134. static void
  1135. rspamd_print_crash (ucontext_t *uap)
  1136. {
  1137. unw_cursor_t cursor;
  1138. unw_word_t ip, off;
  1139. guint level;
  1140. gint ret;
  1141. if ((ret = unw_init_local (&cursor, uap)) != 0) {
  1142. msg_err ("unw_init_local: %d", ret);
  1143. return;
  1144. }
  1145. level = 0;
  1146. ret = 0;
  1147. for (;;) {
  1148. char name[128];
  1149. if (level >= UNWIND_BACKTRACE_DEPTH) {
  1150. break;
  1151. }
  1152. unw_get_reg (&cursor, UNW_REG_IP, &ip);
  1153. ret = unw_get_proc_name(&cursor, name, sizeof (name), &off);
  1154. if (ret == 0) {
  1155. msg_err ("%d: %p: %s()+0x%xl",
  1156. level, ip, name, (uintptr_t)off);
  1157. } else {
  1158. msg_err ("%d: %p: <unknown>", level, ip);
  1159. }
  1160. level++;
  1161. ret = unw_step (&cursor);
  1162. if (ret <= 0) {
  1163. break;
  1164. }
  1165. }
  1166. if (ret < 0) {
  1167. msg_err ("unw_step_ptr: %d", ret);
  1168. }
  1169. }
  1170. #endif
  1171. static struct rspamd_main *saved_main = NULL;
  1172. static gboolean
  1173. rspamd_crash_propagate (gpointer key, gpointer value, gpointer unused)
  1174. {
  1175. struct rspamd_worker *w = value;
  1176. /* Kill children softly */
  1177. kill (w->pid, SIGTERM);
  1178. return TRUE;
  1179. }
  1180. static void
  1181. rspamd_crash_sig_handler (int sig, siginfo_t *info, void *ctx)
  1182. {
  1183. struct sigaction sa;
  1184. ucontext_t *uap = ctx;
  1185. pid_t pid;
  1186. pid = getpid ();
  1187. msg_err ("caught fatal signal %d(%s), "
  1188. "pid: %P, trace: ",
  1189. sig, strsignal (sig), pid);
  1190. (void)uap;
  1191. #ifdef WITH_LIBUNWIND
  1192. rspamd_print_crash (uap);
  1193. #endif
  1194. msg_err ("please see Rspamd FAQ to learn how to dump core files and how to "
  1195. "fill a bug report");
  1196. if (saved_main) {
  1197. if (pid == saved_main->pid) {
  1198. /*
  1199. * Main process has crashed, propagate crash further to trigger
  1200. * monitoring alerts and mass panic
  1201. */
  1202. g_hash_table_foreach_remove (saved_main->workers,
  1203. rspamd_crash_propagate, NULL);
  1204. }
  1205. }
  1206. /*
  1207. * Invoke signal with the default handler
  1208. */
  1209. sigemptyset (&sa.sa_mask);
  1210. sa.sa_handler = SIG_DFL;
  1211. sa.sa_flags = 0;
  1212. sigaction (sig, &sa, NULL);
  1213. kill (pid, sig);
  1214. }
  1215. #endif
  /**
   * Install rspamd_crash_sig_handler for SIGSEGV, SIGBUS, SIGABRT, SIGFPE and
   * SIGSYS and remember `rspamd_main` for the handler.
   *
   * When sigaltstack is available, an alternate signal stack is allocated
   * first (it is never freed). Without SA_SIGINFO support this is a no-op.
   *
   * @param rspamd_main main process state stored for use by the handler
   */
  void
  rspamd_set_crash_handler (struct rspamd_main *rspamd_main)
  {
  #ifdef HAVE_SA_SIGINFO
      static const int fatal_signals[] = {
          SIGSEGV, SIGBUS, SIGABRT, SIGFPE, SIGSYS
      };
      struct sigaction sa;
      guint idx;
  #ifdef HAVE_SIGALTSTACK
      stack_t ss;

      memset (&ss, 0, sizeof ss);
      /* Allocate special stack, NOT freed at the end so far */
      ss.ss_size = MAX (SIGSTKSZ, 8192 * 4);
      ss.ss_sp = g_malloc0 (ss.ss_size);
      sigaltstack (&ss, NULL);
  #endif
      saved_main = rspamd_main;

      sigemptyset (&sa.sa_mask);
      sa.sa_sigaction = &rspamd_crash_sig_handler;
      sa.sa_flags = SA_RESTART | SA_SIGINFO | SA_ONSTACK;

      for (idx = 0; idx < G_N_ELEMENTS (fatal_signals); idx++) {
          sigaction (fatal_signals[idx], &sa, NULL);
      }
  #endif
  }
  1240. static void
  1241. rspamd_enable_accept_event (EV_P_ ev_timer *w, int revents)
  1242. {
  1243. struct rspamd_worker_accept_event *ac_ev =
  1244. (struct rspamd_worker_accept_event *)w->data;
  1245. ev_timer_stop (EV_A_ w);
  1246. ev_io_start (EV_A_ &ac_ev->accept_ev);
  1247. }
  1248. void
  1249. rspamd_worker_throttle_accept_events (gint sock, void *data)
  1250. {
  1251. struct rspamd_worker_accept_event *head, *cur;
  1252. const gdouble throttling = 0.5;
  1253. head = (struct rspamd_worker_accept_event *)data;
  1254. DL_FOREACH (head, cur) {
  1255. ev_io_stop (cur->event_loop, &cur->accept_ev);
  1256. cur->throttling_ev.data = cur;
  1257. ev_timer_init (&cur->throttling_ev, rspamd_enable_accept_event,
  1258. throttling, 0.0);
  1259. ev_timer_start (cur->event_loop, &cur->throttling_ev);
  1260. }
  1261. }
  1262. gboolean
  1263. rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
  1264. struct rspamd_worker *wrk,
  1265. int res)
  1266. {
  1267. gboolean need_refork = TRUE;
  1268. if (wrk->state != rspamd_worker_state_running || rspamd_main->wanna_die) {
  1269. /* Do not refork workers that are intended to be terminated */
  1270. need_refork = FALSE;
  1271. }
  1272. if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
  1273. /* Normal worker termination, do not fork one more */
  1274. if (wrk->hb.nbeats < 0 && rspamd_main->cfg->heartbeats_loss_max > 0 &&
  1275. -(wrk->hb.nbeats) >= rspamd_main->cfg->heartbeats_loss_max) {
  1276. msg_info_main ("%s process %P terminated normally, but lost %L "
  1277. "heartbeats, refork it",
  1278. g_quark_to_string (wrk->type),
  1279. wrk->pid,
  1280. -(wrk->hb.nbeats));
  1281. need_refork = TRUE;
  1282. }
  1283. else {
  1284. msg_info_main ("%s process %P terminated normally",
  1285. g_quark_to_string (wrk->type),
  1286. wrk->pid);
  1287. need_refork = FALSE;
  1288. }
  1289. }
  1290. else {
  1291. if (WIFSIGNALED (res)) {
  1292. #ifdef WCOREDUMP
  1293. if (WCOREDUMP (res)) {
  1294. msg_warn_main (
  1295. "%s process %P terminated abnormally by signal: %s"
  1296. " and created core file; please see Rspamd FAQ "
  1297. "to learn how to extract data from core file and "
  1298. "fill a bug report",
  1299. g_quark_to_string (wrk->type),
  1300. wrk->pid,
  1301. g_strsignal (WTERMSIG (res)));
  1302. }
  1303. else {
  1304. #ifdef HAVE_SYS_RESOURCE_H
  1305. struct rlimit rlmt;
  1306. (void) getrlimit (RLIMIT_CORE, &rlmt);
  1307. msg_warn_main (
  1308. "%s process %P terminated abnormally with exit code %d by "
  1309. "signal: %s"
  1310. " but NOT created core file (throttled=%s); "
  1311. "core file limits: %L current, %L max",
  1312. g_quark_to_string (wrk->type),
  1313. wrk->pid,
  1314. WEXITSTATUS (res),
  1315. g_strsignal (WTERMSIG (res)),
  1316. wrk->cores_throttled ? "yes" : "no",
  1317. (gint64) rlmt.rlim_cur,
  1318. (gint64) rlmt.rlim_max);
  1319. #else
  1320. msg_warn_main (
  1321. "%s process %P terminated abnormally with exit code %d by signal: %s"
  1322. " but NOT created core file (throttled=%s); ",
  1323. g_quark_to_string (wrk->type),
  1324. wrk->pid, WEXITSTATUS (res),
  1325. g_strsignal (WTERMSIG (res)),
  1326. wrk->cores_throttled ? "yes" : "no");
  1327. #endif
  1328. }
  1329. #else
  1330. msg_warn_main (
  1331. "%s process %P terminated abnormally with exit code %d by signal: %s",
  1332. g_quark_to_string (wrk->type),
  1333. wrk->pid, WEXITSTATUS (res),
  1334. g_strsignal (WTERMSIG (res)));
  1335. #endif
  1336. if (WTERMSIG (res) == SIGUSR2) {
  1337. /*
  1338. * It is actually race condition when not started process
  1339. * has been requested to be reloaded.
  1340. *
  1341. * We shouldn't refork on this
  1342. */
  1343. need_refork = FALSE;
  1344. }
  1345. }
  1346. else {
  1347. msg_warn_main ("%s process %P terminated abnormally "
  1348. "(but it was not killed by a signal) "
  1349. "with exit code %d",
  1350. g_quark_to_string (wrk->type),
  1351. wrk->pid,
  1352. WEXITSTATUS (res));
  1353. }
  1354. }
  1355. return need_refork;
  1356. }
  1357. #ifdef WITH_HYPERSCAN
  1358. gboolean
  1359. rspamd_worker_hyperscan_ready (struct rspamd_main *rspamd_main,
  1360. struct rspamd_worker *worker, gint fd,
  1361. gint attached_fd,
  1362. struct rspamd_control_command *cmd,
  1363. gpointer ud) {
  1364. struct rspamd_control_reply rep;
  1365. struct rspamd_re_cache *cache = worker->srv->cfg->re_cache;
  1366. memset (&rep, 0, sizeof (rep));
  1367. rep.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
  1368. if (!rspamd_re_cache_is_hs_loaded (cache) || cmd->cmd.hs_loaded.forced) {
  1369. msg_info ("loading hyperscan expressions after receiving compilation "
  1370. "notice: %s",
  1371. (!rspamd_re_cache_is_hs_loaded (cache)) ?
  1372. "new db" : "forced update");
  1373. rep.reply.hs_loaded.status = rspamd_re_cache_load_hyperscan (
  1374. worker->srv->cfg->re_cache, cmd->cmd.hs_loaded.cache_dir);
  1375. }
  1376. if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
  1377. msg_err ("cannot write reply to the control socket: %s",
  1378. strerror (errno));
  1379. }
  1380. return TRUE;
  1381. }
  1382. #endif /* With Hyperscan */
  1383. gboolean
  1384. rspamd_worker_check_context (gpointer ctx, guint64 magic)
  1385. {
  1386. struct rspamd_abstract_worker_ctx *actx = (struct rspamd_abstract_worker_ctx*)ctx;
  1387. return actx->magic == magic;
  1388. }
  1389. static gboolean
  1390. rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
  1391. struct rspamd_worker *worker, gint fd,
  1392. gint attached_fd,
  1393. struct rspamd_control_command *cmd,
  1394. gpointer ud)
  1395. {
  1396. struct rspamd_config *cfg = ud;
  1397. struct rspamd_worker_log_pipe *lp;
  1398. struct rspamd_control_reply rep;
  1399. memset (&rep, 0, sizeof (rep));
  1400. rep.type = RSPAMD_CONTROL_LOG_PIPE;
  1401. if (attached_fd != -1) {
  1402. lp = g_malloc0 (sizeof (*lp));
  1403. lp->fd = attached_fd;
  1404. lp->type = cmd->cmd.log_pipe.type;
  1405. DL_APPEND (cfg->log_pipes, lp);
  1406. msg_info ("added new log pipe");
  1407. }
  1408. else {
  1409. rep.reply.log_pipe.status = ENOENT;
  1410. msg_err ("cannot attach log pipe: invalid fd");
  1411. }
  1412. if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
  1413. msg_err ("cannot write reply to the control socket: %s",
  1414. strerror (errno));
  1415. }
  1416. return TRUE;
  1417. }
  1418. static gboolean
  1419. rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
  1420. struct rspamd_worker *worker, gint fd,
  1421. gint attached_fd,
  1422. struct rspamd_control_command *cmd,
  1423. gpointer ud)
  1424. {
  1425. struct rspamd_control_reply rep;
  1426. struct rspamd_monitored *m;
  1427. struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
  1428. struct rspamd_config *cfg = ud;
  1429. memset (&rep, 0, sizeof (rep));
  1430. rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
  1431. if (cmd->cmd.monitored_change.sender != getpid ()) {
  1432. m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
  1433. if (m != NULL) {
  1434. rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
  1435. rep.reply.monitored_change.status = 1;
  1436. msg_info_config ("updated monitored status for %s: %s",
  1437. cmd->cmd.monitored_change.tag,
  1438. cmd->cmd.monitored_change.alive ? "alive" : "dead");
  1439. } else {
  1440. msg_err ("cannot find monitored by tag: %*s", 32,
  1441. cmd->cmd.monitored_change.tag);
  1442. rep.reply.monitored_change.status = 0;
  1443. }
  1444. }
  1445. if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
  1446. msg_err ("cannot write reply to the control socket: %s",
  1447. strerror (errno));
  1448. }
  1449. return TRUE;
  1450. }
  1451. void
  1452. rspamd_worker_init_scanner (struct rspamd_worker *worker,
  1453. struct ev_loop *ev_base,
  1454. struct rspamd_dns_resolver *resolver,
  1455. struct rspamd_lang_detector **plang_det)
  1456. {
  1457. rspamd_stat_init (worker->srv->cfg, ev_base);
  1458. #ifdef WITH_HYPERSCAN
  1459. rspamd_control_worker_add_cmd_handler (worker,
  1460. RSPAMD_CONTROL_HYPERSCAN_LOADED,
  1461. rspamd_worker_hyperscan_ready,
  1462. NULL);
  1463. #endif
  1464. rspamd_control_worker_add_cmd_handler (worker,
  1465. RSPAMD_CONTROL_LOG_PIPE,
  1466. rspamd_worker_log_pipe_handler,
  1467. worker->srv->cfg);
  1468. rspamd_control_worker_add_cmd_handler (worker,
  1469. RSPAMD_CONTROL_MONITORED_CHANGE,
  1470. rspamd_worker_monitored_handler,
  1471. worker->srv->cfg);
  1472. *plang_det = worker->srv->cfg->lang_det;
  1473. }
  1474. void
  1475. rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main,
  1476. struct rspamd_config *cfg)
  1477. {
  1478. struct rspamd_stat *stat;
  1479. ucl_object_t *top, *sub;
  1480. struct ucl_emitter_functions *efuncs;
  1481. gint i, fd;
  1482. gchar fpath[PATH_MAX];
  1483. if (cfg->stats_file == NULL) {
  1484. return;
  1485. }
  1486. rspamd_snprintf (fpath, sizeof (fpath), "%s.XXXXXXXX", cfg->stats_file);
  1487. fd = g_mkstemp_full (fpath, O_WRONLY|O_TRUNC, 00644);
  1488. if (fd == -1) {
  1489. msg_err_config ("cannot open for writing controller stats from %s: %s",
  1490. fpath, strerror (errno));
  1491. return;
  1492. }
  1493. stat = rspamd_main->stat;
  1494. top = ucl_object_typed_new (UCL_OBJECT);
  1495. ucl_object_insert_key (top, ucl_object_fromint (
  1496. stat->messages_scanned), "scanned", 0, false);
  1497. ucl_object_insert_key (top, ucl_object_fromint (
  1498. stat->messages_learned), "learned", 0, false);
  1499. if (stat->messages_scanned > 0) {
  1500. sub = ucl_object_typed_new (UCL_OBJECT);
  1501. for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
  1502. ucl_object_insert_key (sub,
  1503. ucl_object_fromint (stat->actions_stat[i]),
  1504. rspamd_action_to_str (i), 0, false);
  1505. }
  1506. ucl_object_insert_key (top, sub, "actions", 0, false);
  1507. }
  1508. ucl_object_insert_key (top,
  1509. ucl_object_fromint (stat->connections_count),
  1510. "connections", 0, false);
  1511. ucl_object_insert_key (top,
  1512. ucl_object_fromint (stat->control_connections_count),
  1513. "control_connections", 0, false);
  1514. efuncs = ucl_object_emit_fd_funcs (fd);
  1515. if (!ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT,
  1516. efuncs, NULL)) {
  1517. msg_err_config ("cannot write stats to %s: %s",
  1518. fpath, strerror (errno));
  1519. unlink (fpath);
  1520. }
  1521. else {
  1522. if (rename (fpath, cfg->stats_file) == -1) {
  1523. msg_err_config ("cannot rename stats from %s to %s: %s",
  1524. fpath, cfg->stats_file, strerror (errno));
  1525. }
  1526. }
  1527. ucl_object_unref (top);
  1528. close (fd);
  1529. ucl_object_emit_funcs_free (efuncs);
  1530. }
  1531. static ev_timer rrd_timer;
  1532. void
  1533. rspamd_controller_on_terminate (struct rspamd_worker *worker,
  1534. struct rspamd_rrd_file *rrd)
  1535. {
  1536. struct rspamd_abstract_worker_ctx *ctx;
  1537. ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
  1538. rspamd_controller_store_saved_stats (worker->srv, worker->srv->cfg);
  1539. if (rrd) {
  1540. ev_timer_stop (ctx->event_loop, &rrd_timer);
  1541. msg_info ("closing rrd file: %s", rrd->filename);
  1542. rspamd_rrd_close (rrd);
  1543. }
  1544. }
  1545. static void
  1546. rspamd_controller_load_saved_stats (struct rspamd_main *rspamd_main,
  1547. struct rspamd_config *cfg)
  1548. {
  1549. struct ucl_parser *parser;
  1550. ucl_object_t *obj;
  1551. const ucl_object_t *elt, *subelt;
  1552. struct rspamd_stat *stat, stat_copy;
  1553. gint i;
  1554. if (cfg->stats_file == NULL) {
  1555. return;
  1556. }
  1557. if (access (cfg->stats_file, R_OK) == -1) {
  1558. msg_err_config ("cannot load controller stats from %s: %s",
  1559. cfg->stats_file, strerror (errno));
  1560. return;
  1561. }
  1562. parser = ucl_parser_new (0);
  1563. if (!ucl_parser_add_file (parser, cfg->stats_file)) {
  1564. msg_err_config ("cannot parse controller stats from %s: %s",
  1565. cfg->stats_file, ucl_parser_get_error (parser));
  1566. ucl_parser_free (parser);
  1567. return;
  1568. }
  1569. obj = ucl_parser_get_object (parser);
  1570. ucl_parser_free (parser);
  1571. stat = rspamd_main->stat;
  1572. memcpy (&stat_copy, stat, sizeof (stat_copy));
  1573. elt = ucl_object_lookup (obj, "scanned");
  1574. if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
  1575. stat_copy.messages_scanned = ucl_object_toint (elt);
  1576. }
  1577. elt = ucl_object_lookup (obj, "learned");
  1578. if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
  1579. stat_copy.messages_learned = ucl_object_toint (elt);
  1580. }
  1581. elt = ucl_object_lookup (obj, "actions");
  1582. if (elt != NULL) {
  1583. for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
  1584. subelt = ucl_object_lookup (elt, rspamd_action_to_str (i));
  1585. if (subelt && ucl_object_type (subelt) == UCL_INT) {
  1586. stat_copy.actions_stat[i] = ucl_object_toint (subelt);
  1587. }
  1588. }
  1589. }
  1590. elt = ucl_object_lookup (obj, "connections_count");
  1591. if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
  1592. stat_copy.connections_count = ucl_object_toint (elt);
  1593. }
  1594. elt = ucl_object_lookup (obj, "control_connections_count");
  1595. if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
  1596. stat_copy.control_connections_count = ucl_object_toint (elt);
  1597. }
  1598. ucl_object_unref (obj);
  1599. memcpy (stat, &stat_copy, sizeof (stat_copy));
  1600. }
  1601. struct rspamd_controller_periodics_cbdata {
  1602. struct rspamd_worker *worker;
  1603. struct rspamd_rrd_file *rrd;
  1604. struct rspamd_stat *stat;
  1605. ev_timer save_stats_event;
  1606. };
  1607. static void
  1608. rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
  1609. {
  1610. struct rspamd_controller_periodics_cbdata *cbd =
  1611. (struct rspamd_controller_periodics_cbdata *)w->data;
  1612. struct rspamd_stat *stat;
  1613. GArray ar;
  1614. gdouble points[METRIC_ACTION_MAX];
  1615. GError *err = NULL;
  1616. guint i;
  1617. g_assert (cbd->rrd != NULL);
  1618. stat = cbd->stat;
  1619. for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i ++) {
  1620. points[i] = stat->actions_stat[i];
  1621. }
  1622. ar.data = (gchar *)points;
  1623. ar.len = sizeof (points);
  1624. if (!rspamd_rrd_add_record (cbd->rrd, &ar, rspamd_get_calendar_ticks (),
  1625. &err)) {
  1626. msg_err ("cannot update rrd file: %e", err);
  1627. g_error_free (err);
  1628. }
  1629. /* Plan new event */
  1630. ev_timer_again (EV_A_ w);
  1631. }
  1632. static void
  1633. rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
  1634. {
  1635. struct rspamd_controller_periodics_cbdata *cbd =
  1636. (struct rspamd_controller_periodics_cbdata *)w->data;
  1637. rspamd_controller_store_saved_stats (cbd->worker->srv, cbd->worker->srv->cfg);
  1638. ev_timer_again (EV_A_ w);
  1639. }
  1640. void
  1641. rspamd_worker_init_controller (struct rspamd_worker *worker,
  1642. struct rspamd_rrd_file **prrd)
  1643. {
  1644. struct rspamd_abstract_worker_ctx *ctx;
  1645. static const ev_tstamp rrd_update_time = 1.0;
  1646. ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
  1647. rspamd_controller_load_saved_stats (worker->srv, worker->srv->cfg);
  1648. if (worker->index == 0) {
  1649. /* Enable periodics and other stuff */
  1650. static struct rspamd_controller_periodics_cbdata cbd;
  1651. const ev_tstamp save_stats_interval = 60; /* 1 minute */
  1652. memset (&cbd, 0, sizeof (cbd));
  1653. cbd.save_stats_event.data = &cbd;
  1654. cbd.worker = worker;
  1655. cbd.stat = worker->srv->stat;
  1656. ev_timer_init (&cbd.save_stats_event,
  1657. rspamd_controller_stats_save_periodic,
  1658. save_stats_interval, save_stats_interval);
  1659. ev_timer_start (ctx->event_loop, &cbd.save_stats_event);
  1660. rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
  1661. ctx->resolver, worker,
  1662. RSPAMD_MAP_WATCH_PRIMARY_CONTROLLER);
  1663. if (prrd != NULL) {
  1664. if (ctx->cfg->rrd_file && worker->index == 0) {
  1665. GError *rrd_err = NULL;
  1666. *prrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err);
  1667. if (*prrd) {
  1668. cbd.rrd = *prrd;
  1669. rrd_timer.data = &cbd;
  1670. ev_timer_init (&rrd_timer, rspamd_controller_rrd_update,
  1671. rrd_update_time, rrd_update_time);
  1672. ev_timer_start (ctx->event_loop, &rrd_timer);
  1673. }
  1674. else if (rrd_err) {
  1675. msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file,
  1676. rrd_err);
  1677. g_error_free (rrd_err);
  1678. }
  1679. else {
  1680. msg_err ("cannot load rrd from %s: unknown error",
  1681. ctx->cfg->rrd_file);
  1682. }
  1683. }
  1684. else {
  1685. *prrd = NULL;
  1686. }
  1687. }
  1688. if (!ctx->cfg->disable_monitored) {
  1689. rspamd_worker_init_monitored (worker,
  1690. ctx->event_loop, ctx->resolver);
  1691. }
  1692. }
  1693. else {
  1694. rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
  1695. ctx->resolver, worker, RSPAMD_MAP_WATCH_SCANNER);
  1696. }
  1697. }