]> source.dussan.org Git - rspamd.git/commitdiff
* Add initial implementation of proxy object
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 7 Mar 2012 16:47:55 +0000 (20:47 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 7 Mar 2012 16:47:55 +0000 (20:47 +0400)
* Add simple smtp proxy worker

CMakeLists.txt
lib/CMakeLists.txt
src/proxy.c [new file with mode: 0644]
src/proxy.h [new file with mode: 0644]
src/smtp.c
src/smtp.h
src/smtp_proto.c
src/smtp_proto.h
src/smtp_proxy.c [new file with mode: 0644]
src/smtp_utils.c
src/smtp_utils.h

index 758081459a457286a559e19a471e2af7b3881595..1ac5fded740730e0941c6128a2792e56528876c7 100644 (file)
@@ -848,6 +848,7 @@ SET(RSPAMDSRC       src/modules.c
                                src/main.c
                                src/map.c
                                src/smtp.c
+                               src/smtp_proxy.c
                                src/worker.c)
 
 SET(TOKENIZERSSRC  src/tokenizers/tokenizers.c
@@ -864,7 +865,7 @@ SET(PLUGINSSRC      src/plugins/surbl.c
                                src/plugins/spf.c)
                                
 SET(MODULES_LIST surbl regexp chartable fuzzy_check spf)
-SET(WORKERS_LIST normal controller smtp lmtp fuzzy keystorage)
+SET(WORKERS_LIST normal controller smtp smtp_proxy lmtp fuzzy keystorage)
 
 AddModules(MODULES_LIST WORKERS_LIST)
 
index 0355fe3f910c36088cb30b9d10ef2b9010c582ad..feeab47ca79a9acb88b96f4baefaefe605e21bbb 100644 (file)
@@ -63,6 +63,7 @@ SET(RSPAMDLIBSRC
                                ../src/message.c
                                ../src/printf.c
                                ../src/protocol.c
+                               ../src/proxy.c
                                ../src/radix.c
                                ../src/settings.c
                                ../src/spf.c
diff --git a/src/proxy.c b/src/proxy.c
new file mode 100644 (file)
index 0000000..df1ccfd
--- /dev/null
@@ -0,0 +1,238 @@
+/* Copyright (c) 2010-2012, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *       * Redistributions of source code must retain the above copyright
+ *         notice, this list of conditions and the following disclaimer.
+ *       * Redistributions in binary form must reproduce the above copyright
+ *         notice, this list of conditions and the following disclaimer in the
+ *         documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "main.h"
+#include "proxy.h"
+
+static void rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data);
+static void rspamd_proxy_client_handler (gint fd, gshort what, gpointer data);
+
+static inline                   GQuark
+proxy_error_quark (void)
+{
+       return g_quark_from_static_string ("proxy-error");
+}
+
+static void
+rspamd_proxy_close (rspamd_proxy_t *proxy)
+{
+       close (proxy->cfd);
+       close (proxy->bfd);
+
+       event_del (&proxy->client_ev);
+       event_del (&proxy->backend_ev);
+}
+
+static void
+rspamd_proxy_client_handler (gint fd, gshort what, gpointer data)
+{
+       rspamd_proxy_t                                          *proxy = data;
+       gint                                                             r;
+       GError                                                          *err = NULL;
+
+       if (what == EV_READ) {
+               /* Got data from client */
+               event_del (&proxy->client_ev);
+               r = read (proxy->cfd, proxy->buf, proxy->bufsize);
+               if (r > 0) {
+                       /* Write this buffer to backend */
+                       proxy->read_len = r;
+                       proxy->buf_offset = 0;
+                       event_del (&proxy->backend_ev);
+                       event_set (&proxy->backend_ev, proxy->bfd, EV_WRITE, rspamd_proxy_backend_handler, proxy);
+                       event_add (&proxy->backend_ev, proxy->tv);
+               }
+               else {
+                       /* Error case or zero reply */
+                       if (r < 0) {
+                               /* Error case */
+                               g_set_error (&err, proxy_error_quark(), r, "Client read error: %s", strerror (errno));
+                               rspamd_proxy_close (proxy);
+                               proxy->err_cb (err, proxy->user_data);
+                       }
+                       else {
+                               /* Client closes connection */
+                               rspamd_proxy_close (proxy);
+                               proxy->err_cb (NULL, proxy->user_data);
+                       }
+               }
+       }
+       else if (what == EV_WRITE) {
+               /* Can write to client */
+               r = write (proxy->cfd, proxy->buf + proxy->buf_offset, proxy->read_len - proxy->buf_offset);
+               if (r > 0) {
+                       /* We wrote something */
+                       proxy->buf_offset +=r;
+                       if (proxy->buf_offset == proxy->read_len) {
+                               /* We wrote everything */
+                               event_del (&proxy->client_ev);
+                               event_set (&proxy->client_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy);
+                               event_add (&proxy->client_ev, proxy->tv);
+                               event_del (&proxy->backend_ev);
+                               event_set (&proxy->backend_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy);
+                               event_add (&proxy->backend_ev, proxy->tv);
+                       }
+                       else {
+                               /* Plan another write event */
+                               event_add (&proxy->backend_ev, proxy->tv);
+                       }
+               }
+               else {
+                       /* Error case or zero reply */
+                       if (r < 0) {
+                               /* Error case */
+                               g_set_error (&err, proxy_error_quark(), r, "Client write error: %s", strerror (errno));
+                               rspamd_proxy_close (proxy);
+                               proxy->err_cb (err, proxy->user_data);
+                       }
+                       else {
+                               /* Client closes connection */
+                               rspamd_proxy_close (proxy);
+                               proxy->err_cb (NULL, proxy->user_data);
+                       }
+               }
+       }
+       else {
+               /* Got timeout */
+               g_set_error (&err, proxy_error_quark(), ETIMEDOUT, "Client timeout");
+               rspamd_proxy_close (proxy);
+               proxy->err_cb (err, proxy->user_data);
+       }
+}
+
+static void
+rspamd_proxy_backend_handler (gint fd, gshort what, gpointer data)
+{
+       rspamd_proxy_t                                          *proxy = data;
+       gint                                                             r;
+       GError                                                          *err;
+
+       if (what == EV_READ) {
+               /* Got data from backend */
+               event_del (&proxy->backend_ev);
+               r = read (proxy->cfd, proxy->buf, proxy->bufsize);
+               if (r > 0) {
+                       /* Write this buffer to client */
+                       proxy->read_len = r;
+                       proxy->buf_offset = 0;
+                       event_del (&proxy->client_ev);
+                       event_set (&proxy->client_ev, proxy->bfd, EV_WRITE, rspamd_proxy_client_handler, proxy);
+                       event_add (&proxy->client_ev, proxy->tv);
+               }
+               else {
+                       /* Error case or zero reply */
+                       if (r < 0) {
+                               /* Error case */
+                               g_set_error (&err, proxy_error_quark(), r, "Backend read error: %s", strerror (errno));
+                               rspamd_proxy_close (proxy);
+                               proxy->err_cb (err, proxy->user_data);
+                       }
+                       else {
+                               /* Client closes connection */
+                               rspamd_proxy_close (proxy);
+                               proxy->err_cb (NULL, proxy->user_data);
+                       }
+               }
+       }
+       else if (what == EV_WRITE) {
+               /* Can write to backend */
+               r = write (proxy->bfd, proxy->buf + proxy->buf_offset, proxy->read_len - proxy->buf_offset);
+               if (r > 0) {
+                       /* We wrote something */
+                       proxy->buf_offset +=r;
+                       if (proxy->buf_offset == proxy->read_len) {
+                               /* We wrote everything */
+                               event_del (&proxy->backend_ev);
+                               event_set (&proxy->backend_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy);
+                               event_add (&proxy->backend_ev, proxy->tv);
+                               event_del (&proxy->client_ev);
+                               event_set (&proxy->client_ev, proxy->bfd, EV_READ, rspamd_proxy_client_handler, proxy);
+                               event_add (&proxy->client_ev, proxy->tv);
+                       }
+                       else {
+                               /* Plan another write event */
+                               event_add (&proxy->backend_ev, proxy->tv);
+                       }
+               }
+               else {
+                       /* Error case or zero reply */
+                       if (r < 0) {
+                               /* Error case */
+                               g_set_error (&err, proxy_error_quark(), r, "Backend write error: %s", strerror (errno));
+                               rspamd_proxy_close (proxy);
+                               proxy->err_cb (err, proxy->user_data);
+                       }
+                       else {
+                               /* Client closes connection */
+                               rspamd_proxy_close (proxy);
+                               proxy->err_cb (NULL, proxy->user_data);
+                       }
+               }
+       }
+       else {
+               /* Got timeout */
+               g_set_error (&err, proxy_error_quark(), ETIMEDOUT, "Client timeout");
+               rspamd_proxy_close (proxy);
+               proxy->err_cb (err, proxy->user_data);
+       }
+}
+
+/**
+ * Create new proxy between cfd and bfd
+ * @param cfd client's socket
+ * @param bfd backend's socket
+ * @param bufsize size of exchange buffer
+ * @param err_cb callback for erorrs or completing
+ * @param ud user data for callback
+ * @return new proxy object
+ */
+rspamd_proxy_t*
+rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool, struct event_base *base,
+               gsize bufsize, struct timeval *tv, dispatcher_err_callback_t err_cb, gpointer ud)
+{
+       rspamd_proxy_t                                          *new;
+
+       new = memory_pool_alloc0 (pool, sizeof (rspamd_proxy_t));
+
+       new->cfd = cfd;
+       new->bfd = bfd;
+       new->pool = pool;
+       new->base = base;
+       new->bufsize = bufsize;
+       new->buf = memory_pool_alloc (pool, bufsize);
+       new->err_cb = err_cb;
+       new->user_data = ud;
+       new->tv = tv;
+
+       /* Set client's and backend's interfaces to read events */
+       event_set (&new->client_ev, new->cfd, EV_READ, rspamd_proxy_client_handler, new);
+       event_base_set (new->base, &new->client_ev);
+       event_add (&new->client_ev, new->tv);
+
+       event_set (&new->backend_ev, new->bfd, EV_READ, rspamd_proxy_backend_handler, new);
+       event_base_set (new->base, &new->backend_ev);
+       event_add (&new->backend_ev, new->tv);
+
+       return new;
+}
diff --git a/src/proxy.h b/src/proxy.h
new file mode 100644 (file)
index 0000000..6ebb08d
--- /dev/null
@@ -0,0 +1,66 @@
+/* Copyright (c) 2010-2012, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *       * Redistributions of source code must retain the above copyright
+ *         notice, this list of conditions and the following disclaimer.
+ *       * Redistributions in binary form must reproduce the above copyright
+ *         notice, this list of conditions and the following disclaimer in the
+ *         documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#ifndef PROXY_H_
+#define PROXY_H_
+
+#include "config.h"
+#include "buffer.h"
+
+/**
+ * @file proxy.h
+ * Direct asynchronous proxy implementation
+ */
+
+typedef struct rspamd_proxy_s {
+       struct event client_ev;                                 /**< event for client's communication */
+       struct event backend_ev;                                /**< event for backend communication */
+       struct event_base *base;                                /**< base for event operations */
+       memory_pool_t *pool;                                    /**< memory pool */
+       dispatcher_err_callback_t err_cb;               /**< error callback     */
+       struct event_base *ev_base;                             /**< event base */
+       gint cfd;                                                               /**< client's socket */
+       gint bfd;                                                               /**< backend's socket */
+       guint8 *buf;                                                    /**< exchange buffer */
+       gsize bufsize;                                                  /**< buffer size */
+       gint read_len;                                                  /**< read length */
+       gint buf_offset;                                                /**< offset to write */
+       gpointer user_data;                                             /**< user's data for callbacks */
+       struct timeval *tv;                                             /**< timeout for communications */
+} rspamd_proxy_t;
+
+/**
+ * Create new proxy between cfd and bfd
+ * @param cfd client's socket
+ * @param bfd backend's socket
+ * @param bufsize size of exchange buffer
+ * @param err_cb callback for erorrs or completing
+ * @param ud user data for callback
+ * @return new proxy object
+ */
+rspamd_proxy_t* rspamd_create_proxy (gint cfd, gint bfd, memory_pool_t *pool,
+               struct event_base *base, gsize bufsize, struct timeval *tv,
+               dispatcher_err_callback_t err_cb, gpointer ud);
+
+#endif /* PROXY_H_ */
index 605a680cd570d76403231aaa1c05770f023ee743..0c10762bd07872f7fae7bd018d4bcc9604f9bef8 100644 (file)
@@ -45,9 +45,6 @@
 
 #define DEFAULT_REJECT_MESSAGE "450 4.5.0 Spam message rejected"
 
