aboutsummaryrefslogtreecommitdiffstats
path: root/src/smtp_proxy.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-03-07 20:47:55 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-03-07 20:47:55 +0400
commit9b8ecfb8923cae677777a68790f23fbcf26bdabf (patch)
tree01e262f8354d2209c595fc911f819ca9a54b37ed /src/smtp_proxy.c
parent944dd2a7eab62d2b23c31e26a33d4ad79c11381b (diff)
downloadrspamd-9b8ecfb8923cae677777a68790f23fbcf26bdabf.tar.gz
rspamd-9b8ecfb8923cae677777a68790f23fbcf26bdabf.zip
* Add initial implementation of proxy object
* Add simple smtp proxy worker
Diffstat (limited to 'src/smtp_proxy.c')
-rw-r--r--src/smtp_proxy.c581
1 files changed, 581 insertions, 0 deletions
diff --git a/src/smtp_proxy.c b/src/smtp_proxy.c
new file mode 100644
index 000000000..fa9da971a
--- /dev/null
+++ b/src/smtp_proxy.c
@@ -0,0 +1,581 @@
+/* Copyright (c) 2010-2012, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "main.h"
+#include "cfg_file.h"
+#include "cfg_xml.h"
+#include "util.h"
+#include "smtp_proto.h"
+#include "map.h"
+#include "message.h"
+#include "settings.h"
+#include "dns.h"
+#include "upstream.h"
+#include "proxy.h"
+
+/*
+ * SMTP proxy is a simple smtp proxy worker for dns resolving and
+ * load balancing. It uses XCLIENT command and is designed for MTA
+ * that supports that (postfix and exim).
+ */
+
+/* Upstream timeouts */
+#define DEFAULT_UPSTREAM_ERROR_TIME 10
+#define DEFAULT_UPSTREAM_DEAD_TIME 300
+#define DEFAULT_UPSTREAM_MAXERRORS 10
+
+static sig_atomic_t wanna_die = 0;
+
+/* Init functions */
+gpointer init_smtp_proxy ();
+void start_smtp_proxy (struct rspamd_worker *worker);
+
+worker_t smtp_proxy_worker = {
+ "smtp_proxy", /* Name */
+ init_smtp_proxy, /* Init function */
+ start_smtp_proxy, /* Start function */
+ TRUE, /* Has socket */
+ FALSE, /* Non unique */
+ FALSE, /* Non threaded */
+ TRUE /* Killable */
+};
+
+struct smtp_proxy_ctx {
+ struct smtp_upstream upstreams[MAX_SMTP_UPSTREAMS];
+ size_t upstream_num;
+ gchar *upstreams_str;
+
+ memory_pool_t *pool;
+ guint32 smtp_delay;
+ guint32 delay_jitter;
+ guint32 smtp_timeout_raw;
+ struct timeval smtp_timeout;
+
+ struct rspamd_dns_resolver *resolver;
+ struct event_base *ev_base;
+};
+
+enum rspamd_smtp_proxy_state {
+ SMTP_PROXY_STATE_RESOLVE_REVERSE = 0,
+ SMTP_PROXY_STATE_RESOLVE_NORMAL,
+ SMTP_PROXY_STATE_DELAY
+};
+
+struct smtp_proxy_session {
+ struct smtp_proxy_ctx *ctx;
+ memory_pool_t *pool;
+
+ enum rspamd_smtp_proxy_state state;
+ struct rspamd_worker *worker;
+ struct in_addr client_addr;
+ gchar *hostname;
+ gchar *error;
+ gchar *temp_name;
+ gint sock;
+ gint upstream_sock;
+
+ struct rspamd_async_session *s;
+ rspamd_io_dispatcher_t *dispatcher;
+
+ rspamd_proxy_t *proxy;
+
+ struct smtp_upstream *upstream;
+
+ struct event *delay_timer;
+
+ gboolean resolved;
+ struct rspamd_dns_resolver *resolver;
+ struct event_base *ev_base;
+};
+
+#ifndef HAVE_SA_SIGINFO
+static void
+sig_handler (gint signo)
+#else
+static void
+sig_handler (gint signo, siginfo_t *info, void *unused)
+#endif
+{
+ struct timeval tv;
+
+ switch (signo) {
+ case SIGINT:
+ case SIGTERM:
+ if (!wanna_die) {
+ wanna_die = 1;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ event_loopexit (&tv);
+
+#ifdef WITH_GPERF_TOOLS
+ ProfilerStop ();
+#endif
+ }
+ break;
+ }
+}
+
+/*
+ * Config reload is designed by sending sigusr to active workers and pending shutdown of them
+ */
+static void
+sigusr2_handler (gint fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ /* Do not accept new connections, preparing to end worker's process */
+ struct timeval tv;
+ if (! wanna_die) {
+ tv.tv_sec = SOFT_SHUTDOWN_TIME;
+ tv.tv_usec = 0;
+ event_del (&worker->sig_ev_usr1);
+ event_del (&worker->sig_ev_usr2);
+ event_del (&worker->bind_ev);
+ msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ event_loopexit (&tv);
+ }
+ return;
+}
+
+/*
+ * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
+ */
+static void
+sigusr1_handler (gint fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+
+ reopen_log (worker->srv->logger);
+
+ return;
+}
+
+static void
+free_smtp_proxy_session (gpointer arg)
+{
+ struct smtp_proxy_session *session = arg;
+
+ if (session) {
+ if (session->dispatcher) {
+ rspamd_remove_dispatcher (session->dispatcher);
+ }
+
+ close (session->sock);
+ memory_pool_delete (session->pool);
+ g_free (session);
+ }
+}
+
+static void
+smtp_proxy_err_proxy (GError * err, void *arg)
+{
+ struct smtp_proxy_session *session = arg;
+
+ msg_info ("abnormally closing connection, error: %s", err->message);
+ /* Free buffers */
+ destroy_session (session->s);
+}
+
+static gboolean
+create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session)
+{
+ struct smtp_upstream *selected;
+ struct sockaddr_un *un;
+
+ /* Try to select upstream */
+ selected = (struct smtp_upstream *)get_upstream_round_robin (session->ctx->upstreams,
+ session->ctx->upstream_num, sizeof (struct smtp_upstream),
+ time (NULL), DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS);
+ if (selected == NULL) {
+ msg_err ("no upstreams suitable found");
+ return FALSE;
+ }
+
+ session->upstream = selected;
+
+ /* Now try to create socket */
+ if (selected->is_unix) {
+ un = alloca (sizeof (struct sockaddr_un));
+ session->upstream_sock = make_unix_socket (selected->name, un, FALSE, TRUE);
+ }
+ else {
+ session->upstream_sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE);
+ }
+ if (session->upstream_sock == -1) {
+ msg_err ("cannot make a connection to %s", selected->name);
+ upstream_fail (&selected->up, time (NULL));
+ return FALSE;
+ }
+ /* Create a proxy for upstream connection */
+ rspamd_dispatcher_pause (session->dispatcher);
+ session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool,
+ session->ev_base, BUFSIZ, &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session);
+
+ return TRUE;
+}
+
+/* Resolving and delay handlers */
+/*
+ * Return from a delay
+ */
+static void
+smtp_delay_handler (gint fd, short what, void *arg)
+{
+ struct smtp_proxy_session *session = arg;
+
+ remove_normal_event (session->s, (event_finalizer_t) event_del,
+ session->delay_timer);
+ if (session->state == SMTP_PROXY_STATE_DELAY) {
+ /* TODO: Create upstream connection here */
+ create_smtp_proxy_upstream_connection (session);
+ }
+ else {
+ /* TODO: Write error here */
+ if (rspamd_dispatcher_write (session->dispatcher,
+ make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0"),
+ 0, FALSE, TRUE)) {
+ destroy_session (session->s);
+ }
+ }
+}
+
+/*
+ * Make delay for a client
+ */
+static void
+smtp_make_delay (struct smtp_proxy_session *session)
+{
+ struct event *tev;
+ struct timeval *tv;
+ gint32 jitter;
+
+ if (session->ctx->smtp_delay != 0 && session->state == SMTP_PROXY_STATE_DELAY) {
+ tev = memory_pool_alloc (session->pool, sizeof(struct event));
+ tv = memory_pool_alloc (session->pool, sizeof(struct timeval));
+ if (session->ctx->delay_jitter != 0) {
+ jitter = g_random_int_range (0, session->ctx->delay_jitter);
+ msec_to_tv (session->ctx->smtp_delay + jitter, tv);
+ }
+ else {
+ msec_to_tv (session->ctx->smtp_delay, tv);
+ }
+
+ evtimer_set (tev, smtp_delay_handler, session);
+ evtimer_add (tev, tv);
+ register_async_event (session->s, (event_finalizer_t) event_del, tev,
+ g_quark_from_static_string ("smtp proxy"));
+ session->delay_timer = tev;
+ }
+ else if (session->state == SMTP_PROXY_STATE_DELAY) {
+ /* TODO: Create upstream connection here */
+ create_smtp_proxy_upstream_connection (session);
+ }
+}
+
+/*
+ * Handle DNS replies
+ */
+static void
+smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg)
+{
+ struct smtp_proxy_session *session = arg;
+ gint res = 0;
+ union rspamd_reply_element *elt;
+ GList *cur;
+
+ switch (session->state)
+ {
+ case SMTP_PROXY_STATE_RESOLVE_REVERSE:
+ /* Parse reverse reply and start resolve of this ip */
+ if (reply->code != DNS_RC_NOERROR) {
+ rspamd_conditional_debug (rspamd_main->logger,
+ session->client_addr.s_addr, __FUNCTION__, "DNS error: %s",
+ dns_strerror (reply->code));
+
+ if (reply->code == DNS_RC_NXDOMAIN) {
+ session->hostname = memory_pool_strdup (session->pool,
+ XCLIENT_HOST_UNAVAILABLE);
+ }
+ else {
+ session->hostname = memory_pool_strdup (session->pool,
+ XCLIENT_HOST_TEMPFAIL);
+ }
+ session->state = SMTP_PROXY_STATE_DELAY;
+ smtp_make_delay (session);
+ }
+ else {
+ if (reply->elements) {
+ elt = reply->elements->data;
+ session->hostname = memory_pool_strdup (session->pool,
+ elt->ptr.name);
+ session->state = SMTP_PROXY_STATE_RESOLVE_NORMAL;
+ make_dns_request (session->resolver, session->s, session->pool,
+ smtp_dns_cb, session, DNS_REQUEST_A, session->hostname);
+
+ }
+ }
+ break;
+ case SMTP_PROXY_STATE_RESOLVE_NORMAL:
+ if (reply->code != DNS_RC_NOERROR) {
+ rspamd_conditional_debug (rspamd_main->logger,
+ session->client_addr.s_addr, __FUNCTION__, "DNS error: %s",
+ dns_strerror (reply->code));
+
+ if (reply->code == DNS_RC_NXDOMAIN) {
+ session->hostname = memory_pool_strdup (session->pool,
+ XCLIENT_HOST_UNAVAILABLE);
+ }
+ else {
+ session->hostname = memory_pool_strdup (session->pool,
+ XCLIENT_HOST_TEMPFAIL);
+ }
+ session->state = SMTP_PROXY_STATE_DELAY;
+ smtp_make_delay (session);
+ }
+ else {
+ res = 0;
+ cur = reply->elements;
+ while (cur) {
+ elt = cur->data;
+ if (memcmp (&session->client_addr, &elt->a.addr[0],
+ sizeof(struct in_addr)) == 0) {
+ res = 1;
+ session->resolved = TRUE;
+ break;
+ }
+ cur = g_list_next (cur);
+ }
+
+ if (res == 0) {
+ msg_info(
+ "cannot find address for hostname: %s, ip: %s", session->hostname, inet_ntoa (session->client_addr));
+ session->hostname = memory_pool_strdup (session->pool,
+ XCLIENT_HOST_UNAVAILABLE);
+ }
+ session->state = SMTP_PROXY_STATE_DELAY;
+ smtp_make_delay (session);
+ }
+ break;
+ default:
+ /* TODO: write something about pipelining */
+ break;
+ }
+}
+
+/*
+ * Callback that is called when there is data to read in buffer
+ */
+static gboolean
+smtp_proxy_read_socket (f_str_t * in, void *arg)
+{
+ struct smtp_proxy_session *session = arg;
+
+ /* This can be called only if client is using invalid pipelining */
+ if (rspamd_dispatcher_write (session->dispatcher,
+ make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0"),
+ 0, FALSE, TRUE)) {
+ destroy_session (session->s);
+ }
+ else {
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/*
+ * Called if something goes wrong
+ */
+static void
+smtp_proxy_err_socket (GError * err, void *arg)
+{
+ struct smtp_proxy_session *session = arg;
+
+ msg_info ("abnormally closing connection, error: %s", err->message);
+ /* Free buffers */
+ destroy_session (session->s);
+}
+
+/*
+ * Accept new connection and construct session
+ */
+static void
+accept_socket (gint fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ union sa_union su;
+ struct smtp_proxy_session *session;
+ struct smtp_proxy_ctx *ctx;
+
+ socklen_t addrlen = sizeof (su.ss);
+ gint nfd;
+
+ if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
+ msg_warn ("accept failed: %s", strerror (errno));
+ return;
+ }
+ /* Check for EAGAIN */
+ if (nfd == 0) {
+ return;
+ }
+
+ ctx = worker->ctx;
+ session = g_slice_alloc0 (sizeof (struct smtp_proxy_session));
+ session->pool = memory_pool_new (memory_pool_get_size ());
+
+ if (su.ss.ss_family == AF_UNIX) {
+ msg_info ("accepted connection from unix socket");
+ session->client_addr.s_addr = INADDR_NONE;
+ }
+ else if (su.ss.ss_family == AF_INET) {
+ msg_info ("accepted connection from %s port %d", inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
+ memcpy (&session->client_addr, &su.s4.sin_addr, sizeof (struct in_addr));
+ }
+
+ session->sock = nfd;
+ session->worker = worker;
+ session->ctx = ctx;
+ session->resolver = ctx->resolver;
+ session->ev_base = ctx->ev_base;
+ worker->srv->stat->connections_count++;
+
+ /* Resolve client's addr */
+ /* Set up async session */
+ session->s = new_async_session (session->pool, NULL, NULL, free_smtp_proxy_session, session);
+ session->state = SMTP_PROXY_STATE_RESOLVE_REVERSE;
+ if (! make_dns_request (session->resolver, session->s, session->pool,
+ smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) {
+ msg_err ("cannot resolve %s", inet_ntoa (session->client_addr));
+ g_slice_free1 (sizeof (struct smtp_proxy_session), session);
+ close (nfd);
+ return;
+ }
+ else {
+ session->dispatcher = rspamd_create_dispatcher (session->ev_base, nfd, BUFFER_ANY,
+ smtp_proxy_read_socket, NULL, smtp_proxy_err_socket,
+ &session->ctx->smtp_timeout, session);
+ session->dispatcher->peer_addr = session->client_addr.s_addr;
+ }
+}
+
+gpointer
+init_smtp_proxy (void)
+{
+ struct smtp_proxy_ctx *ctx;
+ GQuark type;
+
+ type = g_quark_try_string ("smtp_proxy");
+
+ ctx = g_malloc0 (sizeof (struct smtp_worker_ctx));
+ ctx->pool = memory_pool_new (memory_pool_get_size ());
+
+ /* Set default values */
+ ctx->smtp_timeout_raw = 300000;
+ ctx->smtp_delay = 0;
+
+ register_worker_opt (type, "upstreams", xml_handle_string, ctx,
+ G_STRUCT_OFFSET (struct smtp_proxy_ctx, upstreams_str));
+ register_worker_opt (type, "timeout", xml_handle_seconds, ctx,
+ G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_timeout_raw));
+ register_worker_opt (type, "delay", xml_handle_seconds, ctx,
+ G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_delay));
+ register_worker_opt (type, "jitter", xml_handle_seconds, ctx,
+ G_STRUCT_OFFSET (struct smtp_proxy_ctx, delay_jitter));
+
+ return ctx;
+}
+
+/* Make post-init configuration */
+static gboolean
+config_smtp_proxy_worker (struct rspamd_worker *worker)
+{
+ struct smtp_proxy_ctx *ctx = worker->ctx;
+ gchar *value;
+
+ /* Init timeval */
+ msec_to_tv (ctx->smtp_timeout_raw, &ctx->smtp_timeout);
+
+ /* Init upstreams */
+ if ((value = ctx->upstreams_str) != NULL) {
+ if (!parse_upstreams_line (ctx->pool, ctx->upstreams, value, &ctx->upstream_num)) {
+ return FALSE;
+ }
+ }
+ else {
+ msg_err ("no upstreams defined, don't know what to do");
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/*
+ * Start worker process
+ */
+void
+start_smtp_proxy (struct rspamd_worker *worker)
+{
+ struct sigaction signals;
+ struct smtp_worker_ctx *ctx = worker->ctx;
+
+ gperf_profiler_init (worker->srv->cfg, "worker");
+
+ worker->srv->pid = getpid ();
+ ctx->ev_base = event_init ();
+
+ /* Set smtp options */
+ if ( !config_smtp_proxy_worker (worker)) {
+ msg_err ("cannot configure smtp worker, exiting");
+ exit (EXIT_SUCCESS);
+ }
+
+ init_signals (&signals, sig_handler);
+ sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+ /* SIGUSR2 handler */
+ signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
+ signal_add (&worker->sig_ev_usr2, NULL);
+
+ /* SIGUSR1 handler */
+ signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
+ signal_add (&worker->sig_ev_usr1, NULL);
+
+ /* Accept event */
+ event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->bind_ev);
+ event_add (&worker->bind_ev, NULL);
+
+ /* DNS resolver */
+ ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
+
+ /* Set umask */
+ umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
+
+ event_base_loop (ctx->ev_base, 0);
+
+ close_log (rspamd_main->logger);
+ exit (EXIT_SUCCESS);
+}
+