]> source.dussan.org Git - rspamd.git/commitdiff
* Add support of XCLIENT to the smtp proxy.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 23 Apr 2012 17:40:01 +0000 (21:40 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 23 Apr 2012 17:40:01 +0000 (21:40 +0400)
src/proxy.c
src/proxy.h
src/smtp_proxy.c

index bd8503c714ca7700ae521ce93ed37410d46d1517..b6f1917aee98cc8e40dc80889c1d49b80df42a0f 100644 (file)
@@ -34,14 +34,17 @@ proxy_error_quark (void)
        return g_quark_from_static_string ("proxy-error");
 }
 
-static void
+void
 rspamd_proxy_close (rspamd_proxy_t *proxy)
 {
-       close (proxy->cfd);
-       close (proxy->bfd);
+       if (!proxy->closed) {
+               close (proxy->cfd);
+               close (proxy->bfd);
 
-       event_del (&proxy->client_ev);
-       event_del (&proxy->backend_ev);
+               event_del (&proxy->client_ev);
+               event_del (&proxy->backend_ev);
+               proxy->closed = TRUE;
+       }
 }
 
 static void
@@ -215,8 +218,8 @@ rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool, struct event_base
 
        new = memory_pool_alloc0 (pool, sizeof (rspamd_proxy_t));
 
-       new->cfd = cfd;
-       new->bfd = bfd;
+       new->cfd = dup (cfd);
+       new->bfd = dup (bfd);
        new->pool = pool;
        new->base = base;
        new->bufsize = bufsize;
index 6ebb08d565ddf1a11a303503ba053eefcbfe4454..970957e568ad676c96f9accb8d02c0a0d8020778 100644 (file)
@@ -48,6 +48,7 @@ typedef struct rspamd_proxy_s {
        gint buf_offset;                                                /**< offset to write */
        gpointer user_data;                                             /**< user's data for callbacks */
        struct timeval *tv;                                             /**< timeout for communications */
+       gboolean closed;                                                /**< whether descriptors are closed */
 } rspamd_proxy_t;
 
 /**
@@ -63,4 +64,6 @@ rspamd_proxy_t* rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool,
                struct event_base *base, gsize bufsize, struct timeval *tv,
                dispatcher_err_callback_t err_cb, gpointer ud);
 
+void rspamd_proxy_close (rspamd_proxy_t *proxy);
+
 #endif /* PROXY_H_ */
