You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

worker.c 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. /*
  17. * Rspamd worker implementation
  18. */
  19. #include "config.h"
  20. #include "libutil/util.h"
  21. #include "libserver/maps/map.h"
  22. #include "libutil/upstream.h"
  23. #include "libserver/protocol.h"
  24. #include "libserver/cfg_file.h"
  25. #include "libserver/url.h"
  26. #include "libserver/dns.h"
  27. #include "libmime/message.h"
  28. #include "rspamd.h"
  29. #include "libstat/stat_api.h"
  30. #include "libserver/worker_util.h"
  31. #include "libserver/rspamd_control.h"
  32. #include "worker_private.h"
  33. #include "libserver/http/http_private.h"
  34. #include "libserver/cfg_file_private.h"
  35. #include <math.h>
  36. #include "unix-std.h"
  37. #include "lua/lua_common.h"
  38. /* 60 seconds for worker's IO */
  39. #define DEFAULT_WORKER_IO_TIMEOUT 60.0
  40. gpointer init_worker (struct rspamd_config *cfg);
  41. void start_worker (struct rspamd_worker *worker);
  42. worker_t normal_worker = {
  43. "normal", /* Name */
  44. init_worker, /* Init function */
  45. start_worker, /* Start function */
  46. RSPAMD_WORKER_HAS_SOCKET|RSPAMD_WORKER_KILLABLE|RSPAMD_WORKER_SCANNER,
  47. RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */
  48. RSPAMD_WORKER_VER /* Version info */
  49. };
  /*
   * Context-aware logging helpers. Each one forwards to
   * rspamd_default_log_function with the "worker" module name and the uid tag
   * of the configuration pool, so a variable named `ctx` with a valid `cfg`
   * must be in scope wherever these macros are used.
   */
  #define msg_err_ctx(...) \
      rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
          "worker", ctx->cfg->cfg_pool->tag.uid, \
          G_STRFUNC, \
          __VA_ARGS__)
  #define msg_warn_ctx(...) \
      rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
          "worker", ctx->cfg->cfg_pool->tag.uid, \
          G_STRFUNC, \
          __VA_ARGS__)
  #define msg_info_ctx(...) \
      rspamd_default_log_function (G_LOG_LEVEL_INFO, \
          "worker", ctx->cfg->cfg_pool->tag.uid, \
          G_STRFUNC, \
          __VA_ARGS__)
  62. struct rspamd_worker_session {
  63. gint64 magic;
  64. struct rspamd_task *task;
  65. gint fd;
  66. rspamd_inet_addr_t *addr;
  67. struct rspamd_worker_ctx *ctx;
  68. struct rspamd_http_connection *http_conn;
  69. struct rspamd_worker *worker;
  70. };
  71. /*
  72. * Reduce number of tasks proceeded
  73. */
  74. static void
  75. reduce_tasks_count (gpointer arg)
  76. {
  77. struct rspamd_worker *worker = arg;
  78. worker->nconns --;
  79. if (worker->state == rspamd_worker_wait_connections && worker->nconns == 0) {
  80. worker->state = rspamd_worker_wait_final_scripts;
  81. msg_info ("performing finishing actions");
  82. if (rspamd_worker_call_finish_handlers (worker)) {
  83. worker->state = rspamd_worker_wait_final_scripts;
  84. }
  85. else {
  86. worker->state = rspamd_worker_wanna_die;
  87. }
  88. }
  89. else if (worker->state != rspamd_worker_state_running) {
  90. worker->state = rspamd_worker_wait_connections;
  91. }
  92. }
  93. static gint
  94. rspamd_worker_body_handler (struct rspamd_http_connection *conn,
  95. struct rspamd_http_message *msg,
  96. const gchar *chunk, gsize len)
  97. {
  98. struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
  99. struct rspamd_task *task;
  100. struct rspamd_worker_ctx *ctx;
  101. const rspamd_ftok_t *hv_tok;
  102. gboolean debug_mempool = FALSE;
  103. ctx = session->ctx;
  104. /* Check debug */
  105. if ((hv_tok = rspamd_http_message_find_header (msg, "Memory")) != NULL) {
  106. rspamd_ftok_t cmp;
  107. RSPAMD_FTOK_ASSIGN (&cmp, "debug");
  108. if (rspamd_ftok_cmp (hv_tok, &cmp) == 0) {
  109. debug_mempool = TRUE;
  110. }
  111. }
  112. task = rspamd_task_new (session->worker,
  113. session->ctx->cfg, NULL, session->ctx->lang_det,
  114. session->ctx->event_loop,
  115. debug_mempool);
  116. session->task = task;
  117. msg_info_task ("accepted connection from %s port %d, task ptr: %p",
  118. rspamd_inet_address_to_string (session->addr),
  119. rspamd_inet_address_get_port (session->addr),
  120. task);
  121. /* Copy some variables */
  122. if (ctx->is_mime) {
  123. task->flags |= RSPAMD_TASK_FLAG_MIME;
  124. }
  125. else {
  126. task->flags &= ~RSPAMD_TASK_FLAG_MIME;
  127. }
  128. /* We actually transfer ownership from session to task here */
  129. task->sock = session->fd;
  130. task->client_addr = session->addr;
  131. task->worker = session->worker;
  132. task->http_conn = session->http_conn;
  133. task->resolver = ctx->resolver;
  134. /* TODO: allow to disable autolearn in protocol */
  135. task->flags |= RSPAMD_TASK_FLAG_LEARN_AUTO;
  136. session->worker->nconns++;
  137. rspamd_mempool_add_destructor (task->task_pool,
  138. (rspamd_mempool_destruct_t)reduce_tasks_count,
  139. session->worker);
  140. /* Session memory is also now handled by task */
  141. rspamd_mempool_add_destructor (task->task_pool,
  142. (rspamd_mempool_destruct_t)g_free,
  143. session);
  144. /* Set up async session */
  145. task->s = rspamd_session_create (task->task_pool, rspamd_task_fin,
  146. rspamd_task_restore, (event_finalizer_t )rspamd_task_free, task);
  147. if (!rspamd_protocol_handle_request (task, msg)) {
  148. msg_err_task ("cannot handle request: %e", task->err);
  149. task->flags |= RSPAMD_TASK_FLAG_SKIP;
  150. }
  151. else {
  152. if (task->cmd == CMD_PING) {
  153. task->flags |= RSPAMD_TASK_FLAG_SKIP;
  154. }
  155. else {
  156. if (!rspamd_task_load_message (task, msg, chunk, len)) {
  157. msg_err_task ("cannot load message: %e", task->err);
  158. task->flags |= RSPAMD_TASK_FLAG_SKIP;
  159. }
  160. }
  161. }
  162. /* Set global timeout for the task */
  163. if (ctx->task_timeout > 0.0) {
  164. task->timeout_ev.data = task;
  165. ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
  166. ctx->task_timeout,
  167. ctx->task_timeout);
  168. ev_set_priority (&task->timeout_ev, EV_MAXPRI);
  169. ev_timer_start (task->event_loop, &task->timeout_ev);
  170. }
  171. /* Set socket guard */
  172. task->guard_ev.data = task;
  173. ev_io_init (&task->guard_ev,
  174. rspamd_worker_guard_handler,
  175. task->sock, EV_READ);
  176. ev_io_start (task->event_loop, &task->guard_ev);
  177. rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL);
  178. return 0;
  179. }
  180. static void
  181. rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err)
  182. {
  183. struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
  184. struct rspamd_task *task;
  185. struct rspamd_http_message *msg;
  186. rspamd_fstring_t *reply;
  187. /*
  188. * This function can be called with both struct rspamd_worker_session *
  189. * and struct rspamd_task *
  190. *
  191. * The first case is when we read message and it is controlled by this code;
  192. * the second case is when a reply is written and we do not control it normally,
  193. * as it is managed by `rspamd_protocol_reply` in protocol.c
  194. *
  195. * Hence, we need to distinguish our arguments...
  196. *
  197. * The approach here is simple:
  198. * - struct rspamd_worker_session starts with gint64 `magic` and we set it to
  199. * MAX_INT64
  200. * - struct rspamd_task starts with a pointer (or pointer + command on 32 bit system)
  201. *
  202. * The idea is simple: no sane pointer would reach MAX_INT64, so if this field
  203. * is MAX_INT64 then it is our session, and if it is not then it is a task.
  204. */
  205. if (session->magic == G_MAXINT64) {
  206. task = session->task;
  207. }
  208. else {
  209. task = (struct rspamd_task *)conn->ud;
  210. }
  211. if (task) {
  212. msg_info_task ("abnormally closing connection from: %s, error: %e",
  213. rspamd_inet_address_to_string_pretty (task->client_addr), err);
  214. if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
  215. /* Terminate session immediately */
  216. rspamd_session_destroy (task->s);
  217. }
  218. else {
  219. task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
  220. msg = rspamd_http_new_message (HTTP_RESPONSE);
  221. if (err) {
  222. msg->status = rspamd_fstring_new_init (err->message,
  223. strlen (err->message));
  224. msg->code = err->code;
  225. }
  226. else {
  227. msg->status = rspamd_fstring_new_init ("Internal error",
  228. strlen ("Internal error"));
  229. msg->code = 500;
  230. }
  231. msg->date = time (NULL);
  232. reply = rspamd_fstring_sized_new (msg->status->len + 16);
  233. rspamd_printf_fstring (&reply, "{\"error\":\"%V\"}", msg->status);
  234. rspamd_http_message_set_body_from_fstring_steal (msg, reply);
  235. rspamd_http_connection_reset (task->http_conn);
  236. /* Use a shorter timeout for writing reply */
  237. rspamd_http_connection_write_message (task->http_conn,
  238. msg,
  239. NULL,
  240. "application/json",
  241. task,
  242. session->ctx->timeout / 10.0);
  243. }
  244. }
  245. else {
  246. /* If there was no task, then session is unmanaged */
  247. msg_info ("no data received from: %s, error: %e",
  248. rspamd_inet_address_to_string_pretty (session->addr), err);
  249. rspamd_http_connection_reset (session->http_conn);
  250. rspamd_http_connection_unref (session->http_conn);
  251. rspamd_inet_address_free (session->addr);
  252. close (session->fd);
  253. g_free (session);
  254. }
  255. }
  256. static gint
  257. rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
  258. struct rspamd_http_message *msg)
  259. {
  260. struct rspamd_worker_session *session = (struct rspamd_worker_session *)conn->ud;
  261. struct rspamd_task *task;
  262. /* Read the comment to rspamd_worker_error_handler */
  263. if (session->magic == G_MAXINT64) {
  264. task = session->task;
  265. }
  266. else {
  267. task = (struct rspamd_task *)conn->ud;
  268. }
  269. if (task) {
  270. if (task->processed_stages & RSPAMD_TASK_STAGE_REPLIED) {
  271. /* We are done here */
  272. msg_debug_task ("normally closing connection from: %s",
  273. rspamd_inet_address_to_string (task->client_addr));
  274. rspamd_session_destroy (task->s);
  275. }
  276. else if (task->processed_stages & RSPAMD_TASK_STAGE_DONE) {
  277. rspamd_session_pending (task->s);
  278. }
  279. }
  280. else {
  281. /* If there was no task, then session is unmanaged */
  282. msg_info ("no data received from: %s, closing connection",
  283. rspamd_inet_address_to_string_pretty (session->addr));
  284. rspamd_inet_address_free (session->addr);
  285. rspamd_http_connection_reset (session->http_conn);
  286. rspamd_http_connection_unref (session->http_conn);
  287. close (session->fd);
  288. g_free (session);
  289. }
  290. return 0;
  291. }
  292. /*
  293. * Accept new connection and construct task
  294. */
  295. static void
  296. accept_socket (EV_P_ ev_io *w, int revents)
  297. {
  298. struct rspamd_worker *worker = (struct rspamd_worker *) w->data;
  299. struct rspamd_worker_ctx *ctx;
  300. struct rspamd_worker_session *session;
  301. rspamd_inet_addr_t *addr = NULL;
  302. gint nfd, http_opts = 0;
  303. ctx = worker->ctx;
  304. if (ctx->max_tasks != 0 && worker->nconns > ctx->max_tasks) {
  305. msg_info_ctx ("current tasks is now: %uD while maximum is: %uD",
  306. worker->nconns,
  307. ctx->max_tasks);
  308. return;
  309. }
  310. if ((nfd =
  311. rspamd_accept_from_socket (w->fd, &addr,
  312. rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
  313. msg_warn_ctx ("accept failed: %s", strerror (errno));
  314. return;
  315. }
  316. /* Check for EAGAIN */
  317. if (nfd == 0) {
  318. rspamd_inet_address_free (addr);
  319. return;
  320. }
  321. session = g_malloc0 (sizeof (*session));
  322. session->magic = G_MAXINT64;
  323. session->addr = addr;
  324. session->fd = nfd;
  325. session->ctx = ctx;
  326. session->worker = worker;
  327. if (ctx->encrypted_only && !rspamd_inet_address_is_local (addr)) {
  328. http_opts = RSPAMD_HTTP_REQUIRE_ENCRYPTION;
  329. }
  330. session->http_conn = rspamd_http_connection_new_server (
  331. ctx->http_ctx,
  332. nfd,
  333. rspamd_worker_body_handler,
  334. rspamd_worker_error_handler,
  335. rspamd_worker_finish_handler,
  336. http_opts);
  337. worker->srv->stat->connections_count++;
  338. rspamd_http_connection_set_max_size (session->http_conn,
  339. ctx->cfg->max_message);
  340. if (ctx->key) {
  341. rspamd_http_connection_set_key (session->http_conn, ctx->key);
  342. }
  343. rspamd_http_connection_read_message (session->http_conn,
  344. session,
  345. ctx->timeout);
  346. }
  347. gpointer
  348. init_worker (struct rspamd_config *cfg)
  349. {
  350. struct rspamd_worker_ctx *ctx;
  351. GQuark type;
  352. type = g_quark_try_string ("normal");
  353. ctx = rspamd_mempool_alloc0 (cfg->cfg_pool,
  354. sizeof (struct rspamd_worker_ctx));
  355. ctx->magic = rspamd_worker_magic;
  356. ctx->is_mime = TRUE;
  357. ctx->timeout = DEFAULT_WORKER_IO_TIMEOUT;
  358. ctx->cfg = cfg;
  359. ctx->task_timeout = NAN;
  360. rspamd_rcl_register_worker_option (cfg,
  361. type,
  362. "mime",
  363. rspamd_rcl_parse_struct_boolean,
  364. ctx,
  365. G_STRUCT_OFFSET (struct rspamd_worker_ctx, is_mime),
  366. 0,
  367. "Set to `false` if this worker is intended to work with non-MIME messages");
  368. rspamd_rcl_register_worker_option (cfg,
  369. type,
  370. "encrypted_only",
  371. rspamd_rcl_parse_struct_boolean,
  372. ctx,
  373. G_STRUCT_OFFSET (struct rspamd_worker_ctx, encrypted_only),
  374. 0,
  375. "Allow only encrypted connections");
  376. rspamd_rcl_register_worker_option (cfg,
  377. type,
  378. "timeout",
  379. rspamd_rcl_parse_struct_time,
  380. ctx,
  381. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  382. timeout),
  383. RSPAMD_CL_FLAG_TIME_FLOAT,
  384. "Protocol IO timeout");
  385. rspamd_rcl_register_worker_option (cfg,
  386. type,
  387. "task_timeout",
  388. rspamd_rcl_parse_struct_time,
  389. ctx,
  390. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  391. task_timeout),
  392. RSPAMD_CL_FLAG_TIME_FLOAT,
  393. "Maximum task processing time, default: 8.0 seconds");
  394. rspamd_rcl_register_worker_option (cfg,
  395. type,
  396. "max_tasks",
  397. rspamd_rcl_parse_struct_integer,
  398. ctx,
  399. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  400. max_tasks),
  401. RSPAMD_CL_FLAG_INT_32,
  402. "Maximum count of parallel tasks processed by a single worker process");
  403. rspamd_rcl_register_worker_option (cfg,
  404. type,
  405. "keypair",
  406. rspamd_rcl_parse_struct_keypair,
  407. ctx,
  408. G_STRUCT_OFFSET (struct rspamd_worker_ctx,
  409. key),
  410. 0,
  411. "Encryption keypair");
  412. return ctx;
  413. }
  414. /*
  415. * Start worker process
  416. */
  417. __attribute__((noreturn))
  418. void
  419. start_worker (struct rspamd_worker *worker)
  420. {
  421. struct rspamd_worker_ctx *ctx = worker->ctx;
  422. gboolean is_controller = FALSE;
  423. g_assert (rspamd_worker_check_context (worker->ctx, rspamd_worker_magic));
  424. ctx->cfg = worker->srv->cfg;
  425. ctx->event_loop = rspamd_prepare_worker (worker, "normal", accept_socket);
  426. rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
  427. worker);
  428. if (isnan (ctx->task_timeout)) {
  429. if (isnan (ctx->cfg->task_timeout)) {
  430. ctx->task_timeout = 0;
  431. }
  432. else {
  433. ctx->task_timeout = ctx->cfg->task_timeout;
  434. }
  435. }
  436. ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
  437. ctx->event_loop,
  438. worker->srv->cfg);
  439. rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
  440. ctx->event_loop, ctx->resolver->r);
  441. ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
  442. ctx->cfg->ups_ctx);
  443. rspamd_mempool_add_destructor (ctx->cfg->cfg_pool,
  444. (rspamd_mempool_destruct_t)rspamd_http_context_free,
  445. ctx->http_ctx);
  446. rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
  447. &ctx->lang_det);
  448. if (worker->index == 0) {
  449. /* If there are no controllers, then pretend that we are a controller */
  450. gboolean controller_seen = FALSE;
  451. GList *cur;
  452. cur = worker->srv->cfg->workers;
  453. while (cur) {
  454. struct rspamd_worker_conf *cf;
  455. cf = (struct rspamd_worker_conf *)cur->data;
  456. if (cf->type == g_quark_from_static_string ("controller")) {
  457. if (cf->enabled && cf->count >= 0) {
  458. controller_seen = TRUE;
  459. break;
  460. }
  461. }
  462. cur = g_list_next (cur);
  463. }
  464. if (!controller_seen) {
  465. msg_info_ctx ("no controller workers defined, execute "
  466. "controller periodics in this worker");
  467. worker->flags |= RSPAMD_WORKER_CONTROLLER;
  468. is_controller = TRUE;
  469. }
  470. }
  471. if (is_controller) {
  472. rspamd_worker_init_controller (worker, NULL);
  473. }
  474. else {
  475. rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver,
  476. worker, RSPAMD_MAP_WATCH_SCANNER);
  477. }
  478. rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
  479. worker);
  480. ev_loop (ctx->event_loop, 0);
  481. rspamd_worker_block_signals ();
  482. if (is_controller) {
  483. rspamd_controller_on_terminate (worker, NULL);
  484. }
  485. rspamd_stat_close ();
  486. REF_RELEASE (ctx->cfg);
  487. rspamd_log_close (worker->srv->logger);
  488. exit (EXIT_SUCCESS);
  489. }