From 80efd10c7fda505905f960c73ce567c57e91a86b Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 20 Apr 2012 02:55:22 +0400 Subject: [PATCH] Allow rspamd dispatcher code to process half-closed connections. --- src/aio_event.c | 2 +- src/buffer.c | 44 ++++++++++++++++++++++++++++++++++++-------- src/buffer.h | 2 ++ src/worker.c | 2 ++ 4 files changed, 41 insertions(+), 9 deletions(-) diff --git a/src/aio_event.c b/src/aio_event.c index 85f52118d..4f1bf95d4 100644 --- a/src/aio_event.c +++ b/src/aio_event.c @@ -48,7 +48,7 @@ #endif #define SYS_eventfd 323 -#define MAX_AIO_EV 1024 +#define MAX_AIO_EV 64 struct io_cbdata { gint fd; diff --git a/src/buffer.c b/src/buffer.c index d5bf673dd..c431988f8 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -386,11 +386,27 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) } else if (r == 0) { /* Got EOF while we wait for data */ +#if 0 if (d->err_callback) { err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); d->err_callback (err, d->user_data); return; } +#endif + /* Read returned 0, it may be shutdown or full quit */ + if (!d->want_read) { + d->half_closed = TRUE; + /* Do not expect any read after this */ + event_del (d->ev); + } + else { + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); + d->err_callback (err, d->user_data); + return; + } + } + return; } else if (r == -1 && errno == EAGAIN) { debug_ip("partially read data, retry"); @@ -549,16 +565,26 @@ dispatcher_cb (gint fd, short what, void *arg) } else { if (g_queue_get_length (d->out_buffers) == 0) { - /* Want read again */ - event_del (d->ev); - event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - if (d->is_restored && d->write_callback) { - if (!d->write_callback (d->user_data)) { + if (d->half_closed && !d->is_restored) { + /* Socket is half closed and there is nothing more to write, closing connection */ + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); + d->err_callback (err, d->user_data); return; } - d->is_restored = FALSE; + } + else { + /* Want read again */ + event_del (d->ev); + event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); + event_add (d->ev, d->tv); + if (d->is_restored && d->write_callback) { + if (!d->write_callback (d->user_data)) { + return; + } + d->is_restored = FALSE; + } } } else { @@ -599,6 +625,8 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic new->err_callback = err_cb; new->user_data = user_data; new->strip_eol = TRUE; + new->half_closed = FALSE; + new->want_read = TRUE; new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event)); new->fd = fd; diff --git a/src/buffer.h b/src/buffer.h index ce2cf3d2e..235497aa2 100644 --- a/src/buffer.h +++ b/src/buffer.h @@ -52,6 +52,8 @@ typedef struct rspamd_io_dispatcher_s { gboolean in_sendfile; /**< whether buffer is in sendfile mode */ gboolean strip_eol; /**< strip or not line ends in BUFFER_LINE policy */ gboolean is_restored; /**< call a callback when dispatcher is restored */ + gboolean half_closed; /**< connection is half closed */ + gboolean want_read; /**< whether we want to read more data */ struct event_base *ev_base; /**< event base for io operations */ #ifndef HAVE_SENDFILE void *map; diff --git a/src/worker.c b/src/worker.c index dad842ce3..eecaf45dc 100644 --- a/src/worker.c +++ b/src/worker.c @@ -470,6 +470,8 @@ read_socket (f_str_t * in, void *arg) task->msg->len = in->len; debug_task ("got string of length %z", task->msg->len); task->state = WAIT_FILTER; + /* No more need of reading allowing half-closed connections to be proceed */ + task->dispatcher->want_read = FALSE; r = process_message (task); if (r == -1) { msg_warn ("processing of message failed"); -- 2.39.5