aboutsummaryrefslogtreecommitdiffstats
path: root/src/buffer.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-04-20 02:55:22 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-04-20 02:55:22 +0400
commit80efd10c7fda505905f960c73ce567c57e91a86b (patch)
treeaeb87f6d772f7f6359baa8debcaf850cdc20ee8b /src/buffer.c
parent637d09abaceb0e3babff9fd25fa133da4c0732ae (diff)
downloadrspamd-80efd10c7fda505905f960c73ce567c57e91a86b.tar.gz
rspamd-80efd10c7fda505905f960c73ce567c57e91a86b.zip
Allow rspamd dispatcher code to process half-closed connections.
Diffstat (limited to 'src/buffer.c')
-rw-r--r--src/buffer.c44
1 files changed, 36 insertions, 8 deletions
diff --git a/src/buffer.c b/src/buffer.c
index d5bf673dd..c431988f8 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -386,11 +386,27 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
}
else if (r == 0) {
/* Got EOF while we wait for data */
+#if 0
if (d->err_callback) {
err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
d->err_callback (err, d->user_data);
return;
}
+#endif
+ /* Read returned 0, it may be shutdown or full quit */
+ if (!d->want_read) {
+ d->half_closed = TRUE;
+ /* Do not expect any read after this */
+ event_del (d->ev);
+ }
+ else {
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+ d->err_callback (err, d->user_data);
+ return;
+ }
+ }
+ return;
}
else if (r == -1 && errno == EAGAIN) {
debug_ip("partially read data, retry");
@@ -549,16 +565,26 @@ dispatcher_cb (gint fd, short what, void *arg)
}
else {
if (g_queue_get_length (d->out_buffers) == 0) {
- /* Want read again */
- event_del (d->ev);
- event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- if (d->is_restored && d->write_callback) {
- if (!d->write_callback (d->user_data)) {
+ if (d->half_closed && !d->is_restored) {
+ /* Socket is half closed and there is nothing more to write, closing connection */
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+ d->err_callback (err, d->user_data);
return;
}
- d->is_restored = FALSE;
+ }
+ else {
+ /* Want read again */
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
+ event_add (d->ev, d->tv);
+ if (d->is_restored && d->write_callback) {
+ if (!d->write_callback (d->user_data)) {
+ return;
+ }
+ d->is_restored = FALSE;
+ }
}
}
else {
@@ -599,6 +625,8 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
new->err_callback = err_cb;
new->user_data = user_data;
new->strip_eol = TRUE;
+ new->half_closed = FALSE;
+ new->want_read = TRUE;
new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
new->fd = fd;