summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-04-20 02:55:22 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-04-20 02:55:22 +0400
commit80efd10c7fda505905f960c73ce567c57e91a86b (patch)
treeaeb87f6d772f7f6359baa8debcaf850cdc20ee8b
parent637d09abaceb0e3babff9fd25fa133da4c0732ae (diff)
downloadrspamd-80efd10c7fda505905f960c73ce567c57e91a86b.tar.gz
rspamd-80efd10c7fda505905f960c73ce567c57e91a86b.zip
Allow rspamd dispatcher code to process half-closed connections.
-rw-r--r--src/aio_event.c2
-rw-r--r--src/buffer.c44
-rw-r--r--src/buffer.h2
-rw-r--r--src/worker.c2
4 files changed, 41 insertions, 9 deletions
diff --git a/src/aio_event.c b/src/aio_event.c
index 85f52118d..4f1bf95d4 100644
--- a/src/aio_event.c
+++ b/src/aio_event.c
@@ -48,7 +48,7 @@
#endif
#define SYS_eventfd 323
-#define MAX_AIO_EV 1024
+#define MAX_AIO_EV 64
struct io_cbdata {
gint fd;
diff --git a/src/buffer.c b/src/buffer.c
index d5bf673dd..c431988f8 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -386,11 +386,27 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
}
else if (r == 0) {
/* Got EOF while we wait for data */
+#if 0
if (d->err_callback) {
err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
d->err_callback (err, d->user_data);
return;
}
+#endif
+ /* Read returned 0, it may be shutdown or full quit */
+ if (!d->want_read) {
+ d->half_closed = TRUE;
+ /* Do not expect any read after this */
+ event_del (d->ev);
+ }
+ else {
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+ d->err_callback (err, d->user_data);
+ return;
+ }
+ }
+ return;
}
else if (r == -1 && errno == EAGAIN) {
debug_ip("partially read data, retry");
@@ -549,16 +565,26 @@ dispatcher_cb (gint fd, short what, void *arg)
}
else {
if (g_queue_get_length (d->out_buffers) == 0) {
- /* Want read again */
- event_del (d->ev);
- event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- if (d->is_restored && d->write_callback) {
- if (!d->write_callback (d->user_data)) {
+ if (d->half_closed && !d->is_restored) {
+ /* Socket is half closed and there is nothing more to write, closing connection */
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+ d->err_callback (err, d->user_data);
return;
}
- d->is_restored = FALSE;
+ }
+ else {
+ /* Want read again */
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
+ event_add (d->ev, d->tv);
+ if (d->is_restored && d->write_callback) {
+ if (!d->write_callback (d->user_data)) {
+ return;
+ }
+ d->is_restored = FALSE;
+ }
}
}
else {
@@ -599,6 +625,8 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
new->err_callback = err_cb;
new->user_data = user_data;
new->strip_eol = TRUE;
+ new->half_closed = FALSE;
+ new->want_read = TRUE;
new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
new->fd = fd;
diff --git a/src/buffer.h b/src/buffer.h
index ce2cf3d2e..235497aa2 100644
--- a/src/buffer.h
+++ b/src/buffer.h
@@ -52,6 +52,8 @@ typedef struct rspamd_io_dispatcher_s {
gboolean in_sendfile; /**< whether buffer is in sendfile mode */
gboolean strip_eol; /**< strip or not line ends in BUFFER_LINE policy */
gboolean is_restored; /**< call a callback when dispatcher is restored */
+ gboolean half_closed; /**< connection is half closed */
+ gboolean want_read; /**< whether we want to read more data */
struct event_base *ev_base; /**< event base for io operations */
#ifndef HAVE_SENDFILE
void *map;
diff --git a/src/worker.c b/src/worker.c
index dad842ce3..eecaf45dc 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -470,6 +470,8 @@ read_socket (f_str_t * in, void *arg)
task->msg->len = in->len;
debug_task ("got string of length %z", task->msg->len);
task->state = WAIT_FILTER;
+ /* No more need of reading allowing half-closed connections to be proceed */
+ task->dispatcher->want_read = FALSE;
r = process_message (task);
if (r == -1) {
msg_warn ("processing of message failed");