diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-10-17 15:28:38 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-10-17 15:28:38 +0100 |
commit | 463d6092aabb1b4721833f884d04dbb67570fa0e (patch) | |
tree | 05e083c47f4ccdfe77676f2ffd6ea1394e95575e | |
parent | 9ecc5ea5e15a44f830aff1e1b1ad92b683bca53a (diff) | |
download | rspamd-463d6092aabb1b4721833f884d04dbb67570fa0e.tar.gz rspamd-463d6092aabb1b4721833f884d04dbb67570fa0e.zip |
[Rework] Massive removal of legacy code
-rw-r--r-- | src/libmime/smtp_proto.c | 772 | ||||
-rw-r--r-- | src/libmime/smtp_proto.h | 105 | ||||
-rw-r--r-- | src/libmime/smtp_utils.c | 354 | ||||
-rw-r--r-- | src/libmime/smtp_utils.h | 50 | ||||
-rw-r--r-- | src/libserver/buffer.c | 804 | ||||
-rw-r--r-- | src/libserver/buffer.h | 154 | ||||
-rw-r--r-- | src/lmtp.c | 365 | ||||
-rw-r--r-- | src/lmtp.h | 18 | ||||
-rw-r--r-- | src/lmtp_proto.c | 758 | ||||
-rw-r--r-- | src/lmtp_proto.h | 44 | ||||
-rw-r--r-- | src/lua/lua_buffer.c | 381 | ||||
-rw-r--r-- | src/smtp.c | 1029 | ||||
-rw-r--r-- | src/smtp.h | 126 | ||||
-rw-r--r-- | src/smtp_proxy.c | 1119 |
14 files changed, 0 insertions, 6079 deletions
diff --git a/src/libmime/smtp_proto.c b/src/libmime/smtp_proto.c deleted file mode 100644 index 9a73b0649..000000000 --- a/src/libmime/smtp_proto.c +++ /dev/null @@ -1,772 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "config.h" -#include "rspamd.h" -#include "cfg_file.h" -#include "util.h" -#include "smtp.h" -#include "smtp_proto.h" -#include "smtp_utils.h" -#include "unix-std.h" - -gchar * -make_smtp_error (rspamd_mempool_t *pool, - gint error_code, - const gchar *format, - ...) -{ - va_list vp; - gchar *result = NULL, *p; - size_t len; - - va_start (vp, format); - len = g_printf_string_upper_bound (format, vp); - va_end (vp); - va_start (vp, format); - len += sizeof ("65535 ") + sizeof (CRLF) - 1; - result = rspamd_mempool_alloc (pool, len); - p = result + rspamd_snprintf (result, len, "%d ", error_code); - p = rspamd_vsnprintf (p, len - (p - result), format, vp); - *p++ = CR; *p++ = LF; *p = '\0'; - va_end (vp); - - return result; -} - - -gboolean -parse_smtp_command (struct smtp_session *session, - rspamd_ftok_t *line, - struct smtp_command **cmd) -{ - enum { - SMTP_PARSE_START = 0, - SMTP_PARSE_SPACES, - SMTP_PARSE_ARGUMENT, - SMTP_PARSE_DONE - } state; - const gchar *p, *c; - gchar ch, cmd_buf[4]; - guint i; - rspamd_ftok_t *arg = NULL; - struct smtp_command *pcmd; - - if (line->len == 0) { - return FALSE; - } - - state = SMTP_PARSE_START; - c = line->begin; - p = c; - *cmd = rspamd_mempool_alloc0 (session->pool, sizeof (struct smtp_command)); - pcmd = *cmd; - - for (i = 0; i < line->len; i++, p++) { - ch = *p; - switch (state) { - case SMTP_PARSE_START: - if (ch == ' ' || ch == ':' || ch == CR || ch == LF || i == - line->len - 1) { - if (i == line->len - 1) { - p++; - } - if (p - c == 4) { - cmd_buf[0] = g_ascii_toupper (c[0]); - cmd_buf[1] = g_ascii_toupper (c[1]); - cmd_buf[2] = g_ascii_toupper (c[2]); - cmd_buf[3] = g_ascii_toupper (c[3]); - - if (memcmp (cmd_buf, "HELO", 4) == 0) { - pcmd->command = SMTP_COMMAND_HELO; - } - else if (memcmp (cmd_buf, "EHLO", 4) == 0) { - pcmd->command = SMTP_COMMAND_EHLO; - } - else if (memcmp (cmd_buf, "MAIL", 4) == 0) { - pcmd->command = SMTP_COMMAND_MAIL; - } - else if (memcmp (cmd_buf, "RCPT", 4) == 0) { - pcmd->command = SMTP_COMMAND_RCPT; - } - else if (memcmp (cmd_buf, "DATA", 4) == 0) { - pcmd->command = SMTP_COMMAND_DATA; - } - else if (memcmp (cmd_buf, "QUIT", 4) == 0) { - pcmd->command = SMTP_COMMAND_QUIT; - } - else if (memcmp (cmd_buf, "NOOP", 4) == 0) { - pcmd->command = SMTP_COMMAND_NOOP; - } - else if (memcmp (cmd_buf, "EXPN", 4) == 0) { - pcmd->command = SMTP_COMMAND_EXPN; - } - else if (memcmp (cmd_buf, "RSET", 4) == 0) { - pcmd->command = SMTP_COMMAND_RSET; - } - else if (memcmp (cmd_buf, "HELP", 4) == 0) { - pcmd->command = SMTP_COMMAND_HELP; - } - else if (memcmp (cmd_buf, "VRFY", 4) == 0) { - pcmd->command = SMTP_COMMAND_VRFY; - } - else { - msg_info ("invalid command: %*s", 4, cmd_buf); - return FALSE; - } - } - else { - /* Invalid command */ - msg_info ("invalid command: %*s", 4, c); - return FALSE; - } - /* Now check what we have */ - if (ch == ' ' || ch == ':') { - state = SMTP_PARSE_SPACES; - } - else if (ch == CR) { - state = SMTP_PARSE_DONE; - } - else if (ch == LF) { - return TRUE; - } - } - else if ((ch < 'A' || ch > 'Z') && (ch < 'a' || ch > 'z')) { - msg_info ("invalid letter code in SMTP command: %d", (gint)ch); - return FALSE; - } - break; - case SMTP_PARSE_SPACES: - if (ch == CR) { - state = SMTP_PARSE_DONE; - } - else if (ch == LF) { - goto end; - } - else if (ch != ' ' && ch != ':') { - state = SMTP_PARSE_ARGUMENT; - arg = rspamd_mempool_alloc (session->pool, sizeof (rspamd_fstring_t)); - c = p; - } - break; - case SMTP_PARSE_ARGUMENT: - if (ch == ' ' || ch == ':' || ch == CR || ch == LF || i == - line->len - 1) { - if (i == line->len - 1 && (ch != ' ' && ch != CR && ch != LF)) { - p++; - } - arg->len = p - c; - arg->begin = rspamd_mempool_alloc (session->pool, arg->len); - memcpy ((gchar *)arg->begin, c, arg->len); - pcmd->args = g_list_prepend (pcmd->args, arg); - - if (ch == ' ' || ch == ':') { - state = SMTP_PARSE_SPACES; - } - else if (ch == CR) { - state = SMTP_PARSE_DONE; - } - else { - goto end; - } - } - break; - case SMTP_PARSE_DONE: - if (ch == LF) { - goto end; - } - msg_info ("CR without LF in SMTP command"); - return FALSE; - } - } - -end: - if (pcmd->args) { - pcmd->args = g_list_reverse (pcmd->args); - rspamd_mempool_add_destructor (session->pool, - (rspamd_mempool_destruct_t)g_list_free, - pcmd->args); - } - return TRUE; -} - -static gboolean -check_smtp_path (rspamd_ftok_t *path) -{ - guint i; - const gchar *p; - - p = path->begin; - if (*p != '<' || path->len < 2) { - return FALSE; - } - for (i = 0; i < path->len; i++, p++) { - if (*p == '>' && i != path->len - 1) { - return FALSE; - } - } - - return *(p - 1) == '>'; -} - -gboolean -parse_smtp_helo (struct smtp_session *session, struct smtp_command *cmd) -{ - rspamd_ftok_t *arg; - - if (cmd->args == NULL) { - session->error = SMTP_ERROR_BAD_ARGUMENTS; - return FALSE; - } - arg = cmd->args->data; - session->helo = rspamd_mempool_alloc (session->pool, arg->len + 1); - rspamd_strlcpy (session->helo, arg->begin, arg->len + 1); - /* Now try to write reply */ - if (cmd->command == SMTP_COMMAND_HELO) { - /* No ESMTP */ - session->error = SMTP_ERROR_OK; - session->esmtp = FALSE; - return TRUE; - } - else { - /* Try to write all capabilities */ - session->esmtp = TRUE; - if (session->ctx->smtp_capabilities == NULL) { - session->error = SMTP_ERROR_OK; - return TRUE; - } - else { - session->error = session->ctx->smtp_capabilities; - return TRUE; - } - } - - return FALSE; -} - -gboolean -parse_smtp_from (struct smtp_session *session, struct smtp_command *cmd) -{ - rspamd_ftok_t *arg; - GList *cur = cmd->args; - - if (cmd->args == NULL) { - session->error = SMTP_ERROR_BAD_ARGUMENTS; - return FALSE; - } - arg = cur->data; - /* First argument MUST be FROM */ - if (arg->len != 4 || ( - g_ascii_toupper (arg->begin[0]) != 'F' || - g_ascii_toupper (arg->begin[1]) != 'R' || - g_ascii_toupper (arg->begin[2]) != 'O' || - g_ascii_toupper (arg->begin[3]) != 'M')) { - session->error = SMTP_ERROR_BAD_ARGUMENTS; - return FALSE; - } - /* Next one is from address */ - cur = g_list_next (cur); - if (cur == NULL) { - session->error = SMTP_ERROR_BAD_ARGUMENTS; - return FALSE; - } - arg = cur->data; - if (check_smtp_path (arg)) { - session->from = cur; - } - else { - session->error = SMTP_ERROR_BAD_ARGUMENTS; - return FALSE; - } - - return TRUE; -} - -gboolean -parse_smtp_rcpt (struct smtp_session *session, struct smtp_command *cmd) -{ - rspamd_ftok_t *arg; - GList *cur = cmd->args; - - if (cmd->args == NULL) { - session->error = SMTP_ERROR_BAD_ARGUMENTS; - return FALSE; - } - arg = cur->data; - /* First argument MUST be FROM */ - if (arg->len != 2 || ( - g_ascii_toupper (arg->begin[0]) != 'T' || - g_ascii_toupper (arg->begin[1]) != 'O')) { - session->error = SMTP_ERROR_BAD_ARGUMENTS; - return FALSE; - } - /* Next one is from address */ - cur = g_list_next (cur); - if (cur == NULL) { - session->error = SMTP_ERROR_BAD_ARGUMENTS; - return FALSE; - } - arg = cur->data; - if (check_smtp_path (arg)) { - session->rcpt = g_list_prepend (session->rcpt, cur); - } - else { - session->error = SMTP_ERROR_BAD_ARGUMENTS; - return FALSE; - } - - return TRUE; - -} - -/* Return -1 if there are some error, 1 if all is ok and 0 in case of incomplete reply */ -static gint -check_smtp_ustream_reply (rspamd_ftok_t *in, gchar success_code) -{ - const gchar *p; - - /* Check for 250 at the begin of line */ - if (in->len >= sizeof ("220 ") - 1) { - p = in->begin; - if (p[0] == success_code) { - /* Last reply line */ - if (p[3] == ' ') { - return 1; - } - else { - return 0; - } - } - else { - return -1; - } - } - - return -1; -} - -size_t -smtp_upstream_write_list (GList *args, gchar *buf, size_t buflen) -{ - GList *cur = args; - size_t r = 0; - rspamd_ftok_t *arg; - - while (cur && r < buflen - 3) { - arg = cur->data; - r += rspamd_snprintf (buf + r, buflen - r, " %T", arg); - cur = g_list_next (cur); - } - - buf[r++] = CR; - buf[r++] = LF; - buf[r] = '\0'; - - return r; -} - -gboolean -smtp_upstream_write_socket (void *arg) -{ - struct smtp_session *session = arg; - - if (session->upstream_state == SMTP_STATE_IN_SENDFILE) { - session->upstream_state = SMTP_STATE_AFTER_DATA; - return rspamd_dispatcher_write (session->upstream_dispatcher, - CRLF DATA_END_TRAILER, - sizeof (CRLF DATA_END_TRAILER) - 1, - FALSE, - TRUE); - } - - return TRUE; -} - -gboolean -smtp_upstream_read_socket (rspamd_ftok_t * in, void *arg) -{ - struct smtp_session *session = arg; - gchar outbuf[BUFSIZ]; - gint r; - - msg_debug ("in: %T, state: %d", in, session->upstream_state); - switch (session->upstream_state) { - case SMTP_STATE_GREETING: - r = check_smtp_ustream_reply (in, '2'); - if (r == -1) { - session->error = rspamd_mempool_alloc (session->pool, in->len + 1); - rspamd_strlcpy (session->error, in->begin, in->len + 1); - /* XXX: assume upstream errors as critical errors */ - session->state = SMTP_STATE_CRITICAL_ERROR; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - in->len, FALSE, TRUE)) { - goto err; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, - sizeof (CRLF) - 1, FALSE, TRUE)) { - goto err; - } - rspamd_session_destroy (session->s); - return FALSE; - } - else if (r == 1) { - if (session->ctx->use_xclient) { - r = rspamd_snprintf (outbuf, - sizeof (outbuf), - "XCLIENT NAME=%s ADDR=%s" CRLF, - session->resolved ? session->hostname : "[UNDEFINED]", - inet_ntoa (session->client_addr)); - session->upstream_state = SMTP_STATE_HELO; - return rspamd_dispatcher_write (session->upstream_dispatcher, - outbuf, - r, - FALSE, - FALSE); - } - else { - session->upstream_state = SMTP_STATE_FROM; - if (session->helo) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s %s" CRLF, - session->esmtp ? "EHLO" : "HELO", - session->helo); - } - else { - return smtp_upstream_read_socket (in, arg); - } - return rspamd_dispatcher_write (session->upstream_dispatcher, - outbuf, - r, - FALSE, - FALSE); - } - } - break; - case SMTP_STATE_HELO: - r = check_smtp_ustream_reply (in, '2'); - if (r == -1) { - session->error = rspamd_mempool_alloc (session->pool, in->len + 1); - rspamd_strlcpy (session->error, in->begin, in->len + 1); - /* XXX: assume upstream errors as critical errors */ - session->state = SMTP_STATE_CRITICAL_ERROR; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - in->len, FALSE, TRUE)) { - goto err; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, - sizeof (CRLF) - 1, FALSE, TRUE)) { - goto err; - } - rspamd_session_destroy (session->s); - return FALSE; - } - else if (r == 1) { - session->upstream_state = SMTP_STATE_FROM; - if (session->helo) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "%s %s" CRLF, - session->esmtp ? "EHLO" : "HELO", - session->helo); - } - else { - return smtp_upstream_read_socket (in, arg); - } - return rspamd_dispatcher_write (session->upstream_dispatcher, - outbuf, - r, - FALSE, - FALSE); - } - break; - case SMTP_STATE_FROM: - r = check_smtp_ustream_reply (in, '2'); - if (r == -1) { - session->error = rspamd_mempool_alloc (session->pool, in->len + 1); - rspamd_strlcpy (session->error, in->begin, in->len + 1); - /* XXX: assume upstream errors as critical errors */ - session->state = SMTP_STATE_CRITICAL_ERROR; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - in->len, FALSE, TRUE)) { - goto err; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, - sizeof (CRLF) - 1, FALSE, TRUE)) { - goto err; - } - rspamd_session_destroy (session->s); - return FALSE; - } - else if (r == 1) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "MAIL FROM: "); - r += - smtp_upstream_write_list (session->from, - outbuf + r, - sizeof (outbuf) - r); - session->upstream_state = SMTP_STATE_RCPT; - return rspamd_dispatcher_write (session->upstream_dispatcher, - outbuf, - r, - FALSE, - FALSE); - } - break; - case SMTP_STATE_RCPT: - r = check_smtp_ustream_reply (in, '2'); - if (r == -1) { - session->error = rspamd_mempool_alloc (session->pool, in->len + 1); - rspamd_strlcpy (session->error, in->begin, in->len + 1); - /* XXX: assume upstream errors as critical errors */ - session->state = SMTP_STATE_CRITICAL_ERROR; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - in->len, FALSE, TRUE)) { - goto err; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, - sizeof (CRLF) - 1, FALSE, TRUE)) { - goto err; - } - rspamd_session_destroy (session->s); - return FALSE; - } - else if (r == 1) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "RCPT TO: "); - session->cur_rcpt = g_list_first (session->rcpt); - r += smtp_upstream_write_list (session->cur_rcpt->data, - outbuf + r, - sizeof (outbuf) - r); - session->cur_rcpt = g_list_next (session->cur_rcpt); - session->upstream_state = SMTP_STATE_BEFORE_DATA; - return rspamd_dispatcher_write (session->upstream_dispatcher, - outbuf, - r, - FALSE, - FALSE); - } - break; - case SMTP_STATE_BEFORE_DATA: - r = check_smtp_ustream_reply (in, '2'); - if (r == -1) { - session->error = rspamd_mempool_alloc (session->pool, in->len + 1); - rspamd_strlcpy (session->error, in->begin, in->len + 1); - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - in->len, FALSE, TRUE)) { - goto err; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, - sizeof (CRLF) - 1, FALSE, TRUE)) { - goto err; - } - if (session->cur_rcpt) { - session->rcpt = g_list_delete_link (session->rcpt, - session->cur_rcpt); - } - else { - session->rcpt = - g_list_delete_link (session->rcpt, session->rcpt); - } - session->errors++; - session->state = SMTP_STATE_RCPT; - return TRUE; - } - else if (r == 1) { - if (session->cur_rcpt != NULL) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "RCPT TO: "); - r += smtp_upstream_write_list (session->cur_rcpt, - outbuf + r, - sizeof (outbuf) - r); - session->cur_rcpt = g_list_next (session->cur_rcpt); - if (!rspamd_dispatcher_write (session->upstream_dispatcher, - outbuf, r, FALSE, FALSE)) { - goto err; - } - } - else { - session->upstream_state = SMTP_STATE_DATA; - rspamd_dispatcher_pause (session->upstream_dispatcher); - } - session->error = rspamd_mempool_alloc (session->pool, in->len + 1); - rspamd_strlcpy (session->error, in->begin, in->len + 1); - /* Write to client */ - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - in->len, FALSE, TRUE)) { - goto err; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, - sizeof (CRLF) - 1, FALSE, TRUE)) { - goto err; - } - if (session->state == SMTP_STATE_WAIT_UPSTREAM) { - rspamd_dispatcher_restore (session->dispatcher); - session->state = SMTP_STATE_RCPT; - } - } - break; - case SMTP_STATE_DATA: - r = check_smtp_ustream_reply (in, '3'); - if (r == -1) { - session->error = rspamd_mempool_alloc (session->pool, in->len + 1); - rspamd_strlcpy (session->error, in->begin, in->len + 1); - /* XXX: assume upstream errors as critical errors */ - session->state = SMTP_STATE_CRITICAL_ERROR; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - 0, FALSE, TRUE)) { - goto err; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, - sizeof (CRLF) - 1, FALSE, TRUE)) { - goto err; - } - rspamd_session_destroy (session->s); - return FALSE; - } - else if (r == 1) { - if (!make_smtp_tempfile (session)) { - session->error = SMTP_ERROR_FILE; - session->state = SMTP_STATE_CRITICAL_ERROR; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, - session->error, 0, FALSE, TRUE)) { - goto err; - } - rspamd_session_destroy (session->s); - return FALSE; - } - session->state = SMTP_STATE_AFTER_DATA; - session->error = SMTP_ERROR_DATA_OK; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - 0, FALSE, TRUE)) { - goto err; - } - rspamd_dispatcher_pause (session->upstream_dispatcher); - rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, 0); - session->dispatcher->strip_eol = FALSE; - return TRUE; - } - break; - case SMTP_STATE_AFTER_DATA: - session->error = rspamd_mempool_alloc (session->pool, in->len + 1); - rspamd_strlcpy (session->error, in->begin, in->len + 1); - session->state = SMTP_STATE_DATA; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, 0, - FALSE, TRUE)) { - goto err; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - - 1, FALSE, TRUE)) { - goto err; - } - if (!rspamd_dispatcher_write (session->upstream_dispatcher, "QUIT" CRLF, - sizeof ("QUIT" CRLF) - 1, FALSE, TRUE)) { - goto err; - } - session->upstream_state = SMTP_STATE_END; - return TRUE; - break; - case SMTP_STATE_END: - r = check_smtp_ustream_reply (in, '5'); - if (r == -1) { - session->error = rspamd_mempool_alloc (session->pool, in->len + 1); - rspamd_strlcpy (session->error, in->begin, in->len + 1); - /* XXX: assume upstream errors as critical errors */ - session->state = SMTP_STATE_CRITICAL_ERROR; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - 0, FALSE, TRUE)) { - goto err; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, - sizeof (CRLF) - 1, FALSE, TRUE)) { - goto err; - } - rspamd_session_destroy (session->s); - return FALSE; - } - else { - rspamd_session_remove_event (session->s, - (event_finalizer_t)smtp_upstream_finalize_connection, - session); - } - return FALSE; - break; - default: - msg_err ("got upstream reply at unexpected state: %d, reply: %T", - session->upstream_state, - in); - session->state = SMTP_STATE_CRITICAL_ERROR; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, 0, - FALSE, TRUE)) { - goto err; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - - 1, FALSE, TRUE)) { - goto err; - } - rspamd_session_destroy (session->s); - return FALSE; - } - - return TRUE; -err: - msg_warn ("write error occured"); - return FALSE; -} - -void -smtp_upstream_err_socket (GError *err, void *arg) -{ - struct smtp_session *session = arg; - - msg_info ("abnormally closing connection with upstream %s, error: %s", - rspamd_upstream_name (session->upstream), - err->message); - session->error = SMTP_ERROR_UPSTREAM; - session->state = SMTP_STATE_CRITICAL_ERROR; - /* XXX: assume upstream errors as critical errors */ - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, - TRUE)) { - return; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, - FALSE, TRUE)) { - return; - } - rspamd_upstream_fail (session->upstream); - rspamd_session_destroy (session->s); -} - -void -smtp_upstream_finalize_connection (gpointer data) -{ - struct smtp_session *session = data; - - if (session->state != SMTP_STATE_CRITICAL_ERROR) { - if (!rspamd_dispatcher_write (session->upstream_dispatcher, "QUIT" CRLF, - 0, FALSE, TRUE)) { - msg_warn ("cannot send correctly closing message to upstream"); - } - } - rspamd_remove_dispatcher (session->upstream_dispatcher); - session->upstream_dispatcher = NULL; - close (session->upstream_sock); - session->upstream_sock = -1; -} diff --git a/src/libmime/smtp_proto.h b/src/libmime/smtp_proto.h deleted file mode 100644 index 7ede6f803..000000000 --- a/src/libmime/smtp_proto.h +++ /dev/null @@ -1,105 +0,0 @@ -#ifndef RSPAMD_SMTP_PROTO_H -#define RSPAMD_SMTP_PROTO_H - -#include "config.h" - -/* SMTP errors */ -#define SMTP_ERROR_BAD_COMMAND "500 Syntax error, command unrecognized" CRLF -#define SMTP_ERROR_BAD_ARGUMENTS "501 Syntax error in parameters or arguments" \ - CRLF -#define SMTP_ERROR_SEQUENCE "503 Bad sequence of commands" CRLF -#define SMTP_ERROR_RECIPIENTS "554 No valid recipients" CRLF -#define SMTP_ERROR_UNIMPLIMENTED "502 Command not implemented" CRLF -#define SMTP_ERROR_LIMIT "505 Too many errors. Aborting." CRLF -#define SMTP_ERROR_UPSTREAM \ - "421 Service not available, closing transmission channel" CRLF -#define SMTP_ERROR_FILE "420 Service not available, filesystem error" CRLF -#define SMTP_ERROR_OK "250 Requested mail action okay, completed" CRLF -#define SMTP_ERROR_DATA_OK "354 Start mail input; end with <CRLF>.<CRLF>" CRLF - -#define DATA_END_TRAILER "." CRLF - -#define XCLIENT_HOST_UNAVAILABLE "[UNAVAILABLE]" -#define XCLIENT_HOST_TEMPFAIL "[TEMPUNAVAIL]" - -#define MAX_SMTP_UPSTREAMS 128 - -enum smtp_command_type { - SMTP_COMMAND_HELO = 0, - SMTP_COMMAND_EHLO, - SMTP_COMMAND_QUIT, - SMTP_COMMAND_NOOP, - SMTP_COMMAND_MAIL, - SMTP_COMMAND_RCPT, - SMTP_COMMAND_RSET, - SMTP_COMMAND_DATA, - SMTP_COMMAND_VRFY, - SMTP_COMMAND_EXPN, - SMTP_COMMAND_HELP -}; -struct smtp_command { - enum smtp_command_type command; - GList *args; -}; - -/* - * Generate SMTP error message - */ -gchar * make_smtp_error (rspamd_mempool_t *pool, - gint error_code, - const gchar *format, - ...); - -/* - * Parse a single SMTP command - */ -gboolean parse_smtp_command (struct smtp_session *session, - rspamd_ftok_t *line, - struct smtp_command **cmd); - -/* - * Parse HELO command - */ -gboolean parse_smtp_helo (struct smtp_session *session, - struct smtp_command *cmd); - -/* - * Parse MAIL command - */ -gboolean parse_smtp_from (struct smtp_session *session, - struct smtp_command *cmd); - -/* - * Parse RCPT command - */ -gboolean parse_smtp_rcpt (struct smtp_session *session, - struct smtp_command *cmd); - -/* Upstream SMTP */ - -/* - * Read a line from SMTP upstream - */ -gboolean smtp_upstream_read_socket (rspamd_ftok_t * in, void *arg); - -/* - * Write to SMTP upstream - */ -gboolean smtp_upstream_write_socket (void *arg); - -/* - * Error handler for SMTP upstream - */ -void smtp_upstream_err_socket (GError *err, void *arg); - -/* - * Terminate connection with upstream - */ -void smtp_upstream_finalize_connection (gpointer data); - -/* - * Write a list of strings to the upstream - */ -size_t smtp_upstream_write_list (GList *args, gchar *buf, size_t buflen); - -#endif diff --git a/src/libmime/smtp_utils.c b/src/libmime/smtp_utils.c deleted file mode 100644 index 9244213d7..000000000 --- a/src/libmime/smtp_utils.c +++ /dev/null @@ -1,354 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "config.h" -#include "rspamd.h" -#include "smtp.h" -#include "unix-std.h" - -void -free_smtp_session (gpointer arg) -{ - struct smtp_session *session = arg; - - if (session) { - if (session->task) { - if (session->task->msg.begin) { - munmap ((gpointer)session->task->msg.begin, - session->task->msg.len); - } - rspamd_task_free (session->task); - } - if (session->rcpt) { - g_list_free (session->rcpt); - } - if (session->dispatcher) { - rspamd_remove_dispatcher (session->dispatcher); - } - close (session->sock); - if (session->temp_name != NULL) { - unlink (session->temp_name); - } - if (session->temp_fd != -1) { - close (session->temp_fd); - } - rspamd_mempool_delete (session->pool); - g_free (session); - } -} - -gboolean -create_smtp_upstream_connection (struct smtp_session *session) -{ - struct upstream *selected; - - /* Try to select upstream */ - selected = rspamd_upstream_get (session->ctx->upstreams, - RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); - if (selected == NULL) { - msg_err ("no upstreams suitable found"); - return FALSE; - } - - session->upstream = selected; - - /* Now try to create socket */ - session->upstream_sock = rspamd_inet_address_connect ( - rspamd_upstream_addr (selected), SOCK_STREAM, TRUE); - if (session->upstream_sock == -1) { - msg_err ("cannot make a connection to %s", rspamd_upstream_name (selected)); - rspamd_upstream_fail (selected); - return FALSE; - } - /* Create a dispatcher for upstream connection */ - session->upstream_dispatcher = rspamd_create_dispatcher (session->ev_base, - session->upstream_sock, - BUFFER_LINE, - smtp_upstream_read_socket, - smtp_upstream_write_socket, - smtp_upstream_err_socket, - &session->ctx->smtp_timeout, - session); - session->state = SMTP_STATE_WAIT_UPSTREAM; - session->upstream_state = SMTP_STATE_GREETING; - rspamd_session_add_event (session->s, - (event_finalizer_t)smtp_upstream_finalize_connection, - session, - g_quark_from_static_string ("smtp proxy")); - return TRUE; -} - -gboolean -smtp_send_upstream_message (struct smtp_session *session) -{ - rspamd_dispatcher_pause (session->dispatcher); - rspamd_dispatcher_restore (session->upstream_dispatcher); - - session->upstream_state = SMTP_STATE_IN_SENDFILE; - session->state = SMTP_STATE_WAIT_UPSTREAM; - if (!rspamd_dispatcher_sendfile (session->upstream_dispatcher, - session->temp_fd, session->temp_size)) { - msg_err ("sendfile failed: %s", strerror (errno)); - goto err; - } - return TRUE; - -err: - session->error = SMTP_ERROR_FILE; - session->state = SMTP_STATE_CRITICAL_ERROR; - if (!rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, - TRUE)) { - return FALSE; - } - rspamd_session_destroy (session->s); - return FALSE; -} - -struct smtp_metric_callback_data { - struct smtp_session *session; - enum rspamd_metric_action action; - struct metric_result *res; - gchar *log_buf; - gint log_offset; - gint log_size; - gboolean alive; -}; - -static void -smtp_metric_symbols_callback (gpointer key, gpointer value, void *user_data) -{ - struct smtp_metric_callback_data *cd = user_data; - - cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, - cd->log_size - cd->log_offset, - "%s,", - (gchar *)key); -} - -static void -smtp_metric_callback (gpointer key, gpointer value, gpointer ud) -{ - struct smtp_metric_callback_data *cd = ud; - struct metric_result *metric_res = value; - enum rspamd_metric_action action = METRIC_ACTION_NOACTION; - double ms = 0, rs = 0; - gboolean is_spam = FALSE; - struct rspamd_task *task; - - task = cd->session->task; - - /* XXX rewrite */ - ms = metric_res->metric->actions[METRIC_ACTION_REJECT].score; - rs = metric_res->metric->actions[METRIC_ACTION_REJECT].score; -#if 0 - if (!check_metric_settings (metric_res, &ms, &rs)) { - ms = metric_res->metric->actions[METRIC_ACTION_REJECT].score; - rs = metric_res->metric->actions[METRIC_ACTION_REJECT].score; - } - if (!check_metric_action_settings (task, metric_res, metric_res->score, - &action)) { - action = - check_metric_action (metric_res->score, ms, metric_res->metric); - } -#endif - if (metric_res->score >= ms) { - is_spam = 1; - } - if (action < cd->action) { - cd->action = action; - cd->res = metric_res; - } - - if (!RSPAMD_TASK_IS_SKIPPED (task)) { - cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, - cd->log_size - cd->log_offset, - "(%s: %c (%s): [%.2f/%.2f/%.2f] [", - (gchar *)key, - is_spam ? 'T' : 'F', - rspamd_action_to_str (action), - metric_res->score, - ms, - rs); - } - else { - cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, - cd->log_size - cd->log_offset, - "(%s: %c (default): [%.2f/%.2f/%.2f] [", - (gchar *)key, - 'S', - metric_res->score, - ms, - rs); - - } - g_hash_table_foreach (metric_res->symbols, smtp_metric_symbols_callback, - cd); - /* Remove last , from log buf */ - if (cd->log_buf[cd->log_offset - 1] == ',') { - cd->log_buf[--cd->log_offset] = '\0'; - } - - cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, - cd->log_size - cd->log_offset, - "]), len: %z, time: %s,", - task->msg.len, - rspamd_log_check_time (task->time_real, task->time_virtual, - task->cfg->clock_res)); -} - -gboolean -make_smtp_tempfile (struct smtp_session *session) -{ - gsize r; - - r = strlen (session->cfg->temp_dir) + sizeof ("/rspamd-XXXXXX"); - session->temp_name = rspamd_mempool_alloc (session->pool, r); - rspamd_snprintf (session->temp_name, - r, - "%s/rspamd-XXXXXX", - session->cfg->temp_dir); -#ifdef HAVE_MKSTEMP - /* Umask is set before */ - session->temp_fd = mkstemp (session->temp_name); -#else - session->temp_fd = g_mkstemp_full (session->temp_name, - O_RDWR, - S_IWUSR | S_IRUSR); -#endif - if (session->temp_fd == -1) { - msg_err ("mkstemp error: %s", strerror (errno)); - - return FALSE; - } - - return TRUE; -} - -gboolean -write_smtp_reply (struct smtp_session *session) -{ - gchar logbuf[1024], *new_subject; - const gchar *old_subject; - struct smtp_metric_callback_data cd; - GMimeStream *stream; - gint old_fd, sublen; - - /* Check metrics */ - cd.session = session; - cd.action = METRIC_ACTION_NOACTION; - cd.res = NULL; - cd.log_buf = logbuf; - cd.log_offset = rspamd_snprintf (logbuf, - sizeof (logbuf), - "id: <%s>, qid: <%s>, ", - session->task->message_id, - session->task->queue_id); - cd.log_size = sizeof (logbuf); - if (session->task->user) { - cd.log_offset += rspamd_snprintf (logbuf + cd.log_offset, - sizeof (logbuf) - cd.log_offset, - "user: %s, ", - session->task->user); - } - - g_hash_table_foreach (session->task->results, smtp_metric_callback, &cd); - - msg_info ("%s", logbuf); - - if (cd.action <= METRIC_ACTION_REJECT) { - if (!rspamd_dispatcher_write (session->dispatcher, - session->ctx->reject_message, 0, FALSE, TRUE)) { - return FALSE; - } - if (!rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - - 1, FALSE, TRUE)) { - return FALSE; - } - rspamd_session_destroy (session->s); - return FALSE; - } - else if (cd.action <= METRIC_ACTION_ADD_HEADER || cd.action <= - METRIC_ACTION_REWRITE_SUBJECT) { - old_fd = session->temp_fd; - if (!make_smtp_tempfile (session)) { - session->error = SMTP_ERROR_FILE; - session->state = SMTP_STATE_CRITICAL_ERROR; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - 0, FALSE, TRUE)) { - goto err; - } - rspamd_session_destroy (session->s); - return FALSE; - } - - if (cd.action <= METRIC_ACTION_REWRITE_SUBJECT) { - /* XXX: add this action */ - old_subject = g_mime_message_get_subject (session->task->message); - if (old_subject != NULL) { - sublen = strlen (old_subject) + sizeof (SPAM_SUBJECT); - new_subject = rspamd_mempool_alloc (session->pool, sublen); - rspamd_snprintf (new_subject, - sublen, - "%s%s", - SPAM_SUBJECT, - old_subject); - } - else { - new_subject = SPAM_SUBJECT; - } - g_mime_message_set_subject (session->task->message, new_subject); - } - else if (cd.action <= METRIC_ACTION_ADD_HEADER) { -#ifndef GMIME24 - g_mime_message_add_header (session->task->message, "X-Spam", - "true"); -#else - g_mime_object_append_header (GMIME_OBJECT ( - session->task->message), "X-Spam", "true"); -#endif - } - stream = g_mime_stream_fs_new (session->temp_fd); - g_mime_stream_fs_set_owner (GMIME_STREAM_FS (stream), FALSE); - close (old_fd); - - if (g_mime_object_write_to_stream (GMIME_OBJECT (session->task->message), - stream) == -1) { - msg_err ("cannot write MIME object to stream: %s", - strerror (errno)); - session->error = SMTP_ERROR_FILE; - session->state = SMTP_STATE_CRITICAL_ERROR; - rspamd_dispatcher_restore (session->dispatcher); - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - 0, FALSE, TRUE)) { - goto err; - } - rspamd_session_destroy (session->s); - return FALSE; - } - g_object_unref (stream); - } - /* XXX: Add other actions */ - return smtp_send_upstream_message (session); -err: - session->error = SMTP_ERROR_FILE; - session->state = SMTP_STATE_CRITICAL_ERROR; - if (!rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, - TRUE)) { - return FALSE; - } - rspamd_session_destroy (session->s); - return FALSE; -} diff --git a/src/libmime/smtp_utils.h b/src/libmime/smtp_utils.h deleted file mode 100644 index f72831b6b..000000000 --- a/src/libmime/smtp_utils.h +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef SMTP_UTILS_H_ -#define SMTP_UTILS_H_ - -#include "config.h" - -/** - * @file smtp_utils.h - * Contains utilities for smtp protocol handling - */ - -struct smtp_upstream { - const gchar *name; - gchar *addr; - guint16 port; - gboolean is_unix; -}; - -#define MAX_SMTP_UPSTREAMS 128 - -struct smtp_session; - -/** - * Send message to upstream - * @param session session object - */ -gboolean smtp_send_upstream_message (struct smtp_session *session); - -/** - * Create connection to upstream - * @param session session object - */ -gboolean create_smtp_upstream_connection (struct smtp_session *session); - -/** - * Create temporary file for smtp session - */ -gboolean make_smtp_tempfile (struct smtp_session *session); - -/** - * Write reply to upstream - * @param session session object - */ -gboolean write_smtp_reply (struct smtp_session *session); - -/** - * Frees smtp session object - */ -void free_smtp_session (gpointer arg); - -#endif /* SMTP_UTILS_H_ */ diff --git a/src/libserver/buffer.c b/src/libserver/buffer.c deleted file mode 100644 index 03be2e9a4..000000000 --- a/src/libserver/buffer.c +++ /dev/null @@ -1,804 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "config.h" -#include "buffer.h" -#include "rspamd.h" -#include "unix-std.h" -#ifdef HAVE_SYS_SENDFILE_H -#include <sys/sendfile.h> -#endif -#include "utlist.h" - -#define G_DISPATCHER_ERROR dispatcher_error_quark () -#define debug_ip(...) rspamd_conditional_debug (NULL, \ - NULL, \ - d->pool->tag.tagname, d->pool->tag.uid, \ - G_STRFUNC, \ - __VA_ARGS__) - -static void dispatcher_cb (gint fd, short what, void *arg); - -static inline GQuark -dispatcher_error_quark (void) -{ - return g_quark_from_static_string ("g-dispatcher-error-quark"); -} - -static gboolean -sendfile_callback (rspamd_io_dispatcher_t *d) -{ - - GError *err; - -#ifdef HAVE_SENDFILE -# if defined(FREEBSD) || defined(DARWIN) - off_t off = 0; - #if defined(FREEBSD) - /* FreeBSD version */ - if (sendfile (d->sendfile_fd, d->fd, d->offset, 0, NULL, &off, 0) != 0) { - #elif defined(DARWIN) - /* Darwin version */ - if (sendfile (d->sendfile_fd, d->fd, d->offset, &off, NULL, 0) != 0) { - #endif - if (errno != EAGAIN) { - if (d->err_callback) { - err = - g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( - errno)); - d->err_callback (err, d->user_data); - return FALSE; - } - } - else { - debug_ip ("partially write data, retry"); - /* Wait for other event */ - d->offset += off; - event_del (d->ev); - event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - } - } - else { - if (d->write_callback) { - if (!d->write_callback (d->user_data)) { - debug_ip ("callback set wanna_die flag, terminating"); - return FALSE; - } - } - event_del (d->ev); - event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, - (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - d->in_sendfile = FALSE; - } -# else - ssize_t r; - /* Linux version */ - r = sendfile (d->fd, d->sendfile_fd, &d->offset, d->file_size); - if (r == -1) { - if (errno != EAGAIN) { - if (d->err_callback) { - err = - g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( - errno)); - d->err_callback (err, d->user_data); - return FALSE; - } - } - else { - debug_ip ("partially write data, retry"); - /* Wait for other event */ - event_del (d->ev); - event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - } - } - else if (r + d->offset < (ssize_t)d->file_size) { - debug_ip ("partially write data, retry"); - /* Wait for other event */ - event_del (d->ev); - event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - } - else { - if (d->write_callback) { - if (!d->write_callback (d->user_data)) { - debug_ip ("callback set wanna_die flag, terminating"); - return FALSE; - } - } - event_del (d->ev); - event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, - (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - d->in_sendfile = FALSE; - } -# endif -#else - ssize_t r; - r = write (d->fd, d->map, d->file_size - d->offset); - if (r == -1) { - if (errno != EAGAIN) { - if (d->err_callback) { - err = - g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( - errno)); - d->err_callback (err, d->user_data); - return FALSE; - } - } - else { - debug_ip ("partially write data, retry"); - /* Wait for other event */ - event_del (d->ev); - event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - } - } - else if (r + d->offset < d->file_size) { - d->offset += r; - debug_ip ("partially write data, retry"); - /* Wait for other event */ - event_del (d->ev); - event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - } - else { - if (d->write_callback) { - if (!d->write_callback (d->user_data)) { - debug_ip ("callback set wanna_die flag, terminating"); - return FALSE; - } - } - event_del (d->ev); - event_set (d->ev, d->fd, EV_READ | EV_PERSIST, dispatcher_cb, - (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - d->in_sendfile = FALSE; - } -#endif - return TRUE; -} - -#define BUFREMAIN(x) (x)->data->size - ((x)->pos - (x)->data->begin) - -#define APPEND_OUT_BUFFER(d, buf) do { \ - DL_APPEND ((d)->out_buffers.buffers, buf); \ - (d)->out_buffers.pending++; \ -} while (0) -#define DELETE_OUT_BUFFER(d, buf) do { \ - DL_DELETE ((d)->out_buffers.buffers, (buf)); \ - g_string_free ((buf->data), (buf)->allocated); \ - g_slice_free1 (sizeof (struct rspamd_out_buffer_s), (buf)); \ - (d)->out_buffers.pending--; \ -} while (0) - -static gboolean -write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) -{ - GError *err = NULL; - struct rspamd_out_buffer_s *cur = NULL, *tmp; - ssize_t r; - struct iovec *iov; - guint i, len; - - len = d->out_buffers.pending; - while (len > 0) { - /* Unset delayed as actually we HAVE buffers to write */ - is_delayed = TRUE; - iov = g_slice_alloc (len * sizeof (struct iovec)); - i = 0; - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) - { - iov[i].iov_base = cur->data->str; - iov[i].iov_len = cur->data->len; - i++; - } - /* Now try to write the whole vector */ - r = writev (fd, iov, len); - if (r == -1 && errno != EAGAIN) { - g_slice_free1 (len * sizeof (struct iovec), iov); - if (d->err_callback) { - err = - g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( - errno)); - d->err_callback (err, d->user_data); - return FALSE; - } - } - else if (r > 0) { - /* Find pos inside buffers */ - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) - { - if (r >= (ssize_t)cur->data->len) { - /* Mark this buffer as read */ - r -= cur->data->len; - DELETE_OUT_BUFFER (d, cur); - } - else { - /* This buffer was not written completely */ - g_string_erase (cur->data, 0, r); - break; - } - } - g_slice_free1 (len * sizeof (struct iovec), iov); - if (d->out_buffers.pending > 0) { - /* Wait for other event */ - event_del (d->ev); - event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - return TRUE; - } - } - else if (r == 0) { - /* Got EOF while we wait for data */ - g_slice_free1 (len * sizeof (struct iovec), iov); - if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); - d->err_callback (err, d->user_data); - return FALSE; - } - } - else if (r == -1 && errno == EAGAIN) { - g_slice_free1 (len * sizeof (struct iovec), iov); - debug_ip ("partially write data, retry"); - /* Wait for other event */ - event_del (d->ev); - event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - return TRUE; - } - len = d->out_buffers.pending; - } - - if (d->out_buffers.pending == 0) { - /* Disable write event for this time */ - - debug_ip ("all buffers were written successfully"); - - if (is_delayed && d->write_callback) { - if (!d->write_callback (d->user_data)) { - debug_ip ("callback set wanna_die flag, terminating"); - return FALSE; - } - } - - event_del (d->ev); - event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - } - else { - /* Plan other write event */ - event_del (d->ev); - event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - } - - return TRUE; -} - -static struct rspamd_buffer_buf * -allocate_buffer (rspamd_mempool_t *pool, gsize size) -{ - struct rspamd_buffer_buf *b; - - b = rspamd_mempool_alloc_tmp (pool, sizeof (*b)); - b->begin = rspamd_mempool_alloc_tmp (pool, size); - b->size = size; - b->len = 0; - - return b; -} - -static void -read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) -{ - ssize_t r; - GError *err = NULL; - rspamd_ftok_t res; - guchar *c, *b; - guchar *end; - size_t len; - enum io_policy saved_policy; - - if (d->wanna_die) { - rspamd_remove_dispatcher (d); - return; - } - - if (d->in_buf == NULL) { - d->in_buf = - rspamd_mempool_alloc_tmp (d->pool, sizeof (rspamd_buffer_t)); - if (d->policy == BUFFER_LINE || d->policy == BUFFER_ANY) { - d->in_buf->data = allocate_buffer (d->pool, d->default_buf_size); - } - else { - d->in_buf->data = allocate_buffer (d->pool, d->nchars + 1); - } - d->in_buf->pos = d->in_buf->data->begin; - } - - end = d->in_buf->pos; - len = d->in_buf->data->len; - - if (BUFREMAIN (d->in_buf) == 0) { - /* Buffer is full, try to call callback with overflow error */ - if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, E2BIG, "buffer overflow"); - d->err_callback (err, d->user_data); - return; - } - } - else if (!skip_read) { - /* Try to read the whole buffer */ - r = read (fd, end, BUFREMAIN (d->in_buf)); - if (r == -1 && errno != EAGAIN) { - if (d->err_callback) { - err = - g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror ( - errno)); - d->err_callback (err, d->user_data); - return; - } - } - else if (r == 0) { - /* Got EOF while we wait for data */ -#if 0 - if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); - d->err_callback (err, d->user_data); - return; - } -#endif - /* Read returned 0, it may be shutdown or full quit */ - if (!d->want_read) { - d->half_closed = TRUE; - /* Do not expect any read after this */ - event_del (d->ev); - } - else { - if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); - d->err_callback (err, d->user_data); - return; - } - } - } - else if (r == -1 && errno == EAGAIN) { - debug_ip ("partially read data, retry"); - return; - } - else { - /* Set current position in buffer */ - d->in_buf->pos += r; - d->in_buf->data->len += r; - } - debug_ip ( - "read %z characters, policy is %s, watermark is: %z, buffer has %z bytes", - r, - d->policy == BUFFER_LINE ? "LINE" : "CHARACTER", - d->nchars, - d->in_buf->data->len); - } - - saved_policy = d->policy; - c = d->in_buf->data->begin; - end = d->in_buf->pos; - len = d->in_buf->data->len; - b = c; - r = 0; - - switch (d->policy) { - case BUFFER_LINE: - /** Variables: - * b - begin of line - * r - current position in buffer - * *len - length of remaining buffer - * c - pointer to current position (buffer->begin + r) - * res - result string - */ - while (r < (ssize_t)len) { - if (*c == '\n') { - res.begin = b; - res.len = c - b; - /* Strip EOL */ - if (d->strip_eol) { - if (r != 0 && *(c - 1) == '\r') { - res.len--; - } - } - else { - /* Include EOL in reply */ - res.len++; - } - /* Call callback for a line */ - if (d->read_callback) { - if (!d->read_callback (&res, d->user_data)) { - return; - } - if (d->policy != saved_policy) { - /* Drain buffer as policy is changed */ - /* Note that d->in_buffer is other pointer now, so we need to reinit all pointers */ - /* First detect how much symbols do we have */ - if (end == c) { - /* In fact we read the whole buffer and change input policy, so just set current pos to begin of buffer */ - d->in_buf->pos = d->in_buf->data->begin; - d->in_buf->data->len = 0; - } - else { - /* Otherwise we need to move buffer */ - /* Reinit pointers */ - len = d->in_buf->data->len - r - 1; - end = d->in_buf->data->begin + r + 1; - memmove (d->in_buf->data->begin, end, len); - d->in_buf->data->len = len; - d->in_buf->pos = d->in_buf->data->begin + len; - /* Process remaining buffer */ - read_buffers (fd, d, TRUE); - } - return; - } - } - /* Set new begin of line */ - b = c + 1; - } - r++; - c++; - } - /* Now drain remaining characters in buffer */ - memmove (d->in_buf->data->begin, b, c - b); - d->in_buf->data->len = c - b; - d->in_buf->pos = d->in_buf->data->begin + (c - b); - break; - case BUFFER_CHARACTER: - r = d->nchars; - if ((ssize_t)len >= r) { - res.begin = b; - res.len = r; - c = b + r; - if (d->read_callback) { - if (!d->read_callback (&res, d->user_data)) { - return; - } - /* Move remaining string to begin of buffer (draining) */ - if ((ssize_t)len > r) { - len -= r; - memmove (d->in_buf->data->begin, c, len); - d->in_buf->data->len = len; - d->in_buf->pos = d->in_buf->data->begin + len; - b = d->in_buf->data->begin; - } - else { - d->in_buf->data->len = 0; - d->in_buf->pos = d->in_buf->data->begin; - } - if (d->policy != saved_policy && (ssize_t)len != r) { - debug_ip ( - "policy changed during callback, restart buffer's processing"); - read_buffers (fd, d, TRUE); - return; - } - } - } - break; - case BUFFER_ANY: - res.begin = d->in_buf->data->begin; - res.len = len; - - if (d->read_callback) { - /* - * Actually we do not want to send zero sized - * buffers to a read callback - */ - if (!(d->want_read && res.len == 0)) { - if (!d->read_callback (&res, d->user_data)) { - return; - } - } - if (d->policy != saved_policy) { - debug_ip ( - "policy changed during callback, restart buffer's processing"); - read_buffers (fd, d, TRUE); - return; - } - } - d->in_buf->pos = d->in_buf->data->begin; - d->in_buf->data->len = 0; - break; - } -} - -#undef BUFREMAIN - -static void -dispatcher_cb (gint fd, short what, void *arg) -{ - rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg; - GError *err = NULL; - - debug_ip ("in dispatcher callback, what: %d, fd: %d", (gint)what, fd); - - if ((what & EV_TIMEOUT) != 0) { - if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, ETIMEDOUT, "IO timeout"); - d->err_callback (err, d->user_data); - } - } - else if ((what & EV_READ) != 0) { - read_buffers (fd, d, FALSE); - } - else if ((what & EV_WRITE) != 0) { - /* No data to write, disable further EV_WRITE to this fd */ - if (d->in_sendfile) { - sendfile_callback (d); - } - else { - if (d->out_buffers.pending == 0) { - if (d->half_closed && !d->is_restored) { - /* Socket is half closed and there is nothing more to write, closing connection */ - if (d->err_callback) { - err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); - d->err_callback (err, d->user_data); - return; - } - } - else { - /* Want read again */ - event_del (d->ev); - event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, - (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - if (d->is_restored && d->write_callback) { - if (!d->write_callback (d->user_data)) { - return; - } - d->is_restored = FALSE; - } - } - } - else { - /* Delayed write */ - write_buffers (fd, d, TRUE); - } - } - } -} - - -rspamd_io_dispatcher_t * -rspamd_create_dispatcher (struct event_base *base, - gint fd, - enum io_policy policy, - dispatcher_read_callback_t read_cb, - dispatcher_write_callback_t write_cb, - dispatcher_err_callback_t err_cb, - struct timeval *tv, - void *user_data) -{ - rspamd_io_dispatcher_t *new; - - if (fd == -1) { - return NULL; - } - - new = g_slice_alloc0 (sizeof (rspamd_io_dispatcher_t)); - - new->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), NULL); - if (tv != NULL) { - new->tv = rspamd_mempool_alloc (new->pool, sizeof (struct timeval)); - memcpy (new->tv, tv, sizeof (struct timeval)); - } - else { - new->tv = NULL; - } - new->nchars = 0; - new->in_sendfile = FALSE; - new->policy = policy; - new->read_callback = read_cb; - new->write_callback = write_cb; - new->err_callback = err_cb; - new->user_data = user_data; - new->strip_eol = TRUE; - new->half_closed = FALSE; - new->want_read = TRUE; - new->is_restored = FALSE; - new->default_buf_size = sysconf (_SC_PAGESIZE); - - new->ev = rspamd_mempool_alloc0 (new->pool, sizeof (struct event)); - new->fd = fd; - new->ev_base = base; - - event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new); - event_base_set (new->ev_base, new->ev); - event_add (new->ev, new->tv); - - return new; -} - -void -rspamd_remove_dispatcher (rspamd_io_dispatcher_t * d) -{ - struct rspamd_out_buffer_s *cur, *tmp; - - if (d != NULL) { - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) - { - DELETE_OUT_BUFFER (d, cur); - } - event_del (d->ev); - rspamd_mempool_delete (d->pool); - g_slice_free1 (sizeof (rspamd_io_dispatcher_t), d); - } -} - -void -rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t * d, - enum io_policy policy, - size_t nchars) -{ - struct rspamd_buffer_buf *tmp; - gint t; - - if (d->policy != policy || nchars != d->nchars) { - d->policy = policy; - d->nchars = nchars ? nchars : d->default_buf_size; - /* Resize input buffer if needed */ - if (policy == BUFFER_CHARACTER && nchars != 0) { - if (d->in_buf && d->in_buf->data->size < nchars) { - tmp = allocate_buffer (d->pool, d->nchars + 1); - memcpy (tmp->begin, d->in_buf->data->begin, - d->in_buf->data->len); - t = d->in_buf->pos - d->in_buf->data->begin; - tmp->len = d->in_buf->data->len; - d->in_buf->data = tmp; - d->in_buf->pos = d->in_buf->data->begin + t; - } - } - else if (policy == BUFFER_LINE || policy == BUFFER_ANY) { - if (d->in_buf && d->nchars < d->default_buf_size) { - tmp = allocate_buffer (d->pool, d->default_buf_size); - memcpy (tmp->begin, d->in_buf->data->begin, - d->in_buf->data->len); - t = d->in_buf->pos - d->in_buf->data->begin; - tmp->len = d->in_buf->data->len; - d->in_buf->data = tmp; - d->in_buf->pos = d->in_buf->data->begin + t; - } - d->strip_eol = TRUE; - } - } - - debug_ip ("new input length watermark is %uz", d->nchars); -} - -gboolean -rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, - const void *data, size_t len, gboolean delayed, gboolean allocated) -{ - struct rspamd_out_buffer_s *newbuf; - - newbuf = g_slice_alloc (sizeof (struct rspamd_out_buffer_s)); - if (len == 0) { - /* Assume NULL terminated */ - len = strlen ((const gchar *)data); - } - - if (!allocated) { - newbuf->data = g_string_new_len (data, len); - newbuf->allocated = TRUE; - } - else { - newbuf->data = g_string_new (NULL); - newbuf->data->str = (gchar *)data; - newbuf->data->len = len; - newbuf->data->allocated_len = len; - newbuf->allocated = FALSE; - } - - APPEND_OUT_BUFFER (d, newbuf); - - if (!delayed) { - debug_ip ("plan write event"); - return write_buffers (d->fd, d, FALSE); - } - /* Otherwise plan write event */ - event_del (d->ev); - event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, (void *)d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - - return TRUE; -} - -gboolean -rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, gint fd, size_t len) -{ - if (lseek (fd, 0, SEEK_SET) == -1) { - msg_warn ("lseek failed: %s", strerror (errno)); - return FALSE; - } - - d->offset = 0; - d->in_sendfile = TRUE; - d->sendfile_fd = fd; - d->file_size = len; - -#ifndef HAVE_SENDFILE - #ifdef HAVE_MMAP_NOCORE - if ((d->map = - mmap (NULL, len, PROT_READ, MAP_SHARED | MAP_NOCORE, fd, - 0)) == MAP_FAILED) { - #else - if ((d->map = - mmap (NULL, len, PROT_READ, MAP_SHARED, fd, 0)) == MAP_FAILED) { - #endif - msg_warn ("mmap failed: %s", strerror (errno)); - return FALSE; - } -#endif - - return sendfile_callback (d); -} - -void -rspamd_dispatcher_pause (rspamd_io_dispatcher_t * d) -{ - debug_ip ("paused dispatcher"); - event_del (d->ev); - d->is_restored = FALSE; -} - -void -rspamd_dispatcher_restore (rspamd_io_dispatcher_t * d) -{ - if (!d->is_restored) { - debug_ip ("restored dispatcher"); - event_del (d->ev); - event_set (d->ev, d->fd, EV_WRITE, dispatcher_cb, d); - event_base_set (d->ev_base, d->ev); - event_add (d->ev, d->tv); - d->is_restored = TRUE; - } -} - -void -rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *d) -{ - struct rspamd_out_buffer_s *cur, *tmp; - - DL_FOREACH_SAFE (d->out_buffers.buffers, cur, tmp) - { - DELETE_OUT_BUFFER (d, cur); - } - /* Cleanup temporary data */ - rspamd_mempool_cleanup_tmp (d->pool); - d->in_buf = NULL; -} - -#undef debug_ip diff --git a/src/libserver/buffer.h b/src/libserver/buffer.h deleted file mode 100644 index 26c605484..000000000 --- a/src/libserver/buffer.h +++ /dev/null @@ -1,154 +0,0 @@ -/** - * @file buffer.h - * Implements buffered IO - */ - -#ifndef RSPAMD_BUFFER_H -#define RSPAMD_BUFFER_H - -#include "config.h" -#include "mem_pool.h" -#include "fstring.h" - -typedef gboolean (*dispatcher_read_callback_t)(rspamd_ftok_t *in, void *user_data); -typedef gboolean (*dispatcher_write_callback_t)(void *user_data); -typedef void (*dispatcher_err_callback_t)(GError *err, void *user_data); - -/** - * Types of IO handling - */ -enum io_policy { - BUFFER_LINE, /**< call handler when we have line ready */ - BUFFER_CHARACTER, /**< call handler when we have some characters */ - BUFFER_ANY /**< call handler whenever we got data in buffer */ -}; - -/** - * Buffer structure - */ -struct rspamd_buffer_buf { - gsize size; - gsize len; - guchar *begin; -}; - -typedef struct rspamd_buffer_s { - struct rspamd_buffer_buf *data; - guchar *pos; /**< current position */ -} rspamd_buffer_t; - -struct rspamd_out_buffer_s { - GString *data; - gboolean allocated; - struct rspamd_out_buffer_s *prev, *next; -}; - -typedef struct rspamd_io_dispatcher_s { - rspamd_buffer_t *in_buf; /**< input buffer */ - struct { - guint pending; - struct rspamd_out_buffer_s *buffers; - } out_buffers; /**< output buffers chain */ - struct timeval *tv; /**< io timeout */ - struct event *ev; /**< libevent io event */ - rspamd_mempool_t *pool; /**< where to store data */ - enum io_policy policy; /**< IO policy */ - size_t nchars; /**< how many chars to read */ - gint fd; /**< descriptor */ - guint32 peer_addr; /**< address of peer for debugging */ - gboolean wanna_die; /**< if dispatcher should be stopped */ - dispatcher_read_callback_t read_callback; /**< read callback */ - dispatcher_write_callback_t write_callback; /**< write callback */ - dispatcher_err_callback_t err_callback; /**< error callback */ - void *user_data; /**< user's data for callbacks */ - gulong default_buf_size; /**< default size for buffering */ - off_t offset; /**< for sendfile use */ - size_t file_size; - gint sendfile_fd; - gboolean in_sendfile; /**< whether buffer is in sendfile mode */ - gboolean strip_eol; /**< strip or not line ends in BUFFER_LINE policy */ - gboolean is_restored; /**< call a callback when dispatcher is restored */ - gboolean half_closed; /**< connection is half closed */ - gboolean want_read; /**< whether we want to read more data */ - struct event_base *ev_base; /**< event base for io operations */ -#ifndef HAVE_SENDFILE - void *map; -#endif -} rspamd_io_dispatcher_t; - -/** - * Creates rspamd IO dispatcher for specified descriptor - * @param fd descriptor to IO - * @param policy IO policy - * @param read_cb read callback handler - * @param write_cb write callback handler - * @param err_cb error callback handler - * @param tv IO timeout - * @param user_data pointer to user's data - * @return new dispatcher object or NULL in case of failure - */ -rspamd_io_dispatcher_t * rspamd_create_dispatcher (struct event_base *base, - gint fd, - enum io_policy policy, - dispatcher_read_callback_t read_cb, - dispatcher_write_callback_t write_cb, - dispatcher_err_callback_t err_cb, - struct timeval *tv, - void *user_data); - -/** - * Set new policy for dispatcher - * @param d pointer to dispatcher's object - * @param policy IO policy - * @param nchars number of characters in buffer for character policy - */ -void rspamd_set_dispatcher_policy (rspamd_io_dispatcher_t *d, - enum io_policy policy, - size_t nchars); - -/** - * Write data when it would be possible - * @param d pointer to dispatcher's object - * @param data data to write - * @param len length of data - */ -gboolean rspamd_dispatcher_write (rspamd_io_dispatcher_t *d, - const void *data, - size_t len, gboolean delayed, - gboolean allocated) G_GNUC_WARN_UNUSED_RESULT; - -/** - * Send specified descriptor to dispatcher - * @param d pointer to dispatcher's object - * @param fd descriptor of file - * @param len length of data - */ -gboolean rspamd_dispatcher_sendfile (rspamd_io_dispatcher_t *d, - gint fd, - size_t len) G_GNUC_WARN_UNUSED_RESULT; - -/** - * Pause IO events on dispatcher - * @param d pointer to dispatcher's object - */ -void rspamd_dispatcher_pause (rspamd_io_dispatcher_t *d); - -/** - * Restore IO events on dispatcher - * @param d pointer to dispatcher's object - */ -void rspamd_dispatcher_restore (rspamd_io_dispatcher_t *d); - -/** - * Frees dispatcher object - * @param dispatcher pointer to dispatcher's object - */ -void rspamd_remove_dispatcher (rspamd_io_dispatcher_t *dispatcher); - -/** - * Cleanup dispatcher freeing all temporary data - * @param dispatcher pointer to dispatcher's object - */ -void rspamd_dispacther_cleanup (rspamd_io_dispatcher_t *dispatcher); - -#endif diff --git a/src/lmtp.c b/src/lmtp.c deleted file mode 100644 index c63dba38a..000000000 --- a/src/lmtp.c +++ /dev/null @@ -1,365 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "config.h" -#include "buffer.h" -#include "main.h" -#include "lmtp.h" -#include "lmtp_proto.h" -#include "cfg_file.h" -#include "util.h" -#include "url.h" -#include "message.h" - -static gchar greetingbuf[1024]; -static struct timeval io_tv; - -static gboolean lmtp_write_socket (void *arg); - -void start_lmtp (struct rspamd_worker *worker); - -worker_t lmtp_worker = { - "controller", /* Name */ - NULL, /* Init function */ - start_lmtp, /* Start function */ - TRUE, /* Has socket */ - FALSE, /* Non unique */ - FALSE, /* Non threaded */ - TRUE, /* Killable */ - RSPAMD_WORKER_VER /* Version info */ -}; - -#ifndef HAVE_SA_SIGINFO -static void -sig_handler (gint signo) -#else -static void -sig_handler (gint signo, siginfo_t *info, void *unused) -#endif -{ - switch (signo) { - case SIGINT: - case SIGTERM: - _exit (1); - break; - } -} - -/* - * Config reload is designed by sending sigusr to active workers and pending shutdown of them - */ -static void -sigusr2_handler (gint fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - /* Do not accept new connections, preparing to end worker's process */ - struct timeval tv; - tv.tv_sec = SOFT_SHUTDOWN_TIME; - tv.tv_usec = 0; - event_del (&worker->sig_ev_usr1); - event_del (&worker->sig_ev_usr2); - event_del (&worker->bind_ev); - msg_info ("lmtp worker's shutdown is pending in %d sec", - SOFT_SHUTDOWN_TIME); - event_loopexit (&tv); - return; -} - -/* - * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them - */ -static void -sigusr1_handler (gint fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *) arg; - - rspamd_log_reopen (worker->srv->logger); - - return; -} - -/* - * Destructor for recipients list - */ -static void -rcpt_destruct (void *pointer) -{ - struct rspamd_task *task = (struct rspamd_task *)pointer; - - if (task->rcpt) { - g_list_free (task->rcpt); - } -} - -/* - * Free all structures of lmtp proto - */ -static void -free_lmtp_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft) -{ - GList *part; - struct rspamd_mime_part *p; - struct rspamd_task *task = lmtp->task; - - if (lmtp) { - debug_task ("free pointer %p", lmtp->task); - while ((part = g_list_first (lmtp->task->parts))) { - lmtp->task->parts = g_list_remove_link (lmtp->task->parts, part); - p = (struct rspamd_mime_part *)part->data; - g_byte_array_free (p->content, FALSE); - g_list_free_1 (part); - } - rspamd_mempool_delete (lmtp->task->task_pool); - if (is_soft) { - /* Plan dispatcher shutdown */ - lmtp->task->dispatcher->wanna_die = 1; - } - else { - rspamd_remove_dispatcher (lmtp->task->dispatcher); - } - close (lmtp->task->sock); - g_free (lmtp->task); - g_free (lmtp); - } -} - -/* - * Callback that is called when there is data to read in buffer - */ -static gboolean -lmtp_read_socket (rspamd_fstring_t * in, void *arg) -{ - struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; - struct rspamd_task *task = lmtp->task; - ssize_t r; - - switch (task->state) { - case READ_COMMAND: - case READ_HEADER: - if (read_lmtp_input_line (lmtp, in) != 0) { - msg_info ("closing lmtp connection due to protocol error"); - lmtp->task->state = CLOSING_CONNECTION; - } - /* Task was read, recall read handler once more with new state to process message and write reply */ - if (task->state == READ_MESSAGE) { - lmtp_read_socket (in, arg); - } - break; - case READ_MESSAGE: - r = rspamd_message_parse (lmtp->task); - r = rspamd_process_filters (lmtp->task); - if (r == -1) { - return FALSE; - } - else if (r == 0) { - task->state = WAIT_FILTER; - rspamd_dispatcher_pause (lmtp->task->dispatcher); - } - else { - rspamd_process_statistics (lmtp->task); - task->state = WRITE_REPLY; - lmtp_write_socket (lmtp); - } - break; - default: - debug_task ("invalid state while reading from socket %d", - lmtp->task->state); - break; - } - - return TRUE; -} - -/* - * Callback for socket writing - */ -static gboolean -lmtp_write_socket (void *arg) -{ - struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; - struct rspamd_task *task = lmtp->task; - - switch (lmtp->task->state) { - case WRITE_REPLY: - if (write_lmtp_reply (lmtp) == 1) { - lmtp->task->state = WAIT_FILTER; - } - else { - lmtp->task->state = CLOSING_CONNECTION; - } - break; - case WRITE_ERROR: - write_lmtp_reply (lmtp); - lmtp->task->state = CLOSING_CONNECTION; - break; - case CLOSING_CONNECTION: - debug_task ("normally closing connection"); - free_lmtp_task (lmtp, TRUE); - return FALSE; - break; - default: - debug_task ("invalid state while writing to socket %d", - lmtp->task->state); - break; - } - - return TRUE; -} - -/* - * Called if something goes wrong - */ -static void -lmtp_err_socket (GError * err, void *arg) -{ - struct rspamd_lmtp_proto *lmtp = (struct rspamd_lmtp_proto *)arg; - msg_info ("abnormally closing connection, error: %s", err->message); - /* Free buffers */ - free_lmtp_task (lmtp, FALSE); -} - -/* - * Accept new connection and construct task - */ -static void -accept_socket (gint fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - union sa_union su; - struct rspamd_task *new_task; - struct rspamd_lmtp_proto *lmtp; - socklen_t addrlen = sizeof (su.ss); - gint nfd; - - if ((nfd = - accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { - msg_warn ("accept failed: %s", strerror (errno)); - return; - } - - lmtp = g_malloc (sizeof (struct rspamd_lmtp_proto)); - - new_task = rspamd_task_new (worker); - - if (su.ss.ss_family == AF_UNIX) { - msg_info ("accepted connection from unix socket"); - new_task->client_addr.s_addr = INADDR_NONE; - } - else if (su.ss.ss_family == AF_INET) { - msg_info ("accepted connection from %s port %d", - inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); - memcpy (&new_task->client_addr, &su.s4.sin_addr, - sizeof (struct in_addr)); - } - - new_task->sock = nfd; - new_task->cfg = worker->srv->cfg; - new_task->task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size ()); - /* Add destructor for recipients list (it would be better to use anonymous function here */ - rspamd_mempool_add_destructor (new_task->task_pool, - (rspamd_mempool_destruct_t) rcpt_destruct, new_task); - new_task->results = g_hash_table_new (rspamd_str_hash, rspamd_str_equal); - new_task->ev_base = worker->ctx; - rspamd_mempool_add_destructor (new_task->task_pool, - (rspamd_mempool_destruct_t) g_hash_table_destroy, new_task->results); - worker->srv->stat->connections_count++; - lmtp->task = new_task; - lmtp->state = LMTP_READ_LHLO; - - /* Set up dispatcher */ - new_task->dispatcher = rspamd_create_dispatcher (new_task->ev_base, - nfd, - BUFFER_LINE, - lmtp_read_socket, - lmtp_write_socket, - lmtp_err_socket, - &io_tv, - (void *)lmtp); - new_task->dispatcher->peer_addr = new_task->client_addr.s_addr; - if (!rspamd_dispatcher_write (lmtp->task->dispatcher, greetingbuf, - strlen (greetingbuf), FALSE, FALSE)) { - msg_warn ("cannot write greeting"); - } -} - -/* - * Start lmtp worker process - */ -void -start_lmtp (struct rspamd_worker *worker) -{ - struct sigaction signals; - gchar *hostbuf; - gsize hostmax; - module_t **mod; - - worker->srv->pid = getpid (); - worker->ctx = event_init (); - - rspamd_signals_init (&signals, sig_handler); - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); - - /* SIGUSR2 handler */ - signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, - (void *) worker); - event_base_set (worker->ctx, &worker->sig_ev_usr2); - signal_add (&worker->sig_ev_usr2, NULL); - - /* SIGUSR1 handler */ - signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, - (void *) worker); - event_base_set (worker->ctx, &worker->sig_ev_usr1); - signal_add (&worker->sig_ev_usr1, NULL); - - /* Accept event */ - event_set (&worker->bind_ev, - worker->cf->listen_sock, - EV_READ | EV_PERSIST, - accept_socket, - (void *)worker); - event_base_set (worker->ctx, &worker->bind_ev); - event_add (&worker->bind_ev, NULL); - - /* Perform modules configuring */ - mod = &modules[0]; - while (*mod) { - (*mod)->module_config_func (worker->srv->cfg); - mod++; - } - - /* Fill hostname buf */ - hostmax = sysconf (_SC_HOST_NAME_MAX) + 1; - hostbuf = alloca (hostmax); - gethostname (hostbuf, hostmax); - hostbuf[hostmax - 1] = '\0'; - rspamd_snprintf (greetingbuf, - sizeof (greetingbuf), - "%d rspamd version %s LMTP on %s Ready\r\n", - LMTP_OK, - RVERSION, - hostbuf); - - io_tv.tv_sec = 60000; - io_tv.tv_usec = 0; - - gperf_profiler_init (worker->srv->cfg, "lmtp"); - - event_base_loop (worker->ctx, 0); - exit (EXIT_SUCCESS); -} - -/* - * vi:ts=4 - */ diff --git a/src/lmtp.h b/src/lmtp.h deleted file mode 100644 index 863c6eacd..000000000 --- a/src/lmtp.h +++ /dev/null @@ -1,18 +0,0 @@ -#ifndef RSPAMD_LMTP_H -#define RSPAMD_LMTP_H - -#include "config.h" -#include "main.h" - -#define LMTP_GREETING 220 -#define LMTP_QUIT 221 -#define LMTP_OK 250 -#define LMTP_DATA 354 -#define LMTP_ERROR_PROCESS 500 -#define LMTP_FAILURE 530 -#define LMTP_AUTH_ERROR 503 -#define LMTP_BAD_CMD 503 -#define LMTP_NO_RCPT 554 -#define LMTP_TEMP_FAIL 421 - -#endif diff --git a/src/lmtp_proto.c b/src/lmtp_proto.c deleted file mode 100644 index a56467103..000000000 --- a/src/lmtp_proto.c +++ /dev/null @@ -1,758 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "config.h" -#include "main.h" -#include "cfg_file.h" -#include "util.h" -#include "lmtp.h" -#include "lmtp_proto.h" - -/* Max line size as it is defined in rfc2822 */ -#define OUTBUFSIZ 1000 - -/* LMTP commands */ -static rspamd_fstring_t lhlo_command = { - .begin = "LHLO", - .len = sizeof ("LHLO") - 1 -}; - -static rspamd_fstring_t mail_command = { - .begin = "MAIL FROM:", - .len = sizeof ("MAIL FROM:") - 1 -}; - -static rspamd_fstring_t rcpt_command = { - .begin = "RCPT TO:", - .len = sizeof ("RCPT TO:") - 1 -}; - -static rspamd_fstring_t data_command = { - .begin = "DATA", - .len = sizeof ("DATA") - 1 -}; - -static rspamd_fstring_t data_dot = { - .begin = ".\r\n", - .len = sizeof (".\r\n") - 1 -}; - -static const gchar *mail_regexp = - "[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?"; -static GRegex *mail_re = NULL; - -/* - * Extract e-mail from read line - * return <> if no valid address detected - */ -static gchar * -extract_mail (rspamd_mempool_t * pool, rspamd_fstring_t * line) -{ - GError *err = NULL; - gchar *match; - GMatchInfo *info; - - if (mail_re == NULL) { - /* Compile regexp */ - mail_re = g_regex_new (mail_regexp, G_REGEX_RAW, 0, &err); - } - - if (g_regex_match_full (mail_re, line->begin, line->len, 0, 0, &info, - NULL) == TRUE) { - match = rspamd_mempool_strdup (pool, g_match_info_fetch (info, 0)); - g_match_info_free (info); - } - else { - match = "<>"; - } - - return match; -} - -static gboolean -out_lmtp_reply (struct rspamd_task *task, gint code, gchar *rcode, gchar *msg) -{ - gchar outbuf[OUTBUFSIZ]; - gint r; - - if (*rcode == '\0') { - r = rspamd_snprintf (outbuf, OUTBUFSIZ, "%d %s\r\n", code, msg); - } - else { - r = - rspamd_snprintf (outbuf, OUTBUFSIZ, "%d %s %s\r\n", code, rcode, - msg); - } - if (!rspamd_dispatcher_write (task->dispatcher, outbuf, r, FALSE, FALSE)) { - return FALSE; - } - return TRUE; -} - -gint -read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, rspamd_fstring_t * line) -{ - gchar *c, *rcpt; - rspamd_fstring_t fstr; - gint i = 0, l = 0, size; - - switch (lmtp->state) { - case LMTP_READ_LHLO: - /* Search LHLO line */ - if ((i = rspamd_fstrstri (line, &lhlo_command)) == -1) { - msg_info ("LHLO expected but not found"); - (void)out_lmtp_reply (lmtp->task, - LMTP_BAD_CMD, - "5.0.0", - "Need LHLO here"); - return -1; - } - else { - i += lhlo_command.len; - c = line->begin + i; - /* Skip spaces */ - while (g_ascii_isspace (*c) && i < (gint)line->len) { - i++; - c++; - } - lmtp->task->helo = rspamd_mempool_alloc (lmtp->task->task_pool, - line->len - i + 1); - /* Strlcpy makes string null terminated by design */ - rspamd_strlcpy (lmtp->task->helo, c, line->len - i + 1); - lmtp->state = LMTP_READ_FROM; - if (!out_lmtp_reply (lmtp->task, LMTP_OK, "", "Ok")) { - return -1; - } - return 0; - } - break; - case LMTP_READ_FROM: - /* Search MAIL FROM: line */ - if ((i = rspamd_fstrstri (line, &mail_command)) == -1) { - msg_info ("MAIL expected but not found"); - (void)out_lmtp_reply (lmtp->task, - LMTP_BAD_CMD, - "5.0.0", - "Need MAIL here"); - return -1; - } - else { - i += mail_command.len; - c = line->begin + i; - fstr.begin = line->begin + i; - fstr.len = line->len - i; - lmtp->task->from = extract_mail (lmtp->task->task_pool, &fstr); - lmtp->state = LMTP_READ_RCPT; - if (!out_lmtp_reply (lmtp->task, LMTP_OK, "2.1.0", "Sender ok")) { - return -1; - } - return 0; - } - break; - case LMTP_READ_RCPT: - /* Search RCPT_TO: line */ - if ((i = rspamd_fstrstri (line, &rcpt_command)) == -1) { - msg_info ("RCPT expected but not found"); - (void)out_lmtp_reply (lmtp->task, - LMTP_NO_RCPT, - "5.5.4", - "Need RCPT here"); - return -1; - } - else { - i += rcpt_command.len; - c = line->begin + i; - fstr.begin = line->begin + i; - fstr.len = line->len - i; - rcpt = extract_mail (lmtp->task->task_pool, &fstr); - if (*rcpt == '<' && *(rcpt + 1) == '>') { - /* Invalid or empty rcpt not allowed */ - msg_info ("bad recipient"); - (void)out_lmtp_reply (lmtp->task, - LMTP_NO_RCPT, - "5.5.4", - "Bad recipient"); - return -1; - } - /* Strlcpy makes string null terminated by design */ - lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt); - lmtp->state = LMTP_READ_DATA; - if (!out_lmtp_reply (lmtp->task, LMTP_OK, "2.1.0", - "Recipient ok")) { - return -1; - } - return 0; - } - break; - case LMTP_READ_DATA: - /* Search DATA line */ - if ((i = rspamd_fstrstri (line, &data_command)) == -1) { - msg_info ("DATA expected but not found"); - (void)out_lmtp_reply (lmtp->task, - LMTP_BAD_CMD, - "5.0.0", - "Need DATA here"); - return -1; - } - else { - i += data_command.len; - c = line->begin + i; - /* Skip spaces */ - while (g_ascii_isspace (*c++)) { - i++; - } - rcpt = rspamd_mempool_alloc (lmtp->task->task_pool, - line->len - i + 1); - /* Strlcpy makes string null terminated by design */ - rspamd_strlcpy (rcpt, c, line->len - i + 1); - lmtp->task->rcpt = g_list_prepend (lmtp->task->rcpt, rcpt); - lmtp->state = LMTP_READ_MESSAGE; - if (!out_lmtp_reply (lmtp->task, LMTP_DATA, "", - "Enter message, ending with \".\" on a line by itself")) { - return -1; - } - lmtp->task->msg = rspamd_fstralloc (lmtp->task->task_pool, BUFSIZ); - return 0; - } - break; - case LMTP_READ_MESSAGE: - if (strncmp (line->begin, data_dot.begin, line->len) == 0) { - lmtp->state = LMTP_READ_DOT; - lmtp->task->state = READ_MESSAGE; - return 0; - } - else { - l = lmtp->task->msg->len; - size = lmtp->task->msg->size; - if ((gint)(l + line->len) > size) { - /* Grow buffer */ - if ((gint)line->len > size) { - size += line->len << 1; - } - else { - /* size *= 2 */ - size <<= 1; - } - lmtp->task->msg = rspamd_fstrgrow (lmtp->task->task_pool, - lmtp->task->msg, - size); - } - rspamd_fstrcat (lmtp->task->msg, line); - return 0; - } - break; - case LMTP_READ_DOT: - /* We have some input after reading dot, close connection as we have no currently support of multiply - * messages per session - */ - if (!out_lmtp_reply (lmtp->task, LMTP_QUIT, "", "Bye")) { - return -1; - } - return 0; - break; - } - - return 0; -} - -struct mta_callback_data { - struct rspamd_task *task; - rspamd_io_dispatcher_t *dispatcher; - enum { - LMTP_WANT_GREETING, - LMTP_WANT_MAIL, - LMTP_WANT_RCPT, - LMTP_WANT_DATA, - LMTP_WANT_DOT, - LMTP_WANT_CLOSING, - } state; -}; - -static gboolean -parse_mta_str (rspamd_fstring_t * in, struct mta_callback_data *cd) -{ - gint r; - static rspamd_fstring_t okres1 = { - .begin = "250 ", - .len = sizeof ("250 ") - 1, - } - , okres2 = { - .begin = "220 ",.len = sizeof ("220 ") - 1, - } - , datares = { - .begin = "354 ",.len = sizeof ("354 ") - 1, - }; - - switch (cd->state) { - case LMTP_WANT_GREETING: - case LMTP_WANT_MAIL: - case LMTP_WANT_RCPT: - case LMTP_WANT_DATA: - case LMTP_WANT_CLOSING: - r = rspamd_fstrstr (in, &okres1); - if (r == -1) { - r = rspamd_fstrstr (in, &okres2); - } - break; - case LMTP_WANT_DOT: - r = rspamd_fstrstr (in, &datares); - break; - } - - return r != -1; -} - -static void -close_mta_connection (struct mta_callback_data *cd, gboolean is_success) -{ - cd->task->state = CLOSING_CONNECTION; - if (is_success) { - if (!out_lmtp_reply (cd->task, LMTP_OK, "", "Delivery completed")) { - return; - } - } - else { - if (!out_lmtp_reply (cd->task, LMTP_FAILURE, "", "Delivery failure")) { - return; - } - } - rspamd_remove_dispatcher (cd->dispatcher); -} - -/* - * Callback that is called when there is data to read in buffer - */ -static gboolean -mta_read_socket (rspamd_fstring_t * in, void *arg) -{ - struct mta_callback_data *cd = (struct mta_callback_data *)arg; - gchar outbuf[1024], *hostbuf, *c; - gint hostmax, r; - GList *cur; - static rspamd_fstring_t contres1 = { - .begin = "250-", - .len = sizeof ("250-") - 1, - } - , contres2 = { - .begin = "220-",.len = sizeof ("220-") - 1, - }; - - if (rspamd_fstrstr (in, &contres1) != -1 || rspamd_fstrstr (in, &contres2) != -1) { - /* Skip such lines */ - return TRUE; - } - - switch (cd->state) { - case LMTP_WANT_GREETING: - if (!parse_mta_str (in, cd)) { - msg_warn ("got bad greeting"); - close_mta_connection (cd, FALSE); - return FALSE; - } - hostmax = sysconf (_SC_HOST_NAME_MAX) + 1; - hostbuf = alloca (hostmax); - gethostname (hostbuf, hostmax); - hostbuf[hostmax - 1] = '\0'; - if (cd->task->cfg->deliver_lmtp) { - r = rspamd_snprintf (outbuf, - sizeof (outbuf), - "LHLO %s" CRLF, - hostbuf); - } - else { - r = rspamd_snprintf (outbuf, - sizeof (outbuf), - "HELO %s" CRLF, - hostbuf); - } - if (!rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, - FALSE)) { - return FALSE; - } - cd->state = LMTP_WANT_MAIL; - break; - case LMTP_WANT_MAIL: - if (!parse_mta_str (in, cd)) { - msg_warn ("got bad helo"); - close_mta_connection (cd, FALSE); - return FALSE; - } - r = rspamd_snprintf (outbuf, - sizeof (outbuf), - "MAIL FROM: <%s>" CRLF, - cd->task->from); - if (!rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, - FALSE)) { - return FALSE; - } - cd->state = LMTP_WANT_RCPT; - break; - case LMTP_WANT_RCPT: - if (!parse_mta_str (in, cd)) { - msg_warn ("got bad mail from"); - close_mta_connection (cd, FALSE); - return FALSE; - } - cur = g_list_first (cd->task->rcpt); - r = 0; - while (cur) { - r += rspamd_snprintf (outbuf + r, - sizeof (outbuf) - r, - "RCPT TO: <%s>" CRLF, - (gchar *)cur->data); - cur = g_list_next (cur); - } - - if (!rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, - FALSE)) { - return FALSE; - } - cd->state = LMTP_WANT_DATA; - break; - case LMTP_WANT_DATA: - if (!parse_mta_str (in, cd)) { - msg_warn ("got bad rcpt"); - close_mta_connection (cd, FALSE); - return FALSE; - } - r = rspamd_snprintf (outbuf, sizeof (outbuf), "DATA" CRLF); - if (!rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, - FALSE)) { - return FALSE; - } - cd->state = LMTP_WANT_DOT; - break; - case LMTP_WANT_DOT: - if (!parse_mta_str (in, cd)) { - msg_warn ("got bad data"); - close_mta_connection (cd, FALSE); - return FALSE; - } - c = g_mime_object_to_string ((GMimeObject *) cd->task->message); - r = strlen (c); - if (!rspamd_dispatcher_write (cd->task->dispatcher, c, r, TRUE, TRUE)) { - return FALSE; - } - rspamd_mempool_add_destructor (cd->task->task_pool, - (rspamd_mempool_destruct_t) g_free, c); - r = rspamd_snprintf (outbuf, sizeof (outbuf), CRLF "." CRLF); - if (!rspamd_dispatcher_write (cd->task->dispatcher, outbuf, r, FALSE, - FALSE)) { - return FALSE; - } - cd->state = LMTP_WANT_CLOSING; - break; - case LMTP_WANT_CLOSING: - if (!parse_mta_str (in, cd)) { - msg_warn ("message not delivered"); - close_mta_connection (cd, FALSE); - return FALSE; - } - close_mta_connection (cd, TRUE); - break; - } - - return TRUE; -} - -/* - * Called if something goes wrong - */ -static void -mta_err_socket (GError * err, void *arg) -{ - struct mta_callback_data *cd = (struct mta_callback_data *)arg; - msg_info ("abnormaly terminating connection with MTA"); - close_mta_connection (cd, FALSE); -} - -/* - * Deliver mail via smtp or lmtp - */ -static gint -lmtp_deliver_mta (struct rspamd_task *task) -{ - gint sock; - struct sockaddr_un *un; - struct mta_callback_data *cd; - - if (task->cfg->deliver_family == AF_UNIX) { - un = alloca (sizeof (struct sockaddr_un)); - sock = rspamd_socket_unix (task->cfg->deliver_host, - un, - SOCK_STREAM, - FALSE, - TRUE); - } - else { - sock = rspamd_socket (task->cfg->deliver_host, - task->cfg->deliver_port, - SOCK_STREAM, - TRUE, - FALSE, - TRUE); - } - if (sock == -1) { - msg_warn ("cannot create socket for %s, %s", - task->cfg->deliver_host, - strerror (errno)); - } - - cd = rspamd_mempool_alloc (task->task_pool, - sizeof (struct mta_callback_data)); - cd->task = task; - cd->state = LMTP_WANT_GREETING; - cd->dispatcher = rspamd_create_dispatcher (task->ev_base, - sock, - BUFFER_LINE, - mta_read_socket, - NULL, - mta_err_socket, - NULL, - (void *)cd); - return 0; -} - -static gchar * -format_lda_args (struct rspamd_task *task) -{ - gchar *res, *c, *r; - size_t len; - GList *rcpt; - gboolean got_args = FALSE; - - c = task->cfg->deliver_agent_path; - /* Find first arg */ - if ((c = strchr (c, ' ')) == NULL) { - return task->cfg->deliver_agent_path; - } - - /* Calculate length of result string */ - len = strlen (task->cfg->deliver_agent_path); - while (*c) { - if (*c == '%') { - c++; - switch (*c) { - case 'f': - /* Insert from */ - len += strlen (task->from) - 2; - break; - case 'r': - /* Insert list of recipients */ - rcpt = g_list_first (task->rcpt); - len -= 2; - while (rcpt) { - len += strlen ((gchar *)rcpt->data) + 1; - rcpt = g_list_next (rcpt); - } - break; - } - } - c++; - len++; - } - res = rspamd_mempool_alloc (task->task_pool, len + 1); - r = res; - c = task->cfg->deliver_agent_path; - - while (*c) { - if (*c == ' ') { - got_args = TRUE; - } - if (got_args && *c == '%') { - switch (*(c + 1)) { - case 'f': - /* Insert from */ - c += 2; - len = strlen (task->from); - memcpy (r, task->from, len); - r += len; - break; - case 'r': - /* Insert list of recipients */ - c += 2; - rcpt = g_list_first (task->rcpt); - while (rcpt) { - len = strlen ((gchar *)rcpt->data) + 1; - memcpy (r, rcpt->data, len); - r += len; - *r++ = ' '; - rcpt = g_list_next (rcpt); - } - break; - default: - *r = *c; - r++; - c++; - break; - } - } - else { - *r = *c; - r++; - c++; - } - } - - return res; -} - -static gint -lmtp_deliver_lda (struct rspamd_task *task) -{ - gchar *args, **argv; - GMimeStream *stream; - gint rc, ecode, p[2], argc; - pid_t cpid, pid; - - if ((args = format_lda_args (task)) == NULL) { - return -1; - } - - /* Format arguments in shell style */ - if (!g_shell_parse_argv (args, &argc, &argv, NULL)) { - msg_info ("cannot parse arguments"); - return -1; - } - - if (pipe (p) == -1) { - g_strfreev (argv); - msg_info ("cannot open pipe: %s", strerror (errno)); - return -1; - } - - /* Fork to exec LDA */ -#ifdef HAVE_VFORK - if ((cpid = vfork ()) == -1) { - g_strfreev (argv); - msg_info ("cannot fork: %s", strerror (errno)); - return -1; - } -#else - if ((cpid = fork ()) == -1) { - g_strfreev (argv); - msg_info ("cannot fork: %s", strerror (errno)); - return -1; - } -#endif - - if (cpid == 0) { - /* Child process, close write pipe and keep only read one */ - close (p[1]); - /* Set standart IO descriptors */ - if (p[0] != STDIN_FILENO) { - (void)dup2 (p[0], STDIN_FILENO); - (void)close (p[0]); - } - - execv (argv[0], argv); - _exit (127); - } - - close (p[0]); - stream = g_mime_stream_fs_new (p[1]); - - if (g_mime_object_write_to_stream ((GMimeObject *) task->message, - stream) == -1) { - g_strfreev (argv); - msg_info ("cannot write stream to lda"); - return -1; - } - - g_object_unref (stream); - close (p[1]); - -#if defined(HAVE_WAIT4) - do { - pid = wait4 (cpid, &rc, 0, NULL); - } while (pid == -1 && errno == EINTR); -#elif defined(HAVE_WAITPID) - do { - pid = waitpid (cpid, &rc, 0); - } while (pid == -1 && errno == EINTR); -#else -# error wait mechanisms are undefined -#endif - if (rc == -1) { - g_strfreev (argv); - msg_info ("lda returned error code"); - return -1; - } - else if (WIFEXITED (rc)) { - ecode = WEXITSTATUS (rc); - if (ecode == 0) { - g_strfreev (argv); - return 0; - } - else { - g_strfreev (argv); - msg_info ("lda returned error code %d", ecode); - return -1; - } - } - - g_strfreev (argv); - return -1; -} - -gint -lmtp_deliver_message (struct rspamd_task *task) -{ - if (task->cfg->deliver_agent_path != NULL) { - /* Do deliver to LDA */ - return lmtp_deliver_lda (task); - } - else { - /* XXX: do lmtp/smtp client */ - return -1; - } -} - -gint -write_lmtp_reply (struct rspamd_lmtp_proto *lmtp) -{ - gint r; - struct rspamd_task *task = lmtp->task; - - debug_task ("writing reply to client"); - if (lmtp->task->error_code != 0) { - if (!out_lmtp_reply (lmtp->task, lmtp->task->error_code, "", - lmtp->task->last_error)) { - return -1; - } - } - else { - /* Do delivery */ - if ((r = lmtp_deliver_message (lmtp->task)) == -1) { - out_lmtp_reply (lmtp->task, LMTP_FAILURE, "", "Delivery failure"); - return -1; - } - else if (r == 0) { - if (!out_lmtp_reply (lmtp->task, LMTP_OK, "", - "Delivery completed")) { - return -1; - } - } - else { - return 1; - } - } - - return 0; -} - -/* - * vi:ts=4 - */ diff --git a/src/lmtp_proto.h b/src/lmtp_proto.h deleted file mode 100644 index fb2f0f8f2..000000000 --- a/src/lmtp_proto.h +++ /dev/null @@ -1,44 +0,0 @@ -#ifndef RSPAMD_LMTP_PROTO_H -#define RSPAMD_LMTP_PROTO_H - -#include "config.h" - -struct rspamd_task; - -enum lmtp_state { - LMTP_READ_LHLO, - LMTP_READ_FROM, - LMTP_READ_RCPT, - LMTP_READ_DATA, - LMTP_READ_MESSAGE, - LMTP_READ_DOT, -}; - -struct rspamd_lmtp_proto { - struct rspamd_task *task; - enum lmtp_state state; -}; - -/** - * Read one line of user's input for specified task - * @param lmtp lmtp object - * @param line line of user's input - * @return 0 if line was successfully parsed and -1 if we have protocol error - */ -gint read_lmtp_input_line (struct rspamd_lmtp_proto *lmtp, rspamd_fstring_t *line); - -/** - * Deliver message via lmtp/smtp or pipe to LDA - * @param task task object - * @return 0 if we wrote message and -1 if there was some error - */ -gint lmtp_deliver_message (struct rspamd_task *task); - -/** - * Write reply for specified lmtp object - * @param lmtp lmtp object - * @return 0 if we wrote reply and -1 if there was some error - */ -gint write_lmtp_reply (struct rspamd_lmtp_proto *lmtp); - -#endif diff --git a/src/lua/lua_buffer.c b/src/lua/lua_buffer.c deleted file mode 100644 index 3605a0eb5..000000000 --- a/src/lua/lua_buffer.c +++ /dev/null @@ -1,381 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "lua_common.h" -#include "buffer.h" -#include "unix-std.h" -#include <math.h> - -/* Public prototypes */ -struct rspamd_io_dispatcher_s * lua_check_io_dispatcher (lua_State * L); -void luaopen_io_dispatcher (lua_State * L); - -/* Lua bindings */ -LUA_FUNCTION_DEF (io_dispatcher, create); -LUA_FUNCTION_DEF (io_dispatcher, set_policy); -LUA_FUNCTION_DEF (io_dispatcher, write); -LUA_FUNCTION_DEF (io_dispatcher, pause); -LUA_FUNCTION_DEF (io_dispatcher, restore); -LUA_FUNCTION_DEF (io_dispatcher, destroy); - -static const struct luaL_reg io_dispatcherlib_m[] = { - LUA_INTERFACE_DEF (io_dispatcher, set_policy), - LUA_INTERFACE_DEF (io_dispatcher, write), - LUA_INTERFACE_DEF (io_dispatcher, pause), - LUA_INTERFACE_DEF (io_dispatcher, restore), - LUA_INTERFACE_DEF (io_dispatcher, destroy), - {"__tostring", rspamd_lua_class_tostring}, - {NULL, NULL} -}; - -static const struct luaL_reg io_dispatcherlib_f[] = { - LUA_INTERFACE_DEF (io_dispatcher, create), - {NULL, NULL} -}; - -struct lua_dispatcher_cbdata { - lua_State *L; - rspamd_io_dispatcher_t *d; - struct event_base *base; - gint cbref_read; - gint cbref_write; - gint cbref_err; -}; - -struct rspamd_io_dispatcher_s * -lua_check_io_dispatcher (lua_State * L) -{ - void *ud = rspamd_lua_check_udata (L, 1, "rspamd{io_dispatcher}"); - luaL_argcheck (L, ud != NULL, 1, "'io_dispatcher' expected"); - return ud ? *((struct rspamd_io_dispatcher_s **)ud) : NULL; -} - -struct event_base * -lua_check_event_base (lua_State *L) -{ - void *ud = rspamd_lua_check_udata (L, 1, "rspamd{ev_base}"); - luaL_argcheck (L, ud != NULL, 1, "'ev_base' expected"); - return ud ? *((struct event_base **)ud) : NULL; -} - -/* Dispatcher callbacks */ - -static gboolean -lua_io_read_cb (rspamd_ftok_t * in, void *arg) -{ - struct lua_dispatcher_cbdata *cbdata = arg; - gboolean res; - rspamd_io_dispatcher_t **pdispatcher; - - /* callback (dispatcher, data) */ - lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); - pdispatcher = - lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *)); - rspamd_lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1); - *pdispatcher = cbdata->d; - lua_pushlstring (cbdata->L, in->begin, in->len); - - if (lua_pcall (cbdata->L, 2, 1, 0) != 0) { - msg_info ("call to session finalizer failed: %s", - lua_tostring (cbdata->L, -1)); - lua_pop (cbdata->L, 1); - } - - res = lua_toboolean (cbdata->L, -1); - lua_pop (cbdata->L, 1); - - return res; -} - -static gboolean -lua_io_write_cb (void *arg) -{ - struct lua_dispatcher_cbdata *cbdata = arg; - gboolean res = FALSE; - rspamd_io_dispatcher_t **pdispatcher; - - if (cbdata->cbref_write) { - lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); - /* callback (dispatcher) */ - pdispatcher = - lua_newuserdata (cbdata->L, - sizeof (struct rspamd_io_dispatcher_s *)); - rspamd_lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1); - *pdispatcher = cbdata->d; - - - if (lua_pcall (cbdata->L, 1, 1, 0) != 0) { - msg_info ("call to session finalizer failed: %s", - lua_tostring (cbdata->L, -1)); - lua_pop (cbdata->L, 1); - } - - res = lua_toboolean (cbdata->L, -1); - lua_pop (cbdata->L, 1); - } - - return res; -} - -static void -lua_io_err_cb (GError * err, void *arg) -{ - struct lua_dispatcher_cbdata *cbdata = arg; - rspamd_io_dispatcher_t **pdispatcher; - - /* callback (dispatcher, err) */ - lua_rawgeti (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err); - pdispatcher = - lua_newuserdata (cbdata->L, sizeof (struct rspamd_io_dispatcher_s *)); - rspamd_lua_setclass (cbdata->L, "rspamd{io_dispatcher}", -1); - *pdispatcher = cbdata->d; - lua_pushstring (cbdata->L, err->message); - - if (lua_pcall (cbdata->L, 2, 0, 0) != 0) { - msg_info ("call to session finalizer failed: %s", - lua_tostring (cbdata->L, -1)); - lua_pop (cbdata->L, 1); - } - - /* Unref callbacks */ - luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_read); - if (cbdata->cbref_write) { - luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_write); - } - luaL_unref (cbdata->L, LUA_REGISTRYINDEX, cbdata->cbref_err); - - g_error_free (err); - g_slice_free1 (sizeof (struct lua_dispatcher_cbdata), cbdata); -} - -/* - * rspamd_dispatcher.create(base,fd, read_cb, write_cb, err_cb[, timeout]) - */ -static int -lua_io_dispatcher_create (lua_State *L) -{ - struct rspamd_io_dispatcher_s *io_dispatcher, **pdispatcher; - gint fd; - struct lua_dispatcher_cbdata *cbdata; - struct timeval tv = {0, 0}; - double tv_num, tmp; - - if (lua_gettop (L) >= 5 && lua_isfunction (L, 3) && lua_isfunction (L, 5)) { - cbdata = g_slice_alloc0 (sizeof (struct lua_dispatcher_cbdata)); - cbdata->base = lua_check_event_base (L); - if (cbdata->base == NULL) { - /* Create new event base */ - msg_warn ("create new event base as it is not specified"); - cbdata->base = event_init (); - } - cbdata->L = L; - fd = lua_tointeger (L, 2); - lua_pushvalue (L, 3); - cbdata->cbref_read = luaL_ref (L, LUA_REGISTRYINDEX); - if (lua_isfunction (L, 4)) { - /* Push write callback as well */ - lua_pushvalue (L, 4); - cbdata->cbref_write = luaL_ref (L, LUA_REGISTRYINDEX); - } - /* Error callback */ - lua_pushvalue (L, 5); - cbdata->cbref_err = luaL_ref (L, LUA_REGISTRYINDEX); - - if (lua_gettop (L) > 5) { - tv_num = lua_tonumber (L, 6); - tv.tv_sec = trunc (tv_num); - tv.tv_usec = modf (tv_num, &tmp) * 1000.; - io_dispatcher = rspamd_create_dispatcher (cbdata->base, - fd, - BUFFER_LINE, - lua_io_read_cb, - lua_io_write_cb, - lua_io_err_cb, - &tv, - cbdata); - } - else { - io_dispatcher = rspamd_create_dispatcher (cbdata->base, - fd, - BUFFER_LINE, - lua_io_read_cb, - lua_io_write_cb, - lua_io_err_cb, - NULL, - cbdata); - } - - cbdata->d = io_dispatcher; - /* Push result */ - pdispatcher = - lua_newuserdata (L, sizeof (struct rspamd_io_dispatcher_s *)); - rspamd_lua_setclass (L, "rspamd{io_dispatcher}", -1); - *pdispatcher = io_dispatcher; - } - else { - msg_err ("invalid number of arguments to io_dispatcher.create: %d", - lua_gettop (L)); - lua_pushnil (L); - } - - return 1; -} - -static int -lua_io_dispatcher_set_policy (lua_State *L) -{ - struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L); - gint policy, limit = 0; - - if (io_dispatcher) { - policy = lua_tonumber (L, 2); - if (policy > BUFFER_ANY || policy < BUFFER_LINE) { - msg_err ("invalid policy: %d", policy); - } - else { - if (lua_gettop (L) > 2) { - limit = lua_tonumber (L, 3); - } - rspamd_set_dispatcher_policy (io_dispatcher, policy, limit); - return 0; - } - } - else { - lua_pushnil (L); - } - - return 1; -} - -static int -lua_io_dispatcher_write (lua_State *L) -{ - struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L); - gboolean delayed = FALSE, res; - const gchar *data; - size_t len; - - if (io_dispatcher) { - if (lua_gettop (L) < 2) { - msg_err ("invalid number of arguments to io_dispatcher.create: %d", - lua_gettop (L)); - lua_pushboolean (L, FALSE); - } - else { - data = lua_tolstring (L, 2, &len); - if (lua_gettop (L) > 2) { - delayed = lua_toboolean (L, 3); - } - res = rspamd_dispatcher_write (io_dispatcher, - (void *)data, - len, - delayed, - FALSE); - lua_pushboolean (L, res); - } - } - else { - lua_pushnil (L); - } - - return 1; -} - -static int -lua_io_dispatcher_pause (lua_State *L) -{ - struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L); - - if (io_dispatcher) { - rspamd_dispatcher_pause (io_dispatcher); - return 0; - } - else { - lua_pushnil (L); - } - - return 1; -} - -static int -lua_io_dispatcher_restore (lua_State *L) -{ - struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L); - - if (io_dispatcher) { - rspamd_dispatcher_restore (io_dispatcher); - return 0; - } - else { - lua_pushnil (L); - } - - return 1; -} - -static int -lua_io_dispatcher_destroy (lua_State *L) -{ - struct rspamd_io_dispatcher_s *io_dispatcher = lua_check_io_dispatcher (L); - - if (io_dispatcher) { - rspamd_remove_dispatcher (io_dispatcher); - return 0; - } - else { - lua_pushnil (L); - } - - return 1; -} - -static gint -lua_load_dispatcher (lua_State *L) -{ - lua_newtable (L); - luaL_register (L, NULL, io_dispatcherlib_f); - - return 1; -} - - -void -luaopen_io_dispatcher (lua_State * L) -{ - luaL_newmetatable (L, "rspamd{io_dispatcher}"); - lua_pushstring (L, "__index"); - lua_pushvalue (L, -2); - lua_settable (L, -3); - - lua_pushstring (L, "class"); - lua_pushstring (L, "rspamd{io_dispatcher}"); - lua_rawset (L, -3); - - luaL_register (L, NULL, io_dispatcherlib_m); - lua_pop (L, 1); /* remove metatable from stack */ - rspamd_lua_add_preload (L, "rspamd_io_dispatcher", lua_load_dispatcher); - - /* Simple event class */ - rspamd_lua_new_class (L, "rspamd{ev_base}", null_reg); - lua_pop (L, 1); /* remove metatable from stack */ - - /* Set buffer types globals */ - lua_pushnumber (L, BUFFER_LINE); - lua_setglobal (L, "IO_BUFFER_LINE"); - lua_pushnumber (L, BUFFER_CHARACTER); - lua_setglobal (L, "IO_BUFFER_CHARACTER"); - lua_pushnumber (L, BUFFER_ANY); - lua_setglobal (L, "IO_BUFFER_ANY"); -} diff --git a/src/smtp.c b/src/smtp.c deleted file mode 100644 index 07e28a4de..000000000 --- a/src/smtp.c +++ /dev/null @@ -1,1029 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "config.h" -#include "main.h" -#include "cfg_file.h" -#include "cfg_xml.h" -#include "util.h" -#include "smtp.h" -#include "smtp_proto.h" -#include "smtp_utils.h" -#include "map.h" -#include "message.h" -#include "settings.h" -#include "dns.h" -#include "lua/lua_common.h" - -/* Max line size as it is defined in rfc2822 */ -#define OUTBUFSIZ 1000 - -/* Upstream timeouts */ -#define DEFAULT_UPSTREAM_ERROR_TIME 10 -#define DEFAULT_UPSTREAM_DEAD_TIME 300 -#define DEFAULT_UPSTREAM_MAXERRORS 10 - -#define DEFAULT_REJECT_MESSAGE "450 4.5.0 Spam message rejected" - -static gboolean smtp_write_socket (void *arg); - -/* Init functions */ -gpointer init_smtp (struct rspamd_config *cfg); -void start_smtp (struct rspamd_worker *worker); - -worker_t smtp_worker = { - "smtp", /* Name */ - init_smtp, /* Init function */ - start_smtp, /* Start function */ - TRUE, /* Has socket */ - FALSE, /* Non unique */ - FALSE, /* Non threaded */ - TRUE, /* Killable */ - SOCK_STREAM, /* TCP socket */ - RSPAMD_WORKER_VER /* Version info */ -}; - -static gboolean -call_stage_filters (struct smtp_session *session, enum rspamd_smtp_stage stage) -{ - gboolean res = TRUE; - GList *list = session->ctx->smtp_filters[stage]; - struct smtp_filter *filter; - - while (list) { - filter = list->data; - if (!filter->filter (session, filter->filter_data)) { - res = FALSE; - break; - } - list = g_list_next (list); - } - - return res; -} - -static gboolean -read_smtp_command (struct smtp_session *session, rspamd_fstring_t *line) -{ - struct smtp_command *cmd; - gchar outbuf[BUFSIZ]; - gint r; - - if (!parse_smtp_command (session, line, &cmd)) { - session->error = SMTP_ERROR_BAD_COMMAND; - session->errors++; - return FALSE; - } - - switch (cmd->command) { - case SMTP_COMMAND_HELO: - case SMTP_COMMAND_EHLO: - if (session->state == SMTP_STATE_GREETING || session->state == - SMTP_STATE_HELO) { - if (parse_smtp_helo (session, cmd)) { - session->state = SMTP_STATE_FROM; - } - else { - session->errors++; - } - if (!call_stage_filters (session, SMTP_STAGE_HELO)) { - return FALSE; - } - return TRUE; - } - else { - goto improper_sequence; - } - break; - case SMTP_COMMAND_QUIT: - session->state = SMTP_STATE_QUIT; - break; - case SMTP_COMMAND_NOOP: - break; - case SMTP_COMMAND_MAIL: - if (((session->state == SMTP_STATE_GREETING || session->state == - SMTP_STATE_HELO) && !session->ctx->helo_required) - || session->state == SMTP_STATE_FROM) { - if (parse_smtp_from (session, cmd)) { - session->state = SMTP_STATE_RCPT; - } - else { - session->errors++; - return FALSE; - } - if (!call_stage_filters (session, SMTP_STAGE_MAIL)) { - return FALSE; - } - } - else { - goto improper_sequence; - } - break; - case SMTP_COMMAND_RCPT: - if (session->state == SMTP_STATE_RCPT) { - if (parse_smtp_rcpt (session, cmd)) { - if (!call_stage_filters (session, SMTP_STAGE_RCPT)) { - return FALSE; - } - /* Make upstream connection */ - if (session->upstream == NULL) { - if (!create_smtp_upstream_connection (session)) { - session->error = SMTP_ERROR_UPSTREAM; - session->state = SMTP_STATE_CRITICAL_ERROR; - return FALSE; - } - } - else { - /* Send next rcpt to upstream */ - session->state = SMTP_STATE_WAIT_UPSTREAM; - session->upstream_state = SMTP_STATE_BEFORE_DATA; - rspamd_dispatcher_restore (session->upstream_dispatcher); - r = rspamd_snprintf (outbuf, sizeof (outbuf), "RCPT TO: "); - r += smtp_upstream_write_list (session->rcpt->data, - outbuf + r, - sizeof (outbuf) - r); - session->cur_rcpt = NULL; - return rspamd_dispatcher_write ( - session->upstream_dispatcher, - outbuf, - r, - FALSE, - FALSE); - } - session->state = SMTP_STATE_WAIT_UPSTREAM; - return TRUE; - } - else { - session->errors++; - return FALSE; - } - } - else { - goto improper_sequence; - } - break; - case SMTP_COMMAND_RSET: - session->from = NULL; - if (session->rcpt) { - g_list_free (session->rcpt); - } - if (session->upstream) { - rspamd_session_remove_event (session->s, - smtp_upstream_finalize_connection, - session); - session->upstream = NULL; - } - session->state = SMTP_STATE_GREETING; - break; - case SMTP_COMMAND_DATA: - if (session->state == SMTP_STATE_RCPT) { - if (session->rcpt == NULL) { - session->error = SMTP_ERROR_RECIPIENTS; - session->errors++; - return FALSE; - } - if (!call_stage_filters (session, SMTP_STAGE_DATA)) { - return FALSE; - } - if (session->upstream == NULL) { - session->error = SMTP_ERROR_UPSTREAM; - session->state = SMTP_STATE_CRITICAL_ERROR; - return FALSE; - } - else { - session->upstream_state = SMTP_STATE_DATA; - rspamd_dispatcher_restore (session->upstream_dispatcher); - r = rspamd_snprintf (outbuf, sizeof (outbuf), "DATA" CRLF); - session->state = SMTP_STATE_WAIT_UPSTREAM; - session->error = SMTP_ERROR_DATA_OK; - return rspamd_dispatcher_write (session->upstream_dispatcher, - outbuf, - r, - FALSE, - FALSE); - } - } - else { - goto improper_sequence; - } - case SMTP_COMMAND_VRFY: - case SMTP_COMMAND_EXPN: - case SMTP_COMMAND_HELP: - session->error = SMTP_ERROR_UNIMPLIMENTED; - return FALSE; - } - - session->error = SMTP_ERROR_OK; - return TRUE; - -improper_sequence: - session->errors++; - session->error = SMTP_ERROR_SEQUENCE; - return FALSE; -} - -static gboolean -process_smtp_data (struct smtp_session *session) -{ - struct stat st; - gint r; - GList *cur, *t; - rspamd_fstring_t *f; - gchar *s; - - if (fstat (session->temp_fd, &st) == -1) { - msg_err ("fstat failed: %s", strerror (errno)); - goto err; - } - /* Now mmap temp file if it is small enough */ - session->temp_size = st.st_size; - if (session->ctx->max_size == 0 || st.st_size < - (off_t)session->ctx->max_size) { - session->task = rspamd_task_new (session->worker); - session->task->resolver = session->resolver; - session->task->fin_callback = smtp_write_socket; - session->task->fin_arg = session; - session->task->msg = - rspamd_mempool_alloc (session->pool, sizeof (GString)); - session->task->s = session->s; -#ifdef HAVE_MMAP_NOCORE - if ((session->task->msg->str = - mmap (NULL, st.st_size, PROT_READ, MAP_SHARED | MAP_NOCORE, - session->temp_fd, 0)) == MAP_FAILED) { -#else - if ((session->task->msg->str = - mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, session->temp_fd, - 0)) == MAP_FAILED) { -#endif - msg_err ("mmap failed: %s", strerror (errno)); - goto err; - } - session->task->msg->len = st.st_size; - session->task->helo = session->helo; - /* Save MAIL FROM */ - cur = session->from; - if (cur) { - f = cur->data; - s = rspamd_mempool_alloc (session->pool, f->len + 1); - rspamd_strlcpy (s, f->begin, f->len + 1); - session->task->from = s; - } - /* Save recipients */ - t = session->rcpt; - while (t) { - cur = t->data; - if (cur) { - f = cur->data; - s = rspamd_mempool_alloc (session->pool, f->len + 1); - rspamd_strlcpy (s, f->begin, f->len + 1); - session->task->rcpt = g_list_prepend (session->task->rcpt, s); - } - t = g_list_next (t); - } - - memcpy (&session->task->from_addr, &session->client_addr, - sizeof (struct in_addr)); - session->task->cmd = CMD_CHECK; - - if (rspamd_message_parse (session->task) == -1) { - msg_err ("cannot process message"); - munmap (session->task->msg->str, st.st_size); - goto err; - } - if (session->task->cfg->pre_filters == NULL) { - r = rspamd_process_filters (session->task); - if (r == -1) { - msg_err ("cannot process message"); - munmap (session->task->msg->str, st.st_size); - goto err; - } - } - else { - rspamd_lua_call_pre_filters (session->task); - /* We want fin_task after pre filters are processed */ - session->task->s->wanna_die = TRUE; - session->task->state = WAIT_PRE_FILTER; - rspamd_session_pending (session->task->s); - } - } - else { - msg_info ("not scan message as it is %z bytes and maximum is %z", - st.st_size, - session->ctx->max_size); - session->task = NULL; - return smtp_send_upstream_message (session); - } - - return TRUE; -err: - session->error = SMTP_ERROR_FILE; - session->state = SMTP_STATE_CRITICAL_ERROR; - if (!rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, - TRUE)) { - return FALSE; - } - rspamd_session_destroy (session->s); - return FALSE; -} - -/* - * Callback that is called when there is data to read in buffer - */ -static gboolean -smtp_read_socket (rspamd_fstring_t * in, void *arg) -{ - struct smtp_session *session = arg; - - switch (session->state) { - case SMTP_STATE_RESOLVE_REVERSE: - case SMTP_STATE_RESOLVE_NORMAL: - case SMTP_STATE_DELAY: - session->error = make_smtp_error (session->pool, - 550, - "%s Improper use of SMTP command pipelining", - "5.5.0"); - session->state = SMTP_STATE_ERROR; - break; - case SMTP_STATE_GREETING: - case SMTP_STATE_HELO: - case SMTP_STATE_FROM: - case SMTP_STATE_RCPT: - case SMTP_STATE_DATA: - read_smtp_command (session, in); - if (session->state != SMTP_STATE_WAIT_UPSTREAM) { - if (session->errors > session->ctx->max_errors) { - session->error = SMTP_ERROR_LIMIT; - session->state = SMTP_STATE_CRITICAL_ERROR; - if (!rspamd_dispatcher_write (session->dispatcher, - session->error, 0, FALSE, TRUE)) { - return FALSE; - } - rspamd_session_destroy (session->s); - return FALSE; - } - if (!smtp_write_socket (session)) { - return FALSE; - } - } - break; - case SMTP_STATE_AFTER_DATA: - if (in->len == 0) { - return TRUE; - } - if (in->len == 3 && - memcmp (in->begin, DATA_END_TRAILER, in->len) == 0) { - return process_smtp_data (session); - } - - if (write (session->temp_fd, in->begin, in->len) != (ssize_t)in->len) { - msg_err ("cannot write to temp file: %s", strerror (errno)); - session->error = SMTP_ERROR_FILE; - session->state = SMTP_STATE_CRITICAL_ERROR; - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - 0, FALSE, TRUE)) { - return FALSE; - } - rspamd_session_destroy (session->s); - return FALSE; - } - break; - case SMTP_STATE_WAIT_UPSTREAM: - rspamd_dispatcher_pause (session->dispatcher); - break; - default: - session->error = make_smtp_error (session->pool, - 550, - "%s Internal error", - "5.5.0"); - session->state = SMTP_STATE_ERROR; - break; - } - - if (session->state == SMTP_STATE_QUIT) { - rspamd_session_destroy (session->s); - return FALSE; - } - else if (session->state == SMTP_STATE_WAIT_UPSTREAM) { - rspamd_dispatcher_pause (session->dispatcher); - } - - return TRUE; -} - -/* - * Callback for socket writing - */ -static gboolean -smtp_write_socket (void *arg) -{ - struct smtp_session *session = arg; - - if (session->state == SMTP_STATE_CRITICAL_ERROR) { - if (session->error != NULL) { - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - 0, FALSE, TRUE)) { - return FALSE; - } - } - rspamd_session_destroy (session->s); - return FALSE; - } - else if (session->state == SMTP_STATE_END) { - if (session->task != NULL) { - return write_smtp_reply (session); - } - else { - if (session->error != NULL) { - if (!rspamd_dispatcher_write (session->dispatcher, - session->error, 0, FALSE, TRUE)) { - return FALSE; - } - } - } - } - else { - if (session->error != NULL) { - if (!rspamd_dispatcher_write (session->dispatcher, session->error, - 0, FALSE, TRUE)) { - return FALSE; - } - } - } - - return TRUE; -} - -/* - * Called if something goes wrong - */ -static void -smtp_err_socket (GError * err, void *arg) -{ - struct smtp_session *session = arg; - - msg_info ("abnormally closing connection, error: %s", err->message); - /* Free buffers */ - rspamd_session_destroy (session->s); -} - -/* - * Write greeting to client - */ -static gboolean -write_smtp_greeting (struct smtp_session *session) -{ - if (session->ctx->smtp_banner) { - if (!rspamd_dispatcher_write (session->dispatcher, - session->ctx->smtp_banner, 0, FALSE, TRUE)) { - return FALSE; - } - } - - return TRUE; -} - -/* - * Return from a delay - */ -static void -smtp_delay_handler (gint fd, short what, void *arg) -{ - struct smtp_session *session = arg; - - rspamd_session_remove_event (session->s, - (event_finalizer_t)event_del, - session->delay_timer); - if (session->state == SMTP_STATE_DELAY) { - session->state = SMTP_STATE_GREETING; - write_smtp_greeting (session); - } - else { - session->state = SMTP_STATE_CRITICAL_ERROR; - (void)smtp_write_socket (session); - } -} - -/* - * Make delay for a client - */ -static void -smtp_make_delay (struct smtp_session *session) -{ - struct event *tev; - struct timeval *tv; - gint32 jitter; - - if (session->ctx->smtp_delay != 0 && session->state == SMTP_STATE_DELAY) { - tev = rspamd_mempool_alloc (session->pool, sizeof (struct event)); - tv = rspamd_mempool_alloc (session->pool, sizeof (struct timeval)); - if (session->ctx->delay_jitter != 0) { - jitter = g_random_int_range (0, session->ctx->delay_jitter); - msec_to_tv (session->ctx->smtp_delay + jitter, tv); - } - else { - msec_to_tv (session->ctx->smtp_delay, tv); - } - - evtimer_set (tev, smtp_delay_handler, session); - evtimer_add (tev, tv); - rspamd_session_add_event (session->s, - (event_finalizer_t)event_del, - tev, - g_quark_from_static_string ("smtp proxy")); - session->delay_timer = tev; - } - else if (session->state == SMTP_STATE_DELAY) { - session->state = SMTP_STATE_GREETING; - write_smtp_greeting (session); - } -} - -/* - * Handle DNS replies - */ -static void -smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg) -{ - struct smtp_session *session = arg; - gint res = 0; - union rspamd_reply_element *elt; - GList *cur; - - switch (session->state) { - case SMTP_STATE_RESOLVE_REVERSE: - /* Parse reverse reply and start resolve of this ip */ - if (reply->code != RDNS_RC_NOERROR) { - rspamd_conditional_debug (rspamd_main->logger, - session->client_addr.s_addr, - __FUNCTION__, - "DNS error: %s", - dns_strerror (reply->code)); - - if (reply->code == RDNS_RC_NXDOMAIN) { - session->hostname = rspamd_mempool_strdup (session->pool, - XCLIENT_HOST_UNAVAILABLE); - } - else { - session->hostname = rspamd_mempool_strdup (session->pool, - XCLIENT_HOST_TEMPFAIL); - } - session->state = SMTP_STATE_DELAY; - smtp_make_delay (session); - } - else { - if (reply->elements) { - elt = reply->elements->data; - session->hostname = rspamd_mempool_strdup (session->pool, - elt->ptr.name); - session->state = SMTP_STATE_RESOLVE_NORMAL; - make_dns_request (session->resolver, session->s, session->pool, - smtp_dns_cb, session, RDNS_REQUEST_A, session->hostname); - - } - } - break; - case SMTP_STATE_RESOLVE_NORMAL: - if (reply->code != RDNS_RC_NOERROR) { - rspamd_conditional_debug (rspamd_main->logger, - session->client_addr.s_addr, - __FUNCTION__, - "DNS error: %s", - dns_strerror (reply->code)); - - if (reply->code == RDNS_RC_NXDOMAIN) { - session->hostname = rspamd_mempool_strdup (session->pool, - XCLIENT_HOST_UNAVAILABLE); - } - else { - session->hostname = rspamd_mempool_strdup (session->pool, - XCLIENT_HOST_TEMPFAIL); - } - session->state = SMTP_STATE_DELAY; - smtp_make_delay (session); - } - else { - res = 0; - cur = reply->elements; - while (cur) { - elt = cur->data; - if (memcmp (&session->client_addr, &elt->a.addr[0], - sizeof (struct in_addr)) == 0) { - res = 1; - session->resolved = TRUE; - break; - } - cur = g_list_next (cur); - } - - if (res == 0) { - msg_info ("cannot find address for hostname: %s, ip: %s", - session->hostname, - inet_ntoa (session->client_addr)); - session->hostname = rspamd_mempool_strdup (session->pool, - XCLIENT_HOST_UNAVAILABLE); - } - session->state = SMTP_STATE_DELAY; - smtp_make_delay (session); - } - break; - case SMTP_STATE_ERROR: - session->state = SMTP_STATE_WRITE_ERROR; - smtp_write_socket (session); - break; - default: - /* - * This callback is called on unknown state, usually this indicates - * an error (invalid pipelining) - */ - break; - } -} - -/* - * Accept new connection and construct task - */ -static void -accept_socket (gint fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - union sa_union su; - struct smtp_session *session; - struct smtp_worker_ctx *ctx; - - socklen_t addrlen = sizeof (su.ss); - gint nfd; - - if ((nfd = - accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { - msg_warn ("accept failed: %s", strerror (errno)); - return; - } - /* Check for EAGAIN */ - if (nfd == 0) { - return; - } - - ctx = worker->ctx; - session = g_malloc0 (sizeof (struct smtp_session)); - session->pool = rspamd_mempool_new (rspamd_mempool_suggest_size ()); - - if (su.ss.ss_family == AF_UNIX) { - msg_info ("accepted connection from unix socket"); - session->client_addr.s_addr = INADDR_NONE; - } - else if (su.ss.ss_family == AF_INET) { - msg_info ("accepted connection from %s port %d", - inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); - memcpy (&session->client_addr, &su.s4.sin_addr, - sizeof (struct in_addr)); - } - - session->sock = nfd; - session->temp_fd = -1; - session->worker = worker; - session->ctx = ctx; - session->cfg = worker->srv->cfg; - session->session_time = time (NULL); - session->resolver = ctx->resolver; - session->ev_base = ctx->ev_base; - worker->srv->stat->connections_count++; - - /* Resolve client's addr */ - /* Set up async session */ - session->s = rspamd_session_create (session->pool, - NULL, - NULL, - free_smtp_session, - session); - session->state = SMTP_STATE_RESOLVE_REVERSE; - if (!make_dns_request (session->resolver, session->s, session->pool, - smtp_dns_cb, session, RDNS_REQUEST_PTR, &session->client_addr)) { - msg_err ("cannot resolve %s", inet_ntoa (session->client_addr)); - g_free (session); - close (nfd); - return; - } - else { - session->dispatcher = rspamd_create_dispatcher (session->ev_base, - nfd, - BUFFER_LINE, - smtp_read_socket, - smtp_write_socket, - smtp_err_socket, - &session->ctx->smtp_timeout, - session); - session->dispatcher->peer_addr = session->client_addr.s_addr; - } -} - -static void -parse_smtp_banner (struct smtp_worker_ctx *ctx, const gchar *line) -{ - gint hostmax, banner_len = sizeof ("220 ") - 1; - gchar *p, *t, *hostbuf = NULL; - gboolean has_crlf = FALSE; - - p = (gchar *)line; - while (*p) { - if (*p == '%') { - p++; - switch (*p) { - case 'n': - /* Assume %n as CRLF */ - banner_len += sizeof (CRLF) - 1 + sizeof ("220 -") - 1; - has_crlf = TRUE; - break; - case 'h': - hostmax = sysconf (_SC_HOST_NAME_MAX) + 1; - hostbuf = alloca (hostmax); - gethostname (hostbuf, hostmax); - hostbuf[hostmax - 1] = '\0'; - banner_len += strlen (hostbuf); - break; - case '%': - banner_len += 1; - break; - default: - banner_len += 2; - break; - } - } - else { - banner_len++; - } - p++; - } - - if (has_crlf) { - banner_len += sizeof (CRLF "220 " CRLF); - } - else { - banner_len += sizeof (CRLF); - } - - ctx->smtp_banner = rspamd_mempool_alloc (ctx->pool, banner_len + 1); - t = ctx->smtp_banner; - p = (gchar *)line; - - if (has_crlf) { - t = g_stpcpy (t, "220-"); - } - else { - t = g_stpcpy (t, "220 "); - } - - while (*p) { - if (*p == '%') { - p++; - switch (*p) { - case 'n': - /* Assume %n as CRLF */ - *t++ = CR; *t++ = LF; - t = g_stpcpy (t, "220-"); - p++; - break; - case 'h': - t = g_stpcpy (t, hostbuf); - p++; - break; - case '%': - *t++ = '%'; - p++; - break; - default: - /* Copy all %<gchar> to dest */ - *t++ = *(p - 1); *t++ = *p; - break; - } - } - else { - *t++ = *p++; - } - } - if (has_crlf) { - t = g_stpcpy (t, CRLF "220 " CRLF); - } - else { - t = g_stpcpy (t, CRLF); - } -} - -static void -make_capabilities (struct smtp_worker_ctx *ctx, const gchar *line) -{ - gchar **strv, *p, *result, *hostbuf; - guint32 num, i, len, hostmax; - - strv = g_strsplit_set (line, ",;", -1); - num = g_strv_length (strv); - - hostmax = sysconf (_SC_HOST_NAME_MAX) + 1; - hostbuf = alloca (hostmax); - gethostname (hostbuf, hostmax); - hostbuf[hostmax - 1] = '\0'; - - len = sizeof ("250-") + strlen (hostbuf) + sizeof (CRLF) - 1; - - for (i = 0; i < num; i++) { - p = strv[i]; - len += sizeof ("250-") + sizeof (CRLF) + strlen (p) - 2; - } - - result = rspamd_mempool_alloc (ctx->pool, len); - ctx->smtp_capabilities = result; - - p = result; - if (num == 0) { - p += rspamd_snprintf (p, len - (p - result), "250 %s" CRLF, hostbuf); - } - else { - p += rspamd_snprintf (p, len - (p - result), "250-%s" CRLF, hostbuf); - for (i = 0; i < num; i++) { - if (i != num - 1) { - p += rspamd_snprintf (p, - len - (p - result), - "250-%s" CRLF, - strv[i]); - } - else { - p += rspamd_snprintf (p, - len - (p - result), - "250 %s" CRLF, - strv[i]); - } - } - } - - g_strfreev (strv); -} - -gpointer -init_smtp (struct rspamd_config *cfg) -{ - struct smtp_worker_ctx *ctx; - GQuark type; - - type = g_quark_try_string ("smtp"); - - ctx = g_malloc0 (sizeof (struct smtp_worker_ctx)); - ctx->pool = rspamd_mempool_new (rspamd_mempool_suggest_size ()); - - /* Set default values */ - ctx->smtp_timeout_raw = 300000; - ctx->smtp_delay = 0; - ctx->smtp_banner = "220 ESMTP Ready." CRLF; - bzero (ctx->smtp_filters, sizeof (GList *) * SMTP_STAGE_MAX); - ctx->max_errors = DEFAULT_MAX_ERRORS; - ctx->reject_message = DEFAULT_REJECT_MESSAGE; - - rspamd_rcl_register_worker_option (cfg, type, "upstreams", - rspamd_rcl_parse_struct_string, ctx, - G_STRUCT_OFFSET (struct smtp_worker_ctx, upstreams_str), 0); - - rspamd_rcl_register_worker_option (cfg, type, "banner", - rspamd_rcl_parse_struct_string, ctx, - G_STRUCT_OFFSET (struct smtp_worker_ctx, smtp_banner_str), 0); - - rspamd_rcl_register_worker_option (cfg, type, "timeout", - rspamd_rcl_parse_struct_time, ctx, - G_STRUCT_OFFSET (struct smtp_worker_ctx, - smtp_timeout_raw), RSPAMD_CL_FLAG_TIME_UINT_32); - - rspamd_rcl_register_worker_option (cfg, type, "delay", - rspamd_rcl_parse_struct_time, ctx, - G_STRUCT_OFFSET (struct smtp_worker_ctx, - smtp_delay), RSPAMD_CL_FLAG_TIME_UINT_32); - - rspamd_rcl_register_worker_option (cfg, type, "jitter", - rspamd_rcl_parse_struct_time, ctx, - G_STRUCT_OFFSET (struct smtp_worker_ctx, - delay_jitter), RSPAMD_CL_FLAG_TIME_UINT_32); - - rspamd_rcl_register_worker_option (cfg, type, "capabilities", - rspamd_rcl_parse_struct_string, ctx, - G_STRUCT_OFFSET (struct smtp_worker_ctx, smtp_capabilities_str), 0); - - rspamd_rcl_register_worker_option (cfg, type, "xclient", - rspamd_rcl_parse_struct_boolean, ctx, - G_STRUCT_OFFSET (struct smtp_worker_ctx, use_xclient), 0); - - rspamd_rcl_register_worker_option (cfg, type, "reject_message", - rspamd_rcl_parse_struct_string, ctx, - G_STRUCT_OFFSET (struct smtp_worker_ctx, reject_message), 0); - - rspamd_rcl_register_worker_option (cfg, type, "max_errors", - rspamd_rcl_parse_struct_integer, ctx, - G_STRUCT_OFFSET (struct smtp_worker_ctx, - max_errors), RSPAMD_CL_FLAG_INT_32); - - rspamd_rcl_register_worker_option (cfg, type, "max_size", - rspamd_rcl_parse_struct_integer, ctx, - G_STRUCT_OFFSET (struct smtp_worker_ctx, - max_size), RSPAMD_CL_FLAG_INT_SIZE); - - return ctx; -} - -/* Make post-init configuration */ -static gboolean -config_smtp_worker (struct rspamd_worker *worker) -{ - struct smtp_worker_ctx *ctx = worker->ctx; - gchar *value; - - /* Init timeval */ - msec_to_tv (ctx->smtp_timeout_raw, &ctx->smtp_timeout); - - /* Init upstreams */ - if ((value = ctx->upstreams_str) != NULL) { - if (!parse_upstreams_line (ctx->pool, ctx->upstreams, value, - &ctx->upstream_num)) { - return FALSE; - } - } - else { - msg_err ("no upstreams defined, don't know what to do"); - return FALSE; - } - /* Create smtp banner */ - if ((value = ctx->smtp_banner_str) != NULL) { - parse_smtp_banner (ctx, value); - } - - /* Parse capabilities */ - if ((value = ctx->smtp_capabilities_str) != NULL) { - make_capabilities (ctx, value); - } - - return TRUE; -} - - -/* - * Start worker process - */ -void -start_smtp (struct rspamd_worker *worker) -{ - struct smtp_worker_ctx *ctx = worker->ctx; - - ctx->ev_base = rspamd_prepare_worker (worker, "smtp_worker", accept_socket); - - /* Set smtp options */ - if ( !config_smtp_worker (worker)) { - msg_err ("cannot configure smtp worker, exiting"); - exit (EXIT_SUCCESS); - } - - /* Maps events */ - rspamd_map_watch (worker->srv->cfg, ctx->ev_base); - - /* DNS resolver */ - ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); - - /* Set umask */ - umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); - - event_base_loop (ctx->ev_base, 0); - - rspamd_log_close (rspamd_main->logger); - exit (EXIT_SUCCESS); -} - -void -register_smtp_filter (struct smtp_worker_ctx *ctx, - enum rspamd_smtp_stage stage, - smtp_filter_t filter, - gpointer filter_data) -{ - struct smtp_filter *new; - - new = rspamd_mempool_alloc (ctx->pool, sizeof (struct smtp_filter)); - - new->filter = filter; - new->filter_data = filter_data; - - if (stage >= SMTP_STAGE_MAX) { - msg_err ("invalid smtp stage: %d", stage); - } - else { - ctx->smtp_filters[stage] = - g_list_prepend (ctx->smtp_filters[stage], new); - } -} - -/* - * vi:ts=4 - */ diff --git a/src/smtp.h b/src/smtp.h deleted file mode 100644 index 798e87a30..000000000 --- a/src/smtp.h +++ /dev/null @@ -1,126 +0,0 @@ -#ifndef RSPAMD_SMTP_H -#define RSPAMD_SMTP_H - -#include "config.h" -#include "libutil/upstream.h" -#include "libmime/smtp_utils.h" -#include "libmime/smtp_proto.h" -#include "rspamd.h" - -struct rspamd_dns_resolver; - -#define DEFAULT_MAX_ERRORS 10 - -enum rspamd_smtp_stage { - SMTP_STAGE_CONNECT = 0, - SMTP_STAGE_HELO, - SMTP_STAGE_MAIL, - SMTP_STAGE_RCPT, - SMTP_STAGE_DATA, - SMTP_STAGE_MAX -}; - -struct smtp_worker_ctx { - struct upstream_list *upstreams; - gsize upstream_num; - gchar *upstreams_str; - - rspamd_mempool_t *pool; - gchar *smtp_banner; - gchar *smtp_banner_str; - guint32 smtp_delay; - guint32 delay_jitter; - guint32 smtp_timeout_raw; - struct timeval smtp_timeout; - - gboolean use_xclient; - gboolean helo_required; - gchar *smtp_capabilities; - gchar *smtp_capabilities_str; - gchar *reject_message; - gsize max_size; - guint32 max_errors; - gchar *metric; - GList *smtp_filters[SMTP_STAGE_MAX]; - struct rspamd_dns_resolver *resolver; - struct event_base *ev_base; -}; - -enum rspamd_smtp_state { - SMTP_STATE_RESOLVE_REVERSE = 0, - SMTP_STATE_RESOLVE_NORMAL, - SMTP_STATE_DELAY, - SMTP_STATE_GREETING, - SMTP_STATE_HELO, - SMTP_STATE_FROM, - SMTP_STATE_RCPT, - SMTP_STATE_BEFORE_DATA, - SMTP_STATE_DATA, - SMTP_STATE_AFTER_DATA, - SMTP_STATE_END, - SMTP_STATE_QUIT, - SMTP_STATE_WAIT_UPSTREAM, - SMTP_STATE_IN_SENDFILE, - SMTP_STATE_ERROR, - SMTP_STATE_CRITICAL_ERROR, - SMTP_STATE_WRITE_ERROR -}; - -struct smtp_session { - struct smtp_worker_ctx *ctx; - struct rspamd_config *cfg; - rspamd_mempool_t *pool; - - enum rspamd_smtp_state state; - enum rspamd_smtp_state upstream_state; - struct rspamd_worker *worker; - struct rspamd_task *task; - struct in_addr client_addr; - gchar *hostname; - gchar *error; - gchar *temp_name; - gint sock; - gint upstream_sock; - gint temp_fd; - size_t temp_size; - time_t session_time; - - gchar *helo; - GList *from; - GList *rcpt; - GList *cur_rcpt; - - guint errors; - - struct rspamd_async_session *s; - rspamd_io_dispatcher_t *dispatcher; - rspamd_io_dispatcher_t *upstream_dispatcher; - - struct upstream *upstream; - - struct event *delay_timer; - - gboolean resolved; - gboolean esmtp; - struct rspamd_dns_resolver *resolver; - struct event_base *ev_base; -}; - -typedef gboolean (*smtp_filter_t)(struct smtp_session *session, - gpointer filter_data); - -struct smtp_filter { - smtp_filter_t filter; - gpointer filter_data; -}; - -/* - * Register new SMTP filter - * XXX: work is still in progress - */ -void register_smtp_filter (struct smtp_worker_ctx *ctx, - enum rspamd_smtp_stage stage, - smtp_filter_t filter, - gpointer filter_data); - -#endif diff --git a/src/smtp_proxy.c b/src/smtp_proxy.c deleted file mode 100644 index 8eebc2c86..000000000 --- a/src/smtp_proxy.c +++ /dev/null @@ -1,1119 +0,0 @@ -/*- - * Copyright 2016 Vsevolod Stakhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "config.h" -#include "utlist.h" -#include "libserver/proxy.h" -#include "rspamd.h" -#include "smtp.h" -#include "libserver/worker_util.h" -#include "unix-std.h" - -/* - * SMTP proxy is a simple smtp proxy worker for dns resolving and - * load balancing. It uses XCLIENT command and is designed for MTA - * that supports that (postfix and exim). - */ - -/* Upstream timeouts */ -#define DEFAULT_UPSTREAM_ERROR_TIME 10 -#define DEFAULT_UPSTREAM_DEAD_TIME 300 -#define DEFAULT_UPSTREAM_MAXERRORS 10 - -#define DEFAULT_PROXY_BUF_LEN 100 * 1024 - -#define SMTP_MAXERRORS 15 -/* Init functions */ -gpointer init_smtp_proxy (struct rspamd_config *cfg); -void start_smtp_proxy (struct rspamd_worker *worker); - -worker_t smtp_proxy_worker = { - "smtp_proxy", /* Name */ - init_smtp_proxy, /* Init function */ - start_smtp_proxy, /* Start function */ - RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE, - RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */ - RSPAMD_WORKER_VER /* Version info */ -}; - -static guint64 rspamd_smtp_proxy_magic = 0xf3d849189c85f12dULL; - -struct smtp_proxy_ctx { - guint64 magic; - struct upstream_list *upstreams; - gchar *upstreams_str; - - rspamd_mempool_t *pool; - guint32 smtp_delay; - guint32 delay_jitter; - guint32 smtp_timeout_raw; - struct timeval smtp_timeout; - - gboolean use_xclient; - - gboolean instant_reject; - - gsize proxy_buf_len; - - struct rspamd_dns_resolver *resolver; - struct event_base *ev_base; - - GList *rbls; -}; - -enum rspamd_smtp_proxy_state { - SMTP_PROXY_STATE_RESOLVE_REVERSE = 0, - SMTP_PROXY_STATE_RESOLVE_NORMAL, - SMTP_PROXY_STATE_RESOLVE_RBL, - SMTP_PROXY_STATE_DELAY, - SMTP_PROXY_STATE_GREETING, - SMTP_PROXY_STATE_XCLIENT, - SMTP_PROXY_STATE_PROXY, - SMTP_PROXY_STATE_REJECT, - SMTP_PROXY_STATE_REJECT_EMULATE -}; - -struct smtp_proxy_session { - struct smtp_proxy_ctx *ctx; - rspamd_mempool_t *pool; - - enum rspamd_smtp_proxy_state state; - struct rspamd_worker *worker; - struct in_addr client_addr; - gchar *ptr_str; - gchar *hostname; - gchar *error; - gchar *temp_name; - gint sock; - gint upstream_sock; - - struct rspamd_async_session *s; - rspamd_io_dispatcher_t *dispatcher; - - rspamd_proxy_t *proxy; - - struct upstream *upstream; - - struct event *delay_timer; - struct event upstream_ev; - - gboolean resolved; - struct rspamd_dns_resolver *resolver; - struct event_base *ev_base; - - GString *upstream_greeting; - - guint rbl_requests; - gchar *dnsbl_applied; - - gchar *from; - gchar *rcpt; - - guint errors; -}; - -static void -free_smtp_proxy_session (gpointer arg) -{ - struct smtp_proxy_session *session = arg; - static const char fatal_smtp_error[] = "521 5.2.1 Internal error" CRLF; - - if (session) { - if (session->dispatcher) { - rspamd_remove_dispatcher (session->dispatcher); - } - - if (session->upstream_greeting) { - g_string_free (session->upstream_greeting, TRUE); - } - - if (session->state != SMTP_PROXY_STATE_PROXY && session->state != - SMTP_PROXY_STATE_REJECT && - session->state != SMTP_PROXY_STATE_REJECT_EMULATE) { - /* Send 521 fatal error */ - if (write (session->sock, fatal_smtp_error, - sizeof (fatal_smtp_error)) == -1) { - msg_err ("write error to client failed: %s", strerror (errno)); - } - } - else if ((session->state == SMTP_PROXY_STATE_REJECT || session->state == - SMTP_PROXY_STATE_REJECT_EMULATE) && - session->from && session->rcpt && session->dnsbl_applied) { - msg_info ("reject by %s mail from <%s> to <%s>, ip: %s", - session->dnsbl_applied, - session->from, - session->rcpt, - inet_ntoa (session->client_addr)); - } - - close (session->sock); - - if (session->proxy) { - rspamd_proxy_close (session->proxy); - } - if (session->ptr_str) { - free (session->ptr_str); - } - if (session->upstream_sock != -1) { - event_del (&session->upstream_ev); - close (session->upstream_sock); - } - rspamd_mempool_delete (session->pool); - g_slice_free1 (sizeof (struct smtp_proxy_session), session); - } -} - -static void -smtp_proxy_err_proxy (GError * err, void *arg) -{ - struct smtp_proxy_session *session = arg; - - if (err) { - g_error_free (err); - msg_info ("abnormally closing connection, error: %s", err->message); - } - /* Free buffers */ - session->state = SMTP_PROXY_STATE_REJECT; - rspamd_session_destroy (session->s); -} - -/** - * Check whether SMTP greeting is valid - * @param s - * @return - */ -static gint -check_valid_smtp_greeting (GString *s) -{ - gchar *p; - - p = s->str + s->len - 1; - if (s->len < 6 || (*p != '\n' || *(p - 1) != '\r')) { - return 1; - } - p -= 5; - - while (p >= s->str) { - /* It is fast to use memcmp here as we compare only 4 bytes */ - if (memcmp (p, "220 ", 4) == 0) { - /* Check position */ - if (p == s->str || *(p - 1) == '\n') { - return 1; - } - return 0; - } - else if ((*p == '5' || *p == '4' || *p == '3') && - g_ascii_isdigit (p[1]) && g_ascii_isdigit (p[2]) && p[3] == ' ') { - return -1; - } - p--; - } - - return 1; -} - -/* - * Handle upstream greeting - */ - -static void -smtp_proxy_greeting_handler (gint fd, short what, void *arg) -{ - struct smtp_proxy_session *session = arg; - gint r; - gchar read_buf[BUFSIZ]; - - if (what == EV_READ) { - if (session->state == SMTP_PROXY_STATE_GREETING) { - /* Fill greeting buffer with new portion of data */ - r = read (fd, read_buf, sizeof (read_buf) - 1); - if (r > 0) { - g_string_append_len (session->upstream_greeting, read_buf, r); - /* Now search line with 220 */ - r = check_valid_smtp_greeting (session->upstream_greeting); - if (r == 1) { - /* Send xclient */ - if (session->ctx->use_xclient) { - r = rspamd_snprintf (read_buf, - sizeof (read_buf), - "XCLIENT NAME=%s ADDR=%s" CRLF, - session->hostname, - inet_ntoa (session->client_addr)); - r = write (session->upstream_sock, read_buf, r); - - if (r < 0 && errno == EAGAIN) { - /* Add write event */ - event_del (&session->upstream_ev); - event_set (&session->upstream_ev, - session->upstream_sock, - EV_WRITE, - smtp_proxy_greeting_handler, - session); - event_base_set (session->ev_base, - &session->upstream_ev); - event_add (&session->upstream_ev, NULL); - } - else if (r > 0) { - session->upstream_greeting->len = 0; - session->state = SMTP_PROXY_STATE_XCLIENT; - } - else { - msg_info ("connection with %s got write error: %s", - inet_ntoa (session->client_addr), - strerror (errno)); - rspamd_session_destroy (session->s); - } - } - else { - event_del (&session->upstream_ev); - /* Start direct proxy */ - r = write (session->sock, - session->upstream_greeting->str, - session->upstream_greeting->len); - /* TODO: handle client's error here */ - if (r > 0) { - session->proxy = rspamd_create_proxy (session->sock, - session->upstream_sock, - session->pool, - session->ev_base, - session->ctx->proxy_buf_len, - &session->ctx->smtp_timeout, - smtp_proxy_err_proxy, - session); - session->state = SMTP_PROXY_STATE_PROXY; - } - else { - msg_info ("connection with %s got write error: %s", - inet_ntoa (session->client_addr), - strerror (errno)); - rspamd_session_destroy (session->s); - } - } - } - else if (r == -1) { - /* Proxy sent 500 error */ - msg_info ("connection with %s got smtp error for greeting", - rspamd_upstream_name (session->upstream)); - rspamd_session_destroy (session->s); - } - } - else { - msg_info ("connection with %s got read error: %s", - rspamd_upstream_name (session->upstream), - strerror (errno)); - rspamd_session_destroy (session->s); - } - } - else if (session->state == SMTP_PROXY_STATE_XCLIENT) { - /* Fill greeting buffer with new portion of data */ - r = read (fd, read_buf, sizeof (read_buf) - 1); - if (r > 0) { - g_string_append_len (session->upstream_greeting, read_buf, r); - /* Now search line with 220 */ - r = check_valid_smtp_greeting (session->upstream_greeting); - if (r == 1) { - event_del (&session->upstream_ev); - /* Start direct proxy */ - r = write (session->sock, - session->upstream_greeting->str, - session->upstream_greeting->len); - /* TODO: handle client's error here */ - if (r > 0) { - session->proxy = rspamd_create_proxy (session->sock, - session->upstream_sock, - session->pool, - session->ev_base, - session->ctx->proxy_buf_len, - &session->ctx->smtp_timeout, - smtp_proxy_err_proxy, - session); - session->state = SMTP_PROXY_STATE_PROXY; - } - else { - msg_info ("connection with %s got write error: %s", - inet_ntoa (session->client_addr), - strerror (errno)); - rspamd_session_destroy (session->s); - } - } - else if (r == -1) { - /* Proxy sent 500 error */ - msg_info ("connection with %s got smtp error for xclient", - rspamd_upstream_name (session->upstream)); - rspamd_session_destroy (session->s); - } - } - } - else { - msg_info ("connection with %s got read event at improper state: %d", - rspamd_upstream_name (session->upstream), - session->state); - rspamd_session_destroy (session->s); - } - } - else if (what == EV_WRITE) { - if (session->state == SMTP_PROXY_STATE_GREETING) { - /* Send xclient again */ - r = rspamd_snprintf (read_buf, - sizeof (read_buf), - "XCLIENT NAME=%s ADDR=%s" CRLF, - session->hostname, - inet_ntoa (session->client_addr)); - r = write (session->upstream_sock, read_buf, r); - - if (r < 0 && errno == EAGAIN) { - /* Add write event */ - event_del (&session->upstream_ev); - event_set (&session->upstream_ev, session->upstream_sock, - EV_WRITE, smtp_proxy_greeting_handler, session); - event_base_set (session->ev_base, &session->upstream_ev); - event_add (&session->upstream_ev, NULL); - } - else if (r > 0) { - session->upstream_greeting->len = 0; - session->state = SMTP_PROXY_STATE_XCLIENT; - event_del (&session->upstream_ev); - event_set (&session->upstream_ev, session->upstream_sock, - EV_READ | EV_PERSIST, smtp_proxy_greeting_handler, session); - event_base_set (session->ev_base, &session->upstream_ev); - event_add (&session->upstream_ev, NULL); - } - else { - msg_info ("connection with %s got write error: %s", - rspamd_upstream_name (session->upstream), - strerror (errno)); - rspamd_session_destroy (session->s); - } - } - else { - msg_info ( - "connection with %s got write event at improper state: %d", - rspamd_upstream_name (session->upstream), - session->state); - rspamd_session_destroy (session->s); - } - } - else { - /* Timeout */ - msg_info ("connection with %s timed out", - rspamd_upstream_name (session->upstream)); - rspamd_session_destroy (session->s); - } -} - -static gboolean -create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session) -{ - struct upstream *selected; - - /* Try to select upstream */ - selected = rspamd_upstream_get (session->ctx->upstreams, - RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); - if (selected == NULL) { - msg_err ("no upstreams suitable found"); - return FALSE; - } - - session->upstream = selected; - - /* Now try to create socket */ - session->upstream_sock = rspamd_inet_address_connect ( - rspamd_upstream_addr (selected), SOCK_STREAM, TRUE); - if (session->upstream_sock == -1) { - msg_err ("cannot make a connection to %s", - rspamd_upstream_name (session->upstream)); - rspamd_upstream_fail (selected); - return FALSE; - } - /* Create a proxy for upstream connection */ - rspamd_dispatcher_pause (session->dispatcher); - /* First of all get upstream's greeting */ - session->state = SMTP_PROXY_STATE_GREETING; - - event_set (&session->upstream_ev, - session->upstream_sock, - EV_READ | EV_PERSIST, - smtp_proxy_greeting_handler, - session); - event_base_set (session->ev_base, &session->upstream_ev); - event_add (&session->upstream_ev, &session->ctx->smtp_timeout); - - session->upstream_greeting = g_string_sized_new (BUFSIZ); - - return TRUE; -} - -static void -smtp_dnsbl_cb (struct rdns_reply *reply, void *arg) -{ - struct smtp_proxy_session *session = arg; - const gchar *p; - gint dots = 0; - const struct rdns_request_name *req_name; - - session->rbl_requests--; - - req_name = rdns_request_get_name (reply->request, NULL); - - msg_debug ("got reply for %s: %s", req_name[0].name, - rdns_strerror (reply->code)); - - if (session->state != SMTP_PROXY_STATE_REJECT) { - - if (reply->code == RDNS_RC_NOERROR) { - /* This means that address is in dnsbl */ - p = req_name[0].name; - while (*p) { - if (*p == '.') { - dots++; - } - if (dots == 4) { - session->dnsbl_applied = (gchar *)p + 1; - break; - } - p++; - } - session->state = SMTP_PROXY_STATE_REJECT; - } - } - - if (session->rbl_requests == 0) { - if (session->state != SMTP_PROXY_STATE_REJECT) { - /* Make proxy */ - if (!create_smtp_proxy_upstream_connection (session)) { - rspamd_dispatcher_restore (session->dispatcher); - } - } - else { - if (session->ctx->instant_reject) { - msg_info ("reject %s is denied by dnsbl: %s", - inet_ntoa (session->client_addr), session->dnsbl_applied); - if (!rspamd_dispatcher_write (session->dispatcher, - make_smtp_error (session->pool, 521, - "%s Client denied by %s", "5.2.1", session->dnsbl_applied), - 0, FALSE, TRUE)) { - msg_err ("cannot write smtp error"); - } - } - else { - /* Emulate fake smtp session */ - rspamd_set_dispatcher_policy (session->dispatcher, - BUFFER_LINE, - 0); - if (!rspamd_dispatcher_write (session->dispatcher, - make_smtp_error (session->pool, 220, "smtp ready"), - 0, FALSE, TRUE)) { - msg_err ("cannot write smtp reply"); - } - } - rspamd_dispatcher_restore (session->dispatcher); - } - } -} - -/* - * Create requests to all rbls - */ -static void -make_rbl_requests (struct smtp_proxy_session *session) -{ - GList *cur; - gchar *p, *dst; - guint len; - - cur = session->ctx->rbls; - while (cur) { - len = INET_ADDRSTRLEN + strlen (cur->data) + 1; - dst = rspamd_mempool_alloc (session->pool, len); - /* Print ipv4 addr */ - p = (gchar *)&session->client_addr.s_addr; - rspamd_snprintf (dst, len, "%ud.%ud.%ud.%ud.%s", (guint)p[3], - (guint)p[2], (guint)p[1], (guint)p[0], cur->data); - if (make_dns_request (session->resolver, session->s, session->pool, - smtp_dnsbl_cb, session, RDNS_REQUEST_A, dst)) { - session->rbl_requests++; - msg_debug ("send request to %s", dst); - } - cur = g_list_next (cur); - } - - if (session->rbl_requests == 0) { - /* Create proxy */ - if (!create_smtp_proxy_upstream_connection (session)) { - rspamd_dispatcher_restore (session->dispatcher); - } - } -} - -/* Resolving and delay handlers */ -/* - * Return from a delay - */ -static void -smtp_delay_handler (gint fd, short what, void *arg) -{ - struct smtp_proxy_session *session = arg; - - rspamd_session_remove_event (session->s, (event_finalizer_t) event_del, - session->delay_timer); - if (session->state == SMTP_PROXY_STATE_DELAY) { - /* TODO: Create upstream connection here */ - if (session->ctx->rbls) { - make_rbl_requests (session); - } - else { - if (!create_smtp_proxy_upstream_connection (session)) { - rspamd_dispatcher_restore (session->dispatcher); - } - } - } - else { - /* TODO: Write error here */ - session->state = SMTP_PROXY_STATE_REJECT; - if (!rspamd_dispatcher_write (session->dispatcher, - make_smtp_error (session->pool, 521, - "%s Improper use of SMTP command pipelining", "5.2.1"), - 0, FALSE, TRUE)) { - msg_err ("cannot write smtp error"); - } - rspamd_dispatcher_restore (session->dispatcher); - } -} - -/* - * Make delay for a client - */ -static void -smtp_make_delay (struct smtp_proxy_session *session) -{ - struct event *tev; - struct timeval *tv; - gint32 jitter; - - if (session->ctx->smtp_delay != 0 && session->state == - SMTP_PROXY_STATE_DELAY) { - tev = rspamd_mempool_alloc (session->pool, sizeof(struct event)); - tv = rspamd_mempool_alloc (session->pool, sizeof(struct timeval)); - if (session->ctx->delay_jitter != 0) { - jitter = g_random_int_range (0, session->ctx->delay_jitter); - msec_to_tv (session->ctx->smtp_delay + jitter, tv); - } - else { - msec_to_tv (session->ctx->smtp_delay, tv); - } - - evtimer_set (tev, smtp_delay_handler, session); - evtimer_add (tev, tv); - rspamd_session_add_event (session->s, (event_finalizer_t) event_del, tev, - g_quark_from_static_string ("smtp proxy")); - session->delay_timer = tev; - } - else if (session->state == SMTP_PROXY_STATE_DELAY) { - /* TODO: Create upstream connection here */ - if (session->ctx->rbls) { - make_rbl_requests (session); - } - else { - if (!create_smtp_proxy_upstream_connection (session)) { - rspamd_dispatcher_restore (session->dispatcher); - } - } - } -} - -/* - * Handle DNS replies - */ -static void -smtp_dns_cb (struct rdns_reply *reply, void *arg) -{ - struct smtp_proxy_session *session = arg; - gint res = 0; - struct rdns_reply_entry *elt; - - switch (session->state) - { - case SMTP_PROXY_STATE_RESOLVE_REVERSE: - /* Parse reverse reply and start resolve of this ip */ - if (reply->code != RDNS_RC_NOERROR) { - rspamd_conditional_debug (session->worker->srv->logger, - NULL, "rdns", NULL, G_STRFUNC, "DNS error: %s", - rdns_strerror (reply->code)); - - if (reply->code == RDNS_RC_NXDOMAIN) { - session->hostname = rspamd_mempool_strdup (session->pool, - XCLIENT_HOST_UNAVAILABLE); - } - else { - session->hostname = rspamd_mempool_strdup (session->pool, - XCLIENT_HOST_TEMPFAIL); - } - session->state = SMTP_PROXY_STATE_DELAY; - smtp_make_delay (session); - } - else { - if (reply->entries) { - elt = reply->entries; - session->hostname = rspamd_mempool_strdup (session->pool, - elt->content.ptr.name); - session->state = SMTP_PROXY_STATE_RESOLVE_NORMAL; - make_dns_request (session->resolver, session->s, session->pool, - smtp_dns_cb, session, RDNS_REQUEST_A, session->hostname); - - } - } - break; - case SMTP_PROXY_STATE_RESOLVE_NORMAL: - if (reply->code != RDNS_RC_NOERROR) { - rspamd_conditional_debug (session->worker->srv->logger, - NULL, "rdns", NULL, G_STRFUNC, "DNS error: %s", - rdns_strerror (reply->code)); - - if (reply->code == RDNS_RC_NXDOMAIN) { - session->hostname = rspamd_mempool_strdup (session->pool, - XCLIENT_HOST_UNAVAILABLE); - } - else { - session->hostname = rspamd_mempool_strdup (session->pool, - XCLIENT_HOST_TEMPFAIL); - } - session->state = SMTP_PROXY_STATE_DELAY; - smtp_make_delay (session); - } - else { - res = 0; - LL_FOREACH (reply->entries, elt) - { - if (memcmp (&session->client_addr, &elt->content.a.addr, - sizeof(struct in_addr)) == 0) { - res = 1; - session->resolved = TRUE; - break; - } - } - - if (res == 0) { - msg_info ( - "cannot find address for hostname: %s, ip: %s", - session->hostname, - inet_ntoa (session->client_addr)); - session->hostname = rspamd_mempool_strdup (session->pool, - XCLIENT_HOST_UNAVAILABLE); - } - session->state = SMTP_PROXY_STATE_DELAY; - smtp_make_delay (session); - } - break; - default: - /* TODO: write something about pipelining */ - break; - } -} - -static void -proxy_parse_smtp_input (rspamd_ftok_t *line, struct smtp_proxy_session *session) -{ - const gchar *p, *c, *end; - gsize len; - - p = line->begin; - end = line->begin + line->len; - if (line->len >= sizeof("rcpt to: ") - 1 && - (*p == 'r' || *p == 'R') && session->rcpt == NULL) { - if (g_ascii_strncasecmp (p, "rcpt to: ", - sizeof ("rcpt to: ") - 1) == 0) { - p += sizeof ("rcpt to: ") - 1; - /* Skip spaces */ - while ((g_ascii_isspace (*p) || *p == '<') && p < end) { - p++; - } - c = p; - while (!(g_ascii_isspace (*p) || *p == '>') && p < end) { - p++; - } - len = p - c; - session->rcpt = rspamd_mempool_alloc (session->pool, len + 1); - rspamd_strlcpy (session->rcpt, c, len + 1); - } - } - else if (line->len >= sizeof("mail from: ") - 1 && - (*p == 'm' || *p == 'M') && session->from == NULL) { - if (g_ascii_strncasecmp (p, "mail from: ", sizeof ("mail from: ") - - 1) == 0) { - p += sizeof ("mail from: ") - 1; - /* Skip spaces */ - while ((g_ascii_isspace (*p) || *p == '<') && p < end) { - p++; - } - c = p; - while (!(g_ascii_isspace (*p) || *p == '>') && p < end) { - p++; - } - len = p - c; - session->from = rspamd_mempool_alloc (session->pool, len + 1); - rspamd_strlcpy (session->from, c, len + 1); - } - } - else if (line->len >= sizeof ("quit") - 1 && (*p == 'q' || *p == 'Q')) { - if (g_ascii_strncasecmp (p, "quit", sizeof ("quit") - 1) == 0) { - session->state = SMTP_PROXY_STATE_REJECT; - } - } -} - -/* - * Callback that is called when there is data to read in buffer - */ -static gboolean -smtp_proxy_read_socket (rspamd_ftok_t * in, void *arg) -{ - struct smtp_proxy_session *session = arg; - const gchar *p; - - if (session->state != SMTP_PROXY_STATE_REJECT_EMULATE) { - /* This can be called only if client is using invalid pipelining */ - session->state = SMTP_PROXY_STATE_REJECT; - if (!rspamd_dispatcher_write (session->dispatcher, - make_smtp_error (session->pool, 521, - "%s Improper use of SMTP command pipelining", "5.2.1"), - 0, FALSE, TRUE)) { - msg_err ("cannot write smtp error"); - } - rspamd_session_destroy (session->s); - } - else { - /* Try to extract data */ - p = in->begin; - if (in->len >= sizeof ("helo") - 1 && - (*p == 'h' || *p == 'H' || *p == 'e' || *p == 'E')) { - return rspamd_dispatcher_write (session->dispatcher, - "220 smtp ready" CRLF, - 0, FALSE, TRUE); - } - else if (in->len > 0) { - proxy_parse_smtp_input (in, session); - } - if (session->state == SMTP_PROXY_STATE_REJECT) { - /* Received QUIT command */ - if (!rspamd_dispatcher_write (session->dispatcher, - "221 2.0.0 Bye" CRLF, - 0, FALSE, TRUE)) { - msg_err ("cannot write smtp error"); - } - rspamd_session_destroy (session->s); - return FALSE; - } - if (session->rcpt != NULL) { - session->errors++; - if (session->errors > SMTP_MAXERRORS) { - if (!rspamd_dispatcher_write (session->dispatcher, - "521 5.2.1 Maximum errors reached" CRLF, - 0, FALSE, TRUE)) { - msg_err ("cannot write smtp error"); - } - rspamd_session_destroy (session->s); - return FALSE; - } - return rspamd_dispatcher_write (session->dispatcher, - make_smtp_error (session->pool, - 521, - "%s Client denied by %s", - "5.2.1", - session->dnsbl_applied), - 0, FALSE, TRUE); - } - else { - return rspamd_dispatcher_write (session->dispatcher, - "250 smtp ready" CRLF, - 0, FALSE, TRUE); - } - } - - return FALSE; -} - -/* - * Actually called only if something goes wrong - */ -static gboolean -smtp_proxy_write_socket (void *arg) -{ - struct smtp_proxy_session *session = arg; - - if (session->ctx->instant_reject) { - rspamd_session_destroy (session->s); - return FALSE; - } - else { - session->state = SMTP_PROXY_STATE_REJECT_EMULATE; - } - - return TRUE; -} - -/* - * Called if something goes wrong - */ -static void -smtp_proxy_err_socket (GError * err, void *arg) -{ - struct smtp_proxy_session *session = arg; - - if (err) { - if (err->code == ETIMEDOUT) { - /* Write smtp error */ - if (!rspamd_dispatcher_write (session->dispatcher, - "421 4.4.2 Error: timeout exceeded" CRLF, - 0, FALSE, TRUE)) { - msg_err ("cannot write smtp error"); - } - } - msg_info ("abnormally closing connection, error: %s", err->message); - g_error_free (err); - } - /* Free buffers */ - rspamd_session_destroy (session->s); -} - -/* - * Accept new connection and construct session - */ -static void -accept_socket (gint fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - struct smtp_proxy_session *session; - struct smtp_proxy_ctx *ctx; - rspamd_inet_addr_t *addr; - gint nfd; - - ctx = worker->ctx; - - if ((nfd = - rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) { - msg_warn ("accept failed: %s", strerror (errno)); - return; - } - /* Check for EAGAIN */ - if (nfd == 0) { - return; - } - - msg_info ("accepted connection from %s port %d", - rspamd_inet_address_to_string (addr), - rspamd_inet_address_get_port (addr)); - - ctx = worker->ctx; - session = g_slice_alloc0 (sizeof (struct smtp_proxy_session)); - session->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), NULL); - - - session->sock = nfd; - session->worker = worker; - session->ctx = ctx; - session->resolver = ctx->resolver; - session->ev_base = ctx->ev_base; - session->upstream_sock = -1; - session->ptr_str = rdns_generate_ptr_from_str (rspamd_inet_address_to_string ( - addr)); - worker->srv->stat->connections_count++; - - /* Resolve client's addr */ - /* Set up async session */ - session->s = rspamd_session_create (session->pool, - NULL, - NULL, - free_smtp_proxy_session, - session); - rspamd_inet_address_destroy (addr); - session->state = SMTP_PROXY_STATE_RESOLVE_REVERSE; - if (!make_dns_request (session->resolver, session->s, session->pool, - smtp_dns_cb, session, RDNS_REQUEST_PTR, session->ptr_str)) { - msg_err ("cannot resolve %s", inet_ntoa (session->client_addr)); - g_slice_free1 (sizeof (struct smtp_proxy_session), session); - close (nfd); - return; - } - else { - session->dispatcher = rspamd_create_dispatcher (session->ev_base, - nfd, - BUFFER_ANY, - smtp_proxy_read_socket, - smtp_proxy_write_socket, - smtp_proxy_err_socket, - &session->ctx->smtp_timeout, - session); - session->dispatcher->peer_addr = session->client_addr.s_addr; - } -} - -gpointer -init_smtp_proxy (struct rspamd_config *cfg) -{ - struct smtp_proxy_ctx *ctx; - GQuark type; - - type = g_quark_try_string ("smtp_proxy"); - - ctx = g_malloc0 (sizeof (struct smtp_worker_ctx)); - ctx->magic = rspamd_smtp_proxy_magic; - ctx->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), NULL); - - /* Set default values */ - ctx->smtp_timeout_raw = 300000; - ctx->smtp_delay = 0; - ctx->instant_reject = TRUE; - - rspamd_rcl_register_worker_option (cfg, - type, - "upstreams", - rspamd_rcl_parse_struct_string, - ctx, - G_STRUCT_OFFSET (struct smtp_proxy_ctx, upstreams_str), - 0, - "List of upstream SMTP servers"); - - rspamd_rcl_register_worker_option (cfg, - type, - "timeout", - rspamd_rcl_parse_struct_time, - ctx, - G_STRUCT_OFFSET (struct smtp_proxy_ctx, - smtp_timeout_raw), - RSPAMD_CL_FLAG_TIME_UINT_32, - "IO timeout"); - - rspamd_rcl_register_worker_option (cfg, - type, - "delay", - rspamd_rcl_parse_struct_time, - ctx, - G_STRUCT_OFFSET (struct smtp_proxy_ctx, - smtp_delay), - RSPAMD_CL_FLAG_TIME_UINT_32, - "SMTP greeting delay"); - - rspamd_rcl_register_worker_option (cfg, - type, - "jitter", - rspamd_rcl_parse_struct_time, - ctx, - G_STRUCT_OFFSET (struct smtp_proxy_ctx, - delay_jitter), - RSPAMD_CL_FLAG_TIME_UINT_32, - "Jitter atribute for SMTP delay"); - - rspamd_rcl_register_worker_option (cfg, - type, - "xclient", - rspamd_rcl_parse_struct_boolean, - ctx, - G_STRUCT_OFFSET (struct smtp_proxy_ctx, use_xclient), - 0, - "Use XCLIENT protocol for upstream communication"); - - rspamd_rcl_register_worker_option (cfg, - type, - "instant_reject", - rspamd_rcl_parse_struct_boolean, - ctx, - G_STRUCT_OFFSET (struct smtp_proxy_ctx, instant_reject), - 0, - "Reject invalid pipelining"); - - rspamd_rcl_register_worker_option (cfg, - type, - "proxy_buffer", - rspamd_rcl_parse_struct_integer, - ctx, - G_STRUCT_OFFSET (struct smtp_proxy_ctx, - proxy_buf_len), - RSPAMD_CL_FLAG_INT_32, - "Adjust SMTP buffer size"); - - rspamd_rcl_register_worker_option (cfg, - type, - "dnsbl", - rspamd_rcl_parse_struct_string_list, - ctx, - G_STRUCT_OFFSET (struct smtp_proxy_ctx, rbls), - 0, - "Use the following DNS lists as IP blacklists"); - - return ctx; -} - -/* Make post-init configuration */ -static gboolean -config_smtp_proxy_worker (struct rspamd_worker *worker) -{ - struct smtp_proxy_ctx *ctx = worker->ctx; - gchar *value; - - /* Init timeval */ - msec_to_tv (ctx->smtp_timeout_raw, &ctx->smtp_timeout); - - /* Init upstreams */ - if ((value = ctx->upstreams_str) != NULL) { - if (!rspamd_upstreams_parse_line (ctx->upstreams, value, 25, NULL)) { - msg_err ("cannot parse any valid upstream"); - return FALSE; - } - } - else { - msg_err ("no upstreams defined, don't know what to do"); - return FALSE; - } - - if (ctx->proxy_buf_len == 0) { - ctx->proxy_buf_len = DEFAULT_PROXY_BUF_LEN; - } - - return TRUE; -} - -/* - * Start worker process - */ -void -start_smtp_proxy (struct rspamd_worker *worker) -{ - struct smtp_proxy_ctx *ctx = worker->ctx; - - ctx->ev_base = rspamd_prepare_worker (worker, "smtp_proxy", accept_socket); - - /* Set smtp options */ - if ( !config_smtp_proxy_worker (worker)) { - msg_err ("cannot configure smtp worker, exiting"); - exit (EXIT_SUCCESS); - } - - /* DNS resolver */ - ctx->resolver = dns_resolver_init (worker->srv->logger, - ctx->ev_base, - worker->srv->cfg); - - rspamd_upstreams_library_config (worker->srv->cfg, worker->srv->cfg->ups_ctx, - ctx->ev_base, ctx->resolver->r); - /* Set umask */ - umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); - - event_base_loop (ctx->ev_base, 0); - rspamd_worker_block_signals (); - - rspamd_log_close (worker->srv->logger); - exit (EXIT_SUCCESS); -} - |