/* Copyright (c) 2010-2012, Vsevolod Stakhov * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "config.h" #include "utlist.h" #include "libserver/proxy.h" #include "main.h" #include "smtp.h" /* * SMTP proxy is a simple smtp proxy worker for dns resolving and * load balancing. It uses XCLIENT command and is designed for MTA * that supports that (postfix and exim). */ /* Upstream timeouts */ #define DEFAULT_UPSTREAM_ERROR_TIME 10 #define DEFAULT_UPSTREAM_DEAD_TIME 300 #define DEFAULT_UPSTREAM_MAXERRORS 10 #define DEFAULT_PROXY_BUF_LEN 100 * 1024 #define SMTP_MAXERRORS 15 /* Init functions */ gpointer init_smtp_proxy (struct rspamd_config *cfg); void start_smtp_proxy (struct rspamd_worker *worker); worker_t smtp_proxy_worker = { "smtp_proxy", /* Name */ init_smtp_proxy, /* Init function */ start_smtp_proxy, /* Start function */ TRUE, /* Has socket */ FALSE, /* Non unique */ FALSE, /* Non threaded */ TRUE, /* Killable */ SOCK_STREAM /* TCP socket */ }; struct smtp_proxy_ctx { struct upstream_list *upstreams; gchar *upstreams_str; rspamd_mempool_t *pool; guint32 smtp_delay; guint32 delay_jitter; guint32 smtp_timeout_raw; struct timeval smtp_timeout; gboolean use_xclient; gboolean instant_reject; gsize proxy_buf_len; struct rspamd_dns_resolver *resolver; struct event_base *ev_base; GList *rbls; }; enum rspamd_smtp_proxy_state { SMTP_PROXY_STATE_RESOLVE_REVERSE = 0, SMTP_PROXY_STATE_RESOLVE_NORMAL, SMTP_PROXY_STATE_RESOLVE_RBL, SMTP_PROXY_STATE_DELAY, SMTP_PROXY_STATE_GREETING, SMTP_PROXY_STATE_XCLIENT, SMTP_PROXY_STATE_PROXY, SMTP_PROXY_STATE_REJECT, SMTP_PROXY_STATE_REJECT_EMULATE }; struct smtp_proxy_session { struct smtp_proxy_ctx *ctx; rspamd_mempool_t *pool; enum rspamd_smtp_proxy_state state; struct rspamd_worker *worker; struct in_addr client_addr; gchar *ptr_str; gchar *hostname; gchar *error; gchar *temp_name; gint sock; gint upstream_sock; struct rspamd_async_session *s; rspamd_io_dispatcher_t *dispatcher; rspamd_proxy_t *proxy; struct upstream *upstream; struct event *delay_timer; struct event upstream_ev; gboolean resolved; struct rspamd_dns_resolver *resolver; struct event_base *ev_base; GString *upstream_greeting; guint rbl_requests; gchar *dnsbl_applied; gchar *from; gchar *rcpt; guint errors; }; static void free_smtp_proxy_session (gpointer arg) { struct smtp_proxy_session *session = arg; static const char fatal_smtp_error[] = "521 5.2.1 Internal error" CRLF; if (session) { if (session->dispatcher) { rspamd_remove_dispatcher (session->dispatcher); } if (session->upstream_greeting) { g_string_free (session->upstream_greeting, TRUE); } if (session->state != SMTP_PROXY_STATE_PROXY && session->state != SMTP_PROXY_STATE_REJECT && session->state != SMTP_PROXY_STATE_REJECT_EMULATE) { /* Send 521 fatal error */ if (write (session->sock, fatal_smtp_error, sizeof (fatal_smtp_error)) == -1) { msg_err ("write error to client failed: %s", strerror (errno)); } } else if ((session->state == SMTP_PROXY_STATE_REJECT || session->state == SMTP_PROXY_STATE_REJECT_EMULATE) && session->from && session->rcpt && session->dnsbl_applied) { msg_info ("reject by %s mail from <%s> to <%s>, ip: %s", session->dnsbl_applied, session->from, session->rcpt, inet_ntoa (session->client_addr)); } close (session->sock); if (session->proxy) { rspamd_proxy_close (session->proxy); } if (session->ptr_str) { free (session->ptr_str); } if (session->upstream_sock != -1) { event_del (&session->upstream_ev); close (session->upstream_sock); } rspamd_mempool_delete (session->pool); g_slice_free1 (sizeof (struct smtp_proxy_session), session); } } static void smtp_proxy_err_proxy (GError * err, void *arg) { struct smtp_proxy_session *session = arg; if (err) { g_error_free (err); msg_info ("abnormally closing connection, error: %s", err->message); } /* Free buffers */ session->state = SMTP_PROXY_STATE_REJECT; destroy_session (session->s); } /** * Check whether SMTP greeting is valid * @param s * @return */ static gint check_valid_smtp_greeting (GString *s) { gchar *p; p = s->str + s->len - 1; if (s->len < 6 || (*p != '\n' || *(p - 1) != '\r')) { return 1; } p -= 5; while (p >= s->str) { /* It is fast to use memcmp here as we compare only 4 bytes */ if (memcmp (p, "220 ", 4) == 0) { /* Check position */ if (p == s->str || *(p - 1) == '\n') { return 1; } return 0; } else if ((*p == '5' || *p == '4' || *p == '3') && g_ascii_isdigit (p[1]) && g_ascii_isdigit (p[2]) && p[3] == ' ') { return -1; } p--; } return 1; } /* * Handle upstream greeting */ static void smtp_proxy_greeting_handler (gint fd, short what, void *arg) { struct smtp_proxy_session *session = arg; gint r; gchar read_buf[BUFSIZ]; if (what == EV_READ) { if (session->state == SMTP_PROXY_STATE_GREETING) { /* Fill greeting buffer with new portion of data */ r = read (fd, read_buf, sizeof (read_buf) - 1); if (r > 0) { g_string_append_len (session->upstream_greeting, read_buf, r); /* Now search line with 220 */ r = check_valid_smtp_greeting (session->upstream_greeting); if (r == 1) { /* Send xclient */ if (session->ctx->use_xclient) { r = rspamd_snprintf (read_buf, sizeof (read_buf), "XCLIENT NAME=%s ADDR=%s" CRLF, session->hostname, inet_ntoa (session->client_addr)); r = write (session->upstream_sock, read_buf, r); if (r < 0 && errno == EAGAIN) { /* Add write event */ event_del (&session->upstream_ev); event_set (&session->upstream_ev, session->upstream_sock, EV_WRITE, smtp_proxy_greeting_handler, session); event_base_set (session->ev_base, &session->upstream_ev); event_add (&session->upstream_ev, NULL); } else if (r > 0) { session->upstream_greeting->len = 0; session->state = SMTP_PROXY_STATE_XCLIENT; } else { msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno)); destroy_session (session->s); } } else { event_del (&session->upstream_ev); /* Start direct proxy */ r = write (session->sock, session->upstream_greeting->str, session->upstream_greeting->len); /* TODO: handle client's error here */ if (r > 0) { session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool, session->ev_base, session->ctx->proxy_buf_len, &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session); session->state = SMTP_PROXY_STATE_PROXY; } else { msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno)); destroy_session (session->s); } } } else if (r == -1) { /* Proxy sent 500 error */ msg_info ("connection with %s got smtp error for greeting", rspamd_upstream_name (session->upstream)); destroy_session (session->s); } } else { msg_info ("connection with %s got read error: %s", rspamd_upstream_name (session->upstream), strerror (errno)); destroy_session (session->s); } } else if (session->state == SMTP_PROXY_STATE_XCLIENT) { /* Fill greeting buffer with new portion of data */ r = read (fd, read_buf, sizeof (read_buf) - 1); if (r > 0) { g_string_append_len (session->upstream_greeting, read_buf, r); /* Now search line with 220 */ r = check_valid_smtp_greeting (session->upstream_greeting); if (r == 1) { event_del (&session->upstream_ev); /* Start direct proxy */ r = write (session->sock, session->upstream_greeting->str, session->upstream_greeting->len); /* TODO: handle client's error here */ if (r > 0) { session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool, session->ev_base, session->ctx->proxy_buf_len, &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session); session->state = SMTP_PROXY_STATE_PROXY; } else { msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno)); destroy_session (session->s); } } else if (r == -1) { /* Proxy sent 500 error */ msg_info ("connection with %s got smtp error for xclient", rspamd_upstream_name (session->upstream)); destroy_session (session->s); } } } else { msg_info ("connection with %s got read event at improper state: %d", rspamd_upstream_name (session->upstream), session->state); destroy_session (session->s); } } else if (what == EV_WRITE) { if (session->state == SMTP_PROXY_STATE_GREETING) { /* Send xclient again */ r = rspamd_snprintf (read_buf, sizeof (read_buf), "XCLIENT NAME=%s ADDR=%s" CRLF, session->hostname, inet_ntoa (session->client_addr)); r = write (session->upstream_sock, read_buf, r); if (r < 0 && errno == EAGAIN) { /* Add write event */ event_del (&session->upstream_ev); event_set (&session->upstream_ev, session->upstream_sock, EV_WRITE, smtp_proxy_greeting_handler, session); event_base_set (session->ev_base, &session->upstream_ev); event_add (&session->upstream_ev, NULL); } else if (r > 0) { session->upstream_greeting->len = 0; session->state = SMTP_PROXY_STATE_XCLIENT; event_del (&session->upstream_ev); event_set (&session->upstream_ev, session->upstream_sock, EV_READ | EV_PERSIST, smtp_proxy_greeting_handler, session); event_base_set (session->ev_base, &session->upstream_ev); event_add (&session->upstream_ev, NULL); } else { msg_info ("connection with %s got write error: %s", rspamd_upstream_name (session->upstream), strerror (errno)); destroy_session (session->s); } } else { msg_info ( "connection with %s got write event at improper state: %d", rspamd_upstream_name (session->upstream), session->state); destroy_session (session->s); } } else { /* Timeout */ msg_info ("connection with %s timed out", rspamd_upstream_name (session->upstream)); destroy_session (session->s); } } static gboolean create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session) { struct upstream *selected; /* Try to select upstream */ selected = rspamd_upstream_get (session->ctx->upstreams, RSPAMD_UPSTREAM_ROUND_ROBIN); if (selected == NULL) { msg_err ("no upstreams suitable found"); return FALSE; } session->upstream = selected; /* Now try to create socket */ session->upstream_sock = rspamd_inet_address_connect ( rspamd_upstream_addr (selected), SOCK_STREAM, TRUE); if (session->upstream_sock == -1) { msg_err ("cannot make a connection to %s", rspamd_upstream_name (session->upstream)); rspamd_upstream_fail (selected); return FALSE; } /* Create a proxy for upstream connection */ rspamd_dispatcher_pause (session->dispatcher); /* First of all get upstream's greeting */ session->state = SMTP_PROXY_STATE_GREETING; event_set (&session->upstream_ev, session->upstream_sock, EV_READ | EV_PERSIST, smtp_proxy_greeting_handler, session); event_base_set (session->ev_base, &session->upstream_ev); event_add (&session->upstream_ev, &session->ctx->smtp_timeout); session->upstream_greeting = g_string_sized_new (BUFSIZ); return TRUE; } static void smtp_dnsbl_cb (struct rdns_reply *reply, void *arg) { struct smtp_proxy_session *session = arg; const gchar *p; gint dots = 0; const struct rdns_request_name *req_name; session->rbl_requests--; req_name = rdns_request_get_name (reply->request, NULL); msg_debug ("got reply for %s: %s", req_name[0].name, rdns_strerror (reply->code)); if (session->state != SMTP_PROXY_STATE_REJECT) { if (reply->code == RDNS_RC_NOERROR) { /* This means that address is in dnsbl */ p = req_name[0].name; while (*p) { if (*p == '.') { dots++; } if (dots == 4) { session->dnsbl_applied = (gchar *)p + 1; break; } p++; } session->state = SMTP_PROXY_STATE_REJECT; } } if (session->rbl_requests == 0) { if (session->state != SMTP_PROXY_STATE_REJECT) { /* Make proxy */ if (!create_smtp_proxy_upstream_connection (session)) { rspamd_dispatcher_restore (session->dispatcher); } } else { if (session->ctx->instant_reject) { msg_info ("reject %s is denied by dnsbl: %s", inet_ntoa (session->client_addr), session->dnsbl_applied); if (!rspamd_dispatcher_write (session->dispatcher, make_smtp_error (session->pool, 521, "%s Client denied by %s", "5.2.1", session->dnsbl_applied), 0, FALSE, TRUE)) { msg_err ("cannot write smtp error"); } } else { /* Emulate fake smtp session */ rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, 0); if (!rspamd_dispatcher_write (session->dispatcher, make_smtp_error (session->pool, 220, "smtp ready"), 0, FALSE, TRUE)) { msg_err ("cannot write smtp reply"); } } rspamd_dispatcher_restore (session->dispatcher); } } } /* * Create requests to all rbls */ static void make_rbl_requests (struct smtp_proxy_session *session) { GList *cur; gchar *p, *dst; guint len; cur = session->ctx->rbls; while (cur) { len = INET_ADDRSTRLEN + strlen (cur->data) + 1; dst = rspamd_mempool_alloc (session->pool, len); /* Print ipv4 addr */ p = (gchar *)&session->client_addr.s_addr; rspamd_snprintf (dst, len, "%ud.%ud.%ud.%ud.%s", (guint)p[3], (guint)p[2], (guint)p[1], (guint)p[0], cur->data); if (make_dns_request (session->resolver, session->s, session->pool, smtp_dnsbl_cb, session, RDNS_REQUEST_A, dst)) { session->rbl_requests++; msg_debug ("send request to %s", dst); } cur = g_list_next (cur); } if (session->rbl_requests == 0) { /* Create proxy */ if (!create_smtp_proxy_upstream_connection (session)) { rspamd_dispatcher_restore (session->dispatcher); } } } /* Resolving and delay handlers */ /* * Return from a delay */ static void smtp_delay_handler (gint fd, short what, void *arg) { struct smtp_proxy_session *session = arg; remove_normal_event (session->s, (event_finalizer_t) event_del, session->delay_timer); if (session->state == SMTP_PROXY_STATE_DELAY) { /* TODO: Create upstream connection here */ if (session->ctx->rbls) { make_rbl_requests (session); } else { if (!create_smtp_proxy_upstream_connection (session)) { rspamd_dispatcher_restore (session->dispatcher); } } } else { /* TODO: Write error here */ session->state = SMTP_PROXY_STATE_REJECT; if (!rspamd_dispatcher_write (session->dispatcher, make_smtp_error (session->pool, 521, "%s Improper use of SMTP command pipelining", "5.2.1"), 0, FALSE, TRUE)) { msg_err ("cannot write smtp error"); } rspamd_dispatcher_restore (session->dispatcher); } } /* * Make delay for a client */ static void smtp_make_delay (struct smtp_proxy_session *session) { struct event *tev; struct timeval *tv; gint32 jitter; if (session->ctx->smtp_delay != 0 && session->state == SMTP_PROXY_STATE_DELAY) { tev = rspamd_mempool_alloc (session->pool, sizeof(struct event)); tv = rspamd_mempool_alloc (session->pool, sizeof(struct timeval)); if (session->ctx->delay_jitter != 0) { jitter = g_random_int_range (0, session->ctx->delay_jitter); msec_to_tv (session->ctx->smtp_delay + jitter, tv); } else { msec_to_tv (session->ctx->smtp_delay, tv); } evtimer_set (tev, smtp_delay_handler, session); evtimer_add (tev, tv); register_async_event (session->s, (event_finalizer_t) event_del, tev, g_quark_from_static_string ("smtp proxy")); session->delay_timer = tev; } else if (session->state == SMTP_PROXY_STATE_DELAY) { /* TODO: Create upstream connection here */ if (session->ctx->rbls) { make_rbl_requests (session); } else { if (!create_smtp_proxy_upstream_connection (session)) { rspamd_dispatcher_restore (session->dispatcher); } } } } /* * Handle DNS replies */ static void smtp_dns_cb (struct rdns_reply *reply, void *arg) { struct smtp_proxy_session *session = arg; gint res = 0; struct rdns_reply_entry *elt; GList *cur; switch (session->state) { case SMTP_PROXY_STATE_RESOLVE_REVERSE: /* Parse reverse reply and start resolve of this ip */ if (reply->code != RDNS_RC_NOERROR) { rspamd_conditional_debug (rspamd_main->logger, NULL, __FUNCTION__, "DNS error: %s", rdns_strerror (reply->code)); if (reply->code == RDNS_RC_NXDOMAIN) { session->hostname = rspamd_mempool_strdup (session->pool, XCLIENT_HOST_UNAVAILABLE); } else { session->hostname = rspamd_mempool_strdup (session->pool, XCLIENT_HOST_TEMPFAIL); } session->state = SMTP_PROXY_STATE_DELAY; smtp_make_delay (session); } else { if (reply->entries) { elt = reply->entries; session->hostname = rspamd_mempool_strdup (session->pool, elt->content.ptr.name); session->state = SMTP_PROXY_STATE_RESOLVE_NORMAL; make_dns_request (session->resolver, session->s, session->pool, smtp_dns_cb, session, RDNS_REQUEST_A, session->hostname); } } break; case SMTP_PROXY_STATE_RESOLVE_NORMAL: if (reply->code != RDNS_RC_NOERROR) { rspamd_conditional_debug (rspamd_main->logger, NULL, __FUNCTION__, "DNS error: %s", rdns_strerror (reply->code)); if (reply->code == RDNS_RC_NXDOMAIN) { session->hostname = rspamd_mempool_strdup (session->pool, XCLIENT_HOST_UNAVAILABLE); } else { session->hostname = rspamd_mempool_strdup (session->pool, XCLIENT_HOST_TEMPFAIL); } session->state = SMTP_PROXY_STATE_DELAY; smtp_make_delay (session); } else { res = 0; LL_FOREACH (reply->entries, elt) { if (memcmp (&session->client_addr, &elt->content.a.addr, sizeof(struct in_addr)) == 0) { res = 1; session->resolved = TRUE; break; } cur = g_list_next (cur); } if (res == 0) { msg_info ( "cannot find address for hostname: %s, ip: %s", session->hostname, inet_ntoa (session->client_addr)); session->hostname = rspamd_mempool_strdup (session->pool, XCLIENT_HOST_UNAVAILABLE); } session->state = SMTP_PROXY_STATE_DELAY; smtp_make_delay (session); } break; default: /* TODO: write something about pipelining */ break; } } static void proxy_parse_smtp_input (f_str_t *line, struct smtp_proxy_session *session) { gchar *p, *c, *end; gsize len; p = line->begin; end = line->begin + line->len; if (line->len >= sizeof("rcpt to: ") - 1 && (*p == 'r' || *p == 'R') && session->rcpt == NULL) { if (g_ascii_strncasecmp (p, "rcpt to: ", sizeof ("rcpt to: ") - 1) == 0) { p += sizeof ("rcpt to: ") - 1; /* Skip spaces */ while ((g_ascii_isspace (*p) || *p == '<') && p < end) { p++; } c = p; while (!(g_ascii_isspace (*p) || *p == '>') && p < end) { p++; } len = p - c; session->rcpt = rspamd_mempool_alloc (session->pool, len + 1); rspamd_strlcpy (session->rcpt, c, len + 1); } } else if (line->len >= sizeof("mail from: ") - 1 && (*p == 'm' || *p == 'M') && session->from == NULL) { if (g_ascii_strncasecmp (p, "mail from: ", sizeof ("mail from: ") - 1) == 0) { p += sizeof ("mail from: ") - 1; /* Skip spaces */ while ((g_ascii_isspace (*p) || *p == '<') && p < end) { p++; } c = p; while (!(g_ascii_isspace (*p) || *p == '>') && p < end) { p++; } len = p - c; session->from = rspamd_mempool_alloc (session->pool, len + 1); rspamd_strlcpy (session->from, c, len + 1); } } else if (line->len >= sizeof ("quit") - 1 && (*p == 'q' || *p == 'Q')) { if (g_ascii_strncasecmp (p, "quit", sizeof ("quit") - 1) == 0) { session->state = SMTP_PROXY_STATE_REJECT; } } } /* * Callback that is called when there is data to read in buffer */ static gboolean smtp_proxy_read_socket (f_str_t * in, void *arg) { struct smtp_proxy_session *session = arg; gchar *p; if (session->state != SMTP_PROXY_STATE_REJECT_EMULATE) { /* This can be called only if client is using invalid pipelining */ session->state = SMTP_PROXY_STATE_REJECT; if (!rspamd_dispatcher_write (session->dispatcher, make_smtp_error (session->pool, 521, "%s Improper use of SMTP command pipelining", "5.2.1"), 0, FALSE, TRUE)) { msg_err ("cannot write smtp error"); } destroy_session (session->s); } else { /* Try to extract data */ p = in->begin; if (in->len >= sizeof ("helo") - 1 && (*p == 'h' || *p == 'H' || *p == 'e' || *p == 'E')) { return rspamd_dispatcher_write (session->dispatcher, "220 smtp ready" CRLF, 0, FALSE, TRUE); } else if (in->len > 0) { proxy_parse_smtp_input (in, session); } if (session->state == SMTP_PROXY_STATE_REJECT) { /* Received QUIT command */ if (!rspamd_dispatcher_write (session->dispatcher, "221 2.0.0 Bye" CRLF, 0, FALSE, TRUE)) { msg_err ("cannot write smtp error"); } destroy_session (session->s); return FALSE; } if (session->rcpt != NULL) { session->errors++; if (session->errors > SMTP_MAXERRORS) { if (!rspamd_dispatcher_write (session->dispatcher, "521 5.2.1 Maximum errors reached" CRLF, 0, FALSE, TRUE)) { msg_err ("cannot write smtp error"); } destroy_session (session->s); return FALSE; } return rspamd_dispatcher_write (session->dispatcher, make_smtp_error (session->pool, 521, "%s Client denied by %s", "5.2.1", session->dnsbl_applied), 0, FALSE, TRUE); } else { return rspamd_dispatcher_write (session->dispatcher, "250 smtp ready" CRLF, 0, FALSE, TRUE); } } return FALSE; } /* * Actually called only if something goes wrong */ static gboolean smtp_proxy_write_socket (void *arg) { struct smtp_proxy_session *session = arg; if (session->ctx->instant_reject) { destroy_session (session->s); return FALSE; } else { session->state = SMTP_PROXY_STATE_REJECT_EMULATE; } return TRUE; } /* * Called if something goes wrong */ static void smtp_proxy_err_socket (GError * err, void *arg) { struct smtp_proxy_session *session = arg; if (err) { if (err->code == ETIMEDOUT) { /* Write smtp error */ if (!rspamd_dispatcher_write (session->dispatcher, "421 4.4.2 Error: timeout exceeded" CRLF, 0, FALSE, TRUE)) { msg_err ("cannot write smtp error"); } } msg_info ("abnormally closing connection, error: %s", err->message); g_error_free (err); } /* Free buffers */ destroy_session (session->s); } /* * Accept new connection and construct session */ static void accept_socket (gint fd, short what, void *arg) { struct rspamd_worker *worker = (struct rspamd_worker *)arg; struct smtp_proxy_session *session; struct smtp_proxy_ctx *ctx; rspamd_inet_addr_t addr; gint nfd; ctx = worker->ctx; if ((nfd = rspamd_accept_from_socket (fd, &addr)) == -1) { msg_warn ("accept failed: %s", strerror (errno)); return; } /* Check for EAGAIN */ if (nfd == 0) { return; } msg_info ("accepted connection from %s port %d", rspamd_inet_address_to_string (&addr), rspamd_inet_address_get_port (&addr)); ctx = worker->ctx; session = g_slice_alloc0 (sizeof (struct smtp_proxy_session)); session->pool = rspamd_mempool_new (rspamd_mempool_suggest_size ()); session->sock = nfd; session->worker = worker; session->ctx = ctx; session->resolver = ctx->resolver; session->ev_base = ctx->ev_base; session->upstream_sock = -1; session->ptr_str = rdns_generate_ptr_from_str (rspamd_inet_address_to_string ( &addr)); worker->srv->stat->connections_count++; /* Resolve client's addr */ /* Set up async session */ session->s = new_async_session (session->pool, NULL, NULL, free_smtp_proxy_session, session); session->state = SMTP_PROXY_STATE_RESOLVE_REVERSE; if (!make_dns_request (session->resolver, session->s, session->pool, smtp_dns_cb, session, RDNS_REQUEST_PTR, session->ptr_str)) { msg_err ("cannot resolve %s", inet_ntoa (session->client_addr)); g_slice_free1 (sizeof (struct smtp_proxy_session), session); close (nfd); return; } else { session->dispatcher = rspamd_create_dispatcher (session->ev_base, nfd, BUFFER_ANY, smtp_proxy_read_socket, smtp_proxy_write_socket, smtp_proxy_err_socket, &session->ctx->smtp_timeout, session); session->dispatcher->peer_addr = session->client_addr.s_addr; } } gpointer init_smtp_proxy (struct rspamd_config *cfg) { struct smtp_proxy_ctx *ctx; GQuark type; type = g_quark_try_string ("smtp_proxy"); ctx = g_malloc0 (sizeof (struct smtp_worker_ctx)); ctx->pool = rspamd_mempool_new (rspamd_mempool_suggest_size ()); /* Set default values */ ctx->smtp_timeout_raw = 300000; ctx->smtp_delay = 0; ctx->instant_reject = TRUE; rspamd_rcl_register_worker_option (cfg, type, "upstreams", rspamd_rcl_parse_struct_string, ctx, G_STRUCT_OFFSET (struct smtp_proxy_ctx, upstreams_str), 0); rspamd_rcl_register_worker_option (cfg, type, "timeout", rspamd_rcl_parse_struct_time, ctx, G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_timeout_raw), RSPAMD_CL_FLAG_TIME_UINT_32); rspamd_rcl_register_worker_option (cfg, type, "delay", rspamd_rcl_parse_struct_time, ctx, G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_delay), RSPAMD_CL_FLAG_TIME_UINT_32); rspamd_rcl_register_worker_option (cfg, type, "jitter", rspamd_rcl_parse_struct_time, ctx, G_STRUCT_OFFSET (struct smtp_proxy_ctx, delay_jitter), RSPAMD_CL_FLAG_TIME_UINT_32); rspamd_rcl_register_worker_option (cfg, type, "xclient", rspamd_rcl_parse_struct_boolean, ctx, G_STRUCT_OFFSET (struct smtp_proxy_ctx, use_xclient), 0); rspamd_rcl_register_worker_option (cfg, type, "instant_reject", rspamd_rcl_parse_struct_boolean, ctx, G_STRUCT_OFFSET (struct smtp_proxy_ctx, instant_reject), 0); rspamd_rcl_register_worker_option (cfg, type, "proxy_buffer", rspamd_rcl_parse_struct_integer, ctx, G_STRUCT_OFFSET (struct smtp_proxy_ctx, proxy_buf_len), RSPAMD_CL_FLAG_INT_32); rspamd_rcl_register_worker_option (cfg, type, "dnsbl", rspamd_rcl_parse_struct_string_list, ctx, G_STRUCT_OFFSET (struct smtp_proxy_ctx, rbls), 0); return ctx; } /* Make post-init configuration */ static gboolean config_smtp_proxy_worker (struct rspamd_worker *worker) { struct smtp_proxy_ctx *ctx = worker->ctx; gchar *value; /* Init timeval */ msec_to_tv (ctx->smtp_timeout_raw, &ctx->smtp_timeout); /* Init upstreams */ if ((value = ctx->upstreams_str) != NULL) { if (!rspamd_upstreams_parse_line (ctx->upstreams, value, 25, NULL)) { msg_err ("cannot parse any valid upstream"); return FALSE; } } else { msg_err ("no upstreams defined, don't know what to do"); return FALSE; } if (ctx->proxy_buf_len == 0) { ctx->proxy_buf_len = DEFAULT_PROXY_BUF_LEN; } return TRUE; } /* * Start worker process */ void start_smtp_proxy (struct rspamd_worker *worker) { struct smtp_proxy_ctx *ctx = worker->ctx; ctx->ev_base = rspamd_prepare_worker (worker, "smtp_proxy", accept_socket); /* Set smtp options */ if ( !config_smtp_proxy_worker (worker)) { msg_err ("cannot configure smtp worker, exiting"); exit (EXIT_SUCCESS); } /* DNS resolver */ ctx->resolver = dns_resolver_init (worker->srv->logger, ctx->ev_base, worker->srv->cfg); rspamd_upstreams_library_init (ctx->resolver->r, ctx->ev_base); rspamd_upstreams_library_config (worker->srv->cfg); /* Set umask */ umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); event_base_loop (ctx->ev_base, 0); close_log (rspamd_main->logger); exit (EXIT_SUCCESS); }