diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-06-10 21:47:22 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2010-06-10 21:47:22 +0400 |
commit | 07082741605e8e048a129bec28695f57263de1e8 (patch) | |
tree | 7c3f92439dfc40cac6c495f052ff3e913aea6709 | |
parent | 1be79df4d51fc2e497a73fc0163de08d406cc1f3 (diff) | |
download | rspamd-07082741605e8e048a129bec28695f57263de1e8.tar.gz rspamd-07082741605e8e048a129bec28695f57263de1e8.zip |
* Check messages received via smtp proxy
* Add support for sendfile in io dispatcher
* Fix issues with compatibility of worker_task and smtp proxy
* Proxy DATA command
-rw-r--r-- | CMakeLists.txt | 2 | ||||
-rw-r--r-- | config.h.in | 7 | ||||
-rw-r--r-- | src/buffer.c | 186 | ||||
-rw-r--r-- | src/buffer.h | 16 | ||||
-rw-r--r-- | src/filter.c | 7 | ||||
-rw-r--r-- | src/main.h | 2 | ||||
-rw-r--r-- | src/smtp.c | 259 | ||||
-rw-r--r-- | src/smtp.h | 13 | ||||
-rw-r--r-- | src/smtp_proto.c | 111 | ||||
-rw-r--r-- | src/smtp_proto.h | 5 |
10 files changed, 573 insertions, 35 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 9e929ec1e..72bf8b97a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -281,6 +281,7 @@ CHECK_INCLUDE_FILES(pwd.h HAVE_PWD_H) CHECK_INCLUDE_FILES(grp.h HAVE_GRP_H) CHECK_INCLUDE_FILES(glob.h HAVE_GLOB_H) CHECK_INCLUDE_FILES(poll.h HAVE_POLL_H) +CHECK_INCLUDE_FILES(sys/sendfile.h HAVE_SYS_SENDFILE_H) IF(HAVE_SYS_WAIT_H) LIST(APPEND CMAKE_REQUIRED_INCLUDES sys/wait.h) @@ -294,6 +295,7 @@ CHECK_FUNCTION_EXISTS(wait4 HAVE_WAIT4) CHECK_FUNCTION_EXISTS(waitpid HAVE_WAITPID) CHECK_FUNCTION_EXISTS(flock HAVE_FLOCK) CHECK_FUNCTION_EXISTS(tanhl HAVE_TANHL) +CHECK_FUNCTION_EXISTS(sendfile HAVE_SENDFILE) CHECK_SYMBOL_EXISTS(PATH_MAX limits.h HAVE_PATH_MAX) CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN) diff --git a/config.h.in b/config.h.in index ebb48f508..bbac937c1 100644 --- a/config.h.in +++ b/config.h.in @@ -121,6 +121,9 @@ #cmakedefine BUILD_STATIC 1 +#cmakedefine HAVE_SENDFILE 1 +#cmakedefine HAVE_SYS_SENDFILE_H 1 + #define RVERSION "${RSPAMD_VERSION}" #define RSPAMD_MASTER_SITE_URL "${RSPAMD_MASTER_SITE_URL}" @@ -294,6 +297,10 @@ #define HAVE_SETLOCALE 1 #endif +#ifdef HAVE_SYS_SENDFILE_H +#include <sys/sendfile.h> +#endif + #ifdef WITH_GPERF_TOOLS #include <google/profiler.h> #endif diff --git a/src/buffer.c b/src/buffer.c index 7dd43d2ad..5eb2c81d1 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -36,6 +36,126 @@ dispatcher_error_quark (void) return g_quark_from_static_string ("g-dispatcher-error-quark"); } +static gboolean +sendfile_callback (rspamd_io_dispatcher_t *d) +{ + ssize_t r; + GError *err; + +#ifdef HAVE_SENDFILE + #if defined(FREEBSD) + off_t off = 0; + /* FreeBSD version */ + if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, 0, &off, 0) != 0) { + if (errno != EAGAIN) { + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + d->err_callback (err, d->user_data); + return FALSE; + } + } + else { + debug_ip (d->peer_addr, "partially write data, retry"); + /* Wait for other event */ + d->offset += off; + event_del (d->ev); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + } + else { + if (d->write_callback) { + if (!d->write_callback (d->user_data)) { + debug_ip (d->peer_addr, "callback set wanna_die flag, terminating"); + return FALSE; + } + } + event_del (d->ev); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + d->in_sendfile = FALSE; + } + #else + /* Linux version */ + r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size); + if (r == -1) { + if (errno != EAGAIN) { + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + d->err_callback (err, d->user_data); + return FALSE; + } + } + else { + debug_ip (d->peer_addr, "partially write data, retry"); + /* Wait for other event */ + event_del (d->ev); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + } + else if (r + d->offset < d->file_size) { + debug_ip (d->peer_addr, "partially write data, retry"); + /* Wait for other event */ + event_del (d->ev); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + else { + if (d->write_callback) { + if (!d->write_callback (d->user_data)) { + debug_ip (d->peer_addr, "callback set wanna_die flag, terminating"); + return FALSE; + } + } + event_del (d->ev); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + d->in_sendfile = FALSE; + } + #endif +#else + r = write (d->fd, d->map, d->file_size - d->offset); + if (r == -1) { + if (errno != EAGAIN) { + if (d->err_callback) { + err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); + d->err_callback (err, d->user_data); + return FALSE; + } + } + else { + debug_ip (d->peer_addr, "partially write data, retry"); + /* Wait for other event */ + event_del (d->ev); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + } + else if (r + d->offset < d->file_size) { + d->offset += r; + debug_ip (d->peer_addr, "partially write data, retry"); + /* Wait for other event */ + event_del (d->ev); + event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + else { + if (d->write_callback) { + if (!d->write_callback (d->user_data)) { + debug_ip (d->peer_addr, "callback set wanna_die flag, terminating"); + return FALSE; + } + } + event_del (d->ev); + event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + d->in_sendfile = FALSE; + } +#endif + return TRUE; +} + #define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin) static gboolean @@ -139,7 +259,7 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read) if (d->in_buf == NULL) { d->in_buf = memory_pool_alloc (d->pool, sizeof (rspamd_buffer_t)); - if (d->policy == BUFFER_LINE) { + if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) { d->in_buf->data = fstralloc (d->pool, BUFSIZ); } else { @@ -254,6 +374,22 @@ read_buffers (int fd, rspamd_io_dispatcher_t * d, gboolean skip_read) } } break; + case BUFFER_ANY: + res.begin = d->in_buf->data->begin; + res.len = *len; + if (d->read_callback) { + if (!d->read_callback (&res, d->user_data)) { + return; + } + if (d->policy != saved_policy) { + debug_ip (d->peer_addr, "policy changed during callback, restart buffer's processing"); + read_buffers (fd, d, TRUE); + return; + } + } + d->in_buf->pos = d->in_buf->data->begin; + d->in_buf->data->len = 0; + break; } } @@ -276,14 +412,19 @@ dispatcher_cb (int fd, short what, void *arg) break; case EV_WRITE: /* No data to write, disable further EV_WRITE to this fd */ - if (d->out_buffers == NULL) { - event_del (d->ev); - event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); - event_add (d->ev, d->tv); + if (d->in_sendfile) { + sendfile_callback (d); } else { - /* Delayed write */ - write_buffers (fd, d, TRUE); + if (d->out_buffers == NULL) { + event_del (d->ev); + event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); + event_add (d->ev, d->tv); + } + else { + /* Delayed write */ + write_buffers (fd, d, TRUE); + } } break; case EV_READ: @@ -315,6 +456,7 @@ rspamd_create_dispatcher (int fd, enum io_policy policy, new->tv = NULL; } new->nchars = 0; + new->in_sendfile = FALSE; new->policy = policy; new->read_callback = read_cb; new->write_callback = write_cb; @@ -363,7 +505,7 @@ rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, enum io_policy policy, d->in_buf->pos = d->in_buf->data->begin + t; } } - else if (policy == BUFFER_LINE) { + else if (policy == BUFFER_LINE || policy == BUFFER_ANY) { if (d->in_buf && d->nchars < BUFSIZ) { tmp = fstralloc (d->pool, BUFSIZ); memcpy (tmp->begin, d->in_buf->data->begin, d->in_buf->data->len); @@ -413,6 +555,34 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo return TRUE; } + +gboolean +rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, int fd, size_t len) +{ + if (lseek (fd, 0, SEEK_SET) == -1) { + msg_warn ("lseek failed: %s", strerror (errno)); + return FALSE; + } + + d->offset = 0; + d->in_sendfile = TRUE; + d->sendfile_fd = fd; + d->file_size = len; + +#ifndef HAVE_SENDFILE + #ifdef HAVE_MMAP_NOCORE + if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, 0)) == MAP_FAILED) { + #else + if ((d->map = mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) { + #endif + msg_warn ("mmap failed: %s", strerror (errno)); + return FALSE; + } +#endif + + return sendfile_callback (d); +} + void rspamd_dispatcher_pause (rspamd_io_dispatcher_t * d) { diff --git a/src/buffer.h b/src/buffer.h index 4cf9de555..9f3897d1c 100644 --- a/src/buffer.h +++ b/src/buffer.h @@ -20,6 +20,7 @@ typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data); enum io_policy { BUFFER_LINE, /**< call handler when we have line ready */ BUFFER_CHARACTER, /**< call handler when we have some characters */ + BUFFER_ANY /**< call handler whenever we got data in buffer */ }; /** @@ -45,6 +46,13 @@ typedef struct rspamd_io_dispatcher_s { dispatcher_write_callback_t write_callback; /**< write callback */ dispatcher_err_callback_t err_callback; /**< error callback */ void *user_data; /**< user's data for callbacks */ + off_t offset; /**< for sendfile use */ + size_t file_size; + int sendfile_fd; + gboolean in_sendfile; /**< whether buffer is in sendfile mode */ +#ifndef HAVE_SENDFILE + void *map; +#endif } rspamd_io_dispatcher_t; /** @@ -87,6 +95,14 @@ gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, size_t len, gboolean delayed, gboolean allocated); /** + * Send specified descriptor to dispatcher + * @param d pointer to dispatcher's object + * @param fd descriptor of file + * @param len length of data + */ +gboolean rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, int fd, size_t len); + +/** * Pause IO events on dispatcher * @param d pointer to dispatcher's object */ diff --git a/src/filter.c b/src/filter.c index 0a18bd793..c6f936087 100644 --- a/src/filter.c +++ b/src/filter.c @@ -310,7 +310,12 @@ continue_process_filters (struct worker_task *task) /* Process all statfiles */ process_statfiles (task); /* XXX: ugly direct call */ - task->dispatcher->write_callback (task); + if (task->fin_callback) { + task->fin_callback (task->fin_arg); + } + else { + task->dispatcher->write_callback (task); + } return 1; } diff --git a/src/main.h b/src/main.h index 9371652d3..d5f971468 100644 --- a/src/main.h +++ b/src/main.h @@ -225,6 +225,8 @@ struct worker_task { gboolean view_checked; gboolean pass_all_filters; /**< pass task throught every rule */ uint32_t parser_recursion; /**< for avoiding recursion stack overflow */ + gboolean (*fin_callback)(void *arg); /**< calback for filters finalizing */ + void *fin_arg; /**< argument for fin callback */ }; /** diff --git a/src/smtp.c b/src/smtp.c index 1bf135d30..3f366f7fd 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -29,6 +29,8 @@ #include "smtp.h" #include "smtp_proto.h" #include "map.h" +#include "message.h" +#include "settings.h" #include "evdns/evdns.h" /* Max line size as it is defined in rfc2822 */ @@ -39,6 +41,9 @@ #define DEFAULT_UPSTREAM_DEAD_TIME 300 #define DEFAULT_UPSTREAM_MAXERRORS 10 + +#define DEFAULT_REJECT_MESSAGE "450 4.5.0 Spam message rejected" + static gboolean smtp_write_socket (void *arg); static sig_atomic_t wanna_die = 0; @@ -89,6 +94,9 @@ free_smtp_session (gpointer arg) } memory_pool_delete (session->pool); close (session->sock); + if (session->temp_fd != -1) { + close (session->temp_fd); + } g_free (session); } } @@ -146,7 +154,7 @@ create_smtp_upstream_connection (struct smtp_session *session) } /* Create a dispatcher for upstream connection */ session->upstream_dispatcher = rspamd_create_dispatcher (session->upstream_sock, BUFFER_LINE, - smtp_upstream_read_socket, NULL, smtp_upstream_err_socket, + smtp_upstream_read_socket, smtp_upstream_write_socket, smtp_upstream_err_socket, &session->ctx->smtp_timeout, session); session->state = SMTP_STATE_WAIT_UPSTREAM; session->upstream_state = SMTP_STATE_GREETING; @@ -159,6 +167,8 @@ read_smtp_command (struct smtp_session *session, f_str_t *line) { /* XXX: write dialog implementation */ struct smtp_command *cmd; + char outbuf[BUFSIZ]; + int r; if (! parse_smtp_command (session, line, &cmd)) { session->error = SMTP_ERROR_BAD_COMMAND; @@ -201,10 +211,22 @@ read_smtp_command (struct smtp_session *session, f_str_t *line) if (session->state == SMTP_STATE_RCPT) { if (parse_smtp_rcpt (session, cmd)) { /* Make upstream connection */ - if (!create_smtp_upstream_connection (session)) { - session->error = SMTP_ERROR_UPSTREAM; - session->state = SMTP_STATE_CRITICAL_ERROR; - return FALSE; + if (session->upstream == NULL) { + if (!create_smtp_upstream_connection (session)) { + session->error = SMTP_ERROR_UPSTREAM; + session->state = SMTP_STATE_CRITICAL_ERROR; + return FALSE; + } + } + else { + /* Send next rcpt to upstream */ + session->state = SMTP_STATE_WAIT_UPSTREAM; + session->upstream_state = SMTP_STATE_BEFORE_DATA; + rspamd_dispatcher_restore (session->upstream_dispatcher); + r = snprintf (outbuf, sizeof (outbuf), "RCPT TO: "); + r += smtp_upstream_write_list (session->rcpt->data, outbuf + r, sizeof (outbuf) - r); + session->cur_rcpt = NULL; + return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE); } session->state = SMTP_STATE_WAIT_UPSTREAM; return TRUE; @@ -222,6 +244,10 @@ read_smtp_command (struct smtp_session *session, f_str_t *line) if (session->rcpt) { g_list_free (session->rcpt); } + if (session->upstream) { + remove_normal_event (session->s, smtp_upstream_finalize_connection, session); + session->upstream = NULL; + } session->state = SMTP_STATE_GREETING; break; case SMTP_COMMAND_DATA: @@ -230,7 +256,19 @@ read_smtp_command (struct smtp_session *session, f_str_t *line) session->error = SMTP_ERROR_RECIPIENTS; return FALSE; } - session->error = SMTP_ERROR_DATA_OK; + if (session->upstream == NULL) { + session->error = SMTP_ERROR_UPSTREAM; + session->state = SMTP_STATE_CRITICAL_ERROR; + return FALSE; + } + else { + session->upstream_state = SMTP_STATE_DATA; + rspamd_dispatcher_restore (session->upstream_dispatcher); + r = snprintf (outbuf, sizeof (outbuf), "DATA" CRLF); + session->state = SMTP_STATE_WAIT_UPSTREAM; + session->error = SMTP_ERROR_DATA_OK; + return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE); + } } else { goto improper_sequence; @@ -250,6 +288,88 @@ improper_sequence: return FALSE; } +static gboolean +smtp_send_upstream_message (struct smtp_session *session) +{ + rspamd_dispatcher_pause (session->dispatcher); + rspamd_dispatcher_restore (session->upstream_dispatcher); + + if (! rspamd_dispatcher_sendfile (session->upstream_dispatcher, session->temp_fd, session->temp_size)) { + goto err; + } + session->upstream_state = SMTP_STATE_IN_SENDFILE; + session->state = SMTP_STATE_WAIT_UPSTREAM; + return TRUE; + +err: + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; +} + +static gboolean +process_smtp_data (struct smtp_session *session) +{ + struct stat st; + int r; + + if (fstat (session->temp_fd, &st) == -1) { + goto err; + } + /* Now mmap temp file if it is small enough */ + session->temp_size = st.st_size; + if (session->ctx->max_size == 0 || st.st_size < session->ctx->max_size) { + session->task = construct_task (session->worker); + session->task->fin_callback = smtp_write_socket; + session->task->fin_arg = session; + session->task->msg = memory_pool_alloc (session->pool, sizeof (f_str_t)); +#ifdef HAVE_MMAP_NOCORE + if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, session->temp_fd, 0)) == MAP_FAILED) { +#else + if ((session->task->msg->begin = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, 0)) == MAP_FAILED) { +#endif + goto err; + } + session->task->msg->len = st.st_size; + if (process_message (session->task) == -1) { + msg_err ("cannot process message"); + munmap (session->task->msg->begin, st.st_size); + goto err; + } + session->task->helo = session->helo; + memcpy (&session->task->from_addr, &session->client_addr, sizeof (struct in_addr)); + session->task->cmd = CMD_CHECK; + r = process_filters (session->task); + if (r == -1) { + munmap (session->task->msg->begin, st.st_size); + msg_err ("cannot process filters"); + goto err; + } + else if (r == 0) { + session->state = SMTP_STATE_END; + rspamd_dispatcher_pause (session->dispatcher); + } + else { + process_statfiles (session->task); + session->state = SMTP_STATE_END; + return smtp_write_socket (session); + } + } + else { + return smtp_send_upstream_message (session); + } + + return TRUE; +err: + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; +} + /* * Callback that is called when there is data to read in buffer */ @@ -257,6 +377,8 @@ static gboolean smtp_read_socket (f_str_t * in, void *arg) { struct smtp_session *session = arg; + char *p; + gboolean do_write; switch (session->state) { case SMTP_STATE_RESOLVE_REVERSE: @@ -275,6 +397,69 @@ smtp_read_socket (f_str_t * in, void *arg) smtp_write_socket (session); } break; + case SMTP_STATE_AFTER_DATA: + if (in->len == 0) { + return TRUE; + } + p = in->begin + in->len; + do_write = TRUE; + if (in->len > sizeof (session->data_end)) { + /* New data is more than trailer buffer */ + if (session->data_idx != 0 && write (session->temp_fd, session->data_end, session->data_idx) != session->data_idx) { + msg_err ("cannot write to temp file: %s", strerror (errno)); + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + } + memcpy (session->data_end, p - sizeof (session->data_end) + 1, sizeof (session->data_end)); + session->data_idx = 5; + } + else if (session->data_idx + in->len < sizeof (session->data_end)){ + /* New data is less than trailer buffer plus index */ + memcpy (session->data_end + session->data_idx, in->begin, in->len); + session->data_idx += in->len; + do_write = FALSE; + } + else { + /* Save remaining bytes */ + if (session->data_idx != 0 && write (session->temp_fd, session->data_end, session->data_idx) != session->data_idx) { + msg_err ("cannot write to temp file: %s", strerror (errno)); + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + } + /* Move bytes */ + session->data_idx = sizeof (session->data_end) - in->len; + memmove (session->data_end, session->data_end + (sizeof (session->data_end) - in->len) + 1, sizeof (session->data_end) - in->len); + memcpy (session->data_end + session->data_idx, in->begin, in->len); + session->data_idx = 5; + } + if (do_write) { + if (memcmp (session->data_end, DATA_END_TRAILER, sizeof (session->data_end)) == 0) { + return process_smtp_data (session); + } + else { + if (session->data_idx < in->len) { + if (in->len - session->data_idx != 0 && + write (session->temp_fd, in->begin, in->len - session->data_idx) != in->len - session->data_idx) { + msg_err ("cannot write to temp file: %s", strerror (errno)); + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + } + } + } + } + break; + case SMTP_STATE_WAIT_UPSTREAM: + rspamd_dispatcher_pause (session->dispatcher); + break; default: session->error = make_smtp_error (session, 550, "%s Internal error", "5.5.0"); session->state = SMTP_STATE_ERROR; @@ -299,6 +484,13 @@ static gboolean smtp_write_socket (void *arg) { struct smtp_session *session = arg; + double ms = 0, rs = 0; + int r; + struct metric_result *metric_res; + struct metric *m; + char logbuf[1024]; + gboolean is_spam = FALSE; + GList *symbols, *cur; if (session->state == SMTP_STATE_CRITICAL_ERROR) { if (session->error != NULL) { @@ -307,6 +499,47 @@ smtp_write_socket (void *arg) destroy_session (session->s); return FALSE; } + else if (session->state == SMTP_STATE_END) { + /* Check metric */ + m = g_hash_table_lookup (session->cfg->metrics, session->ctx->metric); + metric_res = g_hash_table_lookup (session->task->results, session->ctx->metric); + if (m != NULL && metric_res != NULL) { + if (!check_metric_settings (session->task, m, &ms, &rs)) { + ms = m->required_score; + rs = m->reject_score; + } + if (metric_res->score >= ms) { + is_spam = TRUE; + } + + r = snprintf (logbuf, sizeof (logbuf), "msg ok, id: <%s>, ", session->task->message_id); + r += snprintf (logbuf + r, sizeof (logbuf) - r, "(%s: %s: [%.2f/%.2f/%.2f] [", + (char *)m->name, is_spam ? "T" : "F", metric_res->score, ms, rs); + symbols = g_hash_table_get_keys (metric_res->symbols); + cur = symbols; + while (cur) { + if (g_list_next (cur) != NULL) { + r += snprintf (logbuf + r, sizeof (logbuf) - r, "%s,", (char *)cur->data); + } + else { + r += snprintf (logbuf + r, sizeof (logbuf) - r, "%s", (char *)cur->data); + } + cur = g_list_next (cur); + } + g_list_free (symbols); + r += snprintf (logbuf + r, sizeof (logbuf) - r, "]), len: %ld, time: %sms", + (long int)session->task->msg->len, calculate_check_time (&session->task->ts, session->cfg->clock_res)); + msg_info ("%s", logbuf); + + if (is_spam) { + rspamd_dispatcher_write (session->dispatcher, session->ctx->reject_message, 0, FALSE, TRUE); + rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + } + } + return smtp_send_upstream_message (session); + } else { if (session->error != NULL) { rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); @@ -501,8 +734,10 @@ accept_socket (int fd, short what, void *arg) } session->sock = nfd; + session->temp_fd = -1; session->worker = worker; session->ctx = worker->ctx; + session->cfg = worker->srv->cfg; session->session_time = time (NULL); worker->srv->stat->connections_count++; @@ -762,6 +997,18 @@ config_smtp_worker (struct rspamd_worker *worker) if ((value = g_hash_table_lookup (worker->cf->params, "smtp_capabilities")) != NULL) { make_capabilities (ctx, value); } + if ((value = g_hash_table_lookup (worker->cf->params, "smtp_metric")) != NULL) { + ctx->metric = memory_pool_strdup (ctx->pool, value); + } + else { + ctx->metric = DEFAULT_METRIC; + } + if ((value = g_hash_table_lookup (worker->cf->params, "smtp_reject_message")) != NULL) { + ctx->reject_message = memory_pool_strdup (ctx->pool, value); + } + else { + ctx->reject_message = DEFAULT_REJECT_MESSAGE; + } /* Set ctx */ worker->ctx = ctx; diff --git a/src/smtp.h b/src/smtp.h index 36319bb4c..4208df4f2 100644 --- a/src/smtp.h +++ b/src/smtp.h @@ -29,6 +29,9 @@ struct smtp_worker_ctx { gboolean use_xclient; gboolean helo_required; char *smtp_capabilities; + char *reject_message; + size_t max_size; + char *metric; }; enum rspamd_smtp_state { @@ -41,9 +44,10 @@ enum rspamd_smtp_state { SMTP_STATE_RCPT, SMTP_STATE_BEFORE_DATA, SMTP_STATE_DATA, - SMTP_STATE_EOD, + SMTP_STATE_AFTER_DATA, SMTP_STATE_END, SMTP_STATE_WAIT_UPSTREAM, + SMTP_STATE_IN_SENDFILE, SMTP_STATE_ERROR, SMTP_STATE_CRITICAL_ERROR, SMTP_STATE_WRITE_ERROR @@ -51,6 +55,7 @@ enum rspamd_smtp_state { struct smtp_session { struct smtp_worker_ctx *ctx; + struct config_file *cfg; memory_pool_t *pool; enum rspamd_smtp_state state; @@ -62,6 +67,8 @@ struct smtp_session { char *error; int sock; int upstream_sock; + int temp_fd; + size_t temp_size; time_t session_time; gchar *helo; @@ -74,6 +81,10 @@ struct smtp_session { rspamd_io_dispatcher_t *upstream_dispatcher; struct smtp_upstream *upstream; + + char data_end[5]; + char data_idx; + gboolean resolved; gboolean esmtp; }; diff --git a/src/smtp_proto.c b/src/smtp_proto.c index 82fffa690..bef52b6b5 100644 --- a/src/smtp_proto.c +++ b/src/smtp_proto.c @@ -163,7 +163,7 @@ parse_smtp_command (struct smtp_session *session, f_str_t *line, struct smtp_com break; case SMTP_PARSE_ARGUMENT: if (ch == ' ' || ch == ':' || ch == CR || ch == LF || i == line->len - 1) { - if (i == line->len - 1) { + if (i == line->len - 1 && (ch != ' ' && ch != CR && ch != LF)) { p ++; } arg->len = p - c; @@ -329,14 +329,14 @@ parse_smtp_rcpt (struct smtp_session *session, struct smtp_command *cmd) /* Return -1 if there are some error, 1 if all is ok and 0 in case of incomplete reply */ static int -check_smtp_ustream_reply (f_str_t *in) +check_smtp_ustream_reply (f_str_t *in, char success_code) { char *p; /* Check for 250 at the begin of line */ if (in->len >= sizeof ("220 ") - 1) { p = in->begin; - if (p[0] == '2') { + if (p[0] == success_code) { /* Last reply line */ if (p[3] == ' ') { return 1; @@ -353,7 +353,7 @@ check_smtp_ustream_reply (f_str_t *in) return -1; } -static size_t +size_t smtp_upstream_write_list (GList *args, char *buf, size_t buflen) { GList *cur = args; @@ -374,22 +374,35 @@ smtp_upstream_write_list (GList *args, char *buf, size_t buflen) } gboolean +smtp_upstream_write_socket (void *arg) +{ + struct smtp_session *session = arg; + + if (session->upstream_state == SMTP_STATE_IN_SENDFILE) { + session->upstream_state = SMTP_STATE_END; + return rspamd_dispatcher_write (session->upstream_dispatcher, DATA_END_TRAILER, sizeof (DATA_END_TRAILER) - 1, FALSE, TRUE); + } + + return TRUE; +} + +gboolean smtp_upstream_read_socket (f_str_t * in, void *arg) { struct smtp_session *session = arg; - char outbuf[BUFSIZ]; + char outbuf[BUFSIZ], *tmppattern; int r; switch (session->upstream_state) { case SMTP_STATE_GREETING: - r = check_smtp_ustream_reply (in); + r = check_smtp_ustream_reply (in, '2'); if (r == -1) { - session->error = memory_pool_alloc (session->pool, in->len + 3); + session->error = memory_pool_alloc (session->pool, in->len + 1); g_strlcpy (session->error, in->begin, in->len + 1); /* XXX: assume upstream errors as critical errors */ session->state = SMTP_STATE_CRITICAL_ERROR; rspamd_dispatcher_restore (session->dispatcher); - rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE); rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); destroy_session (session->s); return FALSE; @@ -417,14 +430,14 @@ smtp_upstream_read_socket (f_str_t * in, void *arg) } break; case SMTP_STATE_HELO: - r = check_smtp_ustream_reply (in); + r = check_smtp_ustream_reply (in, '2'); if (r == -1) { session->error = memory_pool_alloc (session->pool, in->len + 1); g_strlcpy (session->error, in->begin, in->len + 1); /* XXX: assume upstream errors as critical errors */ session->state = SMTP_STATE_CRITICAL_ERROR; rspamd_dispatcher_restore (session->dispatcher); - rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE); rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); destroy_session (session->s); return FALSE; @@ -443,14 +456,14 @@ smtp_upstream_read_socket (f_str_t * in, void *arg) } break; case SMTP_STATE_FROM: - r = check_smtp_ustream_reply (in); + r = check_smtp_ustream_reply (in, '2'); if (r == -1) { session->error = memory_pool_alloc (session->pool, in->len + 1); g_strlcpy (session->error, in->begin, in->len + 1); /* XXX: assume upstream errors as critical errors */ session->state = SMTP_STATE_CRITICAL_ERROR; rspamd_dispatcher_restore (session->dispatcher); - rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE); rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); destroy_session (session->s); return FALSE; @@ -463,14 +476,14 @@ smtp_upstream_read_socket (f_str_t * in, void *arg) } break; case SMTP_STATE_RCPT: - r = check_smtp_ustream_reply (in); + r = check_smtp_ustream_reply (in, '2'); if (r == -1) { session->error = memory_pool_alloc (session->pool, in->len + 1); g_strlcpy (session->error, in->begin, in->len + 1); /* XXX: assume upstream errors as critical errors */ session->state = SMTP_STATE_CRITICAL_ERROR; rspamd_dispatcher_restore (session->dispatcher); - rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE); rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); destroy_session (session->s); return FALSE; @@ -485,14 +498,20 @@ smtp_upstream_read_socket (f_str_t * in, void *arg) } break; case SMTP_STATE_BEFORE_DATA: - r = check_smtp_ustream_reply (in); + r = check_smtp_ustream_reply (in, '2'); if (r == -1) { session->error = memory_pool_alloc (session->pool, in->len + 1); g_strlcpy (session->error, in->begin, in->len + 1); rspamd_dispatcher_restore (session->dispatcher); - rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE); rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); - session->rcpt = g_list_delete_link (session->rcpt, session->cur_rcpt); + if (session->cur_rcpt) { + session->rcpt = g_list_delete_link (session->rcpt, session->cur_rcpt); + } + else { + session->rcpt = g_list_delete_link (session->rcpt, session->rcpt); + } + session->state = SMTP_STATE_RCPT; return TRUE; } else if (r == 1) { @@ -500,6 +519,7 @@ smtp_upstream_read_socket (f_str_t * in, void *arg) r = snprintf (outbuf, sizeof (outbuf), "RCPT TO: "); r += smtp_upstream_write_list (session->cur_rcpt, outbuf + r, sizeof (outbuf) - r); session->cur_rcpt = g_list_next (session->cur_rcpt); + rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE); } else { session->upstream_state = SMTP_STATE_DATA; @@ -508,15 +528,68 @@ smtp_upstream_read_socket (f_str_t * in, void *arg) session->error = memory_pool_alloc (session->pool, in->len + 1); g_strlcpy (session->error, in->begin, in->len + 1); /* Write to client */ - rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + rspamd_dispatcher_write (session->dispatcher, session->error, in->len, FALSE, TRUE); rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); if (session->state == SMTP_STATE_WAIT_UPSTREAM) { rspamd_dispatcher_restore (session->dispatcher); session->state = SMTP_STATE_RCPT; } - return rspamd_dispatcher_write (session->upstream_dispatcher, outbuf, r, FALSE, FALSE); } break; + case SMTP_STATE_DATA: + r = check_smtp_ustream_reply (in, '3'); + if (r == -1) { + session->error = memory_pool_alloc (session->pool, in->len + 1); + g_strlcpy (session->error, in->begin, in->len + 1); + /* XXX: assume upstream errors as critical errors */ + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_restore (session->dispatcher); + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + } + else if (r == 1) { + r = strlen (session->cfg->temp_dir) + sizeof ("/rspamd-XXXXXX.tmp"); + tmppattern = alloca (r); + snprintf (tmppattern, r, "%s/rspamd-XXXXXX.tmp", session->cfg->temp_dir); + session->temp_fd = g_mkstemp_full (tmppattern, O_RDWR, S_IWUSR | S_IRUSR); + if (session->temp_fd == -1) { + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_restore (session->dispatcher); + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + } + session->state = SMTP_STATE_AFTER_DATA; + session->error = SMTP_ERROR_DATA_OK; + rspamd_dispatcher_restore (session->dispatcher); + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + rspamd_dispatcher_pause (session->upstream_dispatcher); + rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_ANY, 0); + session->data_idx = 0; + memset (session->data_end, 0, sizeof (session->data_end)); + return TRUE; + } + break; + case SMTP_STATE_END: + session->error = memory_pool_alloc (session->pool, in->len + 1); + g_strlcpy (session->error, in->begin, in->len + 1); + session->state = SMTP_STATE_END; + rspamd_dispatcher_restore (session->dispatcher); + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); + destroy_session (session->s); + return FALSE; + default: + msg_err ("got upstream reply at unexpected state: %d, reply: %V", session->upstream_state, in); + session->state = SMTP_STATE_CRITICAL_ERROR; + rspamd_dispatcher_restore (session->dispatcher); + rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE); + rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE); + destroy_session (session->s); + return FALSE; } return TRUE; diff --git a/src/smtp_proto.h b/src/smtp_proto.h index c78cfb094..e850e4a5d 100644 --- a/src/smtp_proto.h +++ b/src/smtp_proto.h @@ -11,9 +11,12 @@ #define SMTP_ERROR_RECIPIENTS "554 No valid recipients" CRLF #define SMTP_ERROR_UNIMPLIMENTED "502 Command not implemented" CRLF #define SMTP_ERROR_UPSTREAM "421 Service not available, closing transmission channel" CRLF +#define SMTP_ERROR_FILE "420 Service not available, filesystem error" CRLF #define SMTP_ERROR_OK "250 Requested mail action okay, completed" CRLF #define SMTP_ERROR_DATA_OK "354 Start mail input; end with <CRLF>.<CRLF>" CRLF +#define DATA_END_TRAILER CRLF "." CRLF + struct smtp_command { enum { @@ -40,7 +43,9 @@ gboolean parse_smtp_rcpt (struct smtp_session *session, struct smtp_command *cmd /* Upstream SMTP */ gboolean smtp_upstream_read_socket (f_str_t * in, void *arg); +gboolean smtp_upstream_write_socket (void *arg); void smtp_upstream_err_socket (GError *err, void *arg); void smtp_upstream_finalize_connection (gpointer data); +size_t smtp_upstream_write_list (GList *args, char *buf, size_t buflen); #endif |