From 64d2a3ca66ab31045edcdf711ff6956c1919c1e9 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 10 Oct 2013 17:51:55 +0100 Subject: [PATCH] Improve output buffering architecture. --- src/buffer.c | 193 +++++++++++++++++++++++---------------------------- src/buffer.h | 28 +++++++- 2 files changed, 112 insertions(+), 109 deletions(-) diff --git a/src/buffer.c b/src/buffer.c index 2c86b1c9a..381d77cd2 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -174,38 +174,36 @@ sendfile_callback (rspamd_io_dispatcher_t *d) #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin) +#define APPEND_OUT_BUFFER(d, buf) do { \ + DL_APPEND((d)->out_buffers.buffers, buf); \ + (d)->out_buffers.pending ++; \ + } while (0) +#define DELETE_OUT_BUFFER(d, buf) do { \ + DL_DELETE((d)->out_buffers.buffers, (buf)); \ + g_string_free((buf->data), (buf)->allocated); \ + g_slice_free1(sizeof (struct rspamd_out_buffer_s), (buf)); \ + (d)->out_buffers.pending --; \ + } while (0) + static gboolean write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) { - GList *cur = NULL, *tmp; GError *err = NULL; - rspamd_buffer_t *buf; + struct rspamd_out_buffer_s *cur = NULL, *tmp; ssize_t r; - struct iovec *iov; - guint i, len, blen; - - len = g_queue_get_length (d->out_buffers); - if (len > 1) { - /* IOV version */ + struct iovec *iov; + guint i, len; + len = d->out_buffers.pending; + while (len > 0) { /* Unset delayed as actually we HAVE buffers to write */ is_delayed = TRUE; - cur = d->out_buffers->tail; iov = g_slice_alloc (len * sizeof (struct iovec)); i = 0; - while (cur) { - buf = cur->data; - blen = BUFREMAIN (buf); - if (blen > 0) { - iov[i].iov_base = buf->pos; - iov[i].iov_len = blen; - } - else { - iov[i].iov_base = NULL; - iov[i].iov_len = 0; - } + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { + iov[i].iov_base = cur->data->str; + iov[i].iov_len = cur->data->len; i ++; - cur = g_list_previous (cur); } /* Now try to write the whole vector */ r = writev (fd, iov, len); @@ -219,28 +217,20 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) } else if (r > 0) { /* Find pos inside buffers */ - cur = d->out_buffers->tail; - i = 0; - while (cur) { - buf = cur->data; - blen = BUFREMAIN (buf); - if (r >= (ssize_t)blen) { - tmp = cur; - cur = g_list_previous (cur); + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { + if (r >= (ssize_t)cur->data->len) { /* Mark this buffer as read */ - g_queue_delete_link (d->out_buffers, tmp); - r -= blen; + r -= cur->data->len; + DELETE_OUT_BUFFER (d, cur); } else { /* This buffer was not written completely */ - buf->pos += r; + g_string_erase (cur->data, 0, r); break; } - i ++; - cur = g_list_previous (cur); } g_slice_free1 (len * sizeof (struct iovec), iov); - if (cur != 0) { + if (d->out_buffers.pending > 0) { /* Wait for other event */ event_del (d->ev); event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -269,49 +259,11 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) return TRUE; } } - else if (len == 1) { - /* Single write version */ - buf = d->out_buffers->head->data; - r = write (fd, buf->pos, BUFREMAIN (buf)); - if (r == -1 && errno != EAGAIN) { - if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); - d->err_callback (err, d->user_data); - return FALSE; - } - } - else if (r > 0) { - buf->pos += r; - if (BUFREMAIN (buf) != 0) { - /* Continue with this buffer */ - debug_ip("wrote %z bytes of %z", r, buf->data->len); - return write_buffers (fd, d, is_delayed); - } - } - else if (r == 0) { - /* Got EOF while we wait for data */ - if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); - d->err_callback (err, d->user_data); - return FALSE; - } - } - else if (r == -1 && errno == EAGAIN) { - debug_ip("partially write data, retry"); - /* Wait for other event */ - event_del (d->ev); - event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - return TRUE; - } - } - if (cur == NULL) { + if (d->out_buffers.pending == 0) { /* Disable write event for this time */ - g_queue_clear (d->out_buffers); - debug_ip("all buffers were written successfully"); + debug_ip ("all buffers were written successfully"); if (is_delayed && d->write_callback) { if (!d->write_callback (d->user_data)) { @@ -355,7 +307,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) if (d->in_buf == NULL) { d->in_buf = memory_pool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t)); if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) { - d->in_buf->data = fstralloc_tmp (d->pool, BUFSIZ); + d->in_buf->data = fstralloc_tmp (d->pool, d->default_buf_size); } else { d->in_buf->data = fstralloc_tmp (d->pool, d->nchars + 1); @@ -570,7 +522,7 @@ dispatcher_cb (gint fd, short what, void *arg) sendfile_callback (d); } else { - if (g_queue_get_length (d->out_buffers) == 0) { + if (d->out_buffers.pending == 0) { if (d->half_closed && !d->is_restored) { /* Socket is half closed and there is nothing more to write, closing connection */ if (d->err_callback) { @@ -612,8 +564,7 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic return NULL; } - new = g_malloc (sizeof (rspamd_io_dispatcher_t)); - bzero (new, sizeof (rspamd_io_dispatcher_t)); + new = g_slice_alloc (sizeof (rspamd_io_dispatcher_t)); new->pool = memory_pool_new (memory_pool_get_size ()); if (tv != NULL) { @@ -634,11 +585,11 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic new->half_closed = FALSE; new->want_read = TRUE; new->is_restored = FALSE; + new->default_buf_size = sysconf (_SC_PAGESIZE); new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event)); new->fd = fd; new->ev_base = base; - new->out_buffers = g_queue_new (); event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new); event_base_set (new->ev_base, new->ev); @@ -648,13 +599,17 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic } void -rspamd_remove_dispatcher (rspamd_io_dispatcher_t * dispatcher) +rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d) { - if (dispatcher != NULL) { - event_del (dispatcher->ev); - memory_pool_delete (dispatcher->pool); - g_queue_free (dispatcher->out_buffers); - g_free (dispatcher); + struct rspamd_out_buffer_s *cur, *tmp; + + if (d != NULL) { + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { + DELETE_OUT_BUFFER (d, cur); + } + event_del (d->ev); + memory_pool_delete (d->pool); + g_slice_free1 (sizeof (rspamd_io_dispatcher_t), d); } } @@ -666,7 +621,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, if (d->policy != policy || nchars != d->nchars) { d->policy = policy; - d->nchars = nchars ? nchars : BUFSIZ; + d->nchars = nchars ? nchars : d->default_buf_size; /* Resize input buffer if needed */ if (policy == BUFFER_CHARACTER && nchars != 0) { if (d->in_buf && d->in_buf->data->size < nchars) { @@ -679,8 +634,8 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, } } else if (policy == BUFFER_LINE || policy == BUFFER_ANY) { - if (d->in_buf && d->nchars < BUFSIZ) { - tmp = fstralloc_tmp (d->pool, BUFSIZ); + if (d->in_buf && d->nchars < d->default_buf_size) { + tmp = fstralloc_tmp (d->pool, d->default_buf_size); memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); t = d->in_buf->pos - d->in_buf->data->begin; tmp->len = d->in_buf->data->len; @@ -695,35 +650,30 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, } gboolean -rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, const void *data, size_t len, gboolean delayed, gboolean allocated) +rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, + const void *data, size_t len, gboolean delayed, gboolean allocated) { - rspamd_buffer_t *newbuf; + struct rspamd_out_buffer_s *newbuf; - newbuf = memory_pool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t)); + newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s)); if (len == 0) { /* Assume NULL terminated */ len = strlen ((const gchar *)data); } if (!allocated) { - newbuf->data = memory_pool_alloc_tmp (d->pool, sizeof (f_str_t)); - newbuf->data->begin = memory_pool_alloc_tmp (d->pool, len); - newbuf->data->size = len; - newbuf->data->len = len; - - /* We need to copy data to temporary internal buffer to avoid using of stack variables */ - memcpy (newbuf->data->begin, data, len); + newbuf->data = g_string_new_len (data, len); + newbuf->allocated = TRUE; } else { - newbuf->data = memory_pool_alloc_tmp (d->pool, sizeof (f_str_t)); - newbuf->data->begin = (gchar *)data; - newbuf->data->size = len; + newbuf->data = g_string_new (NULL); + newbuf->data->str = (gchar *)data; + newbuf->data->len = len; + newbuf->data->allocated_len = len; + newbuf->allocated = FALSE; } - newbuf->pos = newbuf->data->begin; - newbuf->data->len = len; - - g_queue_push_head (d->out_buffers, newbuf); + APPEND_OUT_BUFFER (d, newbuf); if (!delayed) { debug_ip("plan write event"); @@ -738,6 +688,31 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, const void *data, size_t le return TRUE; } +gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d, + GString *str, + gboolean delayed, + gboolean free_on_write) +{ + struct rspamd_out_buffer_s *newbuf; + + newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s)); + newbuf->data = str; + newbuf->allocated = free_on_write; + + APPEND_OUT_BUFFER (d, newbuf); + + if (!delayed) { + debug_ip("plan write event"); + return write_buffers (d->fd, d, FALSE); + } + /* Otherwise plan write event */ + event_del (d->ev); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); + event_add (d->ev, d->tv); + + return TRUE; +} gboolean rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len) @@ -790,7 +765,11 @@ rspamd_dispatcher_restore (rspamd_io_dispatcher_t * d) void rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d) { - g_queue_clear (d->out_buffers); + struct rspamd_out_buffer_s *cur, *tmp; + + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { + DELETE_OUT_BUFFER (d, cur); + } /* Cleanup temporary data */ memory_pool_cleanup_tmp (d->pool); d->in_buf = NULL; diff --git a/src/buffer.h b/src/buffer.h index 11b03bca4..a68284af0 100644 --- a/src/buffer.h +++ b/src/buffer.h @@ -31,9 +31,18 @@ typedef struct rspamd_buffer_s { gchar *pos; /**< current position */ } rspamd_buffer_t; +struct rspamd_out_buffer_s { + GString *data; + gboolean allocated; + struct rspamd_out_buffer_s *prev, *next; +}; + typedef struct rspamd_io_dispatcher_s { rspamd_buffer_t *in_buf; /**< input buffer */ - GQueue *out_buffers; /**< out buffers chain */ + struct { + guint pending; + struct rspamd_out_buffer_s *buffers; + } out_buffers; /**< output buffers chain */ struct timeval *tv; /**< io timeout */ struct event *ev; /**< libevent io event */ memory_pool_t *pool; /**< where to store data */ @@ -46,6 +55,7 @@ typedef struct rspamd_io_dispatcher_s { dispatcher_write_callback_t write_callback; /**< write callback */ dispatcher_err_callback_t err_callback; /**< error callback */ void *user_data; /**< user's data for callbacks */ + gulong default_buf_size; /**< default size for buffering */ off_t offset; /**< for sendfile use */ size_t file_size; gint sendfile_fd; @@ -97,7 +107,21 @@ void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, */ gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, const void *data, - size_t len, gboolean delayed, gboolean allocated) G_GNUC_WARN_UNUSED_RESULT; + size_t len, gboolean delayed, + gboolean allocated) G_GNUC_WARN_UNUSED_RESULT; + +/** + * Write a GString to dispatcher + * @param d dipatcher object + * @param str string to write + * @param delayed delay write + * @param free_on_write free string after writing to a socket + * @return TRUE if write has been queued successfully + */ +gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d, + GString *str, + gboolean delayed, + gboolean free_on_write) G_GNUC_WARN_UNUSED_RESULT; /** * Send specified descriptor to dispatcher -- 2.39.5