index 1ada5ff1e9416a3e79b70995979af04c3642fd5d..97e56a95676d174da7fbe08977eee9201cccc233 100644 (file)
@@ -85,7 +85,10 @@ struct smtp_proxy_ctx {
 enum rspamd_smtp_proxy_state {
        SMTP_PROXY_STATE_RESOLVE_REVERSE = 0,
        SMTP_PROXY_STATE_RESOLVE_NORMAL,
-       SMTP_PROXY_STATE_DELAY
+       SMTP_PROXY_STATE_DELAY,
+       SMTP_PROXY_STATE_GREETING,
+       SMTP_PROXY_STATE_XCLIENT,
+       SMTP_PROXY_STATE_PROXY
 };
 
 struct smtp_proxy_session {
@@ -109,10 +112,13 @@ struct smtp_proxy_session {
        struct smtp_upstream *upstream;
 
        struct event *delay_timer;
+       struct event upstream_ev;
 
        gboolean resolved;
        struct rspamd_dns_resolver *resolver;
        struct event_base *ev_base;
+
+       GString *upstream_greeting;
 };
 
 #ifndef HAVE_SA_SIGINFO
@@ -180,13 +186,31 @@ static void
 free_smtp_proxy_session (gpointer arg)
 {
        struct smtp_proxy_session            *session = arg;
+       static const char fatal_smtp_error[] = "521 5.2.1 Internal error" CRLF;
 
        if (session) {
                if (session->dispatcher) {
                        rspamd_remove_dispatcher (session->dispatcher);
                }
 
+               if (session->upstream_greeting) {
+                       g_string_free (session->upstream_greeting, TRUE);
+               }
+
+               if (session->state != SMTP_PROXY_STATE_PROXY) {
+                       /* Send 521 fatal error */
+                       write (session->sock, fatal_smtp_error, sizeof (fatal_smtp_error));
+               }
+
                close (session->sock);
+
+               if (session->proxy) {
+                       rspamd_proxy_close (session->proxy);
+               }
+               if (session->upstream_sock != -1) {
+                       event_del (&session->upstream_ev);
+                       close (session->upstream_sock);
+               }
                memory_pool_delete (session->pool);
                g_slice_free1 (sizeof (struct smtp_proxy_session), session);
        }
@@ -205,11 +229,193 @@ smtp_proxy_err_proxy (GError * err, void *arg)
        destroy_session (session->s);
 }
 
+/**
+ * Check whether SMTP greeting is valid
+ * @param s
+ * @return
+ */
+static gint
+check_valid_smtp_greeting (GString *s)
+{
+       gchar                                                           *p;
+
+       p = s->str + s->len - 1;
+       if (s->len < 6 || (*p != '\n' || *(p - 1) != '\r')) {
+               return 1;
+       }
+       p -= 5;
+
+       while (p >= s->str) {
+               /* It is fast to use memcmp here as we compare only 4 bytes */
+               if (memcmp (p, "220 ", 4) == 0) {
+                       /* Check position */
+                       if (p == s->str || *(p - 1) == '\n') {
+                               return 1;
+                       }
+                       return 0;
+               }
+               else if ((*p == '5' || *p == '4' || *p == '3') &&
+                               g_ascii_isdigit (p[1]) && g_ascii_isdigit (p[2]) && p[3] == ' ') {
+                       return -1;
+               }
+               p --;
+       }
+
+       return 1;
+}
+
+/*
+ * Handle upstream greeting
+ */
+
+static void
+smtp_proxy_greeting_handler (gint fd, short what, void *arg)
+{
+       struct smtp_proxy_session           *session = arg;
+       gint                                                             r;
+       gchar                                                            read_buf[BUFSIZ];
+
+       if (what == EV_READ) {
+               if (session->state == SMTP_PROXY_STATE_GREETING) {
+                       /* Fill greeting buffer with new portion of data */
+                       r = read (fd, read_buf, sizeof (read_buf) - 1);
+                       if (r > 0) {
+                               g_string_append_len (session->upstream_greeting, read_buf, r);
+                               /* Now search line with 220 */
+                               r = check_valid_smtp_greeting (session->upstream_greeting);
+                               if (r == 1) {
+                                       /* Send xclient */
+                                       if (session->ctx->use_xclient) {
+                                               r = rspamd_snprintf (read_buf, sizeof (read_buf), "XCLIENT NAME=%s ADDR=%s" CRLF,
+                                                               session->hostname, inet_ntoa (session->client_addr));
+                                               r = write (session->upstream_sock, read_buf, r);
+
+                                               if (r < 0 && errno == EAGAIN) {
+                                                       /* Add write event */
+                                                       event_del (&session->upstream_ev);
+                                                       event_set (&session->upstream_ev, session->upstream_sock,
+                                                                       EV_WRITE, smtp_proxy_greeting_handler, session);
+                                                       event_base_set (session->ev_base, &session->upstream_ev);
+                                                       event_add (&session->upstream_ev, NULL);
+                                               }
+                                               else if (r > 0) {
+                                                       session->upstream_greeting->len = 0;
+                                                       session->state = SMTP_PROXY_STATE_XCLIENT;
+                                               }
+                                               else {
+                                                       msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno));
+                                                       destroy_session (session->s);
+                                               }
+                                       }
+                                       else {
+                                               event_del (&session->upstream_ev);
+                                               /* Start direct proxy */
+                                               r = write (session->sock, session->upstream_greeting->str, session->upstream_greeting->len);
+                                               /* TODO: handle client's error here */
+                                               if (r > 0) {
+                                                       session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool,
+                                                               session->ev_base, session->ctx->proxy_buf_len,
+                                                               &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session);
+                                                       session->state = SMTP_PROXY_STATE_PROXY;
+                                               }
+                                               else {
+                                                       msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno));
+                                                       destroy_session (session->s);
+                                               }
+                                       }
+                               }
+                               else if (r == -1) {
+                                       /* Proxy sent 500 error */
+                                       msg_info ("connection with %s got smtp error for greeting", session->upstream->name);
+                                       destroy_session (session->s);
+                               }
+                       }
+                       else {
+                               msg_info ("connection with %s got read error: %s", session->upstream->name, strerror (errno));
+                               destroy_session (session->s);
+                       }
+               }
+               else if (session->state == SMTP_PROXY_STATE_XCLIENT) {
+                       /* Fill greeting buffer with new portion of data */
+                       r = read (fd, read_buf, sizeof (read_buf) - 1);
+                       if (r > 0) {
+                               g_string_append_len (session->upstream_greeting, read_buf, r);
+                               /* Now search line with 220 */
+                               r = check_valid_smtp_greeting (session->upstream_greeting);
+                               if (r == 1) {
+                                       event_del (&session->upstream_ev);
+                                       /* Start direct proxy */
+                                       r = write (session->sock, session->upstream_greeting->str, session->upstream_greeting->len);
+                                       /* TODO: handle client's error here */
+                                       if (r > 0) {
+                                               session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool,
+                                                               session->ev_base, session->ctx->proxy_buf_len,
+                                                               &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session);
+                                               session->state = SMTP_PROXY_STATE_PROXY;
+                                       }
+                                       else {
+                                               msg_info ("connection with %s got write error: %s", inet_ntoa (session->client_addr), strerror (errno));
+                                               destroy_session (session->s);
+                                       }
+                               }
+                               else if (r == -1) {
+                                       /* Proxy sent 500 error */
+                                       msg_info ("connection with %s got smtp error for xclient", session->upstream->name);
+                                       destroy_session (session->s);
+                               }
+                       }
+               }
+               else {
+                       msg_info ("connection with %s got read event at improper state: %d", session->upstream->name, session->state);
+                       destroy_session (session->s);
+               }
+       }
+       else if (what == EV_WRITE) {
+               if (session->state == SMTP_PROXY_STATE_GREETING) {
+                       /* Send xclient again */
+                       r = rspamd_snprintf (read_buf, sizeof (read_buf), "XCLIENT NAME=%s ADDR=%s" CRLF,
+                                       session->hostname, inet_ntoa (session->client_addr));
+                       r = write (session->upstream_sock, read_buf, r);
+
+                       if (r < 0 && errno == EAGAIN) {
+                               /* Add write event */
+                               event_del (&session->upstream_ev);
+                               event_set (&session->upstream_ev, session->upstream_sock,
+                                               EV_WRITE, smtp_proxy_greeting_handler, session);
+                               event_base_set (session->ev_base, &session->upstream_ev);
+                               event_add (&session->upstream_ev, NULL);
+                       }
+                       else if (r > 0) {
+                               session->upstream_greeting->len = 0;
+                               session->state = SMTP_PROXY_STATE_XCLIENT;
+                               event_del (&session->upstream_ev);
+                               event_set (&session->upstream_ev, session->upstream_sock,
+                                               EV_READ | EV_PERSIST, smtp_proxy_greeting_handler, session);
+                               event_base_set (session->ev_base, &session->upstream_ev);
+                               event_add (&session->upstream_ev, NULL);
+                       }
+                       else {
+                               msg_info ("connection with %s got write error: %s", session->upstream->name, strerror (errno));
+                               destroy_session (session->s);
+                       }
+               }
+               else {
+                       msg_info ("connection with %s got write event at improper state: %d", session->upstream->name, session->state);
+                       destroy_session (session->s);
+               }
+       }
+       else {
+               /* Timeout */
+               msg_info ("connection with %s timed out", session->upstream->name);
+               destroy_session (session->s);
+       }
+}
+
 static gboolean
 create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session)
 {
-       struct smtp_upstream              *selected;
-       struct sockaddr_un                *un;
+       struct smtp_upstream                    *selected;
+       struct sockaddr_un                      *un;
 
        /* Try to select upstream */
        selected = (struct smtp_upstream *)get_upstream_round_robin (session->ctx->upstreams,
@@ -237,9 +443,14 @@ create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session)
        }
        /* Create a proxy for upstream connection */
        rspamd_dispatcher_pause (session->dispatcher);
