aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2013-10-10 17:51:55 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2013-10-10 17:51:55 +0100
commit64d2a3ca66ab31045edcdf711ff6956c1919c1e9 (patch)
treeb69d085dbc8f70b1c115fa88c2fe0aab191479a5
parent24939dddbf5b1b5bc35d303a2813ef825803d885 (diff)
downloadrspamd-64d2a3ca66ab31045edcdf711ff6956c1919c1e9.tar.gz
rspamd-64d2a3ca66ab31045edcdf711ff6956c1919c1e9.zip
Improve output buffering architecture.
-rw-r--r--src/buffer.c193
-rw-r--r--src/buffer.h28
2 files changed, 112 insertions, 109 deletions
diff --git a/src/buffer.c b/src/buffer.c
index 2c86b1c9a..381d77cd2 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -174,38 +174,36 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
+#define APPEND_OUT_BUFFER(d, buf) do { \
+ DL_APPEND((d)->out_buffers.buffers, buf); \
+ (d)->out_buffers.pending ++; \
+ } while (0)
+#define DELETE_OUT_BUFFER(d, buf) do { \
+ DL_DELETE((d)->out_buffers.buffers, (buf)); \
+ g_string_free((buf->data), (buf)->allocated); \
+ g_slice_free1(sizeof (struct rspamd_out_buffer_s), (buf)); \
+ (d)->out_buffers.pending --; \
+ } while (0)
+
static gboolean
write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
{
- GList *cur = NULL, *tmp;
GError *err = NULL;
- rspamd_buffer_t *buf;
+ struct rspamd_out_buffer_s *cur = NULL, *tmp;
ssize_t r;
- struct iovec *iov;
- guint i, len, blen;
-
- len = g_queue_get_length (d->out_buffers);
- if (len > 1) {
- /* IOV version */
+ struct iovec *iov;
+ guint i, len;
+ len = d->out_buffers.pending;
+ while (len > 0) {
/* Unset delayed as actually we HAVE buffers to write */
is_delayed = TRUE;
- cur = d->out_buffers->tail;
iov = g_slice_alloc (len * sizeof (struct iovec));
i = 0;
- while (cur) {
- buf = cur->data;
- blen = BUFREMAIN (buf);
- if (blen > 0) {
- iov[i].iov_base = buf->pos;
- iov[i].iov_len = blen;
- }
- else {
- iov[i].iov_base = NULL;
- iov[i].iov_len = 0;
- }
+ DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+ iov[i].iov_base = cur->data->str;
+ iov[i].iov_len = cur->data->len;
i ++;
- cur = g_list_previous (cur);
}
/* Now try to write the whole vector */
r = writev (fd, iov, len);
@@ -219,28 +217,20 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
}
else if (r > 0) {
/* Find pos inside buffers */
- cur = d->out_buffers->tail;
- i = 0;
- while (cur) {
- buf = cur->data;
- blen = BUFREMAIN (buf);
- if (r >= (ssize_t)blen) {
- tmp = cur;
- cur = g_list_previous (cur);
+ DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+ if (r >= (ssize_t)cur->data->len) {
/* Mark this buffer as read */
- g_queue_delete_link (d->out_buffers, tmp);
- r -= blen;
+ r -= cur->data->len;
+ DELETE_OUT_BUFFER (d, cur);
}
else {
/* This buffer was not written completely */
- buf->pos += r;
+ g_string_erase (cur->data, 0, r);
break;
}
- i ++;
- cur = g_list_previous (cur);
}
g_slice_free1 (len * sizeof (struct iovec), iov);
- if (cur != 0) {
+ if (d->out_buffers.pending > 0) {
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
@@ -269,49 +259,11 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
return TRUE;
}
}
- else if (len == 1) {
- /* Single write version */
- buf = d->out_buffers->head->data;
- r = write (fd, buf->pos, BUFREMAIN (buf));
- if (r == -1 && errno != EAGAIN) {
- if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
- d->err_callback (err, d->user_data);
- return FALSE;
- }
- }
- else if (r > 0) {
- buf->pos += r;
- if (BUFREMAIN (buf) != 0) {
- /* Continue with this buffer */
- debug_ip("wrote %z bytes of %z", r, buf->data->len);
- return write_buffers (fd, d, is_delayed);
- }
- }
- else if (r == 0) {
- /* Got EOF while we wait for data */
- if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
- d->err_callback (err, d->user_data);
- return FALSE;
- }
- }
- else if (r == -1 && errno == EAGAIN) {
- debug_ip("partially write data, retry");
- /* Wait for other event */
- event_del (d->ev);
- event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- return TRUE;
- }
- }
- if (cur == NULL) {
+ if (d->out_buffers.pending == 0) {
/* Disable write event for this time */
- g_queue_clear (d->out_buffers);
- debug_ip("all buffers were written successfully");
+ debug_ip ("all buffers were written successfully");
if (is_delayed && d->write_callback) {
if (!d->write_callback (d->user_data)) {
@@ -355,7 +307,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
if (d->in_buf == NULL) {
d->in_buf = memory_pool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t));
if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) {
- d->in_buf->data = fstralloc_tmp (d->pool, BUFSIZ);
+ d->in_buf->data = fstralloc_tmp (d->pool, d->default_buf_size);
}
else {
d->in_buf->data = fstralloc_tmp (d->pool, d->nchars + 1);
@@ -570,7 +522,7 @@ dispatcher_cb (gint fd, short what, void *arg)
sendfile_callback (d);
}
else {
- if (g_queue_get_length (d->out_buffers) == 0) {
+ if (d->out_buffers.pending == 0) {
if (d->half_closed && !d->is_restored) {
/* Socket is half closed and there is nothing more to write, closing connection */
if (d->err_callback) {
@@ -612,8 +564,7 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
return NULL;
}
- new = g_malloc (sizeof (rspamd_io_dispatcher_t));
- bzero (new, sizeof (rspamd_io_dispatcher_t));
+ new = g_slice_alloc (sizeof (rspamd_io_dispatcher_t));
new->pool = memory_pool_new (memory_pool_get_size ());
if (tv != NULL) {
@@ -634,11 +585,11 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
new->half_closed = FALSE;
new->want_read = TRUE;
new->is_restored = FALSE;
+ new->default_buf_size = sysconf (_SC_PAGESIZE);
new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
new->fd = fd;
new->ev_base = base;
- new->out_buffers = g_queue_new ();
event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
event_base_set (new->ev_base, new->ev);
@@ -648,13 +599,17 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
}
void
-rspamd_remove_dispatcher (rspamd_io_dispatcher_t * dispatcher)
+rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d)
{
- if (dispatcher != NULL) {
- event_del (dispatcher->ev);
- memory_pool_delete (dispatcher->pool);
- g_queue_free (dispatcher->out_buffers);
- g_free (dispatcher);
+ struct rspamd_out_buffer_s *cur, *tmp;
+
+ if (d != NULL) {
+ DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+ DELETE_OUT_BUFFER (d, cur);
+ }
+ event_del (d->ev);
+ memory_pool_delete (d->pool);
+ g_slice_free1 (sizeof (rspamd_io_dispatcher_t), d);
}
}
@@ -666,7 +621,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
if (d->policy != policy || nchars != d->nchars) {
d->policy = policy;
- d->nchars = nchars ? nchars : BUFSIZ;
+ d->nchars = nchars ? nchars : d->default_buf_size;
/* Resize input buffer if needed */
if (policy == BUFFER_CHARACTER && nchars != 0) {
if (d->in_buf && d->in_buf->data->size < nchars) {
@@ -679,8 +634,8 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
}
}
else if (policy == BUFFER_LINE || policy == BUFFER_ANY) {
- if (d->in_buf && d->nchars < BUFSIZ) {
- tmp = fstralloc_tmp (d->pool, BUFSIZ);
+ if (d->in_buf && d->nchars < d->default_buf_size) {
+ tmp = fstralloc_tmp (d->pool, d->default_buf_size);
memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
t = d->in_buf->pos - d->in_buf->data->begin;
tmp->len = d->in_buf->data->len;
@@ -695,35 +650,30 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
}
gboolean
-rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, const void *data, size_t len, gboolean delayed, gboolean allocated)
+rspamd_dispatcher_write (rspamd_io_dispatcher_t * d,
+ const void *data, size_t len, gboolean delayed, gboolean allocated)
{
- rspamd_buffer_t *newbuf;
+ struct rspamd_out_buffer_s *newbuf;
- newbuf = memory_pool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t));
+ newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s));
if (len == 0) {
/* Assume NULL terminated */
len = strlen ((const gchar *)data);
}
if (!allocated) {
- newbuf->data = memory_pool_alloc_tmp (d->pool, sizeof (f_str_t));
- newbuf->data->begin = memory_pool_alloc_tmp (d->pool, len);
- newbuf->data->size = len;
- newbuf->data->len = len;
-
- /* We need to copy data to temporary internal buffer to avoid using of stack variables */
- memcpy (newbuf->data->begin, data, len);
+ newbuf->data = g_string_new_len (data, len);
+ newbuf->allocated = TRUE;
}
else {
- newbuf->data = memory_pool_alloc_tmp (d->pool, sizeof (f_str_t));
- newbuf->data->begin = (gchar *)data;
- newbuf->data->size = len;
+ newbuf->data = g_string_new (NULL);
+ newbuf->data->str = (gchar *)data;
+ newbuf->data->len = len;
+ newbuf->data->allocated_len = len;
+ newbuf->allocated = FALSE;
}
- newbuf->pos = newbuf->data->begin;
- newbuf->data->len = len;
-
- g_queue_push_head (d->out_buffers, newbuf);
+ APPEND_OUT_BUFFER (d, newbuf);
if (!delayed) {
debug_ip("plan write event");
@@ -738,6 +688,31 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, const void *data, size_t le
return TRUE;
}
+gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d,
+ GString *str,
+ gboolean delayed,
+ gboolean free_on_write)
+{
+ struct rspamd_out_buffer_s *newbuf;
+
+ newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s));
+ newbuf->data = str;
+ newbuf->allocated = free_on_write;
+
+ APPEND_OUT_BUFFER (d, newbuf);
+
+ if (!delayed) {
+ debug_ip("plan write event");
+ return write_buffers (d->fd, d, FALSE);
+ }
+ /* Otherwise plan write event */
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
+ event_add (d->ev, d->tv);
+
+ return TRUE;
+}
gboolean
rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len)
@@ -790,7 +765,11 @@ rspamd_dispatcher_restore (rspamd_io_dispatcher_t * d)
void
rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d)
{
- g_queue_clear (d->out_buffers);
+ struct rspamd_out_buffer_s *cur, *tmp;
+
+ DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+ DELETE_OUT_BUFFER (d, cur);
+ }
/* Cleanup temporary data */
memory_pool_cleanup_tmp (d->pool);
d->in_buf = NULL;
diff --git a/src/buffer.h b/src/buffer.h
index 11b03bca4..a68284af0 100644
--- a/src/buffer.h
+++ b/src/buffer.h
@@ -31,9 +31,18 @@ typedef struct rspamd_buffer_s {
gchar *pos; /**< current position */
} rspamd_buffer_t;
+struct rspamd_out_buffer_s {
+ GString *data;
+ gboolean allocated;
+ struct rspamd_out_buffer_s *prev, *next;
+};
+
typedef struct rspamd_io_dispatcher_s {
rspamd_buffer_t *in_buf; /**< input buffer */
- GQueue *out_buffers; /**< out buffers chain */
+ struct {
+ guint pending;
+ struct rspamd_out_buffer_s *buffers;
+ } out_buffers; /**< output buffers chain */
struct timeval *tv; /**< io timeout */
struct event *ev; /**< libevent io event */
memory_pool_t *pool; /**< where to store data */
@@ -46,6 +55,7 @@ typedef struct rspamd_io_dispatcher_s {
dispatcher_write_callback_t write_callback; /**< write callback */
dispatcher_err_callback_t err_callback; /**< error callback */
void *user_data; /**< user's data for callbacks */
+ gulong default_buf_size; /**< default size for buffering */
off_t offset; /**< for sendfile use */
size_t file_size;
gint sendfile_fd;
@@ -97,7 +107,21 @@ void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
*/
gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
const void *data,
- size_t len, gboolean delayed, gboolean allocated) G_GNUC_WARN_UNUSED_RESULT;
+ size_t len, gboolean delayed,
+ gboolean allocated) G_GNUC_WARN_UNUSED_RESULT;
+
+/**
+ * Write a GString to dispatcher
+ * @param d dipatcher object
+ * @param str string to write
+ * @param delayed delay write
+ * @param free_on_write free string after writing to a socket
+ * @return TRUE if write has been queued successfully
+ */
+gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d,
+ GString *str,
+ gboolean delayed,
+ gboolean free_on_write) G_GNUC_WARN_UNUSED_RESULT;
/**
* Send specified descriptor to dispatcher