From fe5e1614f36236654623667df4f317ae5e5e1806 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 23 Apr 2012 21:40:01 +0400 Subject: [PATCH] * Add support of XCLIENT to the smtp proxy. --- src/proxy.c | 17 ++-- src/proxy.h | 3 + src/smtp_proxy.c | 240 ++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 239 insertions(+), 21 deletions(-) diff --git a/src/proxy.c b/src/proxy.c index bd8503c71..b6f1917ae 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -34,14 +34,17 @@ proxy_error_quark (void) return g_quark_from_static_string ("proxy-error"); } -static void +void rspamd_proxy_close (rspamd_proxy_t *proxy) { - close (proxy->cfd); - close (proxy->bfd); + if (!proxy->closed) { + close (proxy->cfd); + close (proxy->bfd); - event_del (&proxy->client_ev); - event_del (&proxy->backend_ev); + event_del (&proxy->client_ev); + event_del (&proxy->backend_ev); + proxy->closed = TRUE; + } } static void @@ -215,8 +218,8 @@ rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool, struct event_base new = memory_pool_alloc0 (pool, sizeof (rspamd_proxy_t)); - new->cfd = cfd; - new->bfd = bfd; + new->cfd = dup (cfd); + new->bfd = dup (bfd); new->pool = pool; new->base = base; new->bufsize = bufsize; diff --git a/src/proxy.h b/src/proxy.h index 6ebb08d56..970957e56 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -48,6 +48,7 @@ typedef struct rspamd_proxy_s { gint buf_offset; /**< offset to write */ gpointer user_data; /**< user's data for callbacks */ struct timeval *tv; /**< timeout for communications */ + gboolean closed; /**< whether descriptors are closed */ } rspamd_proxy_t; /** @@ -63,4 +64,6 @@ rspamd_proxy_t* rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool, struct event_base *base, gsize bufsize, struct timeval *tv, dispatcher_err_callback_t err_cb, gpointer ud); +void rspamd_proxy_close (rspamd_proxy_t *proxy); + #endif /* PROXY_H_ */ diff --git a/src/smtp_proxy.c b/src/smtp_proxy.c index 1ada5ff1e..97e56a956 100644 --- a/src/smtp_proxy.c +++ b/src/smtp_proxy.c @@ -85,7 +85,10 @@ struct smtp_proxy_ctx { enum rspamd_smtp_proxy_state { SMTP_PROXY_STATE_RESOLVE_REVERSE = 0, SMTP_PROXY_STATE_RESOLVE_NORMAL, - SMTP_PROXY_STATE_DELAY + SMTP_PROXY_STATE_DELAY, + SMTP_PROXY_STATE_GREETING, + SMTP_PROXY_STATE_XCLIENT, + SMTP_PROXY_STATE_PROXY }; struct smtp_proxy_session { @@ -109,10 +112,13 @@ struct smtp_proxy_session { struct smtp_upstream *upstream; struct event *delay_timer; + struct event upstream_ev; gboolean resolved; struct rspamd_dns_resolver *resolver; struct event_base *ev_base; + + GString *upstream_greeting; }; #ifndef HAVE_SA_SIGINFO @@ -180,13 +186,31 @@ static void free_smtp_proxy_session (gpointer arg) { struct smtp_proxy_session *session = arg; + static const char fatal_smtp_error[] = "521 5.2.1 Internal error" CRLF; if (session) { if (session->dispatcher) { rspamd_remove_dispatcher (session->dispatcher); } + if (session->upstream_greeting) { + g_string_free (session->upstream_greeting, TRUE); + } + + if (session->state != SMTP_PROXY_STATE_PROXY) { + /* Send 521 fatal error */ + write (session->sock, fatal_smtp_error, sizeof (fatal_smtp_error)); + } + close (session->sock); + + if (session->proxy) { + rspamd_proxy_close (session->proxy); + } + if (session->upstream_sock != -1) { + event_del (&session->upstream_ev); + close (session->upstream_sock); + } memory_pool_delete (session->pool); g_slice_free1 (sizeof (struct smtp_proxy_session), session); } @@ -205,11 +229,193 @@ smtp_proxy_err_proxy (GError * err, void *arg) destroy_session (session->s); } +/** + * Check whether SMTP greeting is valid + * @param s + * @return + */ +static gint +check_valid_smtp_greeting (GString *s) +{ + gchar *p; + + p = s->str + s->len - 1; + if (s->len < 6 || (*p != '\n' || *(p - 1) != '\r')) { + return 1; + } + p -= 5; + + while (p >= s->str) { + /* It is fast to use memcmp here as we compare only 4 bytes */ + if (memcmp (p, "220 ", 4) == 0) { + /* Check position */ + if (p == s->str || *(p - 1) == '\n') { + return 1; + } + return 0; + } + else if ((*p == '5' || *p == '4' || *p == '3') && + g_ascii_isdigit (p[1]) && g_ascii_isdigit (p[2]) && p[3] == ' ') { + return -1; + } + p --; + } + + return 1; +} + +/* + * Handle upstream greeting + */ + +static void +smtp_proxy_greeting_handler (gint fd, short what, void *arg) +{ + struct smtp_proxy_session *session = arg; + gint r; + gchar read_buf[BUFSIZ]; + + if (what == EV_READ) { + if (session->state == SMTP_PROXY_STATE_GREETING) { + /* Fill greeting buffer with new portion of data */ + r = read (fd, read_buf, sizeof (read_buf) - 1); + if (r > 0) { + g_string_append_len (session->upstream_greeting, read_buf, r); + /* Now search line with 220 */ + r = check_valid_smtp_greeting (session->upstream_greeting); + if (r == 1) { + /* Send xclient */ + if (session->ctx->use_xclient) { + r = rspamd_snprintf (read_buf, sizeof (read_buf), "XCLIENT NAME=%s ADDR=%s" CRLF, + session->hostname, inet_ntoa (session->client_addr)); + r = write (session->upstream_sock, read_buf, r); + + if (r < 0 && errno == EAGAIN) { + /* Add write event */ + event_del (&session->upstream_ev); + event_set (&session->upstream_ev, session->upstream_sock, + EV_WRITE, smtp_proxy_greeting_handler, session); + event_base_set (session->ev_base, &session->upstream_ev); + event_add (&session->upstream_ev, NULL); + } + else if (r > 0) { + session->upstream_greeting->len = 0; + session->state = SMTP_PROXY_STATE_XCLIENT; + } + else { + msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno)); + destroy_session (session->s); + } + } + else { + event_del (&session->upstream_ev); + /* Start direct proxy */ + r = write (session->sock, session->upstream_greeting->str, session->upstream_greeting->len); + /* TODO: handle client's error here */ + if (r > 0) { + session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool, + session->ev_base, session->ctx->proxy_buf_len, + &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session); + session->state = SMTP_PROXY_STATE_PROXY; + } + else { + msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno)); + destroy_session (session->s); + } + } + } + else if (r == -1) { + /* Proxy sent 500 error */ + msg_info ("connection with %s got smtp error for greeting", session->upstream->name); + destroy_session (session->s); + } + } + else { + msg_info ("connection with %s got read error: %s", session->upstream->name, strerror (errno)); + destroy_session (session->s); + } + } + else if (session->state == SMTP_PROXY_STATE_XCLIENT) { + /* Fill greeting buffer with new portion of data */ + r = read (fd, read_buf, sizeof (read_buf) - 1); + if (r > 0) { + g_string_append_len (session->upstream_greeting, read_buf, r); + /* Now search line with 220 */ + r = check_valid_smtp_greeting (session->upstream_greeting); + if (r == 1) { + event_del (&session->upstream_ev); + /* Start direct proxy */ + r = write (session->sock, session->upstream_greeting->str, session->upstream_greeting->len); + /* TODO: handle client's error here */ + if (r > 0) { + session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool, + session->ev_base, session->ctx->proxy_buf_len, + &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session); + session->state = SMTP_PROXY_STATE_PROXY; + } + else { + msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno)); + destroy_session (session->s); + } + } + else if (r == -1) { + /* Proxy sent 500 error */ + msg_info ("connection with %s got smtp error for xclient", session->upstream->name); + destroy_session (session->s); + } + } + } + else { + msg_info ("connection with %s got read event at improper state: %d", session->upstream->name, session->state); + destroy_session (session->s); + } + } + else if (what == EV_WRITE) { + if (session->state == SMTP_PROXY_STATE_GREETING) { + /* Send xclient again */ + r = rspamd_snprintf (read_buf, sizeof (read_buf), "XCLIENT NAME=%s ADDR=%s" CRLF, + session->hostname, inet_ntoa (session->client_addr)); + r = write (session->upstream_sock, read_buf, r); + + if (r < 0 && errno == EAGAIN) { + /* Add write event */ + event_del (&session->upstream_ev); + event_set (&session->upstream_ev, session->upstream_sock, + EV_WRITE, smtp_proxy_greeting_handler, session); + event_base_set (session->ev_base, &session->upstream_ev); + event_add (&session->upstream_ev, NULL); + } + else if (r > 0) { + session->upstream_greeting->len = 0; + session->state = SMTP_PROXY_STATE_XCLIENT; + event_del (&session->upstream_ev); + event_set (&session->upstream_ev, session->upstream_sock, + EV_READ | EV_PERSIST, smtp_proxy_greeting_handler, session); + event_base_set (session->ev_base, &session->upstream_ev); + event_add (&session->upstream_ev, NULL); + } + else { + msg_info ("connection with %s got write error: %s", session->upstream->name, strerror (errno)); + destroy_session (session->s); + } + } + else { + msg_info ("connection with %s got write event at improper state: %d", session->upstream->name, session->state); + destroy_session (session->s); + } + } + else { + /* Timeout */ + msg_info ("connection with %s timed out", session->upstream->name); + destroy_session (session->s); + } +} + static gboolean create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session) { - struct smtp_upstream *selected; - struct sockaddr_un *un; + struct smtp_upstream *selected; + struct sockaddr_un *un; /* Try to select upstream */ selected = (struct smtp_upstream *)get_upstream_round_robin (session->ctx->upstreams, @@ -237,9 +443,14 @@ create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session) } /* Create a proxy for upstream connection */ rspamd_dispatcher_pause (session->dispatcher); - session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool, - session->ev_base, session->ctx->proxy_buf_len, - &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session); + /* First of all get upstream's greeting */ + session->state = SMTP_PROXY_STATE_GREETING; + + event_set (&session->upstream_ev, session->upstream_sock, EV_READ | EV_PERSIST, smtp_proxy_greeting_handler, session); + event_base_set (session->ev_base, &session->upstream_ev); + event_add (&session->upstream_ev, &session->ctx->smtp_timeout); + + session->upstream_greeting = g_string_sized_new (BUFSIZ); return TRUE; } @@ -251,7 +462,7 @@ create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session) static void smtp_delay_handler (gint fd, short what, void *arg) { - struct smtp_proxy_session *session = arg; + struct smtp_proxy_session *session = arg; remove_normal_event (session->s, (event_finalizer_t) event_del, session->delay_timer); @@ -275,9 +486,9 @@ smtp_delay_handler (gint fd, short what, void *arg) static void smtp_make_delay (struct smtp_proxy_session *session) { - struct event *tev; - struct timeval *tv; - gint32 jitter; + struct event *tev; + struct timeval *tv; + gint32 jitter; if (session->ctx->smtp_delay != 0 && session->state == SMTP_PROXY_STATE_DELAY) { tev = memory_pool_alloc (session->pool, sizeof(struct event)); @@ -308,10 +519,10 @@ smtp_make_delay (struct smtp_proxy_session *session) static void smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg) { - struct smtp_proxy_session *session = arg; - gint res = 0; - union rspamd_reply_element *elt; - GList *cur; + struct smtp_proxy_session *session = arg; + gint res = 0; + union rspamd_reply_element *elt; + GList *cur; switch (session->state) { @@ -470,6 +681,7 @@ accept_socket (gint fd, short what, void *arg) session->ctx = ctx; session->resolver = ctx->resolver; session->ev_base = ctx->ev_base; + session->upstream_sock = -1; worker->srv->stat->connections_count++; /* Resolve client's addr */