]> source.dussan.org Git - rspamd.git/commitdiff
Improve output buffering architecture.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 10 Oct 2013 16:51:55 +0000 (17:51 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 10 Oct 2013 16:51:55 +0000 (17:51 +0100)
src/buffer.c
src/buffer.h

index 2c86b1c9af85d05019fee42662f6f0e5af6cd228..381d77cd219d20e8c84a4c91bb9668b33d5ff7ef 100644 (file)
@@ -174,38 +174,36 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
 
 #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
 
+#define APPEND_OUT_BUFFER(d, buf) do {                                 \
+       DL_APPEND((d)->out_buffers.buffers, buf);                       \
+       (d)->out_buffers.pending ++;                                            \
+       } while (0)
+#define DELETE_OUT_BUFFER(d, buf) do {                                 \
+       DL_DELETE((d)->out_buffers.buffers, (buf));                     \
+       g_string_free((buf->data), (buf)->allocated);           \
+       g_slice_free1(sizeof (struct rspamd_out_buffer_s), (buf)); \
+       (d)->out_buffers.pending --;                                            \
+       } while (0)
+
 static                          gboolean
 write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
 {
-       GList                          *cur = NULL, *tmp;
        GError                         *err = NULL;
-       rspamd_buffer_t                *buf;
+       struct rspamd_out_buffer_s    *cur = NULL, *tmp;
        ssize_t                         r;
-       struct iovec                               *iov;
-       guint                                           i, len, blen;
-
-       len = g_queue_get_length (d->out_buffers);
-       if (len > 1) {
-               /* IOV version */
+       struct iovec                  *iov;
+       guint                           i, len;
 
+       len = d->out_buffers.pending;
+       while (len > 0) {
                /* Unset delayed as actually we HAVE buffers to write */
                is_delayed = TRUE;
-               cur = d->out_buffers->tail;
                iov = g_slice_alloc (len * sizeof (struct iovec));
                i = 0;
-               while (cur) {
-                       buf = cur->data;
-                       blen = BUFREMAIN (buf);
-                       if (blen > 0) {
-                               iov[i].iov_base = buf->pos;
-                               iov[i].iov_len = blen;
-                       }
-                       else {
-                               iov[i].iov_base = NULL;
-                               iov[i].iov_len = 0;
-                       }
+               DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+                       iov[i].iov_base = cur->data->str;
+                       iov[i].iov_len = cur->data->len;
                        i ++;
-                       cur = g_list_previous (cur);
                }
                /* Now try to write the whole vector */
                r = writev (fd, iov, len);
@@ -219,28 +217,20 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
                }
                else if (r > 0) {
                        /* Find pos inside buffers */
-                       cur = d->out_buffers->tail;
-                       i = 0;
-                       while (cur) {
-                               buf = cur->data;
-                               blen = BUFREMAIN (buf);
-                               if (r >= (ssize_t)blen) {
-                                       tmp = cur;
-                                       cur = g_list_previous (cur);
+                       DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+                               if (r >= (ssize_t)cur->data->len) {
                                        /* Mark this buffer as read */
-                                       g_queue_delete_link (d->out_buffers, tmp);
-                                       r -= blen;
+                                       r -= cur->data->len;
+                                       DELETE_OUT_BUFFER (d, cur);
                                }
                                else {
                                        /* This buffer was not written completely */
-                                       buf->pos += r;
+                                       g_string_erase (cur->data, 0, r);
                                        break;
                                }
-                               i ++;
-                               cur = g_list_previous (cur);
                        }
                        g_slice_free1 (len * sizeof (struct iovec), iov);
-                       if (cur != 0) {
+                       if (d->out_buffers.pending > 0) {
                                /* Wait for other event */
                                event_del (d->ev);
                                event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
@@ -269,49 +259,11 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
                        return TRUE;
                }
        }
-       else if (len == 1) {
-               /* Single write version */
-               buf = d->out_buffers->head->data;
-               r = write (fd, buf->pos, BUFREMAIN (buf));
-               if (r == -1 && errno != EAGAIN) {
-                       if (d->err_callback) {
-                               err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
-                               d->err_callback (err, d->user_data);
-                               return FALSE;
-                       }
-               }
-               else if (r > 0) {
-                       buf->pos += r;
-                       if (BUFREMAIN (buf) != 0) {
-                               /* Continue with this buffer */
-                               debug_ip("wrote %z bytes of %z", r, buf->data->len);
-                               return write_buffers (fd, d, is_delayed);
-                       }
-               }
-               else if (r == 0) {
-                       /* Got EOF while we wait for data */
-                       if (d->err_callback) {
-                               err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
-                               d->err_callback (err, d->user_data);
-                               return FALSE;
-                       }
-               }
-               else if (r == -1 && errno == EAGAIN) {
-                       debug_ip("partially write data, retry");
-                       /* Wait for other event */
-                       event_del (d->ev);
-                       event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
-                       event_base_set (d->ev_base, d->ev);
-                       event_add (d->ev, d->tv);
-                       return TRUE;
-               }
-       }
 
-       if (cur == NULL) {
+       if (d->out_buffers.pending == 0) {
                /* Disable write event for this time */
-               g_queue_clear (d->out_buffers);
 
-               debug_ip("all buffers were written successfully");
+               debug_ip ("all buffers were written successfully");
 
                if (is_delayed && d->write_callback) {
                        if (!d->write_callback (d->user_data)) {
@@ -355,7 +307,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
        if (d->in_buf == NULL) {
                d->in_buf = memory_pool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t));
                if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) {
-                       d->in_buf->data = fstralloc_tmp (d->pool, BUFSIZ);
+                       d->in_buf->data = fstralloc_tmp (d->pool, d->default_buf_size);
                }
                else {
                        d->in_buf->data = fstralloc_tmp (d->pool, d->nchars + 1);
@@ -570,7 +522,7 @@ dispatcher_cb (gint fd, short what, void *arg)
                        sendfile_callback (d);
                }
                else {
-                       if (g_queue_get_length (d->out_buffers) == 0) {
+                       if (d->out_buffers.pending == 0) {
                                if (d->half_closed && !d->is_restored) {
                                        /* Socket is half closed and there is nothing more to write, closing connection */
                                        if (d->err_callback) {
@@ -612,8 +564,7 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
                return NULL;
        }
 
-       new = g_malloc (sizeof (rspamd_io_dispatcher_t));
-       bzero (new, sizeof (rspamd_io_dispatcher_t));
+       new = g_slice_alloc (sizeof (rspamd_io_dispatcher_t));
 
        new->pool = memory_pool_new (memory_pool_get_size ());
        if (tv != NULL) {
@@ -634,11 +585,11 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
        new->half_closed = FALSE;
        new->want_read = TRUE;
        new->is_restored = FALSE;
+       new->default_buf_size = sysconf (_SC_PAGESIZE);
 
        new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
        new->fd = fd;
        new->ev_base = base;
-       new->out_buffers = g_queue_new ();
 
        event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
        event_base_set (new->ev_base, new->ev);
@@ -648,13 +599,17 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
 }
 
 void
-rspamd_remove_dispatcher (rspamd_io_dispatcher_t * dispatcher)
+rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d)
 {
-       if (dispatcher != NULL) {
-               event_del (dispatcher->ev);
-               memory_pool_delete (dispatcher->pool);
-               g_queue_free (dispatcher->out_buffers);
-               g_free (dispatcher);
+       struct rspamd_out_buffer_s *cur, *tmp;
+
+       if (d != NULL) {
+               DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+                       DELETE_OUT_BUFFER (d, cur);
+               }
+               event_del (d->ev);
+               memory_pool_delete (d->pool);
+               g_slice_free1 (sizeof (rspamd_io_dispatcher_t), d);
        }
 }
 
@@ -666,7 +621,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
 
        if (d->policy != policy || nchars != d->nchars) {
                d->policy = policy;
-               d->nchars = nchars ? nchars : BUFSIZ;
+               d->nchars = nchars ? nchars : d->default_buf_size;
                /* Resize input buffer if needed */
                if (policy == BUFFER_CHARACTER && nchars != 0) {
                        if (d->in_buf && d->in_buf->data->size < nchars) {
@@ -679,8 +634,8 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
                        }
                }
                else if (policy == BUFFER_LINE || policy == BUFFER_ANY) {
-                       if (d->in_buf && d->nchars < BUFSIZ) {
-                               tmp = fstralloc_tmp (d->pool, BUFSIZ);
+                       if (d->in_buf && d->nchars < d->default_buf_size) {
+                               tmp = fstralloc_tmp (d->pool, d->default_buf_size);
                                memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
                                t = d->in_buf->pos - d->in_buf->data->begin;
                                tmp->len = d->in_buf->data->len;
@@ -695,35 +650,30 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
 }
 
 gboolean
-rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, const void *data, size_t len, gboolean delayed, gboolean allocated)
+rspamd_dispatcher_write (rspamd_io_dispatcher_t * d,
+               const void *data, size_t len, gboolean delayed, gboolean allocated)
 {
-       rspamd_buffer_t                *newbuf;
+       struct rspamd_out_buffer_s                *newbuf;
 
-       newbuf = memory_pool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t));
+       newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s));
        if (len == 0) {
                /* Assume NULL terminated */
                len = strlen ((const gchar *)data);
        }
 
        if (!allocated) {
-               newbuf->data = memory_pool_alloc_tmp (d->pool, sizeof (f_str_t));
-               newbuf->data->begin = memory_pool_alloc_tmp (d->pool, len);
-               newbuf->data->size = len;
-               newbuf->data->len = len;
-
-               /* We need to copy data to temporary internal buffer to avoid using of stack variables */
-               memcpy (newbuf->data->begin, data, len);
+               newbuf->data = g_string_new_len (data, len);
+               newbuf->allocated = TRUE;
        }
        else {
-               newbuf->data = memory_pool_alloc_tmp (d->pool, sizeof (f_str_t));
-               newbuf->data->begin = (gchar *)data;
-               newbuf->data->size = len;
+               newbuf->data = g_string_new (NULL);
+               newbuf->data->str = (gchar *)data;
+               newbuf->data->len = len;
+               newbuf->data->allocated_len = len;
+               newbuf->allocated = FALSE;
        }
 
-       newbuf->pos = newbuf->data->begin;
-       newbuf->data->len = len;
-
-       g_queue_push_head (d->out_buffers, newbuf);
+       APPEND_OUT_BUFFER (d, newbuf);
 
        if (!delayed) {
                debug_ip("plan write event");
@@ -738,6 +688,31 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, const void *data, size_t le
        return TRUE;
 }
 
+gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d,
+                                                                                                 GString *str,
+                                                                                                 gboolean delayed,
+                                                                                                 gboolean free_on_write)
+{
+       struct rspamd_out_buffer_s                *newbuf;
+
+       newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s));
+       newbuf->data = str;
+       newbuf->allocated = free_on_write;
+
+       APPEND_OUT_BUFFER (d, newbuf);
+
+       if (!delayed) {
+               debug_ip("plan write event");
+               return write_buffers (d->fd, d, FALSE);
+       }
+       /* Otherwise plan write event */
+       event_del (d->ev);
+       event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+       event_base_set (d->ev_base, d->ev);
+       event_add (d->ev, d->tv);
+
+       return TRUE;
+}
 
 gboolean 
 rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len)
@@ -790,7 +765,11 @@ rspamd_dispatcher_restore (rspamd_io_dispatcher_t * d)
 void
 rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d)
 {
-       g_queue_clear (d->out_buffers);
+       struct rspamd_out_buffer_s *cur, *tmp;
+
+       DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+               DELETE_OUT_BUFFER (d, cur);
+       }
        /* Cleanup temporary data */
        memory_pool_cleanup_tmp (d->pool);
        d->in_buf = NULL;
index 11b03bca4b1d3ad854a16866c43c234b225b56a8..a68284af0c02bafb8f3c528ab146765c93180427 100644 (file)
@@ -31,9 +31,18 @@ typedef struct rspamd_buffer_s {
        gchar *pos;                                                                                                             /**< current position           */
 } rspamd_buffer_t;
 
+struct rspamd_out_buffer_s {
+       GString *data;
+       gboolean allocated;
+       struct rspamd_out_buffer_s *prev, *next;
+};
+
 typedef struct rspamd_io_dispatcher_s {
        rspamd_buffer_t *in_buf;                                                                                /**< input buffer                       */
-       GQueue *out_buffers;                                                                                    /**< out buffers chain          */
+       struct {
+               guint pending;
+               struct rspamd_out_buffer_s *buffers;
+       } out_buffers;                                                                                                  /**< output buffers chain       */
        struct timeval *tv;                                                                                             /**< io timeout                         */
        struct event *ev;                                                                                               /**< libevent io event          */
        memory_pool_t *pool;                                                                                    /**< where to store data        */
@@ -46,6 +55,7 @@ typedef struct rspamd_io_dispatcher_s {
        dispatcher_write_callback_t write_callback;                                             /**< write callback                     */
        dispatcher_err_callback_t err_callback;                                                 /**< error callback                     */
        void *user_data;                                                                                                /**< user's data for callbacks */
+       gulong default_buf_size;                                                                                /**< default size for buffering */
        off_t offset;                                                                                                   /**< for sendfile use           */
        size_t file_size;
        gint sendfile_fd;
@@ -97,7 +107,21 @@ void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
  */
 gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
                                                                                                  const void *data,
-                                                                                                 size_t len, gboolean delayed, gboolean allocated) G_GNUC_WARN_UNUSED_RESULT;
+                                                                                                 size_t len, gboolean delayed,
+                                                                                                 gboolean allocated) G_GNUC_WARN_UNUSED_RESULT;
+
+/**
+ * Write a GString to dispatcher
+ * @param d dipatcher object
+ * @param str string to write
+ * @param delayed delay write
+ * @param free_on_write free string after writing to a socket
+ * @return TRUE if write has been queued successfully
+ */
+gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d,
+                                                                                                 GString *str,
+                                                                                                 gboolean delayed,
+                                                                                                 gboolean free_on_write) G_GNUC_WARN_UNUSED_RESULT;
 
 /**
  * Send specified descriptor to dispatcher