diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-07-23 12:53:08 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-07-23 12:53:08 +0100 |
commit | fe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b (patch) | |
tree | c84e6a5d4c5cd78a7a2cc3c7adbc7af5d0541682 /src/libserver/buffer.c | |
parent | e0483657ff6cf1adc828ccce457814d61fe90a0d (diff) | |
download | rspamd-fe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b.tar.gz rspamd-fe79d8c5a39f2b717f78cc3f3ef21b3cfc46500b.zip |
Revert "Unify code style."
This reverts commit e0483657ff6cf1adc828ccce457814d61fe90a0d.
Diffstat (limited to 'src/libserver/buffer.c')
-rw-r--r-- | src/libserver/buffer.c | 240 |
1 files changed, 99 insertions, 141 deletions
diff --git a/src/libserver/buffer.c b/src/libserver/buffer.c index 4f684caed..864f2fad6 100644 --- a/src/libserver/buffer.c +++ b/src/libserver/buffer.c @@ -22,22 +22,19 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#include "buffer.h" #include "config.h" +#include "buffer.h" #include "main.h" #ifdef HAVE_SYS_SENDFILE_H #include <sys/sendfile.h> #endif -#define G_DISPATCHER_ERROR dispatcher_error_quark () -#define debug_ip(...) rspamd_conditional_debug (rspamd_main->logger, \ - NULL, \ - __FUNCTION__, \ - __VA_ARGS__) +#define G_DISPATCHER_ERROR dispatcher_error_quark() +#define debug_ip(...) rspamd_conditional_debug(rspamd_main->logger, NULL, __FUNCTION__, __VA_ARGS__) -static void dispatcher_cb (gint fd, short what, void *arg); +static void dispatcher_cb (gint fd, short what, void *arg); -static inline GQuark +static inline GQuark dispatcher_error_quark (void) { return g_quark_from_static_string ("g-dispatcher-error-quark"); @@ -47,11 +44,11 @@ static gboolean sendfile_callback (rspamd_io_dispatcher_t *d) { - GError *err; + GError *err; #ifdef HAVE_SENDFILE # if defined(FREEBSD) || defined(DARWIN) - off_t off = 0; + off_t off = 0; #if defined(FREEBSD) /* FreeBSD version */ if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, NULL, &off, 0) != 0) { @@ -61,15 +58,13 @@ sendfile_callback (rspamd_io_dispatcher_t *d) #endif if (errno != EAGAIN) { if (d->err_callback) { - err = - g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( - errno)); + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); d->err_callback (err, d->user_data); return FALSE; } } else { - debug_ip ("partially write data, retry"); + debug_ip("partially write data, retry"); /* Wait for other event */ d->offset += off; event_del (d->ev); @@ -81,33 +76,30 @@ sendfile_callback (rspamd_io_dispatcher_t *d) else { if (d->write_callback) { if (!d->write_callback (d->user_data)) { - debug_ip ("callback set wanna_die flag, terminating"); + debug_ip("callback set wanna_die flag, terminating"); return FALSE; } } event_del (d->ev); - event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, - (void *)d); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); d->in_sendfile = FALSE; } # else - ssize_t r; + ssize_t r; /* Linux version */ r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size); if (r == -1) { if (errno != EAGAIN) { if (d->err_callback) { - err = - g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( - errno)); + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); d->err_callback (err, d->user_data); return FALSE; } } else { - debug_ip ("partially write data, retry"); + debug_ip("partially write data, retry"); /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -116,7 +108,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) } } else if (r + d->offset < (ssize_t)d->file_size) { - debug_ip ("partially write data, retry"); + debug_ip("partially write data, retry"); /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -126,33 +118,30 @@ sendfile_callback (rspamd_io_dispatcher_t *d) else { if (d->write_callback) { if (!d->write_callback (d->user_data)) { - debug_ip ("callback set wanna_die flag, terminating"); + debug_ip("callback set wanna_die flag, terminating"); return FALSE; } } event_del (d->ev); - event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, - (void *)d); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); d->in_sendfile = FALSE; } # endif #else - ssize_t r; + ssize_t r; r = write (d->fd, d->map, d->file_size - d->offset); if (r == -1) { if (errno != EAGAIN) { if (d->err_callback) { - err = - g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( - errno)); + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); d->err_callback (err, d->user_data); return FALSE; } } else { - debug_ip ("partially write data, retry"); + debug_ip("partially write data, retry"); /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -162,7 +151,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) } else if (r + d->offset < d->file_size) { d->offset += r; - debug_ip ("partially write data, retry"); + debug_ip("partially write data, retry"); /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -172,13 +161,12 @@ sendfile_callback (rspamd_io_dispatcher_t *d) else { if (d->write_callback) { if (!d->write_callback (d->user_data)) { - debug_ip ("callback set wanna_die flag, terminating"); + debug_ip("callback set wanna_die flag, terminating"); return FALSE; } } event_del (d->ev); - event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, - (void *)d); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); d->in_sendfile = FALSE; @@ -189,25 +177,25 @@ sendfile_callback (rspamd_io_dispatcher_t *d) #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin) -#define APPEND_OUT_BUFFER(d, buf) do { \ - DL_APPEND ((d)->out_buffers.buffers, buf); \ - (d)->out_buffers.pending++; \ -} while (0) -#define DELETE_OUT_BUFFER(d, buf) do { \ - DL_DELETE ((d)->out_buffers.buffers, (buf)); \ - g_string_free ((buf->data), (buf)->allocated); \ - g_slice_free1 (sizeof (struct rspamd_out_buffer_s), (buf)); \ - (d)->out_buffers.pending--; \ -} while (0) - -static gboolean +#define APPEND_OUT_BUFFER(d, buf) do { \ + DL_APPEND((d)->out_buffers.buffers, buf); \ + (d)->out_buffers.pending ++; \ + } while (0) +#define DELETE_OUT_BUFFER(d, buf) do { \ + DL_DELETE((d)->out_buffers.buffers, (buf)); \ + g_string_free((buf->data), (buf)->allocated); \ + g_slice_free1(sizeof (struct rspamd_out_buffer_s), (buf)); \ + (d)->out_buffers.pending --; \ + } while (0) + +static gboolean write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) { - GError *err = NULL; - struct rspamd_out_buffer_s *cur = NULL, *tmp; - ssize_t r; - struct iovec *iov; - guint i, len; + GError *err = NULL; + struct rspamd_out_buffer_s *cur = NULL, *tmp; + ssize_t r; + struct iovec *iov; + guint i, len; len = d->out_buffers.pending; while (len > 0) { @@ -215,28 +203,24 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) is_delayed = TRUE; iov = g_slice_alloc (len * sizeof (struct iovec)); i = 0; - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) - { + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { iov[i].iov_base = cur->data->str; iov[i].iov_len = cur->data->len; - i++; + i ++; } /* Now try to write the whole vector */ r = writev (fd, iov, len); if (r == -1 && errno != EAGAIN) { g_slice_free1 (len * sizeof (struct iovec), iov); if (d->err_callback) { - err = - g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( - errno)); + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); d->err_callback (err, d->user_data); return FALSE; } } else if (r > 0) { /* Find pos inside buffers */ - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) - { + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { if (r >= (ssize_t)cur->data->len) { /* Mark this buffer as read */ r -= cur->data->len; @@ -269,7 +253,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) } else if (r == -1 && errno == EAGAIN) { g_slice_free1 (len * sizeof (struct iovec), iov); - debug_ip ("partially write data, retry"); + debug_ip("partially write data, retry"); /* Wait for other event */ event_del (d->ev); event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -287,7 +271,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) if (is_delayed && d->write_callback) { if (!d->write_callback (d->user_data)) { - debug_ip ("callback set wanna_die flag, terminating"); + debug_ip("callback set wanna_die flag, terminating"); return FALSE; } } @@ -311,13 +295,13 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) static void read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) { - ssize_t r; - GError *err = NULL; - f_str_t res; - gchar *c, *b; - gchar *end; - size_t len; - enum io_policy saved_policy; + ssize_t r; + GError *err = NULL; + f_str_t res; + gchar *c, *b; + gchar *end; + size_t len; + enum io_policy saved_policy; if (d->wanna_die) { rspamd_remove_dispatcher (d); @@ -325,8 +309,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) } if (d->in_buf == NULL) { - d->in_buf = - rspamd_mempool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t)); + d->in_buf = rspamd_mempool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t)); if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) { d->in_buf->data = fstralloc_tmp (d->pool, d->default_buf_size); } @@ -352,9 +335,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) r = read (fd, end, BUFREMAIN (d->in_buf)); if (r == -1 && errno != EAGAIN) { if (d->err_callback) { - err = - g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( - errno)); + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); d->err_callback (err, d->user_data); return; } @@ -383,7 +364,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) } } else if (r == -1 && errno == EAGAIN) { - debug_ip ("partially read data, retry"); + debug_ip("partially read data, retry"); return; } else { @@ -391,12 +372,8 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) d->in_buf->pos += r; d->in_buf->data->len += r; } - debug_ip ( - "read %z characters, policy is %s, watermark is: %z, buffer has %z bytes", - r, - d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", - d->nchars, - d->in_buf->data->len); + debug_ip("read %z characters, policy is %s, watermark is: %z, buffer has %z bytes", r, + d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars, d->in_buf->data->len); } saved_policy = d->policy; @@ -405,16 +382,16 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) len = d->in_buf->data->len; b = c; r = 0; - + switch (d->policy) { case BUFFER_LINE: /** Variables: - * b - begin of line - * r - current position in buffer - * *len - length of remaining buffer - * c - pointer to current position (buffer->begin + r) - * res - result string - */ + * b - begin of line + * r - current position in buffer + * *len - length of remaining buffer + * c - pointer to current position (buffer->begin + r) + * res - result string + */ while (r < (ssize_t)len) { if (*c == '\n') { res.begin = b; @@ -427,7 +404,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) } else { /* Include EOL in reply */ - res.len++; + res.len ++; } /* Call callback for a line */ if (d->read_callback) { @@ -491,8 +468,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) d->in_buf->pos = d->in_buf->data->begin; } if (d->policy != saved_policy && (ssize_t)len != r) { - debug_ip ( - "policy changed during callback, restart buffer's processing"); + debug_ip("policy changed during callback, restart buffer's processing"); read_buffers (fd, d, TRUE); return; } @@ -508,14 +484,13 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) * Actually we do not want to send zero sized * buffers to a read callback */ - if (!(d->want_read && res.len == 0)) { + if (! (d->want_read && res.len == 0)) { if (!d->read_callback (&res, d->user_data)) { return; } } if (d->policy != saved_policy) { - debug_ip ( - "policy changed during callback, restart buffer's processing"); + debug_ip("policy changed during callback, restart buffer's processing"); read_buffers (fd, d, TRUE); return; } @@ -531,10 +506,10 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) static void dispatcher_cb (gint fd, short what, void *arg) { - rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg; - GError *err = NULL; + rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg; + GError *err = NULL; - debug_ip ("in dispatcher callback, what: %d, fd: %d", (gint)what, fd); + debug_ip("in dispatcher callback, what: %d, fd: %d", (gint)what, fd); if ((what & EV_TIMEOUT) != 0) { if (d->err_callback) { @@ -563,8 +538,7 @@ dispatcher_cb (gint fd, short what, void *arg) else { /* Want read again */ event_del (d->ev); - event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, - (void *)d); + event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); if (d->is_restored && d->write_callback) { @@ -584,17 +558,11 @@ dispatcher_cb (gint fd, short what, void *arg) } -rspamd_io_dispatcher_t * -rspamd_create_dispatcher (struct event_base *base, - gint fd, - enum io_policy policy, - dispatcher_read_callback_t read_cb, - dispatcher_write_callback_t write_cb, - dispatcher_err_callback_t err_cb, - struct timeval *tv, - void *user_data) +rspamd_io_dispatcher_t * +rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy policy, + dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, dispatcher_err_callback_t err_cb, struct timeval *tv, void *user_data) { - rspamd_io_dispatcher_t *new; + rspamd_io_dispatcher_t *new; if (fd == -1) { return NULL; @@ -640,8 +608,7 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d) struct rspamd_out_buffer_s *cur, *tmp; if (d != NULL) { - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) - { + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { DELETE_OUT_BUFFER (d, cur); } event_del (d->ev); @@ -651,12 +618,10 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d) } void -rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, - enum io_policy policy, - size_t nchars) +rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, size_t nchars) { - f_str_t *tmp; - gint t; + f_str_t *tmp; + gint t; if (d->policy != policy || nchars != d->nchars) { d->policy = policy; @@ -665,8 +630,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, if (policy == BUFFER_CHARACTER && nchars != 0) { if (d->in_buf && d->in_buf->data->size < nchars) { tmp = fstralloc_tmp (d->pool, d->nchars + 1); - memcpy (tmp->begin, d->in_buf->data->begin, - d->in_buf->data->len); + memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); t = d->in_buf->pos - d->in_buf->data->begin; tmp->len = d->in_buf->data->len; d->in_buf->data = tmp; @@ -676,8 +640,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, else if (policy == BUFFER_LINE || policy == BUFFER_ANY) { if (d->in_buf && d->nchars < d->default_buf_size) { tmp = fstralloc_tmp (d->pool, d->default_buf_size); - memcpy (tmp->begin, d->in_buf->data->begin, - d->in_buf->data->len); + memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); t = d->in_buf->pos - d->in_buf->data->begin; tmp->len = d->in_buf->data->len; d->in_buf->data = tmp; @@ -687,14 +650,14 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, } } - debug_ip ("new input length watermark is %uz", d->nchars); + debug_ip("new input length watermark is %uz", d->nchars); } gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, - const void *data, size_t len, gboolean delayed, gboolean allocated) + const void *data, size_t len, gboolean delayed, gboolean allocated) { - struct rspamd_out_buffer_s *newbuf; + struct rspamd_out_buffer_s *newbuf; newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s)); if (len == 0) { @@ -717,7 +680,7 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, APPEND_OUT_BUFFER (d, newbuf); if (!delayed) { - debug_ip ("plan write event"); + debug_ip("plan write event"); return write_buffers (d->fd, d, FALSE); } /* Otherwise plan write event */ @@ -729,13 +692,12 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, return TRUE; } -gboolean -rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d, - GString *str, - gboolean delayed, - gboolean free_on_write) +gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d, + GString *str, + gboolean delayed, + gboolean free_on_write) { - struct rspamd_out_buffer_s *newbuf; + struct rspamd_out_buffer_s *newbuf; newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s)); newbuf->data = str; @@ -744,7 +706,7 @@ rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d, APPEND_OUT_BUFFER (d, newbuf); if (!delayed) { - debug_ip ("plan write event"); + debug_ip("plan write event"); return write_buffers (d->fd, d, FALSE); } /* Otherwise plan write event */ @@ -756,7 +718,7 @@ rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d, return TRUE; } -gboolean +gboolean rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len) { if (lseek (fd, 0, SEEK_SET) == -1) { @@ -771,12 +733,9 @@ rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len) #ifndef HAVE_SENDFILE #ifdef HAVE_MMAP_NOCORE - if ((d->map = - mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, - 0)) == MAP_FAILED) { + if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, 0)) == MAP_FAILED) { #else - if ((d->map = - mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) { + if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) { #endif msg_warn ("mmap failed: %s", strerror (errno)); return FALSE; @@ -812,8 +771,7 @@ rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d) { struct rspamd_out_buffer_s *cur, *tmp; - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) - { + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { DELETE_OUT_BUFFER (d, cur); } /* Cleanup temporary data */ @@ -823,6 +781,6 @@ rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d) #undef debug_ip -/* - * vi:ts=4 +/* + * vi:ts=4 */ |