-#define XCLIENT_HOST_UNAVAILABLE "[UNAVAILABLE]"
-#define XCLIENT_HOST_TEMPFAIL "[TEMPUNAVAIL]"
-
 static gboolean smtp_write_socket (void *arg);
 
 static sig_atomic_t                    wanna_die = 0;
@@ -397,7 +394,7 @@ smtp_read_socket (f_str_t * in, void *arg)
                case SMTP_STATE_RESOLVE_REVERSE:
                case SMTP_STATE_RESOLVE_NORMAL:
                case SMTP_STATE_DELAY:
-                       session->error = make_smtp_error (session, 550, "%s Improper use of SMTP command pipelining", "5.5.0");
+                       session->error = make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0");
                        session->state = SMTP_STATE_ERROR;
                        break;
                case SMTP_STATE_GREETING:
@@ -444,7 +441,7 @@ smtp_read_socket (f_str_t * in, void *arg)
                        rspamd_dispatcher_pause (session->dispatcher);
                        break;
                default:
-                       session->error = make_smtp_error (session, 550, "%s Internal error", "5.5.0");
+                       session->error = make_smtp_error (session->pool, 550, "%s Internal error", "5.5.0");
                        session->state = SMTP_STATE_ERROR;
                        break;
        }
@@ -820,61 +817,6 @@ parse_smtp_banner (struct smtp_worker_ctx *ctx, const gchar *line)
        }
 }
 
