You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

worker.c 18KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. /*
  17. * Rspamd worker implementation
  18. */
  19. #include "config.h"
  20. #include "libutil/util.h"
  21. #include "libutil/map.h"
  22. #include "libutil/upstream.h"
  23. #include "libserver/protocol.h"
  24. #include "libserver/cfg_file.h"
  25. #include "libserver/url.h"
  26. #include "libserver/dns.h"
  27. #include "libmime/message.h"
  28. #include "rspamd.h"
  29. #include "keypairs_cache.h"
  30. #include "libstat/stat_api.h"
  31. #include "libserver/worker_util.h"
  32. #include "libserver/rspamd_control.h"
  33. #include "worker_private.h"
  34. #include "utlist.h"
  35. #include "libutil/http_private.h"
  36. #include "libmime/lang_detection.h"
  37. #include <math.h>
  38. #include <src/libserver/cfg_file_private.h>
  39. #include "unix-std.h"
  40. #include "lua/lua_common.h"
  41. /* 60 seconds for worker's IO */
  42. #define DEFAULT_WORKER_IO_TIMEOUT 60.0
  43. gpointer init_worker (struct rspamd_config *cfg);
  44. void start_worker (struct rspamd_worker *worker);
  45. worker_t normal_worker = {
  46. "normal", /* Name */
  47. init_worker, /* Init function */
  48. start_worker, /* Start function */
  49. RSPAMD_WORKER_HAS_SOCKET|RSPAMD_WORKER_KILLABLE|RSPAMD_WORKER_SCANNER,
  50. RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */
  51. RSPAMD_WORKER_VER /* Version info */
  52. };
  53. #define msg_err_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
  54. "controller", ctx->cfg->cfg_pool->tag.uid, \
  55. G_STRFUNC, \
  56. __VA_ARGS__)
  57. #define msg_warn_ctx(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
  58. "controller", ctx->cfg->cfg_pool->tag.uid, \
  59. G_STRFUNC, \
  60. __VA_ARGS__)
  61. #define msg_info_ctx(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
  62. "controller", ctx->cfg->cfg_pool->tag.uid, \
  63. G_STRFUNC, \
  64. __VA_ARGS__)
  65. static gboolean
  66. rspamd_worker_finalize (gpointer user_data)
  67. {
  68. struct rspamd_task *task = user_data;
  69. if (!(task->flags & RSPAMD_TASK_FLAG_PROCESSING)) {
  70. msg_info_task ("finishing actions has been processed, terminating");
  71. /* ev_break (task->event_loop, EVBREAK_ALL); */
  72. rspamd_session_destroy (task->s);
  73. return TRUE;
  74. }
  75. return FALSE;
  76. }
  77. static gboolean
  78. rspamd_worker_call_finish_handlers (struct rspamd_worker *worker)
  79. {
  80. struct rspamd_task *task;
  81. struct rspamd_config *cfg = worker->srv->cfg;
  82. struct rspamd_abstract_worker_ctx *ctx;
  83. struct rspamd_config_cfg_lua_script *sc;
  84. if (cfg->on_term_scripts) {
  85. ctx = worker->ctx;
  86. /* Create a fake task object for async events */
  87. task = rspamd_task_new (worker, cfg, NULL, NULL, ctx->event_loop);
  88. task->resolver = ctx->resolver;
  89. task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
  90. task->s = rspamd_session_create (task->task_pool,
  91. rspamd_worker_finalize,
  92. NULL,
  93. (event_finalizer_t) rspamd_task_free,
  94. task);
  95. DL_FOREACH (cfg->on_term_scripts, sc) {
  96. lua_call_finish_script (sc, task);
  97. }
  98. task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;
  99. if (rspamd_session_pending (task->s)) {
  100. return TRUE;
  101. }
  102. }
  103. return FALSE;
  104. }
  105. /*
  106. * Reduce number of tasks proceeded
  107. */
  108. static void
  109. reduce_tasks_count (gpointer arg)
  110. {
  111. struct rspamd_worker *worker = arg;
  112. worker->nconns --;
  113. if (worker->wanna_die && worker->nconns == 0) {
  114. msg_info ("performing finishing actions");
  115. rspamd_worker_call_finish_handlers (worker);
  116. }
  117. }
  118. void
  119. rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
  120. {
  121. struct rspamd_task *task = (struct rspamd_task *)w->data;
  122. if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
  123. msg_info_task ("processing of task time out: %.1f second spent; forced processing",
  124. ev_now (task->event_loop) - task->task_timestamp);
  125. if (task->cfg->soft_reject_on_timeout) {
  126. struct rspamd_action *action, *soft_reject;
  127. action = rspamd_check_action_metric (task);
  128. if (action->action_type != METRIC_ACTION_REJECT) {
  129. soft_reject = rspamd_config_get_action_by_type (task->cfg,
  130. METRIC_ACTION_SOFT_REJECT);
  131. rspamd_add_passthrough_result (task,
  132. soft_reject,
  133. 0,
  134. NAN,
  135. "timeout processing message",
  136. "task timeout",
  137. 0);
  138. ucl_object_replace_key (task->messages,
  139. ucl_object_fromstring_common ("timeout processing message",
  140. 0, UCL_STRING_RAW),
  141. "smtp_message", 0,
  142. false);
  143. }
  144. }
  145. ev_timer_again (EV_A_ w);
  146. task->processed_stages |= RSPAMD_TASK_STAGE_FILTERS;
  147. rspamd_session_cleanup (task->s);
  148. rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
  149. rspamd_session_pending (task->s);
  150. }
  151. else {
  152. /* Postprocessing timeout */
  153. msg_info_task ("post-processing of task time out: %.1f second spent; forced processing",
  154. ev_now (task->event_loop) - task->task_timestamp);
  155. if (task->cfg->soft_reject_on_timeout) {
  156. struct rspamd_action *action, *soft_reject;
  157. action = rspamd_check_action_metric (task);
  158. if (action->action_type != METRIC_ACTION_REJECT) {
  159. soft_reject = rspamd_config_get_action_by_type (task->cfg,
  160. METRIC_ACTION_SOFT_REJECT);
  161. rspamd_add_passthrough_result (task,
  162. soft_reject,
  163. 0,
  164. NAN,
  165. "timeout post-processing message",
  166. "task timeout",
  167. 0);
  168. ucl_object_replace_key (task->messages,
  169. ucl_object_fromstring_common ("timeout post-processing message",
  170. 0, UCL_STRING_RAW),
  171. "smtp_message", 0,
  172. false);
  173. }
  174. }
  175. ev_timer_stop (EV_A_ w);
  176. task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
  177. rspamd_session_cleanup (task->s);
  178. rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
  179. rspamd_session_pending (task->s);
  180. }
  181. }
  182. void
  183. rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
  184. {
  185. struct rspamd_task *task = (struct rspamd_task *)w->data;
  186. gchar fake_buf[1024];
  187. gssize r;
  188. r = read (w->fd, fake_buf, sizeof (fake_buf));
  189. if (r > 0) {
  190. msg_warn_task ("received extra data after task is loaded, ignoring");
  191. }
  192. else {
  193. if (r == 0) {
  194. /*
  195. * Poor man approach, that might break things in case of
  196. * shutdown (SHUT_WR) but sockets are so bad that there's no
  197. * reliable way to distinguish between shutdown(SHUT_WR) and
  198. * close.
  199. */
  200. if (task->cmd != CMD_CHECK_V2 && task->cfg->enable_shutdown_workaround) {
  201. msg_info_task ("workaround for shutdown enabled, please update "
  202. "your client, this support might be removed in future");
  203. shutdown (w->fd, SHUT_RD);
  204. ev_io_stop (task->event_loop, &task->guard_ev);
  205. }
  206. else {
  207. msg_err_task ("the peer has closed connection unexpectedly");
  208. rspamd_session_destroy (task->s);
  209. }
  210. }
  211. else if (errno != EAGAIN) {
  212. msg_err_task ("the peer has closed connection unexpectedly: %s",
  213. strerror (errno));
  214. rspamd_session_destroy (task->s);
  215. }
  216. else {
  217. return;
  218. }
  219. }
  220. }
  221. static gint
  222. rspamd_worker_body_handler (struct rspamd_http_connection *conn,
  223. struct rspamd_http_message *msg,
  224. const gchar *chunk, gsize len)
  225. {
  226. struct rspamd_task *task = (struct rspamd_task *) conn->ud;
  227. struct rspamd_worker_ctx *ctx;
  228. ctx = task->worker->ctx;
  229. if (!rspamd_protocol_handle_request (task, msg)) {
  230. msg_err_task ("cannot handle request: %e", task->err);
  231. task->flags |= RSPAMD_TASK_FLAG_SKIP;
  232. }
  233. else {
  234. if (task->cmd == CMD_PING) {
  235. task->flags |= RSPAMD_TASK_FLAG_SKIP;
  236. }
  237. else {
  238. if (!rspamd_task_load_message (task, msg, chunk, len)) {
  239. msg_err_task ("cannot load message: %e", task->err);
  240. task->flags |= RSPAMD_TASK_FLAG_SKIP;
  241. }
  242. }
  243. }
  244. /* Set global timeout for the task */
  245. if (ctx->task_timeout > 0.0) {
  246. task->timeout_ev.data = task;
  247. ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
  248. ctx->task_timeout,
  249. ctx->task_timeout);
  250. ev_timer_start (task->event_loop, &task->timeout_ev);
  251. }
  252. /* Set socket guard */
  253. task->guard_ev.data = task;
  254. ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
  255. ev_io_start (task->event_loop, &task->guard_ev);
  256. rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
  257. return 0;
  258. }
  259. static void
  260. rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
  261. {
  262. struct rspamd_task *task = (struct rspamd_task *) conn->ud;
  263. struct rspamd_http_message *msg;
  264. rspamd_fstring_t *reply;
  265. msg_info_task ("abnormally closing connection from: %s, error: %e",
  266. rspamd_inet_address_to_string (task->client_addr), err);
  267. if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
  268. /* Terminate session immediately */
  269. rspamd_session_destroy (task->s);
  270. }
  271. else {
  272. task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
  273. msg = rspamd_http_new_message (HTTP_RESPONSE);
  274. if (err) {
  275. msg->status = rspamd_fstring_new_init (err->message,
  276. strlen (err->message));
  277. msg->code = err->code;
  278. }
  279. else {
  280. msg->status = rspamd_fstring_new_init ("Internal error",
  281. strlen ("Internal error"));
  282. msg->code = 500;
  283. }
  284. msg->date = time (NULL);
  285. reply = rspamd_fstring_sized_new (msg->status->len + 16);
  286. rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
  287. rspamd_http_message_set_body_from_fstring_steal (msg, reply);
  288. rspamd_http_connection_reset (task->http_conn);
  289. rspamd_http_connection_write_message (task->http_conn,
  290. msg,
  291. NULL,
  292. "application/json",
  293. task,
  294. 1.0);
  295. }
  296. }
  297. static gint
  298. rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
  299. struct rspamd_http_message *msg)
  300. {
  301. struct rspamd_task *task = (struct rspamd_task *) conn->ud;
  302. if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
  303. /* We are done here */
  304. msg_debug_task ("normally closing connection from: %s",
  305. rspamd_inet_address_to_string (task->client_addr));
  306. rspamd_session_destroy (task->s);
  307. }
  308. else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
  309. rspamd_session_pending (task->s);
  310. }
  311. return 0;
  312. }
  313. /*
  314. * Accept new connection and construct task
  315. */
  316. static void
  317. accept_socket (EV_P_ ev_io *w, int revents)
  318. {
  319. struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
  320. struct rspamd_worker_ctx *ctx;
  321. struct rspamd_task *task;
  322. rspamd_inet_addr_t *addr;
  323. gint nfd, http_opts = 0;
  324. ctx = worker->ctx;
  325. if (ctx->max_tasks != 0 && worker->nconns > ctx->max_tasks) {
  326. msg_info_ctx ("current tasks is now: %uD while maximum is: %uD",
  327. worker->nconns,
  328. ctx->max_tasks);
  329. return;
  330. }
  331. if ((nfd =
  332. rspamd_accept_from_socket (w->fd, &addr,
  333. rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
  334. msg_warn_ctx ("accept failed: %s", strerror (errno));
  335. return;
  336. }
  337. /* Check for EAGAIN */
  338. if (nfd == 0) {
  339. return;
  340. }
  341. task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);
  342. msg_info_task ("accepted connection from %s port %d, task ptr: %p",
  343. rspamd_inet_address_to_string (addr),
  344. rspamd_inet_address_get_port (addr),
  345. task);
  346. /* Copy some variables */
  347. if (ctx->is_mime) {
  348. task->flags |= RSPAMD_TASK_FLAG_MIME;
  349. }
  350. else {
  351. task->flags &= ~RSPAMD_TASK_FLAG_MIME;
  352. }
  353. task->sock = nfd;
  354. task->client_addr = addr;
  355. worker->srv->stat->connections_count++;
  356. task->resolver = ctx->resolver;
  357. /* TODO: allow to disable autolearn in protocol */
  358. task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
  359. if (ctx->encrypted_only && !rspamd_inet_address_is_local (addr, FALSE)) {
  360. http_opts = RSPAMD_HTTP_REQUIRE_ENCRYPTION;
  361. }
  362. task->http_conn = rspamd_http_connection_new_server (
  363. ctx->http_ctx,
  364. nfd,
  365. rspamd_worker_body_handler,
  366. rspamd_worker_error_handler,
  367. rspamd_worker_finish_handler,
  368. http_opts);
  369. rspamd_http_connection_set_max_size (task->http_conn, task->cfg->max_message);
  370. worker->nconns++;
  371. rspamd_mempool_add_destructor (task->task_pool,
  372. (rspamd_mempool_destruct_t)reduce_tasks_count, worker);
  373. /* Set up async session */
  374. task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
  375. rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task);
  376. if (ctx->key) {
  377. rspamd_http_connection_set_key (task->http_conn, ctx->key);
  378. }
  379. rspamd_http_connection_read_message (task->http_conn,
  380. task,
  381. ctx->timeout);
  382. }
  383. static gboolean
  384. rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
  385. struct rspamd_worker *worker, gint fd,
  386. gint attached_fd,
  387. struct rspamd_control_command *cmd,
  388. gpointer ud)
  389. {
  390. struct rspamd_config *cfg = ud;
  391. struct rspamd_worker_log_pipe *lp;
  392. struct rspamd_control_reply rep;
  393. memset (&rep, 0, sizeof (rep));
  394. rep.type = RSPAMD_CONTROL_LOG_PIPE;
  395. if (attached_fd != -1) {
  396. lp = g_malloc0 (sizeof (*lp));
  397. lp->fd = attached_fd;
  398. lp->type = cmd->cmd.log_pipe.type;
  399. DL_APPEND (cfg->log_pipes, lp);
  400. msg_info ("added new log pipe");
  401. }
  402. else {
  403. rep.reply.log_pipe.status = ENOENT;
  404. msg_err ("cannot attach log pipe: invalid fd");
  405. }
  406. if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
  407. msg_err ("cannot write reply to the control socket: %s",
  408. strerror (errno));
  409. }
  410. return TRUE;
  411. }
  412. static gboolean
  413. rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
  414. struct rspamd_worker *worker, gint fd,
  415. gint attached_fd,
  416. struct rspamd_control_command *cmd,
  417. gpointer ud)
  418. {
  419. struct rspamd_control_reply rep;
  420. struct rspamd_monitored *m;
  421. struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
  422. struct rspamd_config *cfg = ud;
  423. memset (&rep, 0, sizeof (rep));
  424. rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
  425. if (cmd->cmd.monitored_change.sender != getpid ()) {
  426. m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
  427. if (m != NULL) {
  428. rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
  429. rep.reply.monitored_change.status = 1;
  430. msg_info_config ("updated monitored status for %s: %s",
  431. cmd->cmd.monitored_change.tag,
  432. cmd->cmd.monitored_change.alive ? "alive" : "dead");
  433. } else {
  434. msg_err ("cannot find monitored by tag: %*s", 32,
  435. cmd->cmd.monitored_change.tag);
  436. rep.reply.monitored_change.status = 0;
  437. }
  438. }
  439. if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
  440. msg_err ("cannot write reply to the control socket: %s",
  441. strerror (errno));
  442. }
  443. return TRUE;
  444. }
  445. gpointer
  446. init_worker (struct rspamd_config *cfg)
  447. {
  448. struct rspamd_worker_ctx *ctx;
  449. GQuark type;
  450. type = g_quark_try_string ("normal");
  451. ctx = rspamd_mempool_alloc0 (cfg->cfg_pool,
  452. sizeof (struct rspamd_worker_ctx));
  453. ctx->magic = rspamd_worker_magic;
  454. ctx->is_mime = TRUE;
  455. ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT;
  456. ctx->cfg = cfg;
  457. ctx->task_timeout = NAN;
  458. rspamd_rcl_register_worker_option (cfg,
  459. type,
  460. "mime",
  461. rspamd_rcl_parse_struct_boolean,
  462. ctx,
  463. G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime),
  464. 0,
  465. "Set to `false` if this worker is intended to work with non-MIME messages");
  466. rspamd_rcl_register_worker_option (cfg,
  467. type,
  468. "encrypted_only",
  469. rspamd_rcl_parse_struct_boolean,
  470. ctx,
  471. G_STRUCT_OFFSET (struct rspamd_worker_ctx, encrypted_only),
  472. 0,
  473. "Allow only encrypted connections");
  474. rspamd_rcl_register_worker_option (cfg,
  475. type,
  476. "timeout",
  477. rspamd_rcl_parse_struct_time,
  478. ctx,
  479. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  480. timeout),
  481. RSPAMD_CL_FLAG_TIME_FLOAT,
  482. "Protocol IO timeout");
  483. rspamd_rcl_register_worker_option (cfg,
  484. type,
  485. "task_timeout",
  486. rspamd_rcl_parse_struct_time,
  487. ctx,
  488. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  489. task_timeout),
  490. RSPAMD_CL_FLAG_TIME_FLOAT,
  491. "Maximum task processing time, default: 8.0 seconds");
  492. rspamd_rcl_register_worker_option (cfg,
  493. type,
  494. "max_tasks",
  495. rspamd_rcl_parse_struct_integer,
  496. ctx,
  497. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  498. max_tasks),
  499. RSPAMD_CL_FLAG_INT_32,
  500. "Maximum count of parallel tasks processed by a single worker process");
  501. rspamd_rcl_register_worker_option (cfg,
  502. type,
  503. "keypair",
  504. rspamd_rcl_parse_struct_keypair,
  505. ctx,
  506. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  507. key),
  508. 0,
  509. "Encryption keypair");
  510. return ctx;
  511. }
  512. static gboolean
  513. rspamd_worker_on_terminate (struct rspamd_worker *worker)
  514. {
  515. if (worker->nconns == 0) {
  516. msg_info ("performing finishing actions");
  517. if (rspamd_worker_call_finish_handlers (worker)) {
  518. return TRUE;
  519. }
  520. }
  521. return FALSE;
  522. }
  523. void
  524. rspamd_worker_init_scanner (struct rspamd_worker *worker,
  525. struct ev_loop *ev_base,
  526. struct rspamd_dns_resolver *resolver,
  527. struct rspamd_lang_detector **plang_det)
  528. {
  529. rspamd_stat_init (worker->srv->cfg, ev_base);
  530. g_ptr_array_add (worker->finish_actions,
  531. (gpointer) rspamd_worker_on_terminate);
  532. #ifdef WITH_HYPERSCAN
  533. rspamd_control_worker_add_cmd_handler (worker,
  534. RSPAMD_CONTROL_HYPERSCAN_LOADED,
  535. rspamd_worker_hyperscan_ready,
  536. NULL);
  537. #endif
  538. rspamd_control_worker_add_cmd_handler (worker,
  539. RSPAMD_CONTROL_LOG_PIPE,
  540. rspamd_worker_log_pipe_handler,
  541. worker->srv->cfg);
  542. rspamd_control_worker_add_cmd_handler (worker,
  543. RSPAMD_CONTROL_MONITORED_CHANGE,
  544. rspamd_worker_monitored_handler,
  545. worker->srv->cfg);
  546. *plang_det = worker->srv->cfg->lang_det;
  547. }
  548. /*
  549. * Start worker process
  550. */
  551. void
  552. start_worker (struct rspamd_worker *worker)
  553. {
  554. struct rspamd_worker_ctx *ctx = worker->ctx;
  555. g_assert (rspamd_worker_check_context (worker->ctx, rspamd_worker_magic));
  556. ctx->cfg = worker->srv->cfg;
  557. ctx->event_loop = rspamd_prepare_worker (worker, "normal", accept_socket);
  558. rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
  559. worker);
  560. if (isnan (ctx->task_timeout)) {
  561. if (isnan (ctx->cfg->task_timeout)) {
  562. ctx->task_timeout = 0;
  563. }
  564. else {
  565. ctx->task_timeout = ctx->cfg->task_timeout;
  566. }
  567. }
  568. ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
  569. ctx->event_loop,
  570. worker->srv->cfg);
  571. rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
  572. rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
  573. ctx->event_loop, ctx->resolver->r);
  574. ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
  575. ctx->cfg->ups_ctx);
  576. rspamd_mempool_add_destructor (ctx->cfg->cfg_pool,
  577. (rspamd_mempool_destruct_t)rspamd_http_context_free,
  578. ctx->http_ctx);
  579. rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
  580. &ctx->lang_det);
  581. rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
  582. worker);
  583. ev_loop (ctx->event_loop, 0);
  584. rspamd_worker_block_signals ();
  585. rspamd_stat_close ();
  586. REF_RELEASE (ctx->cfg);
  587. rspamd_log_close (worker->srv->logger, TRUE);
  588. exit (EXIT_SUCCESS);
  589. }