aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/buffer.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-07-23 12:57:31 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-07-23 12:57:31 +0100
commit379055dbbb4af997b4d3ffb161d447872d7ca357 (patch)
tree3774553d470f93e12ddeb454aad9b3b607cf8918 /src/libserver/buffer.c
parent602ae7a0b7e215ba2677131b8fdc70abc156b3ca (diff)
downloadrspamd-379055dbbb4af997b4d3ffb161d447872d7ca357.tar.gz
rspamd-379055dbbb4af997b4d3ffb161d447872d7ca357.zip
Unify style without sorting headers.
Diffstat (limited to 'src/libserver/buffer.c')
-rw-r--r--src/libserver/buffer.c238
1 files changed, 140 insertions, 98 deletions
diff --git a/src/libserver/buffer.c b/src/libserver/buffer.c
index 864f2fad6..403b3dafd 100644
--- a/src/libserver/buffer.c
+++ b/src/libserver/buffer.c
@@ -29,12 +29,15 @@
#include <sys/sendfile.h>
#endif
-#define G_DISPATCHER_ERROR dispatcher_error_quark()
-#define debug_ip(...) rspamd_conditional_debug(rspamd_main->logger, NULL, __FUNCTION__, __VA_ARGS__)
+#define G_DISPATCHER_ERROR dispatcher_error_quark ()
+#define debug_ip(...) rspamd_conditional_debug (rspamd_main->logger, \
+ NULL, \
+ __FUNCTION__, \
+ __VA_ARGS__)
-static void dispatcher_cb (gint fd, short what, void *arg);
+static void dispatcher_cb (gint fd, short what, void *arg);
-static inline GQuark
+static inline GQuark
dispatcher_error_quark (void)
{
return g_quark_from_static_string ("g-dispatcher-error-quark");
@@ -44,11 +47,11 @@ static gboolean
sendfile_callback (rspamd_io_dispatcher_t *d)
{
- GError *err;
+ GError *err;
#ifdef HAVE_SENDFILE
# if defined(FREEBSD) || defined(DARWIN)
- off_t off = 0;
+ off_t off = 0;
#if defined(FREEBSD)
/* FreeBSD version */
if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, NULL, &off, 0) != 0) {
@@ -58,13 +61,15 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
#endif
if (errno != EAGAIN) {
if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ err =
+ g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (
+ errno));
d->err_callback (err, d->user_data);
return FALSE;
}
}
else {
- debug_ip("partially write data, retry");
+ debug_ip ("partially write data, retry");
/* Wait for other event */
d->offset += off;
event_del (d->ev);
@@ -76,30 +81,33 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
else {
if (d->write_callback) {
if (!d->write_callback (d->user_data)) {
- debug_ip("callback set wanna_die flag, terminating");
+ debug_ip ("callback set wanna_die flag, terminating");
return FALSE;
}
}
event_del (d->ev);
- event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb,
+ (void *)d);
event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
d->in_sendfile = FALSE;
}
# else
- ssize_t r;
+ ssize_t r;
/* Linux version */
r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size);
if (r == -1) {
if (errno != EAGAIN) {
if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ err =
+ g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (
+ errno));
d->err_callback (err, d->user_data);
return FALSE;
}
}
else {
- debug_ip("partially write data, retry");
+ debug_ip ("partially write data, retry");
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
@@ -108,7 +116,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
}
}
else if (r + d->offset < (ssize_t)d->file_size) {
- debug_ip("partially write data, retry");
+ debug_ip ("partially write data, retry");
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
@@ -118,30 +126,33 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
else {
if (d->write_callback) {
if (!d->write_callback (d->user_data)) {
- debug_ip("callback set wanna_die flag, terminating");
+ debug_ip ("callback set wanna_die flag, terminating");
return FALSE;
}
}
event_del (d->ev);
- event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb,
+ (void *)d);
event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
d->in_sendfile = FALSE;
}
# endif
#else
- ssize_t r;
+ ssize_t r;
r = write (d->fd, d->map, d->file_size - d->offset);
if (r == -1) {
if (errno != EAGAIN) {
if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ err =
+ g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (
+ errno));
d->err_callback (err, d->user_data);
return FALSE;
}
}
else {
- debug_ip("partially write data, retry");
+ debug_ip ("partially write data, retry");
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
@@ -151,7 +162,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
}
else if (r + d->offset < d->file_size) {
d->offset += r;
- debug_ip("partially write data, retry");
+ debug_ip ("partially write data, retry");
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
@@ -161,12 +172,13 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
else {
if (d->write_callback) {
if (!d->write_callback (d->user_data)) {
- debug_ip("callback set wanna_die flag, terminating");
+ debug_ip ("callback set wanna_die flag, terminating");
return FALSE;
}
}
event_del (d->ev);
- event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb,
+ (void *)d);
event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
d->in_sendfile = FALSE;
@@ -177,25 +189,25 @@ sendfile_callback (rspamd_io_dispatcher_t *d)
#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
-#define APPEND_OUT_BUFFER(d, buf) do { \
- DL_APPEND((d)->out_buffers.buffers, buf); \
- (d)->out_buffers.pending ++; \
- } while (0)
-#define DELETE_OUT_BUFFER(d, buf) do { \
- DL_DELETE((d)->out_buffers.buffers, (buf)); \
- g_string_free((buf->data), (buf)->allocated); \
- g_slice_free1(sizeof (struct rspamd_out_buffer_s), (buf)); \
- (d)->out_buffers.pending --; \
- } while (0)
-
-static gboolean
+#define APPEND_OUT_BUFFER(d, buf) do { \
+ DL_APPEND ((d)->out_buffers.buffers, buf); \
+ (d)->out_buffers.pending++; \
+} while (0)
+#define DELETE_OUT_BUFFER(d, buf) do { \
+ DL_DELETE ((d)->out_buffers.buffers, (buf)); \
+ g_string_free ((buf->data), (buf)->allocated); \
+ g_slice_free1 (sizeof (struct rspamd_out_buffer_s), (buf)); \
+ (d)->out_buffers.pending--; \
+} while (0)
+
+static gboolean
write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
{
- GError *err = NULL;
- struct rspamd_out_buffer_s *cur = NULL, *tmp;
- ssize_t r;
- struct iovec *iov;
- guint i, len;
+ GError *err = NULL;
+ struct rspamd_out_buffer_s *cur = NULL, *tmp;
+ ssize_t r;
+ struct iovec *iov;
+ guint i, len;
len = d->out_buffers.pending;
while (len > 0) {
@@ -203,24 +215,28 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
is_delayed = TRUE;
iov = g_slice_alloc (len * sizeof (struct iovec));
i = 0;
- DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+ DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp)
+ {
iov[i].iov_base = cur->data->str;
iov[i].iov_len = cur->data->len;
- i ++;
+ i++;
}
/* Now try to write the whole vector */
r = writev (fd, iov, len);
if (r == -1 && errno != EAGAIN) {
g_slice_free1 (len * sizeof (struct iovec), iov);
if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ err =
+ g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (
+ errno));
d->err_callback (err, d->user_data);
return FALSE;
}
}
else if (r > 0) {
/* Find pos inside buffers */
- DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+ DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp)
+ {
if (r >= (ssize_t)cur->data->len) {
/* Mark this buffer as read */
r -= cur->data->len;
@@ -253,7 +269,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
}
else if (r == -1 && errno == EAGAIN) {
g_slice_free1 (len * sizeof (struct iovec), iov);
- debug_ip("partially write data, retry");
+ debug_ip ("partially write data, retry");
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
@@ -271,7 +287,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
if (is_delayed && d->write_callback) {
if (!d->write_callback (d->user_data)) {
- debug_ip("callback set wanna_die flag, terminating");
+ debug_ip ("callback set wanna_die flag, terminating");
return FALSE;
}
}
@@ -295,13 +311,13 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
static void
read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
{
- ssize_t r;
- GError *err = NULL;
- f_str_t res;
- gchar *c, *b;
- gchar *end;
- size_t len;
- enum io_policy saved_policy;
+ ssize_t r;
+ GError *err = NULL;
+ f_str_t res;
+ gchar *c, *b;
+ gchar *end;
+ size_t len;
+ enum io_policy saved_policy;
if (d->wanna_die) {
rspamd_remove_dispatcher (d);
@@ -309,7 +325,8 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
}
if (d->in_buf == NULL) {
- d->in_buf = rspamd_mempool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t));
+ d->in_buf =
+ rspamd_mempool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t));
if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) {
d->in_buf->data = fstralloc_tmp (d->pool, d->default_buf_size);
}
@@ -335,7 +352,9 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
r = read (fd, end, BUFREMAIN (d->in_buf));
if (r == -1 && errno != EAGAIN) {
if (d->err_callback) {
- err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ err =
+ g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (
+ errno));
d->err_callback (err, d->user_data);
return;
}
@@ -364,7 +383,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
}
}
else if (r == -1 && errno == EAGAIN) {
- debug_ip("partially read data, retry");
+ debug_ip ("partially read data, retry");
return;
}
else {
@@ -372,8 +391,12 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
d->in_buf->pos += r;
d->in_buf->data->len += r;
}
- debug_ip("read %z characters, policy is %s, watermark is: %z, buffer has %z bytes", r,
- d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars, d->in_buf->data->len);
+ debug_ip (
+ "read %z characters, policy is %s, watermark is: %z, buffer has %z bytes",
+ r,
+ d->policy == BUFFER_LINE ? "LINE" : "CHARACTER",
+ d->nchars,
+ d->in_buf->data->len);
}
saved_policy = d->policy;
@@ -382,16 +405,16 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
len = d->in_buf->data->len;
b = c;
r = 0;
-
+
switch (d->policy) {
case BUFFER_LINE:
/** Variables:
- * b - begin of line
- * r - current position in buffer
- * *len - length of remaining buffer
- * c - pointer to current position (buffer->begin + r)
- * res - result string
- */
+ * b - begin of line
+ * r - current position in buffer
+ * *len - length of remaining buffer
+ * c - pointer to current position (buffer->begin + r)
+ * res - result string
+ */
while (r < (ssize_t)len) {
if (*c == '\n') {
res.begin = b;
@@ -404,7 +427,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
}
else {
/* Include EOL in reply */
- res.len ++;
+ res.len++;
}
/* Call callback for a line */
if (d->read_callback) {
@@ -468,7 +491,8 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
d->in_buf->pos = d->in_buf->data->begin;
}
if (d->policy != saved_policy && (ssize_t)len != r) {
- debug_ip("policy changed during callback, restart buffer's processing");
+ debug_ip (
+ "policy changed during callback, restart buffer's processing");
read_buffers (fd, d, TRUE);
return;
}
@@ -484,13 +508,14 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
* Actually we do not want to send zero sized
* buffers to a read callback
*/
- if (! (d->want_read && res.len == 0)) {
+ if (!(d->want_read && res.len == 0)) {
if (!d->read_callback (&res, d->user_data)) {
return;
}
}
if (d->policy != saved_policy) {
- debug_ip("policy changed during callback, restart buffer's processing");
+ debug_ip (
+ "policy changed during callback, restart buffer's processing");
read_buffers (fd, d, TRUE);
return;
}
@@ -506,10 +531,10 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
static void
dispatcher_cb (gint fd, short what, void *arg)
{
- rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg;
- GError *err = NULL;
+ rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg;
+ GError *err = NULL;
- debug_ip("in dispatcher callback, what: %d, fd: %d", (gint)what, fd);
+ debug_ip ("in dispatcher callback, what: %d, fd: %d", (gint)what, fd);
if ((what & EV_TIMEOUT) != 0) {
if (d->err_callback) {
@@ -538,7 +563,8 @@ dispatcher_cb (gint fd, short what, void *arg)
else {
/* Want read again */
event_del (d->ev);
- event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb,
+ (void *)d);
event_base_set (d->ev_base, d->ev);
event_add (d->ev, d->tv);
if (d->is_restored && d->write_callback) {
@@ -558,11 +584,17 @@ dispatcher_cb (gint fd, short what, void *arg)
}
-rspamd_io_dispatcher_t *
-rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy policy,
- dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, dispatcher_err_callback_t err_cb, struct timeval *tv, void *user_data)
+rspamd_io_dispatcher_t *
+rspamd_create_dispatcher (struct event_base *base,
+ gint fd,
+ enum io_policy policy,
+ dispatcher_read_callback_t read_cb,
+ dispatcher_write_callback_t write_cb,
+ dispatcher_err_callback_t err_cb,
+ struct timeval *tv,
+ void *user_data)
{
- rspamd_io_dispatcher_t *new;
+ rspamd_io_dispatcher_t *new;
if (fd == -1) {
return NULL;
@@ -608,7 +640,8 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d)
struct rspamd_out_buffer_s *cur, *tmp;
if (d != NULL) {
- DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+ DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp)
+ {
DELETE_OUT_BUFFER (d, cur);
}
event_del (d->ev);
@@ -618,10 +651,12 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d)
}
void
-rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, size_t nchars)
+rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d,
+ enum io_policy policy,
+ size_t nchars)
{
- f_str_t *tmp;
- gint t;
+ f_str_t *tmp;
+ gint t;
if (d->policy != policy || nchars != d->nchars) {
d->policy = policy;
@@ -630,7 +665,8 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
if (policy == BUFFER_CHARACTER && nchars != 0) {
if (d->in_buf && d->in_buf->data->size < nchars) {
tmp = fstralloc_tmp (d->pool, d->nchars + 1);
- memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+ memcpy (tmp->begin, d->in_buf->data->begin,
+ d->in_buf->data->len);
t = d->in_buf->pos - d->in_buf->data->begin;
tmp->len = d->in_buf->data->len;
d->in_buf->data = tmp;
@@ -640,7 +676,8 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
else if (policy == BUFFER_LINE || policy == BUFFER_ANY) {
if (d->in_buf && d->nchars < d->default_buf_size) {
tmp = fstralloc_tmp (d->pool, d->default_buf_size);
- memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
+ memcpy (tmp->begin, d->in_buf->data->begin,
+ d->in_buf->data->len);
t = d->in_buf->pos - d->in_buf->data->begin;
tmp->len = d->in_buf->data->len;
d->in_buf->data = tmp;
@@ -650,14 +687,14 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
}
}
- debug_ip("new input length watermark is %uz", d->nchars);
+ debug_ip ("new input length watermark is %uz", d->nchars);
}
gboolean
rspamd_dispatcher_write (rspamd_io_dispatcher_t * d,
- const void *data, size_t len, gboolean delayed, gboolean allocated)
+ const void *data, size_t len, gboolean delayed, gboolean allocated)
{
- struct rspamd_out_buffer_s *newbuf;
+ struct rspamd_out_buffer_s *newbuf;
newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s));
if (len == 0) {
@@ -680,7 +717,7 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d,
APPEND_OUT_BUFFER (d, newbuf);
if (!delayed) {
- debug_ip("plan write event");
+ debug_ip ("plan write event");
return write_buffers (d->fd, d, FALSE);
}
/* Otherwise plan write event */
@@ -692,12 +729,13 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d,
return TRUE;
}
-gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d,
- GString *str,
- gboolean delayed,
- gboolean free_on_write)
+gboolean
+rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d,
+ GString *str,
+ gboolean delayed,
+ gboolean free_on_write)
{
- struct rspamd_out_buffer_s *newbuf;
+ struct rspamd_out_buffer_s *newbuf;
newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s));
newbuf->data = str;
@@ -706,7 +744,7 @@ gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d,
APPEND_OUT_BUFFER (d, newbuf);
if (!delayed) {
- debug_ip("plan write event");
+ debug_ip ("plan write event");
return write_buffers (d->fd, d, FALSE);
}
/* Otherwise plan write event */
@@ -718,7 +756,7 @@ gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d,
return TRUE;
}
-gboolean
+gboolean
rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len)
{
if (lseek (fd, 0, SEEK_SET) == -1) {
@@ -733,9 +771,12 @@ rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len)
#ifndef HAVE_SENDFILE
#ifdef HAVE_MMAP_NOCORE
- if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, 0)) == MAP_FAILED) {
+ if ((d->map =
+ mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd,
+ 0)) == MAP_FAILED) {
#else
- if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) {
+ if ((d->map =
+ mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) {
#endif
msg_warn ("mmap failed: %s", strerror (errno));
return FALSE;
@@ -771,7 +812,8 @@ rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d)
{
struct rspamd_out_buffer_s *cur, *tmp;
- DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) {
+ DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp)
+ {
DELETE_OUT_BUFFER (d, cur);
}
/* Cleanup temporary data */
@@ -781,6 +823,6 @@ rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d)
#undef debug_ip
-/*
- * vi:ts=4
+/*
+ * vi:ts=4
*/