aboutsummaryrefslogtreecommitdiffstats
path: root/src/buffer.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-09-22 20:22:31 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-09-22 20:22:31 +0400
commit626a11ad9819593eadaca1e321192c75a32b51f3 (patch)
tree7f062ddf5d6ec04d7e2f4009541aa417df1cfe59 /src/buffer.c
parentfe815ce580d3c455292e1acda406ddb4d371120a (diff)
downloadrspamd-626a11ad9819593eadaca1e321192c75a32b51f3.tar.gz
rspamd-626a11ad9819593eadaca1e321192c75a32b51f3.zip
* Implement new system of async events handling (experimental)
Diffstat (limited to 'src/buffer.c')
-rw-r--r--src/buffer.c71
1 files changed, 29 insertions, 42 deletions
diff --git a/src/buffer.c b/src/buffer.c
index 7c52da10d..33d0904f4 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -38,13 +38,12 @@ dispatcher_error_quark (void)
#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
-static void
-write_buffers (int fd, rspamd_io_dispatcher_t *d)
+static gboolean
+write_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean is_delayed)
{
GList *cur;
GError *err;
rspamd_buffer_t *buf;
- struct timeval *ntv;
ssize_t r;
/* Fix order */
@@ -64,7 +63,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
if (d->err_callback) {
err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
d->err_callback (err, d->user_data);
- return;
+ return FALSE;
}
}
else if (r > 0) {
@@ -80,7 +79,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
if (d->err_callback) {
err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF");
d->err_callback (err, d->user_data);
- return;
+ return FALSE;
}
}
else if (r == -1 && errno == EAGAIN) {
@@ -88,10 +87,8 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
/* Wait for other event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
- return;
+ event_add (d->ev, d->tv);
+ return TRUE;
}
cur = g_list_next (cur);
}
@@ -103,29 +100,25 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d)
msg_debug ("write_buffers: all buffers were written successfully");
- if (d->write_callback) {
- d->write_callback (d->user_data);
- if (d->wanna_die) {
+ if (is_delayed && d->write_callback) {
+ if (!d->write_callback (d->user_data)) {
msg_debug ("write_buffers: callback set wanna_die flag, terminating");
- rspamd_remove_dispatcher (d);
- return;
+ return FALSE;
}
}
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ event_add (d->ev, d->tv);
}
else {
/* Plan other write event */
event_del (d->ev);
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ event_add (d->ev, d->tv);
}
+
+ return TRUE;
}
static void
@@ -138,7 +131,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
char **pos;
size_t *len;
enum io_policy saved_policy;
-
+
+ if (d->wanna_die) {
+ rspamd_remove_dispatcher (d);
+ return;
+ }
if (d->in_buf == NULL) {
d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
@@ -208,10 +205,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
res.len --;
}
if (d->read_callback) {
- d->read_callback (&res, d->user_data);
- if (d->wanna_die) {
- msg_debug ("read_buffers: callback set wanna_die flag, terminating");
- rspamd_remove_dispatcher (d);
+ if (!d->read_callback (&res, d->user_data)) {
return;
}
/* Move remaining string to begin of buffer (draining) */
@@ -239,7 +233,9 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read)
res.len = r;
c = b + r;
if (d->read_callback) {
- d->read_callback (&res, d->user_data);
+ if (!d->read_callback (&res, d->user_data)) {
+ return;
+ }
/* Move remaining string to begin of buffer (draining) */
memmove (d->in_buf->data->begin, c, *len - r);
b = d->in_buf->data->begin;
@@ -264,7 +260,6 @@ dispatcher_cb (int fd, short what, void *arg)
{
rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg;
GError *err;
- struct timeval *ntv;
msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd);
@@ -280,12 +275,11 @@ dispatcher_cb (int fd, short what, void *arg)
if (d->out_buffers == NULL) {
event_del (d->ev);
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ event_add (d->ev, d->tv);
}
else {
- write_buffers (fd, d);
+ /* Delayed write */
+ write_buffers (fd, d, TRUE);
}
break;
case EV_READ:
@@ -303,7 +297,6 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
struct timeval *tv, void *user_data)
{
rspamd_io_dispatcher_t *new;
- struct timeval *ntv;
if (fd == -1) {
return NULL;
@@ -331,9 +324,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
new->fd = fd;
event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new);
- ntv = memory_pool_alloc (new->pool, sizeof (struct timeval));
- memcpy (ntv, new->tv, sizeof (struct timeval));
- event_add (new->ev, ntv);
+ event_add (new->ev, new->tv);
return new;
}
@@ -388,13 +379,12 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d,
msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars);
}
-void
+gboolean
rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
void *data,
size_t len, gboolean delayed, gboolean allocated)
{
rspamd_buffer_t *newbuf;
- struct timeval *ntv;
newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
if (!allocated) {
@@ -416,12 +406,9 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
if (!delayed) {
msg_debug ("rspamd_dispatcher_write: plan write event");
- event_del (d->ev);
- event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
- ntv = memory_pool_alloc (d->pool, sizeof (struct timeval));
- memcpy (ntv, d->tv, sizeof (struct timeval));
- event_add (d->ev, ntv);
+ return write_buffers (d->fd, d, FALSE);
}
+ return TRUE;
}
void