aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/buffer.c116
-rw-r--r--src/buffer.h2
-rw-r--r--src/kvstorage_server.c7
3 files changed, 98 insertions, 27 deletions
diff --git a/src/buffer.c b/src/buffer.c
index 95e42f756..e647374c2 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -177,23 +177,98 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
static gboolean
write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
{
- GList *cur;
+ GList *cur = NULL, *tmp;
GError *err = NULL;
rspamd_buffer_t *buf;
ssize_t r;
-
- /* Fix order */
- if (d->out_buffers) {
- d->out_buffers = g_list_reverse (d->out_buffers);
- }
- cur = g_list_first (d->out_buffers);
- while (cur) {
- buf = (rspamd_buffer_t *) cur->data;
- if (BUFREMAIN (buf) == 0) {
- /* Skip empty buffers */
- cur = g_list_next (cur);
- continue;
+ struct iovec *iov;
+ guint i, len, blen;
+
+ len = g_queue_get_length (d->out_buffers);
+ if (len > 1) {
+ /* IOV version */
+ cur = d->out_buffers->tail;
+ iov = g_slice_alloc (len * sizeof (struct iovec));
+ i = 0;
+ while (cur) {
+ buf = cur->data;
+ blen = BUFREMAIN (buf);
+ if (blen > 0) {
+ iov[i].iov_base = buf->pos;
+ iov[i].iov_len = blen;
+ }
+ else {
+ iov[i].iov_base = NULL;
+ iov[i].iov_len = 0;
+ }
+ i ++;
+ cur = g_list_previous (cur);
}
+ /* Now try to write the whole vector */
+ r = writev (fd, iov, len);
+ if (r == -1 && errno != EAGAIN) {
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else if (r > 0) {
+ /* Find pos inside buffers */
+ cur = d->out_buffers->tail;
+ i = 0;
+ while (cur) {
+ buf = cur->data;
+ blen = BUFREMAIN (buf);
+ if (r >= blen) {
+ tmp = cur;
+ cur = g_list_previous (cur);
+ /* Mark this buffer as read */
+ g_queue_delete_link (d->out_buffers, tmp);
+ r -= blen;
+ }
+ else {
+ /* This buffer was not written completely */
+ buf->pos += r;
+ break;
+ }
+ i ++;
+ cur = g_list_previous (cur);
+ }
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ if (cur != 0) {
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
+ event_add (d->ev, d->tv);
+ return TRUE;
+ }
+ }
+ else if (r == 0) {
+ /* Got EOF while we wait for data */
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else if (r == -1 && errno == EAGAIN) {
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ debug_ip("partially write data, retry");
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
+ event_add (d->ev, d->tv);
+ return TRUE;
+ }
+ }
+ else if (len == 1) {
+ /* Single write version */
+ buf = d->out_buffers->head->data;
r = write (fd, buf->pos, BUFREMAIN (buf));
if (r == -1 && errno != EAGAIN) {
if (d->err_callback) {
@@ -207,7 +282,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
if (BUFREMAIN (buf) != 0) {
/* Continue with this buffer */
debug_ip("wrote %z bytes of %z", r, buf->data->len);
- continue;
+ return write_buffers (fd, d, is_delayed);
}
}
else if (r == 0) {
@@ -227,13 +302,11 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
event_add (d->ev, d->tv);
return TRUE;
}
- cur = g_list_next (cur);
}
if (cur == NULL) {
/* Disable write event for this time */
- g_list_free (d->out_buffers);
- d->out_buffers = NULL;
+ g_queue_clear (d->out_buffers);
debug_ip("all buffers were written successfully");
@@ -472,7 +545,7 @@ dispatcher_cb (gint fd, short what, void *arg)
sendfile_callback (d);
}
else {
- if (d->out_buffers == NULL) {
+ if (g_queue_get_length (d->out_buffers) == 0) {
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
event_base_set (d->ev_base, d->ev);
@@ -526,6 +599,7 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic
new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
new->fd = fd;
new->ev_base = base;
+ new->out_buffers = g_queue_new ();
event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
event_base_set (new->ev_base, new->ev);
@@ -540,9 +614,7 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t * dispatcher)
if (dispatcher != NULL) {
event_del (dispatcher->ev);
memory_pool_delete (dispatcher->pool);
- if (dispatcher->out_buffers) {
- g_list_free (dispatcher->out_buffers);
- }
+ g_queue_free (dispatcher->out_buffers);
g_free (dispatcher);
}
}
@@ -609,7 +681,7 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo
newbuf->pos = newbuf->data->begin;
newbuf->data->len = len;
- d->out_buffers = g_list_prepend (d->out_buffers, newbuf);
+ g_queue_push_head (d->out_buffers, newbuf);
if (!delayed) {
debug_ip("plan write event");
diff --git a/src/buffer.h b/src/buffer.h
index 51b321833..96e38d471 100644
--- a/src/buffer.h
+++ b/src/buffer.h
@@ -33,7 +33,7 @@ typedef struct rspamd_buffer_s {
typedef struct rspamd_io_dispatcher_s {
rspamd_buffer_t *in_buf; /**< input buffer */
- GList *out_buffers; /**< out buffers chain */
+ GQueue *out_buffers; /**< out buffers chain */
struct timeval *tv; /**< io timeout */
struct event *ev; /**< libevent io event */
memory_pool_t *pool; /**< where to store data */
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index 48068cdf7..19a4b3b3d 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -355,10 +355,10 @@ kvstorage_read_socket (f_str_t * in, void *arg)
r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
elt->key, elt->flags, elt->size);
if (!rspamd_dispatcher_write (session->dispather, outbuf,
- r, FALSE, FALSE)) {
+ r, TRUE, FALSE)) {
return FALSE;
}
- if (!rspamd_dispatcher_write (session->dispather, elt->data, elt->size, FALSE, TRUE)) {
+ if (!rspamd_dispatcher_write (session->dispather, elt->data, elt->size, TRUE, TRUE)) {
return FALSE;
}
return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
@@ -443,9 +443,9 @@ thr_accept_socket (gint fd, short what, void *arg)
g_static_mutex_unlock (thr->accept_mtx);
return;
}
+ g_static_mutex_unlock (thr->accept_mtx);
/* Check for EAGAIN */
if (nfd == 0) {
- g_static_mutex_unlock (thr->accept_mtx);
return;
}
@@ -465,7 +465,6 @@ thr_accept_socket (gint fd, short what, void *arg)
memcpy (&session->client_addr, &su.s4.sin_addr,
sizeof (struct in_addr));
}
- g_static_mutex_unlock (thr->accept_mtx);
}
/**