diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-06-10 21:47:22 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-06-10 21:47:22 +0400 |
commit | 07082741605e8e048a129bec28695f57263de1e8 (patch) | |
tree | 7c3f92439dfc40cac6c495f052ff3e913aea6709 /src/buffer.c | |
parent | 1be79df4d51fc2e497a73fc0163de08d406cc1f3 (diff) | |
download | rspamd-07082741605e8e048a129bec28695f57263de1e8.tar.gz rspamd-07082741605e8e048a129bec28695f57263de1e8.zip |
* Check messages received via smtp proxy
* Add support for sendfile in io dispatcher
* Fix issues with compatibility of worker_task and smtp proxy
* Proxy DATA command
Diffstat (limited to 'src/buffer.c')
-rw-r--r-- | src/buffer.c | 186 |
1 files changed, 178 insertions, 8 deletions
diff --git a/src/buffer.c b/src/buffer.c index 7dd43d2ad..5eb2c81d1 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -36,6 +36,126 @@ dispatcher_error_quark (void) return g_quark_from_static_string ("g-dispatcher-error-quark"); } +static gboolean +sendfile_callback (rspamd_io_dispatcher_t *d) +{ + ssize_t r; + GError *err; + +#ifdef HAVE_SENDFILE + #if defined(FREEBSD) + off_t off = 0; + /* FreeBSD version */ + if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, 0, &off, 0) != 0) { + if (errno != EAGAIN) { + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + d->err_callback (err, d->user_data); + return FALSE; + } + } + else { + debug_ip (d->peer_addr, "partially write data, retry"); + /* Wait for other event */ + d->offset += off; + event_del (d->ev); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + } + else { + if (d->write_callback) { + if (!d->write_callback (d->user_data)) { + debug_ip (d->peer_addr, "callback set wanna_die flag, terminating"); + return FALSE; + } + } + event_del (d->ev); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + d->in_sendfile = FALSE; + } + #else + /* Linux version */ + r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size); + if (r == -1) { + if (errno != EAGAIN) { + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + d->err_callback (err, d->user_data); + return FALSE; + } + } + else { + debug_ip (d->peer_addr, "partially write data, retry"); + /* Wait for other event */ + event_del (d->ev); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + } + else if (r + d->offset < d->file_size) { + debug_ip (d->peer_addr, "partially write data, retry"); + /* Wait for other event */ + event_del (d->ev); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + else { + if (d->write_callback) { + if (!d->write_callback (d->user_data)) { + debug_ip (d->peer_addr, "callback set wanna_die flag, terminating"); + return FALSE; + } + } + event_del (d->ev); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + d->in_sendfile = FALSE; + } + #endif +#else + r = write (d->fd, d->map, d->file_size - d->offset); + if (r == -1) { + if (errno != EAGAIN) { + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + d->err_callback (err, d->user_data); + return FALSE; + } + } + else { + debug_ip (d->peer_addr, "partially write data, retry"); + /* Wait for other event */ + event_del (d->ev); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + } + else if (r + d->offset < d->file_size) { + d->offset += r; + debug_ip (d->peer_addr, "partially write data, retry"); + /* Wait for other event */ + event_del (d->ev); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + else { + if (d->write_callback) { + if (!d->write_callback (d->user_data)) { + debug_ip (d->peer_addr, "callback set wanna_die flag, terminating"); + return FALSE; + } + } + event_del (d->ev); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + d->in_sendfile = FALSE; + } +#endif + return TRUE; +} + #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin) static gboolean @@ -139,7 +259,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read) if (d->in_buf == NULL) { d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); - if (d->policy == BUFFER_LINE) { + if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) { d->in_buf->data = fstralloc (d->pool, BUFSIZ); } else { @@ -254,6 +374,22 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read) } } break; + case BUFFER_ANY: + res.begin = d->in_buf->data->begin; + res.len = *len; + if (d->read_callback) { + if (!d->read_callback (&res, d->user_data)) { + return; + } + if (d->policy != saved_policy) { + debug_ip (d->peer_addr, "policy changed during callback, restart buffer's processing"); + read_buffers (fd, d, TRUE); + return; + } + } + d->in_buf->pos = d->in_buf->data->begin; + d->in_buf->data->len = 0; + break; } } @@ -276,14 +412,19 @@ dispatcher_cb (int fd, short what, void *arg) break; case EV_WRITE: /* No data to write, disable further EV_WRITE to this fd */ - if (d->out_buffers == NULL) { - event_del (d->ev); - event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); - event_add (d->ev, d->tv); + if (d->in_sendfile) { + sendfile_callback (d); } else { - /* Delayed write */ - write_buffers (fd, d, TRUE); + if (d->out_buffers == NULL) { + event_del (d->ev); + event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + else { + /* Delayed write */ + write_buffers (fd, d, TRUE); + } } break; case EV_READ: @@ -315,6 +456,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy, new->tv = NULL; } new->nchars = 0; + new->in_sendfile = FALSE; new->policy = policy; new->read_callback = read_cb; new->write_callback = write_cb; @@ -363,7 +505,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, d->in_buf->pos = d->in_buf->data->begin + t; } } - else if (policy == BUFFER_LINE) { + else if (policy == BUFFER_LINE || policy == BUFFER_ANY) { if (d->in_buf && d->nchars < BUFSIZ) { tmp = fstralloc (d->pool, BUFSIZ); memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); @@ -413,6 +555,34 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo return TRUE; } + +gboolean +rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, int fd, size_t len) +{ + if (lseek (fd, 0, SEEK_SET) == -1) { + msg_warn ("lseek failed: %s", strerror (errno)); + return FALSE; + } + + d->offset = 0; + d->in_sendfile = TRUE; + d->sendfile_fd = fd; + d->file_size = len; + +#ifndef HAVE_SENDFILE + #ifdef HAVE_MMAP_NOCORE + if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, 0)) == MAP_FAILED) { + #else + if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) { + #endif + msg_warn ("mmap failed: %s", strerror (errno)); + return FALSE; + } +#endif + + return sendfile_callback (d); +} + void rspamd_dispatcher_pause (rspamd_io_dispatcher_t * d) { |