aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2010-07-08 20:07:07 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2010-07-08 20:07:07 +0400
commit3d1c40c972d68623f88875ec03ae7c8bafbadad5 (patch)
tree75a34069f368ebb52b47e8c3f605dcde1de3e9cd /src/worker.c
parent75bf13b9bda0d1eb98671b68064becd4f6946c14 (diff)
downloadrspamd-3d1c40c972d68623f88875ec03ae7c8bafbadad5.tar.gz
rspamd-3d1c40c972d68623f88875ec03ae7c8bafbadad5.zip
* Make DNS resolver working
* Many improvements to rspamd test suite: now it CAN be used for testing rspamd functionality * Write DNS resolver tests * Fix issues with memory_pool mutexes and with creating of statfiles
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c93
1 files changed, 61 insertions, 32 deletions
diff --git a/src/worker.c b/src/worker.c
index 02077d5c9..11bb24867 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -36,6 +36,7 @@
#include "modules.h"
#include "message.h"
#include "map.h"
+#include "dns.h"
#include "evdns/evdns.h"
@@ -70,13 +71,19 @@ struct custom_filter {
#endif
-static struct timeval io_tv;
-/* Detect whether this worker is mime worker */
-static gboolean is_mime;
-
-/* Detect whether this worker bypass normal filters and is using custom filters */
-static gboolean is_custom;
-static GList *custom_filters;
+/*
+ * Worker's context
+ */
+struct rspamd_worker_ctx {
+ struct timeval io_tv;
+ /* Detect whether this worker is mime worker */
+ gboolean is_mime;
+ /* Detect whether this worker is mime worker */
+ gboolean is_custom;
+ GList *custom_filters;
+ /* DNS resolver */
+ struct rspamd_dns_resolver *resolver;
+};
static gboolean write_socket (void *arg);
@@ -150,8 +157,9 @@ fin_custom_filters (struct worker_task *task)
GList *cur, *curd;
struct custom_filter *filt;
char *output = NULL, *log = NULL;
+ struct rspamd_worker_ctx *ctx = task->worker->ctx;
- cur = custom_filters;
+ cur = ctx->custom_filters;
curd = task->rcpt;
while (cur) {
filt = cur->data;
@@ -183,8 +191,9 @@ parse_line_custom (struct worker_task *task, f_str_t *in)
struct custom_filter *filt;
char *output = NULL;
gboolean res = TRUE;
+ struct rspamd_worker_ctx *ctx = task->worker->ctx;
- cur = custom_filters;
+ cur = ctx->custom_filters;
curd = task->rcpt;
while (cur) {
filt = cur->data;
@@ -280,12 +289,14 @@ static gboolean
read_socket (f_str_t * in, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
+ struct rspamd_worker_ctx *ctx;
ssize_t r;
+ ctx = task->worker->ctx;
switch (task->state) {
case READ_COMMAND:
case READ_HEADER:
- if (is_custom) {
+ if (ctx->is_custom) {
if (! parse_line_custom (task, in)) {
task->last_error = "Read error";
task->error_code = RSPAMD_NETWORK_ERROR;
@@ -352,13 +363,16 @@ static gboolean
write_socket (void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
+ struct rspamd_worker_ctx *ctx;
+
+ ctx = task->worker->ctx;
switch (task->state) {
case WRITE_REPLY:
if (! write_reply (task)) {
return FALSE;
}
- if (is_custom) {
+ if (ctx->is_custom) {
fin_custom_filters (task);
}
destroy_session (task->s);
@@ -368,7 +382,7 @@ write_socket (void *arg)
if (! write_reply (task)) {
return FALSE;
}
- if (is_custom) {
+ if (ctx->is_custom) {
fin_custom_filters (task);
}
destroy_session (task->s);
@@ -376,7 +390,7 @@ write_socket (void *arg)
break;
case CLOSING_CONNECTION:
debug_task ("normally closing connection");
- if (is_custom) {
+ if (ctx->is_custom) {
fin_custom_filters (task);
}
destroy_session (task->s);
@@ -384,7 +398,7 @@ write_socket (void *arg)
break;
default:
msg_info ("abnormally closing connection");
- if (is_custom) {
+ if (ctx->is_custom) {
fin_custom_filters (task);
}
destroy_session (task->s);
@@ -401,9 +415,12 @@ static void
err_socket (GError * err, void *arg)
{
struct worker_task *task = (struct worker_task *)arg;
+ struct rspamd_worker_ctx *ctx;
+
+ ctx = task->worker->ctx;
msg_info ("abnormally closing connection, error: %s", err->message);
/* Free buffers */
- if (is_custom) {
+ if (ctx->is_custom) {
fin_custom_filters (task);
}
destroy_session (task->s);
@@ -434,8 +451,7 @@ construct_task (struct rspamd_worker *worker)
if (gettimeofday (&new_task->tv, NULL) == -1) {
msg_warn ("gettimeofday failed: %s", strerror (errno));
}
- io_tv.tv_sec = WORKER_IO_TIMEOUT;
- io_tv.tv_usec = 0;
+
new_task->task_pool = memory_pool_new (memory_pool_get_size ());
/* Add destructor for recipients list (it would be better to use anonymous function here */
@@ -458,6 +474,7 @@ static void
accept_socket (int fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct rspamd_worker_ctx *ctx;
union sa_union su;
struct worker_task *new_task;
GList *cur;
@@ -466,6 +483,7 @@ accept_socket (int fd, short what, void *arg)
socklen_t addrlen = sizeof (su.ss);
int nfd;
+ ctx = worker->ctx;
if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
@@ -487,17 +505,20 @@ accept_socket (int fd, short what, void *arg)
}
new_task->sock = nfd;
- new_task->is_mime = is_mime;
+ new_task->is_mime = ctx->is_mime;
worker->srv->stat->connections_count++;
+ new_task->resolver = ctx->resolver;
+ ctx->io_tv.tv_sec = WORKER_IO_TIMEOUT;
+ ctx->io_tv.tv_usec = 0;
/* Set up dispatcher */
- new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &io_tv, (void *)new_task);
+ new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &ctx->io_tv, (void *)new_task);
new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
/* Init custom filters */
#ifndef BUILD_STATIC
- if (is_custom) {
- cur = custom_filters;
+ if (ctx->is_custom) {
+ cur = ctx->custom_filters;
while (cur) {
filt = cur->data;
if (filt->before_connect) {
@@ -515,7 +536,7 @@ accept_socket (int fd, short what, void *arg)
#ifndef BUILD_STATIC
static gboolean
-load_custom_filter (struct config_file *cfg, const char *file)
+load_custom_filter (struct config_file *cfg, const char *file, struct rspamd_worker_ctx *ctx)
{
struct custom_filter *filt;
struct stat st;
@@ -548,7 +569,7 @@ load_custom_filter (struct config_file *cfg, const char *file)
filt->init_func (cfg);
filt->filename = g_strdup (file);
- custom_filters = g_list_prepend (custom_filters, filt);
+ ctx->custom_filters = g_list_prepend (ctx->custom_filters, filt);
return TRUE;
}
@@ -561,6 +582,7 @@ load_custom_filters (struct rspamd_worker *worker, const char *path)
{
glob_t gp;
int r, i;
+ struct rspamd_worker_ctx *ctx = worker->ctx;
gp.gl_offs = 0;
if ((r = glob (path, GLOB_NOSORT, NULL, &gp)) != 0) {
@@ -569,7 +591,7 @@ load_custom_filters (struct rspamd_worker *worker, const char *path)
}
for (i = 0; i < gp.gl_pathc; i ++) {
- if (! load_custom_filter (worker->srv->cfg, gp.gl_pathv[i])) {
+ if (! load_custom_filter (worker->srv->cfg, gp.gl_pathv[i], ctx)) {
globfree (&gp);
return FALSE;
}
@@ -581,12 +603,12 @@ load_custom_filters (struct rspamd_worker *worker, const char *path)
}
static void
-unload_custom_filters (void)
+unload_custom_filters (struct rspamd_worker_ctx *ctx)
{
GList *cur;
struct custom_filter *filt;
- cur = custom_filters;
+ cur = ctx->custom_filters;
while (cur) {
filt = cur->data;
if (filt->fin_func) {
@@ -597,7 +619,7 @@ unload_custom_filters (void)
cur = g_list_next (cur);
}
- g_list_free (custom_filters);
+ g_list_free (ctx->custom_filters);
}
#endif
@@ -611,6 +633,7 @@ start_worker (struct rspamd_worker *worker)
struct sigaction signals;
char *is_mime_str;
char *is_custom_str;
+ struct rspamd_worker_ctx *ctx;
#ifdef WITH_PROFILER
extern void _start (void), etext (void);
@@ -635,12 +658,16 @@ start_worker (struct rspamd_worker *worker)
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
event_add (&worker->bind_ev, NULL);
+ /* Fill ctx */
+ ctx = g_malloc0 (sizeof (struct rspamd_worker_ctx));
+ worker->ctx = ctx;
+
#ifndef BUILD_STATIC
/* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */
is_custom_str = g_hash_table_lookup (worker->cf->params, "custom_filters");
if (is_custom_str && g_module_supported () && load_custom_filters (worker, is_custom_str)) {
msg_info ("starting custom process, loaded modules from %s", is_custom_str);
- is_custom = TRUE;
+ ctx->is_custom = TRUE;
}
else {
#endif
@@ -649,20 +676,22 @@ start_worker (struct rspamd_worker *worker)
/* Check whether we are mime worker */
is_mime_str = g_hash_table_lookup (worker->cf->params, "mime");
if (is_mime_str != NULL && (g_ascii_strcasecmp (is_mime_str, "no") == 0 || g_ascii_strcasecmp (is_mime_str, "false") == 0)) {
- is_mime = FALSE;
+ ctx->is_mime = FALSE;
}
else {
- is_mime = TRUE;
+ ctx->is_mime = TRUE;
}
#ifndef BUILD_STATIC
}
#endif
+ ctx->resolver = dns_resolver_init (worker->srv->cfg);
+
event_loop (0);
#ifndef BUILD_STATIC
- if (is_custom) {
- unload_custom_filters ();
+ if (ctx->is_custom) {
+ unload_custom_filters (ctx);
}
#endif