diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-09-22 20:22:31 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-09-22 20:22:31 +0400 |
commit | 626a11ad9819593eadaca1e321192c75a32b51f3 (patch) | |
tree | 7f062ddf5d6ec04d7e2f4009541aa417df1cfe59 /src/buffer.c | |
parent | fe815ce580d3c455292e1acda406ddb4d371120a (diff) | |
download | rspamd-626a11ad9819593eadaca1e321192c75a32b51f3.tar.gz rspamd-626a11ad9819593eadaca1e321192c75a32b51f3.zip |
* Implement new system of async events handling (experimental)
Diffstat (limited to 'src/buffer.c')
-rw-r--r-- | src/buffer.c | 71 |
1 files changed, 29 insertions, 42 deletions
diff --git a/src/buffer.c b/src/buffer.c index 7c52da10d..33d0904f4 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -38,13 +38,12 @@ dispatcher_error_quark (void) #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin) -static void -write_buffers (int fd, rspamd_io_dispatcher_t *d) +static gboolean +write_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean is_delayed) { GList *cur; GError *err; rspamd_buffer_t *buf; - struct timeval *ntv; ssize_t r; /* Fix order */ @@ -64,7 +63,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) if (d->err_callback) { err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); d->err_callback (err, d->user_data); - return; + return FALSE; } } else if (r > 0) { @@ -80,7 +79,7 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) if (d->err_callback) { err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); d->err_callback (err, d->user_data); - return; + return FALSE; } } else if (r == -1 && errno == EAGAIN) { @@ -88,10 +87,8 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) /* Wait for other event */ event_del (d->ev); event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); - ntv = memory_pool_alloc (d->pool, sizeof (struct timeval)); - memcpy (ntv, d->tv, sizeof (struct timeval)); - event_add (d->ev, ntv); - return; + event_add (d->ev, d->tv); + return TRUE; } cur = g_list_next (cur); } @@ -103,29 +100,25 @@ write_buffers (int fd, rspamd_io_dispatcher_t *d) msg_debug ("write_buffers: all buffers were written successfully"); - if (d->write_callback) { - d->write_callback (d->user_data); - if (d->wanna_die) { + if (is_delayed && d->write_callback) { + if (!d->write_callback (d->user_data)) { msg_debug ("write_buffers: callback set wanna_die flag, terminating"); - rspamd_remove_dispatcher (d); - return; + return FALSE; } } event_del (d->ev); event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); - ntv = memory_pool_alloc (d->pool, sizeof (struct timeval)); - memcpy (ntv, d->tv, sizeof (struct timeval)); - event_add (d->ev, ntv); + event_add (d->ev, d->tv); } else { /* Plan other write event */ event_del (d->ev); event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); - ntv = memory_pool_alloc (d->pool, sizeof (struct timeval)); - memcpy (ntv, d->tv, sizeof (struct timeval)); - event_add (d->ev, ntv); + event_add (d->ev, d->tv); } + + return TRUE; } static void @@ -138,7 +131,11 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) char **pos; size_t *len; enum io_policy saved_policy; - + + if (d->wanna_die) { + rspamd_remove_dispatcher (d); + return; + } if (d->in_buf == NULL) { d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); @@ -208,10 +205,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) res.len --; } if (d->read_callback) { - d->read_callback (&res, d->user_data); - if (d->wanna_die) { - msg_debug ("read_buffers: callback set wanna_die flag, terminating"); - rspamd_remove_dispatcher (d); + if (!d->read_callback (&res, d->user_data)) { return; } /* Move remaining string to begin of buffer (draining) */ @@ -239,7 +233,9 @@ read_buffers (int fd, rspamd_io_dispatcher_t *d, gboolean skip_read) res.len = r; c = b + r; if (d->read_callback) { - d->read_callback (&res, d->user_data); + if (!d->read_callback (&res, d->user_data)) { + return; + } /* Move remaining string to begin of buffer (draining) */ memmove (d->in_buf->data->begin, c, *len - r); b = d->in_buf->data->begin; @@ -264,7 +260,6 @@ dispatcher_cb (int fd, short what, void *arg) { rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *)arg; GError *err; - struct timeval *ntv; msg_debug ("dispatcher_cb: in dispatcher callback, what: %d, fd: %d", (int)what, fd); @@ -280,12 +275,11 @@ dispatcher_cb (int fd, short what, void *arg) if (d->out_buffers == NULL) { event_del (d->ev); event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); - ntv = memory_pool_alloc (d->pool, sizeof (struct timeval)); - memcpy (ntv, d->tv, sizeof (struct timeval)); - event_add (d->ev, ntv); + event_add (d->ev, d->tv); } else { - write_buffers (fd, d); + /* Delayed write */ + write_buffers (fd, d, TRUE); } break; case EV_READ: @@ -303,7 +297,6 @@ rspamd_create_dispatcher (int fd, enum io_policy policy, struct timeval *tv, void *user_data) { rspamd_io_dispatcher_t *new; - struct timeval *ntv; if (fd == -1) { return NULL; @@ -331,9 +324,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy, new->fd = fd; event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new); - ntv = memory_pool_alloc (new->pool, sizeof (struct timeval)); - memcpy (ntv, new->tv, sizeof (struct timeval)); - event_add (new->ev, ntv); + event_add (new->ev, new->tv); return new; } @@ -388,13 +379,12 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, msg_debug ("rspamd_set_dispatcher_policy: new input length watermark is %ld", (long int)d->nchars); } -void +gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, void *data, size_t len, gboolean delayed, gboolean allocated) { rspamd_buffer_t *newbuf; - struct timeval *ntv; newbuf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); if (!allocated) { @@ -416,12 +406,9 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, if (!delayed) { msg_debug ("rspamd_dispatcher_write: plan write event"); - event_del (d->ev); - event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); - ntv = memory_pool_alloc (d->pool, sizeof (struct timeval)); - memcpy (ntv, d->tv, sizeof (struct timeval)); - event_add (d->ev, ntv); + return write_buffers (d->fd, d, FALSE); } + return TRUE; } void |