]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Rename http proxy to rspamd proxy
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 6 May 2016 12:28:56 +0000 (13:28 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 6 May 2016 12:28:56 +0000 (13:28 +0100)
src/CMakeLists.txt
src/http_proxy.c [deleted file]
src/rspamadm/CMakeLists.txt
src/rspamd_proxy.c [new file with mode: 0644]

index 349924214b1b769e87d553eaff52541b7809c0e6..a7b03f55ce997b699c60114ab641db05f73589c6 100644 (file)
@@ -80,7 +80,7 @@ SET(RSPAMDSRC controller.c
                                rspamd.c
                                smtp_proxy.c
                                worker.c
-                               http_proxy.c
+                               rspamd_proxy.c
                                log_helper.c)
 
 SET(PLUGINSSRC plugins/surbl.c
@@ -89,10 +89,11 @@ SET(PLUGINSSRC      plugins/surbl.c
                                plugins/fuzzy_check.c
                                plugins/spf.c
                                plugins/dkim_check.c
-                               libserver/rspamd_control.c lua/lua_fann.c)
+                               libserver/rspamd_control.c
+                               lua/lua_fann.c)
 
 SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim)
-SET(WORKERS_LIST normal controller smtp_proxy fuzzy lua http_proxy log_helper)
+SET(WORKERS_LIST normal controller smtp_proxy fuzzy lua rspamd_proxy log_helper)
 IF (ENABLE_HYPERSCAN MATCHES "ON")
        LIST(APPEND WORKERS_LIST "hs_helper")
        LIST(APPEND RSPAMDSRC "hs_helper.c")
