You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

worker.c 9.7KB


  1. #include <sys/stat.h>
  2. #include <sys/param.h>
  3. #include <sys/types.h>
  4. #include <unistd.h>
  5. #include <stdio.h>
  6. #include <stdlib.h>
  7. #include <string.h>
  8. #include <time.h>
  9. #include <errno.h>
  10. #include <signal.h>
  11. #include <netinet/in.h>
  12. #include <syslog.h>
  13. #include <fcntl.h>
  14. #include <netdb.h>
  15. #include <EXTERN.h> /* from the Perl distribution */
  16. #include <perl.h> /* from the Perl distribution */
  17. #include <glib.h>
  18. #include <gmime/gmime.h>
  19. #include "util.h"
  20. #include "main.h"
  21. #include "protocol.h"
  22. #include "upstream.h"
  23. #include "cfg_file.h"
  24. #include "url.h"
  25. #include "modules.h"
  26. #define TASK_POOL_SIZE 4095
  27. const f_str_t CRLF = {
  28. /* begin */"\r\n",
  29. /* len */2,
  30. /* size */2
  31. };
  32. extern PerlInterpreter *perl_interpreter;
  /*
   * Plain signal handler: terminates the process immediately with exit
   * status 1 on SIGINT or SIGTERM. Any other signal number is ignored.
   *
   * signo - number of the delivered signal
   */
  static void
  sig_handler (int signo)
  {
      if (signo == SIGINT || signo == SIGTERM) {
          _exit (1);
      }
  }
  43. static void
  44. sigusr_handler (int fd, short what, void *arg)
  45. {
  46. struct rspamd_worker *worker = (struct rspamd_worker *)arg;
  47. /* Do not accept new connections, preparing to end worker's process */
  48. struct timeval tv;
  49. tv.tv_sec = SOFT_SHUTDOWN_TIME;
  50. tv.tv_usec = 0;
  51. event_del (&worker->sig_ev);
  52. event_del (&worker->bind_ev);
  53. do_reopen_log = 1;
  54. msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
  55. event_loopexit (&tv);
  56. return;
  57. }
  58. static void
  59. rcpt_destruct (void *pointer)
  60. {
  61. struct worker_task *task = (struct worker_task *)pointer;
  62. if (task->rcpt) {
  63. g_list_free (task->rcpt);
  64. }
  65. }
  66. static void
  67. free_task (struct worker_task *task)
  68. {
  69. struct mime_part *part;
  70. if (task) {
  71. if (task->memc_ctx) {
  72. memc_close_ctx (task->memc_ctx);
  73. }
  74. while (!TAILQ_EMPTY (&task->parts)) {
  75. part = TAILQ_FIRST (&task->parts);
  76. g_object_unref (part->type);
  77. g_object_unref (part->content);
  78. TAILQ_REMOVE (&task->parts, part, next);
  79. }
  80. memory_pool_delete (task->task_pool);
  81. bufferevent_disable (task->bev, EV_READ | EV_WRITE);
  82. bufferevent_free (task->bev);
  83. close (task->sock);
  84. g_free (task);
  85. }
  86. }
  87. static void
  88. mime_foreach_callback (GMimeObject *part, gpointer user_data)
  89. {
  90. struct worker_task *task = (struct worker_task *)user_data;
  91. struct mime_part *mime_part;
  92. GMimeContentType *type;
  93. GMimeDataWrapper *wrapper;
  94. GMimeStream *part_stream;
  95. GByteArray *part_content;
  96. task->parts_count ++;
  97. /* 'part' points to the current part node that g_mime_message_foreach_part() is iterating over */
  98. /* find out what class 'part' is... */
  99. if (GMIME_IS_MESSAGE_PART (part)) {
  100. /* message/rfc822 or message/news */
  101. GMimeMessage *message;
  102. /* g_mime_message_foreach_part() won't descend into
  103. child message parts, so if we want to count any
  104. subparts of this child message, we'll have to call
  105. g_mime_message_foreach_part() again here. */
  106. message = g_mime_message_part_get_message ((GMimeMessagePart *) part);
  107. g_mime_message_foreach_part (message, mime_foreach_callback, task);
  108. g_object_unref (message);
  109. } else if (GMIME_IS_MESSAGE_PARTIAL (part)) {
  110. /* message/partial */
  111. /* this is an incomplete message part, probably a
  112. large message that the sender has broken into
  113. smaller parts and is sending us bit by bit. we
  114. could save some info about it so that we could
  115. piece this back together again once we get all the
  116. parts? */
  117. } else if (GMIME_IS_MULTIPART (part)) {
  118. /* multipart/mixed, multipart/alternative, multipart/related, multipart/signed, multipart/encrypted, etc... */
  119. /* we'll get to finding out if this is a signed/encrypted multipart later... */
  120. } else if (GMIME_IS_PART (part)) {
  121. /* a normal leaf part, could be text/plain or image/jpeg etc */
  122. wrapper = g_mime_part_get_content_object (GMIME_PART (part));
  123. if (wrapper != NULL) {
  124. part_stream = g_mime_stream_mem_new ();
  125. if (g_mime_data_wrapper_write_to_stream (wrapper, part_stream) != -1) {
  126. part_content = g_mime_stream_mem_get_byte_array (GMIME_STREAM_MEM (part_stream));
  127. type = (GMimeContentType *)g_mime_part_get_content_type (GMIME_PART (part));
  128. mime_part = memory_pool_alloc (task->task_pool, sizeof (struct mime_part));
  129. mime_part->type = type;
  130. mime_part->content = part_content;
  131. TAILQ_INSERT_TAIL (&task->parts, mime_part, next);
  132. if (g_mime_content_type_is_type (type, "text", "html")) {
  133. url_parse_html (task, part_content);
  134. }
  135. else if (g_mime_content_type_is_type (type, "text", "plain")) {
  136. url_parse_text (task, part_content);
  137. }
  138. }
  139. }
  140. } else {
  141. g_assert_not_reached ();
  142. }
  143. }
  144. static int
  145. process_message (struct worker_task *task)
  146. {
  147. GMimeMessage *message;
  148. GMimeParser *parser;
  149. GMimeStream *stream;
  150. stream = g_mime_stream_mem_new_with_buffer (task->msg->buf->begin, task->msg->buf->len);
  151. /* create a new parser object to parse the stream */
  152. parser = g_mime_parser_new_with_stream (stream);
  153. /* unref the stream (parser owns a ref, so this object does not actually get free'd until we destroy the parser) */
  154. g_object_unref (stream);
  155. /* parse the message from the stream */
  156. message = g_mime_parser_construct_message (parser);
  157. task->message = message;
  158. memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_object_unref, task->message);
  159. /* free the parser (and the stream) */
  160. g_object_unref (parser);
  161. g_mime_message_foreach_part (message, mime_foreach_callback, task);
  162. msg_info ("process_message: found %d parts in message", task->parts_count);
  163. task->worker->srv->stat->messages_scanned ++;
  164. return process_filters (task);
  165. }
  166. static void
  167. read_socket (struct bufferevent *bev, void *arg)
  168. {
  169. struct worker_task *task = (struct worker_task *)arg;
  170. ssize_t r;
  171. char *s;
  172. switch (task->state) {
  173. case READ_COMMAND:
  174. case READ_HEADER:
  175. s = evbuffer_readline (EVBUFFER_INPUT (bev));
  176. if (read_rspamd_input_line (task, s) != 0) {
  177. task->last_error = "Read error";
  178. task->error_code = RSPAMD_NETWORK_ERROR;
  179. task->state = WRITE_ERROR;
  180. }
  181. if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
  182. bufferevent_enable (bev, EV_WRITE);
  183. bufferevent_disable (bev, EV_READ);
  184. }
  185. free (s);
  186. break;
  187. case READ_MESSAGE:
  188. r = bufferevent_read (bev, task->msg->pos, task->msg->free);
  189. if (r > 0) {
  190. task->msg->pos += r;
  191. update_buf_size (task->msg);
  192. if (task->msg->free == 0) {
  193. r = process_message (task);
  194. if (r == -1) {
  195. task->last_error = "Filter processing error";
  196. task->error_code = RSPAMD_FILTER_ERROR;
  197. task->state = WRITE_ERROR;
  198. }
  199. else if (r == 1) {
  200. task->state = WAIT_FILTER;
  201. }
  202. }
  203. if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) {
  204. bufferevent_enable (bev, EV_WRITE);
  205. bufferevent_disable (bev, EV_READ);
  206. }
  207. }
  208. else {
  209. msg_err ("read_socket: cannot read data to buffer: %ld", (long int)r);
  210. bufferevent_disable (bev, EV_READ);
  211. bufferevent_free (bev);
  212. free_task (task);
  213. }
  214. break;
  215. case WAIT_FILTER:
  216. bufferevent_disable (bev, EV_READ);
  217. break;
  218. }
  219. }
  220. static void
  221. write_socket (struct bufferevent *bev, void *arg)
  222. {
  223. struct worker_task *task = (struct worker_task *)arg;
  224. switch (task->state) {
  225. case WRITE_REPLY:
  226. write_reply (task);
  227. task->state = CLOSING_CONNECTION;
  228. bufferevent_disable (bev, EV_READ);
  229. break;
  230. case WRITE_ERROR:
  231. write_reply (task);
  232. task->state = CLOSING_CONNECTION;
  233. bufferevent_disable (bev, EV_READ);
  234. break;
  235. case CLOSING_CONNECTION:
  236. msg_debug ("write_socket: normally closing connection");
  237. free_task (task);
  238. break;
  239. default:
  240. msg_info ("write_socket: abnormally closing connection");
  241. free_task (task);
  242. break;
  243. }
  244. }
  /*
   * bufferevent error callback: logs the event and destroys the task.
   *
   * bev, what - not used
   * arg       - the struct worker_task bound to this connection
   */
  static void
  err_socket (struct bufferevent *bev, short what, void *arg)
  {
      struct worker_task *failed = arg;

      msg_info ("err_socket: abnormally closing connection");
      /* Releases the buffers together with the rest of the task */
      free_task (failed);
  }
  253. static void
  254. accept_socket (int fd, short what, void *arg)
  255. {
  256. struct rspamd_worker *worker = (struct rspamd_worker *)arg;
  257. struct sockaddr_storage ss;
  258. struct worker_task *new_task;
  259. socklen_t addrlen = sizeof(ss);
  260. int nfd;
  261. if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
  262. return;
  263. }
  264. if (event_make_socket_nonblocking(fd) < 0) {
  265. return;
  266. }
  267. new_task = g_malloc (sizeof (struct worker_task));
  268. if (new_task == NULL) {
  269. msg_err ("accept_socket: cannot allocate memory for task, %m");
  270. return;
  271. }
  272. bzero (new_task, sizeof (struct worker_task));
  273. new_task->worker = worker;
  274. new_task->state = READ_COMMAND;
  275. new_task->sock = nfd;
  276. new_task->cfg = worker->srv->cfg;
  277. TAILQ_INIT (&new_task->urls);
  278. TAILQ_INIT (&new_task->parts);
  279. new_task->task_pool = memory_pool_new (memory_pool_get_size ());
  280. /* Add destructor for recipients list (it would be better to use anonymous function here */
  281. memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task);
  282. worker->srv->stat->connections_count ++;
  283. /* Read event */
  284. new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);
  285. bufferevent_enable (new_task->bev, EV_READ);
  286. }
  287. void
  288. start_worker (struct rspamd_worker *worker, int listen_sock)
  289. {
  290. struct sigaction signals;
  291. int i;
  292. worker->srv->pid = getpid ();
  293. worker->srv->type = TYPE_WORKER;
  294. event_init ();
  295. g_mime_init (0);
  296. init_signals (&signals, sig_handler);
  297. sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
  298. /* SIGUSR2 handler */
  299. signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
  300. signal_add (&worker->sig_ev, NULL);
  301. /* Accept event */
  302. event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
  303. event_add(&worker->bind_ev, NULL);
  304. /* Perform modules configuring */
  305. for (i = 0; i < MODULES_NUM; i ++) {
  306. modules[i].module_config_func (worker->srv->cfg);
  307. }
  308. /* Send SIGUSR2 to parent */
  309. kill (getppid (), SIGUSR2);
  310. event_loop (0);
  311. }
  312. /*
  313. * vi:ts=4
  314. */