diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-02-19 21:16:30 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-02-19 21:16:30 +0300 |
commit | bcece60fa1bfd4bbb09a64c058835fe3245e1d18 (patch) | |
tree | 813faac7ee96029e604a8a73c88ffee4e10a579a /src/buffer.c | |
parent | 2dca592dce2fdde806af89e5cf036bd11bd4df61 (diff) | |
download | rspamd-bcece60fa1bfd4bbb09a64c058835fe3245e1d18.tar.gz rspamd-bcece60fa1bfd4bbb09a64c058835fe3245e1d18.zip |
* Implement rspamd IO with IO dispatcher (TODO: still some issues with timeouts must be resolved)
Diffstat (limited to 'src/buffer.c')
-rw-r--r-- | src/buffer.c | 355 |
1 files changed, 355 insertions, 0 deletions
diff --git a/src/buffer.c b/src/buffer.c new file mode 100644 index 000000000..46ef98850 --- /dev/null +++ b/src/buffer.c @@ -0,0 +1,355 @@ +/* + * Copyright (c) 2009, Rambler media + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "buffer.h" + +#define G_DISPATCHER_ERROR dispatcher_error_quark() + +static void dispatcher_cb (int fd, short what, void *arg); + +static inline GQuark +dispatcher_error_quark (void) +{ + return g_quark_from_static_string ("g-dispatcher-error-quark"); +} + +#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin) + +static void +write_buffers (int fd, rspamd_io_dispatcher_t *d) +{ + GList *cur; + GError *err; + rspamd_buffer_t *buf; + ssize_t r; + + /* Fix order */ + if (d->out_buffers) { + d->out_buffers = g_list_reverse (d->out_buffers); + } + cur = g_list_first (d->out_buffers); + while (cur) { + buf = (rspamd_buffer_t *)cur->data; + if (BUFREMAIN (buf) == 0) { + /* Skip empty buffers */ + cur = g_list_next (cur); + continue; + } + + r = write (fd, buf->pos, BUFREMAIN (buf)); + if (r == -1 && errno != EAGAIN) { + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + d->err_callback (err, d->user_data); + return; + } + } + else if (r > 0) { + buf->pos += r; + if (BUFREMAIN (buf) != 0) { + /* Continue with this buffer */ + continue; + } + } + else if (r == 0) { + /* Got EOF while we wait for data */ + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); + d->err_callback (err, d->user_data); + return; + } + } + else if (errno == EAGAIN) { + /* Wait for other event */ + return; + } + cur = g_list_next (cur); + } + + if (cur == NULL) { + /* Disable write event for this time */ + g_list_free (d->out_buffers); + d->out_buffers = NULL; + + if (d->write_callback) { + d->write_callback (d->user_data); + } + + event_del (d->ev); + event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } +} + +static void +read_buffers (int fd, rspamd_io_dispatcher_t *d) +{ + ssize_t r, len; + GError *err; + f_str_t res; + char *c; + + if (d->in_buf == NULL) { + d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); + if (d->policy == BUFFER_LINE) { + d->in_buf->data = fstralloc (d->pool, BUFSIZ); + } + else { + d->in_buf->data = fstralloc (d->pool, d->nchars); + } + d->in_buf->pos = d->in_buf->data->begin; + } + + if (BUFREMAIN (d->in_buf) == 0) { + /* Buffer is full, try to call callback with overflow error */ + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, E2BIG, "buffer overflow"); + d->err_callback (err, d->user_data); + return; + } + } + else { + /* Try to read the whole buffer */ + r = read (fd, d->in_buf->pos, BUFREMAIN (d->in_buf)); + if (r == -1 && errno != EAGAIN) { + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + d->err_callback (err, d->user_data); + return; + } + } + else if (r == 0) { + /* Got EOF while we wait for data */ + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); + d->err_callback (err, d->user_data); + return; + } + } + else if (errno == EAGAIN) { + return; + } + else { + d->in_buf->pos += r; + d->in_buf->data->len += r; + } + + } + + c = d->in_buf->data->begin; + r = 0; + len = d->in_buf->data->len; + + switch (d->policy) { + case BUFFER_LINE: + while (r < len) { + if (*c == '\r' || *c == '\n') { + res.begin = d->in_buf->data->begin; + res.len = r; + if (d->read_callback) { + d->read_callback (&res, d->user_data); + if (r < len - 1 && *(c + 1) == '\n') { + r ++; + c ++; + } + /* Move remaining string to begin of buffer (draining) */ + memmove (d->in_buf->data->begin, c, len - r); + c = d->in_buf->data->begin; + d->in_buf->data->len -= r + 1; + d->in_buf->pos -= r + 1; + r = 0; + len = d->in_buf->data->len; + continue; + } + } + r ++; + c ++; + } + break; + case BUFFER_CHARACTER: + while (r < len) { + if (r == d->nchars) { + res.begin = d->in_buf->data->begin; + res.len = r; + if (d->read_callback) { + d->read_callback (&res, d->user_data); + /* Move remaining string to begin of buffer (draining) */ + memmove (d->in_buf->data->begin, c, len - r); + c = d->in_buf->data->begin; + d->in_buf->data->len -= r; + d->in_buf->pos -= r; + r = 0; + len = d->in_buf->data->len; + continue; + } + + } + r ++; + c ++; + } + break; + } +} + +#undef BUFREMAIN + +static void +dispatcher_cb (int fd, short what, void *arg) +{ + rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg; + GError *err; + + switch (what) { + case EV_TIMEOUT: + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, ETIMEDOUT, "IO timeout"); + d->err_callback (err, d->user_data); + } + break; + case EV_WRITE: + /* No data to write, disable further EV_WRITE to this fd */ + if (d->out_buffers == NULL) { + event_del (d->ev); + event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + else { + write_buffers (fd, d); + } + break; + case EV_READ: + read_buffers (fd, d); + break; + } +} + + +rspamd_io_dispatcher_t* +rspamd_create_dispatcher (int fd, enum io_policy policy, + dispatcher_read_callback_t read_cb, + dispatcher_write_callback_t write_cb, + dispatcher_err_callback_t err_cb, + struct timeval *tv, void *user_data) +{ + rspamd_io_dispatcher_t *new; + + if (fd == -1) { + return NULL; + } + + new = g_malloc (sizeof (rspamd_io_dispatcher_t)); + bzero (new, sizeof (rspamd_io_dispatcher_t)); + + new->pool = memory_pool_new (memory_pool_get_size ()); + new->tv = memory_pool_alloc (new->pool, sizeof (struct timeval)); + memcpy (new->tv, tv, sizeof (struct timeval)); + new->policy = policy; + new->read_callback = read_cb; + new->write_callback = write_cb; + new->err_callback = err_cb; + new->user_data = user_data; + + new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event)); + new->fd = fd; + + event_set (new->ev, fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)new); + event_add (new->ev, new->tv); + + return new; +} + +void +rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher) +{ + if (dispatcher != NULL) { + event_del (dispatcher->ev); + memory_pool_delete (dispatcher->pool); + g_free (dispatcher); + } +} + +void +rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, + enum io_policy policy, + size_t nchars) +{ + f_str_t *tmp; + + if (d->policy != policy || d->nchars != nchars) { + d->policy = policy; + d->nchars = nchars ? nchars : BUFSIZ; + /* Resize input buffer if needed */ + if (policy == BUFFER_CHARACTER && nchars != 0) { + if (d->in_buf && d->in_buf->data->size < nchars) { + tmp = fstralloc (d->pool, d->nchars); + memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); + tmp->len = d->in_buf->data->len; + d->in_buf->data = tmp; + } + } + else if (policy == BUFFER_LINE) { + if (d->in_buf && d->nchars < BUFSIZ) { + tmp = fstralloc (d->pool, BUFSIZ); + memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); + tmp->len = d->in_buf->data->len; + d->in_buf->data = tmp; + } + } + } +} + +void +rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, + void *data, + size_t len, gboolean delayed) +{ + rspamd_buffer_t *newbuf; + + newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); + newbuf->data = memory_pool_alloc (d->pool, sizeof (f_str_t)); + + newbuf->data->begin = memory_pool_alloc (d->pool, len); + memcpy (newbuf->data->begin, data, len); + newbuf->data->size = len; + newbuf->pos = newbuf->data->begin; + + d->out_buffers = g_list_prepend (d->out_buffers, newbuf); + + if (!delayed) { + event_del (d->ev); + event_set (d->ev, d->fd, EV_READ | EV_WRITE | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } +} + +void +rspamd_dispatcher_pause (rspamd_io_dispatcher_t *d) +{ + event_del (d->ev); +} + +/* + * vi:ts=4 + */ |