-static gboolean
-parse_upstreams_line (struct smtp_worker_ctx *ctx, const gchar *line)
-{
-       gchar                           **strv, *p, *t, *tt, *err_str;
-       guint32                         num, i;
-       struct smtp_upstream           *cur;
-       gchar                           resolved_path[PATH_MAX];
-       
-       strv = g_strsplit_set (line, ",; ", -1);
-       num = g_strv_length (strv);
-
-       if (num >= MAX_UPSTREAM) {
-               msg_err ("cannot define %d upstreams %d is max", num, MAX_UPSTREAM);
-               return FALSE;
-       }
-
-       for (i = 0; i < num; i ++) {
-               p = strv[i];
-               cur = &ctx->upstreams[ctx->upstream_num];
-               if ((t = strrchr (p, ':')) != NULL && (tt = strchr (p, ':')) != t) {
-                       /* Assume that after last `:' we have weigth */
-                       *t = '\0';
-                       t ++;
-                       errno = 0;
-                       cur->up.priority = strtoul (t, &err_str, 10);
-                       if (errno != 0 || (err_str && *err_str != '\0')) {
-                               msg_err ("cannot convert weight: %s, %s", t, strerror (errno));
-                               g_strfreev (strv);
-                               return FALSE;
-                       }
-               }
-               if (*p == '/') {
-                       cur->is_unix = TRUE;
-                       if (realpath (p, resolved_path) == NULL) {
-                               msg_err ("cannot resolve path: %s", resolved_path);
-                               g_strfreev (strv);
-                               return FALSE;
-                       }
-                       cur->name = memory_pool_strdup (ctx->pool, resolved_path);
-                       ctx->upstream_num ++;
-               }
-               else {
-                       if (! parse_host_port (p, &cur->addr, &cur->port)) {
-                               g_strfreev (strv);
-                               return FALSE;
-                       }
-                       cur->name = memory_pool_strdup (ctx->pool, p);
-                       ctx->upstream_num ++;
-               }
-       }
-
-       g_strfreev (strv);
-       return TRUE;
-}
-
 static void
 make_capabilities (struct smtp_worker_ctx *ctx, const gchar *line)
 {
@@ -973,7 +915,7 @@ config_smtp_worker (struct rspamd_worker *worker)
 
        /* Init upstreams */
        if ((value = ctx->upstreams_str) != NULL) {
-               if (!parse_upstreams_line (ctx, value)) {
+               if (!parse_upstreams_line (ctx->pool, ctx->upstreams, value, &ctx->upstream_num)) {
                        return FALSE;
                }
        }
index d2c14f924e6d7b6586a17d65b5d0cae4f64eae84..714fb35232c965a7cc1ed392854d4a95545c2d79 100644 (file)
@@ -4,19 +4,10 @@
 #include "config.h"
 #include "main.h"
 #include "upstream.h"
-
-struct smtp_upstream {
-       struct upstream up;
-       
-       const gchar *name;
-       struct in_addr addr;
-       guint16 port;
-       gboolean is_unix;
-}; 
+#include "smtp_utils.h"
 
 struct rspamd_dns_resolver;
 
-#define MAX_UPSTREAM 128
 #define DEFAULT_MAX_ERRORS 10
 
 enum rspamd_smtp_stage {
@@ -29,8 +20,8 @@ enum rspamd_smtp_stage {
 };
 
 struct smtp_worker_ctx {
-       struct smtp_upstream upstreams[MAX_UPSTREAM];
-       size_t upstream_num;
+       struct smtp_upstream upstreams[MAX_SMTP_UPSTREAMS];
+       gsize upstream_num;
        gchar *upstreams_str;
        
        memory_pool_t *pool;
index d443a1fb3644d601dad4b4fc3d084e09e2d478c8..c8f6fa556ef5325ae2b9724150075f42e5b0d348 100644 (file)
@@ -31,7 +31,7 @@
 #include "smtp_utils.h"
 
 gchar                           *
-make_smtp_error (struct smtp_session *session, gint error_code, const gchar *format, ...)
+make_smtp_error (memory_pool_t *pool, gint error_code, const gchar *format, ...)
 {
        va_list                         vp;
        gchar                           *result = NULL, *p;
@@ -42,7 +42,7 @@ make_smtp_error (struct smtp_session *session, gint error_code, const gchar *for
        va_end (vp);
        va_start (vp, format);
        len += sizeof ("65535 ") + sizeof (CRLF) - 1;
-       result = memory_pool_alloc (session->pool, len);
+       result = memory_pool_alloc (pool, len);
        p = result + rspamd_snprintf (result, len, "%d ", error_code);
        p = rspamd_vsnprintf (p, len - (p - result), format, vp);
        *p++ = CR; *p++ = LF; *p = '\0';
index 2f2904192fb2625f7545bd37c7df17287b3879f7..efb09bf3570f10a4476c385311ffe33ac4d28783 100644 (file)
 
 #define DATA_END_TRAILER "." CRLF
 
+#define XCLIENT_HOST_UNAVAILABLE "[UNAVAILABLE]"
+#define XCLIENT_HOST_TEMPFAIL "[TEMPUNAVAIL]"
+
+#define MAX_SMTP_UPSTREAMS 128
 
 struct smtp_command {
        enum {
@@ -39,7 +43,7 @@ struct smtp_command {
 /*
  * Generate SMTP error message
  */
-gchar * make_smtp_error (struct smtp_session *session, gint error_code, const gchar *format, ...);
+gchar * make_smtp_error (memory_pool_t *pool, gint error_code, const gchar *format, ...);
 
 /*
  * Parse a single SMTP command
diff --git a/src/smtp_proxy.c b/src/smtp_proxy.c
new file mode 100644 (file)
index 0000000..fa9da97
--- /dev/null
@@ -0,0 +1,581 @@
+/* Copyright (c) 2010-2012, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *       * Redistributions of source code must retain the above copyright
+ *         notice, this list of conditions and the following disclaimer.
+ *       * Redistributions in binary form must reproduce the above copyright
+ *         notice, this list of conditions and the following disclaimer in the
+ *         documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "main.h"
+#include "cfg_file.h"
+#include "cfg_xml.h"
+#include "util.h"
+#include "smtp_proto.h"
+#include "map.h"
+#include "message.h"
+#include "settings.h"
+#include "dns.h"
+#include "upstream.h"
+#include "proxy.h"
+
+/*
+ * SMTP proxy is a simple smtp proxy worker for dns resolving and
+ * load balancing. It uses XCLIENT command and is designed for MTA
+ * that supports that (postfix and exim).
+ */
+
+/* Upstream timeouts */
+#define DEFAULT_UPSTREAM_ERROR_TIME 10
+#define DEFAULT_UPSTREAM_DEAD_TIME 300
+#define DEFAULT_UPSTREAM_MAXERRORS 10
+
+static sig_atomic_t                    wanna_die = 0;
+
+/* Init functions */
+gpointer init_smtp_proxy ();
+void start_smtp_proxy (struct rspamd_worker *worker);
+
+worker_t smtp_proxy_worker = {
+       "smtp_proxy",                           /* Name */
+       init_smtp_proxy,                        /* Init function */
+       start_smtp_proxy,                       /* Start function */
+       TRUE,                                           /* Has socket */
+       FALSE,                                          /* Non unique */
+       FALSE,                                          /* Non threaded */
+       TRUE                                            /* Killable */
+};
+
+struct smtp_proxy_ctx {
+       struct smtp_upstream upstreams[MAX_SMTP_UPSTREAMS];
+       size_t upstream_num;
+       gchar *upstreams_str;
+
+       memory_pool_t *pool;
+       guint32 smtp_delay;
+       guint32 delay_jitter;
+       guint32 smtp_timeout_raw;
+       struct timeval smtp_timeout;
+
+       struct rspamd_dns_resolver *resolver;
+       struct event_base *ev_base;
+};
+
+enum rspamd_smtp_proxy_state {
+       SMTP_PROXY_STATE_RESOLVE_REVERSE = 0,
+       SMTP_PROXY_STATE_RESOLVE_NORMAL,
+       SMTP_PROXY_STATE_DELAY
+};
+
+struct smtp_proxy_session {
+       struct smtp_proxy_ctx *ctx;
+       memory_pool_t *pool;
+
+       enum rspamd_smtp_proxy_state state;
+       struct rspamd_worker *worker;
+       struct in_addr client_addr;
+       gchar *hostname;
+       gchar *error;
+       gchar *temp_name;
+       gint sock;
+       gint upstream_sock;
+
+       struct rspamd_async_session *s;
+       rspamd_io_dispatcher_t *dispatcher;
+
+       rspamd_proxy_t *proxy;
+
+       struct smtp_upstream *upstream;
+
+       struct event *delay_timer;
+
+       gboolean resolved;
+       struct rspamd_dns_resolver *resolver;
+       struct event_base *ev_base;
+};
+
+#ifndef HAVE_SA_SIGINFO
+static void
+sig_handler (gint signo)
+#else
+static void
+sig_handler (gint signo, siginfo_t *info, void *unused)
+#endif
+{
+       struct timeval                  tv;
+
+       switch (signo) {
+       case SIGINT:
+       case SIGTERM:
+               if (!wanna_die) {
+                       wanna_die = 1;
+                       tv.tv_sec = 0;
+                       tv.tv_usec = 0;
+                       event_loopexit (&tv);
+
+#ifdef WITH_GPERF_TOOLS
+                       ProfilerStop ();
+#endif
+               }
+               break;
+       }
+}
+
+/*
+ * Config reload is designed by sending sigusr to active workers and pending shutdown of them
+ */
+static void
+sigusr2_handler (gint fd, short what, void *arg)
+{
+       struct rspamd_worker           *worker = (struct rspamd_worker *)arg;
+       /* Do not accept new connections, preparing to end worker's process */
+       struct timeval                  tv;
+       if (! wanna_die) {
+               tv.tv_sec = SOFT_SHUTDOWN_TIME;
+               tv.tv_usec = 0;
+               event_del (&worker->sig_ev_usr1);
+               event_del (&worker->sig_ev_usr2);
+               event_del (&worker->bind_ev);
+               msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+               event_loopexit (&tv);
+       }
+       return;
+}
+
+/*
+ * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
+ */
+static void
+sigusr1_handler (gint fd, short what, void *arg)
+{
+       struct rspamd_worker           *worker = (struct rspamd_worker *) arg;
+
+       reopen_log (worker->srv->logger);
+
+       return;
+}
+
+static void
+free_smtp_proxy_session (gpointer arg)
+{
+       struct smtp_proxy_session            *session = arg;
+
+       if (session) {
+               if (session->dispatcher) {
+                       rspamd_remove_dispatcher (session->dispatcher);
+               }
+
+               close (session->sock);
+               memory_pool_delete (session->pool);
+               g_free (session);
+       }
+}
+
+static void
+smtp_proxy_err_proxy (GError * err, void *arg)
+{
+       struct smtp_proxy_session            *session = arg;
+
+       msg_info ("abnormally closing connection, error: %s", err->message);
+       /* Free buffers */
+       destroy_session (session->s);
+}
+
+static gboolean
+create_smtp_proxy_upstream_connection (struct smtp_proxy_session *session)
+{
+       struct smtp_upstream              *selected;
+       struct sockaddr_un                *un;
+
+       /* Try to select upstream */
+       selected = (struct smtp_upstream *)get_upstream_round_robin (session->ctx->upstreams,
+                       session->ctx->upstream_num, sizeof (struct smtp_upstream),
+                       time (NULL), DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS);
+       if (selected == NULL) {
+               msg_err ("no upstreams suitable found");
+               return FALSE;
+       }
+
+       session->upstream = selected;
+
+       /* Now try to create socket */
+       if (selected->is_unix) {
+               un = alloca (sizeof (struct sockaddr_un));
+               session->upstream_sock = make_unix_socket (selected->name, un, FALSE, TRUE);
+       }
+       else {
+               session->upstream_sock = make_tcp_socket (&selected->addr, selected->port, FALSE, TRUE);
+       }
+       if (session->upstream_sock == -1) {
+               msg_err ("cannot make a connection to %s", selected->name);
+               upstream_fail (&selected->up, time (NULL));
+               return FALSE;
+       }
+       /* Create a proxy for upstream connection */
+       rspamd_dispatcher_pause (session->dispatcher);
+       session->proxy = rspamd_create_proxy (session->sock, session->upstream_sock, session->pool,
+                       session->ev_base, BUFSIZ, &session->ctx->smtp_timeout, smtp_proxy_err_proxy, session);
+
+       return TRUE;
+}
+
+/* Resolving and delay handlers */
+/*
+ * Return from a delay
+ */
+static void
+smtp_delay_handler (gint fd, short what, void *arg)
+{
+       struct smtp_proxy_session *session = arg;
+
+       remove_normal_event (session->s, (event_finalizer_t) event_del,
+                       session->delay_timer);
+       if (session->state == SMTP_PROXY_STATE_DELAY) {
+               /* TODO: Create upstream connection here */
+               create_smtp_proxy_upstream_connection (session);
+       }
+       else {
+               /* TODO: Write error here */
+               if (rspamd_dispatcher_write (session->dispatcher,
+                               make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0"),
+                               0, FALSE, TRUE)) {
+                       destroy_session (session->s);
+               }
+       }
+}
+
+/*
+ * Make delay for a client
+ */
+static void
+smtp_make_delay (struct smtp_proxy_session *session)
+{
+       struct event *tev;
+       struct timeval *tv;
+       gint32 jitter;
+
+       if (session->ctx->smtp_delay != 0 && session->state == SMTP_PROXY_STATE_DELAY) {
+               tev = memory_pool_alloc (session->pool, sizeof(struct event));
+               tv = memory_pool_alloc (session->pool, sizeof(struct timeval));
+               if (session->ctx->delay_jitter != 0) {
+                       jitter = g_random_int_range (0, session->ctx->delay_jitter);
+                       msec_to_tv (session->ctx->smtp_delay + jitter, tv);
+               }
+               else {
+                       msec_to_tv (session->ctx->smtp_delay, tv);
+               }
+
+               evtimer_set (tev, smtp_delay_handler, session);
+               evtimer_add (tev, tv);
+               register_async_event (session->s, (event_finalizer_t) event_del, tev,
+                               g_quark_from_static_string ("smtp proxy"));
+               session->delay_timer = tev;
+       }
+       else if (session->state == SMTP_PROXY_STATE_DELAY) {
+               /* TODO: Create upstream connection here */
+               create_smtp_proxy_upstream_connection (session);
+       }
+}
+
+/*
+ * Handle DNS replies
+ */
+static void
+smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg)
+{
+       struct smtp_proxy_session *session = arg;
+       gint res = 0;
+       union rspamd_reply_element *elt;
+       GList *cur;
+
+       switch (session->state)
+       {
+       case SMTP_PROXY_STATE_RESOLVE_REVERSE:
+               /* Parse reverse reply and start resolve of this ip */
+               if (reply->code != DNS_RC_NOERROR) {
+                       rspamd_conditional_debug (rspamd_main->logger,
+                                       session->client_addr.s_addr, __FUNCTION__, "DNS error: %s",
+                                       dns_strerror (reply->code));
+
+                       if (reply->code == DNS_RC_NXDOMAIN) {
+                               session->hostname = memory_pool_strdup (session->pool,
+                                               XCLIENT_HOST_UNAVAILABLE);
+                       }
+                       else {
+                               session->hostname = memory_pool_strdup (session->pool,
+                                               XCLIENT_HOST_TEMPFAIL);
+                       }
+                       session->state = SMTP_PROXY_STATE_DELAY;
+                       smtp_make_delay (session);
+               }
+               else {
+                       if (reply->elements) {
+                               elt = reply->elements->data;
+                               session->hostname = memory_pool_strdup (session->pool,
+                                               elt->ptr.name);
+                               session->state = SMTP_PROXY_STATE_RESOLVE_NORMAL;
+                               make_dns_request (session->resolver, session->s, session->pool,
+                                               smtp_dns_cb, session, DNS_REQUEST_A, session->hostname);
+
+                       }
+               }
+               break;
+       case SMTP_PROXY_STATE_RESOLVE_NORMAL:
+               if (reply->code != DNS_RC_NOERROR) {
+                       rspamd_conditional_debug (rspamd_main->logger,
+                                       session->client_addr.s_addr, __FUNCTION__, "DNS error: %s",
+                                       dns_strerror (reply->code));
+
+                       if (reply->code == DNS_RC_NXDOMAIN) {
+                               session->hostname = memory_pool_strdup (session->pool,
+                                               XCLIENT_HOST_UNAVAILABLE);
+                       }
+                       else {
+                               session->hostname = memory_pool_strdup (session->pool,
+                                               XCLIENT_HOST_TEMPFAIL);
+                       }
+                       session->state = SMTP_PROXY_STATE_DELAY;
+                       smtp_make_delay (session);
+               }
+               else {
+                       res = 0;
+                       cur = reply->elements;
+                       while (cur) {
+                               elt = cur->data;
+                               if (memcmp (&session->client_addr, &elt->a.addr[0],
+                                               sizeof(struct in_addr)) == 0) {
+                                       res = 1;
+                                       session->resolved = TRUE;
+                                       break;
+                               }
+                               cur = g_list_next (cur);
+                       }
+
+                       if (res == 0) {
+                               msg_info(
+                                               "cannot find address for hostname: %s, ip: %s", session->hostname, inet_ntoa (session->client_addr));
+                               session->hostname = memory_pool_strdup (session->pool,
+                                               XCLIENT_HOST_UNAVAILABLE);
+                       }
+                       session->state = SMTP_PROXY_STATE_DELAY;
+                       smtp_make_delay (session);
+               }
+               break;
+       default:
+               /* TODO: write something about pipelining */
+               break;
+       }
+}
+
+/*
+ * Callback that is called when there is data to read in buffer
+ */
+static                          gboolean
+smtp_proxy_read_socket (f_str_t * in, void *arg)
+{
+       struct smtp_proxy_session            *session = arg;
+
+       /* This can be called only if client is using invalid pipelining */
+       if (rspamd_dispatcher_write (session->dispatcher,
+                       make_smtp_error (session->pool, 550, "%s Improper use of SMTP command pipelining", "5.5.0"),
+                       0, FALSE, TRUE)) {
+               destroy_session (session->s);
+       }
+       else {
+               return FALSE;
+       }
+
+       return TRUE;
+}
+
+/*
+ * Called if something goes wrong
+ */
+static void
+smtp_proxy_err_socket (GError * err, void *arg)
+{
+       struct smtp_proxy_session            *session = arg;
+
+       msg_info ("abnormally closing connection, error: %s", err->message);
+       /* Free buffers */
+       destroy_session (session->s);
+}
+
+/*
+ * Accept new connection and construct session
+ */
+static void
+accept_socket (gint fd, short what, void *arg)
+{
+       struct rspamd_worker           *worker = (struct rspamd_worker *)arg;
+       union sa_union                  su;
+       struct smtp_proxy_session      *session;
+       struct smtp_proxy_ctx          *ctx;
+
+       socklen_t                       addrlen = sizeof (su.ss);
+       gint                            nfd;
+
+       if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
+               msg_warn ("accept failed: %s", strerror (errno));
+               return;
+       }
+       /* Check for EAGAIN */
+       if (nfd == 0) {
+               return;
+       }
+
+       ctx = worker->ctx;
+       session = g_slice_alloc0 (sizeof (struct smtp_proxy_session));
+       session->pool = memory_pool_new (memory_pool_get_size ());
+
+       if (su.ss.ss_family == AF_UNIX) {
+               msg_info ("accepted connection from unix socket");
+               session->client_addr.s_addr = INADDR_NONE;
+       }
+       else if (su.ss.ss_family == AF_INET) {
+               msg_info ("accepted connection from %s port %d", inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
+               memcpy (&session->client_addr, &su.s4.sin_addr, sizeof (struct in_addr));
+       }
+
+       session->sock = nfd;
+       session->worker = worker;
+       session->ctx = ctx;
+       session->resolver = ctx->resolver;
+       session->ev_base = ctx->ev_base;
+       worker->srv->stat->connections_count++;
+
+       /* Resolve client's addr */
+       /* Set up async session */
+       session->s = new_async_session (session->pool, NULL, NULL, free_smtp_proxy_session, session);
+       session->state = SMTP_PROXY_STATE_RESOLVE_REVERSE;
+       if (! make_dns_request (session->resolver, session->s, session->pool,
+                       smtp_dns_cb, session, DNS_REQUEST_PTR, &session->client_addr)) {
+               msg_err ("cannot resolve %s", inet_ntoa (session->client_addr));
+               g_slice_free1 (sizeof (struct smtp_proxy_session), session);
+               close (nfd);
+               return;
+       }
+       else {
+               session->dispatcher = rspamd_create_dispatcher (session->ev_base, nfd, BUFFER_ANY,
+                                                               smtp_proxy_read_socket, NULL, smtp_proxy_err_socket,
+                                                               &session->ctx->smtp_timeout, session);
+               session->dispatcher->peer_addr = session->client_addr.s_addr;
+       }
+}
+
+gpointer
+init_smtp_proxy (void)
+{
+       struct smtp_proxy_ctx                   *ctx;
+       GQuark                                                          type;
+
+       type = g_quark_try_string ("smtp_proxy");
+
+       ctx = g_malloc0 (sizeof (struct smtp_worker_ctx));
+       ctx->pool = memory_pool_new (memory_pool_get_size ());
+
+       /* Set default values */
+       ctx->smtp_timeout_raw = 300000;
+       ctx->smtp_delay = 0;
+
+       register_worker_opt (type, "upstreams", xml_handle_string, ctx,
+                               G_STRUCT_OFFSET (struct smtp_proxy_ctx, upstreams_str));
+       register_worker_opt (type, "timeout", xml_handle_seconds, ctx,
+                                       G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_timeout_raw));
+       register_worker_opt (type, "delay", xml_handle_seconds, ctx,
+                                       G_STRUCT_OFFSET (struct smtp_proxy_ctx, smtp_delay));
+       register_worker_opt (type, "jitter", xml_handle_seconds, ctx,
+                                               G_STRUCT_OFFSET (struct smtp_proxy_ctx, delay_jitter));
+
+       return ctx;
+}
+
+/* Make post-init configuration */
+static gboolean
+config_smtp_proxy_worker (struct rspamd_worker *worker)
+{
+       struct smtp_proxy_ctx         *ctx = worker->ctx;
+       gchar                         *value;
+
+       /* Init timeval */
+       msec_to_tv (ctx->smtp_timeout_raw, &ctx->smtp_timeout);
+
+       /* Init upstreams */
+       if ((value = ctx->upstreams_str) != NULL) {
+               if (!parse_upstreams_line (ctx->pool, ctx->upstreams, value, &ctx->upstream_num)) {
+                       return FALSE;
+               }
+       }
+       else {
+               msg_err ("no upstreams defined, don't know what to do");
+               return FALSE;
+       }
+
+       return TRUE;
+}
+
+/*
+ * Start worker process
+ */
+void
+start_smtp_proxy (struct rspamd_worker *worker)
+{
+       struct sigaction                signals;
+       struct smtp_worker_ctx         *ctx = worker->ctx;
+
+       gperf_profiler_init (worker->srv->cfg, "worker");
+
+       worker->srv->pid = getpid ();
+       ctx->ev_base = event_init ();
+
+       /* Set smtp options */
+       if ( !config_smtp_proxy_worker (worker)) {
+               msg_err ("cannot configure smtp worker, exiting");
+               exit (EXIT_SUCCESS);
+       }
+
+       init_signals (&signals, sig_handler);
+       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
+
+       /* SIGUSR2 handler */
+       signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
+       event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
+       signal_add (&worker->sig_ev_usr2, NULL);
+
+       /* SIGUSR1 handler */
+       signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker);
+       event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
+       signal_add (&worker->sig_ev_usr1, NULL);
+
+       /* Accept event */
+       event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
+       event_base_set (ctx->ev_base, &worker->bind_ev);
+       event_add (&worker->bind_ev, NULL);
+
+       /* DNS resolver */
+       ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
+
+       /* Set umask */
+       umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
+
+       event_base_loop (ctx->ev_base, 0);
+
+       close_log (rspamd_main->logger);
+       exit (EXIT_SUCCESS);
+}
+
index 8ceaef5a673d3214fa2bc6ee99ba683b2ba08020..170efdad561f1acf59fff04d87732905a4a33494 100644 (file)
@@ -28,7 +28,7 @@
 #include "smtp.h"
 #include "smtp_proto.h"
 
-gboolean
+void
 free_smtp_session (gpointer arg)
 {
        struct smtp_session            *session = arg;
@@ -56,8 +56,6 @@ free_smtp_session (gpointer arg)
                memory_pool_delete (session->pool);
                g_free (session);
        }
-
-       return TRUE;
 }
 
 gboolean
@@ -313,3 +311,59 @@ err:
        destroy_session (session->s);
        return FALSE;
 }
+
+gboolean
+parse_upstreams_line (memory_pool_t *pool, struct smtp_upstream *upstreams, const gchar *line, gsize *count)
+{
+       gchar                           **strv, *p, *t, *tt, *err_str;
+       guint32                         num, i;
+       struct smtp_upstream           *cur;
+       gchar                           resolved_path[PATH_MAX];
+
+       strv = g_strsplit_set (line, ",; ", -1);
+       num = g_strv_length (strv);
+
+       if (num >= MAX_SMTP_UPSTREAMS) {
+               msg_err ("cannot define %d upstreams %d is max", num, MAX_SMTP_UPSTREAMS);
+               return FALSE;
+       }
+       *count = 0;
+
+       for (i = 0; i < num; i ++) {
+               p = strv[i];
+               cur = &upstreams[*count];
+               if ((t = strrchr (p, ':')) != NULL && (tt = strchr (p, ':')) != t) {
+                       /* Assume that after last `:' we have weigth */
+                       *t = '\0';
+                       t ++;
+                       errno = 0;
+                       cur->up.priority = strtoul (t, &err_str, 10);
+                       if (errno != 0 || (err_str && *err_str != '\0')) {
+                               msg_err ("cannot convert weight: %s, %s", t, strerror (errno));
+                               g_strfreev (strv);
+                               return FALSE;
+                       }
+               }
+               if (*p == '/') {
+                       cur->is_unix = TRUE;
+                       if (realpath (p, resolved_path) == NULL) {
+                               msg_err ("cannot resolve path: %s", resolved_path);
+                               g_strfreev (strv);
+                               return FALSE;
+                       }
+                       cur->name = memory_pool_strdup (pool, resolved_path);
+                       (*count) ++;
+               }
+               else {
+                       if (! parse_host_port (p, &cur->addr, &cur->port)) {
+                               g_strfreev (strv);
+                               return FALSE;
+                       }
+                       cur->name = memory_pool_strdup (pool, p);
+                       (*count) ++;
+               }
+       }
+
+       g_strfreev (strv);
+       return TRUE;
+}
index e61a85ae21875083b0491a18f2ec578b4bd25597..b32b8235e08c8ea9ab0c8b63f6a69bd7ff88690e 100644 (file)
@@ -4,13 +4,25 @@
 #include "config.h"
 #include "main.h"
 #include "smtp.h"
-#include "smtp_proto.h"
 
 /**
  * @file smtp_utils.h
  * Contains utilities for smtp protocol handling
  */
 
+struct smtp_upstream {
+       struct upstream up;
+
+       const gchar *name;
+       struct in_addr addr;
+       guint16 port;
+       gboolean is_unix;
+};
+
+#define MAX_SMTP_UPSTREAMS 128
+
+struct smtp_session;
+
 /**
  * Send message to upstream
  * @param session session object
@@ -39,4 +51,13 @@ gboolean write_smtp_reply (struct smtp_session *session);
  */
 void free_smtp_session (gpointer arg);
 
+/**
+ * Parse upstreams line
+ * @param upstreams pointer to the array of upstreams (must be at least MAX_SMTP_UPSTREAMS size)
+ * @param line description line
+ * @param count targeted count
+ * @return
+ */
+gboolean parse_upstreams_line (memory_pool_t *pool, struct smtp_upstream *upstreams, const gchar *line, gsize *count);
+
 #endif /* SMTP_UTILS_H_ */