From 9b8ecfb8923cae677777a68790f23fbcf26bdabf Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 7 Mar 2012 20:47:55 +0400 Subject: [PATCH] * Add initial implementation of proxy object * Add simple smtp proxy worker --- CMakeLists.txt | 3 +- lib/CMakeLists.txt | 1 + src/proxy.c | 238 +++++++++++++++++++ src/proxy.h | 66 +++++ src/smtp.c | 64 +---- src/smtp.h | 15 +- src/smtp_proto.c | 4 +- src/smtp_proto.h | 6 +- src/smtp_proxy.c | 581 +++++++++++++++++++++++++++++++++++++++++++++ src/smtp_utils.c | 60 ++++- src/smtp_utils.h | 23 +- 11 files changed, 980 insertions(+), 81 deletions(-) create mode 100644 src/proxy.c create mode 100644 src/proxy.h create mode 100644 src/smtp_proxy.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 758081459..1ac5fded7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -848,6 +848,7 @@ SET(RSPAMDSRC src/modules.c src/main.c src/map.c src/smtp.c + src/smtp_proxy.c src/worker.c) SET(TOKENIZERSSRC src/tokenizers/tokenizers.c @@ -864,7 +865,7 @@ SET(PLUGINSSRC src/plugins/surbl.c src/plugins/spf.c) SET(MODULES_LIST surbl regexp chartable fuzzy_check spf) -SET(WORKERS_LIST normal controller smtp lmtp fuzzy keystorage) +SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage) AddModules(MODULES_LIST WORKERS_LIST) diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 0355fe3f9..feeab47ca 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -63,6 +63,7 @@ SET(RSPAMDLIBSRC ../src/message.c ../src/printf.c ../src/protocol.c + ../src/proxy.c ../src/radix.c ../src/settings.c ../src/spf.c diff --git a/src/proxy.c b/src/proxy.c new file mode 100644 index 000000000..df1ccfdae --- /dev/null +++ b/src/proxy.c @@ -0,0 +1,238 @@ +/* Copyright (c) 2010-2012, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "main.h" +#include "proxy.h" + +static void rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data); +static void rspamd_proxy_client_handler (gint fd, gshort what, gpointer data); + +static inline GQuark +proxy_error_quark (void) +{ + return g_quark_from_static_string ("proxy-error"); +} + +static void +rspamd_proxy_close (rspamd_proxy_t *proxy) +{ + close (proxy->cfd); + close (proxy->bfd); + + event_del (&proxy->client_ev); + event_del (&proxy->backend_ev); +} + +static void +rspamd_proxy_client_handler (gint fd, gshort what, gpointer data) +{ + rspamd_proxy_t *proxy = data; + gint r; + GError *err = NULL; + + if (what == EV_READ) { + /* Got data from client */ + event_del (&proxy->client_ev); + r = read (proxy->cfd, proxy->buf, proxy->bufsize); + if (r > 0) { + /* Write this buffer to backend */ + proxy->read_len = r; + proxy->buf_offset = 0; + event_del (&proxy->backend_ev); + event_set (&proxy->backend_ev, proxy->bfd, EV_WRITE, rspamd_proxy_backend_handler, proxy); + event_add (&proxy->backend_ev, proxy->tv); + } + else { + /* Error case or zero reply */ + if (r < 0) { + /* Error case */ + g_set_error (&err, proxy_error_quark(), r, "Client read error: %s", strerror (errno)); + rspamd_proxy_close (proxy); + proxy->err_cb (err, proxy->user_data); + } + else { + /* Client closes connection */ + rspamd_proxy_close (proxy); + proxy->err_cb (NULL, proxy->user_data); + } + } + } + else if (what == EV_WRITE) { + /* Can write to client */ + r = write (proxy->cfd, proxy->buf + proxy->buf_offset, proxy->read_len - proxy->buf_offset); + if (r > 0) { + /* We wrote something */ + proxy->buf_offset +=r; + if (proxy->buf_offset == proxy->read_len) { + /* We wrote everything */ + event_del (&proxy->client_ev); + event_set (&proxy->client_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy); + event_add (&proxy->client_ev, proxy->tv); + event_del (&proxy->backend_ev); + event_set (&proxy->backend_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy); + event_add (&proxy->backend_ev, proxy->tv); + } + else { + /* Plan another write event */ + event_add (&proxy->backend_ev, proxy->tv); + } + } + else { + /* Error case or zero reply */ + if (r < 0) { + /* Error case */ + g_set_error (&err, proxy_error_quark(), r, "Client write error: %s", strerror (errno)); + rspamd_proxy_close (proxy); + proxy->err_cb (err, proxy->user_data); + } + else { + /* Client closes connection */ + rspamd_proxy_close (proxy); + proxy->err_cb (NULL, proxy->user_data); + } + } + } + else { + /* Got timeout */ + g_set_error (&err, proxy_error_quark(), ETIMEDOUT, "Client timeout"); + rspamd_proxy_close (proxy); + proxy->err_cb (err, proxy->user_data); + } +} + +static void +rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data) +{ + rspamd_proxy_t *proxy = data; + gint r; + GError *err; + + if (what == EV_READ) { + /* Got data from backend */ + event_del (&proxy->backend_ev); + r = read (proxy->cfd, proxy->buf, proxy->bufsize); + if (r > 0) { + /* Write this buffer to client */ + proxy->read_len = r; + proxy->buf_offset = 0; + event_del (&proxy->client_ev); + event_set (&proxy->client_ev, proxy->bfd, EV_WRITE, rspamd_proxy_client_handler, proxy); + event_add (&proxy->client_ev, proxy->tv); + } + else { + /* Error case or zero reply */ + if (r < 0) { + /* Error case */ + g_set_error (&err, proxy_error_quark(), r, "Backend read error: %s", strerror (errno)); + rspamd_proxy_close (proxy); + proxy->err_cb (err, proxy->user_data); + } + else { + /* Client closes connection */ + rspamd_proxy_close (proxy); + proxy->err_cb (NULL, proxy->user_data); + } + } + } + else if (what == EV_WRITE) { + /* Can write to backend */ + r = write (proxy->bfd, proxy->buf + proxy->buf_offset, proxy->read_len - proxy->buf_offset); + if (r > 0) { + /* We wrote something */ + proxy->buf_offset +=r; + if (proxy->buf_offset == proxy->read_len) { + /* We wrote everything */ + event_del (&proxy->backend_ev); + event_set (&proxy->backend_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy); + event_add (&proxy->backend_ev, proxy->tv); + event_del (&proxy->client_ev); + event_set (&proxy->client_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy); + event_add (&proxy->client_ev, proxy->tv); + } + else { + /* Plan another write event */ + event_add (&proxy->backend_ev, proxy->tv); + } + } + else { + /* Error case or zero reply */ + if (r < 0) { + /* Error case */ + g_set_error (&err, proxy_error_quark(), r, "Backend write error: %s", strerror (errno)); + rspamd_proxy_close (proxy); + proxy->err_cb (err, proxy->user_data); + } + else { + /* Client closes connection */ + rspamd_proxy_close (proxy); + proxy->err_cb (NULL, proxy->user_data); + } + } + } + else { + /* Got timeout */ + g_set_error (&err, proxy_error_quark(), ETIMEDOUT, "Client timeout"); + rspamd_proxy_close (proxy); + proxy->err_cb (err, proxy->user_data); + } +} + +/** + * Create new proxy between cfd and bfd + * @param cfd client's socket + * @param bfd backend's socket + * @param bufsize size of exchange buffer + * @param err_cb callback for erorrs or completing + * @param ud user data for callback + * @return new proxy object + */ +rspamd_proxy_t* +rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool, struct event_base *base, + gsize bufsize, struct timeval *tv, dispatcher_err_callback_t err_cb, gpointer ud) +{ + rspamd_proxy_t *new; + + new = memory_pool_alloc0 (pool, sizeof (rspamd_proxy_t)); + + new->cfd = cfd; + new->bfd = bfd; + new->pool = pool; + new->base = base; + new->bufsize = bufsize; + new->buf = memory_pool_alloc (pool, bufsize); + new->err_cb = err_cb; + new->user_data = ud; + new->tv = tv; + + /* Set client's and backend's interfaces to read events */ + event_set (&new->client_ev, new->cfd, EV_READ, rspamd_proxy_client_handler, new); + event_base_set (new->base, &new->client_ev); + event_add (&new->client_ev, new->tv); + + event_set (&new->backend_ev, new->bfd, EV_READ, rspamd_proxy_backend_handler, new); + event_base_set (new->base, &new->backend_ev); + event_add (&new->backend_ev, new->tv); + + return new; +} diff --git a/src/proxy.h b/src/proxy.h new file mode 100644 index 000000000..6ebb08d56 --- /dev/null +++ b/src/proxy.h @@ -0,0 +1,66 @@ +/* Copyright (c) 2010-2012, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#ifndef PROXY_H_ +#define PROXY_H_ + +#include "config.h" +#include "buffer.h" + +/** + * @file proxy.h + * Direct asynchronous proxy implementation + */ + +typedef struct rspamd_proxy_s { + struct event client_ev; /**< event for client's communication */ + struct event backend_ev; /**< event for backend communication */ + struct event_base *base; /**< base for event operations */ + memory_pool_t *pool; /**< memory pool */ + dispatcher_err_callback_t err_cb; /**< error callback */ + struct event_base *ev_base; /**< event base */ + gint cfd; /**< client's socket */ + gint bfd; /**< backend's socket */ + guint8 *buf; /**< exchange buffer */ + gsize bufsize; /**< buffer size */ + gint read_len; /**< read length */ + gint buf_offset; /**< offset to write */ + gpointer user_data; /**< user's data for callbacks */ + struct timeval *tv; /**< timeout for communications */ +} rspamd_proxy_t; + +/** + * Create new proxy between cfd and bfd + * @param cfd client's socket + * @param bfd backend's socket + * @param bufsize size of exchange buffer + * @param err_cb callback for erorrs or completing + * @param ud user data for callback + * @return new proxy object + */ +rspamd_proxy_t* rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool, + struct event_base *base, gsize bufsize, struct timeval *tv, + dispatcher_err_callback_t err_cb, gpointer ud); + +#endif /* PROXY_H_ */ diff --git a/src/smtp.c b/src/smtp.c index 605a680cd..0c10762bd 100644 --- a/src/smtp.c +++ b/src/smtp.c @@ -45,9 +45,6 @@ #define DEFAULT_REJECT_MESSAGE "450 4.5.0 Spam message rejected" -#define XCLIENT_HOST_UNAVAILABLE "[UNAVAILABLE]" -#define XCLIENT_HOST_TEMPFAIL "[TEMPUNAVAIL]" - static gboolean smtp_write_socket (void *arg); static sig_atomic_t wanna_die = 0; @@ -397,7 +394,7 @@ smtp_read_socket (f_str_t * in, void *arg) case SMTP_STATE_RESOLVE_REVERSE: case SMTP_STATE_RESOLVE_NORMAL: case SMTP_STATE_DELAY: - session->error = make_smtp_error (session, 550, "%s Improper use of SMTP command pipelining", "5.5.0"); + session->error = make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0"); session->state = SMTP_STATE_ERROR; break; case SMTP_STATE_GREETING: @@ -444,7 +441,7 @@ smtp_read_socket (f_str_t * in, void *arg) rspamd_dispatcher_pause (session->dispatcher); break; default: - session->error = make_smtp_error (session, 550, "%s Internal error", "5.5.0"); + session->error = make_smtp_error (session->pool, 550, "%s Internal error", "5.5.0"); session->state = SMTP_STATE_ERROR; break; } @@ -820,61 +817,6 @@ parse_smtp_banner (struct smtp_worker_ctx *ctx, const gchar *line) } } -static gboolean -parse_upstreams_line (struct smtp_worker_ctx *ctx, const gchar *line) -{ - gchar **strv, *p, *t, *tt, *err_str; - guint32 num, i; - struct smtp_upstream *cur; - gchar resolved_path[PATH_MAX]; - - strv = g_strsplit_set (line, ",; ", -1); - num = g_strv_length (strv); - - if (num >= MAX_UPSTREAM) { - msg_err ("cannot define %d upstreams %d is max", num, MAX_UPSTREAM); - return FALSE; - } - - for (i = 0; i < num; i ++) { - p = strv[i]; - cur = &ctx->upstreams[ctx->upstream_num]; - if ((t = strrchr (p, ':')) != NULL && (tt = strchr (p, ':')) != t) { - /* Assume that after last `:' we have weigth */ - *t = '\0'; - t ++; - errno = 0; - cur->up.priority = strtoul (t, &err_str, 10); - if (errno != 0 || (err_str && *err_str != '\0')) { - msg_err ("cannot convert weight: %s, %s", t, strerror (errno)); - g_strfreev (strv); - return FALSE; - } - } - if (*p == '/') { - cur->is_unix = TRUE; - if (realpath (p, resolved_path) == NULL) { - msg_err ("cannot resolve path: %s", resolved_path); - g_strfreev (strv); - return FALSE; - } - cur->name = memory_pool_strdup (ctx->pool, resolved_path); - ctx->upstream_num ++; - } - else { - if (! parse_host_port (p, &cur->addr, &cur->port)) { - g_strfreev (strv); - return FALSE; - } - cur->name = memory_pool_strdup (ctx->pool, p); - ctx->upstream_num ++; - } - } - - g_strfreev (strv); - return TRUE; -} - static void make_capabilities (struct smtp_worker_ctx *ctx, const gchar *line) { @@ -973,7 +915,7 @@ config_smtp_worker (struct rspamd_worker *worker) /* Init upstreams */ if ((value = ctx->upstreams_str) != NULL) { - if (!parse_upstreams_line (ctx, value)) { + if (!parse_upstreams_line (ctx->pool, ctx->upstreams, value, &ctx->upstream_num)) { return FALSE; } } diff --git a/src/smtp.h b/src/smtp.h index d2c14f924..714fb3523 100644 --- a/src/smtp.h +++ b/src/smtp.h @@ -4,19 +4,10 @@ #include "config.h" #include "main.h" #include "upstream.h" - -struct smtp_upstream { - struct upstream up; - - const gchar *name; - struct in_addr addr; - guint16 port; - gboolean is_unix; -}; +#include "smtp_utils.h" struct rspamd_dns_resolver; -#define MAX_UPSTREAM 128 #define DEFAULT_MAX_ERRORS 10 enum rspamd_smtp_stage { @@ -29,8 +20,8 @@ enum rspamd_smtp_stage { }; struct smtp_worker_ctx { - struct smtp_upstream upstreams[MAX_UPSTREAM]; - size_t upstream_num; + struct smtp_upstream upstreams[MAX_SMTP_UPSTREAMS]; + gsize upstream_num; gchar *upstreams_str; memory_pool_t *pool; diff --git a/src/smtp_proto.c b/src/smtp_proto.c index d443a1fb3..c8f6fa556 100644 --- a/src/smtp_proto.c +++ b/src/smtp_proto.c @@ -31,7 +31,7 @@ #include "smtp_utils.h" gchar * -make_smtp_error (struct smtp_session *session, gint error_code, const gchar *format, ...) +make_smtp_error (memory_pool_t *pool, gint error_code, const gchar *format, ...) { va_list vp; gchar *result = NULL, *p; @@ -42,7 +42,7 @@ make_smtp_error (struct smtp_session *session, gint error_code, const gchar *for va_end (vp); va_start (vp, format); len += sizeof ("65535 ") + sizeof (CRLF) - 1; - result = memory_pool_alloc (session->pool, len); + result = memory_pool_alloc (pool, len); p = result + rspamd_snprintf (result, len, "%d ", error_code); p = rspamd_vsnprintf (p, len - (p - result), format, vp); *p++ = CR; *p++ = LF; *p = '\0'; diff --git a/src/smtp_proto.h b/src/smtp_proto.h index 2f2904192..efb09bf35 100644 --- a/src/smtp_proto.h +++ b/src/smtp_proto.h @@ -18,6 +18,10 @@ #define DATA_END_TRAILER "." CRLF +#define XCLIENT_HOST_UNAVAILABLE "[UNAVAILABLE]" +#define XCLIENT_HOST_TEMPFAIL "[TEMPUNAVAIL]" + +#define MAX_SMTP_UPSTREAMS 128 struct smtp_command { enum { @@ -39,7 +43,7 @@ struct smtp_command { /* * Generate SMTP error message */ -gchar * make_smtp_error (struct smtp_session *session, gint error_code, const gchar *format, ...); +gchar * make_smtp_error (memory_pool_t *pool, gint error_code, const gchar *format, ...); /* * Parse a single SMTP command diff --git a/src/smtp_proxy.c b/src/smtp_proxy.c new file mode 100644 index 000000000..fa9da971a --- /dev/null +++ b/src/smtp_proxy.c @@ -0,0 +1,581 @@ +/* Copyright (c) 2010-2012, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "main.h" +#include "cfg_file.h" +#include "cfg_xml.h" +#include "util.h" +#include "smtp_proto.h" +#include "map.h" +#include "message.h" +#include "settings.h" +#include "dns.h" +#include "upstream.h" +#include "proxy.h" + +/* + * SMTP proxy is a simple smtp proxy worker for dns resolving and + * load balancing. It uses XCLIENT command and is designed for MTA + * that supports that (postfix and exim). + */ + +/* Upstream timeouts */ +#define DEFAULT_UPSTREAM_ERROR_TIME 10 +#define DEFAULT_UPSTREAM_DEAD_TIME 300 +#define DEFAULT_UPSTREAM_MAXERRORS 10 + +static sig_atomic_t wanna_die = 0; + +/* Init functions */ +gpointer init_smtp_proxy (); +void start_smtp_proxy (struct rspamd_worker *worker); + +worker_t smtp_proxy_worker = { + "smtp_proxy", /* Name */ + init_smtp_proxy, /* Init function */ + start_smtp_proxy, /* Start function */ + TRUE, /* Has socket */ + FALSE, /* Non unique */ + FALSE, /* Non threaded */ + TRUE /* Killable */ +}; + +struct smtp_proxy_ctx { + struct smtp_upstream upstreams[MAX_SMTP_UPSTREAMS]; + size_t upstream_num; + gchar *upstreams_str; + + memory_pool_t *pool; + guint32 smtp_delay; + guint32 delay_jitter; + guint32 smtp_timeout_raw; + struct timeval smtp_timeout; + + struct rspamd_dns_resolver *resolver; + struct event_base *ev_base; +}; + +enum rspamd_smtp_proxy_state { + SMTP_PROXY_STATE_RESOLVE_REVERSE = 0, + SMTP_PROXY_STATE_RESOLVE_NORMAL, + SMTP_PROXY_STATE_DELAY +}; + +struct smtp_proxy_session { + struct smtp_proxy_ctx *ctx; + memory_pool_t *pool; + + enum rspamd_smtp_proxy_state state; + struct rspamd_worker *worker; + struct in_addr client_addr; + gchar *hostname; + gchar *error; + gchar *temp_name; + gint sock; + gint upstream_sock; + + struct rspamd_async_session *s; + rspamd_io_dispatcher_t *dispatcher; + + rspamd_proxy_t *proxy; + + struct smtp_upstream *upstream; + + struct event *delay_timer; + + gboolean resolved; + struct rspamd_dns_resolver *resolver; + struct event_base *ev_base; +}; + +#ifndef HAVE_SA_SIGINFO +static void +sig_handler (gint signo) +#else +static void +sig_handler (gint signo, siginfo_t *info, void *unused) +#endif +{ + struct timeval tv; + + switch (signo) { + case SIGINT: + case SIGTERM: + if (!wanna_die) { + wanna_die = 1; + tv.tv_sec = 0; + tv.tv_usec = 0; + event_loopexit (&tv); + +#ifdef WITH_GPERF_TOOLS + ProfilerStop (); +#endif + } + break; + } +} + +/* + * Config reload is designed by sending sigusr to active workers and pending shutdown of them + */ +static void +sigusr2_handler (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + if (! wanna_die) { + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev_usr1); + event_del (&worker->sig_ev_usr2); + event_del (&worker->bind_ev); + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + } + return; +} + +/* + * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them + */ +static void +sigusr1_handler (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *) arg; + + reopen_log (worker->srv->logger); + + return; +} + +static void +free_smtp_proxy_session (gpointer arg) +{ + struct smtp_proxy_session *session = arg; + + if (session) { + if (session->dispatcher) { + rspamd_remove_dispatcher (session->dispatcher); + } + + close (session->sock); + memory_pool_delete (session->pool); + g_free (session); + } +} + +static void +smtp_proxy_err_proxy (GError * err, void *arg) +{ + struct smtp_proxy_session *session = arg; + + msg_info ("abnormally closing connection, error: %s", err->message); + /* Free buffers */ + destroy_session (session->s); +} + +static gboolean +create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session) +{ + struct smtp_upstream *selected; + struct sockaddr_un *un; + + /* Try to select upstream */ + selected = (struct smtp_upstream *)get_upstream_round_robin (session->ctx->upstreams, + session->ctx->upstream_num, sizeof (struct smtp_upstream), + time (NULL), DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS); + if (selected == NULL) { + msg_err ("no upstreams suitable found"); + return FALSE; + } + + session->upstream = selected; + + /* Now try to create socket */ + if (selected->is_unix) { + un = alloca (sizeof (struct sockaddr_un)); + session->upstream_sock = make_unix_socket (selected->name, un, FALSE, TRUE); + } + else { + session->upstream_sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE); + } + if (session->upstream_sock == -1) { + msg_err ("cannot make a connection to %s", selected->name); + upstream_fail (&selected->up, time (NULL)); + return FALSE; + } + /* Create a proxy for upstream connection */ + rspamd_dispatcher_pause (session->dispatcher); + session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool, + session->ev_base, BUFSIZ, &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session); + + return TRUE; +} + +/* Resolving and delay handlers */ +/* + * Return from a delay + */ +static void +smtp_delay_handler (gint fd, short what, void *arg) +{ + struct smtp_proxy_session *session = arg; + + remove_normal_event (session->s, (event_finalizer_t) event_del, + session->delay_timer); + if (session->state == SMTP_PROXY_STATE_DELAY) { + /* TODO: Create upstream connection here */ + create_smtp_proxy_upstream_connection (session); + } + else { + /* TODO: Write error here */ + if (rspamd_dispatcher_write (session->dispatcher, + make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0"), + 0, FALSE, TRUE)) { + destroy_session (session->s); + } + } +} + +/* + * Make delay for a client + */ +static void +smtp_make_delay (struct smtp_proxy_session *session) +{ + struct event *tev; + struct timeval *tv; + gint32 jitter; + + if (session->ctx->smtp_delay != 0 && session->state == SMTP_PROXY_STATE_DELAY) { + tev = memory_pool_alloc (session->pool, sizeof(struct event)); + tv = memory_pool_alloc (session->pool, sizeof(struct timeval)); + if (session->ctx->delay_jitter != 0) { + jitter = g_random_int_range (0, session->ctx->delay_jitter); + msec_to_tv (session->ctx->smtp_delay + jitter, tv); + } + else { + msec_to_tv (session->ctx->smtp_delay, tv); + } + + evtimer_set (tev, smtp_delay_handler, session); + evtimer_add (tev, tv); + register_async_event (session->s, (event_finalizer_t) event_del, tev, + g_quark_from_static_string ("smtp proxy")); + session->delay_timer = tev; + } + else if (session->state == SMTP_PROXY_STATE_DELAY) { + /* TODO: Create upstream connection here */ + create_smtp_proxy_upstream_connection (session); + } +} + +/* + * Handle DNS replies + */ +static void +smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg) +{ + struct smtp_proxy_session *session = arg; + gint res = 0; + union rspamd_reply_element *elt; + GList *cur; + + switch (session->state) + { + case SMTP_PROXY_STATE_RESOLVE_REVERSE: + /* Parse reverse reply and start resolve of this ip */ + if (reply->code != DNS_RC_NOERROR) { + rspamd_conditional_debug (rspamd_main->logger, + session->client_addr.s_addr, __FUNCTION__, "DNS error: %s", + dns_strerror (reply->code)); + + if (reply->code == DNS_RC_NXDOMAIN) { + session->hostname = memory_pool_strdup (session->pool, + XCLIENT_HOST_UNAVAILABLE); + } + else { + session->hostname = memory_pool_strdup (session->pool, + XCLIENT_HOST_TEMPFAIL); + } + session->state = SMTP_PROXY_STATE_DELAY; + smtp_make_delay (session); + } + else { + if (reply->elements) { + elt = reply->elements->data; + session->hostname = memory_pool_strdup (session->pool, + elt->ptr.name); + session->state = SMTP_PROXY_STATE_RESOLVE_NORMAL; + make_dns_request (session->resolver, session->s, session->pool, + smtp_dns_cb, session, DNS_REQUEST_A, session->hostname); + + } + } + break; + case SMTP_PROXY_STATE_RESOLVE_NORMAL: + if (reply->code != DNS_RC_NOERROR) { + rspamd_conditional_debug (rspamd_main->logger, + session->client_addr.s_addr, __FUNCTION__, "DNS error: %s", + dns_strerror (reply->code)); + + if (reply->code == DNS_RC_NXDOMAIN) { + session->hostname = memory_pool_strdup (session->pool, + XCLIENT_HOST_UNAVAILABLE); + } + else { + session->hostname = memory_pool_strdup (session->pool, + XCLIENT_HOST_TEMPFAIL); + } + session->state = SMTP_PROXY_STATE_DELAY; + smtp_make_delay (session); + } + else { + res = 0; + cur = reply->elements; + while (cur) { + elt = cur->data; + if (memcmp (&session->client_addr, &elt->a.addr[0], + sizeof(struct in_addr)) == 0) { + res = 1; + session->resolved = TRUE; + break; + } + cur = g_list_next (cur); + } + + if (res == 0) { + msg_info( + "cannot find address for hostname: %s, ip: %s", session->hostname, inet_ntoa (session->client_addr)); + session->hostname = memory_pool_strdup (session->pool, + XCLIENT_HOST_UNAVAILABLE); + } + session->state = SMTP_PROXY_STATE_DELAY; + smtp_make_delay (session); + } + break; + default: + /* TODO: write something about pipelining */ + break; + } +} + +/* + * Callback that is called when there is data to read in buffer + */ +static gboolean +smtp_proxy_read_socket (f_str_t * in, void *arg) +{ + struct smtp_proxy_session *session = arg; + + /* This can be called only if client is using invalid pipelining */ + if (rspamd_dispatcher_write (session->dispatcher, + make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0"), + 0, FALSE, TRUE)) { + destroy_session (session->s); + } + else { + return FALSE; + } + + return TRUE; +} + +/* + * Called if something goes wrong + */ +static void +smtp_proxy_err_socket (GError * err, void *arg) +{ + struct smtp_proxy_session *session = arg; + + msg_info ("abnormally closing connection, error: %s", err->message); + /* Free buffers */ + destroy_session (session->s); +} + +/* + * Accept new connection and construct session + */ +static void +accept_socket (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + union sa_union su; + struct smtp_proxy_session *session; + struct smtp_proxy_ctx *ctx; + + socklen_t addrlen = sizeof (su.ss); + gint nfd; + + if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { + msg_warn ("accept failed: %s", strerror (errno)); + return; + } + /* Check for EAGAIN */ + if (nfd == 0) { + return; + } + + ctx = worker->ctx; + session = g_slice_alloc0 (sizeof (struct smtp_proxy_session)); + session->pool = memory_pool_new (memory_pool_get_size ()); + + if (su.ss.ss_family == AF_UNIX) { + msg_info ("accepted connection from unix socket"); + session->client_addr.s_addr = INADDR_NONE; + } + else if (su.ss.ss_family == AF_INET) { + msg_info ("accepted connection from %s port %d", inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); + memcpy (&session->client_addr, &su.s4.sin_addr, sizeof (struct in_addr)); + } + + session->sock = nfd; + session->worker = worker; + session->ctx = ctx; + session->resolver = ctx->resolver; + session->ev_base = ctx->ev_base; + worker->srv->stat->connections_count++; + + /* Resolve client's addr */ + /* Set up async session */ + session->s = new_async_session (session->pool, NULL, NULL, free_smtp_proxy_session, session); + session->state = SMTP_PROXY_STATE_RESOLVE_REVERSE; + if (! make_dns_request (session->resolver, session->s, session->pool, + smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) { + msg_err ("cannot resolve %s", inet_ntoa (session->client_addr)); + g_slice_free1 (sizeof (struct smtp_proxy_session), session); + close (nfd); + return; + } + else { + session->dispatcher = rspamd_create_dispatcher (session->ev_base, nfd, BUFFER_ANY, + smtp_proxy_read_socket, NULL, smtp_proxy_err_socket, + &session->ctx->smtp_timeout, session); + session->dispatcher->peer_addr = session->client_addr.s_addr; + } +} + +gpointer +init_smtp_proxy (void) +{ + struct smtp_proxy_ctx *ctx; + GQuark type; + + type = g_quark_try_string ("smtp_proxy"); + + ctx = g_malloc0 (sizeof (struct smtp_worker_ctx)); + ctx->pool = memory_pool_new (memory_pool_get_size ()); + + /* Set default values */ + ctx->smtp_timeout_raw = 300000; + ctx->smtp_delay = 0; + + register_worker_opt (type, "upstreams", xml_handle_string, ctx, + G_STRUCT_OFFSET (struct smtp_proxy_ctx, upstreams_str)); + register_worker_opt (type, "timeout", xml_handle_seconds, ctx, + G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_timeout_raw)); + register_worker_opt (type, "delay", xml_handle_seconds, ctx, + G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_delay)); + register_worker_opt (type, "jitter", xml_handle_seconds, ctx, + G_STRUCT_OFFSET (struct smtp_proxy_ctx, delay_jitter)); + + return ctx; +} + +/* Make post-init configuration */ +static gboolean +config_smtp_proxy_worker (struct rspamd_worker *worker) +{ + struct smtp_proxy_ctx *ctx = worker->ctx; + gchar *value; + + /* Init timeval */ + msec_to_tv (ctx->smtp_timeout_raw, &ctx->smtp_timeout); + + /* Init upstreams */ + if ((value = ctx->upstreams_str) != NULL) { + if (!parse_upstreams_line (ctx->pool, ctx->upstreams, value, &ctx->upstream_num)) { + return FALSE; + } + } + else { + msg_err ("no upstreams defined, don't know what to do"); + return FALSE; + } + + return TRUE; +} + +/* + * Start worker process + */ +void +start_smtp_proxy (struct rspamd_worker *worker) +{ + struct sigaction signals; + struct smtp_worker_ctx *ctx = worker->ctx; + + gperf_profiler_init (worker->srv->cfg, "worker"); + + worker->srv->pid = getpid (); + ctx->ev_base = event_init (); + + /* Set smtp options */ + if ( !config_smtp_proxy_worker (worker)) { + msg_err ("cannot configure smtp worker, exiting"); + exit (EXIT_SUCCESS); + } + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker); + event_base_set (ctx->ev_base, &worker->sig_ev_usr2); + signal_add (&worker->sig_ev_usr2, NULL); + + /* SIGUSR1 handler */ + signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker); + event_base_set (ctx->ev_base, &worker->sig_ev_usr1); + signal_add (&worker->sig_ev_usr1, NULL); + + /* Accept event */ + event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_base_set (ctx->ev_base, &worker->bind_ev); + event_add (&worker->bind_ev, NULL); + + /* DNS resolver */ + ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); + + /* Set umask */ + umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); + + event_base_loop (ctx->ev_base, 0); + + close_log (rspamd_main->logger); + exit (EXIT_SUCCESS); +} + diff --git a/src/smtp_utils.c b/src/smtp_utils.c index 8ceaef5a6..170efdad5 100644 --- a/src/smtp_utils.c +++ b/src/smtp_utils.c @@ -28,7 +28,7 @@ #include "smtp.h" #include "smtp_proto.h" -gboolean +void free_smtp_session (gpointer arg) { struct smtp_session *session = arg; @@ -56,8 +56,6 @@ free_smtp_session (gpointer arg) memory_pool_delete (session->pool); g_free (session); } - - return TRUE; } gboolean @@ -313,3 +311,59 @@ err: destroy_session (session->s); return FALSE; } + +gboolean +parse_upstreams_line (memory_pool_t *pool, struct smtp_upstream *upstreams, const gchar *line, gsize *count) +{ + gchar **strv, *p, *t, *tt, *err_str; + guint32 num, i; + struct smtp_upstream *cur; + gchar resolved_path[PATH_MAX]; + + strv = g_strsplit_set (line, ",; ", -1); + num = g_strv_length (strv); + + if (num >= MAX_SMTP_UPSTREAMS) { + msg_err ("cannot define %d upstreams %d is max", num, MAX_SMTP_UPSTREAMS); + return FALSE; + } + *count = 0; + + for (i = 0; i < num; i ++) { + p = strv[i]; + cur = &upstreams[*count]; + if ((t = strrchr (p, ':')) != NULL && (tt = strchr (p, ':')) != t) { + /* Assume that after last `:' we have weigth */ + *t = '\0'; + t ++; + errno = 0; + cur->up.priority = strtoul (t, &err_str, 10); + if (errno != 0 || (err_str && *err_str != '\0')) { + msg_err ("cannot convert weight: %s, %s", t, strerror (errno)); + g_strfreev (strv); + return FALSE; + } + } + if (*p == '/') { + cur->is_unix = TRUE; + if (realpath (p, resolved_path) == NULL) { + msg_err ("cannot resolve path: %s", resolved_path); + g_strfreev (strv); + return FALSE; + } + cur->name = memory_pool_strdup (pool, resolved_path); + (*count) ++; + } + else { + if (! parse_host_port (p, &cur->addr, &cur->port)) { + g_strfreev (strv); + return FALSE; + } + cur->name = memory_pool_strdup (pool, p); + (*count) ++; + } + } + + g_strfreev (strv); + return TRUE; +} diff --git a/src/smtp_utils.h b/src/smtp_utils.h index e61a85ae2..b32b8235e 100644 --- a/src/smtp_utils.h +++ b/src/smtp_utils.h @@ -4,13 +4,25 @@ #include "config.h" #include "main.h" #include "smtp.h" -#include "smtp_proto.h" /** * @file smtp_utils.h * Contains utilities for smtp protocol handling */ +struct smtp_upstream { + struct upstream up; + + const gchar *name; + struct in_addr addr; + guint16 port; + gboolean is_unix; +}; + +#define MAX_SMTP_UPSTREAMS 128 + +struct smtp_session; + /** * Send message to upstream * @param session session object @@ -39,4 +51,13 @@ gboolean write_smtp_reply (struct smtp_session *session); */ void free_smtp_session (gpointer arg); +/** + * Parse upstreams line + * @param upstreams pointer to the array of upstreams (must be at least MAX_SMTP_UPSTREAMS size) + * @param line description line + * @param count targeted count + * @return + */ +gboolean parse_upstreams_line (memory_pool_t *pool, struct smtp_upstream *upstreams, const gchar *line, gsize *count); + #endif /* SMTP_UTILS_H_ */ -- 2.39.5