diff options
Diffstat (limited to 'src/smtp.c')
-rw-r--r-- | src/smtp.c | 259 |
1 files changed, 253 insertions, 6 deletions
diff --git a/src/smtp.c b/src/smtp.c index 1bf135d30..3f366f7fd 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -29,6 +29,8 @@ #include "smtp.h" #include "smtp_proto.h" #include "map.h" +#include "message.h" +#include "settings.h" #include "evdns/evdns.h" /* Max line size as it is defined in rfc2822 */ @@ -39,6 +41,9 @@ #define DEFAULT_UPSTREAM_DEAD_TIME 300 #define DEFAULT_UPSTREAM_MAXERRORS 10 + +#define DEFAULT_REJECT_MESSAGE "450 4.5.0 Spam message rejected" + static gboolean smtp_write_socket (void *arg); static sig_atomic_t wanna_die = 0; @@ -89,6 +94,9 @@ free_smtp_session (gpointer arg) } memory_pool_delete (session->pool); close (session->sock); + if (session->temp_fd != -1) { + close (session->temp_fd); + } g_free (session); } } @@ -146,7 +154,7 @@ create_smtp_upstream_connection (struct smtp_session *session) } /* Create a dispatcher for upstream connection */ session->upstream_dispatcher = rspamd_create_dispatcher (session->upstream_sock, BUFFER_LINE, - smtp_upstream_read_socket, NULL, smtp_upstream_err_socket, + smtp_upstream_read_socket, smtp_upstream_write_socket, smtp_upstream_err_socket, &session->ctx->smtp_timeout, session); session->state = SMTP_STATE_WAIT_UPSTREAM; session->upstream_state = SMTP_STATE_GREETING; @@ -159,6 +167,8 @@ read_smtp_command (struct smtp_session *session, f_str_t *line) { /* XXX: write dialog implementation */ struct smtp_command *cmd; + char outbuf[BUFSIZ]; + int r; if (! parse_smtp_command (session, line, &cmd)) { session->error = SMTP_ERROR_BAD_COMMAND; @@ -201,10 +211,22 @@ read_smtp_command (struct smtp_session *session, f_str_t *line) if (session->state == SMTP_STATE_RCPT) { if (parse_smtp_rcpt (session, cmd)) { /* Make upstream connection */ - if (!create_smtp_upstream_connection (session)) { - session->error = SMTP_ERROR_UPSTREAM; - session->state = SMTP_STATE_CRITICAL_ERROR; - return FALSE; + if (session->upstream == NULL) { + if (!create_smtp_upstream_connection (session)) { + session->error = SMTP_ERROR_UPSTREAM; + session->state = SMTP_STATE_CRITICAL_ERROR; + return FALSE; + } + } + else { + /* Send next rcpt to upstream */ + session->state = SMTP_STATE_WAIT_UPSTREAM; + session->upstream_state = SMTP_STATE_BEFORE_DATA; + rspamd_dispatcher_restore (session->upstream_dispatcher); + r = snprintf (outbuf, sizeof (outbuf), "RCPT TO: "); + r += smtp_upstream_write_list (session->rcpt->data, outbuf + r, sizeof (outbuf) - r); + session->cur_rcpt = NULL; + return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE); } session->state = SMTP_STATE_WAIT_UPSTREAM; return TRUE; @@ -222,6 +244,10 @@ read_smtp_command (struct smtp_session *session, f_str_t *line) if (session->rcpt) { g_list_free (session->rcpt); } + if (session->upstream) { + remove_normal_event (session->s, smtp_upstream_finalize_connection, session); + session->upstream = NULL; + } session->state = SMTP_STATE_GREETING; break; case SMTP_COMMAND_DATA: @@ -230,7 +256,19 @@ read_smtp_command (struct smtp_session *session, f_str_t *line) session->error = SMTP_ERROR_RECIPIENTS; return FALSE; } - session->error = SMTP_ERROR_DATA_OK; + if (session->upstream == NULL) { + session->error = SMTP_ERROR_UPSTREAM; + session->state = SMTP_STATE_CRITICAL_ERROR; + return FALSE; + } + else { + session->upstream_state = SMTP_STATE_DATA; + rspamd_dispatcher_restore (session->upstream_dispatcher); + r = snprintf (outbuf, sizeof (outbuf), "DATA" CRLF); + session->state = SMTP_STATE_WAIT_UPSTREAM; + session->error = SMTP_ERROR_DATA_OK; + return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE); + } } else { goto improper_sequence; @@ -250,6 +288,88 @@ improper_sequence: return FALSE; } +static gboolean +smtp_send_upstream_message (struct smtp_session *session) +{ + rspamd_dispatcher_pause (session->dispatcher); + rspamd_dispatcher_restore (session->upstream_dispatcher); + + if (! rspamd_dispatcher_sendfile (session->upstream_dispatcher, session->temp_fd, session->temp_size)) { + goto err; + } + session->upstream_state = SMTP_STATE_IN_SENDFILE; + session->state = SMTP_STATE_WAIT_UPSTREAM; + return TRUE; + +err: + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; +} + +static gboolean +process_smtp_data (struct smtp_session *session) +{ + struct stat st; + int r; + + if (fstat (session->temp_fd, &st) == -1) { + goto err; + } + /* Now mmap temp file if it is small enough */ + session->temp_size = st.st_size; + if (session->ctx->max_size == 0 || st.st_size < session->ctx->max_size) { + session->task = construct_task (session->worker); + session->task->fin_callback = smtp_write_socket; + session->task->fin_arg = session; + session->task->msg = memory_pool_alloc (session->pool, sizeof (f_str_t)); +#ifdef HAVE_MMAP_NOCORE + if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) { +#else + if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) { +#endif + goto err; + } + session->task->msg->len = st.st_size; + if (process_message (session->task) == -1) { + msg_err ("cannot process message"); + munmap (session->task->msg->begin, st.st_size); + goto err; + } + session->task->helo = session->helo; + memcpy (&session->task->from_addr, &session->client_addr, sizeof (struct in_addr)); + session->task->cmd = CMD_CHECK; + r = process_filters (session->task); + if (r == -1) { + munmap (session->task->msg->begin, st.st_size); + msg_err ("cannot process filters"); + goto err; + } + else if (r == 0) { + session->state = SMTP_STATE_END; + rspamd_dispatcher_pause (session->dispatcher); + } + else { + process_statfiles (session->task); + session->state = SMTP_STATE_END; + return smtp_write_socket (session); + } + } + else { + return smtp_send_upstream_message (session); + } + + return TRUE; +err: + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; +} + /* * Callback that is called when there is data to read in buffer */ @@ -257,6 +377,8 @@ static gboolean smtp_read_socket (f_str_t * in, void *arg) { struct smtp_session *session = arg; + char *p; + gboolean do_write; switch (session->state) { case SMTP_STATE_RESOLVE_REVERSE: @@ -275,6 +397,69 @@ smtp_read_socket (f_str_t * in, void *arg) smtp_write_socket (session); } break; + case SMTP_STATE_AFTER_DATA: + if (in->len == 0) { + return TRUE; + } + p = in->begin + in->len; + do_write = TRUE; + if (in->len > sizeof (session->data_end)) { + /* New data is more than trailer buffer */ + if (session->data_idx != 0 && write (session->temp_fd, session->data_end, session->data_idx) != session->data_idx) { + msg_err ("cannot write to temp file: %s", strerror (errno)); + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + } + memcpy (session->data_end, p - sizeof (session->data_end) + 1, sizeof (session->data_end)); + session->data_idx = 5; + } + else if (session->data_idx + in->len < sizeof (session->data_end)){ + /* New data is less than trailer buffer plus index */ + memcpy (session->data_end + session->data_idx, in->begin, in->len); + session->data_idx += in->len; + do_write = FALSE; + } + else { + /* Save remaining bytes */ + if (session->data_idx != 0 && write (session->temp_fd, session->data_end, session->data_idx) != session->data_idx) { + msg_err ("cannot write to temp file: %s", strerror (errno)); + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + } + /* Move bytes */ + session->data_idx = sizeof (session->data_end) - in->len; + memmove (session->data_end, session->data_end + (sizeof (session->data_end) - in->len) + 1, sizeof (session->data_end) - in->len); + memcpy (session->data_end + session->data_idx, in->begin, in->len); + session->data_idx = 5; + } + if (do_write) { + if (memcmp (session->data_end, DATA_END_TRAILER, sizeof (session->data_end)) == 0) { + return process_smtp_data (session); + } + else { + if (session->data_idx < in->len) { + if (in->len - session->data_idx != 0 && + write (session->temp_fd, in->begin, in->len - session->data_idx) != in->len - session->data_idx) { + msg_err ("cannot write to temp file: %s", strerror (errno)); + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + } + } + } + } + break; + case SMTP_STATE_WAIT_UPSTREAM: + rspamd_dispatcher_pause (session->dispatcher); + break; default: session->error = make_smtp_error (session, 550, "%s Internal error", "5.5.0"); session->state = SMTP_STATE_ERROR; @@ -299,6 +484,13 @@ static gboolean smtp_write_socket (void *arg) { struct smtp_session *session = arg; + double ms = 0, rs = 0; + int r; + struct metric_result *metric_res; + struct metric *m; + char logbuf[1024]; + gboolean is_spam = FALSE; + GList *symbols, *cur; if (session->state == SMTP_STATE_CRITICAL_ERROR) { if (session->error != NULL) { @@ -307,6 +499,47 @@ smtp_write_socket (void *arg) destroy_session (session->s); return FALSE; } + else if (session->state == SMTP_STATE_END) { + /* Check metric */ + m = g_hash_table_lookup (session->cfg->metrics, session->ctx->metric); + metric_res = g_hash_table_lookup (session->task->results, session->ctx->metric); + if (m != NULL && metric_res != NULL) { + if (!check_metric_settings (session->task, m, &ms, &rs)) { + ms = m->required_score; + rs = m->reject_score; + } + if (metric_res->score >= ms) { + is_spam = TRUE; + } + + r = snprintf (logbuf, sizeof (logbuf), "msg ok, id: <%s>, ", session->task->message_id); + r += snprintf (logbuf + r, sizeof (logbuf) - r, "(%s: %s: [%.2f/%.2f/%.2f] [", + (char *)m->name, is_spam ? "T" : "F", metric_res->score, ms, rs); + symbols = g_hash_table_get_keys (metric_res->symbols); + cur = symbols; + while (cur) { + if (g_list_next (cur) != NULL) { + r += snprintf (logbuf + r, sizeof (logbuf) - r, "%s,", (char *)cur->data); + } + else { + r += snprintf (logbuf + r, sizeof (logbuf) - r, "%s", (char *)cur->data); + } + cur = g_list_next (cur); + } + g_list_free (symbols); + r += snprintf (logbuf + r, sizeof (logbuf) - r, "]), len: %ld, time: %sms", + (long int)session->task->msg->len, calculate_check_time (&session->task->ts, session->cfg->clock_res)); + msg_info ("%s", logbuf); + + if (is_spam) { + rspamd_dispatcher_write (session->dispatcher, session->ctx->reject_message, 0, FALSE, TRUE); + rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + } + } + return smtp_send_upstream_message (session); + } else { if (session->error != NULL) { rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); @@ -501,8 +734,10 @@ accept_socket (int fd, short what, void *arg) } session->sock = nfd; + session->temp_fd = -1; session->worker = worker; session->ctx = worker->ctx; + session->cfg = worker->srv->cfg; session->session_time = time (NULL); worker->srv->stat->connections_count++; @@ -762,6 +997,18 @@ config_smtp_worker (struct rspamd_worker *worker) if ((value = g_hash_table_lookup (worker->cf->params, "smtp_capabilities")) != NULL) { make_capabilities (ctx, value); } + if ((value = g_hash_table_lookup (worker->cf->params, "smtp_metric")) != NULL) { + ctx->metric = memory_pool_strdup (ctx->pool, value); + } + else { + ctx->metric = DEFAULT_METRIC; + } + if ((value = g_hash_table_lookup (worker->cf->params, "smtp_reject_message")) != NULL) { + ctx->reject_message = memory_pool_strdup (ctx->pool, value); + } + else { + ctx->reject_message = DEFAULT_REJECT_MESSAGE; + } /* Set ctx */ worker->ctx = ctx; |