aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c247
1 files changed, 123 insertions, 124 deletions
diff --git a/src/worker.c b/src/worker.c
index 49324031c..880da71ab 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -40,35 +40,36 @@
#include <evdns.h>
#ifndef WITHOUT_PERL
-#include <EXTERN.h> /* from the Perl distribution */
-#include <perl.h> /* from the Perl distribution */
+# include <EXTERN.h> /* from the Perl distribution */
+# include <perl.h> /* from the Perl distribution */
-extern PerlInterpreter *perl_interpreter;
+extern PerlInterpreter *perl_interpreter;
#endif
#ifdef WITH_GPERF_TOOLS
-#include <glib/gprintf.h>
+# include <glib/gprintf.h>
#endif
-static struct timeval io_tv;
+static struct timeval io_tv;
-static gboolean write_socket (void *arg);
+static gboolean write_socket (void *arg);
-static
-void sig_handler (int signo)
+static
+ void
+sig_handler (int signo)
{
switch (signo) {
- case SIGINT:
- case SIGTERM:
+ case SIGINT:
+ case SIGTERM:
#ifdef WITH_GPERF_TOOLS
- ProfilerStop ();
+ ProfilerStop ();
#endif
#ifdef WITH_PROFILER
- exit (0);
+ exit (0);
#else
- _exit (1);
+ _exit (1);
#endif
- break;
+ break;
}
}
@@ -78,9 +79,9 @@ void sig_handler (int signo)
static void
sigusr_handler (int fd, short what, void *arg)
{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
/* Do not accept new connections, preparing to end worker's process */
- struct timeval tv;
+ struct timeval tv;
tv.tv_sec = SOFT_SHUTDOWN_TIME;
tv.tv_usec = 0;
event_del (&worker->sig_ev);
@@ -97,7 +98,7 @@ sigusr_handler (int fd, short what, void *arg)
static void
rcpt_destruct (void *pointer)
{
- struct worker_task *task = (struct worker_task *)pointer;
+ struct worker_task *task = (struct worker_task *)pointer;
if (task->rcpt) {
g_list_free (task->rcpt);
@@ -110,8 +111,8 @@ rcpt_destruct (void *pointer)
void
free_task (struct worker_task *task, gboolean is_soft)
{
- GList *part;
- struct mime_part *p;
+ GList *part;
+ struct mime_part *p;
if (task) {
msg_debug ("free_task: free pointer %p", task);
@@ -147,10 +148,10 @@ free_task (struct worker_task *task, gboolean is_soft)
}
}
-static void
+static void
free_task_hard (void *ud)
{
- struct worker_task *task = ud;
+ struct worker_task *task = ud;
free_task (task, FALSE);
}
@@ -158,62 +159,62 @@ free_task_hard (void *ud)
/*
* Callback that is called when there is data to read in buffer
*/
-static gboolean
-read_socket (f_str_t *in, void *arg)
+static gboolean
+read_socket (f_str_t * in, void *arg)
{
- struct worker_task *task = (struct worker_task *)arg;
- ssize_t r;
+ struct worker_task *task = (struct worker_task *)arg;
+ ssize_t r;
switch (task->state) {
- case READ_COMMAND:
- case READ_HEADER:
- if (read_rspamd_input_line (task, in) != 0) {
- task->last_error = "Read error";
- task->error_code = RSPAMD_NETWORK_ERROR;
- task->state = WRITE_ERROR;
- }
- if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) {
- write_socket (task);
- }
- break;
- case READ_MESSAGE:
- task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
- task->msg->begin = in->begin;
- task->msg->len = in->len;
- msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len);
- r = process_message (task);
- if (r == -1) {
- msg_warn ("read_socket: processing of message failed");
- task->last_error = "MIME processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- write_socket (task);
- }
- if (task->cmd == CMD_OTHER) {
- /* Skip filters */
- task->state = WRITE_REPLY;
- write_socket (task);
- return TRUE;
- }
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "Filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- write_socket (task);
- }
- else if (r == 0) {
- task->state = WAIT_FILTER;
- rspamd_dispatcher_pause (task->dispatcher);
- }
- else {
- process_statfiles (task);
- write_socket (task);
- }
- break;
- default:
- msg_debug ("read_socket: invalid state on reading stage");
- break;
+ case READ_COMMAND:
+ case READ_HEADER:
+ if (read_rspamd_input_line (task, in) != 0) {
+ task->last_error = "Read error";
+ task->error_code = RSPAMD_NETWORK_ERROR;
+ task->state = WRITE_ERROR;
+ }
+ if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) {
+ write_socket (task);
+ }
+ break;
+ case READ_MESSAGE:
+ task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
+ task->msg->begin = in->begin;
+ task->msg->len = in->len;
+ msg_debug ("read_socket: got string of length %ld", (long int)task->msg->len);
+ r = process_message (task);
+ if (r == -1) {
+ msg_warn ("read_socket: processing of message failed");
+ task->last_error = "MIME processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ write_socket (task);
+ }
+ if (task->cmd == CMD_OTHER) {
+ /* Skip filters */
+ task->state = WRITE_REPLY;
+ write_socket (task);
+ return TRUE;
+ }
+ r = process_filters (task);
+ if (r == -1) {
+ task->last_error = "Filter processing error";
+ task->error_code = RSPAMD_FILTER_ERROR;
+ task->state = WRITE_ERROR;
+ write_socket (task);
+ }
+ else if (r == 0) {
+ task->state = WAIT_FILTER;
+ rspamd_dispatcher_pause (task->dispatcher);
+ }
+ else {
+ process_statfiles (task);
+ write_socket (task);
+ }
+ break;
+ default:
+ msg_debug ("read_socket: invalid state on reading stage");
+ break;
}
return TRUE;
@@ -222,32 +223,32 @@ read_socket (f_str_t *in, void *arg)
/*
* Callback for socket writing
*/
-static gboolean
+static gboolean
write_socket (void *arg)
{
- struct worker_task *task = (struct worker_task *)arg;
-
+ struct worker_task *task = (struct worker_task *)arg;
+
switch (task->state) {
- case WRITE_REPLY:
- write_reply (task);
- destroy_session (task->s);
- return FALSE;
- break;
- case WRITE_ERROR:
- write_reply (task);
- destroy_session (task->s);
- return FALSE;
- break;
- case CLOSING_CONNECTION:
- msg_debug ("write_socket: normally closing connection");
- destroy_session (task->s);
- return FALSE;
- break;
- default:
- msg_info ("write_socket: abnormally closing connection");
- destroy_session (task->s);
- return FALSE;
- break;
+ case WRITE_REPLY:
+ write_reply (task);
+ destroy_session (task->s);
+ return FALSE;
+ break;
+ case WRITE_ERROR:
+ write_reply (task);
+ destroy_session (task->s);
+ return FALSE;
+ break;
+ case CLOSING_CONNECTION:
+ msg_debug ("write_socket: normally closing connection");
+ destroy_session (task->s);
+ return FALSE;
+ break;
+ default:
+ msg_info ("write_socket: abnormally closing connection");
+ destroy_session (task->s);
+ return FALSE;
+ break;
}
return TRUE;
}
@@ -256,19 +257,19 @@ write_socket (void *arg)
* Called if something goes wrong
*/
static void
-err_socket (GError *err, void *arg)
+err_socket (GError * err, void *arg)
{
- struct worker_task *task = (struct worker_task *)arg;
+ struct worker_task *task = (struct worker_task *)arg;
msg_info ("err_socket: abnormally closing connection, error: %s", err->message);
/* Free buffers */
destroy_session (task->s);
}
-struct worker_task *
+struct worker_task *
construct_task (struct rspamd_worker *worker)
{
- struct worker_task *new_task;
-
+ struct worker_task *new_task;
+
new_task = g_malloc (sizeof (struct worker_task));
msg_debug ("accept_socket: new task allocated: %p", new_task);
@@ -289,11 +290,11 @@ construct_task (struct rspamd_worker *worker)
io_tv.tv_usec = 0;
new_task->task_pool = memory_pool_new (memory_pool_get_size ());
/* Add destructor for recipients list (it would be better to use anonymous function here */
- memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)rcpt_destruct, new_task);
+ memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) rcpt_destruct, new_task);
new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
- memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->results);
+ memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) g_hash_table_destroy, new_task->results);
new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal);
- memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)g_hash_table_destroy, new_task->re_cache);
+ memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) g_hash_table_destroy, new_task->re_cache);
new_task->s = new_async_session (new_task->task_pool, free_task_hard, new_task);
new_task->sock = -1;
@@ -306,13 +307,13 @@ construct_task (struct rspamd_worker *worker)
static void
accept_socket (int fd, short what, void *arg)
{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- struct sockaddr_storage ss;
- struct sockaddr_in *sin;
- struct worker_task *new_task;
- socklen_t addrlen = sizeof(ss);
- int nfd;
-
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct sockaddr_storage ss;
+ struct sockaddr_in *sin;
+ struct worker_task *new_task;
+ socklen_t addrlen = sizeof (ss);
+ int nfd;
+
if ((nfd = accept_from_socket (fd, (struct sockaddr *)&ss, &addrlen)) == -1) {
msg_warn ("accept_socket: accept failed: %s", strerror (errno));
return;
@@ -327,19 +328,17 @@ accept_socket (int fd, short what, void *arg)
msg_info ("accept_socket: accepted connection from unix socket");
}
else if (ss.ss_family == AF_INET) {
- sin = (struct sockaddr_in *) &ss;
+ sin = (struct sockaddr_in *)&ss;
msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
}
new_task = construct_task (worker);
- new_task->sock = nfd;
- worker->srv->stat->connections_count ++;
+ new_task->sock = nfd;
+ worker->srv->stat->connections_count++;
/* Set up dispatcher */
- new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket,
- write_socket, err_socket, &io_tv,
- (void *)new_task);
+ new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &io_tv, (void *)new_task);
}
@@ -349,11 +348,11 @@ accept_socket (int fd, short what, void *arg)
void
start_worker (struct rspamd_worker *worker)
{
- struct sigaction signals;
+ struct sigaction signals;
#ifdef WITH_PROFILER
- extern void _start (void), etext (void);
- monstartup ((u_long) &_start, (u_long) &etext);
+ extern void _start (void), etext (void);
+ monstartup ((u_long) & _start, (u_long) & etext);
#endif
gperf_profiler_init (worker->srv->cfg, "worker");
@@ -367,12 +366,12 @@ start_worker (struct rspamd_worker *worker)
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
/* SIGUSR2 handler */
- signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
+ signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
signal_add (&worker->sig_ev, NULL);
/* Accept event */
- event_set(&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
- event_add(&worker->bind_ev, NULL);
+ event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_add (&worker->bind_ev, NULL);
/* Maps events */
start_map_watch ();