diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-07-23 12:57:31 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-07-23 12:57:31 +0100 |
commit | 379055dbbb4af997b4d3ffb161d447872d7ca357 (patch) | |
tree | 3774553d470f93e12ddeb454aad9b3b607cf8918 /src/libserver/buffer.c | |
parent | 602ae7a0b7e215ba2677131b8fdc70abc156b3ca (diff) | |
download | rspamd-379055dbbb4af997b4d3ffb161d447872d7ca357.tar.gz rspamd-379055dbbb4af997b4d3ffb161d447872d7ca357.zip |
Unify style without sorting headers.
Diffstat (limited to 'src/libserver/buffer.c')
-rw-r--r-- | src/libserver/buffer.c | 238 |
1 files changed, 140 insertions, 98 deletions
diff --git a/src/libserver/buffer.c b/src/libserver/buffer.c index 864f2fad6..403b3dafd 100644 --- a/src/libserver/buffer.c +++ b/src/libserver/buffer.c @@ -29,12 +29,15 @@ #include <sys/sendfile.h> #endif -#define G_DISPATCHER_ERROR dispatcher_error_quark() -#define debug_ip(...) rspamd_conditional_debug(rspamd_main->logger, NULL, __FUNCTION__, __VA_ARGS__) +#define G_DISPATCHER_ERROR dispatcher_error_quark () +#define debug_ip(...) rspamd_conditional_debug (rspamd_main->logger, \ + NULL, \ + __FUNCTION__, \ + __VA_ARGS__) -static void dispatcher_cb (gint fd, short what, void *arg); +static void dispatcher_cb (gint fd, short what, void *arg); -static inline GQuark +static inline GQuark dispatcher_error_quark (void) { return g_quark_from_static_string ("g-dispatcher-error-quark"); @@ -44,11 +47,11 @@ static gboolean sendfile_callback (rspamd_io_dispatcher_t *d) { - GError *err; + GError *err; #ifdef HAVE_SENDFILE # if defined(FREEBSD) || defined(DARWIN) - off_t off = 0; + off_t off = 0; #if defined(FREEBSD) /* FreeBSD version */ if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, NULL, &off, 0) != 0) { @@ -58,13 +61,15 @@ sendfile_callback (rspamd_io_dispatcher_t *d) #endif if (errno != EAGAIN) { if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + err = + g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( + errno)); d->err_callback (err, d->user_data); return FALSE; } } else { - debug_ip("partially write data, retry"); + debug_ip ("partially write data, retry"); /* Wait for other event */ d->offset += off; event_del (d->ev); @@ -76,30 +81,33 @@ sendfile_callback (rspamd_io_dispatcher_t *d) else { if (d->write_callback) { if (!d->write_callback (d->user_data)) { - debug_ip("callback set wanna_die flag, terminating"); + debug_ip ("callback set wanna_die flag, terminating"); return FALSE; } } event_del (d->ev); - event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, + (void *)d); event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); d->in_sendfile = FALSE; } # else - ssize_t r; + ssize_t r; /* Linux version */ r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size); if (r == -1) { if (errno != EAGAIN) { if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + err = + g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( + errno)); d->err_callback (err, d->user_data); return FALSE; } } else { - debug_ip("partially write data, retry"); + debug_ip ("partially write data, retry"); /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -108,7 +116,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) } } else if (r + d->offset < (ssize_t)d->file_size) { - debug_ip("partially write data, retry"); + debug_ip ("partially write data, retry"); /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -118,30 +126,33 @@ sendfile_callback (rspamd_io_dispatcher_t *d) else { if (d->write_callback) { if (!d->write_callback (d->user_data)) { - debug_ip("callback set wanna_die flag, terminating"); + debug_ip ("callback set wanna_die flag, terminating"); return FALSE; } } event_del (d->ev); - event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, + (void *)d); event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); d->in_sendfile = FALSE; } # endif #else - ssize_t r; + ssize_t r; r = write (d->fd, d->map, d->file_size - d->offset); if (r == -1) { if (errno != EAGAIN) { if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + err = + g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( + errno)); d->err_callback (err, d->user_data); return FALSE; } } else { - debug_ip("partially write data, retry"); + debug_ip ("partially write data, retry"); /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -151,7 +162,7 @@ sendfile_callback (rspamd_io_dispatcher_t *d) } else if (r + d->offset < d->file_size) { d->offset += r; - debug_ip("partially write data, retry"); + debug_ip ("partially write data, retry"); /* Wait for other event */ event_del (d->ev); event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -161,12 +172,13 @@ sendfile_callback (rspamd_io_dispatcher_t *d) else { if (d->write_callback) { if (!d->write_callback (d->user_data)) { - debug_ip("callback set wanna_die flag, terminating"); + debug_ip ("callback set wanna_die flag, terminating"); return FALSE; } } event_del (d->ev); - event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, + (void *)d); event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); d->in_sendfile = FALSE; @@ -177,25 +189,25 @@ sendfile_callback (rspamd_io_dispatcher_t *d) #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin) -#define APPEND_OUT_BUFFER(d, buf) do { \ - DL_APPEND((d)->out_buffers.buffers, buf); \ - (d)->out_buffers.pending ++; \ - } while (0) -#define DELETE_OUT_BUFFER(d, buf) do { \ - DL_DELETE((d)->out_buffers.buffers, (buf)); \ - g_string_free((buf->data), (buf)->allocated); \ - g_slice_free1(sizeof (struct rspamd_out_buffer_s), (buf)); \ - (d)->out_buffers.pending --; \ - } while (0) - -static gboolean +#define APPEND_OUT_BUFFER(d, buf) do { \ + DL_APPEND ((d)->out_buffers.buffers, buf); \ + (d)->out_buffers.pending++; \ +} while (0) +#define DELETE_OUT_BUFFER(d, buf) do { \ + DL_DELETE ((d)->out_buffers.buffers, (buf)); \ + g_string_free ((buf->data), (buf)->allocated); \ + g_slice_free1 (sizeof (struct rspamd_out_buffer_s), (buf)); \ + (d)->out_buffers.pending--; \ +} while (0) + +static gboolean write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) { - GError *err = NULL; - struct rspamd_out_buffer_s *cur = NULL, *tmp; - ssize_t r; - struct iovec *iov; - guint i, len; + GError *err = NULL; + struct rspamd_out_buffer_s *cur = NULL, *tmp; + ssize_t r; + struct iovec *iov; + guint i, len; len = d->out_buffers.pending; while (len > 0) { @@ -203,24 +215,28 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) is_delayed = TRUE; iov = g_slice_alloc (len * sizeof (struct iovec)); i = 0; - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) + { iov[i].iov_base = cur->data->str; iov[i].iov_len = cur->data->len; - i ++; + i++; } /* Now try to write the whole vector */ r = writev (fd, iov, len); if (r == -1 && errno != EAGAIN) { g_slice_free1 (len * sizeof (struct iovec), iov); if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + err = + g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( + errno)); d->err_callback (err, d->user_data); return FALSE; } } else if (r > 0) { /* Find pos inside buffers */ - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) + { if (r >= (ssize_t)cur->data->len) { /* Mark this buffer as read */ r -= cur->data->len; @@ -253,7 +269,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) } else if (r == -1 && errno == EAGAIN) { g_slice_free1 (len * sizeof (struct iovec), iov); - debug_ip("partially write data, retry"); + debug_ip ("partially write data, retry"); /* Wait for other event */ event_del (d->ev); event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); @@ -271,7 +287,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) if (is_delayed && d->write_callback) { if (!d->write_callback (d->user_data)) { - debug_ip("callback set wanna_die flag, terminating"); + debug_ip ("callback set wanna_die flag, terminating"); return FALSE; } } @@ -295,13 +311,13 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) static void read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) { - ssize_t r; - GError *err = NULL; - f_str_t res; - gchar *c, *b; - gchar *end; - size_t len; - enum io_policy saved_policy; + ssize_t r; + GError *err = NULL; + f_str_t res; + gchar *c, *b; + gchar *end; + size_t len; + enum io_policy saved_policy; if (d->wanna_die) { rspamd_remove_dispatcher (d); @@ -309,7 +325,8 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) } if (d->in_buf == NULL) { - d->in_buf = rspamd_mempool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t)); + d->in_buf = + rspamd_mempool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t)); if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) { d->in_buf->data = fstralloc_tmp (d->pool, d->default_buf_size); } @@ -335,7 +352,9 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) r = read (fd, end, BUFREMAIN (d->in_buf)); if (r == -1 && errno != EAGAIN) { if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + err = + g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( + errno)); d->err_callback (err, d->user_data); return; } @@ -364,7 +383,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) } } else if (r == -1 && errno == EAGAIN) { - debug_ip("partially read data, retry"); + debug_ip ("partially read data, retry"); return; } else { @@ -372,8 +391,12 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) d->in_buf->pos += r; d->in_buf->data->len += r; } - debug_ip("read %z characters, policy is %s, watermark is: %z, buffer has %z bytes", r, - d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", d->nchars, d->in_buf->data->len); + debug_ip ( + "read %z characters, policy is %s, watermark is: %z, buffer has %z bytes", + r, + d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", + d->nchars, + d->in_buf->data->len); } saved_policy = d->policy; @@ -382,16 +405,16 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) len = d->in_buf->data->len; b = c; r = 0; - + switch (d->policy) { case BUFFER_LINE: /** Variables: - * b - begin of line - * r - current position in buffer - * *len - length of remaining buffer - * c - pointer to current position (buffer->begin + r) - * res - result string - */ + * b - begin of line + * r - current position in buffer + * *len - length of remaining buffer + * c - pointer to current position (buffer->begin + r) + * res - result string + */ while (r < (ssize_t)len) { if (*c == '\n') { res.begin = b; @@ -404,7 +427,7 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) } else { /* Include EOL in reply */ - res.len ++; + res.len++; } /* Call callback for a line */ if (d->read_callback) { @@ -468,7 +491,8 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) d->in_buf->pos = d->in_buf->data->begin; } if (d->policy != saved_policy && (ssize_t)len != r) { - debug_ip("policy changed during callback, restart buffer's processing"); + debug_ip ( + "policy changed during callback, restart buffer's processing"); read_buffers (fd, d, TRUE); return; } @@ -484,13 +508,14 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) * Actually we do not want to send zero sized * buffers to a read callback */ - if (! (d->want_read && res.len == 0)) { + if (!(d->want_read && res.len == 0)) { if (!d->read_callback (&res, d->user_data)) { return; } } if (d->policy != saved_policy) { - debug_ip("policy changed during callback, restart buffer's processing"); + debug_ip ( + "policy changed during callback, restart buffer's processing"); read_buffers (fd, d, TRUE); return; } @@ -506,10 +531,10 @@ read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) static void dispatcher_cb (gint fd, short what, void *arg) { - rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg; - GError *err = NULL; + rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg; + GError *err = NULL; - debug_ip("in dispatcher callback, what: %d, fd: %d", (gint)what, fd); + debug_ip ("in dispatcher callback, what: %d, fd: %d", (gint)what, fd); if ((what & EV_TIMEOUT) != 0) { if (d->err_callback) { @@ -538,7 +563,8 @@ dispatcher_cb (gint fd, short what, void *arg) else { /* Want read again */ event_del (d->ev); - event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, + (void *)d); event_base_set (d->ev_base, d->ev); event_add (d->ev, d->tv); if (d->is_restored && d->write_callback) { @@ -558,11 +584,17 @@ dispatcher_cb (gint fd, short what, void *arg) } -rspamd_io_dispatcher_t * -rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy policy, - dispatcher_read_callback_t read_cb, dispatcher_write_callback_t write_cb, dispatcher_err_callback_t err_cb, struct timeval *tv, void *user_data) +rspamd_io_dispatcher_t * +rspamd_create_dispatcher (struct event_base *base, + gint fd, + enum io_policy policy, + dispatcher_read_callback_t read_cb, + dispatcher_write_callback_t write_cb, + dispatcher_err_callback_t err_cb, + struct timeval *tv, + void *user_data) { - rspamd_io_dispatcher_t *new; + rspamd_io_dispatcher_t *new; if (fd == -1) { return NULL; @@ -608,7 +640,8 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d) struct rspamd_out_buffer_s *cur, *tmp; if (d != NULL) { - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) + { DELETE_OUT_BUFFER (d, cur); } event_del (d->ev); @@ -618,10 +651,12 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d) } void -rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, size_t nchars) +rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, + enum io_policy policy, + size_t nchars) { - f_str_t *tmp; - gint t; + f_str_t *tmp; + gint t; if (d->policy != policy || nchars != d->nchars) { d->policy = policy; @@ -630,7 +665,8 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, if (policy == BUFFER_CHARACTER && nchars != 0) { if (d->in_buf && d->in_buf->data->size < nchars) { tmp = fstralloc_tmp (d->pool, d->nchars + 1); - memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); + memcpy (tmp->begin, d->in_buf->data->begin, + d->in_buf->data->len); t = d->in_buf->pos - d->in_buf->data->begin; tmp->len = d->in_buf->data->len; d->in_buf->data = tmp; @@ -640,7 +676,8 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, else if (policy == BUFFER_LINE || policy == BUFFER_ANY) { if (d->in_buf && d->nchars < d->default_buf_size) { tmp = fstralloc_tmp (d->pool, d->default_buf_size); - memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); + memcpy (tmp->begin, d->in_buf->data->begin, + d->in_buf->data->len); t = d->in_buf->pos - d->in_buf->data->begin; tmp->len = d->in_buf->data->len; d->in_buf->data = tmp; @@ -650,14 +687,14 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, } } - debug_ip("new input length watermark is %uz", d->nchars); + debug_ip ("new input length watermark is %uz", d->nchars); } gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, - const void *data, size_t len, gboolean delayed, gboolean allocated) + const void *data, size_t len, gboolean delayed, gboolean allocated) { - struct rspamd_out_buffer_s *newbuf; + struct rspamd_out_buffer_s *newbuf; newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s)); if (len == 0) { @@ -680,7 +717,7 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, APPEND_OUT_BUFFER (d, newbuf); if (!delayed) { - debug_ip("plan write event"); + debug_ip ("plan write event"); return write_buffers (d->fd, d, FALSE); } /* Otherwise plan write event */ @@ -692,12 +729,13 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, return TRUE; } -gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d, - GString *str, - gboolean delayed, - gboolean free_on_write) +gboolean +rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d, + GString *str, + gboolean delayed, + gboolean free_on_write) { - struct rspamd_out_buffer_s *newbuf; + struct rspamd_out_buffer_s *newbuf; newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s)); newbuf->data = str; @@ -706,7 +744,7 @@ gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d, APPEND_OUT_BUFFER (d, newbuf); if (!delayed) { - debug_ip("plan write event"); + debug_ip ("plan write event"); return write_buffers (d->fd, d, FALSE); } /* Otherwise plan write event */ @@ -718,7 +756,7 @@ gboolean rspamd_dispatcher_write_string (rspamd_io_dispatcher_t *d, return TRUE; } -gboolean +gboolean rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len) { if (lseek (fd, 0, SEEK_SET) == -1) { @@ -733,9 +771,12 @@ rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len) #ifndef HAVE_SENDFILE #ifdef HAVE_MMAP_NOCORE - if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, 0)) == MAP_FAILED) { + if ((d->map = + mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, + 0)) == MAP_FAILED) { #else - if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) { + if ((d->map = + mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) { #endif msg_warn ("mmap failed: %s", strerror (errno)); return FALSE; @@ -771,7 +812,8 @@ rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d) { struct rspamd_out_buffer_s *cur, *tmp; - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) { + DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) + { DELETE_OUT_BUFFER (d, cur); } /* Cleanup temporary data */ @@ -781,6 +823,6 @@ rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d) #undef debug_ip -/* - * vi:ts=4 +/* + * vi:ts=4 */ |