aboutsummaryrefslogtreecommitdiffstats
path: root/src/buffer.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2010-06-10 21:47:22 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2010-06-10 21:47:22 +0400
commit07082741605e8e048a129bec28695f57263de1e8 (patch)
tree7c3f92439dfc40cac6c495f052ff3e913aea6709 /src/buffer.c
parent1be79df4d51fc2e497a73fc0163de08d406cc1f3 (diff)
downloadrspamd-07082741605e8e048a129bec28695f57263de1e8.tar.gz
rspamd-07082741605e8e048a129bec28695f57263de1e8.zip
* Check messages received via smtp proxy
* Add support for sendfile in io dispatcher * Fix issues with compatibility of worker_task and smtp proxy * Proxy DATA command
Diffstat (limited to 'src/buffer.c')
-rw-r--r--src/buffer.c186
1 files changed, 178 insertions, 8 deletions
diff --git a/src/buffer.c b/src/buffer.c
index 7dd43d2ad..5eb2c81d1 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -36,6 +36,126 @@ dispatcher_error_quark (void)
return g_quark_from_static_string ("g-dispatcher-error-quark");
}
+static gboolean
+sendfile_callback (rspamd_io_dispatcher_t *d)
+{
+ ssize_t r;
+ GError *err;
+
+#ifdef HAVE_SENDFILE
+ #if defined(FREEBSD)
+ off_t off = 0;
+ /* FreeBSD version */
+ if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, 0, &off, 0) != 0) {
+ if (errno != EAGAIN) {
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else {
+ debug_ip (d->peer_addr, "partially write data, retry");
+ /* Wait for other event */
+ d->offset += off;
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ }
+ else {
+ if (d->write_callback) {
+ if (!d->write_callback (d->user_data)) {
+ debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
+ return FALSE;
+ }
+ }
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ d->in_sendfile = FALSE;
+ }
+ #else
+ /* Linux version */
+ r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size);
+ if (r == -1) {
+ if (errno != EAGAIN) {
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else {
+ debug_ip (d->peer_addr, "partially write data, retry");
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ }
+ else if (r + d->offset < d->file_size) {
+ debug_ip (d->peer_addr, "partially write data, retry");
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ else {
+ if (d->write_callback) {
+ if (!d->write_callback (d->user_data)) {
+ debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
+ return FALSE;
+ }
+ }
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ d->in_sendfile = FALSE;
+ }
+ #endif
+#else
+ r = write (d->fd, d->map, d->file_size - d->offset);
+ if (r == -1) {
+ if (errno != EAGAIN) {
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else {
+ debug_ip (d->peer_addr, "partially write data, retry");
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ }
+ else if (r + d->offset < d->file_size) {
+ d->offset += r;
+ debug_ip (d->peer_addr, "partially write data, retry");
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ else {
+ if (d->write_callback) {
+ if (!d->write_callback (d->user_data)) {
+ debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
+ return FALSE;
+ }
+ }
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ d->in_sendfile = FALSE;
+ }
+#endif
+ return TRUE;
+}
+
#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
static gboolean
@@ -139,7 +259,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
if (d->in_buf == NULL) {
d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
- if (d->policy == BUFFER_LINE) {
+ if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) {
d->in_buf->data = fstralloc (d->pool, BUFSIZ);
}
else {
@@ -254,6 +374,22 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
}
}
break;
+ case BUFFER_ANY:
+ res.begin = d->in_buf->data->begin;
+ res.len = *len;
+ if (d->read_callback) {
+ if (!d->read_callback (&res, d->user_data)) {
+ return;
+ }
+ if (d->policy != saved_policy) {
+ debug_ip (d->peer_addr, "policy changed during callback, restart buffer's processing");
+ read_buffers (fd, d, TRUE);
+ return;
+ }
+ }
+ d->in_buf->pos = d->in_buf->data->begin;
+ d->in_buf->data->len = 0;
+ break;
}
}
@@ -276,14 +412,19 @@ dispatcher_cb (int fd, short what, void *arg)
break;
case EV_WRITE:
/* No data to write, disable further EV_WRITE to this fd */
- if (d->out_buffers == NULL) {
- event_del (d->ev);
- event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- event_add (d->ev, d->tv);
+ if (d->in_sendfile) {
+ sendfile_callback (d);
}
else {
- /* Delayed write */
- write_buffers (fd, d, TRUE);
+ if (d->out_buffers == NULL) {
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ else {
+ /* Delayed write */
+ write_buffers (fd, d, TRUE);
+ }
}
break;
case EV_READ:
@@ -315,6 +456,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
new->tv = NULL;
}
new->nchars = 0;
+ new->in_sendfile = FALSE;
new->policy = policy;
new->read_callback = read_cb;
new->write_callback = write_cb;
@@ -363,7 +505,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
d->in_buf->pos = d->in_buf->data->begin + t;
}
}
- else if (policy == BUFFER_LINE) {
+ else if (policy == BUFFER_LINE || policy == BUFFER_ANY) {
if (d->in_buf && d->nchars < BUFSIZ) {
tmp = fstralloc (d->pool, BUFSIZ);
memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
@@ -413,6 +555,34 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo
return TRUE;
}
+
+gboolean
+rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, int fd, size_t len)
+{
+ if (lseek (fd, 0, SEEK_SET) == -1) {
+ msg_warn ("lseek failed: %s", strerror (errno));
+ return FALSE;
+ }
+
+ d->offset = 0;
+ d->in_sendfile = TRUE;
+ d->sendfile_fd = fd;
+ d->file_size = len;
+
+#ifndef HAVE_SENDFILE
+ #ifdef HAVE_MMAP_NOCORE
+ if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, 0)) == MAP_FAILED) {
+ #else
+ if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) {
+ #endif
+ msg_warn ("mmap failed: %s", strerror (errno));
+ return FALSE;
+ }
+#endif
+
+ return sendfile_callback (d);
+}
+
void
rspamd_dispatcher_pause (rspamd_io_dispatcher_t * d)
{