From 80efd10c7fda505905f960c73ce567c57e91a86b Mon Sep 17 00:00:00 2001
From: Vsevolod Stakhov <vsevolod@rambler-co.ru>
Date: Fri, 20 Apr 2012 02:55:22 +0400
Subject: Allow rspamd dispatcher code to process half-closed connections.

---
 src/aio_event.c |  2 +-
 src/buffer.c    | 44 ++++++++++++++++++++++++++++++++++++--------
 src/buffer.h    |  2 ++
 src/worker.c    |  2 ++
 4 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/src/aio_event.c b/src/aio_event.c
index 85f52118d..4f1bf95d4 100644
--- a/src/aio_event.c
+++ b/src/aio_event.c
@@ -48,7 +48,7 @@
 #endif
 
 #define SYS_eventfd       323
-#define MAX_AIO_EV        1024
+#define MAX_AIO_EV        64
 
 struct io_cbdata {
 	gint fd;
diff --git a/src/buffer.c b/src/buffer.c
index d5bf673dd..c431988f8 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -386,11 +386,27 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
 		}
 		else if (r == 0) {
 			/* Got EOF while we wait for data */
+#if 0
 			if (d->err_callback) {
 				err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
 				d->err_callback (err, d->user_data);
 				return;
 			}
+#endif
+			/* Read returned 0, it may be shutdown or full quit */
+			if (!d->want_read) {
+				d->half_closed = TRUE;
+				/* Do not expect any read after this */
+				event_del (d->ev);
+			}
+			else {
+				if (d->err_callback) {
+					err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+					d->err_callback (err, d->user_data);
+					return;
+				}
+			}
+			return;
 		}
 		else if (r == -1 && errno == EAGAIN) {
 			debug_ip("partially read data, retry");
@@ -549,16 +565,26 @@ dispatcher_cb (gint fd, short what, void *arg)
 		}
 		else {
 			if (g_queue_get_length (d->out_buffers) == 0) {
-				/* Want read again */
-				event_del (d->ev);
-				event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
-				event_base_set (d->ev_base, d->ev);
-				event_add (d->ev, d->tv);
-				if (d->is_restored && d->write_callback) {
-					if (!d->write_callback (d->user_data)) {
+				if (d->half_closed && !d->is_restored) {
+					/* Socket is half closed and there is nothing more to write, closing connection */
+					if (d->err_callback) {
+						err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+						d->err_callback (err, d->user_data);
 						return;
 					}
-					d->is_restored = FALSE;
+				}
+				else {
+					/* Want read again */
+					event_del (d->ev);
+					event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+					event_base_set (d->ev_base, d->ev);
+					event_add (d->ev, d->tv);
+					if (d->is_restored && d->write_callback) {
+						if (!d->write_callback (d->user_data)) {
+							return;
+						}
+						d->is_restored = FALSE;
+					}
 				}
 			}
 			else {
@@ -599,6 +625,8 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
 	new->err_callback = err_cb;
 	new->user_data = user_data;
 	new->strip_eol = TRUE;
+	new->half_closed = FALSE;
+	new->want_read = TRUE;
 
 	new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
 	new->fd = fd;
diff --git a/src/buffer.h b/src/buffer.h
index ce2cf3d2e..235497aa2 100644
--- a/src/buffer.h
+++ b/src/buffer.h
@@ -52,6 +52,8 @@ typedef struct rspamd_io_dispatcher_s {
 	gboolean in_sendfile;											/**< whether buffer is in sendfile mode */
 	gboolean strip_eol;												/**< strip or not line ends in BUFFER_LINE policy */
 	gboolean is_restored;											/**< call a callback when dispatcher is restored */
+	gboolean half_closed;											/**< connection is half closed */
+	gboolean want_read;												/**< whether we want to read more data */
 	struct event_base *ev_base;										/**< event base for io operations */
 #ifndef HAVE_SENDFILE
 	void *map;
diff --git a/src/worker.c b/src/worker.c
index dad842ce3..eecaf45dc 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -470,6 +470,8 @@ read_socket (f_str_t * in, void *arg)
 		task->msg->len = in->len;
 		debug_task ("got string of length %z", task->msg->len);
 		task->state = WAIT_FILTER;
+		/* No more need of reading allowing half-closed connections to be proceed */
+		task->dispatcher->want_read = FALSE;
 		r = process_message (task);
 		if (r == -1) {
 			msg_warn ("processing of message failed");
-- 
cgit v1.2.3