-       session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool,
-                       session->ev_base, session->ctx->proxy_buf_len,
-                       &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session);
+       /* First of all get upstream's greeting */
+       session->state = SMTP_PROXY_STATE_GREETING;
+
+       event_set (&session->upstream_ev, session->upstream_sock, EV_READ | EV_PERSIST, smtp_proxy_greeting_handler, session);
+       event_base_set (session->ev_base, &session->upstream_ev);
+       event_add (&session->upstream_ev, &session->ctx->smtp_timeout);
+
+       session->upstream_greeting = g_string_sized_new (BUFSIZ);
 
        return TRUE;
 }
@@ -251,7 +462,7 @@ create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session)
 static void
 smtp_delay_handler (gint fd, short what, void *arg)
 {
-       struct smtp_proxy_session *session = arg;
+       struct smtp_proxy_session                               *session = arg;
 
        remove_normal_event (session->s, (event_finalizer_t) event_del,
                        session->delay_timer);
@@ -275,9 +486,9 @@ smtp_delay_handler (gint fd, short what, void *arg)
 static void
 smtp_make_delay (struct smtp_proxy_session *session)
 {
-       struct event *tev;
-       struct timeval *tv;
-       gint32 jitter;
+       struct event                                                    *tev;
+       struct timeval                                                  *tv;
+       gint32                                                                   jitter;
 
        if (session->ctx->smtp_delay != 0 && session->state == SMTP_PROXY_STATE_DELAY) {
                tev = memory_pool_alloc (session->pool, sizeof(struct event));
@@ -308,10 +519,10 @@ smtp_make_delay (struct smtp_proxy_session *session)
 static void
 smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg)
 {
-       struct smtp_proxy_session *session = arg;
-       gint res = 0;
-       union rspamd_reply_element *elt;
-       GList *cur;
+       struct smtp_proxy_session                                               *session = arg;
+       gint                                                                                     res = 0;
+       union rspamd_reply_element                                              *elt;
+       GList                                                                                   *cur;
 
        switch (session->state)
        {
@@ -470,6 +681,7 @@ accept_socket (gint fd, short what, void *arg)
        session->ctx = ctx;
        session->resolver = ctx->resolver;
        session->ev_base = ctx->ev_base;
+       session->upstream_sock = -1;
        worker->srv->stat->connections_count++;
 
        /* Resolve client's addr */