aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/buffer.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-10-17 15:28:38 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-10-17 15:28:38 +0100
commit463d6092aabb1b4721833f884d04dbb67570fa0e (patch)
tree05e083c47f4ccdfe77676f2ffd6ea1394e95575e /src/libserver/buffer.c
parent9ecc5ea5e15a44f830aff1e1b1ad92b683bca53a (diff)
downloadrspamd-463d6092aabb1b4721833f884d04dbb67570fa0e.tar.gz
rspamd-463d6092aabb1b4721833f884d04dbb67570fa0e.zip
[Rework] Massive removal of legacy code
Diffstat (limited to 'src/libserver/buffer.c')
-rw-r--r--src/libserver/buffer.c804
1 files changed, 0 insertions, 804 deletions
diff --git a/src/libserver/buffer.c b/src/libserver/buffer.c
deleted file mode 100644
index 03be2e9a4..000000000
--- a/src/libserver/buffer.c
+++ /dev/null
@@ -1,804 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "config.h"
-#include "buffer.h"
-#include "rspamd.h"
-#include "unix-std.h"
-#ifdef HAVE_SYS_SENDFILE_H
-#include <sys/sendfile.h>
-#endif
-#include "utlist.h"
-
-#define G_DISPATCHER_ERROR dispatcher_error_quark ()
-#define debug_ip(...) rspamd_conditional_debug (NULL, \
- NULL, \
- d->pool->tag.tagname, d->pool->tag.uid, \
- G_STRFUNC, \
- __VA_ARGS__)
-
-static void dispatcher_cb (gint fd, short what, void *arg);
-
-static inline GQuark
-dispatcher_error_quark (void)
-{
- return g_quark_from_static_string ("g-dispatcher-error-quark");
-}
-
-static gboolean
-sendfile_callback (rspamd_io_dispatcher_t *d)
-{
-
- GError *err;
-
-#ifdef HAVE_SENDFILE
-# if defined(FREEBSD) || defined(DARWIN)
- off_t off = 0;
- #if defined(FREEBSD)
- /* FreeBSD version */
- if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, NULL, &off, 0) != 0) {
- #elif defined(DARWIN)
- /* Darwin version */
- if (sendfile (d->sendfile_fd, d->fd, d->offset, &off, NULL, 0) != 0) {
- #endif
- if (errno != EAGAIN) {
- if (d->err_callback) {
- err =
- g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (
- errno));
- d->err_callback (err, d->user_data);
- return FALSE;
- }
- }
- else {
- debug_ip ("partially write data, retry");
- /* Wait for other event */
- d->offset += off;
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- }
- }
- else {
- if (d->write_callback) {
- if (!d->write_callback (d->user_data)) {
- debug_ip ("callback set wanna_die flag, terminating");
- return FALSE;
- }
- }
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb,
- (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- d->in_sendfile = FALSE;
- }
-# else
- ssize_t r;
- /* Linux version */
- r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size);
- if (r == -1) {
- if (errno != EAGAIN) {
- if (d->err_callback) {
- err =
- g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (
- errno));
- d->err_callback (err, d->user_data);
- return FALSE;
- }
- }
- else {
- debug_ip ("partially write data, retry");
- /* Wait for other event */
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- }
- }
- else if (r + d->offset < (ssize_t)d->file_size) {
- debug_ip ("partially write data, retry");
- /* Wait for other event */
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- }
- else {
- if (d->write_callback) {
- if (!d->write_callback (d->user_data)) {
- debug_ip ("callback set wanna_die flag, terminating");
- return FALSE;
- }
- }
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb,
- (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- d->in_sendfile = FALSE;
- }
-# endif
-#else
- ssize_t r;
- r = write (d->fd, d->map, d->file_size - d->offset);
- if (r == -1) {
- if (errno != EAGAIN) {
- if (d->err_callback) {
- err =
- g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (
- errno));
- d->err_callback (err, d->user_data);
- return FALSE;
- }
- }
- else {
- debug_ip ("partially write data, retry");
- /* Wait for other event */
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- }
- }
- else if (r + d->offset < d->file_size) {
- d->offset += r;
- debug_ip ("partially write data, retry");
- /* Wait for other event */
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- }
- else {
- if (d->write_callback) {
- if (!d->write_callback (d->user_data)) {
- debug_ip ("callback set wanna_die flag, terminating");
- return FALSE;
- }
- }
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb,
- (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- d->in_sendfile = FALSE;
- }
-#endif
- return TRUE;
-}
-
-#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
-
-#define APPEND_OUT_BUFFER(d, buf) do { \
- DL_APPEND ((d)->out_buffers.buffers, buf); \
- (d)->out_buffers.pending++; \
-} while (0)
-#define DELETE_OUT_BUFFER(d, buf) do { \
- DL_DELETE ((d)->out_buffers.buffers, (buf)); \
- g_string_free ((buf->data), (buf)->allocated); \
- g_slice_free1 (sizeof (struct rspamd_out_buffer_s), (buf)); \
- (d)->out_buffers.pending--; \
-} while (0)
-
-static gboolean
-write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
-{
- GError *err = NULL;
- struct rspamd_out_buffer_s *cur = NULL, *tmp;
- ssize_t r;
- struct iovec *iov;
- guint i, len;
-
- len = d->out_buffers.pending;
- while (len > 0) {
- /* Unset delayed as actually we HAVE buffers to write */
- is_delayed = TRUE;
- iov = g_slice_alloc (len * sizeof (struct iovec));
- i = 0;
- DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp)
- {
- iov[i].iov_base = cur->data->str;
- iov[i].iov_len = cur->data->len;
- i++;
- }
- /* Now try to write the whole vector */
- r = writev (fd, iov, len);
- if (r == -1 && errno != EAGAIN) {
- g_slice_free1 (len * sizeof (struct iovec), iov);
- if (d->err_callback) {
- err =
- g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (
- errno));
- d->err_callback (err, d->user_data);
- return FALSE;
- }
- }
- else if (r > 0) {
- /* Find pos inside buffers */
- DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp)
- {
- if (r >= (ssize_t)cur->data->len) {
- /* Mark this buffer as read */
- r -= cur->data->len;
- DELETE_OUT_BUFFER (d, cur);
- }
- else {
- /* This buffer was not written completely */
- g_string_erase (cur->data, 0, r);
- break;
- }
- }
- g_slice_free1 (len * sizeof (struct iovec), iov);
- if (d->out_buffers.pending > 0) {
- /* Wait for other event */
- event_del (d->ev);
- event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- return TRUE;
- }
- }
- else if (r == 0) {
- /* Got EOF while we wait for data */
- g_slice_free1 (len * sizeof (struct iovec), iov);
- if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
- d->err_callback (err, d->user_data);
- return FALSE;
- }
- }
- else if (r == -1 && errno == EAGAIN) {
- g_slice_free1 (len * sizeof (struct iovec), iov);
- debug_ip ("partially write data, retry");
- /* Wait for other event */
- event_del (d->ev);
- event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- return TRUE;
- }
- len = d->out_buffers.pending;
- }
-
- if (d->out_buffers.pending == 0) {
- /* Disable write event for this time */
-
- debug_ip ("all buffers were written successfully");
-
- if (is_delayed && d->write_callback) {
- if (!d->write_callback (d->user_data)) {
- debug_ip ("callback set wanna_die flag, terminating");
- return FALSE;
- }
- }
-
- event_del (d->ev);
- event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- }
- else {
- /* Plan other write event */
- event_del (d->ev);
- event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- }
-
- return TRUE;
-}
-
-static struct rspamd_buffer_buf *
-allocate_buffer (rspamd_mempool_t *pool, gsize size)
-{
- struct rspamd_buffer_buf *b;
-
- b = rspamd_mempool_alloc_tmp (pool, sizeof (*b));
- b->begin = rspamd_mempool_alloc_tmp (pool, size);
- b->size = size;
- b->len = 0;
-
- return b;
-}
-
-static void
-read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
-{
- ssize_t r;
- GError *err = NULL;
- rspamd_ftok_t res;
- guchar *c, *b;
- guchar *end;
- size_t len;
- enum io_policy saved_policy;
-
- if (d->wanna_die) {
- rspamd_remove_dispatcher (d);
- return;
- }
-
- if (d->in_buf == NULL) {
- d->in_buf =
- rspamd_mempool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t));
- if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) {
- d->in_buf->data = allocate_buffer (d->pool, d->default_buf_size);
- }
- else {
- d->in_buf->data = allocate_buffer (d->pool, d->nchars + 1);
- }
- d->in_buf->pos = d->in_buf->data->begin;
- }
-
- end = d->in_buf->pos;
- len = d->in_buf->data->len;
-
- if (BUFREMAIN (d->in_buf) == 0) {
- /* Buffer is full, try to call callback with overflow error */
- if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, E2BIG, "buffer overflow");
- d->err_callback (err, d->user_data);
- return;
- }
- }
- else if (!skip_read) {
- /* Try to read the whole buffer */
- r = read (fd, end, BUFREMAIN (d->in_buf));
- if (r == -1 && errno != EAGAIN) {
- if (d->err_callback) {
- err =
- g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (
- errno));
- d->err_callback (err, d->user_data);
- return;
- }
- }
- else if (r == 0) {
- /* Got EOF while we wait for data */
-#if 0
- if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
- d->err_callback (err, d->user_data);
- return;
- }
-#endif
- /* Read returned 0, it may be shutdown or full quit */
- if (!d->want_read) {
- d->half_closed = TRUE;
- /* Do not expect any read after this */
- event_del (d->ev);
- }
- else {
- if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
- d->err_callback (err, d->user_data);
- return;
- }
- }
- }
- else if (r == -1 && errno == EAGAIN) {
- debug_ip ("partially read data, retry");
- return;
- }
- else {
- /* Set current position in buffer */
- d->in_buf->pos += r;
- d->in_buf->data->len += r;
- }
- debug_ip (
- "read %z characters, policy is %s, watermark is: %z, buffer has %z bytes",
- r,
- d->policy == BUFFER_LINE ? "LINE" : "CHARACTER",
- d->nchars,
- d->in_buf->data->len);
- }
-
- saved_policy = d->policy;
- c = d->in_buf->data->begin;
- end = d->in_buf->pos;
- len = d->in_buf->data->len;
- b = c;
- r = 0;
-
- switch (d->policy) {
- case BUFFER_LINE:
- /** Variables:
- * b - begin of line
- * r - current position in buffer
- * *len - length of remaining buffer
- * c - pointer to current position (buffer->begin + r)
- * res - result string
- */
- while (r < (ssize_t)len) {
- if (*c == '\n') {
- res.begin = b;
- res.len = c - b;
- /* Strip EOL */
- if (d->strip_eol) {
- if (r != 0 && *(c - 1) == '\r') {
- res.len--;
- }
- }
- else {
- /* Include EOL in reply */
- res.len++;
- }
- /* Call callback for a line */
- if (d->read_callback) {
- if (!d->read_callback (&res, d->user_data)) {
- return;
- }
- if (d->policy != saved_policy) {
- /* Drain buffer as policy is changed */
- /* Note that d->in_buffer is other pointer now, so we need to reinit all pointers */
- /* First detect how much symbols do we have */
- if (end == c) {
- /* In fact we read the whole buffer and change input policy, so just set current pos to begin of buffer */
- d->in_buf->pos = d->in_buf->data->begin;
- d->in_buf->data->len = 0;
- }
- else {
- /* Otherwise we need to move buffer */
- /* Reinit pointers */
- len = d->in_buf->data->len - r - 1;
- end = d->in_buf->data->begin + r + 1;
- memmove (d->in_buf->data->begin, end, len);
- d->in_buf->data->len = len;
- d->in_buf->pos = d->in_buf->data->begin + len;
- /* Process remaining buffer */
- read_buffers (fd, d, TRUE);
- }
- return;
- }
- }
- /* Set new begin of line */
- b = c + 1;
- }
- r++;
- c++;
- }
- /* Now drain remaining characters in buffer */
- memmove (d->in_buf->data->begin, b, c - b);
- d->in_buf->data->len = c - b;
- d->in_buf->pos = d->in_buf->data->begin + (c - b);
- break;
- case BUFFER_CHARACTER:
- r = d->nchars;
- if ((ssize_t)len >= r) {
- res.begin = b;
- res.len = r;
- c = b + r;
- if (d->read_callback) {
- if (!d->read_callback (&res, d->user_data)) {
- return;
- }
- /* Move remaining string to begin of buffer (draining) */
- if ((ssize_t)len > r) {
- len -= r;
- memmove (d->in_buf->data->begin, c, len);
- d->in_buf->data->len = len;
- d->in_buf->pos = d->in_buf->data->begin + len;
- b = d->in_buf->data->begin;
- }
- else {
- d->in_buf->data->len = 0;
- d->in_buf->pos = d->in_buf->data->begin;
- }
- if (d->policy != saved_policy && (ssize_t)len != r) {
- debug_ip (
- "policy changed during callback, restart buffer's processing");
- read_buffers (fd, d, TRUE);
- return;
- }
- }
- }
- break;
- case BUFFER_ANY:
- res.begin = d->in_buf->data->begin;
- res.len = len;
-
- if (d->read_callback) {
- /*
- * Actually we do not want to send zero sized
- * buffers to a read callback
- */
- if (!(d->want_read && res.len == 0)) {
- if (!d->read_callback (&res, d->user_data)) {
- return;
- }
- }
- if (d->policy != saved_policy) {
- debug_ip (
- "policy changed during callback, restart buffer's processing");
- read_buffers (fd, d, TRUE);
- return;
- }
- }
- d->in_buf->pos = d->in_buf->data->begin;
- d->in_buf->data->len = 0;
- break;
- }
-}
-
-#undef BUFREMAIN
-
-static void
-dispatcher_cb (gint fd, short what, void *arg)
-{
- rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg;
- GError *err = NULL;
-
- debug_ip ("in dispatcher callback, what: %d, fd: %d", (gint)what, fd);
-
- if ((what & EV_TIMEOUT) != 0) {
- if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, ETIMEDOUT, "IO timeout");
- d->err_callback (err, d->user_data);
- }
- }
- else if ((what & EV_READ) != 0) {
- read_buffers (fd, d, FALSE);
- }
- else if ((what & EV_WRITE) != 0) {
- /* No data to write, disable further EV_WRITE to this fd */
- if (d->in_sendfile) {
- sendfile_callback (d);
- }
- else {
- if (d->out_buffers.pending == 0) {
- if (d->half_closed && !d->is_restored) {
- /* Socket is half closed and there is nothing more to write, closing connection */
- if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
- d->err_callback (err, d->user_data);
- return;
- }
- }
- else {
- /* Want read again */
- event_del (d->ev);
- event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb,
- (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- if (d->is_restored && d->write_callback) {
- if (!d->write_callback (d->user_data)) {
- return;
- }
- d->is_restored = FALSE;
- }
- }
- }
- else {
- /* Delayed write */
- write_buffers (fd, d, TRUE);
- }
- }
- }
-}
-
-
-rspamd_io_dispatcher_t *
-rspamd_create_dispatcher (struct event_base *base,
- gint fd,
- enum io_policy policy,
- dispatcher_read_callback_t read_cb,
- dispatcher_write_callback_t write_cb,
- dispatcher_err_callback_t err_cb,
- struct timeval *tv,
- void *user_data)
-{
- rspamd_io_dispatcher_t *new;
-
- if (fd == -1) {
- return NULL;
- }
-
- new = g_slice_alloc0 (sizeof (rspamd_io_dispatcher_t));
-
- new->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), NULL);
- if (tv != NULL) {
- new->tv = rspamd_mempool_alloc (new->pool, sizeof (struct timeval));
- memcpy (new->tv, tv, sizeof (struct timeval));
- }
- else {
- new->tv = NULL;
- }
- new->nchars = 0;
- new->in_sendfile = FALSE;
- new->policy = policy;
- new->read_callback = read_cb;
- new->write_callback = write_cb;
- new->err_callback = err_cb;
- new->user_data = user_data;
- new->strip_eol = TRUE;
- new->half_closed = FALSE;
- new->want_read = TRUE;
- new->is_restored = FALSE;
- new->default_buf_size = sysconf (_SC_PAGESIZE);
-
- new->ev = rspamd_mempool_alloc0 (new->pool, sizeof (struct event));
- new->fd = fd;
- new->ev_base = base;
-
- event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
- event_base_set (new->ev_base, new->ev);
- event_add (new->ev, new->tv);
-
- return new;
-}
-
-void
-rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d)
-{
- struct rspamd_out_buffer_s *cur, *tmp;
-
- if (d != NULL) {
- DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp)
- {
- DELETE_OUT_BUFFER (d, cur);
- }
- event_del (d->ev);
- rspamd_mempool_delete (d->pool);
- g_slice_free1 (sizeof (rspamd_io_dispatcher_t), d);
- }
-}
-
-void
-rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d,
- enum io_policy policy,
- size_t nchars)
-{
- struct rspamd_buffer_buf *tmp;
- gint t;
-
- if (d->policy != policy || nchars != d->nchars) {
- d->policy = policy;
- d->nchars = nchars ? nchars : d->default_buf_size;
- /* Resize input buffer if needed */
- if (policy == BUFFER_CHARACTER && nchars != 0) {
- if (d->in_buf && d->in_buf->data->size < nchars) {
- tmp = allocate_buffer (d->pool, d->nchars + 1);
- memcpy (tmp->begin, d->in_buf->data->begin,
- d->in_buf->data->len);
- t = d->in_buf->pos - d->in_buf->data->begin;
- tmp->len = d->in_buf->data->len;
- d->in_buf->data = tmp;
- d->in_buf->pos = d->in_buf->data->begin + t;
- }
- }
- else if (policy == BUFFER_LINE || policy == BUFFER_ANY) {
- if (d->in_buf && d->nchars < d->default_buf_size) {
- tmp = allocate_buffer (d->pool, d->default_buf_size);
- memcpy (tmp->begin, d->in_buf->data->begin,
- d->in_buf->data->len);
- t = d->in_buf->pos - d->in_buf->data->begin;
- tmp->len = d->in_buf->data->len;
- d->in_buf->data = tmp;
- d->in_buf->pos = d->in_buf->data->begin + t;
- }
- d->strip_eol = TRUE;
- }
- }
-
- debug_ip ("new input length watermark is %uz", d->nchars);
-}
-
-gboolean
-rspamd_dispatcher_write (rspamd_io_dispatcher_t * d,
- const void *data, size_t len, gboolean delayed, gboolean allocated)
-{
- struct rspamd_out_buffer_s *newbuf;
-
- newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s));
- if (len == 0) {
- /* Assume NULL terminated */
- len = strlen ((const gchar *)data);
- }
-
- if (!allocated) {
- newbuf->data = g_string_new_len (data, len);
- newbuf->allocated = TRUE;
- }
- else {
- newbuf->data = g_string_new (NULL);
- newbuf->data->str = (gchar *)data;
- newbuf->data->len = len;
- newbuf->data->allocated_len = len;
- newbuf->allocated = FALSE;
- }
-
- APPEND_OUT_BUFFER (d, newbuf);
-
- if (!delayed) {
- debug_ip ("plan write event");
- return write_buffers (d->fd, d, FALSE);
- }
- /* Otherwise plan write event */
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
-
- return TRUE;
-}
-
-gboolean
-rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len)
-{
- if (lseek (fd, 0, SEEK_SET) == -1) {
- msg_warn ("lseek failed: %s", strerror (errno));
- return FALSE;
- }
-
- d->offset = 0;
- d->in_sendfile = TRUE;
- d->sendfile_fd = fd;
- d->file_size = len;
-
-#ifndef HAVE_SENDFILE
- #ifdef HAVE_MMAP_NOCORE
- if ((d->map =
- mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd,
- 0)) == MAP_FAILED) {
- #else
- if ((d->map =
- mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) {
- #endif
- msg_warn ("mmap failed: %s", strerror (errno));
- return FALSE;
- }
-#endif
-
- return sendfile_callback (d);
-}
-
-void
-rspamd_dispatcher_pause (rspamd_io_dispatcher_t * d)
-{
- debug_ip ("paused dispatcher");
- event_del (d->ev);
- d->is_restored = FALSE;
-}
-
-void
-rspamd_dispatcher_restore (rspamd_io_dispatcher_t * d)
-{
- if (!d->is_restored) {
- debug_ip ("restored dispatcher");
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, d);
- event_base_set (d->ev_base, d->ev);
- event_add (d->ev, d->tv);
- d->is_restored = TRUE;
- }
-}
-
-void
-rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d)
-{
- struct rspamd_out_buffer_s *cur, *tmp;
-
- DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp)
- {
- DELETE_OUT_BUFFER (d, cur);
- }
- /* Cleanup temporary data */
- rspamd_mempool_cleanup_tmp (d->pool);
- d->in_buf = NULL;
-}
-
-#undef debug_ip