diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-03-07 20:47:55 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-03-07 20:47:55 +0400 |
commit | 9b8ecfb8923cae677777a68790f23fbcf26bdabf (patch) | |
tree | 01e262f8354d2209c595fc911f819ca9a54b37ed /src/smtp_proxy.c | |
parent | 944dd2a7eab62d2b23c31e26a33d4ad79c11381b (diff) | |
download | rspamd-9b8ecfb8923cae677777a68790f23fbcf26bdabf.tar.gz rspamd-9b8ecfb8923cae677777a68790f23fbcf26bdabf.zip |
* Add initial implementation of proxy object
* Add simple smtp proxy worker
Diffstat (limited to 'src/smtp_proxy.c')
-rw-r--r-- | src/smtp_proxy.c | 581 |
1 files changed, 581 insertions, 0 deletions
diff --git a/src/smtp_proxy.c b/src/smtp_proxy.c new file mode 100644 index 000000000..fa9da971a --- /dev/null +++ b/src/smtp_proxy.c @@ -0,0 +1,581 @@ +/* Copyright (c) 2010-2012, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "main.h" +#include "cfg_file.h" +#include "cfg_xml.h" +#include "util.h" +#include "smtp_proto.h" +#include "map.h" +#include "message.h" +#include "settings.h" +#include "dns.h" +#include "upstream.h" +#include "proxy.h" + +/* + * SMTP proxy is a simple smtp proxy worker for dns resolving and + * load balancing. It uses XCLIENT command and is designed for MTA + * that supports that (postfix and exim). + */ + +/* Upstream timeouts */ +#define DEFAULT_UPSTREAM_ERROR_TIME 10 +#define DEFAULT_UPSTREAM_DEAD_TIME 300 +#define DEFAULT_UPSTREAM_MAXERRORS 10 + +static sig_atomic_t wanna_die = 0; + +/* Init functions */ +gpointer init_smtp_proxy (); +void start_smtp_proxy (struct rspamd_worker *worker); + +worker_t smtp_proxy_worker = { + "smtp_proxy", /* Name */ + init_smtp_proxy, /* Init function */ + start_smtp_proxy, /* Start function */ + TRUE, /* Has socket */ + FALSE, /* Non unique */ + FALSE, /* Non threaded */ + TRUE /* Killable */ +}; + +struct smtp_proxy_ctx { + struct smtp_upstream upstreams[MAX_SMTP_UPSTREAMS]; + size_t upstream_num; + gchar *upstreams_str; + + memory_pool_t *pool; + guint32 smtp_delay; + guint32 delay_jitter; + guint32 smtp_timeout_raw; + struct timeval smtp_timeout; + + struct rspamd_dns_resolver *resolver; + struct event_base *ev_base; +}; + +enum rspamd_smtp_proxy_state { + SMTP_PROXY_STATE_RESOLVE_REVERSE = 0, + SMTP_PROXY_STATE_RESOLVE_NORMAL, + SMTP_PROXY_STATE_DELAY +}; + +struct smtp_proxy_session { + struct smtp_proxy_ctx *ctx; + memory_pool_t *pool; + + enum rspamd_smtp_proxy_state state; + struct rspamd_worker *worker; + struct in_addr client_addr; + gchar *hostname; + gchar *error; + gchar *temp_name; + gint sock; + gint upstream_sock; + + struct rspamd_async_session *s; + rspamd_io_dispatcher_t *dispatcher; + + rspamd_proxy_t *proxy; + + struct smtp_upstream *upstream; + + struct event *delay_timer; + + gboolean resolved; + struct rspamd_dns_resolver *resolver; + struct event_base *ev_base; +}; + +#ifndef HAVE_SA_SIGINFO +static void +sig_handler (gint signo) +#else +static void +sig_handler (gint signo, siginfo_t *info, void *unused) +#endif +{ + struct timeval tv; + + switch (signo) { + case SIGINT: + case SIGTERM: + if (!wanna_die) { + wanna_die = 1; + tv.tv_sec = 0; + tv.tv_usec = 0; + event_loopexit (&tv); + +#ifdef WITH_GPERF_TOOLS + ProfilerStop (); +#endif + } + break; + } +} + +/* + * Config reload is designed by sending sigusr to active workers and pending shutdown of them + */ +static void +sigusr2_handler (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + if (! wanna_die) { + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev_usr1); + event_del (&worker->sig_ev_usr2); + event_del (&worker->bind_ev); + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + } + return; +} + +/* + * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them + */ +static void +sigusr1_handler (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *) arg; + + reopen_log (worker->srv->logger); + + return; +} + +static void +free_smtp_proxy_session (gpointer arg) +{ + struct smtp_proxy_session *session = arg; + + if (session) { + if (session->dispatcher) { + rspamd_remove_dispatcher (session->dispatcher); + } + + close (session->sock); + memory_pool_delete (session->pool); + g_free (session); + } +} + +static void +smtp_proxy_err_proxy (GError * err, void *arg) +{ + struct smtp_proxy_session *session = arg; + + msg_info ("abnormally closing connection, error: %s", err->message); + /* Free buffers */ + destroy_session (session->s); +} + +static gboolean +create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session) +{ + struct smtp_upstream *selected; + struct sockaddr_un *un; + + /* Try to select upstream */ + selected = (struct smtp_upstream *)get_upstream_round_robin (session->ctx->upstreams, + session->ctx->upstream_num, sizeof (struct smtp_upstream), + time (NULL), DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS); + if (selected == NULL) { + msg_err ("no upstreams suitable found"); + return FALSE; + } + + session->upstream = selected; + + /* Now try to create socket */ + if (selected->is_unix) { + un = alloca (sizeof (struct sockaddr_un)); + session->upstream_sock = make_unix_socket (selected->name, un, FALSE, TRUE); + } + else { + session->upstream_sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE); + } + if (session->upstream_sock == -1) { + msg_err ("cannot make a connection to %s", selected->name); + upstream_fail (&selected->up, time (NULL)); + return FALSE; + } + /* Create a proxy for upstream connection */ + rspamd_dispatcher_pause (session->dispatcher); + session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool, + session->ev_base, BUFSIZ, &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session); + + return TRUE; +} + +/* Resolving and delay handlers */ +/* + * Return from a delay + */ +static void +smtp_delay_handler (gint fd, short what, void *arg) +{ + struct smtp_proxy_session *session = arg; + + remove_normal_event (session->s, (event_finalizer_t) event_del, + session->delay_timer); + if (session->state == SMTP_PROXY_STATE_DELAY) { + /* TODO: Create upstream connection here */ + create_smtp_proxy_upstream_connection (session); + } + else { + /* TODO: Write error here */ + if (rspamd_dispatcher_write (session->dispatcher, + make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0"), + 0, FALSE, TRUE)) { + destroy_session (session->s); + } + } +} + +/* + * Make delay for a client + */ +static void +smtp_make_delay (struct smtp_proxy_session *session) +{ + struct event *tev; + struct timeval *tv; + gint32 jitter; + + if (session->ctx->smtp_delay != 0 && session->state == SMTP_PROXY_STATE_DELAY) { + tev = memory_pool_alloc (session->pool, sizeof(struct event)); + tv = memory_pool_alloc (session->pool, sizeof(struct timeval)); + if (session->ctx->delay_jitter != 0) { + jitter = g_random_int_range (0, session->ctx->delay_jitter); + msec_to_tv (session->ctx->smtp_delay + jitter, tv); + } + else { + msec_to_tv (session->ctx->smtp_delay, tv); + } + + evtimer_set (tev, smtp_delay_handler, session); + evtimer_add (tev, tv); + register_async_event (session->s, (event_finalizer_t) event_del, tev, + g_quark_from_static_string ("smtp proxy")); + session->delay_timer = tev; + } + else if (session->state == SMTP_PROXY_STATE_DELAY) { + /* TODO: Create upstream connection here */ + create_smtp_proxy_upstream_connection (session); + } +} + +/* + * Handle DNS replies + */ +static void +smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg) +{ + struct smtp_proxy_session *session = arg; + gint res = 0; + union rspamd_reply_element *elt; + GList *cur; + + switch (session->state) + { + case SMTP_PROXY_STATE_RESOLVE_REVERSE: + /* Parse reverse reply and start resolve of this ip */ + if (reply->code != DNS_RC_NOERROR) { + rspamd_conditional_debug (rspamd_main->logger, + session->client_addr.s_addr, __FUNCTION__, "DNS error: %s", + dns_strerror (reply->code)); + + if (reply->code == DNS_RC_NXDOMAIN) { + session->hostname = memory_pool_strdup (session->pool, + XCLIENT_HOST_UNAVAILABLE); + } + else { + session->hostname = memory_pool_strdup (session->pool, + XCLIENT_HOST_TEMPFAIL); + } + session->state = SMTP_PROXY_STATE_DELAY; + smtp_make_delay (session); + } + else { + if (reply->elements) { + elt = reply->elements->data; + session->hostname = memory_pool_strdup (session->pool, + elt->ptr.name); + session->state = SMTP_PROXY_STATE_RESOLVE_NORMAL; + make_dns_request (session->resolver, session->s, session->pool, + smtp_dns_cb, session, DNS_REQUEST_A, session->hostname); + + } + } + break; + case SMTP_PROXY_STATE_RESOLVE_NORMAL: + if (reply->code != DNS_RC_NOERROR) { + rspamd_conditional_debug (rspamd_main->logger, + session->client_addr.s_addr, __FUNCTION__, "DNS error: %s", + dns_strerror (reply->code)); + + if (reply->code == DNS_RC_NXDOMAIN) { + session->hostname = memory_pool_strdup (session->pool, + XCLIENT_HOST_UNAVAILABLE); + } + else { + session->hostname = memory_pool_strdup (session->pool, + XCLIENT_HOST_TEMPFAIL); + } + session->state = SMTP_PROXY_STATE_DELAY; + smtp_make_delay (session); + } + else { + res = 0; + cur = reply->elements; + while (cur) { + elt = cur->data; + if (memcmp (&session->client_addr, &elt->a.addr[0], + sizeof(struct in_addr)) == 0) { + res = 1; + session->resolved = TRUE; + break; + } + cur = g_list_next (cur); + } + + if (res == 0) { + msg_info( + "cannot find address for hostname: %s, ip: %s", session->hostname, inet_ntoa (session->client_addr)); + session->hostname = memory_pool_strdup (session->pool, + XCLIENT_HOST_UNAVAILABLE); + } + session->state = SMTP_PROXY_STATE_DELAY; + smtp_make_delay (session); + } + break; + default: + /* TODO: write something about pipelining */ + break; + } +} + +/* + * Callback that is called when there is data to read in buffer + */ +static gboolean +smtp_proxy_read_socket (f_str_t * in, void *arg) +{ + struct smtp_proxy_session *session = arg; + + /* This can be called only if client is using invalid pipelining */ + if (rspamd_dispatcher_write (session->dispatcher, + make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0"), + 0, FALSE, TRUE)) { + destroy_session (session->s); + } + else { + return FALSE; + } + + return TRUE; +} + +/* + * Called if something goes wrong + */ +static void +smtp_proxy_err_socket (GError * err, void *arg) +{ + struct smtp_proxy_session *session = arg; + + msg_info ("abnormally closing connection, error: %s", err->message); + /* Free buffers */ + destroy_session (session->s); +} + +/* + * Accept new connection and construct session + */ +static void +accept_socket (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *)arg; + union sa_union su; + struct smtp_proxy_session *session; + struct smtp_proxy_ctx *ctx; + + socklen_t addrlen = sizeof (su.ss); + gint nfd; + + if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { + msg_warn ("accept failed: %s", strerror (errno)); + return; + } + /* Check for EAGAIN */ + if (nfd == 0) { + return; + } + + ctx = worker->ctx; + session = g_slice_alloc0 (sizeof (struct smtp_proxy_session)); + session->pool = memory_pool_new (memory_pool_get_size ()); + + if (su.ss.ss_family == AF_UNIX) { + msg_info ("accepted connection from unix socket"); + session->client_addr.s_addr = INADDR_NONE; + } + else if (su.ss.ss_family == AF_INET) { + msg_info ("accepted connection from %s port %d", inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); + memcpy (&session->client_addr, &su.s4.sin_addr, sizeof (struct in_addr)); + } + + session->sock = nfd; + session->worker = worker; + session->ctx = ctx; + session->resolver = ctx->resolver; + session->ev_base = ctx->ev_base; + worker->srv->stat->connections_count++; + + /* Resolve client's addr */ + /* Set up async session */ + session->s = new_async_session (session->pool, NULL, NULL, free_smtp_proxy_session, session); + session->state = SMTP_PROXY_STATE_RESOLVE_REVERSE; + if (! make_dns_request (session->resolver, session->s, session->pool, + smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) { + msg_err ("cannot resolve %s", inet_ntoa (session->client_addr)); + g_slice_free1 (sizeof (struct smtp_proxy_session), session); + close (nfd); + return; + } + else { + session->dispatcher = rspamd_create_dispatcher (session->ev_base, nfd, BUFFER_ANY, + smtp_proxy_read_socket, NULL, smtp_proxy_err_socket, + &session->ctx->smtp_timeout, session); + session->dispatcher->peer_addr = session->client_addr.s_addr; + } +} + +gpointer +init_smtp_proxy (void) +{ + struct smtp_proxy_ctx *ctx; + GQuark type; + + type = g_quark_try_string ("smtp_proxy"); + + ctx = g_malloc0 (sizeof (struct smtp_worker_ctx)); + ctx->pool = memory_pool_new (memory_pool_get_size ()); + + /* Set default values */ + ctx->smtp_timeout_raw = 300000; + ctx->smtp_delay = 0; + + register_worker_opt (type, "upstreams", xml_handle_string, ctx, + G_STRUCT_OFFSET (struct smtp_proxy_ctx, upstreams_str)); + register_worker_opt (type, "timeout", xml_handle_seconds, ctx, + G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_timeout_raw)); + register_worker_opt (type, "delay", xml_handle_seconds, ctx, + G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_delay)); + register_worker_opt (type, "jitter", xml_handle_seconds, ctx, + G_STRUCT_OFFSET (struct smtp_proxy_ctx, delay_jitter)); + + return ctx; +} + +/* Make post-init configuration */ +static gboolean +config_smtp_proxy_worker (struct rspamd_worker *worker) +{ + struct smtp_proxy_ctx *ctx = worker->ctx; + gchar *value; + + /* Init timeval */ + msec_to_tv (ctx->smtp_timeout_raw, &ctx->smtp_timeout); + + /* Init upstreams */ + if ((value = ctx->upstreams_str) != NULL) { + if (!parse_upstreams_line (ctx->pool, ctx->upstreams, value, &ctx->upstream_num)) { + return FALSE; + } + } + else { + msg_err ("no upstreams defined, don't know what to do"); + return FALSE; + } + + return TRUE; +} + +/* + * Start worker process + */ +void +start_smtp_proxy (struct rspamd_worker *worker) +{ + struct sigaction signals; + struct smtp_worker_ctx *ctx = worker->ctx; + + gperf_profiler_init (worker->srv->cfg, "worker"); + + worker->srv->pid = getpid (); + ctx->ev_base = event_init (); + + /* Set smtp options */ + if ( !config_smtp_proxy_worker (worker)) { + msg_err ("cannot configure smtp worker, exiting"); + exit (EXIT_SUCCESS); + } + + init_signals (&signals, sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker); + event_base_set (ctx->ev_base, &worker->sig_ev_usr2); + signal_add (&worker->sig_ev_usr2, NULL); + + /* SIGUSR1 handler */ + signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker); + event_base_set (ctx->ev_base, &worker->sig_ev_usr1); + signal_add (&worker->sig_ev_usr1, NULL); + + /* Accept event */ + event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); + event_base_set (ctx->ev_base, &worker->bind_ev); + event_add (&worker->bind_ev, NULL); + + /* DNS resolver */ + ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); + + /* Set umask */ + umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); + + event_base_loop (ctx->ev_base, 0); + + close_log (rspamd_main->logger); + exit (EXIT_SUCCESS); +} + |