summaryrefslogtreecommitdiffstats
path: root/src/smtp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/smtp.c')
-rw-r--r--src/smtp.c259
1 files changed, 253 insertions, 6 deletions
diff --git a/src/smtp.c b/src/smtp.c
index 1bf135d30..3f366f7fd 100644
--- a/src/smtp.c
+++ b/src/smtp.c
@@ -29,6 +29,8 @@
#include "smtp.h"
#include "smtp_proto.h"
#include "map.h"
+#include "message.h"
+#include "settings.h"
#include "evdns/evdns.h"
/* Max line size as it is defined in rfc2822 */
@@ -39,6 +41,9 @@
#define DEFAULT_UPSTREAM_DEAD_TIME 300
#define DEFAULT_UPSTREAM_MAXERRORS 10
+
+#define DEFAULT_REJECT_MESSAGE "450 4.5.0 Spam message rejected"
+
static gboolean smtp_write_socket (void *arg);
static sig_atomic_t wanna_die = 0;
@@ -89,6 +94,9 @@ free_smtp_session (gpointer arg)
}
memory_pool_delete (session->pool);
close (session->sock);
+ if (session->temp_fd != -1) {
+ close (session->temp_fd);
+ }
g_free (session);
}
}
@@ -146,7 +154,7 @@ create_smtp_upstream_connection (struct smtp_session *session)
}
/* Create a dispatcher for upstream connection */
session->upstream_dispatcher = rspamd_create_dispatcher (session->upstream_sock, BUFFER_LINE,
- smtp_upstream_read_socket, NULL, smtp_upstream_err_socket,
+ smtp_upstream_read_socket, smtp_upstream_write_socket, smtp_upstream_err_socket,
&session->ctx->smtp_timeout, session);
session->state = SMTP_STATE_WAIT_UPSTREAM;
session->upstream_state = SMTP_STATE_GREETING;
@@ -159,6 +167,8 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
{
/* XXX: write dialog implementation */
struct smtp_command *cmd;
+ char outbuf[BUFSIZ];
+ int r;
if (! parse_smtp_command (session, line, &cmd)) {
session->error = SMTP_ERROR_BAD_COMMAND;
@@ -201,10 +211,22 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
if (session->state == SMTP_STATE_RCPT) {
if (parse_smtp_rcpt (session, cmd)) {
/* Make upstream connection */
- if (!create_smtp_upstream_connection (session)) {
- session->error = SMTP_ERROR_UPSTREAM;
- session->state = SMTP_STATE_CRITICAL_ERROR;
- return FALSE;
+ if (session->upstream == NULL) {
+ if (!create_smtp_upstream_connection (session)) {
+ session->error = SMTP_ERROR_UPSTREAM;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ return FALSE;
+ }
+ }
+ else {
+ /* Send next rcpt to upstream */
+ session->state = SMTP_STATE_WAIT_UPSTREAM;
+ session->upstream_state = SMTP_STATE_BEFORE_DATA;
+ rspamd_dispatcher_restore (session->upstream_dispatcher);
+ r = snprintf (outbuf, sizeof (outbuf), "RCPT TO: ");
+ r += smtp_upstream_write_list (session->rcpt->data, outbuf + r, sizeof (outbuf) - r);
+ session->cur_rcpt = NULL;
+ return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE);
}
session->state = SMTP_STATE_WAIT_UPSTREAM;
return TRUE;
@@ -222,6 +244,10 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
if (session->rcpt) {
g_list_free (session->rcpt);
}
+ if (session->upstream) {
+ remove_normal_event (session->s, smtp_upstream_finalize_connection, session);
+ session->upstream = NULL;
+ }
session->state = SMTP_STATE_GREETING;
break;
case SMTP_COMMAND_DATA:
@@ -230,7 +256,19 @@ read_smtp_command (struct smtp_session *session, f_str_t *line)
session->error = SMTP_ERROR_RECIPIENTS;
return FALSE;
}
- session->error = SMTP_ERROR_DATA_OK;
+ if (session->upstream == NULL) {
+ session->error = SMTP_ERROR_UPSTREAM;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ return FALSE;
+ }
+ else {
+ session->upstream_state = SMTP_STATE_DATA;
+ rspamd_dispatcher_restore (session->upstream_dispatcher);
+ r = snprintf (outbuf, sizeof (outbuf), "DATA" CRLF);
+ session->state = SMTP_STATE_WAIT_UPSTREAM;
+ session->error = SMTP_ERROR_DATA_OK;
+ return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE);
+ }
}
else {
goto improper_sequence;
@@ -250,6 +288,88 @@ improper_sequence:
return FALSE;
}
+static gboolean
+smtp_send_upstream_message (struct smtp_session *session)
+{
+ rspamd_dispatcher_pause (session->dispatcher);
+ rspamd_dispatcher_restore (session->upstream_dispatcher);
+
+ if (! rspamd_dispatcher_sendfile (session->upstream_dispatcher, session->temp_fd, session->temp_size)) {
+ goto err;
+ }
+ session->upstream_state = SMTP_STATE_IN_SENDFILE;
+ session->state = SMTP_STATE_WAIT_UPSTREAM;
+ return TRUE;
+
+err:
+ session->error = SMTP_ERROR_FILE;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+}
+
+static gboolean
+process_smtp_data (struct smtp_session *session)
+{
+ struct stat st;
+ int r;
+
+ if (fstat (session->temp_fd, &st) == -1) {
+ goto err;
+ }
+ /* Now mmap temp file if it is small enough */
+ session->temp_size = st.st_size;
+ if (session->ctx->max_size == 0 || st.st_size < session->ctx->max_size) {
+ session->task = construct_task (session->worker);
+ session->task->fin_callback = smtp_write_socket;
+ session->task->fin_arg = session;
+ session->task->msg = memory_pool_alloc (session->pool, sizeof (f_str_t));
+#ifdef HAVE_MMAP_NOCORE
+ if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) {
+#else
+ if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) {
+#endif
+ goto err;
+ }
+ session->task->msg->len = st.st_size;
+ if (process_message (session->task) == -1) {
+ msg_err ("cannot process message");
+ munmap (session->task->msg->begin, st.st_size);
+ goto err;
+ }
+ session->task->helo = session->helo;
+ memcpy (&session->task->from_addr, &session->client_addr, sizeof (struct in_addr));
+ session->task->cmd = CMD_CHECK;
+ r = process_filters (session->task);
+ if (r == -1) {
+ munmap (session->task->msg->begin, st.st_size);
+ msg_err ("cannot process filters");
+ goto err;
+ }
+ else if (r == 0) {
+ session->state = SMTP_STATE_END;
+ rspamd_dispatcher_pause (session->dispatcher);
+ }
+ else {
+ process_statfiles (session->task);
+ session->state = SMTP_STATE_END;
+ return smtp_write_socket (session);
+ }
+ }
+ else {
+ return smtp_send_upstream_message (session);
+ }
+
+ return TRUE;
+err:
+ session->error = SMTP_ERROR_FILE;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+}
+
/*
* Callback that is called when there is data to read in buffer
*/
@@ -257,6 +377,8 @@ static gboolean
smtp_read_socket (f_str_t * in, void *arg)
{
struct smtp_session *session = arg;
+ char *p;
+ gboolean do_write;
switch (session->state) {
case SMTP_STATE_RESOLVE_REVERSE:
@@ -275,6 +397,69 @@ smtp_read_socket (f_str_t * in, void *arg)
smtp_write_socket (session);
}
break;
+ case SMTP_STATE_AFTER_DATA:
+ if (in->len == 0) {
+ return TRUE;
+ }
+ p = in->begin + in->len;
+ do_write = TRUE;
+ if (in->len > sizeof (session->data_end)) {
+ /* New data is more than trailer buffer */
+ if (session->data_idx != 0 && write (session->temp_fd, session->data_end, session->data_idx) != session->data_idx) {
+ msg_err ("cannot write to temp file: %s", strerror (errno));
+ session->error = SMTP_ERROR_FILE;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+ }
+ memcpy (session->data_end, p - sizeof (session->data_end) + 1, sizeof (session->data_end));
+ session->data_idx = 5;
+ }
+ else if (session->data_idx + in->len < sizeof (session->data_end)){
+ /* New data is less than trailer buffer plus index */
+ memcpy (session->data_end + session->data_idx, in->begin, in->len);
+ session->data_idx += in->len;
+ do_write = FALSE;
+ }
+ else {
+ /* Save remaining bytes */
+ if (session->data_idx != 0 && write (session->temp_fd, session->data_end, session->data_idx) != session->data_idx) {
+ msg_err ("cannot write to temp file: %s", strerror (errno));
+ session->error = SMTP_ERROR_FILE;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+ }
+ /* Move bytes */
+ session->data_idx = sizeof (session->data_end) - in->len;
+ memmove (session->data_end, session->data_end + (sizeof (session->data_end) - in->len) + 1, sizeof (session->data_end) - in->len);
+ memcpy (session->data_end + session->data_idx, in->begin, in->len);
+ session->data_idx = 5;
+ }
+ if (do_write) {
+ if (memcmp (session->data_end, DATA_END_TRAILER, sizeof (session->data_end)) == 0) {
+ return process_smtp_data (session);
+ }
+ else {
+ if (session->data_idx < in->len) {
+ if (in->len - session->data_idx != 0 &&
+ write (session->temp_fd, in->begin, in->len - session->data_idx) != in->len - session->data_idx) {
+ msg_err ("cannot write to temp file: %s", strerror (errno));
+ session->error = SMTP_ERROR_FILE;
+ session->state = SMTP_STATE_CRITICAL_ERROR;
+ rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+ }
+ }
+ }
+ }
+ break;
+ case SMTP_STATE_WAIT_UPSTREAM:
+ rspamd_dispatcher_pause (session->dispatcher);
+ break;
default:
session->error = make_smtp_error (session, 550, "%s Internal error", "5.5.0");
session->state = SMTP_STATE_ERROR;
@@ -299,6 +484,13 @@ static gboolean
smtp_write_socket (void *arg)
{
struct smtp_session *session = arg;
+ double ms = 0, rs = 0;
+ int r;
+ struct metric_result *metric_res;
+ struct metric *m;
+ char logbuf[1024];
+ gboolean is_spam = FALSE;
+ GList *symbols, *cur;
if (session->state == SMTP_STATE_CRITICAL_ERROR) {
if (session->error != NULL) {
@@ -307,6 +499,47 @@ smtp_write_socket (void *arg)
destroy_session (session->s);
return FALSE;
}
+ else if (session->state == SMTP_STATE_END) {
+ /* Check metric */
+ m = g_hash_table_lookup (session->cfg->metrics, session->ctx->metric);
+ metric_res = g_hash_table_lookup (session->task->results, session->ctx->metric);
+ if (m != NULL && metric_res != NULL) {
+ if (!check_metric_settings (session->task, m, &ms, &rs)) {
+ ms = m->required_score;
+ rs = m->reject_score;
+ }
+ if (metric_res->score >= ms) {
+ is_spam = TRUE;
+ }
+
+ r = snprintf (logbuf, sizeof (logbuf), "msg ok, id: <%s>, ", session->task->message_id);
+ r += snprintf (logbuf + r, sizeof (logbuf) - r, "(%s: %s: [%.2f/%.2f/%.2f] [",
+ (char *)m->name, is_spam ? "T" : "F", metric_res->score, ms, rs);
+ symbols = g_hash_table_get_keys (metric_res->symbols);
+ cur = symbols;
+ while (cur) {
+ if (g_list_next (cur) != NULL) {
+ r += snprintf (logbuf + r, sizeof (logbuf) - r, "%s,", (char *)cur->data);
+ }
+ else {
+ r += snprintf (logbuf + r, sizeof (logbuf) - r, "%s", (char *)cur->data);
+ }
+ cur = g_list_next (cur);
+ }
+ g_list_free (symbols);
+ r += snprintf (logbuf + r, sizeof (logbuf) - r, "]), len: %ld, time: %sms",
+ (long int)session->task->msg->len, calculate_check_time (&session->task->ts, session->cfg->clock_res));
+ msg_info ("%s", logbuf);
+
+ if (is_spam) {
+ rspamd_dispatcher_write (session->dispatcher, session->ctx->reject_message, 0, FALSE, TRUE);
+ rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE);
+ destroy_session (session->s);
+ return FALSE;
+ }
+ }
+ return smtp_send_upstream_message (session);
+ }
else {
if (session->error != NULL) {
rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE);
@@ -501,8 +734,10 @@ accept_socket (int fd, short what, void *arg)
}
session->sock = nfd;
+ session->temp_fd = -1;
session->worker = worker;
session->ctx = worker->ctx;
+ session->cfg = worker->srv->cfg;
session->session_time = time (NULL);
worker->srv->stat->connections_count++;
@@ -762,6 +997,18 @@ config_smtp_worker (struct rspamd_worker *worker)
if ((value = g_hash_table_lookup (worker->cf->params, "smtp_capabilities")) != NULL) {
make_capabilities (ctx, value);
}
+ if ((value = g_hash_table_lookup (worker->cf->params, "smtp_metric")) != NULL) {
+ ctx->metric = memory_pool_strdup (ctx->pool, value);
+ }
+ else {
+ ctx->metric = DEFAULT_METRIC;
+ }
+ if ((value = g_hash_table_lookup (worker->cf->params, "smtp_reject_message")) != NULL) {
+ ctx->reject_message = memory_pool_strdup (ctx->pool, value);
+ }
+ else {
+ ctx->reject_message = DEFAULT_REJECT_MESSAGE;
+ }
/* Set ctx */
worker->ctx = ctx;