]> source.dussan.org Git - rspamd.git/commitdiff
Allow rspamd dispatcher code to process half-closed connections.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 19 Apr 2012 22:55:22 +0000 (02:55 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 19 Apr 2012 22:55:22 +0000 (02:55 +0400)
src/aio_event.c
src/buffer.c
src/buffer.h
src/worker.c

index 85f52118df7a4814196916266456b692aec01c68..4f1bf95d416c35fc7606c3c8a5f4b4e518357907 100644 (file)
@@ -48,7 +48,7 @@
 #endif
 
 #define SYS_eventfd       323
-#define MAX_AIO_EV        1024
+#define MAX_AIO_EV        64
 
 struct io_cbdata {
        gint fd;
index d5bf673ddc515057177d9623eadbc857459c16e5..c431988f8d74e79704f2d1cf3375e92ab663737d 100644 (file)
@@ -386,11 +386,27 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
                }
                else if (r == 0) {
                        /* Got EOF while we wait for data */
+#if 0
                        if (d->err_callback) {
                                err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
                                d->err_callback (err, d->user_data);
                                return;
                        }
+#endif
+                       /* Read returned 0, it may be shutdown or full quit */
+                       if (!d->want_read) {
+                               d->half_closed = TRUE;
+                               /* Do not expect any read after this */
+                               event_del (d->ev);
+                       }
+                       else {
+                               if (d->err_callback) {
+                                       err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+                                       d->err_callback (err, d->user_data);
+                                       return;
+                               }
+                       }
+                       return;
                }
                else if (r == -1 && errno == EAGAIN) {
                        debug_ip("partially read data, retry");
@@ -549,16 +565,26 @@ dispatcher_cb (gint fd, short what, void *arg)
                }
                else {
                        if (g_queue_get_length (d->out_buffers) == 0) {
-                               /* Want read again */
-                               event_del (d->ev);
-                               event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
-                               event_base_set (d->ev_base, d->ev);
-                               event_add (d->ev, d->tv);
-                               if (d->is_restored && d->write_callback) {
-                                       if (!d->write_callback (d->user_data)) {
+                               if (d->half_closed && !d->is_restored) {
+                                       /* Socket is half closed and there is nothing more to write, closing connection */
+                                       if (d->err_callback) {
+                                               err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+                                               d->err_callback (err, d->user_data);
                                                return;
                                        }
-                                       d->is_restored = FALSE;
+                               }
+                               else {
+                                       /* Want read again */
+                                       event_del (d->ev);
+                                       event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+                                       event_base_set (d->ev_base, d->ev);
+                                       event_add (d->ev, d->tv);
+                                       if (d->is_restored && d->write_callback) {
+                                               if (!d->write_callback (d->user_data)) {
+                                                       return;
+                                               }
+                                               d->is_restored = FALSE;
+                                       }
                                }
                        }
                        else {
@@ -599,6 +625,8 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
        new->err_callback = err_cb;
        new->user_data = user_data;
        new->strip_eol = TRUE;
+       new->half_closed = FALSE;
+       new->want_read = TRUE;
 
        new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
        new->fd = fd;
index ce2cf3d2ef0b8bfb06aa1eb13be0bb87f2a2be25..235497aa2ed00e83933878bbeb4025f7b7816efa 100644 (file)
@@ -52,6 +52,8 @@ typedef struct rspamd_io_dispatcher_s {
        gboolean in_sendfile;                                                                                   /**< whether buffer is in sendfile mode */
        gboolean strip_eol;                                                                                             /**< strip or not line ends in BUFFER_LINE policy */
        gboolean is_restored;                                                                                   /**< call a callback when dispatcher is restored */
+       gboolean half_closed;                                                                                   /**< connection is half closed */
+       gboolean want_read;                                                                                             /**< whether we want to read more data */
        struct event_base *ev_base;                                                                             /**< event base for io operations */
 #ifndef HAVE_SENDFILE
        void *map;
index dad842ce39cc9eba97be6b9670f521c49024c3c4..eecaf45dc6c9ec751be17059e9c83c8798bb7f41 100644 (file)
@@ -470,6 +470,8 @@ read_socket (f_str_t * in, void *arg)
                task->msg->len = in->len;
                debug_task ("got string of length %z", task->msg->len);
                task->state = WAIT_FILTER;
+               /* No more need of reading allowing half-closed connections to be proceed */
+               task->dispatcher->want_read = FALSE;
                r = process_message (task);
                if (r == -1) {
                        msg_warn ("processing of message failed");