diff options
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 364 |
1 files changed, 364 insertions, 0 deletions
diff --git a/src/worker.c b/src/worker.c new file mode 100644 index 000000000..268e8123b --- /dev/null +++ b/src/worker.c @@ -0,0 +1,364 @@ +#include <sys/stat.h> +#include <sys/param.h> +#include <sys/types.h> +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <errno.h> +#include <signal.h> + +#include <netinet/in.h> +#include <syslog.h> +#include <fcntl.h> +#include <netdb.h> + +#include <EXTERN.h> /* from the Perl distribution */ +#include <perl.h> /* from the Perl distribution */ + +#include <glib.h> +#include <gmime/gmime.h> + +#include "util.h" +#include "main.h" +#include "protocol.h" +#include "upstream.h" +#include "cfg_file.h" +#include "url.h" +#include "modules.h" + +#define TASK_POOL_SIZE 4095 + +const f_str_t CRLF = { + /* begin */"\r\n", + /* len */2, + /* size */2 +}; + +extern PerlInterpreter *perl_interpreter; + +static +void sig_handler (int signo) +{ + switch (signo) { + case SIGINT: + case SIGTERM: + _exit (1); + break; + } +} + +static void +sigusr_handler (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev); + event_del (&worker->bind_ev); + do_reopen_log = 1; + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + return; +} + +static void +rcpt_destruct (void *pointer) +{ + struct worker_task *task = (struct worker_task *)pointer; + + if (task->rcpt) { + g_list_free (task->rcpt); + } +} + +static void +free_task (struct worker_task *task) +{ + struct mime_part *part; + + if (task) { + if (task->memc_ctx) { + memc_close_ctx (task->memc_ctx); + } + while (!TAILQ_EMPTY (&task->parts)) { + part = TAILQ_FIRST (&task->parts); + g_object_unref (part->type); + g_object_unref (part->content); + TAILQ_REMOVE (&task->parts, part, next); + } + memory_pool_delete (task->task_pool); + bufferevent_disable (task->bev, EV_READ | EV_WRITE); + bufferevent_free (task->bev); + close (task->sock); + g_free (task); + } +} + +static void +mime_foreach_callback (GMimeObject *part, gpointer user_data) +{ + struct worker_task *task = (struct worker_task *)user_data; + struct mime_part *mime_part; + GMimeContentType *type; + GMimeDataWrapper *wrapper; + GMimeStream *part_stream; + GByteArray *part_content; + + task->parts_count ++; + + /* 'part' points to the current part node that g_mime_message_foreach_part() is iterating over */ + + /* find out what class 'part' is... */ + if (GMIME_IS_MESSAGE_PART (part)) { + /* message/rfc822 or message/news */ + GMimeMessage *message; + + /* g_mime_message_foreach_part() won't descend into + child message parts, so if we want to count any + subparts of this child message, we'll have to call + g_mime_message_foreach_part() again here. */ + + message = g_mime_message_part_get_message ((GMimeMessagePart *) part); + g_mime_message_foreach_part (message, mime_foreach_callback, task); + g_object_unref (message); + } else if (GMIME_IS_MESSAGE_PARTIAL (part)) { + /* message/partial */ + + /* this is an incomplete message part, probably a + large message that the sender has broken into + smaller parts and is sending us bit by bit. we + could save some info about it so that we could + piece this back together again once we get all the + parts? */ + } else if (GMIME_IS_MULTIPART (part)) { + /* multipart/mixed, multipart/alternative, multipart/related, multipart/signed, multipart/encrypted, etc... */ + + /* we'll get to finding out if this is a signed/encrypted multipart later... */ + } else if (GMIME_IS_PART (part)) { + /* a normal leaf part, could be text/plain or image/jpeg etc */ + wrapper = g_mime_part_get_content_object (GMIME_PART (part)); + if (wrapper != NULL) { + part_stream = g_mime_stream_mem_new (); + if (g_mime_data_wrapper_write_to_stream (wrapper, part_stream) != -1) { + part_content = g_mime_stream_mem_get_byte_array (GMIME_STREAM_MEM (part_stream)); + type = (GMimeContentType *)g_mime_part_get_content_type (GMIME_PART (part)); + mime_part = memory_pool_alloc (task->task_pool, sizeof (struct mime_part)); + mime_part->type = type; + mime_part->content = part_content; + TAILQ_INSERT_TAIL (&task->parts, mime_part, next); + if (g_mime_content_type_is_type (type, "text", "html")) { + url_parse_html (task, part_content); + } + else if (g_mime_content_type_is_type (type, "text", "plain")) { + url_parse_text (task, part_content); + } + } + } + } else { + g_assert_not_reached (); + } +} + +static int +process_message (struct worker_task *task) +{ + GMimeMessage *message; + GMimeParser *parser; + GMimeStream *stream; + + stream = g_mime_stream_mem_new_with_buffer (task->msg->buf->begin, task->msg->buf->len); + /* create a new parser object to parse the stream */ + parser = g_mime_parser_new_with_stream (stream); + + /* unref the stream (parser owns a ref, so this object does not actually get free'd until we destroy the parser) */ + g_object_unref (stream); + + /* parse the message from the stream */ + message = g_mime_parser_construct_message (parser); + + task->message = message; + memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_object_unref, task->message); + + /* free the parser (and the stream) */ + g_object_unref (parser); + + g_mime_message_foreach_part (message, mime_foreach_callback, task); + + msg_info ("process_message: found %d parts in message", task->parts_count); + + task->worker->srv->stat->messages_scanned ++; + + return process_filters (task); +} + +static void +read_socket (struct bufferevent *bev, void *arg) +{ + struct worker_task *task = (struct worker_task *)arg; + ssize_t r; + char *s; + + switch (task->state) { + case READ_COMMAND: + case READ_HEADER: + s = evbuffer_readline (EVBUFFER_INPUT (bev)); + if (read_rspamd_input_line (task, s) != 0) { + task->last_error = "Read error"; + task->error_code = RSPAMD_NETWORK_ERROR; + task->state = WRITE_ERROR; + } + if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) { + bufferevent_enable (bev, EV_WRITE); + bufferevent_disable (bev, EV_READ); + } + free (s); + break; + case READ_MESSAGE: + r = bufferevent_read (bev, task->msg->pos, task->msg->free); + if (r > 0) { + task->msg->pos += r; + update_buf_size (task->msg); + if (task->msg->free == 0) { + r = process_message (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + } + else if (r == 1) { + task->state = WAIT_FILTER; + } + } + if (task->state == WRITE_ERROR || task->state == WRITE_REPLY) { + bufferevent_enable (bev, EV_WRITE); + bufferevent_disable (bev, EV_READ); + } + } + else { + msg_err ("read_socket: cannot read data to buffer: %ld", (long int)r); + bufferevent_disable (bev, EV_READ); + bufferevent_free (bev); + free_task (task); + } + break; + case WAIT_FILTER: + bufferevent_disable (bev, EV_READ); + break; + } +} + +static void +write_socket (struct bufferevent *bev, void *arg) +{ + struct worker_task *task = (struct worker_task *)arg; + + switch (task->state) { + case WRITE_REPLY: + write_reply (task); + task->state = CLOSING_CONNECTION; + bufferevent_disable (bev, EV_READ); + break; + case WRITE_ERROR: + write_reply (task); + task->state = CLOSING_CONNECTION; + bufferevent_disable (bev, EV_READ); + break; + case CLOSING_CONNECTION: + msg_debug ("write_socket: normally closing connection"); + free_task (task); + break; + default: + msg_info ("write_socket: abnormally closing connection"); + free_task (task); + break; + } +} + +static void +err_socket (struct bufferevent *bev, short what, void *arg) +{ + struct worker_task *task = (struct worker_task *)arg; + msg_info ("err_socket: abnormally closing connection"); + /* Free buffers */ + free_task (task); +} + +static void +accept_socket (int fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct sockaddr_storage ss; + struct worker_task *new_task; + socklen_t addrlen = sizeof(ss); + int nfd; + + if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { + return; + } + if (event_make_socket_nonblocking(fd) < 0) { + return; + } + + new_task = g_malloc (sizeof (struct worker_task)); + if (new_task == NULL) { + msg_err ("accept_socket: cannot allocate memory for task, %m"); + return; + } + bzero (new_task, sizeof (struct worker_task)); + new_task->worker = worker; + new_task->state = READ_COMMAND; + new_task->sock = nfd; + new_task->cfg = worker->srv->cfg; + TAILQ_INIT (&new_task->urls); + TAILQ_INIT (&new_task->parts); + new_task->task_pool = memory_pool_new (memory_pool_get_size ()); + /* Add destructor for recipients list (it would be better to use anonymous function here */ + memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task); + worker->srv->stat->connections_count ++; + + /* Read event */ + new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task); + bufferevent_enable (new_task->bev, EV_READ); +} + +void +start_worker (struct rspamd_worker *worker, int listen_sock) +{ + struct sigaction signals; + int i; + + + worker->srv->pid = getpid (); + worker->srv->type = TYPE_WORKER; + event_init (); + g_mime_init (0); + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); + signal_add (&worker->sig_ev, NULL); + + /* Accept event */ + event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_add(&worker->bind_ev, NULL); + + /* Perform modules configuring */ + for (i = 0; i < MODULES_NUM; i ++) { + modules[i].module_config_func (worker->srv->cfg); + } + + /* Send SIGUSR2 to parent */ + kill (getppid (), SIGUSR2); + + event_loop (0); +} + +/* + * vi:ts=4 + */ |