summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2010-06-10 21:47:22 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2010-06-10 21:47:22 +0400
commit07082741605e8e048a129bec28695f57263de1e8 (patch)
tree7c3f92439dfc40cac6c495f052ff3e913aea6709
parent1be79df4d51fc2e497a73fc0163de08d406cc1f3 (diff)
downloadrspamd-07082741605e8e048a129bec28695f57263de1e8.tar.gz
rspamd-07082741605e8e048a129bec28695f57263de1e8.zip
* Check messages received via smtp proxy
* Add support for sendfile in io dispatcher * Fix issues with compatibility of worker_task and smtp proxy * Proxy DATA command
-rw-r--r--CMakeLists.txt2
-rw-r--r--config.h.in7
-rw-r--r--src/buffer.c186
-rw-r--r--src/buffer.h16
-rw-r--r--src/filter.c7
-rw-r--r--src/main.h2
-rw-r--r--src/smtp.c259
-rw-r--r--src/smtp.h13
-rw-r--r--src/smtp_proto.c111
-rw-r--r--src/smtp_proto.h5
10 files changed, 573 insertions, 35 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9e929ec1e..72bf8b97a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -281,6 +281,7 @@ CHECK_INCLUDE_FILES(pwd.h HAVE_PWD_H)
CHECK_INCLUDE_FILES(grp.h HAVE_GRP_H)
CHECK_INCLUDE_FILES(glob.h HAVE_GLOB_H)
CHECK_INCLUDE_FILES(poll.h HAVE_POLL_H)
+CHECK_INCLUDE_FILES(sys/sendfile.h HAVE_SYS_SENDFILE_H)
IF(HAVE_SYS_WAIT_H)
LIST(APPEND CMAKE_REQUIRED_INCLUDES sys/wait.h)
@@ -294,6 +295,7 @@ CHECK_FUNCTION_EXISTS(wait4 HAVE_WAIT4)
CHECK_FUNCTION_EXISTS(waitpid HAVE_WAITPID)
CHECK_FUNCTION_EXISTS(flock HAVE_FLOCK)
CHECK_FUNCTION_EXISTS(tanhl HAVE_TANHL)
+CHECK_FUNCTION_EXISTS(sendfile HAVE_SENDFILE)
CHECK_SYMBOL_EXISTS(PATH_MAX limits.h HAVE_PATH_MAX)
CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN)
diff --git a/config.h.in b/config.h.in
index ebb48f508..bbac937c1 100644
--- a/config.h.in
+++ b/config.h.in
@@ -121,6 +121,9 @@
#cmakedefine BUILD_STATIC 1
+#cmakedefine HAVE_SENDFILE 1
+#cmakedefine HAVE_SYS_SENDFILE_H 1
+
#define RVERSION "${RSPAMD_VERSION}"
#define RSPAMD_MASTER_SITE_URL "${RSPAMD_MASTER_SITE_URL}"
@@ -294,6 +297,10 @@
#define HAVE_SETLOCALE 1
#endif
+#ifdef HAVE_SYS_SENDFILE_H
+#include <sys/sendfile.h>
+#endif
+
#ifdef WITH_GPERF_TOOLS
#include <google/profiler.h>
#endif
diff --git a/src/buffer.c b/src/buffer.c
index 7dd43d2ad..5eb2c81d1 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -36,6 +36,126 @@ dispatcher_error_quark (void)
return g_quark_from_static_string ("g-dispatcher-error-quark");
}
+static gboolean
+sendfile_callback (rspamd_io_dispatcher_t *d)
+{
+ ssize_t r;
+ GError *err;
+
+#ifdef HAVE_SENDFILE
+ #if defined(FREEBSD)
+ off_t off = 0;
+ /* FreeBSD version */
+ if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, 0, &off, 0) != 0) {
+ if (errno != EAGAIN) {
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else {
+ debug_ip (d->peer_addr, "partially write data, retry");
+ /* Wait for other event */
+ d->offset += off;
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ }
+ else {
+ if (d->write_callback) {
+ if (!d->write_callback (d->user_data)) {
+ debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
+ return FALSE;
+ }
+ }
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ d->in_sendfile = FALSE;
+ }
+ #else
+ /* Linux version */
+ r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size);
+ if (r == -1) {
+ if (errno != EAGAIN) {
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else {
+ debug_ip (d->peer_addr, "partially write data, retry");
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ }
+ else if (r + d->offset < d->file_size) {
+ debug_ip (d->peer_addr, "partially write data, retry");
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ else {
+ if (d->write_callback) {
+ if (!d->write_callback (d->user_data)) {
+ debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
+ return FALSE;
+ }
+ }
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ d->in_sendfile = FALSE;
+ }
+ #endif
+#else
+ r = write (d->fd, d->map, d->file_size - d->offset);
+ if (r == -1) {
+ if (errno != EAGAIN) {
+ if (d->err_callback) {
+ err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno));
+ d->err_callback (err, d->user_data);
+ return FALSE;
+ }
+ }
+ else {
+ debug_ip (d->peer_addr, "partially write data, retry");
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ }
+ else if (r + d->offset < d->file_size) {
+ d->offset += r;
+ debug_ip (d->peer_addr, "partially write data, retry");
+ /* Wait for other event */
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ else {
+ if (d->write_callback) {
+ if (!d->write_callback (d->user_data)) {
+ debug_ip (d->peer_addr, "callback set wanna_die flag, terminating");
+ return FALSE;
+ }
+ }
+ event_del (d->ev);
+ event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ d->in_sendfile = FALSE;
+ }
+#endif
+ return TRUE;
+}
+
#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin)
static gboolean
@@ -139,7 +259,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
if (d->in_buf == NULL) {
d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t));
- if (d->policy == BUFFER_LINE) {
+ if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) {
d->in_buf->data = fstralloc (d->pool, BUFSIZ);
}
else {
@@ -254,6 +374,22 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
}
}
break;
+ case BUFFER_ANY:
+ res.begin = d->in_buf->data->begin;
+ res.len = *len;
+ if (d->read_callback) {
+ if (!d->read_callback (&res, d->user_data)) {
+ return;
+ }
+ if (d->policy != saved_policy) {
+ debug_ip (d->peer_addr, "policy changed during callback, restart buffer's processing");
+ read_buffers (fd, d, TRUE);
+ return;
+ }
+ }
+ d->in_buf->pos = d->in_buf->data->begin;
+ d->in_buf->data->len = 0;
+ break;
}
}
@@ -276,14 +412,19 @@ dispatcher_cb (int fd, short what, void *arg)
break;
case EV_WRITE:
/* No data to write, disable further EV_WRITE to this fd */
- if (d->out_buffers == NULL) {
- event_del (d->ev);
- event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
- event_add (d->ev, d->tv);
+ if (d->in_sendfile) {
+ sendfile_callback (d);
}
else {
- /* Delayed write */
- write_buffers (fd, d, TRUE);
+ if (d->out_buffers == NULL) {
+ event_del (d->ev);
+ event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d);
+ event_add (d->ev, d->tv);
+ }
+ else {
+ /* Delayed write */
+ write_buffers (fd, d, TRUE);
+ }
}
break;
case EV_READ:
@@ -315,6 +456,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy,
new->tv = NULL;
}
new->nchars = 0;
+ new->in_sendfile = FALSE;
new->policy = policy;
new->read_callback = read_cb;
new->write_callback = write_cb;
@@ -363,7 +505,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy,
d->in_buf->pos = d->in_buf->data->begin + t;
}
}
- else if (policy == BUFFER_LINE) {
+ else if (policy == BUFFER_LINE || policy == BUFFER_ANY) {
if (d->in_buf && d->nchars < BUFSIZ) {
tmp = fstralloc (d->pool, BUFSIZ);
memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len);
@@ -413,6 +555,34 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo
return TRUE;
}
+
+gboolean
+rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, int fd, size_t len)
+{
+ if (lseek (fd, 0, SEEK_SET) == -1) {
+ msg_warn ("lseek failed: %s", strerror (errno));
+ return FALSE;
+ }
+
+ d->offset = 0;
+ d->in_sendfile = TRUE;
+ d->sendfile_fd = fd;
+ d->file_size = len;
+
+#ifndef HAVE_SENDFILE
+ #ifdef HAVE_MMAP_NOCORE
+ if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, 0)) == MAP_FAILED) {
+ #else
+ if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) {
+ #endif
+ msg_warn ("mmap failed: %s", strerror (errno));
+ return FALSE;
+ }
+#endif
+
+ return sendfile_callback (d);
+}
+
void
rspamd_dispatcher_pause (rspamd_io_dispatcher_t * d)
{
diff --git a/src/buffer.h b/src/buffer.h
index 4cf9de555..9f3897d1c 100644
--- a/src/buffer.h
+++ b/src/buffer.h
@@ -20,6 +20,7 @@ typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data);
enum io_policy {
BUFFER_LINE, /**< call handler when we have line ready */
BUFFER_CHARACTER, /**< call handler when we have some characters */
+ BUFFER_ANY /**< call handler whenever we got data in buffer */
};
/**
@@ -45,6 +46,13 @@ typedef struct rspamd_io_dispatcher_s {
dispatcher_write_callback_t write_callback; /**< write callback */
dispatcher_err_callback_t err_callback; /**< error callback */
void *user_data; /**< user's data for callbacks */
+ off_t offset; /**< for sendfile use */
+ size_t file_size;
+ int sendfile_fd;
+ gboolean in_sendfile; /**< whether buffer is in sendfile mode */
+#ifndef HAVE_SENDFILE
+ void *map;
+#endif
} rspamd_io_dispatcher_t;
/**
@@ -87,6 +95,14 @@ gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d,
size_t len, gboolean delayed, gboolean allocated);
/**
+ * Send specified descriptor to dispatcher
+ * @param d pointer to dispatcher's object
+ * @param fd descriptor of file
+ * @param len length of data
+ */
+gboolean rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, int fd, size_t len);
+
+/**
* Pause IO events on dispatcher
* @param d pointer to dispatcher's object
*/
diff --git a/src/filter.c b/src/filter.c
index 0a18bd793..c6f936087 100644
--- a/src/filter.c
+++ b/src/filter.c
@@ -310,7 +310,12 @@ continue_process_filters (struct worker_task *task)
/* Process all statfiles */
process_statfiles (task);
/* XXX: ugly direct call */
- task->dispatcher->write_callback (task);
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ task->dispatcher->write_callback (task);
+ }
return 1;
}
diff --git a/src/main.h b/src/main.h
index 9371652d3..d5f971468 100644
--- a/src/main.h
+++ b/src/main.h
@@ -225,6 +225,8 @@ struct worker_task {
gboolean view_checked;
gboolean pass_all_filters; /**< pass task throught every rule */
uint32_t parser_recursion; /**< for avoiding recursion stack overflow */
+ gboolean (*fin_callback)(void *arg); /**< calback for filters finalizing */
+ void *fin_arg; /**< argument for fin callback */
};
/**
diff --git a/src/smtp.c b/src/smtp.c
index 1bf135d30..3f366f7fd 100644
--- a/src/smtp.c
+++ b/src/smtp.c
@@ -29,6 +29,8 @@
#include "smtp.h"
#include "smtp_proto.h"
#include "map.h"
+#include "message.h"
+#include "settings.h"
#include "evdns/evdns.h"
/* Max line size as it is defined in rfc2822 */
@@ -39,6 +41,9 @@
#define DEFAULT_UPSTREAM_DEAD_TIME 300
#define DEFAULT_UPSTREAM_MAXERRORS 10
+
+#define DEFAULT_REJECT_MESSAGE "450 4.5.0 Spam message rejected"
+
static gboolean smtp_write_socket (void *arg);
static sig_atomic_t wanna_die = 0;
@@ -89,6 +94,9 @@ free_smtp_session (gpointer arg)
}
memory_pool_delete (session->pool);
close (session->sock);
+ if (session->temp_fd != -1) {
+ close (session->temp_fd);
+ }
g_free (session);
}
}
@@ -146,7 +154,7 @@ create_smtp_upstream_connection (struct smtp_session *session)
}
/* Create a dispatcher for upstream connection */
session->upstream_dispatcher = rspamd_create_dispatcher (session->upstream_sock, BUFFER_LINE,
- smtp_upstream_read_socket, NULL, smtp_upstream_err_socket,
+ smtp_upstream_read_socket, smtp_upstream_write_socket, smtp_upstream_err_socket,
&session->ctx->smtp_timeout, session);
session->state = SMTP_STATE_WAIT_UPSTREAM;
session->upstream_state = SMTP_STATE_GREETING;
@@ -159,6 +167,8 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
{
/* XXX: write dialog implementation */
struct smtp_command *cmd;
+ char outbuf[BUFSIZ];
+ int r;
if (! parse_smtp_command (session, line, &cmd)) {
session->error = SMTP_ERROR_BAD_COMMAND;
@@ -201,10 +211,22 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
if (session->state == SMTP_STATE_RCPT) {
if (parse_smtp_rcpt (session, cmd)) {
/* Make upstream connection */
- if (!create_smtp_upstream_connection (session)) {
- session->error = SMTP_ERROR_UPSTREAM;
- session->state = SMTP_STATE_CRITICAL_ERROR;
- return FALSE;
+ if (session->upstream == NULL) {
+ if (!create_smtp_upstream_connection (session)) {
+ session->error = SMTP_ERROR_UPSTREAM;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ return FALSE;
+ }
+ }
+ else {
+ /* Send next rcpt to upstream */
+ session->state = SMTP_STATE_WAIT_UPSTREAM;
+ session->upstream_state = SMTP_STATE_BEFORE_DATA;
+ rspamd_dispatcher_restore (session->upstream_dispatcher);
+ r = snprintf (outbuf, sizeof (outbuf), "RCPT TO: ");
+ r += smtp_upstream_write_list (session->rcpt->data, outbuf + r, sizeof (outbuf) - r);
+ session->cur_rcpt = NULL;
+ return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE);
}
session->state = SMTP_STATE_WAIT_UPSTREAM;
return TRUE;
@@ -222,6 +244,10 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
if (session->rcpt) {
g_list_free (session->rcpt);
}
+ if (session->upstream) {
+ remove_normal_event (session->s, smtp_upstream_finalize_connection, session);
+ session->upstream = NULL;
+ }
session->state = SMTP_STATE_GREETING;
break;
case SMTP_COMMAND_DATA:
@@ -230,7 +256,19 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
session->error = SMTP_ERROR_RECIPIENTS;
return FALSE;
}
- session->error = SMTP_ERROR_DATA_OK;
+ if (session->upstream == NULL) {
+ session->error = SMTP_ERROR_UPSTREAM;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ return FALSE;
+ }
+ else {
+ session->upstream_state = SMTP_STATE_DATA;
+ rspamd_dispatcher_restore (session->upstream_dispatcher);
+ r = snprintf (outbuf, sizeof (outbuf), "DATA" CRLF);
+ session->state = SMTP_STATE_WAIT_UPSTREAM;
+ session->error = SMTP_ERROR_DATA_OK;
+ return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE);
+ }
}
else {
goto improper_sequence;
@@ -250,6 +288,88 @@ improper_sequence:
return FALSE;
}
+static gboolean
+smtp_send_upstream_message (struct smtp_session *session)
+{
+ rspamd_dispatcher_pause (session->dispatcher);
+ rspamd_dispatcher_restore (session->upstream_dispatcher);
+
+ if (! rspamd_dispatcher_sendfile (session->upstream_dispatcher, session->temp_fd, session->temp_size)) {
+ goto err;
+ }
+ session->upstream_state = SMTP_STATE_IN_SENDFILE;
+ session->state = SMTP_STATE_WAIT_UPSTREAM;
+ return TRUE;
+
+err:
+ session->error = SMTP_ERROR_FILE;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+}
+
+static gboolean
+process_smtp_data (struct smtp_session *session)
+{
+ struct stat st;
+ int r;
+
+ if (fstat (session->temp_fd, &st) == -1) {
+ goto err;
+ }
+ /* Now mmap temp file if it is small enough */
+ session->temp_size = st.st_size;
+ if (session->ctx->max_size == 0 || st.st_size < session->ctx->max_size) {
+ session->task = construct_task (session->worker);
+ session->task->fin_callback = smtp_write_socket;
+ session->task->fin_arg = session;
+ session->task->msg = memory_pool_alloc (session->pool, sizeof (f_str_t));
+#ifdef HAVE_MMAP_NOCORE
+ if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) {
+#else
+ if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) {
+#endif
+ goto err;
+ }
+ session->task->msg->len = st.st_size;
+ if (process_message (session->task) == -1) {
+ msg_err ("cannot process message");
+ munmap (session->task->msg->begin, st.st_size);
+ goto err;
+ }
+ session->task->helo = session->helo;
+ memcpy (&session->task->from_addr, &session->client_addr, sizeof (struct in_addr));
+ session->task->cmd = CMD_CHECK;
+ r = process_filters (session->task);
+ if (r == -1) {
+ munmap (session->task->msg->begin, st.st_size);
+ msg_err ("cannot process filters");
+ goto err;
+ }
+ else if (r == 0) {
+ session->state = SMTP_STATE_END;
+ rspamd_dispatcher_pause (session->dispatcher);
+ }
+ else {
+ process_statfiles (session->task);
+ session->state = SMTP_STATE_END;
+ return smtp_write_socket (session);
+ }
+ }
+ else {
+ return smtp_send_upstream_message (session);
+ }
+
+ return TRUE;
+err:
+ session->error = SMTP_ERROR_FILE;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+}
+
/*
* Callback that is called when there is data to read in buffer
*/
@@ -257,6 +377,8 @@ static gboolean
smtp_read_socket (f_str_t * in, void *arg)
{
struct smtp_session *session = arg;
+ char *p;
+ gboolean do_write;
switch (session->state) {
case SMTP_STATE_RESOLVE_REVERSE:
@@ -275,6 +397,69 @@ smtp_read_socket (f_str_t * in, void *arg)
smtp_write_socket (session);
}
break;
+ case SMTP_STATE_AFTER_DATA:
+ if (in->len == 0) {
+ return TRUE;
+ }
+ p = in->begin + in->len;
+ do_write = TRUE;
+ if (in->len > sizeof (session->data_end)) {
+ /* New data is more than trailer buffer */
+ if (session->data_idx != 0 && write (session->temp_fd, session->data_end, session->data_idx) != session->data_idx) {
+ msg_err ("cannot write to temp file: %s", strerror (errno));
+ session->error = SMTP_ERROR_FILE;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+ }
+ memcpy (session->data_end, p - sizeof (session->data_end) + 1, sizeof (session->data_end));
+ session->data_idx = 5;
+ }
+ else if (session->data_idx + in->len < sizeof (session->data_end)){
+ /* New data is less than trailer buffer plus index */
+ memcpy (session->data_end + session->data_idx, in->begin, in->len);
+ session->data_idx += in->len;
+ do_write = FALSE;
+ }
+ else {
+ /* Save remaining bytes */
+ if (session->data_idx != 0 && write (session->temp_fd, session->data_end, session->data_idx) != session->data_idx) {
+ msg_err ("cannot write to temp file: %s", strerror (errno));
+ session->error = SMTP_ERROR_FILE;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+ }
+ /* Move bytes */
+ session->data_idx = sizeof (session->data_end) - in->len;
+ memmove (session->data_end, session->data_end + (sizeof (session->data_end) - in->len) + 1, sizeof (session->data_end) - in->len);
+ memcpy (session->data_end + session->data_idx, in->begin, in->len);
+ session->data_idx = 5;
+ }
+ if (do_write) {
+ if (memcmp (session->data_end, DATA_END_TRAILER, sizeof (session->data_end)) == 0) {
+ return process_smtp_data (session);
+ }
+ else {
+ if (session->data_idx < in->len) {
+ if (in->len - session->data_idx != 0 &&
+ write (session->temp_fd, in->begin, in->len - session->data_idx) != in->len - session->data_idx) {
+ msg_err ("cannot write to temp file: %s", strerror (errno));
+ session->error = SMTP_ERROR_FILE;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+ }
+ }
+ }
+ }
+ break;
+ case SMTP_STATE_WAIT_UPSTREAM:
+ rspamd_dispatcher_pause (session->dispatcher);
+ break;
default:
session->error = make_smtp_error (session, 550, "%s Internal error", "5.5.0");
session->state = SMTP_STATE_ERROR;
@@ -299,6 +484,13 @@ static gboolean
smtp_write_socket (void *arg)
{
struct smtp_session *session = arg;
+ double ms = 0, rs = 0;
+ int r;
+ struct metric_result *metric_res;
+ struct metric *m;
+ char logbuf[1024];
+ gboolean is_spam = FALSE;
+ GList *symbols, *cur;
if (session->state == SMTP_STATE_CRITICAL_ERROR) {
if (session->error != NULL) {
@@ -307,6 +499,47 @@ smtp_write_socket (void *arg)
destroy_session (session->s);
return FALSE;
}
+ else if (session->state == SMTP_STATE_END) {
+ /* Check metric */
+ m = g_hash_table_lookup (session->cfg->metrics, session->ctx->metric);
+ metric_res = g_hash_table_lookup (session->task->results, session->ctx->metric);
+ if (m != NULL && metric_res != NULL) {
+ if (!check_metric_settings (session->task, m, &ms, &rs)) {
+ ms = m->required_score;
+ rs = m->reject_score;
+ }
+ if (metric_res->score >= ms) {
+ is_spam = TRUE;
+ }
+
+ r = snprintf (logbuf, sizeof (logbuf), "msg ok, id: <%s>, ", session->task->message_id);
+ r += snprintf (logbuf + r, sizeof (logbuf) - r, "(%s: %s: [%.2f/%.2f/%.2f] [",
+ (char *)m->name, is_spam ? "T" : "F", metric_res->score, ms, rs);
+ symbols = g_hash_table_get_keys (metric_res->symbols);
+ cur = symbols;
+ while (cur) {
+ if (g_list_next (cur) != NULL) {
+ r += snprintf (logbuf + r, sizeof (logbuf) - r, "%s,", (char *)cur->data);
+ }
+ else {
+ r += snprintf (logbuf + r, sizeof (logbuf) - r, "%s", (char *)cur->data);
+ }
+ cur = g_list_next (cur);
+ }
+ g_list_free (symbols);
+ r += snprintf (logbuf + r, sizeof (logbuf) - r, "]), len: %ld, time: %sms",
+ (long int)session->task->msg->len, calculate_check_time (&session->task->ts, session->cfg->clock_res));
+ msg_info ("%s", logbuf);
+
+ if (is_spam) {
+ rspamd_dispatcher_write (session->dispatcher, session->ctx->reject_message, 0, FALSE, TRUE);
+ rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+ }
+ }
+ return smtp_send_upstream_message (session);
+ }
else {
if (session->error != NULL) {
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
@@ -501,8 +734,10 @@ accept_socket (int fd, short what, void *arg)
}
session->sock = nfd;
+ session->temp_fd = -1;
session->worker = worker;
session->ctx = worker->ctx;
+ session->cfg = worker->srv->cfg;
session->session_time = time (NULL);
worker->srv->stat->connections_count++;
@@ -762,6 +997,18 @@ config_smtp_worker (struct rspamd_worker *worker)
if ((value = g_hash_table_lookup (worker->cf->params, "smtp_capabilities")) != NULL) {
make_capabilities (ctx, value);
}
+ if ((value = g_hash_table_lookup (worker->cf->params, "smtp_metric")) != NULL) {
+ ctx->metric = memory_pool_strdup (ctx->pool, value);
+ }
+ else {
+ ctx->metric = DEFAULT_METRIC;
+ }
+ if ((value = g_hash_table_lookup (worker->cf->params, "smtp_reject_message")) != NULL) {
+ ctx->reject_message = memory_pool_strdup (ctx->pool, value);
+ }
+ else {
+ ctx->reject_message = DEFAULT_REJECT_MESSAGE;
+ }
/* Set ctx */
worker->ctx = ctx;
diff --git a/src/smtp.h b/src/smtp.h
index 36319bb4c..4208df4f2 100644
--- a/src/smtp.h
+++ b/src/smtp.h
@@ -29,6 +29,9 @@ struct smtp_worker_ctx {
gboolean use_xclient;
gboolean helo_required;
char *smtp_capabilities;
+ char *reject_message;
+ size_t max_size;
+ char *metric;
};
enum rspamd_smtp_state {
@@ -41,9 +44,10 @@ enum rspamd_smtp_state {
SMTP_STATE_RCPT,
SMTP_STATE_BEFORE_DATA,
SMTP_STATE_DATA,
- SMTP_STATE_EOD,
+ SMTP_STATE_AFTER_DATA,
SMTP_STATE_END,
SMTP_STATE_WAIT_UPSTREAM,
+ SMTP_STATE_IN_SENDFILE,
SMTP_STATE_ERROR,
SMTP_STATE_CRITICAL_ERROR,
SMTP_STATE_WRITE_ERROR
@@ -51,6 +55,7 @@ enum rspamd_smtp_state {
struct smtp_session {
struct smtp_worker_ctx *ctx;
+ struct config_file *cfg;
memory_pool_t *pool;
enum rspamd_smtp_state state;
@@ -62,6 +67,8 @@ struct smtp_session {
char *error;
int sock;
int upstream_sock;
+ int temp_fd;
+ size_t temp_size;
time_t session_time;
gchar *helo;
@@ -74,6 +81,10 @@ struct smtp_session {
rspamd_io_dispatcher_t *upstream_dispatcher;
struct smtp_upstream *upstream;
+
+ char data_end[5];
+ char data_idx;
+
gboolean resolved;
gboolean esmtp;
};
diff --git a/src/smtp_proto.c b/src/smtp_proto.c
index 82fffa690..bef52b6b5 100644
--- a/src/smtp_proto.c
+++ b/src/smtp_proto.c
@@ -163,7 +163,7 @@ parse_smtp_command (struct smtp_session *session, f_str_t *line, struct smtp_com
break;
case SMTP_PARSE_ARGUMENT:
if (ch == ' ' || ch == ':' || ch == CR || ch == LF || i == line->len - 1) {
- if (i == line->len - 1) {
+ if (i == line->len - 1 && (ch != ' ' && ch != CR && ch != LF)) {
p ++;
}
arg->len = p - c;
@@ -329,14 +329,14 @@ parse_smtp_rcpt (struct smtp_session *session, struct smtp_command *cmd)
/* Return -1 if there are some error, 1 if all is ok and 0 in case of incomplete reply */
static int
-check_smtp_ustream_reply (f_str_t *in)
+check_smtp_ustream_reply (f_str_t *in, char success_code)
{
char *p;
/* Check for 250 at the begin of line */
if (in->len >= sizeof ("220 ") - 1) {
p = in->begin;
- if (p[0] == '2') {
+ if (p[0] == success_code) {
/* Last reply line */
if (p[3] == ' ') {
return 1;
@@ -353,7 +353,7 @@ check_smtp_ustream_reply (f_str_t *in)
return -1;
}
-static size_t
+size_t
smtp_upstream_write_list (GList *args, char *buf, size_t buflen)
{
GList *cur = args;
@@ -374,22 +374,35 @@ smtp_upstream_write_list (GList *args, char *buf, size_t buflen)
}
gboolean
+smtp_upstream_write_socket (void *arg)
+{
+ struct smtp_session *session = arg;
+
+ if (session->upstream_state == SMTP_STATE_IN_SENDFILE) {
+ session->upstream_state = SMTP_STATE_END;
+ return rspamd_dispatcher_write (session->upstream_dispatcher, DATA_END_TRAILER, sizeof (DATA_END_TRAILER) - 1, FALSE, TRUE);
+ }
+
+ return TRUE;
+}
+
+gboolean
smtp_upstream_read_socket (f_str_t * in, void *arg)
{
struct smtp_session *session = arg;
- char outbuf[BUFSIZ];
+ char outbuf[BUFSIZ], *tmppattern;
int r;
switch (session->upstream_state) {
case SMTP_STATE_GREETING:
- r = check_smtp_ustream_reply (in);
+ r = check_smtp_ustream_reply (in, '2');
if (r == -1) {
- session->error = memory_pool_alloc (session->pool, in->len + 3);
+ session->error = memory_pool_alloc (session->pool, in->len + 1);
g_strlcpy (session->error, in->begin, in->len + 1);
/* XXX: assume upstream errors as critical errors */
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
- rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
destroy_session (session->s);
return FALSE;
@@ -417,14 +430,14 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
}
break;
case SMTP_STATE_HELO:
- r = check_smtp_ustream_reply (in);
+ r = check_smtp_ustream_reply (in, '2');
if (r == -1) {
session->error = memory_pool_alloc (session->pool, in->len + 1);
g_strlcpy (session->error, in->begin, in->len + 1);
/* XXX: assume upstream errors as critical errors */
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
- rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
destroy_session (session->s);
return FALSE;
@@ -443,14 +456,14 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
}
break;
case SMTP_STATE_FROM:
- r = check_smtp_ustream_reply (in);
+ r = check_smtp_ustream_reply (in, '2');
if (r == -1) {
session->error = memory_pool_alloc (session->pool, in->len + 1);
g_strlcpy (session->error, in->begin, in->len + 1);
/* XXX: assume upstream errors as critical errors */
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
- rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
destroy_session (session->s);
return FALSE;
@@ -463,14 +476,14 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
}
break;
case SMTP_STATE_RCPT:
- r = check_smtp_ustream_reply (in);
+ r = check_smtp_ustream_reply (in, '2');
if (r == -1) {
session->error = memory_pool_alloc (session->pool, in->len + 1);
g_strlcpy (session->error, in->begin, in->len + 1);
/* XXX: assume upstream errors as critical errors */
session->state = SMTP_STATE_CRITICAL_ERROR;
rspamd_dispatcher_restore (session->dispatcher);
- rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
destroy_session (session->s);
return FALSE;
@@ -485,14 +498,20 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
}
break;
case SMTP_STATE_BEFORE_DATA:
- r = check_smtp_ustream_reply (in);
+ r = check_smtp_ustream_reply (in, '2');
if (r == -1) {
session->error = memory_pool_alloc (session->pool, in->len + 1);
g_strlcpy (session->error, in->begin, in->len + 1);
rspamd_dispatcher_restore (session->dispatcher);
- rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
- session->rcpt = g_list_delete_link (session->rcpt, session->cur_rcpt);
+ if (session->cur_rcpt) {
+ session->rcpt = g_list_delete_link (session->rcpt, session->cur_rcpt);
+ }
+ else {
+ session->rcpt = g_list_delete_link (session->rcpt, session->rcpt);
+ }
+ session->state = SMTP_STATE_RCPT;
return TRUE;
}
else if (r == 1) {
@@ -500,6 +519,7 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
r = snprintf (outbuf, sizeof (outbuf), "RCPT TO: ");
r += smtp_upstream_write_list (session->cur_rcpt, outbuf + r, sizeof (outbuf) - r);
session->cur_rcpt = g_list_next (session->cur_rcpt);
+ rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE);
}
else {
session->upstream_state = SMTP_STATE_DATA;
@@ -508,15 +528,68 @@ smtp_upstream_read_socket (f_str_t * in, void *arg)
session->error = memory_pool_alloc (session->pool, in->len + 1);
g_strlcpy (session->error, in->begin, in->len + 1);
/* Write to client */
- rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE);
rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
if (session->state == SMTP_STATE_WAIT_UPSTREAM) {
rspamd_dispatcher_restore (session->dispatcher);
session->state = SMTP_STATE_RCPT;
}
- return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE);
}
break;
+ case SMTP_STATE_DATA:
+ r = check_smtp_ustream_reply (in, '3');
+ if (r == -1) {
+ session->error = memory_pool_alloc (session->pool, in->len + 1);
+ g_strlcpy (session->error, in->begin, in->len + 1);
+ /* XXX: assume upstream errors as critical errors */
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_restore (session->dispatcher);
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+ }
+ else if (r == 1) {
+ r = strlen (session->cfg->temp_dir) + sizeof ("/rspamd-XXXXXX.tmp");
+ tmppattern = alloca (r);
+ snprintf (tmppattern, r, "%s/rspamd-XXXXXX.tmp", session->cfg->temp_dir);
+ session->temp_fd = g_mkstemp_full (tmppattern, O_RDWR, S_IWUSR | S_IRUSR);
+ if (session->temp_fd == -1) {
+ session->error = SMTP_ERROR_FILE;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_restore (session->dispatcher);
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+ }
+ session->state = SMTP_STATE_AFTER_DATA;
+ session->error = SMTP_ERROR_DATA_OK;
+ rspamd_dispatcher_restore (session->dispatcher);
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ rspamd_dispatcher_pause (session->upstream_dispatcher);
+ rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_ANY, 0);
+ session->data_idx = 0;
+ memset (session->data_end, 0, sizeof (session->data_end));
+ return TRUE;
+ }
+ break;
+ case SMTP_STATE_END:
+ session->error = memory_pool_alloc (session->pool, in->len + 1);
+ g_strlcpy (session->error, in->begin, in->len + 1);
+ session->state = SMTP_STATE_END;
+ rspamd_dispatcher_restore (session->dispatcher);
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+ default:
+ msg_err ("got upstream reply at unexpected state: %d, reply: %V", session->upstream_state, in);
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_restore (session->dispatcher);
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
}
return TRUE;
diff --git a/src/smtp_proto.h b/src/smtp_proto.h
index c78cfb094..e850e4a5d 100644
--- a/src/smtp_proto.h
+++ b/src/smtp_proto.h
@@ -11,9 +11,12 @@
#define SMTP_ERROR_RECIPIENTS "554 No valid recipients" CRLF
#define SMTP_ERROR_UNIMPLIMENTED "502 Command not implemented" CRLF
#define SMTP_ERROR_UPSTREAM "421 Service not available, closing transmission channel" CRLF
+#define SMTP_ERROR_FILE "420 Service not available, filesystem error" CRLF
#define SMTP_ERROR_OK "250 Requested mail action okay, completed" CRLF
#define SMTP_ERROR_DATA_OK "354 Start mail input; end with <CRLF>.<CRLF>" CRLF
+#define DATA_END_TRAILER CRLF "." CRLF
+
struct smtp_command {
enum {
@@ -40,7 +43,9 @@ gboolean parse_smtp_rcpt (struct smtp_session *session, struct smtp_command *cmd
/* Upstream SMTP */
gboolean smtp_upstream_read_socket (f_str_t * in, void *arg);
+gboolean smtp_upstream_write_socket (void *arg);
void smtp_upstream_err_socket (GError *err, void *arg);
void smtp_upstream_finalize_connection (gpointer data);
+size_t smtp_upstream_write_list (GList *args, char *buf, size_t buflen);
#endif