diff --git a/src/http_proxy.c b/src/http_proxy.c
deleted file mode 100644 (file)
index b61511d..0000000
+++ /dev/null
@@ -1,506 +0,0 @@
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "config.h"
-#include "libutil/util.h"
-#include "libutil/map.h"
-#include "libutil/upstream.h"
-#include "libserver/protocol.h"
-#include "libserver/cfg_file.h"
-#include "libserver/url.h"
-#include "libserver/dns.h"
-#include "libmime/message.h"
-#include "rspamd.h"
-#include "libserver/worker_util.h"
-#include "keypairs_cache.h"
-#include "ottery.h"
-#include "unix-std.h"
-
-/* Rotate keys each minute by default */
-#define DEFAULT_ROTATION_TIME 60.0
-
-gpointer init_http_proxy (struct rspamd_config *cfg);
-void start_http_proxy (struct rspamd_worker *worker);
-
-worker_t http_proxy_worker = {
-       "http_proxy",               /* Name */
-       init_http_proxy,            /* Init function */
-       start_http_proxy,           /* Start function */
-       RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE,
-       SOCK_STREAM,                /* TCP socket */
-       RSPAMD_WORKER_VER
-};
-
-struct rspamd_http_upstream {
-       gchar *name;
-       struct upstream_list *u;
-       struct rspamd_cryptobox_pubkey *key;
-};
-
-static const guint64 rspamd_http_proxy_magic = 0xcdeb4fd1fc351980ULL;
-
-struct http_proxy_ctx {
-       guint64 magic;
-       gdouble timeout;
-       struct timeval io_tv;
-       struct rspamd_config *cfg;
-       /* DNS resolver */
-       struct rspamd_dns_resolver *resolver;
-       /* Events base */
-       struct event_base *ev_base;
-       /* Encryption key for clients */
-       struct rspamd_cryptobox_keypair *key;
-       /* Keys cache */
-       struct rspamd_keypair_cache *keys_cache;
-       /* Upstreams to use */
-       GHashTable *upstreams;
-       /* Default upstream */
-       struct rspamd_http_upstream *default_upstream;
-       /* Local rotating keypair for upstreams */
-       struct rspamd_cryptobox_keypair *local_key;
-       struct event rotate_ev;
-       gdouble rotate_tm;
-};
-
-struct http_proxy_session {
-       struct http_proxy_ctx *ctx;
-       struct event_base *ev_base;
-       struct rspamd_cryptobox_keypair *local_key;
-       struct rspamd_cryptobox_pubkey *remote_key;
-       struct upstream *up;
-       gint client_sock;
-       gint backend_sock;
-       rspamd_inet_addr_t *client_addr;
-       struct rspamd_http_connection *client_conn;
-       struct rspamd_http_connection *backend_conn;
-       struct rspamd_dns_resolver *resolver;
-       gboolean replied;
-};
-
-static GQuark
-http_proxy_quark (void)
-{
-       return g_quark_from_static_string ("http-proxy");
-}
-
-static gboolean
-http_proxy_parse_upstream (rspamd_mempool_t *pool,
-       const ucl_object_t *obj,
-       gpointer ud,
-       struct rspamd_rcl_section *section,
-       GError **err)
-{
-       const ucl_object_t *elt;
-       struct rspamd_http_upstream *up = NULL;
-       struct http_proxy_ctx *ctx;
-       struct rspamd_rcl_struct_parser *pd = ud;
-
-       ctx = pd->user_struct;
-
-       if (ucl_object_type (obj) != UCL_OBJECT) {
-               g_set_error (err, http_proxy_quark (), 100,
-                               "upstream option must be an object");
-
-               return FALSE;
-       }
-
-       elt = ucl_object_lookup (obj, "name");
-       if (elt == NULL) {
-               g_set_error (err, http_proxy_quark (), 100,
-                               "upstream option must have some name definition");
-
-               return FALSE;
-       }
-
-       up = g_slice_alloc0 (sizeof (*up));
-       up->name = g_strdup (ucl_object_tostring (elt));
-
-       elt = ucl_object_lookup (obj, "key");
-       if (elt != NULL) {
-               up->key = rspamd_pubkey_from_base32 (ucl_object_tostring (elt), 0,
-                               RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519);
-
-               if (up->key == NULL) {
-                       g_set_error (err, http_proxy_quark (), 100,
-                                       "cannot read upstream key");
-
-                       goto err;
-               }
-       }
-
-       elt = ucl_object_lookup (obj, "hosts");
-
-       if (elt == NULL) {
-               g_set_error (err, http_proxy_quark (), 100,
-                               "upstream option must have some hosts definition");
-
-               goto err;
-       }
-
-       up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
-       if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
-               g_set_error (err, http_proxy_quark (), 100,
-                               "upstream has bad hosts definition");
-
-               goto err;
-       }
-
-       elt = ucl_object_lookup (obj, "default");
-       if (elt && ucl_object_toboolean (elt)) {
-               ctx->default_upstream = up;
-       }
-
-       g_hash_table_insert (ctx->upstreams, up->name, up);
-
-       return TRUE;
-
-err:
-
-       if (up) {
-               g_free (up->name);
-               rspamd_upstreams_destroy (up->u);
-
-               if (up->key) {
-                       rspamd_pubkey_unref (up->key);
-               }
-
-               g_slice_free1 (sizeof (*up), up);
-       }
-
-       return FALSE;
-}
-
-gpointer
-init_http_proxy (struct rspamd_config *cfg)
-{
-       struct http_proxy_ctx *ctx;
-       GQuark type;
-
-       type = g_quark_try_string ("http_proxy");
-
-       ctx = g_malloc0 (sizeof (struct http_proxy_ctx));
-       ctx->magic = rspamd_http_proxy_magic;
-       ctx->timeout = 5.0;
-       ctx->upstreams = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
-       ctx->rotate_tm = DEFAULT_ROTATION_TIME;
-       ctx->cfg = cfg;
-
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "timeout",
-                       rspamd_rcl_parse_struct_time,
-                       ctx,
-                       G_STRUCT_OFFSET (struct http_proxy_ctx,
-                                       timeout),
-                       RSPAMD_CL_FLAG_TIME_FLOAT,
-                       "IO timeout");
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "rotate",
-                       rspamd_rcl_parse_struct_time,
-                       ctx,
-                       G_STRUCT_OFFSET (struct http_proxy_ctx,
-                                       rotate_tm),
-                       RSPAMD_CL_FLAG_TIME_FLOAT,
-                       "Rotation keys time, default: "
-                       G_STRINGIFY (DEFAULT_ROTATION_TIME) " seconds");
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "keypair",
-                       rspamd_rcl_parse_struct_keypair,
-                       ctx,
-                       G_STRUCT_OFFSET (struct http_proxy_ctx,
-                                       key),
-                       0,
-                       "Server's keypair");
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "upstream",
-                       http_proxy_parse_upstream,
-                       ctx,
-                       0,
-                       0,
-                       "List of upstreams");
-
-       return ctx;
-}
-
-static void
-proxy_session_cleanup (struct http_proxy_session *session)
-{
-       rspamd_inet_address_destroy (session->client_addr);
-
-       if (session->backend_conn) {
-               rspamd_http_connection_unref (session->backend_conn);
-       }
-       if (session->client_conn) {
-               rspamd_http_connection_unref (session->client_conn);
-       }
-
-       close (session->backend_sock);
-       close (session->client_sock);
-
-       g_slice_free1 (sizeof (*session), session);
-}
-
-static void
-proxy_client_write_error (struct http_proxy_session *session, gint code)
-{
-       struct rspamd_http_message *reply;
-
-       reply = rspamd_http_new_message (HTTP_RESPONSE);
-       reply->code = code;
-       rspamd_http_connection_write_message (session->client_conn,
-                       reply, NULL, NULL, session, session->client_sock,
-                       &session->ctx->io_tv, session->ev_base);
-}
-
-static void
-proxy_backend_error_handler (struct rspamd_http_connection *conn, GError *err)
-{
-       struct http_proxy_session *session = conn->ud;
-
-       msg_info ("abnormally closing connection from backend: %s, error: %s",
-               rspamd_inet_address_to_string (rspamd_upstream_addr (session->up)),
-               err->message);
-       rspamd_http_connection_reset (session->backend_conn);
-       /* Terminate session immediately */
-       proxy_client_write_error (session, err->code);
-}
-
-static gint
-proxy_backend_finish_handler (struct rspamd_http_connection *conn,
-       struct rspamd_http_message *msg)
-{
-       struct http_proxy_session *session = conn->ud;
-
-       rspamd_http_connection_steal_msg (session->backend_conn);
-       rspamd_http_message_remove_header (msg, "Content-Length");
-       rspamd_http_message_remove_header (msg, "Key");
-       rspamd_http_connection_reset (session->backend_conn);
-       rspamd_http_connection_write_message (session->client_conn,
-               msg, NULL, NULL, session, session->client_sock,
-               &session->ctx->io_tv, session->ev_base);
-
-       return 0;
-}
-
-static void
-proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err)
-{
-       struct http_proxy_session *session = conn->ud;
-
-       msg_info ("abnormally closing connection from: %s, error: %s",
-               rspamd_inet_address_to_string (session->client_addr), err->message);
-       /* Terminate session immediately */
-       proxy_session_cleanup (session);
-}
-
-static gint
-proxy_client_finish_handler (struct rspamd_http_connection *conn,
-       struct rspamd_http_message *msg)
-{
-       struct http_proxy_session *session = conn->ud;
-       struct rspamd_http_upstream *backend = NULL;
-       const rspamd_ftok_t *host;
-       gchar hostbuf[512];
-
-       if (!session->replied) {
-               host = rspamd_http_message_find_header (msg, "Host");
-
-               if (host == NULL) {
-                       backend = session->ctx->default_upstream;
-               }
-               else {
-                       rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf)));
-                       backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf);
-
-                       if (backend == NULL) {
-                               backend = session->ctx->default_upstream;
-                       }
-               }
-
-               if (backend == NULL) {
-                       /* No backend */
-                       msg_err ("cannot find upstream for %s", host ? hostbuf : "default");
-                       goto err;
-               }
-               else {
-                       session->up = rspamd_upstream_get (backend->u,
-                                       RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
-
-                       if (session->up == NULL) {
-                               msg_err ("cannot select upstream for %s", host ? hostbuf : "default");
-                               goto err;
-                       }
-
-                       session->backend_sock = rspamd_inet_address_connect (
-                                       rspamd_upstream_addr (session->up), SOCK_STREAM, TRUE);
-
-                       if (session->backend_sock == -1) {
-                               msg_err ("cannot connect upstream for %s", host ? hostbuf : "default");
-                               rspamd_upstream_fail (session->up);
-                               goto err;
-                       }
-
-                       rspamd_http_connection_steal_msg (session->client_conn);
-                       rspamd_http_message_remove_header (msg, "Content-Length");
-                       rspamd_http_message_remove_header (msg, "Key");
-                       rspamd_http_connection_reset (session->client_conn);
-                       session->backend_conn = rspamd_http_connection_new (
-                                       NULL,
-                                       proxy_backend_error_handler,
-                                       proxy_backend_finish_handler,
-                                       RSPAMD_HTTP_CLIENT_SIMPLE,
-                                       RSPAMD_HTTP_CLIENT,
-                                       session->ctx->keys_cache);
-
-                       rspamd_http_connection_set_key (session->backend_conn,
-                                       session->ctx->local_key);
-                       msg->peer_key = rspamd_pubkey_ref (backend->key);
-                       session->replied = TRUE;
-
-                       rspamd_http_connection_write_message (session->backend_conn,
-                               msg, NULL, NULL, session, session->backend_sock,
-                               &session->ctx->io_tv, session->ev_base);
-               }
-       }
-       else {
-               proxy_session_cleanup (session);
-       }
-
-       return 0;
-
-err:
-       session->replied = TRUE;
-       proxy_client_write_error (session, 404);
-
-       return 0;
-}
-
-static void
-proxy_accept_socket (gint fd, short what, void *arg)
-{
-       struct rspamd_worker *worker = (struct rspamd_worker *) arg;
-       struct http_proxy_ctx *ctx;
-       rspamd_inet_addr_t *addr;
-       struct http_proxy_session *session;
-       gint nfd;
-
-       ctx = worker->ctx;
-
-       if ((nfd =
-               rspamd_accept_from_socket (fd, &addr)) == -1) {
-               msg_warn ("accept failed: %s", strerror (errno));
-               return;
-       }
-       /* Check for EAGAIN */
-       if (nfd == 0) {
-               return;
-       }
-
-       msg_info ("accepted connection from %s port %d",
-               rspamd_inet_address_to_string (addr),
-               rspamd_inet_address_get_port (addr));
-
-       session = g_slice_alloc0 (sizeof (*session));
-       session->client_sock = nfd;
-       session->client_addr = addr;
-
-       session->resolver = ctx->resolver;
-
-       session->client_conn = rspamd_http_connection_new (
-               NULL,
-               proxy_client_error_handler,
-               proxy_client_finish_handler,
-               0,
-               RSPAMD_HTTP_SERVER,
-               ctx->keys_cache);
-       session->ev_base = ctx->ev_base;
-       session->ctx = ctx;
-
-       if (ctx->key) {
-               rspamd_http_connection_set_key (session->client_conn, ctx->key);
-       }
-
-       rspamd_http_connection_read_message (session->client_conn,
-               session,
-               nfd,
-               &ctx->io_tv,
-               ctx->ev_base);
-}
-
-static void
-proxy_rotate_key (gint fd, short what, void *arg)
-{
-       struct timeval rot_tv;
-       struct http_proxy_ctx *ctx = arg;
-       gpointer kp;
-
-       double_to_tv (ctx->rotate_tm, &rot_tv);
-       rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec);
-       event_del (&ctx->rotate_ev);
-       event_add (&ctx->rotate_ev, &rot_tv);
-
-       kp = ctx->local_key;
-       ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
-                       RSPAMD_CRYPTOBOX_MODE_25519);
-       rspamd_keypair_unref (kp);
-}
-
-void
-start_http_proxy (struct rspamd_worker *worker)
-{
-       struct http_proxy_ctx *ctx = worker->ctx;
-       struct timeval rot_tv;
-
-       ctx->ev_base = rspamd_prepare_worker (worker, "http_proxy",
-                       proxy_accept_socket);
-
-       ctx->resolver = dns_resolver_init (worker->srv->logger,
-                       ctx->ev_base,
-                       worker->srv->cfg);
-       double_to_tv (ctx->timeout, &ctx->io_tv);
-       rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver);
-
-       rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
-                       ctx->ev_base, ctx->resolver->r);
-
-       /* XXX: stupid default */
-       ctx->keys_cache = rspamd_keypair_cache_new (256);
-       ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
-                       RSPAMD_CRYPTOBOX_MODE_25519);
-
-       double_to_tv (ctx->rotate_tm, &rot_tv);
-       rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec);
-       event_set (&ctx->rotate_ev, -1, EV_TIMEOUT, proxy_rotate_key, ctx);
-       event_base_set (ctx->ev_base, &ctx->rotate_ev);
-       event_add (&ctx->rotate_ev, &rot_tv);
-
-       event_base_loop (ctx->ev_base, 0);
-       rspamd_worker_block_signals ();
-
-       g_mime_shutdown ();
-       rspamd_log_close (worker->srv->logger);
-
-       if (ctx->key) {
-               rspamd_keypair_unref (ctx->key);
-       }
-
-       rspamd_keypair_cache_destroy (ctx->keys_cache);
-
-       exit (EXIT_SUCCESS);
-}
-
index a739fa5be3c328c4feeca7fefb001e2de231c684..19d78c7bbc2ad5bd956f5830bcf415cb4a3e1877 100644 (file)
@@ -16,7 +16,7 @@ SET(RSPAMADMSRC rspamadm.c
         ${CMAKE_SOURCE_DIR}/src/lua_worker.c
         ${CMAKE_SOURCE_DIR}/src/smtp_proxy.c
         ${CMAKE_SOURCE_DIR}/src/worker.c
-        ${CMAKE_SOURCE_DIR}/src/http_proxy.c
+        ${CMAKE_SOURCE_DIR}/src/rspamd_proxy.c
         ${CMAKE_SOURCE_DIR}/src/log_helper.c)
 SET(RSPAMADMLUASRC
         ${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_stat.lua
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
new file mode 100644 (file)
index 0000000..f7b4190
--- /dev/null
@@ -0,0 +1,505 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "libutil/util.h"
+#include "libutil/map.h"
+#include "libutil/upstream.h"
+#include "libserver/protocol.h"
+#include "libserver/cfg_file.h"
+#include "libserver/url.h"
+#include "libserver/dns.h"
+#include "libmime/message.h"
+#include "rspamd.h"
+#include "libserver/worker_util.h"
+#include "keypairs_cache.h"
+#include "ottery.h"
+#include "unix-std.h"
+
+/* Rotate keys each minute by default */
+#define DEFAULT_ROTATION_TIME 60.0
+
+gpointer init_rspamd_proxy (struct rspamd_config *cfg);
+void start_rspamd_proxy (struct rspamd_worker *worker);
+
+worker_t rspamd_proxy_worker = {
+       "rspamd_proxy",               /* Name */
+       init_rspamd_proxy,            /* Init function */
+       start_rspamd_proxy,           /* Start function */
+       RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE,
+       SOCK_STREAM,                /* TCP socket */
+       RSPAMD_WORKER_VER
+};
+
+struct rspamd_http_upstream {
+       gchar *name;
+       struct upstream_list *u;
+       struct rspamd_cryptobox_pubkey *key;
+};
+
+static const guint64 rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL;
+
+struct rspamd_proxy_ctx {
+       guint64 magic;
+       gdouble timeout;
+       struct timeval io_tv;
+       struct rspamd_config *cfg;
+       /* DNS resolver */
+       struct rspamd_dns_resolver *resolver;
+       /* Events base */
+       struct event_base *ev_base;
+       /* Encryption key for clients */
+       struct rspamd_cryptobox_keypair *key;
+       /* Keys cache */
+       struct rspamd_keypair_cache *keys_cache;
+       /* Upstreams to use */
+       GHashTable *upstreams;
+       /* Default upstream */
+       struct rspamd_http_upstream *default_upstream;
+       /* Local rotating keypair for upstreams */
+       struct rspamd_cryptobox_keypair *local_key;
+       struct event rotate_ev;
+       gdouble rotate_tm;
+};
+
+struct rspamd_proxy_session {
+       struct rspamd_proxy_ctx *ctx;
+       struct event_base *ev_base;
+       struct rspamd_cryptobox_keypair *local_key;
+       struct rspamd_cryptobox_pubkey *remote_key;
+       struct upstream *up;
+       gint client_sock;
+       gint backend_sock;
+       rspamd_inet_addr_t *client_addr;
+       struct rspamd_http_connection *client_conn;
+       struct rspamd_http_connection *backend_conn;
+       struct rspamd_dns_resolver *resolver;
+       gboolean replied;
+};
+
+static GQuark
+rspamd_proxy_quark (void)
+{
+       return g_quark_from_static_string ("http-proxy");
+}
+
+static gboolean
+rspamd_proxy_parse_upstream (rspamd_mempool_t *pool,
+       const ucl_object_t *obj,
+       gpointer ud,
+       struct rspamd_rcl_section *section,
+       GError **err)
+{
+       const ucl_object_t *elt;
+       struct rspamd_http_upstream *up = NULL;
+       struct rspamd_proxy_ctx *ctx;
+       struct rspamd_rcl_struct_parser *pd = ud;
+
+       ctx = pd->user_struct;
+
+       if (ucl_object_type (obj) != UCL_OBJECT) {
+               g_set_error (err, rspamd_proxy_quark (), 100,
+                               "upstream option must be an object");
+
+               return FALSE;
+       }
+
+       elt = ucl_object_lookup (obj, "name");
+       if (elt == NULL) {
+               g_set_error (err, rspamd_proxy_quark (), 100,
+                               "upstream option must have some name definition");
+
+               return FALSE;
+       }
+
+       up = g_slice_alloc0 (sizeof (*up));
+       up->name = g_strdup (ucl_object_tostring (elt));
+
+       elt = ucl_object_lookup (obj, "key");
+       if (elt != NULL) {
+               up->key = rspamd_pubkey_from_base32 (ucl_object_tostring (elt), 0,
+                               RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519);
+
+               if (up->key == NULL) {
+                       g_set_error (err, rspamd_proxy_quark (), 100,
+                                       "cannot read upstream key");
+
+                       goto err;
+               }
+       }
+
+       elt = ucl_object_lookup (obj, "hosts");
+
+       if (elt == NULL) {
+               g_set_error (err, rspamd_proxy_quark (), 100,
+                               "upstream option must have some hosts definition");
+
+               goto err;
+       }
+
+       up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
+       if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
+               g_set_error (err, rspamd_proxy_quark (), 100,
+                               "upstream has bad hosts definition");
+
+               goto err;
+       }
+
+       elt = ucl_object_lookup (obj, "default");
+       if (elt && ucl_object_toboolean (elt)) {
+               ctx->default_upstream = up;
+       }
+
+       g_hash_table_insert (ctx->upstreams, up->name, up);
+
+       return TRUE;
+
+err:
+
+       if (up) {
+               g_free (up->name);
+               rspamd_upstreams_destroy (up->u);
+
+               if (up->key) {
+                       rspamd_pubkey_unref (up->key);
+               }
+
+               g_slice_free1 (sizeof (*up), up);
+       }
+
+       return FALSE;
+}
+
+gpointer
+init_rspamd_proxy (struct rspamd_config *cfg)
+{
+       struct rspamd_proxy_ctx *ctx;
+       GQuark type;
+
+       type = g_quark_try_string ("rspamd_proxy");
+
+       ctx = g_malloc0 (sizeof (struct rspamd_proxy_ctx));
+       ctx->magic = rspamd_rspamd_proxy_magic;
+       ctx->timeout = 5.0;
+       ctx->upstreams = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
+       ctx->rotate_tm = DEFAULT_ROTATION_TIME;
+       ctx->cfg = cfg;
+
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "timeout",
+                       rspamd_rcl_parse_struct_time,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_proxy_ctx,
+                                       timeout),
+                       RSPAMD_CL_FLAG_TIME_FLOAT,
+                       "IO timeout");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "rotate",
+                       rspamd_rcl_parse_struct_time,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_proxy_ctx,
+                                       rotate_tm),
+                       RSPAMD_CL_FLAG_TIME_FLOAT,
+                       "Rotation keys time, default: "
+                       G_STRINGIFY (DEFAULT_ROTATION_TIME) " seconds");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "keypair",
+                       rspamd_rcl_parse_struct_keypair,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_proxy_ctx,
+                                       key),
+                       0,
+                       "Server's keypair");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "upstream",
+                       rspamd_proxy_parse_upstream,
+                       ctx,
+                       0,
+                       0,
+                       "List of upstreams");
+
+       return ctx;
+}
+
+static void
+proxy_session_cleanup (struct rspamd_proxy_session *session)
+{
+       rspamd_inet_address_destroy (session->client_addr);
+
+       if (session->backend_conn) {
+               rspamd_http_connection_unref (session->backend_conn);
+       }
+       if (session->client_conn) {
+               rspamd_http_connection_unref (session->client_conn);
+       }
+
+       close (session->backend_sock);
+       close (session->client_sock);
+
+       g_slice_free1 (sizeof (*session), session);
+}
+
+static void
+proxy_client_write_error (struct rspamd_proxy_session *session, gint code)
+{
+       struct rspamd_http_message *reply;
+
+       reply = rspamd_http_new_message (HTTP_RESPONSE);
+       reply->code = code;
+       rspamd_http_connection_write_message (session->client_conn,
+                       reply, NULL, NULL, session, session->client_sock,
+                       &session->ctx->io_tv, session->ev_base);
+}
+
+static void
+proxy_backend_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+       struct rspamd_proxy_session *session = conn->ud;
+
+       msg_info ("abnormally closing connection from backend: %s, error: %s",
+               rspamd_inet_address_to_string (rspamd_upstream_addr (session->up)),
+               err->message);
+       rspamd_http_connection_reset (session->backend_conn);
+       /* Terminate session immediately */
+       proxy_client_write_error (session, err->code);
+}
+
+static gint
+proxy_backend_finish_handler (struct rspamd_http_connection *conn,
+       struct rspamd_http_message *msg)
+{
+       struct rspamd_proxy_session *session = conn->ud;
+
+       rspamd_http_connection_steal_msg (session->backend_conn);
+       rspamd_http_message_remove_header (msg, "Content-Length");
+       rspamd_http_message_remove_header (msg, "Key");
+       rspamd_http_connection_reset (session->backend_conn);
+       rspamd_http_connection_write_message (session->client_conn,
+               msg, NULL, NULL, session, session->client_sock,
+               &session->ctx->io_tv, session->ev_base);
+
+       return 0;
+}
+
+static void
+proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+       struct rspamd_proxy_session *session = conn->ud;
+
+       msg_info ("abnormally closing connection from: %s, error: %s",
+               rspamd_inet_address_to_string (session->client_addr), err->message);
+       /* Terminate session immediately */
+       proxy_session_cleanup (session);
+}
+
+static gint
+proxy_client_finish_handler (struct rspamd_http_connection *conn,
+       struct rspamd_http_message *msg)
+{
+       struct rspamd_proxy_session *session = conn->ud;
+       struct rspamd_http_upstream *backend = NULL;
+       const rspamd_ftok_t *host;
+       gchar hostbuf[512];
+
+       if (!session->replied) {
+               host = rspamd_http_message_find_header (msg, "Host");
+
+               if (host == NULL) {
+                       backend = session->ctx->default_upstream;
+               }
+               else {
+                       rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf)));
+                       backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf);
+
+                       if (backend == NULL) {
+                               backend = session->ctx->default_upstream;
+                       }
+               }
+
+               if (backend == NULL) {
+                       /* No backend */
+                       msg_err ("cannot find upstream for %s", host ? hostbuf : "default");
+                       goto err;
+               }
+               else {
+                       session->up = rspamd_upstream_get (backend->u,
+                                       RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
+
+                       if (session->up == NULL) {
+                               msg_err ("cannot select upstream for %s", host ? hostbuf : "default");
+                               goto err;
+                       }
+
+                       session->backend_sock = rspamd_inet_address_connect (
+                                       rspamd_upstream_addr (session->up), SOCK_STREAM, TRUE);
+
+                       if (session->backend_sock == -1) {
+                               msg_err ("cannot connect upstream for %s", host ? hostbuf : "default");
+                               rspamd_upstream_fail (session->up);
+                               goto err;
+                       }
+
+                       rspamd_http_connection_steal_msg (session->client_conn);
+                       rspamd_http_message_remove_header (msg, "Content-Length");
+                       rspamd_http_message_remove_header (msg, "Key");
+                       rspamd_http_connection_reset (session->client_conn);
+                       session->backend_conn = rspamd_http_connection_new (
+                                       NULL,
+                                       proxy_backend_error_handler,
+                                       proxy_backend_finish_handler,
+                                       RSPAMD_HTTP_CLIENT_SIMPLE,
+                                       RSPAMD_HTTP_CLIENT,
+                                       session->ctx->keys_cache);
+
+                       rspamd_http_connection_set_key (session->backend_conn,
+                                       session->ctx->local_key);
+                       msg->peer_key = rspamd_pubkey_ref (backend->key);
+                       session->replied = TRUE;
+
+                       rspamd_http_connection_write_message (session->backend_conn,
+                               msg, NULL, NULL, session, session->backend_sock,
+                               &session->ctx->io_tv, session->ev_base);
+               }
+       }
+       else {
+               proxy_session_cleanup (session);
+       }
+
+       return 0;
+
+err:
+       session->replied = TRUE;
+       proxy_client_write_error (session, 404);
+
+       return 0;
+}
+
+static void
+proxy_accept_socket (gint fd, short what, void *arg)
+{
+       struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+       struct rspamd_proxy_ctx *ctx;
+       rspamd_inet_addr_t *addr;
+       struct rspamd_proxy_session *session;
+       gint nfd;
+
+       ctx = worker->ctx;
+
+       if ((nfd =
+               rspamd_accept_from_socket (fd, &addr)) == -1) {
+               msg_warn ("accept failed: %s", strerror (errno));
+               return;
+       }
+       /* Check for EAGAIN */
+       if (nfd == 0) {
+               return;
+       }
+
+       msg_info ("accepted connection from %s port %d",
+               rspamd_inet_address_to_string (addr),
+               rspamd_inet_address_get_port (addr));
+
+       session = g_slice_alloc0 (sizeof (*session));
+       session->client_sock = nfd;
+       session->client_addr = addr;
+
+       session->resolver = ctx->resolver;
+
+       session->client_conn = rspamd_http_connection_new (
+               NULL,
+               proxy_client_error_handler,
+               proxy_client_finish_handler,
+               0,
+               RSPAMD_HTTP_SERVER,
+               ctx->keys_cache);
+       session->ev_base = ctx->ev_base;
+       session->ctx = ctx;
+
+       if (ctx->key) {
+               rspamd_http_connection_set_key (session->client_conn, ctx->key);
+       }
+
+       rspamd_http_connection_read_message (session->client_conn,
+               session,
+               nfd,
+               &ctx->io_tv,
+               ctx->ev_base);
+}
+
+static void
+proxy_rotate_key (gint fd, short what, void *arg)
+{
+       struct timeval rot_tv;
+       struct rspamd_proxy_ctx *ctx = arg;
+       gpointer kp;
+
+       double_to_tv (ctx->rotate_tm, &rot_tv);
+       rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec);
+       event_del (&ctx->rotate_ev);
+       event_add (&ctx->rotate_ev, &rot_tv);
+
+       kp = ctx->local_key;
+       ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
+                       RSPAMD_CRYPTOBOX_MODE_25519);
+       rspamd_keypair_unref (kp);
+}
+
+void
+start_rspamd_proxy (struct rspamd_worker *worker)
+{
+       struct rspamd_proxy_ctx *ctx = worker->ctx;
+       struct timeval rot_tv;
+
+       ctx->ev_base = rspamd_prepare_worker (worker, "rspamd_proxy",
+                       proxy_accept_socket);
+
+       ctx->resolver = dns_resolver_init (worker->srv->logger,
+                       ctx->ev_base,
+                       worker->srv->cfg);
+       double_to_tv (ctx->timeout, &ctx->io_tv);
+       rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver);
+
+       rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
+                       ctx->ev_base, ctx->resolver->r);
+
+       /* XXX: stupid default */
+       ctx->keys_cache = rspamd_keypair_cache_new (256);
+       ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
+                       RSPAMD_CRYPTOBOX_MODE_25519);
+
+       double_to_tv (ctx->rotate_tm, &rot_tv);
+       rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec);
+       event_set (&ctx->rotate_ev, -1, EV_TIMEOUT, proxy_rotate_key, ctx);
+       event_base_set (ctx->ev_base, &ctx->rotate_ev);
+       event_add (&ctx->rotate_ev, &rot_tv);
+
+       event_base_loop (ctx->ev_base, 0);
+       rspamd_worker_block_signals ();
+
+       g_mime_shutdown ();
+       rspamd_log_close (worker->srv->logger);
+
+       if (ctx->key) {
+               rspamd_keypair_unref (ctx->key);
+       }
+
+       rspamd_keypair_cache_destroy (ctx->keys_cache);
+
+       exit (EXIT_SUCCESS);
+}