diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/buffer.c | 116 | ||||
-rw-r--r-- | src/buffer.h | 2 | ||||
-rw-r--r-- | src/kvstorage_server.c | 7 |
3 files changed, 98 insertions, 27 deletions
diff --git a/src/buffer.c b/src/buffer.c index 95e42f756..e647374c2 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -177,23 +177,98 @@ sendfile_callback (rspamd_io_dispatcher_t *d) static gboolean write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) { - GList *cur; + GList *cur = NULL, *tmp; GError *err = NULL; rspamd_buffer_t *buf; ssize_t r; - - /* Fix order */ - if (d->out_buffers) { - d->out_buffers = g_list_reverse (d->out_buffers); - } - cur = g_list_first (d->out_buffers); - while (cur) { - buf = (rspamd_buffer_t *) cur->data; - if (BUFREMAIN (buf) == 0) { - /* Skip empty buffers */ - cur = g_list_next (cur); - continue; + struct iovec *iov; + guint i, len, blen; + + len = g_queue_get_length (d->out_buffers); + if (len > 1) { + /* IOV version */ + cur = d->out_buffers->tail; + iov = g_slice_alloc (len * sizeof (struct iovec)); + i = 0; + while (cur) { + buf = cur->data; + blen = BUFREMAIN (buf); + if (blen > 0) { + iov[i].iov_base = buf->pos; + iov[i].iov_len = blen; + } + else { + iov[i].iov_base = NULL; + iov[i].iov_len = 0; + } + i ++; + cur = g_list_previous (cur); } + /* Now try to write the whole vector */ + r = writev (fd, iov, len); + if (r == -1 && errno != EAGAIN) { + g_slice_free1 (len * sizeof (struct iovec), iov); + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + d->err_callback (err, d->user_data); + return FALSE; + } + } + else if (r > 0) { + /* Find pos inside buffers */ + cur = d->out_buffers->tail; + i = 0; + while (cur) { + buf = cur->data; + blen = BUFREMAIN (buf); + if (r >= blen) { + tmp = cur; + cur = g_list_previous (cur); + /* Mark this buffer as read */ + g_queue_delete_link (d->out_buffers, tmp); + r -= blen; + } + else { + /* This buffer was not written completely */ + buf->pos += r; + break; + } + i ++; + cur = g_list_previous (cur); + } + g_slice_free1 (len * sizeof (struct iovec), iov); + if (cur != 0) { + /* Wait for other event */ + event_del (d->ev); + event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); + event_add (d->ev, d->tv); + return TRUE; + } + } + else if (r == 0) { + /* Got EOF while we wait for data */ + g_slice_free1 (len * sizeof (struct iovec), iov); + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); + d->err_callback (err, d->user_data); + return FALSE; + } + } + else if (r == -1 && errno == EAGAIN) { + g_slice_free1 (len * sizeof (struct iovec), iov); + debug_ip("partially write data, retry"); + /* Wait for other event */ + event_del (d->ev); + event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); + event_base_set (d->ev_base, d->ev); + event_add (d->ev, d->tv); + return TRUE; + } + } + else if (len == 1) { + /* Single write version */ + buf = d->out_buffers->head->data; r = write (fd, buf->pos, BUFREMAIN (buf)); if (r == -1 && errno != EAGAIN) { if (d->err_callback) { @@ -207,7 +282,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) if (BUFREMAIN (buf) != 0) { /* Continue with this buffer */ debug_ip("wrote %z bytes of %z", r, buf->data->len); - continue; + return write_buffers (fd, d, is_delayed); } } else if (r == 0) { @@ -227,13 +302,11 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) event_add (d->ev, d->tv); return TRUE; } - cur = g_list_next (cur); } if (cur == NULL) { /* Disable write event for this time */ - g_list_free (d->out_buffers); - d->out_buffers = NULL; + g_queue_clear (d->out_buffers); debug_ip("all buffers were written successfully"); @@ -472,7 +545,7 @@ dispatcher_cb (gint fd, short what, void *arg) sendfile_callback (d); } else { - if (d->out_buffers == NULL) { + if (g_queue_get_length (d->out_buffers) == 0) { event_del (d->ev); event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); event_base_set (d->ev_base, d->ev); @@ -526,6 +599,7 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event)); new->fd = fd; new->ev_base = base; + new->out_buffers = g_queue_new (); event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new); event_base_set (new->ev_base, new->ev); @@ -540,9 +614,7 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t * dispatcher) if (dispatcher != NULL) { event_del (dispatcher->ev); memory_pool_delete (dispatcher->pool); - if (dispatcher->out_buffers) { - g_list_free (dispatcher->out_buffers); - } + g_queue_free (dispatcher->out_buffers); g_free (dispatcher); } } @@ -609,7 +681,7 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo newbuf->pos = newbuf->data->begin; newbuf->data->len = len; - d->out_buffers = g_list_prepend (d->out_buffers, newbuf); + g_queue_push_head (d->out_buffers, newbuf); if (!delayed) { debug_ip("plan write event"); diff --git a/src/buffer.h b/src/buffer.h index 51b321833..96e38d471 100644 --- a/src/buffer.h +++ b/src/buffer.h @@ -33,7 +33,7 @@ typedef struct rspamd_buffer_s { typedef struct rspamd_io_dispatcher_s { rspamd_buffer_t *in_buf; /**< input buffer */ - GList *out_buffers; /**< out buffers chain */ + GQueue *out_buffers; /**< out buffers chain */ struct timeval *tv; /**< io timeout */ struct event *ev; /**< libevent io event */ memory_pool_t *pool; /**< where to store data */ diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 48068cdf7..19a4b3b3d 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -355,10 +355,10 @@ kvstorage_read_socket (f_str_t * in, void *arg) r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF, elt->key, elt->flags, elt->size); if (!rspamd_dispatcher_write (session->dispather, outbuf, - r, FALSE, FALSE)) { + r, TRUE, FALSE)) { return FALSE; } - if (!rspamd_dispatcher_write (session->dispather, elt->data, elt->size, FALSE, TRUE)) { + if (!rspamd_dispatcher_write (session->dispather, elt->data, elt->size, TRUE, TRUE)) { return FALSE; } return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF, @@ -443,9 +443,9 @@ thr_accept_socket (gint fd, short what, void *arg) g_static_mutex_unlock (thr->accept_mtx); return; } + g_static_mutex_unlock (thr->accept_mtx); /* Check for EAGAIN */ if (nfd == 0) { - g_static_mutex_unlock (thr->accept_mtx); return; } @@ -465,7 +465,6 @@ thr_accept_socket (gint fd, short what, void *arg) memcpy (&session->client_addr, &su.s4.sin_addr, sizeof (struct in_addr)); } - g_static_mutex_unlock (thr->accept_mtx); } /** |