diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-02-20 18:49:04 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-02-20 18:49:04 +0300 |
commit | 606128de4cb33a2727d6609df46ecf0c72006a73 (patch) | |
tree | 8a12200c73b1ca0446b6f855beb46546ffbd8a81 | |
parent | bcece60fa1bfd4bbb09a64c058835fe3245e1d18 (diff) | |
download | rspamd-606128de4cb33a2727d6609df46ecf0c72006a73.tar.gz rspamd-606128de4cb33a2727d6609df46ecf0c72006a73.zip |
* Fix dispatcher timeouts handling
* Add wanna_die flag that can be used in dispatcher's callbacks
-rw-r--r-- | src/buffer.c | 44 | ||||
-rw-r--r-- | src/buffer.h | 1 | ||||
-rw-r--r-- | src/controller.c | 8 | ||||
-rw-r--r-- | src/protocol.c | 2 | ||||
-rw-r--r-- | src/worker.c | 17 |
5 files changed, 57 insertions, 15 deletions
diff --git a/src/buffer.c b/src/buffer.c index 46ef98850..268b6cbb1 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -24,6 +24,7 @@ #include "config.h" #include "buffer.h" +#include "main.h" #define G_DISPATCHER_ERROR dispatcher_error_quark() @@ -57,7 +58,6 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) cur = g_list_next (cur); continue; } - r = write (fd, buf->pos, BUFREMAIN (buf)); if (r == -1 && errno != EAGAIN) { if (d->err_callback) { @@ -70,6 +70,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) buf->pos += r; if (BUFREMAIN (buf) != 0) { /* Continue with this buffer */ + msg_debug ("write_buffers: wrote %ld bytes of %ld", (long int)r, (long int)buf->data->len); continue; } } @@ -83,6 +84,9 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) } else if (errno == EAGAIN) { /* Wait for other event */ + event_del (d->ev); + event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); return; } cur = g_list_next (cur); @@ -93,23 +97,37 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) g_list_free (d->out_buffers); d->out_buffers = NULL; + msg_debug ("write_buffers: all buffers were written successfully"); + if (d->write_callback) { d->write_callback (d->user_data); + if (d->wanna_die) { + msg_debug ("write_buffers: callback set wanna_die flag, terminating"); + rspamd_remove_dispatcher (d); + return; + } } - + event_del (d->ev); event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); event_add (d->ev, d->tv); } + else { + /* Plan other write event */ + event_del (d->ev); + event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } } static void read_buffers (int fd, rspamd_io_dispatcher_t *d) { - ssize_t r, len; + ssize_t r; GError *err; f_str_t res; char *c; + unsigned int len; if (d->in_buf == NULL) { d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); @@ -170,12 +188,17 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d) res.len = r; if (d->read_callback) { d->read_callback (&res, d->user_data); + if (d->wanna_die) { + msg_debug ("read_buffers: callback set wanna_die flag, terminating"); + rspamd_remove_dispatcher (d); + return; + } if (r < len - 1 && *(c + 1) == '\n') { r ++; c ++; } /* Move remaining string to begin of buffer (draining) */ - memmove (d->in_buf->data->begin, c, len - r); + memmove (d->in_buf->data->begin, c + 1, len - r - 1); c = d->in_buf->data->begin; d->in_buf->data->len -= r + 1; d->in_buf->pos -= r + 1; @@ -274,7 +297,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy, new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event)); new->fd = fd; - event_set (new->ev, fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)new); + event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new); event_add (new->ev, new->tv); return new; @@ -328,18 +351,19 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, rspamd_buffer_t *newbuf; newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); - newbuf->data = memory_pool_alloc (d->pool, sizeof (f_str_t)); - - newbuf->data->begin = memory_pool_alloc (d->pool, len); + newbuf->data = fstralloc (d->pool, len); + + /* We need to copy data to temporary internal buffer to avoid using of stack variables */ memcpy (newbuf->data->begin, data, len); - newbuf->data->size = len; newbuf->pos = newbuf->data->begin; + newbuf->data->len = len; d->out_buffers = g_list_prepend (d->out_buffers, newbuf); if (!delayed) { + msg_debug ("rspamd_dispatcher_write: plan write event"); event_del (d->ev); - event_set (d->ev, d->fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)d); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); event_add (d->ev, d->tv); } } diff --git a/src/buffer.h b/src/buffer.h index 93c7818dd..4c9e5c905 100644 --- a/src/buffer.h +++ b/src/buffer.h @@ -39,6 +39,7 @@ typedef struct rspamd_io_dispatcher_s { enum io_policy policy; /**< IO policy */ size_t nchars; /**< how many chars to read */ int fd; /**< descriptor */ + gboolean wanna_die; /**< if dispatcher should be stopped */ dispatcher_read_callback_t read_callback; /**< read callback */ dispatcher_write_callback_t write_callback; /**< write callback */ dispatcher_err_callback_t err_callback; /**< error callback */ diff --git a/src/controller.c b/src/controller.c index 7733bc924..8128d7356 100644 --- a/src/controller.c +++ b/src/controller.c @@ -457,7 +457,13 @@ static void err_socket (GError *err, void *arg) { struct controller_session *session = (struct controller_session *)arg; - msg_info ("err_socket: abnormally closing control connection, error: %s", err->message); + + if (err->code == EOF) { + msg_info ("err_socket: client closed control connection"); + } + else { + msg_info ("err_socket: abnormally closing control connection, error: %s", err->message); + } /* Free buffers */ free_session (session); } diff --git a/src/protocol.c b/src/protocol.c index ab0db5f78..543507c61 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -91,6 +91,7 @@ parse_command (struct worker_task *task, char *line) { char *token; + msg_debug ("parse_command: got line from worker: %s", line); token = strsep (&line, " "); if (line == NULL || token == NULL) { msg_debug ("parse_command: bad command: %s", token); @@ -191,6 +192,7 @@ parse_header (struct worker_task *task, char *line) task->last_error = "Unknown content length"; task->error_code = RSPAMD_LENGTH_ERROR; task->state = WRITE_ERROR; + return -1; } } return 0; diff --git a/src/worker.c b/src/worker.c index 6d0c413cd..f3bbac8ce 100644 --- a/src/worker.c +++ b/src/worker.c @@ -40,8 +40,8 @@ #include <perl.h> /* from the Perl distribution */ #define TASK_POOL_SIZE 4095 -/* 2 seconds for worker's IO */ -#define WORKER_IO_TIMEOUT 2 +/* 60 seconds for worker's IO */ +#define WORKER_IO_TIMEOUT 60 const f_str_t CRLF = { /* begin */"\r\n", @@ -119,7 +119,8 @@ free_task (struct worker_task *task) g_list_free_1 (part); } memory_pool_delete (task->task_pool); - rspamd_remove_dispatcher (task->dispatcher); + /* Plan dispatcher shutdown */ + task->dispatcher->wanna_die = 1; close (task->sock); g_free (task); } @@ -216,7 +217,8 @@ accept_socket (int fd, short what, void *arg) struct sockaddr_storage ss; struct worker_task *new_task; socklen_t addrlen = sizeof(ss); - int nfd; + int nfd, on = 1; + struct linger linger; if ((nfd = accept (fd, (struct sockaddr *)&ss, &addrlen)) == -1) { return; @@ -224,6 +226,13 @@ accept_socket (int fd, short what, void *arg) if (event_make_socket_nonblocking(fd) < 0) { return; } + + /* Socket options */ + setsockopt (nfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on)); + setsockopt (nfd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on)); + linger.l_onoff = 1; + linger.l_linger = 2; + setsockopt (nfd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)); new_task = g_malloc (sizeof (struct worker_task)); if (new_task == NULL) { |