You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

worker.c 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. /*
  17. * Rspamd worker implementation
  18. */
  19. #include "config.h"
  20. #include "libutil/util.h"
  21. #include "libutil/map.h"
  22. #include "libutil/upstream.h"
  23. #include "libserver/protocol.h"
  24. #include "libserver/cfg_file.h"
  25. #include "libserver/url.h"
  26. #include "libserver/dns.h"
  27. #include "libmime/message.h"
  28. #include "rspamd.h"
  29. #include "libstat/stat_api.h"
  30. #include "libserver/worker_util.h"
  31. #include "libserver/rspamd_control.h"
  32. #include "worker_private.h"
  33. #include "libutil/http_private.h"
  34. #include "libserver/cfg_file_private.h"
  35. #include <math.h>
  36. #include "unix-std.h"
  37. #include "lua/lua_common.h"
  38. /* 60 seconds for worker's IO */
  39. #define DEFAULT_WORKER_IO_TIMEOUT 60.0
  40. gpointer init_worker (struct rspamd_config *cfg);
  41. void start_worker (struct rspamd_worker *worker);
  42. worker_t normal_worker = {
  43. "normal", /* Name */
  44. init_worker, /* Init function */
  45. start_worker, /* Start function */
  46. RSPAMD_WORKER_HAS_SOCKET|RSPAMD_WORKER_KILLABLE|RSPAMD_WORKER_SCANNER,
  47. RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */
  48. RSPAMD_WORKER_VER /* Version info */
  49. };
  50. #define msg_err_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
  51. "worker", ctx->cfg->cfg_pool->tag.uid, \
  52. G_STRFUNC, \
  53. __VA_ARGS__)
  54. #define msg_warn_ctx(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
  55. "worker", ctx->cfg->cfg_pool->tag.uid, \
  56. G_STRFUNC, \
  57. __VA_ARGS__)
  58. #define msg_info_ctx(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
  59. "worker", ctx->cfg->cfg_pool->tag.uid, \
  60. G_STRFUNC, \
  61. __VA_ARGS__)
  62. /*
  63. * Reduce number of tasks proceeded
  64. */
  65. static void
  66. reduce_tasks_count (gpointer arg)
  67. {
  68. struct rspamd_worker *worker = arg;
  69. worker->nconns --;
  70. if (worker->state == rspamd_worker_wait_connections && worker->nconns == 0) {
  71. worker->state = rspamd_worker_wait_final_scripts;
  72. msg_info ("performing finishing actions");
  73. if (rspamd_worker_call_finish_handlers (worker)) {
  74. worker->state = rspamd_worker_wait_final_scripts;
  75. }
  76. else {
  77. worker->state = rspamd_worker_wanna_die;
  78. }
  79. }
  80. else if (worker->state != rspamd_worker_state_running) {
  81. worker->state = rspamd_worker_wait_connections;
  82. }
  83. }
  84. void
  85. rspamd_task_timeout (EV_P_ ev_timer *w, int revents)
  86. {
  87. struct rspamd_task *task = (struct rspamd_task *)w->data;
  88. if (!(task->processed_stages & RSPAMD_TASK_STAGE_FILTERS)) {
  89. msg_info_task ("processing of task time out: %.1f second spent; forced processing",
  90. ev_now (task->event_loop) - task->task_timestamp);
  91. if (task->cfg->soft_reject_on_timeout) {
  92. struct rspamd_action *action, *soft_reject;
  93. action = rspamd_check_action_metric (task);
  94. if (action->action_type != METRIC_ACTION_REJECT) {
  95. soft_reject = rspamd_config_get_action_by_type (task->cfg,
  96. METRIC_ACTION_SOFT_REJECT);
  97. rspamd_add_passthrough_result (task,
  98. soft_reject,
  99. 0,
  100. NAN,
  101. "timeout processing message",
  102. "task timeout",
  103. 0);
  104. ucl_object_replace_key (task->messages,
  105. ucl_object_fromstring_common ("timeout processing message",
  106. 0, UCL_STRING_RAW),
  107. "smtp_message", 0,
  108. false);
  109. }
  110. }
  111. ev_timer_again (EV_A_ w);
  112. task->processed_stages |= RSPAMD_TASK_STAGE_FILTERS;
  113. rspamd_session_cleanup (task->s);
  114. rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
  115. rspamd_session_pending (task->s);
  116. }
  117. else {
  118. /* Postprocessing timeout */
  119. msg_info_task ("post-processing of task time out: %.1f second spent; forced processing",
  120. ev_now (task->event_loop) - task->task_timestamp);
  121. if (task->cfg->soft_reject_on_timeout) {
  122. struct rspamd_action *action, *soft_reject;
  123. action = rspamd_check_action_metric (task);
  124. if (action->action_type != METRIC_ACTION_REJECT) {
  125. soft_reject = rspamd_config_get_action_by_type (task->cfg,
  126. METRIC_ACTION_SOFT_REJECT);
  127. rspamd_add_passthrough_result (task,
  128. soft_reject,
  129. 0,
  130. NAN,
  131. "timeout post-processing message",
  132. "task timeout",
  133. 0);
  134. ucl_object_replace_key (task->messages,
  135. ucl_object_fromstring_common ("timeout post-processing message",
  136. 0, UCL_STRING_RAW),
  137. "smtp_message", 0,
  138. false);
  139. }
  140. }
  141. ev_timer_stop (EV_A_ w);
  142. task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
  143. rspamd_session_cleanup (task->s);
  144. rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
  145. rspamd_session_pending (task->s);
  146. }
  147. }
  148. void
  149. rspamd_worker_guard_handler (EV_P_ ev_io *w, int revents)
  150. {
  151. struct rspamd_task *task = (struct rspamd_task *)w->data;
  152. gchar fake_buf[1024];
  153. gssize r;
  154. r = read (w->fd, fake_buf, sizeof (fake_buf));
  155. if (r > 0) {
  156. msg_warn_task ("received extra data after task is loaded, ignoring");
  157. }
  158. else {
  159. if (r == 0) {
  160. /*
  161. * Poor man approach, that might break things in case of
  162. * shutdown (SHUT_WR) but sockets are so bad that there's no
  163. * reliable way to distinguish between shutdown(SHUT_WR) and
  164. * close.
  165. */
  166. if (task->cmd != CMD_CHECK_V2 && task->cfg->enable_shutdown_workaround) {
  167. msg_info_task ("workaround for shutdown enabled, please update "
  168. "your client, this support might be removed in future");
  169. shutdown (w->fd, SHUT_RD);
  170. ev_io_stop (task->event_loop, &task->guard_ev);
  171. }
  172. else {
  173. msg_err_task ("the peer has closed connection unexpectedly");
  174. rspamd_session_destroy (task->s);
  175. }
  176. }
  177. else if (errno != EAGAIN) {
  178. msg_err_task ("the peer has closed connection unexpectedly: %s",
  179. strerror (errno));
  180. rspamd_session_destroy (task->s);
  181. }
  182. else {
  183. return;
  184. }
  185. }
  186. }
  187. static gint
  188. rspamd_worker_body_handler (struct rspamd_http_connection *conn,
  189. struct rspamd_http_message *msg,
  190. const gchar *chunk, gsize len)
  191. {
  192. struct rspamd_task *task = (struct rspamd_task *) conn->ud;
  193. struct rspamd_worker_ctx *ctx;
  194. ctx = task->worker->ctx;
  195. if (!rspamd_protocol_handle_request (task, msg)) {
  196. msg_err_task ("cannot handle request: %e", task->err);
  197. task->flags |= RSPAMD_TASK_FLAG_SKIP;
  198. }
  199. else {
  200. if (task->cmd == CMD_PING) {
  201. task->flags |= RSPAMD_TASK_FLAG_SKIP;
  202. }
  203. else {
  204. if (!rspamd_task_load_message (task, msg, chunk, len)) {
  205. msg_err_task ("cannot load message: %e", task->err);
  206. task->flags |= RSPAMD_TASK_FLAG_SKIP;
  207. }
  208. }
  209. }
  210. /* Set global timeout for the task */
  211. if (ctx->task_timeout > 0.0) {
  212. task->timeout_ev.data = task;
  213. ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
  214. ctx->task_timeout,
  215. ctx->task_timeout);
  216. ev_timer_start (task->event_loop, &task->timeout_ev);
  217. }
  218. /* Set socket guard */
  219. task->guard_ev.data = task;
  220. ev_io_init (&task->guard_ev, rspamd_worker_guard_handler, task->sock, EV_READ);
  221. ev_io_start (task->event_loop, &task->guard_ev);
  222. rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
  223. return 0;
  224. }
  225. static void
  226. rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
  227. {
  228. struct rspamd_task *task = (struct rspamd_task *) conn->ud;
  229. struct rspamd_http_message *msg;
  230. rspamd_fstring_t *reply;
  231. msg_info_task ("abnormally closing connection from: %s, error: %e",
  232. rspamd_inet_address_to_string (task->client_addr), err);
  233. if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
  234. /* Terminate session immediately */
  235. rspamd_session_destroy (task->s);
  236. }
  237. else {
  238. task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
  239. msg = rspamd_http_new_message (HTTP_RESPONSE);
  240. if (err) {
  241. msg->status = rspamd_fstring_new_init (err->message,
  242. strlen (err->message));
  243. msg->code = err->code;
  244. }
  245. else {
  246. msg->status = rspamd_fstring_new_init ("Internal error",
  247. strlen ("Internal error"));
  248. msg->code = 500;
  249. }
  250. msg->date = time (NULL);
  251. reply = rspamd_fstring_sized_new (msg->status->len + 16);
  252. rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
  253. rspamd_http_message_set_body_from_fstring_steal (msg, reply);
  254. rspamd_http_connection_reset (task->http_conn);
  255. rspamd_http_connection_write_message (task->http_conn,
  256. msg,
  257. NULL,
  258. "application/json",
  259. task,
  260. 1.0);
  261. }
  262. }
  263. static gint
  264. rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
  265. struct rspamd_http_message *msg)
  266. {
  267. struct rspamd_task *task = (struct rspamd_task *) conn->ud;
  268. if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
  269. /* We are done here */
  270. msg_debug_task ("normally closing connection from: %s",
  271. rspamd_inet_address_to_string (task->client_addr));
  272. rspamd_session_destroy (task->s);
  273. }
  274. else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
  275. rspamd_session_pending (task->s);
  276. }
  277. return 0;
  278. }
  279. /*
  280. * Accept new connection and construct task
  281. */
  282. static void
  283. accept_socket (EV_P_ ev_io *w, int revents)
  284. {
  285. struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
  286. struct rspamd_worker_ctx *ctx;
  287. struct rspamd_task *task;
  288. rspamd_inet_addr_t *addr;
  289. gint nfd, http_opts = 0;
  290. ctx = worker->ctx;
  291. if (ctx->max_tasks != 0 && worker->nconns > ctx->max_tasks) {
  292. msg_info_ctx ("current tasks is now: %uD while maximum is: %uD",
  293. worker->nconns,
  294. ctx->max_tasks);
  295. return;
  296. }
  297. if ((nfd =
  298. rspamd_accept_from_socket (w->fd, &addr,
  299. rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
  300. msg_warn_ctx ("accept failed: %s", strerror (errno));
  301. return;
  302. }
  303. /* Check for EAGAIN */
  304. if (nfd == 0) {
  305. return;
  306. }
  307. task = rspamd_task_new (worker, ctx->cfg, NULL, ctx->lang_det, ctx->event_loop);
  308. msg_info_task ("accepted connection from %s port %d, task ptr: %p",
  309. rspamd_inet_address_to_string (addr),
  310. rspamd_inet_address_get_port (addr),
  311. task);
  312. /* Copy some variables */
  313. if (ctx->is_mime) {
  314. task->flags |= RSPAMD_TASK_FLAG_MIME;
  315. }
  316. else {
  317. task->flags &= ~RSPAMD_TASK_FLAG_MIME;
  318. }
  319. task->sock = nfd;
  320. task->client_addr = addr;
  321. worker->srv->stat->connections_count++;
  322. task->resolver = ctx->resolver;
  323. /* TODO: allow to disable autolearn in protocol */
  324. task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
  325. if (ctx->encrypted_only && !rspamd_inet_address_is_local (addr, FALSE)) {
  326. http_opts = RSPAMD_HTTP_REQUIRE_ENCRYPTION;
  327. }
  328. task->http_conn = rspamd_http_connection_new_server (
  329. ctx->http_ctx,
  330. nfd,
  331. rspamd_worker_body_handler,
  332. rspamd_worker_error_handler,
  333. rspamd_worker_finish_handler,
  334. http_opts);
  335. rspamd_http_connection_set_max_size (task->http_conn, task->cfg->max_message);
  336. worker->nconns++;
  337. rspamd_mempool_add_destructor (task->task_pool,
  338. (rspamd_mempool_destruct_t)reduce_tasks_count, worker);
  339. /* Set up async session */
  340. task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
  341. rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task);
  342. if (ctx->key) {
  343. rspamd_http_connection_set_key (task->http_conn, ctx->key);
  344. }
  345. rspamd_http_connection_read_message (task->http_conn,
  346. task,
  347. ctx->timeout);
  348. }
  349. gpointer
  350. init_worker (struct rspamd_config *cfg)
  351. {
  352. struct rspamd_worker_ctx *ctx;
  353. GQuark type;
  354. type = g_quark_try_string ("normal");
  355. ctx = rspamd_mempool_alloc0 (cfg->cfg_pool,
  356. sizeof (struct rspamd_worker_ctx));
  357. ctx->magic = rspamd_worker_magic;
  358. ctx->is_mime = TRUE;
  359. ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT;
  360. ctx->cfg = cfg;
  361. ctx->task_timeout = NAN;
  362. rspamd_rcl_register_worker_option (cfg,
  363. type,
  364. "mime",
  365. rspamd_rcl_parse_struct_boolean,
  366. ctx,
  367. G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime),
  368. 0,
  369. "Set to `false` if this worker is intended to work with non-MIME messages");
  370. rspamd_rcl_register_worker_option (cfg,
  371. type,
  372. "encrypted_only",
  373. rspamd_rcl_parse_struct_boolean,
  374. ctx,
  375. G_STRUCT_OFFSET (struct rspamd_worker_ctx, encrypted_only),
  376. 0,
  377. "Allow only encrypted connections");
  378. rspamd_rcl_register_worker_option (cfg,
  379. type,
  380. "timeout",
  381. rspamd_rcl_parse_struct_time,
  382. ctx,
  383. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  384. timeout),
  385. RSPAMD_CL_FLAG_TIME_FLOAT,
  386. "Protocol IO timeout");
  387. rspamd_rcl_register_worker_option (cfg,
  388. type,
  389. "task_timeout",
  390. rspamd_rcl_parse_struct_time,
  391. ctx,
  392. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  393. task_timeout),
  394. RSPAMD_CL_FLAG_TIME_FLOAT,
  395. "Maximum task processing time, default: 8.0 seconds");
  396. rspamd_rcl_register_worker_option (cfg,
  397. type,
  398. "max_tasks",
  399. rspamd_rcl_parse_struct_integer,
  400. ctx,
  401. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  402. max_tasks),
  403. RSPAMD_CL_FLAG_INT_32,
  404. "Maximum count of parallel tasks processed by a single worker process");
  405. rspamd_rcl_register_worker_option (cfg,
  406. type,
  407. "keypair",
  408. rspamd_rcl_parse_struct_keypair,
  409. ctx,
  410. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  411. key),
  412. 0,
  413. "Encryption keypair");
  414. return ctx;
  415. }
  416. /*
  417. * Start worker process
  418. */
  419. void
  420. start_worker (struct rspamd_worker *worker)
  421. {
  422. struct rspamd_worker_ctx *ctx = worker->ctx;
  423. gboolean is_controller = FALSE;
  424. g_assert (rspamd_worker_check_context (worker->ctx, rspamd_worker_magic));
  425. ctx->cfg = worker->srv->cfg;
  426. ctx->event_loop = rspamd_prepare_worker (worker, "normal", accept_socket);
  427. rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
  428. worker);
  429. if (isnan (ctx->task_timeout)) {
  430. if (isnan (ctx->cfg->task_timeout)) {
  431. ctx->task_timeout = 0;
  432. }
  433. else {
  434. ctx->task_timeout = ctx->cfg->task_timeout;
  435. }
  436. }
  437. ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
  438. ctx->event_loop,
  439. worker->srv->cfg);
  440. rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
  441. ctx->event_loop, ctx->resolver->r);
  442. ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
  443. ctx->cfg->ups_ctx);
  444. rspamd_mempool_add_destructor (ctx->cfg->cfg_pool,
  445. (rspamd_mempool_destruct_t)rspamd_http_context_free,
  446. ctx->http_ctx);
  447. rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
  448. &ctx->lang_det);
  449. if (worker->index == 0) {
  450. /* If there are no controllers, then pretend that we are a controller */
  451. gboolean controller_seen = FALSE;
  452. GList *cur;
  453. cur = worker->srv->cfg->workers;
  454. while (cur) {
  455. struct rspamd_worker_conf *cf;
  456. cf = (struct rspamd_worker_conf *)cur->data;
  457. if (cf->type == g_quark_from_static_string ("controller")) {
  458. if (cf->enabled && cf->count >= 0) {
  459. controller_seen = TRUE;
  460. break;
  461. }
  462. }
  463. cur = g_list_next (cur);
  464. }
  465. if (!controller_seen) {
  466. msg_info_ctx ("no controller workers defined, execute "
  467. "controller periodics in this worker");
  468. worker->flags |= RSPAMD_WORKER_CONTROLLER;
  469. is_controller = TRUE;
  470. }
  471. }
  472. if (is_controller) {
  473. rspamd_worker_init_controller (worker, NULL);
  474. }
  475. else {
  476. rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver,
  477. worker, RSPAMD_MAP_WATCH_SCANNER);
  478. }
  479. rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
  480. worker);
  481. ev_loop (ctx->event_loop, 0);
  482. rspamd_worker_block_signals ();
  483. if (is_controller) {
  484. rspamd_controller_on_terminate (worker, NULL);
  485. }
  486. rspamd_stat_close ();
  487. REF_RELEASE (ctx->cfg);
  488. rspamd_log_close (worker->srv->logger, TRUE);
  489. exit (EXIT_SUCCESS);
  490. }