static gboolean
write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
{
- GList *cur;
+ GList *cur = NULL, *tmp;
GError *err = NULL;
rspamd_buffer_t *buf;
ssize_t r;
-
- /* Fix order */
- if (d->out_buffers) {
- d->out_buffers = g_list_reverse (d->out_buffers);
- }
- cur = g_list_first (d->out_buffers);
- while (cur) {
- buf = (rspamd_buffer_t *) cur->data;
- if (BUFREMAIN (buf) == 0) {
- /* Skip empty buffers */
- cur = g_list_next (cur);
- continue;
+ struct iovec *iov;
+ guint i, len, blen;
+
+ len = g_queue_get_length (d->out_buffers);
+ if (len > 1) {
+ /* IOV version */
+ cur = d->out_buffers->tail;
+ iov = g_slice_alloc (len * sizeof (struct iovec));
+ i = 0;
+ while (cur) {
+ buf = cur->data;
+ blen = BUFREMAIN (buf);
+ if (blen > 0) {
+ iov[i].iov_base = buf->pos;
+ iov[i].iov_len = blen;
+ }
+ else {
+ iov[i].iov_base = NULL;
+ iov[i].iov_len = 0;
+ }
+ i ++;
+ cur = g_list_previous (cur);
}
+ /* Now try to write the whole vector */
+ r = writev (fd, iov, len);
+ if (r == -1 && errno != EAGAIN) {
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else if (r > 0) {
+ /* Find pos inside buffers */
+ cur = d->out_buffers->tail;
+ i = 0;
+ while (cur) {
+ buf = cur->data;
+ blen = BUFREMAIN (buf);
+ if (r >= blen) {
+ tmp = cur;
+ cur = g_list_previous (cur);
+ /* Mark this buffer as read */
+ g_queue_delete_link (d->out_buffers, tmp);
+ r -= blen;
+ }
+ else {
+ /* This buffer was not written completely */
+ buf->pos += r;
+ break;
+ }
+ i ++;
+ cur = g_list_previous (cur);
+ }
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ if (cur != 0) {
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
+ event_add (d->ev, d->tv);
+ return TRUE;
+ }
+ }
+ else if (r == 0) {
+ /* Got EOF while we wait for data */
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else if (r == -1 && errno == EAGAIN) {
+ g_slice_free1 (len * sizeof (struct iovec), iov);
+ debug_ip("partially write data, retry");
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_base_set (d->ev_base, d->ev);
+ event_add (d->ev, d->tv);
+ return TRUE;
+ }
+ }
+ else if (len == 1) {
+ /* Single write version */
+ buf = d->out_buffers->head->data;
r = write (fd, buf->pos, BUFREMAIN (buf));
if (r == -1 && errno != EAGAIN) {
if (d->err_callback) {
if (BUFREMAIN (buf) != 0) {
/* Continue with this buffer */
debug_ip("wrote %z bytes of %z", r, buf->data->len);
- continue;
+ return write_buffers (fd, d, is_delayed);
}
}
else if (r == 0) {
event_add (d->ev, d->tv);
return TRUE;
}
- cur = g_list_next (cur);
}
if (cur == NULL) {
/* Disable write event for this time */
- g_list_free (d->out_buffers);
- d->out_buffers = NULL;
+ g_queue_clear (d->out_buffers);
debug_ip("all buffers were written successfully");
sendfile_callback (d);
}
else {
- if (d->out_buffers == NULL) {
+ if (g_queue_get_length (d->out_buffers) == 0) {
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
event_base_set (d->ev_base, d->ev);
new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event));
new->fd = fd;
new->ev_base = base;
+ new->out_buffers = g_queue_new ();
event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
event_base_set (new->ev_base, new->ev);
if (dispatcher != NULL) {
event_del (dispatcher->ev);
memory_pool_delete (dispatcher->pool);
- if (dispatcher->out_buffers) {
- g_list_free (dispatcher->out_buffers);
- }
+ g_queue_free (dispatcher->out_buffers);
g_free (dispatcher);
}
}
newbuf->pos = newbuf->data->begin;
newbuf->data->len = len;
- d->out_buffers = g_list_prepend (d->out_buffers, newbuf);
+ g_queue_push_head (d->out_buffers, newbuf);
if (!delayed) {
debug_ip("plan write event");
r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
elt->key, elt->flags, elt->size);
if (!rspamd_dispatcher_write (session->dispather, outbuf,
- r, FALSE, FALSE)) {
+ r, TRUE, FALSE)) {
return FALSE;
}
- if (!rspamd_dispatcher_write (session->dispather, elt->data, elt->size, FALSE, TRUE)) {
+ if (!rspamd_dispatcher_write (session->dispather, elt->data, elt->size, TRUE, TRUE)) {
return FALSE;
}
return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
g_static_mutex_unlock (thr->accept_mtx);
return;
}
+ g_static_mutex_unlock (thr->accept_mtx);
/* Check for EAGAIN */
if (nfd == 0) {
- g_static_mutex_unlock (thr->accept_mtx);
return;
}
memcpy (&session->client_addr, &su.s4.sin_addr,
sizeof (struct in_addr));
}
- g_static_mutex_unlock (thr->accept_mtx);
}
/**