diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-07-08 20:07:07 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-07-08 20:07:07 +0400 |
commit | 3d1c40c972d68623f88875ec03ae7c8bafbadad5 (patch) | |
tree | 75a34069f368ebb52b47e8c3f605dcde1de3e9cd /src/worker.c | |
parent | 75bf13b9bda0d1eb98671b68064becd4f6946c14 (diff) | |
download | rspamd-3d1c40c972d68623f88875ec03ae7c8bafbadad5.tar.gz rspamd-3d1c40c972d68623f88875ec03ae7c8bafbadad5.zip |
* Make DNS resolver working
* Many improvements to rspamd test suite: now it CAN be used for testing rspamd functionality
* Write DNS resolver tests
* Fix issues with memory_pool mutexes and with creating of statfiles
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 93 |
1 files changed, 61 insertions, 32 deletions
diff --git a/src/worker.c b/src/worker.c index 02077d5c9..11bb24867 100644 --- a/src/worker.c +++ b/src/worker.c @@ -36,6 +36,7 @@ #include "modules.h" #include "message.h" #include "map.h" +#include "dns.h" #include "evdns/evdns.h" @@ -70,13 +71,19 @@ struct custom_filter { #endif -static struct timeval io_tv; -/* Detect whether this worker is mime worker */ -static gboolean is_mime; - -/* Detect whether this worker bypass normal filters and is using custom filters */ -static gboolean is_custom; -static GList *custom_filters; +/* + * Worker's context + */ +struct rspamd_worker_ctx { + struct timeval io_tv; + /* Detect whether this worker is mime worker */ + gboolean is_mime; + /* Detect whether this worker is mime worker */ + gboolean is_custom; + GList *custom_filters; + /* DNS resolver */ + struct rspamd_dns_resolver *resolver; +}; static gboolean write_socket (void *arg); @@ -150,8 +157,9 @@ fin_custom_filters (struct worker_task *task) GList *cur, *curd; struct custom_filter *filt; char *output = NULL, *log = NULL; + struct rspamd_worker_ctx *ctx = task->worker->ctx; - cur = custom_filters; + cur = ctx->custom_filters; curd = task->rcpt; while (cur) { filt = cur->data; @@ -183,8 +191,9 @@ parse_line_custom (struct worker_task *task, f_str_t *in) struct custom_filter *filt; char *output = NULL; gboolean res = TRUE; + struct rspamd_worker_ctx *ctx = task->worker->ctx; - cur = custom_filters; + cur = ctx->custom_filters; curd = task->rcpt; while (cur) { filt = cur->data; @@ -280,12 +289,14 @@ static gboolean read_socket (f_str_t * in, void *arg) { struct worker_task *task = (struct worker_task *)arg; + struct rspamd_worker_ctx *ctx; ssize_t r; + ctx = task->worker->ctx; switch (task->state) { case READ_COMMAND: case READ_HEADER: - if (is_custom) { + if (ctx->is_custom) { if (! parse_line_custom (task, in)) { task->last_error = "Read error"; task->error_code = RSPAMD_NETWORK_ERROR; @@ -352,13 +363,16 @@ static gboolean write_socket (void *arg) { struct worker_task *task = (struct worker_task *)arg; + struct rspamd_worker_ctx *ctx; + + ctx = task->worker->ctx; switch (task->state) { case WRITE_REPLY: if (! write_reply (task)) { return FALSE; } - if (is_custom) { + if (ctx->is_custom) { fin_custom_filters (task); } destroy_session (task->s); @@ -368,7 +382,7 @@ write_socket (void *arg) if (! write_reply (task)) { return FALSE; } - if (is_custom) { + if (ctx->is_custom) { fin_custom_filters (task); } destroy_session (task->s); @@ -376,7 +390,7 @@ write_socket (void *arg) break; case CLOSING_CONNECTION: debug_task ("normally closing connection"); - if (is_custom) { + if (ctx->is_custom) { fin_custom_filters (task); } destroy_session (task->s); @@ -384,7 +398,7 @@ write_socket (void *arg) break; default: msg_info ("abnormally closing connection"); - if (is_custom) { + if (ctx->is_custom) { fin_custom_filters (task); } destroy_session (task->s); @@ -401,9 +415,12 @@ static void err_socket (GError * err, void *arg) { struct worker_task *task = (struct worker_task *)arg; + struct rspamd_worker_ctx *ctx; + + ctx = task->worker->ctx; msg_info ("abnormally closing connection, error: %s", err->message); /* Free buffers */ - if (is_custom) { + if (ctx->is_custom) { fin_custom_filters (task); } destroy_session (task->s); @@ -434,8 +451,7 @@ construct_task (struct rspamd_worker *worker) if (gettimeofday (&new_task->tv, NULL) == -1) { msg_warn ("gettimeofday failed: %s", strerror (errno)); } - io_tv.tv_sec = WORKER_IO_TIMEOUT; - io_tv.tv_usec = 0; + new_task->task_pool = memory_pool_new (memory_pool_get_size ()); /* Add destructor for recipients list (it would be better to use anonymous function here */ @@ -458,6 +474,7 @@ static void accept_socket (int fd, short what, void *arg) { struct rspamd_worker *worker = (struct rspamd_worker *)arg; + struct rspamd_worker_ctx *ctx; union sa_union su; struct worker_task *new_task; GList *cur; @@ -466,6 +483,7 @@ accept_socket (int fd, short what, void *arg) socklen_t addrlen = sizeof (su.ss); int nfd; + ctx = worker->ctx; if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { msg_warn ("accept failed: %s", strerror (errno)); return; @@ -487,17 +505,20 @@ accept_socket (int fd, short what, void *arg) } new_task->sock = nfd; - new_task->is_mime = is_mime; + new_task->is_mime = ctx->is_mime; worker->srv->stat->connections_count++; + new_task->resolver = ctx->resolver; + ctx->io_tv.tv_sec = WORKER_IO_TIMEOUT; + ctx->io_tv.tv_usec = 0; /* Set up dispatcher */ - new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &io_tv, (void *)new_task); + new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &ctx->io_tv, (void *)new_task); new_task->dispatcher->peer_addr = new_task->client_addr.s_addr; /* Init custom filters */ #ifndef BUILD_STATIC - if (is_custom) { - cur = custom_filters; + if (ctx->is_custom) { + cur = ctx->custom_filters; while (cur) { filt = cur->data; if (filt->before_connect) { @@ -515,7 +536,7 @@ accept_socket (int fd, short what, void *arg) #ifndef BUILD_STATIC static gboolean -load_custom_filter (struct config_file *cfg, const char *file) +load_custom_filter (struct config_file *cfg, const char *file, struct rspamd_worker_ctx *ctx) { struct custom_filter *filt; struct stat st; @@ -548,7 +569,7 @@ load_custom_filter (struct config_file *cfg, const char *file) filt->init_func (cfg); filt->filename = g_strdup (file); - custom_filters = g_list_prepend (custom_filters, filt); + ctx->custom_filters = g_list_prepend (ctx->custom_filters, filt); return TRUE; } @@ -561,6 +582,7 @@ load_custom_filters (struct rspamd_worker *worker, const char *path) { glob_t gp; int r, i; + struct rspamd_worker_ctx *ctx = worker->ctx; gp.gl_offs = 0; if ((r = glob (path, GLOB_NOSORT, NULL, &gp)) != 0) { @@ -569,7 +591,7 @@ load_custom_filters (struct rspamd_worker *worker, const char *path) } for (i = 0; i < gp.gl_pathc; i ++) { - if (! load_custom_filter (worker->srv->cfg, gp.gl_pathv[i])) { + if (! load_custom_filter (worker->srv->cfg, gp.gl_pathv[i], ctx)) { globfree (&gp); return FALSE; } @@ -581,12 +603,12 @@ load_custom_filters (struct rspamd_worker *worker, const char *path) } static void -unload_custom_filters (void) +unload_custom_filters (struct rspamd_worker_ctx *ctx) { GList *cur; struct custom_filter *filt; - cur = custom_filters; + cur = ctx->custom_filters; while (cur) { filt = cur->data; if (filt->fin_func) { @@ -597,7 +619,7 @@ unload_custom_filters (void) cur = g_list_next (cur); } - g_list_free (custom_filters); + g_list_free (ctx->custom_filters); } #endif @@ -611,6 +633,7 @@ start_worker (struct rspamd_worker *worker) struct sigaction signals; char *is_mime_str; char *is_custom_str; + struct rspamd_worker_ctx *ctx; #ifdef WITH_PROFILER extern void _start (void), etext (void); @@ -635,12 +658,16 @@ start_worker (struct rspamd_worker *worker) event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); event_add (&worker->bind_ev, NULL); + /* Fill ctx */ + ctx = g_malloc0 (sizeof (struct rspamd_worker_ctx)); + worker->ctx = ctx; + #ifndef BUILD_STATIC /* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */ is_custom_str = g_hash_table_lookup (worker->cf->params, "custom_filters"); if (is_custom_str && g_module_supported () && load_custom_filters (worker, is_custom_str)) { msg_info ("starting custom process, loaded modules from %s", is_custom_str); - is_custom = TRUE; + ctx->is_custom = TRUE; } else { #endif @@ -649,20 +676,22 @@ start_worker (struct rspamd_worker *worker) /* Check whether we are mime worker */ is_mime_str = g_hash_table_lookup (worker->cf->params, "mime"); if (is_mime_str != NULL && (g_ascii_strcasecmp (is_mime_str, "no") == 0 || g_ascii_strcasecmp (is_mime_str, "false") == 0)) { - is_mime = FALSE; + ctx->is_mime = FALSE; } else { - is_mime = TRUE; + ctx->is_mime = TRUE; } #ifndef BUILD_STATIC } #endif + ctx->resolver = dns_resolver_init (worker->srv->cfg); + event_loop (0); #ifndef BUILD_STATIC - if (is_custom) { - unload_custom_filters (); + if (ctx->is_custom) { + unload_custom_filters (ctx); } #endif |