diff options
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 247 |
1 files changed, 123 insertions, 124 deletions
diff --git a/src/worker.c b/src/worker.c index 49324031c..880da71ab 100644 --- a/src/worker.c +++ b/src/worker.c @@ -40,35 +40,36 @@ #include <evdns.h> #ifndef WITHOUT_PERL -#include <EXTERN.h> /* from the Perl distribution */ -#include <perl.h> /* from the Perl distribution */ +# include <EXTERN.h> /* from the Perl distribution */ +# include <perl.h> /* from the Perl distribution */ -extern PerlInterpreter *perl_interpreter; +extern PerlInterpreter *perl_interpreter; #endif #ifdef WITH_GPERF_TOOLS -#include <glib/gprintf.h> +# include <glib/gprintf.h> #endif -static struct timeval io_tv; +static struct timeval io_tv; -static gboolean write_socket (void *arg); +static gboolean write_socket (void *arg); -static -void sig_handler (int signo) +static + void +sig_handler (int signo) { switch (signo) { - case SIGINT: - case SIGTERM: + case SIGINT: + case SIGTERM: #ifdef WITH_GPERF_TOOLS - ProfilerStop (); + ProfilerStop (); #endif #ifdef WITH_PROFILER - exit (0); + exit (0); #else - _exit (1); + _exit (1); #endif - break; + break; } } @@ -78,9 +79,9 @@ void sig_handler (int signo) static void sigusr_handler (int fd, short what, void *arg) { - struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct rspamd_worker *worker = (struct rspamd_worker *)arg; /* Do not accept new connections, preparing to end worker's process */ - struct timeval tv; + struct timeval tv; tv.tv_sec = SOFT_SHUTDOWN_TIME; tv.tv_usec = 0; event_del (&worker->sig_ev); @@ -97,7 +98,7 @@ sigusr_handler (int fd, short what, void *arg) static void rcpt_destruct (void *pointer) { - struct worker_task *task = (struct worker_task *)pointer; + struct worker_task *task = (struct worker_task *)pointer; if (task->rcpt) { g_list_free (task->rcpt); @@ -110,8 +111,8 @@ rcpt_destruct (void *pointer) void free_task (struct worker_task *task, gboolean is_soft) { - GList *part; - struct mime_part *p; + GList *part; + struct mime_part *p; if (task) { msg_debug ("free_task: free pointer %p", task); @@ -147,10 +148,10 @@ free_task (struct worker_task *task, gboolean is_soft) } } -static void +static void free_task_hard (void *ud) { - struct worker_task *task = ud; + struct worker_task *task = ud; free_task (task, FALSE); } @@ -158,62 +159,62 @@ free_task_hard (void *ud) /* * Callback that is called when there is data to read in buffer */ -static gboolean -read_socket (f_str_t *in, void *arg) +static gboolean +read_socket (f_str_t * in, void *arg) { - struct worker_task *task = (struct worker_task *)arg; - ssize_t r; + struct worker_task *task = (struct worker_task *)arg; + ssize_t r; switch (task->state) { - case READ_COMMAND: - case READ_HEADER: - if (read_rspamd_input_line (task, in) != 0) { - task->last_error = "Read error"; - task->error_code = RSPAMD_NETWORK_ERROR; - task->state = WRITE_ERROR; - } - if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) { - write_socket (task); - } - break; - case READ_MESSAGE: - task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); - task->msg->begin = in->begin; - task->msg->len = in->len; - msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len); - r = process_message (task); - if (r == -1) { - msg_warn ("read_socket: processing of message failed"); - task->last_error = "MIME processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - write_socket (task); - } - if (task->cmd == CMD_OTHER) { - /* Skip filters */ - task->state = WRITE_REPLY; - write_socket (task); - return TRUE; - } - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - write_socket (task); - } - else if (r == 0) { - task->state = WAIT_FILTER; - rspamd_dispatcher_pause (task->dispatcher); - } - else { - process_statfiles (task); - write_socket (task); - } - break; - default: - msg_debug ("read_socket: invalid state on reading stage"); - break; + case READ_COMMAND: + case READ_HEADER: + if (read_rspamd_input_line (task, in) != 0) { + task->last_error = "Read error"; + task->error_code = RSPAMD_NETWORK_ERROR; + task->state = WRITE_ERROR; + } + if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) { + write_socket (task); + } + break; + case READ_MESSAGE: + task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); + task->msg->begin = in->begin; + task->msg->len = in->len; + msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len); + r = process_message (task); + if (r == -1) { + msg_warn ("read_socket: processing of message failed"); + task->last_error = "MIME processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + write_socket (task); + } + if (task->cmd == CMD_OTHER) { + /* Skip filters */ + task->state = WRITE_REPLY; + write_socket (task); + return TRUE; + } + r = process_filters (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + write_socket (task); + } + else if (r == 0) { + task->state = WAIT_FILTER; + rspamd_dispatcher_pause (task->dispatcher); + } + else { + process_statfiles (task); + write_socket (task); + } + break; + default: + msg_debug ("read_socket: invalid state on reading stage"); + break; } return TRUE; @@ -222,32 +223,32 @@ read_socket (f_str_t *in, void *arg) /* * Callback for socket writing */ -static gboolean +static gboolean write_socket (void *arg) { - struct worker_task *task = (struct worker_task *)arg; - + struct worker_task *task = (struct worker_task *)arg; + switch (task->state) { - case WRITE_REPLY: - write_reply (task); - destroy_session (task->s); - return FALSE; - break; - case WRITE_ERROR: - write_reply (task); - destroy_session (task->s); - return FALSE; - break; - case CLOSING_CONNECTION: - msg_debug ("write_socket: normally closing connection"); - destroy_session (task->s); - return FALSE; - break; - default: - msg_info ("write_socket: abnormally closing connection"); - destroy_session (task->s); - return FALSE; - break; + case WRITE_REPLY: + write_reply (task); + destroy_session (task->s); + return FALSE; + break; + case WRITE_ERROR: + write_reply (task); + destroy_session (task->s); + return FALSE; + break; + case CLOSING_CONNECTION: + msg_debug ("write_socket: normally closing connection"); + destroy_session (task->s); + return FALSE; + break; + default: + msg_info ("write_socket: abnormally closing connection"); + destroy_session (task->s); + return FALSE; + break; } return TRUE; } @@ -256,19 +257,19 @@ write_socket (void *arg) * Called if something goes wrong */ static void -err_socket (GError *err, void *arg) +err_socket (GError * err, void *arg) { - struct worker_task *task = (struct worker_task *)arg; + struct worker_task *task = (struct worker_task *)arg; msg_info ("err_socket: abnormally closing connection, error: %s", err->message); /* Free buffers */ destroy_session (task->s); } -struct worker_task * +struct worker_task * construct_task (struct rspamd_worker *worker) { - struct worker_task *new_task; - + struct worker_task *new_task; + new_task = g_malloc (sizeof (struct worker_task)); msg_debug ("accept_socket: new task allocated: %p", new_task); @@ -289,11 +290,11 @@ construct_task (struct rspamd_worker *worker) io_tv.tv_usec = 0; new_task->task_pool = memory_pool_new (memory_pool_get_size ()); /* Add destructor for recipients list (it would be better to use anonymous function here */ - memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task); + memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) rcpt_destruct, new_task); new_task->results = g_hash_table_new (g_str_hash, g_str_equal); - memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results); + memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) g_hash_table_destroy, new_task->results); new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal); - memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->re_cache); + memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) g_hash_table_destroy, new_task->re_cache); new_task->s = new_async_session (new_task->task_pool, free_task_hard, new_task); new_task->sock = -1; @@ -306,13 +307,13 @@ construct_task (struct rspamd_worker *worker) static void accept_socket (int fd, short what, void *arg) { - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - struct sockaddr_storage ss; - struct sockaddr_in *sin; - struct worker_task *new_task; - socklen_t addrlen = sizeof(ss); - int nfd; - + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct sockaddr_storage ss; + struct sockaddr_in *sin; + struct worker_task *new_task; + socklen_t addrlen = sizeof (ss); + int nfd; + if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { msg_warn ("accept_socket: accept failed: %s", strerror (errno)); return; @@ -327,19 +328,17 @@ accept_socket (int fd, short what, void *arg) msg_info ("accept_socket: accepted connection from unix socket"); } else if (ss.ss_family == AF_INET) { - sin = (struct sockaddr_in *) &ss; + sin = (struct sockaddr_in *)&ss; msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port)); } new_task = construct_task (worker); - new_task->sock = nfd; - worker->srv->stat->connections_count ++; + new_task->sock = nfd; + worker->srv->stat->connections_count++; /* Set up dispatcher */ - new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, - write_socket, err_socket, &io_tv, - (void *)new_task); + new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &io_tv, (void *)new_task); } @@ -349,11 +348,11 @@ accept_socket (int fd, short what, void *arg) void start_worker (struct rspamd_worker *worker) { - struct sigaction signals; + struct sigaction signals; #ifdef WITH_PROFILER - extern void _start (void), etext (void); - monstartup ((u_long) &_start, (u_long) &etext); + extern void _start (void), etext (void); + monstartup ((u_long) & _start, (u_long) & etext); #endif gperf_profiler_init (worker->srv->cfg, "worker"); @@ -367,12 +366,12 @@ start_worker (struct rspamd_worker *worker) sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); /* SIGUSR2 handler */ - signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); + signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker); signal_add (&worker->sig_ev, NULL); /* Accept event */ - event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); - event_add(&worker->bind_ev, NULL); + event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_add (&worker->bind_ev, NULL); /* Maps events */ start_map_watch (); |