diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-10-06 20:03:57 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-10-06 20:03:57 +0400 |
commit | 6b306ab8752befc28d259be55495f8249cc2df24 (patch) | |
tree | 0fa3d471aef61925563709e0f8ee5667b57c7a9d /src/worker.c | |
parent | 8d0053734fb5a4ccd8c3bda731e6b7c8261c6f67 (diff) | |
download | rspamd-6b306ab8752befc28d259be55495f8249cc2df24.tar.gz rspamd-6b306ab8752befc28d259be55495f8249cc2df24.zip |
Fixes types (use glib ones) no functional change.
Now all comments in commit logs beginning with '*' would be included in changelog, so
important changes would be separated from small ones.
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 1062 |
1 files changed, 579 insertions, 483 deletions
diff --git a/src/worker.c b/src/worker.c index aa8e40cc0..80c7dc48c 100644 --- a/src/worker.c +++ b/src/worker.c @@ -41,8 +41,8 @@ #include "lua/lua_common.h" #ifndef WITHOUT_PERL -# include <EXTERN.h> /* from the Perl distribution */ -# include <perl.h> /* from the Perl distribution */ +# include <EXTERN.h> /* from the Perl distribution */ +# include <perl.h> /* from the Perl distribution */ extern PerlInterpreter *perl_interpreter; #endif @@ -59,14 +59,15 @@ extern PerlInterpreter *perl_interpreter; #define MODULE_AFTER_CONNECT_FUNC "after_connect" #define MODULE_PARSE_LINE_FUNC "parse_line" -struct custom_filter { - char *filename; /*< filename */ - GModule *handle; /*< returned by dlopen */ - void (*init_func)(struct config_file *cfg); /*< called at start of worker */ - void* (*before_connect)(void); /*< called when clients connects */ - gboolean (*process_line)(const char *line, size_t len, char **output, void *user_data); /*< called when client send data line */ - void (*after_connect)(char **output, char **log_line, void *user_data); /*< called when client disconnects */ - void (*fin_func)(void); +struct custom_filter +{ + gchar *filename; /*< filename */ + GModule *handle; /*< returned by dlopen */ + void (*init_func) (struct config_file * cfg); /*< called at start of worker */ + void *(*before_connect) (void); /*< called when clients connects */ + gboolean (*process_line) (const gchar * line, size_t len, gchar ** output, void *user_data); /*< called when client send data line */ + void (*after_connect) (gchar ** output, gchar ** log_line, void *user_data); /*< called when client disconnects */ + void (*fin_func) (void); }; #endif @@ -74,70 +75,74 @@ struct custom_filter { /* * Worker's context */ -struct rspamd_worker_ctx { - struct timeval io_tv; - /* Detect whether this worker is mime worker */ - gboolean is_mime; - /* Detect whether this worker is mime worker */ - gboolean is_custom; - GList *custom_filters; - /* DNS resolver */ - struct rspamd_dns_resolver *resolver; +struct rspamd_worker_ctx +{ + struct timeval io_tv; + /* Detect whether this worker is mime worker */ + gboolean is_mime; + /* Detect whether this worker is mime worker */ + gboolean is_custom; + GList *custom_filters; + /* DNS resolver */ + struct rspamd_dns_resolver *resolver; }; static gboolean write_socket (void *arg); -static sig_atomic_t wanna_die = 0; +static sig_atomic_t wanna_die = 0; #ifndef HAVE_SA_SIGINFO static void -sig_handler (int signo) +sig_handler (gint signo) #else static void -sig_handler (int signo, siginfo_t *info, void *unused) +sig_handler (gint signo, siginfo_t * info, void *unused) #endif { - struct timeval tv; - - switch (signo) { - case SIGUSR1: - reopen_log (); - break; - case SIGINT: - case SIGTERM: - if (!wanna_die) { - wanna_die = 1; - tv.tv_sec = 0; - tv.tv_usec = 0; - event_loopexit (&tv); + struct timeval tv; + + switch (signo) + { + case SIGUSR1: + reopen_log (); + break; + case SIGINT: + case SIGTERM: + if (!wanna_die) + { + wanna_die = 1; + tv.tv_sec = 0; + tv.tv_usec = 0; + event_loopexit (&tv); #ifdef WITH_GPERF_TOOLS - ProfilerStop (); + ProfilerStop (); #endif - } - break; } + break; + } } /* * Config reload is designed by sending sigusr to active workers and pending shutdown of them */ static void -sigusr_handler (int fd, short what, void *arg) +sigusr_handler (gint fd, short what, void *arg) { - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - /* Do not accept new connections, preparing to end worker's process */ - struct timeval tv; - if (! wanna_die) { - tv.tv_sec = SOFT_SHUTDOWN_TIME; - tv.tv_usec = 0; - event_del (&worker->sig_ev); - event_del (&worker->bind_ev); - do_reopen_log = 1; - msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); - event_loopexit (&tv); - } - return; + struct rspamd_worker *worker = (struct rspamd_worker *) arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + if (!wanna_die) + { + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev); + event_del (&worker->bind_ev); + do_reopen_log = 1; + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + } + return; } /* @@ -146,79 +151,94 @@ sigusr_handler (int fd, short what, void *arg) static void rcpt_destruct (void *pointer) { - struct worker_task *task = (struct worker_task *)pointer; + struct worker_task *task = (struct worker_task *) pointer; - if (task->rcpt) { - g_list_free (task->rcpt); - } + if (task->rcpt) + { + g_list_free (task->rcpt); + } } -#ifndef BUILD_STATIC +#ifndef BUILD_STATIC static void fin_custom_filters (struct worker_task *task) { - GList *cur, *curd; - struct custom_filter *filt; - char *output = NULL, *log = NULL; - struct rspamd_worker_ctx *ctx = task->worker->ctx; - - cur = ctx->custom_filters; - curd = task->rcpt; - while (cur) { - filt = cur->data; - if (filt->after_connect) { - filt->after_connect (&output, &log, curd->data); - if (output != NULL) { - if (! rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE)) { - g_free (output); - return; - } - g_free (output); - } - if (log != NULL) { - msg_info ("%s", log); - g_free (log); - } - if (curd->next) { - curd = g_list_next (curd); - } + GList *cur, *curd; + struct custom_filter *filt; + gchar *output = NULL, *log = NULL; + struct rspamd_worker_ctx *ctx = task->worker->ctx; + + cur = ctx->custom_filters; + curd = task->rcpt; + while (cur) + { + filt = cur->data; + if (filt->after_connect) + { + filt->after_connect (&output, &log, curd->data); + if (output != NULL) + { + if (!rspamd_dispatcher_write + (task->dispatcher, output, strlen (output), FALSE, FALSE)) + { + g_free (output); + return; } - cur = g_list_next (cur); + g_free (output); + } + if (log != NULL) + { + msg_info ("%s", log); + g_free (log); + } + if (curd->next) + { + curd = g_list_next (curd); + } } + cur = g_list_next (cur); + } } -static gboolean -parse_line_custom (struct worker_task *task, f_str_t *in) +static gboolean +parse_line_custom (struct worker_task *task, f_str_t * in) { - GList *cur, *curd; - struct custom_filter *filt; - char *output = NULL; - gboolean res = TRUE; - struct rspamd_worker_ctx *ctx = task->worker->ctx; - - cur = ctx->custom_filters; - curd = task->rcpt; - while (cur) { - filt = cur->data; - if (filt->after_connect) { - if (! filt->process_line (in->begin, in->len, &output, curd->data)) { - res = FALSE; - } - if (output != NULL) { - if (! rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE)) { - g_free (output); - return FALSE; - } - g_free (output); - } - if (curd->next) { - curd = g_list_next (curd); - } + GList *cur, *curd; + struct custom_filter *filt; + gchar *output = NULL; + gboolean res = TRUE; + struct rspamd_worker_ctx *ctx = task->worker->ctx; + + cur = ctx->custom_filters; + curd = task->rcpt; + while (cur) + { + filt = cur->data; + if (filt->after_connect) + { + if (!filt->process_line (in->begin, in->len, &output, curd->data)) + { + res = FALSE; + } + if (output != NULL) + { + if (!rspamd_dispatcher_write + (task->dispatcher, output, strlen (output), FALSE, FALSE)) + { + g_free (output); + return FALSE; } - cur = g_list_next (cur); + g_free (output); + } + if (curd->next) + { + curd = g_list_next (curd); + } } + cur = g_list_next (cur); + } - return res; + return res; } #else /* Stubs */ @@ -227,10 +247,11 @@ fin_custom_filters (struct worker_task *task) { } -static gboolean -parse_line_custom (struct worker_task *task, f_str_t *in) + +static gboolean +parse_line_custom (struct worker_task *task, f_str_t * in) { - return FALSE; + return FALSE; } #endif @@ -240,60 +261,70 @@ parse_line_custom (struct worker_task *task, f_str_t *in) void free_task (struct worker_task *task, gboolean is_soft) { - GList *part; - struct mime_part *p; - - if (task) { - debug_task ("free pointer %p", task); - while ((part = g_list_first (task->parts))) { - task->parts = g_list_remove_link (task->parts, part); - p = (struct mime_part *)part->data; - g_byte_array_free (p->content, TRUE); - g_list_free_1 (part); - } - if (task->text_parts) { - g_list_free (task->text_parts); - } - if (task->urls) { - g_list_free (task->urls); - } - if (task->images) { - g_list_free (task->images); - } - if (task->messages) { - g_list_free (task->messages); - } - memory_pool_delete (task->task_pool); - if (task->dispatcher) { - if (is_soft) { - /* Plan dispatcher shutdown */ - task->dispatcher->wanna_die = 1; - } - else { - rspamd_remove_dispatcher (task->dispatcher); - } - } - if (task->sock != -1) { - close (task->sock); - } - g_free (task); + GList *part; + struct mime_part *p; + + if (task) + { + debug_task ("free pointer %p", task); + while ((part = g_list_first (task->parts))) + { + task->parts = g_list_remove_link (task->parts, part); + p = (struct mime_part *) part->data; + g_byte_array_free (p->content, TRUE); + g_list_free_1 (part); + } + if (task->text_parts) + { + g_list_free (task->text_parts); + } + if (task->urls) + { + g_list_free (task->urls); + } + if (task->images) + { + g_list_free (task->images); + } + if (task->messages) + { + g_list_free (task->messages); + } + memory_pool_delete (task->task_pool); + if (task->dispatcher) + { + if (is_soft) + { + /* Plan dispatcher shutdown */ + task->dispatcher->wanna_die = 1; + } + else + { + rspamd_remove_dispatcher (task->dispatcher); + } + } + if (task->sock != -1) + { + close (task->sock); } + g_free (task); + } } void free_task_hard (gpointer ud) { - struct worker_task *task = ud; + struct worker_task *task = ud; - free_task (task, FALSE); + free_task (task, FALSE); } void free_task_soft (gpointer ud) { - struct worker_task *task = ud; + struct worker_task *task = ud; - free_task (task, FALSE); + free_task (task, FALSE); } /* @@ -302,78 +333,89 @@ free_task_soft (gpointer ud) static gboolean read_socket (f_str_t * in, void *arg) { - struct worker_task *task = (struct worker_task *)arg; - struct rspamd_worker_ctx *ctx; - ssize_t r; - - ctx = task->worker->ctx; - switch (task->state) { - case READ_COMMAND: - case READ_HEADER: - if (ctx->is_custom) { - if (! parse_line_custom (task, in)) { - task->last_error = "Read error"; - task->error_code = RSPAMD_NETWORK_ERROR; - task->state = WRITE_ERROR; - } - } - else { - if (read_rspamd_input_line (task, in) != 0) { - task->last_error = "Read error"; - task->error_code = RSPAMD_NETWORK_ERROR; - task->state = WRITE_ERROR; - } - } - if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) { - return write_socket (task); - } - break; - case READ_MESSAGE: - task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); - task->msg->begin = in->begin; - task->msg->len = in->len; - debug_task ("got string of length %ld", (long int)task->msg->len); - r = process_message (task); - if (r == -1) { - msg_warn ("processing of message failed"); - task->last_error = "MIME processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - return write_socket (task); - } - if (task->cmd == CMD_OTHER) { - /* Skip filters */ - task->state = WRITE_REPLY; - return write_socket (task); - } - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - return write_socket (task); - } - else if (r == 0) { - task->state = WAIT_FILTER; - rspamd_dispatcher_pause (task->dispatcher); - } - else { - process_statfiles (task); - lua_call_post_filters (task); - task->state = WRITE_REPLY; - return write_socket (task); - } - break; - case WRITE_REPLY: - case WRITE_ERROR: - return write_socket (task); - break; - default: - debug_task ("invalid state on reading stage"); - break; + struct worker_task *task = (struct worker_task *) arg; + struct rspamd_worker_ctx *ctx; + ssize_t r; + + ctx = task->worker->ctx; + switch (task->state) + { + case READ_COMMAND: + case READ_HEADER: + if (ctx->is_custom) + { + if (!parse_line_custom (task, in)) + { + task->last_error = "Read error"; + task->error_code = RSPAMD_NETWORK_ERROR; + task->state = WRITE_ERROR; + } } - - return TRUE; + else + { + if (read_rspamd_input_line (task, in) != 0) + { + task->last_error = "Read error"; + task->error_code = RSPAMD_NETWORK_ERROR; + task->state = WRITE_ERROR; + } + } + if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) + { + return write_socket (task); + } + break; + case READ_MESSAGE: + task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t)); + task->msg->begin = in->begin; + task->msg->len = in->len; + debug_task ("got string of length %z", task->msg->len); + r = process_message (task); + if (r == -1) + { + msg_warn ("processing of message failed"); + task->last_error = "MIME processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + return write_socket (task); + } + if (task->cmd == CMD_OTHER) + { + /* Skip filters */ + task->state = WRITE_REPLY; + return write_socket (task); + } + r = process_filters (task); + if (r == -1) + { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + return write_socket (task); + } + else if (r == 0) + { + task->state = WAIT_FILTER; + rspamd_dispatcher_pause (task->dispatcher); + } + else + { + process_statfiles (task); + lua_call_post_filters (task); + task->state = WRITE_REPLY; + return write_socket (task); + } + break; + case WRITE_REPLY: + case WRITE_ERROR: + return write_socket (task); + break; + default: + debug_task ("invalid state on reading stage"); + break; + } + + return TRUE; } /* @@ -382,51 +424,58 @@ read_socket (f_str_t * in, void *arg) static gboolean write_socket (void *arg) { - struct worker_task *task = (struct worker_task *)arg; - struct rspamd_worker_ctx *ctx; - - ctx = task->worker->ctx; - - switch (task->state) { - case WRITE_REPLY: - if (! write_reply (task)) { - destroy_session (task->s); - return FALSE; - } - if (ctx->is_custom) { - fin_custom_filters (task); - } - destroy_session (task->s); - return FALSE; - break; - case WRITE_ERROR: - if (! write_reply (task)) { - return FALSE; - } - if (ctx->is_custom) { - fin_custom_filters (task); - } - destroy_session (task->s); - return FALSE; - break; - case CLOSING_CONNECTION: - debug_task ("normally closing connection"); - if (ctx->is_custom) { - fin_custom_filters (task); - } - destroy_session (task->s); - return FALSE; - break; - default: - msg_info ("abnormally closing connection"); - if (ctx->is_custom) { - fin_custom_filters (task); - } - destroy_session (task->s); - return FALSE; - break; + struct worker_task *task = (struct worker_task *) arg; + struct rspamd_worker_ctx *ctx; + + ctx = task->worker->ctx; + + switch (task->state) + { + case WRITE_REPLY: + if (!write_reply (task)) + { + destroy_session (task->s); + return FALSE; + } + if (ctx->is_custom) + { + fin_custom_filters (task); + } + destroy_session (task->s); + return FALSE; + break; + case WRITE_ERROR: + if (!write_reply (task)) + { + return FALSE; + } + if (ctx->is_custom) + { + fin_custom_filters (task); } - return TRUE; + destroy_session (task->s); + return FALSE; + break; + case CLOSING_CONNECTION: + debug_task ("normally closing connection"); + if (ctx->is_custom) + { + fin_custom_filters (task); + } + destroy_session (task->s); + return FALSE; + break; + default: + msg_info ("abnormally closing connection"); + if (ctx->is_custom) + { + fin_custom_filters (task); + } + destroy_session (task->s); + return FALSE; + break; + } + return TRUE; } /* @@ -435,214 +484,251 @@ write_socket (void *arg) static void err_socket (GError * err, void *arg) { - struct worker_task *task = (struct worker_task *)arg; - struct rspamd_worker_ctx *ctx; - - ctx = task->worker->ctx; - msg_info ("abnormally closing connection, error: %s", err->message); - /* Free buffers */ - if (ctx->is_custom) { - fin_custom_filters (task); - } - if (task->state != WRITE_REPLY) { - destroy_session (task->s); - } + struct worker_task *task = (struct worker_task *) arg; + struct rspamd_worker_ctx *ctx; + + ctx = task->worker->ctx; + msg_info ("abnormally closing connection, error: %s", err->message); + /* Free buffers */ + if (ctx->is_custom) + { + fin_custom_filters (task); + } + if (task->state != WRITE_REPLY) + { + destroy_session (task->s); + } } struct worker_task * construct_task (struct rspamd_worker *worker) { - struct worker_task *new_task; + struct worker_task *new_task; - new_task = g_malloc (sizeof (struct worker_task)); + new_task = g_malloc (sizeof (struct worker_task)); - bzero (new_task, sizeof (struct worker_task)); - new_task->worker = worker; - new_task->state = READ_COMMAND; - new_task->cfg = worker->srv->cfg; - new_task->from_addr.s_addr = INADDR_NONE; - new_task->view_checked = FALSE; + bzero (new_task, sizeof (struct worker_task)); + new_task->worker = worker; + new_task->state = READ_COMMAND; + new_task->cfg = worker->srv->cfg; + new_task->from_addr.s_addr = INADDR_NONE; + new_task->view_checked = FALSE; #ifdef HAVE_CLOCK_GETTIME # ifdef HAVE_CLOCK_PROCESS_CPUTIME_ID - clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &new_task->ts); + clock_gettime (CLOCK_PROCESS_CPUTIME_ID, &new_task->ts); # elif defined(HAVE_CLOCK_VIRTUAL) - clock_gettime (CLOCK_VIRTUAL, &new_task->ts); + clock_gettime (CLOCK_VIRTUAL, &new_task->ts); # else - clock_gettime (CLOCK_REALTIME, &new_task->ts); + clock_gettime (CLOCK_REALTIME, &new_task->ts); # endif #endif - if (gettimeofday (&new_task->tv, NULL) == -1) { - msg_warn ("gettimeofday failed: %s", strerror (errno)); - } - - new_task->task_pool = memory_pool_new (memory_pool_get_size ()); - - /* Add destructor for recipients list (it would be better to use anonymous function here */ - memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) rcpt_destruct, new_task); - new_task->results = g_hash_table_new (g_str_hash, g_str_equal); - memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) g_hash_table_destroy, new_task->results); - new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal); - memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) g_hash_table_destroy, new_task->re_cache); - new_task->s = new_async_session (new_task->task_pool, free_task_hard, new_task); - new_task->sock = -1; - new_task->is_mime = TRUE; - - return new_task; + if (gettimeofday (&new_task->tv, NULL) == -1) + { + msg_warn ("gettimeofday failed: %s", strerror (errno)); + } + + new_task->task_pool = memory_pool_new (memory_pool_get_size ()); + + /* Add destructor for recipients list (it would be better to use anonymous function here */ + memory_pool_add_destructor (new_task->task_pool, + (pool_destruct_func) rcpt_destruct, new_task); + new_task->results = g_hash_table_new (g_str_hash, g_str_equal); + memory_pool_add_destructor (new_task->task_pool, + (pool_destruct_func) g_hash_table_destroy, + new_task->results); + new_task->re_cache = g_hash_table_new (g_str_hash, g_str_equal); + memory_pool_add_destructor (new_task->task_pool, + (pool_destruct_func) g_hash_table_destroy, + new_task->re_cache); + new_task->s = + new_async_session (new_task->task_pool, free_task_hard, new_task); + new_task->sock = -1; + new_task->is_mime = TRUE; + + return new_task; } /* * Accept new connection and construct task */ static void -accept_socket (int fd, short what, void *arg) +accept_socket (gint fd, short what, void *arg) { - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - struct rspamd_worker_ctx *ctx; - union sa_union su; - struct worker_task *new_task; - GList *cur; - struct custom_filter *filt; - - socklen_t addrlen = sizeof (su.ss); - int nfd; - - ctx = worker->ctx; - if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { - msg_warn ("accept failed: %s", strerror (errno)); - return; - } - /* Check for EAGAIN */ - if (nfd == 0) { - return; - } - - new_task = construct_task (worker); - - if (su.ss.ss_family == AF_UNIX) { - msg_info ("accepted connection from unix socket"); - new_task->client_addr.s_addr = INADDR_NONE; - } - else if (su.ss.ss_family == AF_INET) { - msg_info ("accepted connection from %s port %d", inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); - memcpy (&new_task->client_addr, &su.s4.sin_addr, sizeof (struct in_addr)); - } - - new_task->sock = nfd; - new_task->is_mime = ctx->is_mime; - worker->srv->stat->connections_count++; - new_task->resolver = ctx->resolver; - ctx->io_tv.tv_sec = WORKER_IO_TIMEOUT; - ctx->io_tv.tv_usec = 0; - - /* Set up dispatcher */ - new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &ctx->io_tv, (void *)new_task); - new_task->dispatcher->peer_addr = new_task->client_addr.s_addr; - - /* Init custom filters */ -#ifndef BUILD_STATIC - if (ctx->is_custom) { - cur = ctx->custom_filters; - while (cur) { - filt = cur->data; - if (filt->before_connect) { - /* XXX: maybe not use rcpt list here for custom filters data, but this can save some bytes in task structure */ - new_task->rcpt = g_list_prepend (new_task->rcpt, filt->before_connect ()); - } - cur = g_list_next (cur); - } - /* Keep user data in the same order as custom filters */ - new_task->rcpt = g_list_reverse (new_task->rcpt); + struct rspamd_worker *worker = (struct rspamd_worker *) arg; + struct rspamd_worker_ctx *ctx; + union sa_union su; + struct worker_task *new_task; + GList *cur; + struct custom_filter *filt; + + socklen_t addrlen = sizeof (su.ss); + gint nfd; + + ctx = worker->ctx; + if ((nfd = + accept_from_socket (fd, (struct sockaddr *) &su.ss, &addrlen)) == -1) + { + msg_warn ("accept failed: %s", strerror (errno)); + return; + } + /* Check for EAGAIN */ + if (nfd == 0) + { + return; + } + + new_task = construct_task (worker); + + if (su.ss.ss_family == AF_UNIX) + { + msg_info ("accepted connection from unix socket"); + new_task->client_addr.s_addr = INADDR_NONE; + } + else if (su.ss.ss_family == AF_INET) + { + msg_info ("accepted connection from %s port %d", + inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); + memcpy (&new_task->client_addr, &su.s4.sin_addr, + sizeof (struct in_addr)); + } + + new_task->sock = nfd; + new_task->is_mime = ctx->is_mime; + worker->srv->stat->connections_count++; + new_task->resolver = ctx->resolver; + ctx->io_tv.tv_sec = WORKER_IO_TIMEOUT; + ctx->io_tv.tv_usec = 0; + + /* Set up dispatcher */ + new_task->dispatcher = + rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, + err_socket, &ctx->io_tv, (void *) new_task); + new_task->dispatcher->peer_addr = new_task->client_addr.s_addr; + + /* Init custom filters */ +#ifndef BUILD_STATIC + if (ctx->is_custom) + { + cur = ctx->custom_filters; + while (cur) + { + filt = cur->data; + if (filt->before_connect) + { + /* XXX: maybe not use rcpt list here for custom filters data, but this can save some bytes in task structure */ + new_task->rcpt = + g_list_prepend (new_task->rcpt, filt->before_connect ()); + } + cur = g_list_next (cur); } + /* Keep user data in the same order as custom filters */ + new_task->rcpt = g_list_reverse (new_task->rcpt); + } #endif } #ifndef BUILD_STATIC -static gboolean -load_custom_filter (struct config_file *cfg, const char *file, struct rspamd_worker_ctx *ctx) +static gboolean +load_custom_filter (struct config_file *cfg, const gchar * file, + struct rspamd_worker_ctx *ctx) { - struct custom_filter *filt; - struct stat st; - - if (stat (file, &st) == -1 || !S_ISREG (st.st_mode)) { - msg_info ("stat failed for %s", file); - return FALSE; - } - - filt = g_malloc (sizeof (struct custom_filter)); - - filt->handle = g_module_open (file, G_MODULE_BIND_LAZY); - if (!filt->handle) { - msg_info ("module load failed: %s", g_module_error ()); - g_free (filt); - return FALSE; - } - - /* Now extract functions from custom module */ - if (!g_module_symbol (filt->handle, MODULE_INIT_FUNC, (gpointer *)&filt->init_func) || - !g_module_symbol (filt->handle, MODULE_FINIT_FUNC, (gpointer *)&filt->fin_func) || - !g_module_symbol (filt->handle, MODULE_BEFORE_CONNECT_FUNC, (gpointer *)&filt->before_connect) || - !g_module_symbol (filt->handle, MODULE_AFTER_CONNECT_FUNC, (gpointer *)&filt->after_connect) || - !g_module_symbol (filt->handle, MODULE_PARSE_LINE_FUNC, (gpointer *)&filt->process_line)) { - - msg_info ("cannot find handlers in module %s: %s", file, g_module_error ()); - g_free (filt); - return FALSE; - } - - filt->init_func (cfg); - filt->filename = g_strdup (file); - ctx->custom_filters = g_list_prepend (ctx->custom_filters, filt); - - return TRUE; + struct custom_filter *filt; + struct stat st; + + if (stat (file, &st) == -1 || !S_ISREG (st.st_mode)) + { + msg_info ("stat failed for %s", file); + return FALSE; + } + + filt = g_malloc (sizeof (struct custom_filter)); + + filt->handle = g_module_open (file, G_MODULE_BIND_LAZY); + if (!filt->handle) + { + msg_info ("module load failed: %s", g_module_error ()); + g_free (filt); + return FALSE; + } + + /* Now extract functions from custom module */ + if (!g_module_symbol + (filt->handle, MODULE_INIT_FUNC, (gpointer *) & filt->init_func) + || !g_module_symbol (filt->handle, MODULE_FINIT_FUNC, + (gpointer *) & filt->fin_func) + || !g_module_symbol (filt->handle, MODULE_BEFORE_CONNECT_FUNC, + (gpointer *) & filt->before_connect) + || !g_module_symbol (filt->handle, MODULE_AFTER_CONNECT_FUNC, + (gpointer *) & filt->after_connect) + || !g_module_symbol (filt->handle, MODULE_PARSE_LINE_FUNC, + (gpointer *) & filt->process_line)) + { + + msg_info ("cannot find handlers in module %s: %s", file, + g_module_error ()); + g_free (filt); + return FALSE; + } + + filt->init_func (cfg); + filt->filename = g_strdup (file); + ctx->custom_filters = g_list_prepend (ctx->custom_filters, filt); + + return TRUE; } /* * Load custom filters from specified path */ -static gboolean -load_custom_filters (struct rspamd_worker *worker, const char *path) +static gboolean +load_custom_filters (struct rspamd_worker *worker, const gchar * path) { - glob_t gp; - int r, i; - struct rspamd_worker_ctx *ctx = worker->ctx; - - gp.gl_offs = 0; - if ((r = glob (path, GLOB_NOSORT, NULL, &gp)) != 0) { - msg_warn ("glob failed: %s, %d", strerror (errno), r); - return FALSE; - } - - for (i = 0; i < gp.gl_pathc; i ++) { - if (! load_custom_filter (worker->srv->cfg, gp.gl_pathv[i], ctx)) { - globfree (&gp); - return FALSE; - } + glob_t gp; + gint r, i; + struct rspamd_worker_ctx *ctx = worker->ctx; + + gp.gl_offs = 0; + if ((r = glob (path, GLOB_NOSORT, NULL, &gp)) != 0) + { + msg_warn ("glob failed: %s, %d", strerror (errno), r); + return FALSE; + } + + for (i = 0; i < gp.gl_pathc; i++) + { + if (!load_custom_filter (worker->srv->cfg, gp.gl_pathv[i], ctx)) + { + globfree (&gp); + return FALSE; } + } - globfree (&gp); + globfree (&gp); - return TRUE; + return TRUE; } static void unload_custom_filters (struct rspamd_worker_ctx *ctx) { - GList *cur; - struct custom_filter *filt; - - cur = ctx->custom_filters; - while (cur) { - filt = cur->data; - if (filt->fin_func) { - filt->fin_func (); - } - g_module_close (filt->handle); - g_free (filt); - cur = g_list_next (cur); + GList *cur; + struct custom_filter *filt; + + cur = ctx->custom_filters; + while (cur) + { + filt = cur->data; + if (filt->fin_func) + { + filt->fin_func (); } + g_module_close (filt->handle); + g_free (filt); + cur = g_list_next (cur); + } - g_list_free (ctx->custom_filters); + g_list_free (ctx->custom_filters); } #endif @@ -653,72 +739,82 @@ unload_custom_filters (struct rspamd_worker_ctx *ctx) void start_worker (struct rspamd_worker *worker) { - struct sigaction signals; - char *is_mime_str; - char *is_custom_str; - struct rspamd_worker_ctx *ctx; + struct sigaction signals; + gchar *is_mime_str; + gchar *is_custom_str; + struct rspamd_worker_ctx *ctx; #ifdef WITH_PROFILER - extern void _start (void), etext (void); - monstartup ((u_long) & _start, (u_long) & etext); + extern void _start (void), etext (void); + monstartup ((u_long) & _start, (u_long) & etext); #endif - gperf_profiler_init (worker->srv->cfg, "worker"); + gperf_profiler_init (worker->srv->cfg, "worker"); - worker->srv->pid = getpid (); + worker->srv->pid = getpid (); - event_init (); + event_init (); - init_signals (&signals, sig_handler); - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); - /* SIGUSR2 handler */ - signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker); - signal_add (&worker->sig_ev, NULL); + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker); + signal_add (&worker->sig_ev, NULL); - /* Accept event */ - event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); - event_add (&worker->bind_ev, NULL); + /* Accept event */ + event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, + accept_socket, (void *) worker); + event_add (&worker->bind_ev, NULL); - /* Fill ctx */ - ctx = g_malloc0 (sizeof (struct rspamd_worker_ctx)); - worker->ctx = ctx; + /* Fill ctx */ + ctx = g_malloc0 (sizeof (struct rspamd_worker_ctx)); + worker->ctx = ctx; -#ifndef BUILD_STATIC - /* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */ - is_custom_str = g_hash_table_lookup (worker->cf->params, "custom_filters"); - if (is_custom_str && g_module_supported () && load_custom_filters (worker, is_custom_str)) { - msg_info ("starting custom process, loaded modules from %s", is_custom_str); - ctx->is_custom = TRUE; - } - else { +#ifndef BUILD_STATIC + /* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */ + is_custom_str = g_hash_table_lookup (worker->cf->params, "custom_filters"); + if (is_custom_str && g_module_supported () + && load_custom_filters (worker, is_custom_str)) + { + msg_info ("starting custom process, loaded modules from %s", + is_custom_str); + ctx->is_custom = TRUE; + } + else + { #endif - /* Maps events */ - start_map_watch (); - /* Check whether we are mime worker */ - is_mime_str = g_hash_table_lookup (worker->cf->params, "mime"); - if (is_mime_str != NULL && (g_ascii_strcasecmp (is_mime_str, "no") == 0 || g_ascii_strcasecmp (is_mime_str, "false") == 0)) { - ctx->is_mime = FALSE; - } - else { - ctx->is_mime = TRUE; - } -#ifndef BUILD_STATIC + /* Maps events */ + start_map_watch (); + /* Check whether we are mime worker */ + is_mime_str = g_hash_table_lookup (worker->cf->params, "mime"); + if (is_mime_str != NULL + && (g_ascii_strcasecmp (is_mime_str, "no") == 0 + || g_ascii_strcasecmp (is_mime_str, "false") == 0)) + { + ctx->is_mime = FALSE; } + else + { + ctx->is_mime = TRUE; + } +#ifndef BUILD_STATIC + } #endif - ctx->resolver = dns_resolver_init (worker->srv->cfg); + ctx->resolver = dns_resolver_init (worker->srv->cfg); - event_loop (0); - -#ifndef BUILD_STATIC - if (ctx->is_custom) { - unload_custom_filters (ctx); - } + event_loop (0); + +#ifndef BUILD_STATIC + if (ctx->is_custom) + { + unload_custom_filters (ctx); + } #endif - close_log (); - exit (EXIT_SUCCESS); + close_log (); + exit (EXIT_SUCCESS); } /* |