aboutsummaryrefslogtreecommitdiffstats
path: root/src/buffer.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-31 20:55:54 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-31 20:55:54 +0300
commit48e621e0c0fcaa3bbb788147ccd4fc302c6c929b (patch)
treeab4c170ceb7021c0c9f34468987b882ff2c6a705 /src/buffer.c
parentc750ae5859456d3bc7593e5998deac48cfad7c69 (diff)
downloadrspamd-48e621e0c0fcaa3bbb788147ccd4fc302c6c929b.tar.gz
rspamd-48e621e0c0fcaa3bbb788147ccd4fc302c6c929b.zip
* Significate performance improving by vectorizing IO output (about 4 times for kv storage).
Diffstat (limited to 'src/buffer.c')
-rw-r--r--src/buffer.c116
1 files changed, 94 insertions, 22 deletions
diff --git a/src/buffer.c b/src/buffer.c
index 95e42f756..e647374c2 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -177,23 +177,98 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
static gboolean
write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
{
- GList *cur;
+ GList *cur = NULL, *tmp;
GError *err = NULL;
rspamd_buffer_t *buf;
ssize_t r;
-
- /* Fix order */
- if (d->out_buffers) {
- d->out_buffers = g_list_reverse (d->out_buffers);
- }
- cur = g_list_first (d->out_buffers);
- while (cur) {
- buf = (rspamd_buffer_t *) cur->data;
- if (BUFREMAIN (buf) == 0) {
- /* Skip empty buffers */
- cur = g_list_next (cur);
- continue;
+ struct iovec *iov;
+ guint i, len, blen;
+
+ len = g_queue_get_length (d->out_buffers);
+ if (len > 1) {
+ /* IOV version */
+ cur = d->out_buffers->tail;
+ iov = g_slice_alloc (len * sizeof (struct iovec));
+ i = 0;
+ while (cur) {
+ buf = cur->data;
+ blen = BUFREMAIN (buf);
+ if (blen > 0) {
+ iov[i].iov_base = buf->pos;
+ iov[i].iov_len = blen;
+ }
+ else {
+ iov[i].iov_base = NULL;
+ iov[i].iov_len = 0;
+ }
+ i ++;
+ cur = g_list_previous (cur);
}
+ /* Now try to write the whole vector */
+ r = writev (fd, iov, len);
+ if (r == -1 && errno != EAGAIN) {
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else if (r > 0) {
+ /* Find pos inside buffers */
+ cur = d->out_buffers->tail;
+ i = 0;
+ while (cur) {
+ buf = cur->data;
+ blen = BUFREMAIN (buf);
+ if (r >= blen) {
+ tmp = cur;
+ cur = g_list_previous (cur);
+ /* Mark this buffer as read */
+ g_queue_delete_link (d->out_buffers, tmp);
+ r -= blen;
+ }
+ else {
+ /* This buffer was not written completely */
+ buf->pos += r;
+ break;
+ }
+ i ++;
+ cur = g_list_previous (cur);
+ }
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ if (cur != 0) {
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
+ event_add (d->ev, d->tv);
+ return TRUE;
+ }
+ }
+ else if (r == 0) {
+ /* Got EOF while we wait for data */
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else if (r == -1 && errno == EAGAIN) {
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ debug_ip("partially write data, retry");
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
+ event_add (d->ev, d->tv);
+ return TRUE;
+ }
+ }
+ else if (len == 1) {
+ /* Single write version */
+ buf = d->out_buffers->head->data;
r = write (fd, buf->pos, BUFREMAIN (buf));
if (r == -1 && errno != EAGAIN) {
if (d->err_callback) {
@@ -207,7 +282,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
if (BUFREMAIN (buf) != 0) {
/* Continue with this buffer */
debug_ip("wrote %z bytes of %z", r, buf->data->len);
- continue;
+ return write_buffers (fd, d, is_delayed);
}
}
else if (r == 0) {
@@ -227,13 +302,11 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
event_add (d->ev, d->tv);
return TRUE;
}
- cur = g_list_next (cur);
}
if (cur == NULL) {
/* Disable write event for this time */
- g_list_free (d->out_buffers);
- d->out_buffers = NULL;
+ g_queue_clear (d->out_buffers);
debug_ip("all buffers were written successfully");
@@ -472,7 +545,7 @@ dispatcher_cb (gint fd, short what, void *arg)
sendfile_callback (d);
}
else {
- if (d->out_buffers == NULL) {
+ if (g_queue_get_length (d->out_buffers) == 0) {
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
event_base_set (d->ev_base, d->ev);
@@ -526,6 +599,7 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
new->fd = fd;
new->ev_base = base;
+ new->out_buffers = g_queue_new ();
event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
event_base_set (new->ev_base, new->ev);
@@ -540,9 +614,7 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t * dispatcher)
if (dispatcher != NULL) {
event_del (dispatcher->ev);
memory_pool_delete (dispatcher->pool);
- if (dispatcher->out_buffers) {
- g_list_free (dispatcher->out_buffers);
- }
+ g_queue_free (dispatcher->out_buffers);
g_free (dispatcher);
}
}
@@ -609,7 +681,7 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo
newbuf->pos = newbuf->data->begin;
newbuf->data->len = len;
- d->out_buffers = g_list_prepend (d->out_buffers, newbuf);
+ g_queue_push_head (d->out_buffers, newbuf);
if (!delayed) {
debug_ip("plan write event");