]> source.dussan.org Git - rspamd.git/commitdiff
* Significate performance improving by vectorizing IO output (about 4 times for kv...
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 31 Oct 2011 17:55:54 +0000 (20:55 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 31 Oct 2011 17:55:54 +0000 (20:55 +0300)
src/buffer.c
src/buffer.h
src/kvstorage_server.c

index 95e42f756ca570e3949a4e0ab4c751da6a31a82d..e647374c2562557619efc1bb6adff838f16d17fc 100644 (file)
@@ -177,23 +177,98 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
 static                          gboolean
 write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
 {
-       GList                          *cur;
+       GList                          *cur = NULL, *tmp;
        GError                         *err = NULL;
        rspamd_buffer_t                *buf;
        ssize_t                         r;
-
-       /* Fix order */
-       if (d->out_buffers) {
-               d->out_buffers = g_list_reverse (d->out_buffers);
-       }
-       cur = g_list_first (d->out_buffers);
-       while (cur) {
-               buf = (rspamd_buffer_t *) cur->data;
-               if (BUFREMAIN (buf) == 0) {
-                       /* Skip empty buffers */
-                       cur = g_list_next (cur);
-                       continue;
+       struct iovec                               *iov;
+       guint                                           i, len, blen;
+
+       len = g_queue_get_length (d->out_buffers);
+       if (len > 1) {
+               /* IOV version */
+               cur = d->out_buffers->tail;
+               iov = g_slice_alloc (len * sizeof (struct iovec));
+               i = 0;
+               while (cur) {
+                       buf = cur->data;
+                       blen = BUFREMAIN (buf);
+                       if (blen > 0) {
+                               iov[i].iov_base = buf->pos;
+                               iov[i].iov_len = blen;
+                       }
+                       else {
+                               iov[i].iov_base = NULL;
+                               iov[i].iov_len = 0;
+                       }
+                       i ++;
+                       cur = g_list_previous (cur);
                }
+               /* Now try to write the whole vector */
+               r = writev (fd, iov, len);
+               if (r == -1 && errno != EAGAIN) {
+                       g_slice_free1 (len * sizeof (struct iovec), iov);
+                       if (d->err_callback) {
+                               err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+                               d->err_callback (err, d->user_data);
+                               return FALSE;
+                       }
+               }
+               else if (r > 0) {
+                       /* Find pos inside buffers */
+                       cur = d->out_buffers->tail;
+                       i = 0;
+                       while (cur) {
+                               buf = cur->data;
+                               blen = BUFREMAIN (buf);
+                               if (r >= blen) {
+                                       tmp = cur;
+                                       cur = g_list_previous (cur);
+                                       /* Mark this buffer as read */
+                                       g_queue_delete_link (d->out_buffers, tmp);
+                                       r -= blen;
+                               }
+                               else {
+                                       /* This buffer was not written completely */
+                                       buf->pos += r;
+                                       break;
+                               }
+                               i ++;
+                               cur = g_list_previous (cur);
+                       }
+                       g_slice_free1 (len * sizeof (struct iovec), iov);
+                       if (cur != 0) {
+                               /* Wait for other event */
+                               event_del (d->ev);
+                               event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+                               event_base_set (d->ev_base, d->ev);
+                               event_add (d->ev, d->tv);
+                               return TRUE;
+                       }
+               }
+               else if (r == 0) {
+                       /* Got EOF while we wait for data */
+                       g_slice_free1 (len * sizeof (struct iovec), iov);
+                       if (d->err_callback) {
+                               err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+                               d->err_callback (err, d->user_data);
+                               return FALSE;
+                       }
+               }
+               else if (r == -1 && errno == EAGAIN) {
+                       g_slice_free1 (len * sizeof (struct iovec), iov);
+                       debug_ip("partially write data, retry");
+                       /* Wait for other event */
+                       event_del (d->ev);
+                       event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+                       event_base_set (d->ev_base, d->ev);
+                       event_add (d->ev, d->tv);
+                       return TRUE;
+               }
+       }
+       else if (len == 1) {
+               /* Single write version */
+               buf = d->out_buffers->head->data;
                r = write (fd, buf->pos, BUFREMAIN (buf));
                if (r == -1 && errno != EAGAIN) {
                        if (d->err_callback) {
@@ -207,7 +282,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
                        if (BUFREMAIN (buf) != 0) {
                                /* Continue with this buffer */
                                debug_ip("wrote %z bytes of %z", r, buf->data->len);
-                               continue;
+                               return write_buffers (fd, d, is_delayed);
                        }
                }
                else if (r == 0) {
@@ -227,13 +302,11 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
                        event_add (d->ev, d->tv);
                        return TRUE;
                }
-               cur = g_list_next (cur);
        }
 
        if (cur == NULL) {
                /* Disable write event for this time */
-               g_list_free (d->out_buffers);
-               d->out_buffers = NULL;
+               g_queue_clear (d->out_buffers);
 
                debug_ip("all buffers were written successfully");
 
@@ -472,7 +545,7 @@ dispatcher_cb (gint fd, short what, void *arg)
                        sendfile_callback (d);
                }
                else {
-                       if (d->out_buffers == NULL) {
+                       if (g_queue_get_length (d->out_buffers) == 0) {
                                event_del (d->ev);
                                event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
                                event_base_set (d->ev_base, d->ev);
@@ -526,6 +599,7 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
        new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
        new->fd = fd;
        new->ev_base = base;
+       new->out_buffers = g_queue_new ();
 
        event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
        event_base_set (new->ev_base, new->ev);
@@ -540,9 +614,7 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t * dispatcher)
        if (dispatcher != NULL) {
                event_del (dispatcher->ev);
                memory_pool_delete (dispatcher->pool);
-               if (dispatcher->out_buffers) {
-                       g_list_free (dispatcher->out_buffers);
-               }
+               g_queue_free (dispatcher->out_buffers);
                g_free (dispatcher);
        }
 }
@@ -609,7 +681,7 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo
        newbuf->pos = newbuf->data->begin;
        newbuf->data->len = len;
 
-       d->out_buffers = g_list_prepend (d->out_buffers, newbuf);
+       g_queue_push_head (d->out_buffers, newbuf);
 
        if (!delayed) {
                debug_ip("plan write event");
index 51b321833c61a3b829a4e66afd154edc2abbfa30..96e38d471cc89a6621160d34a4610bc3e7a8a1e8 100644 (file)
@@ -33,7 +33,7 @@ typedef struct rspamd_buffer_s {
 
 typedef struct rspamd_io_dispatcher_s {
        rspamd_buffer_t *in_buf;                                                                                /**< input buffer                       */
-       GList *out_buffers;                                                                                             /**< out buffers chain          */
+       GQueue *out_buffers;                                                                                    /**< out buffers chain          */
        struct timeval *tv;                                                                                             /**< io timeout                         */
        struct event *ev;                                                                                               /**< libevent io event          */
        memory_pool_t *pool;                                                                                    /**< where to store data        */
index 48068cdf7efc0ad50a383ff426d619a5eed9b0b8..19a4b3b3ddd24ceba3ca598b2d0db8aa4182daa4 100644 (file)
@@ -355,10 +355,10 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                                        r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
                                                        elt->key, elt->flags, elt->size);
                                        if (!rspamd_dispatcher_write (session->dispather, outbuf,
-                                                                                                                               r, FALSE, FALSE)) {
+                                                                                                                               r, TRUE, FALSE)) {
                                                return FALSE;
                                        }
-                                       if (!rspamd_dispatcher_write (session->dispather, elt->data, elt->size, FALSE, TRUE)) {
+                                       if (!rspamd_dispatcher_write (session->dispather, elt->data, elt->size, TRUE, TRUE)) {
                                                return FALSE;
                                        }
                                        return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
@@ -443,9 +443,9 @@ thr_accept_socket (gint fd, short what, void *arg)
                g_static_mutex_unlock (thr->accept_mtx);
                return;
        }
+       g_static_mutex_unlock (thr->accept_mtx);
        /* Check for EAGAIN */
        if (nfd == 0) {
-               g_static_mutex_unlock (thr->accept_mtx);
                return;
        }
 
@@ -465,7 +465,6 @@ thr_accept_socket (gint fd, short what, void *arg)
                memcpy (&session->client_addr, &su.s4.sin_addr,
                                                sizeof (struct in_addr));
        }
-       g_static_mutex_unlock (thr->accept_mtx);
 }
 
 /**