From 67e60b72919e9c33b25da7088a92c03d591ecf3f Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 25 Oct 2010 21:39:38 +0400 Subject: [PATCH] Fix action settings (reported by Anton Nekhoroshih). Split smtp code to 'utils', 'protocol' and 'worker' functions. * Add support of actions for smtp worker. --- CMakeLists.txt | 1 + src/settings.c | 10 +-- src/smtp.c | 153 +------------------------------- src/smtp_proto.c | 1 + src/smtp_utils.c | 223 +++++++++++++++++++++++++++++++++++++++++++++++ src/smtp_utils.h | 38 ++++++++ 6 files changed, 270 insertions(+), 156 deletions(-) create mode 100644 src/smtp_utils.c create mode 100644 src/smtp_utils.h diff --git a/CMakeLists.txt b/CMakeLists.txt index b192f51b2..a0b94609e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -518,6 +518,7 @@ SET(RSPAMDSRC src/modules.c src/spf.c src/smtp.c src/smtp_proto.c + src/smtp_utils.c src/statfile.c src/statfile_sync.c src/symbols_cache.c diff --git a/src/settings.c b/src/settings.c index cbf73ddd4..65dea078e 100644 --- a/src/settings.c +++ b/src/settings.c @@ -228,7 +228,7 @@ json_fin_cb (memory_pool_t * pool, struct map_cb_data *data) while (act_it) { act_value = json_object_iter_value (act_it); - if (it_val && json_is_number (act_value)) { + if (act_value && json_is_number (act_value)) { if (check_action_str (json_object_iter_key (act_it), &j)) { new_act = g_malloc (sizeof (struct metric_action)); new_act->action = j; @@ -239,15 +239,15 @@ json_fin_cb (memory_pool_t * pool, struct map_cb_data *data) /* Special cases */ if (g_ascii_strcasecmp (json_object_iter_key (act_it), "spam_score") == 0) { score = g_malloc (sizeof (double)); - *score = json_number_value (act_it); + *score = json_number_value (act_value); g_hash_table_insert (cur_settings->metric_scores, - g_strdup (json_object_iter_key (act_it)), score); + g_strdup (json_object_iter_key (json_it)), score); } else if (g_ascii_strcasecmp (json_object_iter_key (act_it), "reject_score") == 0) { score = g_malloc (sizeof (double)); - *score = json_number_value (act_it); + *score = json_number_value (act_value); g_hash_table_insert (cur_settings->reject_scores, - g_strdup (json_object_iter_key (act_it)), score); + g_strdup (json_object_iter_key (json_it)), score); } } } diff --git a/src/smtp.c b/src/smtp.c index fd679bea7..1bda83423 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -28,6 +28,7 @@ #include "util.h" #include "smtp.h" #include "smtp_proto.h" +#include "smtp_utils.h" #include "map.h" #include "message.h" #include "settings.h" @@ -41,7 +42,6 @@ #define DEFAULT_UPSTREAM_DEAD_TIME 300 #define DEFAULT_UPSTREAM_MAXERRORS 10 - #define DEFAULT_REJECT_MESSAGE "450 4.5.0 Spam message rejected" static gboolean smtp_write_socket (void *arg); @@ -79,37 +79,6 @@ sig_handler (gint signo, siginfo_t *info, void *unused) } } - -static void -free_smtp_session (gpointer arg) -{ - struct smtp_session *session = arg; - - if (session) { - if (session->task) { - free_task (session->task, FALSE); - if (session->task->msg->begin) { - munmap (session->task->msg->begin, session->task->msg->len); - } - } - if (session->rcpt) { - g_list_free (session->rcpt); - } - if (session->dispatcher) { - rspamd_remove_dispatcher (session->dispatcher); - } - close (session->sock); - if (session->temp_name != NULL) { - unlink (session->temp_name); - } - if (session->temp_fd != -1) { - close (session->temp_fd); - } - memory_pool_delete (session->pool); - g_free (session); - } -} - /* * Config reload is designed by sending sigusr to active workers and pending shutdown of them */ @@ -131,46 +100,6 @@ sigusr_handler (gint fd, short what, void *arg) return; } -static gboolean -create_smtp_upstream_connection (struct smtp_session *session) -{ - struct smtp_upstream *selected; - struct sockaddr_un *un; - - /* Try to select upstream */ - selected = (struct smtp_upstream *)get_upstream_round_robin (session->ctx->upstreams, - session->ctx->upstream_num, sizeof (struct smtp_upstream), - session->session_time, DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS); - if (selected == NULL) { - msg_err ("no upstreams suitable found"); - return FALSE; - } - - session->upstream = selected; - - /* Now try to create socket */ - if (selected->is_unix) { - un = alloca (sizeof (struct sockaddr_un)); - session->upstream_sock = make_unix_socket (selected->name, un, FALSE); - } - else { - session->upstream_sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE); - } - if (session->upstream_sock == -1) { - msg_err ("cannot make a connection to %s", selected->name); - upstream_fail (&selected->up, session->session_time); - return FALSE; - } - /* Create a dispatcher for upstream connection */ - session->upstream_dispatcher = rspamd_create_dispatcher (session->upstream_sock, BUFFER_LINE, - smtp_upstream_read_socket, smtp_upstream_write_socket, smtp_upstream_err_socket, - &session->ctx->smtp_timeout, session); - session->state = SMTP_STATE_WAIT_UPSTREAM; - session->upstream_state = SMTP_STATE_GREETING; - register_async_event (session->s, (event_finalizer_t)smtp_upstream_finalize_connection, session, FALSE); - return TRUE; -} - static gboolean call_stage_filters (struct smtp_session *session, enum rspamd_smtp_stage stage) { @@ -335,30 +264,6 @@ improper_sequence: return FALSE; } -static gboolean -smtp_send_upstream_message (struct smtp_session *session) -{ - rspamd_dispatcher_pause (session->dispatcher); - rspamd_dispatcher_restore (session->upstream_dispatcher); - - session->upstream_state = SMTP_STATE_IN_SENDFILE; - session->state = SMTP_STATE_WAIT_UPSTREAM; - if (! rspamd_dispatcher_sendfile (session->upstream_dispatcher, session->temp_fd, session->temp_size)) { - msg_err ("sendfile failed: %s", strerror (errno)); - goto err; - } - return TRUE; - -err: - session->error = SMTP_ERROR_FILE; - session->state = SMTP_STATE_CRITICAL_ERROR; - if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) { - return FALSE; - } - destroy_session (session->s); - return FALSE; -} - static gboolean process_smtp_data (struct smtp_session *session) { @@ -534,13 +439,6 @@ static gboolean smtp_write_socket (void *arg) { struct smtp_session *session = arg; - double ms = 0, rs = 0; - gint r; - struct metric_result *metric_res; - struct metric *m; - gchar logbuf[1024]; - gboolean is_spam = FALSE; - GList *symbols, *cur; if (session->state == SMTP_STATE_CRITICAL_ERROR) { if (session->error != NULL) { @@ -553,54 +451,7 @@ smtp_write_socket (void *arg) } else if (session->state == SMTP_STATE_END) { if (session->task != NULL) { - /* Check metric */ - m = g_hash_table_lookup (session->cfg->metrics, session->ctx->metric); - metric_res = g_hash_table_lookup (session->task->results, session->ctx->metric); - if (m != NULL && metric_res != NULL) { - if (!check_metric_settings (session->task, m, &ms, &rs)) { - ms = m->required_score; - rs = m->reject_score; - } - if (metric_res->score >= ms) { - is_spam = TRUE; - } - - r = rspamd_snprintf (logbuf, sizeof (logbuf), "msg ok, id: <%s>, ", session->task->message_id); - r += rspamd_snprintf (logbuf + r, sizeof (logbuf) - r, "(%s: %s: [%.2f/%.2f/%.2f] [", - (gchar *)m->name, is_spam ? "T" : "F", metric_res->score, ms, rs); - symbols = g_hash_table_get_keys (metric_res->symbols); - cur = symbols; - while (cur) { - if (g_list_next (cur) != NULL) { - r += rspamd_snprintf (logbuf + r, sizeof (logbuf) - r, "%s,", (gchar *)cur->data); - } - else { - r += rspamd_snprintf (logbuf + r, sizeof (logbuf) - r, "%s", (gchar *)cur->data); - } - cur = g_list_next (cur); - } - g_list_free (symbols); -#ifdef HAVE_CLOCK_GETTIME - r += rspamd_snprintf (logbuf + r, sizeof (logbuf) - r, "]), len: %z, time: %s", - session->task->msg->len, calculate_check_time (&session->task->tv, &session->task->ts, session->cfg->clock_res)); -#else - r += rspamd_snprintf (logbuf + r, sizeof (logbuf) - r, "]), len: %z, time: %s", - session->task->msg->len, calculate_check_time (&session->task->tv, session->cfg->clock_res)); -#endif - msg_info ("%s", logbuf); - - if (is_spam) { - if (! rspamd_dispatcher_write (session->dispatcher, session->ctx->reject_message, 0, FALSE, TRUE)) { - return FALSE; - } - if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) { - return FALSE; - } - destroy_session (session->s); - return FALSE; - } - } - return smtp_send_upstream_message (session); + return write_smtp_reply (session); } else { if (session->error != NULL) { diff --git a/src/smtp_proto.c b/src/smtp_proto.c index dc8fccec1..ee9084f28 100644 --- a/src/smtp_proto.c +++ b/src/smtp_proto.c @@ -28,6 +28,7 @@ #include "util.h" #include "smtp.h" #include "smtp_proto.h" +#include "smtp_utils.h" gchar * make_smtp_error (struct smtp_session *session, gint error_code, const gchar *format, ...) diff --git a/src/smtp_utils.c b/src/smtp_utils.c new file mode 100644 index 000000000..1a30112ee --- /dev/null +++ b/src/smtp_utils.c @@ -0,0 +1,223 @@ +/* Copyright (c) 2010, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "main.h" +#include "filter.h" +#include "settings.h" +#include "smtp.h" +#include "smtp_proto.h" + +void +free_smtp_session (gpointer arg) +{ + struct smtp_session *session = arg; + + if (session) { + if (session->task) { + free_task (session->task, FALSE); + if (session->task->msg->begin) { + munmap (session->task->msg->begin, session->task->msg->len); + } + } + if (session->rcpt) { + g_list_free (session->rcpt); + } + if (session->dispatcher) { + rspamd_remove_dispatcher (session->dispatcher); + } + close (session->sock); + if (session->temp_name != NULL) { + unlink (session->temp_name); + } + if (session->temp_fd != -1) { + close (session->temp_fd); + } + memory_pool_delete (session->pool); + g_free (session); + } +} + +gboolean +create_smtp_upstream_connection (struct smtp_session *session) +{ + struct smtp_upstream *selected; + struct sockaddr_un *un; + + /* Try to select upstream */ + selected = (struct smtp_upstream *)get_upstream_round_robin (session->ctx->upstreams, + session->ctx->upstream_num, sizeof (struct smtp_upstream), + session->session_time, DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS); + if (selected == NULL) { + msg_err ("no upstreams suitable found"); + return FALSE; + } + + session->upstream = selected; + + /* Now try to create socket */ + if (selected->is_unix) { + un = alloca (sizeof (struct sockaddr_un)); + session->upstream_sock = make_unix_socket (selected->name, un, FALSE); + } + else { + session->upstream_sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE); + } + if (session->upstream_sock == -1) { + msg_err ("cannot make a connection to %s", selected->name); + upstream_fail (&selected->up, session->session_time); + return FALSE; + } + /* Create a dispatcher for upstream connection */ + session->upstream_dispatcher = rspamd_create_dispatcher (session->upstream_sock, BUFFER_LINE, + smtp_upstream_read_socket, smtp_upstream_write_socket, smtp_upstream_err_socket, + &session->ctx->smtp_timeout, session); + session->state = SMTP_STATE_WAIT_UPSTREAM; + session->upstream_state = SMTP_STATE_GREETING; + register_async_event (session->s, (event_finalizer_t)smtp_upstream_finalize_connection, session, FALSE); + return TRUE; +} + +gboolean +smtp_send_upstream_message (struct smtp_session *session) +{ + rspamd_dispatcher_pause (session->dispatcher); + rspamd_dispatcher_restore (session->upstream_dispatcher); + + session->upstream_state = SMTP_STATE_IN_SENDFILE; + session->state = SMTP_STATE_WAIT_UPSTREAM; + if (! rspamd_dispatcher_sendfile (session->upstream_dispatcher, session->temp_fd, session->temp_size)) { + msg_err ("sendfile failed: %s", strerror (errno)); + goto err; + } + return TRUE; + +err: + session->error = SMTP_ERROR_FILE; + session->state = SMTP_STATE_CRITICAL_ERROR; + if (! rspamd_dispatcher_write (session->dispatcher, session->error, 0, FALSE, TRUE)) { + return FALSE; + } + destroy_session (session->s); + return FALSE; +} + +struct smtp_metric_callback_data { + struct smtp_session *session; + enum rspamd_metric_action action; + struct metric_result *res; + gchar *log_buf; + gint log_offset; + gint log_size; + gboolean alive; +}; + +static void +smtp_metric_symbols_callback (gpointer key, gpointer value, void *user_data) +{ + struct smtp_metric_callback_data *cd = user_data; + + cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "%s,", (gchar *)key); +} + +static void +smtp_metric_callback (gpointer key, gpointer value, gpointer ud) +{ + struct smtp_metric_callback_data *cd = ud; + struct metric_result *metric_res = value; + enum rspamd_metric_action action = METRIC_ACTION_NOACTION; + double ms = 0, rs = 0; + gboolean is_spam = FALSE; + struct worker_task *task; + + task = cd->session->task; + + if (! check_metric_action_settings (task, metric_res->metric, metric_res->score, &action)) { + action = check_metric_action (metric_res->score, ms, metric_res->metric); + } + + if (action < cd->action) { + cd->action = action; + cd->res = metric_res; + } + + if (!task->is_skipped) { + cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "(%s: %c (%s): [%.2f/%.2f/%.2f] [", + (gchar *)key, is_spam ? 'T' : 'F', str_action_metric (action), metric_res->score, ms, rs); + } + else { + cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "(%s: %c (default): [%.2f/%.2f/%.2f] [", + (gchar *)key, 'S', metric_res->score, ms, rs); + + } + g_hash_table_foreach (metric_res->symbols, smtp_metric_symbols_callback, cd); + /* Remove last , from log buf */ + if (cd->log_buf[cd->log_offset - 1] == ',') { + cd->log_buf[--cd->log_offset] = '\0'; + } + +#ifdef HAVE_CLOCK_GETTIME + cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "]), len: %z, time: %s,", + task->msg->len, calculate_check_time (&task->tv, &task->ts, task->cfg->clock_res)); +#else + cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "]), len: %z, time: %s,", + task->msg->len, calculate_check_time (&task->tv, task->cfg->clock_res)); +#endif +} + +gboolean +write_smtp_reply (struct smtp_session *session) +{ + gchar logbuf[1024]; + struct smtp_metric_callback_data cd; + + /* Check metrics */ + cd.session = session; + cd.action = METRIC_ACTION_NOACTION; + cd.res = NULL; + cd.log_buf = logbuf; + cd.log_offset = rspamd_snprintf (logbuf, sizeof (logbuf), "id: <%s>, qid: <%s>, ", + session->task->message_id, session->task->queue_id); + cd.log_size = sizeof (logbuf); + if (session->task->user) { + cd.log_offset += rspamd_snprintf (logbuf + cd.log_offset, sizeof (logbuf) - cd.log_offset, + "user: %s, ", session->task->user); + } + + g_hash_table_foreach (session->task->results, smtp_metric_callback, &cd); + + msg_info ("%s", logbuf); + + if (cd.action <= METRIC_ACTION_REJECT) { + if (! rspamd_dispatcher_write (session->dispatcher, session->ctx->reject_message, 0, FALSE, TRUE)) { + return FALSE; + } + if (! rspamd_dispatcher_write (session->dispatcher, CRLF, sizeof (CRLF) - 1, FALSE, TRUE)) { + return FALSE; + } + destroy_session (session->s); + return FALSE; + } + /* XXX: Add other actions */ + return smtp_send_upstream_message (session); +} diff --git a/src/smtp_utils.h b/src/smtp_utils.h new file mode 100644 index 000000000..0374a715e --- /dev/null +++ b/src/smtp_utils.h @@ -0,0 +1,38 @@ +#ifndef SMTP_UTILS_H_ +#define SMTP_UTILS_H_ + +#include "config.h" +#include "main.h" +#include "smtp.h" +#include "smtp_proto.h" + +/** + * @file smtp_utils.h + * Contains utilities for smtp protocol handling + */ + +/** + * Send message to upstream + * @param session session object + */ +gboolean smtp_send_upstream_message (struct smtp_session *session); + +/** + * Create connection to upstream + * @param session session object + */ +gboolean create_smtp_upstream_connection (struct smtp_session *session); + + +/** + * Write reply to upstream + * @param session session object + */ +gboolean write_smtp_reply (struct smtp_session *session); + +/** + * Frees smtp session object + */ +void free_smtp_session (gpointer arg); + +#endif /* SMTP_UTILS_H_ */ -- 2.39.5