rspamd.c
smtp_proxy.c
worker.c
- http_proxy.c
+ rspamd_proxy.c
log_helper.c)
SET(PLUGINSSRC plugins/surbl.c
plugins/fuzzy_check.c
plugins/spf.c
plugins/dkim_check.c
- libserver/rspamd_control.c lua/lua_fann.c)
+ libserver/rspamd_control.c
+ lua/lua_fann.c)
SET(MODULES_LIST surbl regexp chartable fuzzy_check spf dkim)
-SET(WORKERS_LIST normal controller smtp_proxy fuzzy lua http_proxy log_helper)
+SET(WORKERS_LIST normal controller smtp_proxy fuzzy lua rspamd_proxy log_helper)
IF (ENABLE_HYPERSCAN MATCHES "ON")
LIST(APPEND WORKERS_LIST "hs_helper")
LIST(APPEND RSPAMDSRC "hs_helper.c")
+++ /dev/null
-/*-
- * Copyright 2016 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "config.h"
-#include "libutil/util.h"
-#include "libutil/map.h"
-#include "libutil/upstream.h"
-#include "libserver/protocol.h"
-#include "libserver/cfg_file.h"
-#include "libserver/url.h"
-#include "libserver/dns.h"
-#include "libmime/message.h"
-#include "rspamd.h"
-#include "libserver/worker_util.h"
-#include "keypairs_cache.h"
-#include "ottery.h"
-#include "unix-std.h"
-
-/* Rotate keys each minute by default */
-#define DEFAULT_ROTATION_TIME 60.0
-
-gpointer init_http_proxy (struct rspamd_config *cfg);
-void start_http_proxy (struct rspamd_worker *worker);
-
-worker_t http_proxy_worker = {
- "http_proxy", /* Name */
- init_http_proxy, /* Init function */
- start_http_proxy, /* Start function */
- RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE,
- SOCK_STREAM, /* TCP socket */
- RSPAMD_WORKER_VER
-};
-
-struct rspamd_http_upstream {
- gchar *name;
- struct upstream_list *u;
- struct rspamd_cryptobox_pubkey *key;
-};
-
-static const guint64 rspamd_http_proxy_magic = 0xcdeb4fd1fc351980ULL;
-
-struct http_proxy_ctx {
- guint64 magic;
- gdouble timeout;
- struct timeval io_tv;
- struct rspamd_config *cfg;
- /* DNS resolver */
- struct rspamd_dns_resolver *resolver;
- /* Events base */
- struct event_base *ev_base;
- /* Encryption key for clients */
- struct rspamd_cryptobox_keypair *key;
- /* Keys cache */
- struct rspamd_keypair_cache *keys_cache;
- /* Upstreams to use */
- GHashTable *upstreams;
- /* Default upstream */
- struct rspamd_http_upstream *default_upstream;
- /* Local rotating keypair for upstreams */
- struct rspamd_cryptobox_keypair *local_key;
- struct event rotate_ev;
- gdouble rotate_tm;
-};
-
-struct http_proxy_session {
- struct http_proxy_ctx *ctx;
- struct event_base *ev_base;
- struct rspamd_cryptobox_keypair *local_key;
- struct rspamd_cryptobox_pubkey *remote_key;
- struct upstream *up;
- gint client_sock;
- gint backend_sock;
- rspamd_inet_addr_t *client_addr;
- struct rspamd_http_connection *client_conn;
- struct rspamd_http_connection *backend_conn;
- struct rspamd_dns_resolver *resolver;
- gboolean replied;
-};
-
-static GQuark
-http_proxy_quark (void)
-{
- return g_quark_from_static_string ("http-proxy");
-}
-
-static gboolean
-http_proxy_parse_upstream (rspamd_mempool_t *pool,
- const ucl_object_t *obj,
- gpointer ud,
- struct rspamd_rcl_section *section,
- GError **err)
-{
- const ucl_object_t *elt;
- struct rspamd_http_upstream *up = NULL;
- struct http_proxy_ctx *ctx;
- struct rspamd_rcl_struct_parser *pd = ud;
-
- ctx = pd->user_struct;
-
- if (ucl_object_type (obj) != UCL_OBJECT) {
- g_set_error (err, http_proxy_quark (), 100,
- "upstream option must be an object");
-
- return FALSE;
- }
-
- elt = ucl_object_lookup (obj, "name");
- if (elt == NULL) {
- g_set_error (err, http_proxy_quark (), 100,
- "upstream option must have some name definition");
-
- return FALSE;
- }
-
- up = g_slice_alloc0 (sizeof (*up));
- up->name = g_strdup (ucl_object_tostring (elt));
-
- elt = ucl_object_lookup (obj, "key");
- if (elt != NULL) {
- up->key = rspamd_pubkey_from_base32 (ucl_object_tostring (elt), 0,
- RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519);
-
- if (up->key == NULL) {
- g_set_error (err, http_proxy_quark (), 100,
- "cannot read upstream key");
-
- goto err;
- }
- }
-
- elt = ucl_object_lookup (obj, "hosts");
-
- if (elt == NULL) {
- g_set_error (err, http_proxy_quark (), 100,
- "upstream option must have some hosts definition");
-
- goto err;
- }
-
- up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
- if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
- g_set_error (err, http_proxy_quark (), 100,
- "upstream has bad hosts definition");
-
- goto err;
- }
-
- elt = ucl_object_lookup (obj, "default");
- if (elt && ucl_object_toboolean (elt)) {
- ctx->default_upstream = up;
- }
-
- g_hash_table_insert (ctx->upstreams, up->name, up);
-
- return TRUE;
-
-err:
-
- if (up) {
- g_free (up->name);
- rspamd_upstreams_destroy (up->u);
-
- if (up->key) {
- rspamd_pubkey_unref (up->key);
- }
-
- g_slice_free1 (sizeof (*up), up);
- }
-
- return FALSE;
-}
-
-gpointer
-init_http_proxy (struct rspamd_config *cfg)
-{
- struct http_proxy_ctx *ctx;
- GQuark type;
-
- type = g_quark_try_string ("http_proxy");
-
- ctx = g_malloc0 (sizeof (struct http_proxy_ctx));
- ctx->magic = rspamd_http_proxy_magic;
- ctx->timeout = 5.0;
- ctx->upstreams = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
- ctx->rotate_tm = DEFAULT_ROTATION_TIME;
- ctx->cfg = cfg;
-
- rspamd_rcl_register_worker_option (cfg,
- type,
- "timeout",
- rspamd_rcl_parse_struct_time,
- ctx,
- G_STRUCT_OFFSET (struct http_proxy_ctx,
- timeout),
- RSPAMD_CL_FLAG_TIME_FLOAT,
- "IO timeout");
- rspamd_rcl_register_worker_option (cfg,
- type,
- "rotate",
- rspamd_rcl_parse_struct_time,
- ctx,
- G_STRUCT_OFFSET (struct http_proxy_ctx,
- rotate_tm),
- RSPAMD_CL_FLAG_TIME_FLOAT,
- "Rotation keys time, default: "
- G_STRINGIFY (DEFAULT_ROTATION_TIME) " seconds");
- rspamd_rcl_register_worker_option (cfg,
- type,
- "keypair",
- rspamd_rcl_parse_struct_keypair,
- ctx,
- G_STRUCT_OFFSET (struct http_proxy_ctx,
- key),
- 0,
- "Server's keypair");
- rspamd_rcl_register_worker_option (cfg,
- type,
- "upstream",
- http_proxy_parse_upstream,
- ctx,
- 0,
- 0,
- "List of upstreams");
-
- return ctx;
-}
-
-static void
-proxy_session_cleanup (struct http_proxy_session *session)
-{
- rspamd_inet_address_destroy (session->client_addr);
-
- if (session->backend_conn) {
- rspamd_http_connection_unref (session->backend_conn);
- }
- if (session->client_conn) {
- rspamd_http_connection_unref (session->client_conn);
- }
-
- close (session->backend_sock);
- close (session->client_sock);
-
- g_slice_free1 (sizeof (*session), session);
-}
-
-static void
-proxy_client_write_error (struct http_proxy_session *session, gint code)
-{
- struct rspamd_http_message *reply;
-
- reply = rspamd_http_new_message (HTTP_RESPONSE);
- reply->code = code;
- rspamd_http_connection_write_message (session->client_conn,
- reply, NULL, NULL, session, session->client_sock,
- &session->ctx->io_tv, session->ev_base);
-}
-
-static void
-proxy_backend_error_handler (struct rspamd_http_connection *conn, GError *err)
-{
- struct http_proxy_session *session = conn->ud;
-
- msg_info ("abnormally closing connection from backend: %s, error: %s",
- rspamd_inet_address_to_string (rspamd_upstream_addr (session->up)),
- err->message);
- rspamd_http_connection_reset (session->backend_conn);
- /* Terminate session immediately */
- proxy_client_write_error (session, err->code);
-}
-
-static gint
-proxy_backend_finish_handler (struct rspamd_http_connection *conn,
- struct rspamd_http_message *msg)
-{
- struct http_proxy_session *session = conn->ud;
-
- rspamd_http_connection_steal_msg (session->backend_conn);
- rspamd_http_message_remove_header (msg, "Content-Length");
- rspamd_http_message_remove_header (msg, "Key");
- rspamd_http_connection_reset (session->backend_conn);
- rspamd_http_connection_write_message (session->client_conn,
- msg, NULL, NULL, session, session->client_sock,
- &session->ctx->io_tv, session->ev_base);
-
- return 0;
-}
-
-static void
-proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err)
-{
- struct http_proxy_session *session = conn->ud;
-
- msg_info ("abnormally closing connection from: %s, error: %s",
- rspamd_inet_address_to_string (session->client_addr), err->message);
- /* Terminate session immediately */
- proxy_session_cleanup (session);
-}
-
-static gint
-proxy_client_finish_handler (struct rspamd_http_connection *conn,
- struct rspamd_http_message *msg)
-{
- struct http_proxy_session *session = conn->ud;
- struct rspamd_http_upstream *backend = NULL;
- const rspamd_ftok_t *host;
- gchar hostbuf[512];
-
- if (!session->replied) {
- host = rspamd_http_message_find_header (msg, "Host");
-
- if (host == NULL) {
- backend = session->ctx->default_upstream;
- }
- else {
- rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf)));
- backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf);
-
- if (backend == NULL) {
- backend = session->ctx->default_upstream;
- }
- }
-
- if (backend == NULL) {
- /* No backend */
- msg_err ("cannot find upstream for %s", host ? hostbuf : "default");
- goto err;
- }
- else {
- session->up = rspamd_upstream_get (backend->u,
- RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
-
- if (session->up == NULL) {
- msg_err ("cannot select upstream for %s", host ? hostbuf : "default");
- goto err;
- }
-
- session->backend_sock = rspamd_inet_address_connect (
- rspamd_upstream_addr (session->up), SOCK_STREAM, TRUE);
-
- if (session->backend_sock == -1) {
- msg_err ("cannot connect upstream for %s", host ? hostbuf : "default");
- rspamd_upstream_fail (session->up);
- goto err;
- }
-
- rspamd_http_connection_steal_msg (session->client_conn);
- rspamd_http_message_remove_header (msg, "Content-Length");
- rspamd_http_message_remove_header (msg, "Key");
- rspamd_http_connection_reset (session->client_conn);
- session->backend_conn = rspamd_http_connection_new (
- NULL,
- proxy_backend_error_handler,
- proxy_backend_finish_handler,
- RSPAMD_HTTP_CLIENT_SIMPLE,
- RSPAMD_HTTP_CLIENT,
- session->ctx->keys_cache);
-
- rspamd_http_connection_set_key (session->backend_conn,
- session->ctx->local_key);
- msg->peer_key = rspamd_pubkey_ref (backend->key);
- session->replied = TRUE;
-
- rspamd_http_connection_write_message (session->backend_conn,
- msg, NULL, NULL, session, session->backend_sock,
- &session->ctx->io_tv, session->ev_base);
- }
- }
- else {
- proxy_session_cleanup (session);
- }
-
- return 0;
-
-err:
- session->replied = TRUE;
- proxy_client_write_error (session, 404);
-
- return 0;
-}
-
-static void
-proxy_accept_socket (gint fd, short what, void *arg)
-{
- struct rspamd_worker *worker = (struct rspamd_worker *) arg;
- struct http_proxy_ctx *ctx;
- rspamd_inet_addr_t *addr;
- struct http_proxy_session *session;
- gint nfd;
-
- ctx = worker->ctx;
-
- if ((nfd =
- rspamd_accept_from_socket (fd, &addr)) == -1) {
- msg_warn ("accept failed: %s", strerror (errno));
- return;
- }
- /* Check for EAGAIN */
- if (nfd == 0) {
- return;
- }
-
- msg_info ("accepted connection from %s port %d",
- rspamd_inet_address_to_string (addr),
- rspamd_inet_address_get_port (addr));
-
- session = g_slice_alloc0 (sizeof (*session));
- session->client_sock = nfd;
- session->client_addr = addr;
-
- session->resolver = ctx->resolver;
-
- session->client_conn = rspamd_http_connection_new (
- NULL,
- proxy_client_error_handler,
- proxy_client_finish_handler,
- 0,
- RSPAMD_HTTP_SERVER,
- ctx->keys_cache);
- session->ev_base = ctx->ev_base;
- session->ctx = ctx;
-
- if (ctx->key) {
- rspamd_http_connection_set_key (session->client_conn, ctx->key);
- }
-
- rspamd_http_connection_read_message (session->client_conn,
- session,
- nfd,
- &ctx->io_tv,
- ctx->ev_base);
-}
-
-static void
-proxy_rotate_key (gint fd, short what, void *arg)
-{
- struct timeval rot_tv;
- struct http_proxy_ctx *ctx = arg;
- gpointer kp;
-
- double_to_tv (ctx->rotate_tm, &rot_tv);
- rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec);
- event_del (&ctx->rotate_ev);
- event_add (&ctx->rotate_ev, &rot_tv);
-
- kp = ctx->local_key;
- ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
- RSPAMD_CRYPTOBOX_MODE_25519);
- rspamd_keypair_unref (kp);
-}
-
-void
-start_http_proxy (struct rspamd_worker *worker)
-{
- struct http_proxy_ctx *ctx = worker->ctx;
- struct timeval rot_tv;
-
- ctx->ev_base = rspamd_prepare_worker (worker, "http_proxy",
- proxy_accept_socket);
-
- ctx->resolver = dns_resolver_init (worker->srv->logger,
- ctx->ev_base,
- worker->srv->cfg);
- double_to_tv (ctx->timeout, &ctx->io_tv);
- rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver);
-
- rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
- ctx->ev_base, ctx->resolver->r);
-
- /* XXX: stupid default */
- ctx->keys_cache = rspamd_keypair_cache_new (256);
- ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
- RSPAMD_CRYPTOBOX_MODE_25519);
-
- double_to_tv (ctx->rotate_tm, &rot_tv);
- rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec);
- event_set (&ctx->rotate_ev, -1, EV_TIMEOUT, proxy_rotate_key, ctx);
- event_base_set (ctx->ev_base, &ctx->rotate_ev);
- event_add (&ctx->rotate_ev, &rot_tv);
-
- event_base_loop (ctx->ev_base, 0);
- rspamd_worker_block_signals ();
-
- g_mime_shutdown ();
- rspamd_log_close (worker->srv->logger);
-
- if (ctx->key) {
- rspamd_keypair_unref (ctx->key);
- }
-
- rspamd_keypair_cache_destroy (ctx->keys_cache);
-
- exit (EXIT_SUCCESS);
-}
-
${CMAKE_SOURCE_DIR}/src/lua_worker.c
${CMAKE_SOURCE_DIR}/src/smtp_proxy.c
${CMAKE_SOURCE_DIR}/src/worker.c
- ${CMAKE_SOURCE_DIR}/src/http_proxy.c
+ ${CMAKE_SOURCE_DIR}/src/rspamd_proxy.c
${CMAKE_SOURCE_DIR}/src/log_helper.c)
SET(RSPAMADMLUASRC
${CMAKE_CURRENT_SOURCE_DIR}/fuzzy_stat.lua
--- /dev/null
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "libutil/util.h"
+#include "libutil/map.h"
+#include "libutil/upstream.h"
+#include "libserver/protocol.h"
+#include "libserver/cfg_file.h"
+#include "libserver/url.h"
+#include "libserver/dns.h"
+#include "libmime/message.h"
+#include "rspamd.h"
+#include "libserver/worker_util.h"
+#include "keypairs_cache.h"
+#include "ottery.h"
+#include "unix-std.h"
+
+/* Rotate keys each minute by default */
+#define DEFAULT_ROTATION_TIME 60.0
+
+gpointer init_rspamd_proxy (struct rspamd_config *cfg);
+void start_rspamd_proxy (struct rspamd_worker *worker);
+
+worker_t rspamd_proxy_worker = {
+ "rspamd_proxy", /* Name */
+ init_rspamd_proxy, /* Init function */
+ start_rspamd_proxy, /* Start function */
+ RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE,
+ SOCK_STREAM, /* TCP socket */
+ RSPAMD_WORKER_VER
+};
+
+struct rspamd_http_upstream {
+ gchar *name;
+ struct upstream_list *u;
+ struct rspamd_cryptobox_pubkey *key;
+};
+
+static const guint64 rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL;
+
+struct rspamd_proxy_ctx {
+ guint64 magic;
+ gdouble timeout;
+ struct timeval io_tv;
+ struct rspamd_config *cfg;
+ /* DNS resolver */
+ struct rspamd_dns_resolver *resolver;
+ /* Events base */
+ struct event_base *ev_base;
+ /* Encryption key for clients */
+ struct rspamd_cryptobox_keypair *key;
+ /* Keys cache */
+ struct rspamd_keypair_cache *keys_cache;
+ /* Upstreams to use */
+ GHashTable *upstreams;
+ /* Default upstream */
+ struct rspamd_http_upstream *default_upstream;
+ /* Local rotating keypair for upstreams */
+ struct rspamd_cryptobox_keypair *local_key;
+ struct event rotate_ev;
+ gdouble rotate_tm;
+};
+
+struct rspamd_proxy_session {
+ struct rspamd_proxy_ctx *ctx;
+ struct event_base *ev_base;
+ struct rspamd_cryptobox_keypair *local_key;
+ struct rspamd_cryptobox_pubkey *remote_key;
+ struct upstream *up;
+ gint client_sock;
+ gint backend_sock;
+ rspamd_inet_addr_t *client_addr;
+ struct rspamd_http_connection *client_conn;
+ struct rspamd_http_connection *backend_conn;
+ struct rspamd_dns_resolver *resolver;
+ gboolean replied;
+};
+
+static GQuark
+rspamd_proxy_quark (void)
+{
+ return g_quark_from_static_string ("http-proxy");
+}
+
+static gboolean
+rspamd_proxy_parse_upstream (rspamd_mempool_t *pool,
+ const ucl_object_t *obj,
+ gpointer ud,
+ struct rspamd_rcl_section *section,
+ GError **err)
+{
+ const ucl_object_t *elt;
+ struct rspamd_http_upstream *up = NULL;
+ struct rspamd_proxy_ctx *ctx;
+ struct rspamd_rcl_struct_parser *pd = ud;
+
+ ctx = pd->user_struct;
+
+ if (ucl_object_type (obj) != UCL_OBJECT) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "upstream option must be an object");
+
+ return FALSE;
+ }
+
+ elt = ucl_object_lookup (obj, "name");
+ if (elt == NULL) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "upstream option must have some name definition");
+
+ return FALSE;
+ }
+
+ up = g_slice_alloc0 (sizeof (*up));
+ up->name = g_strdup (ucl_object_tostring (elt));
+
+ elt = ucl_object_lookup (obj, "key");
+ if (elt != NULL) {
+ up->key = rspamd_pubkey_from_base32 (ucl_object_tostring (elt), 0,
+ RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519);
+
+ if (up->key == NULL) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "cannot read upstream key");
+
+ goto err;
+ }
+ }
+
+ elt = ucl_object_lookup (obj, "hosts");
+
+ if (elt == NULL) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "upstream option must have some hosts definition");
+
+ goto err;
+ }
+
+ up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
+ if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "upstream has bad hosts definition");
+
+ goto err;
+ }
+
+ elt = ucl_object_lookup (obj, "default");
+ if (elt && ucl_object_toboolean (elt)) {
+ ctx->default_upstream = up;
+ }
+
+ g_hash_table_insert (ctx->upstreams, up->name, up);
+
+ return TRUE;
+
+err:
+
+ if (up) {
+ g_free (up->name);
+ rspamd_upstreams_destroy (up->u);
+
+ if (up->key) {
+ rspamd_pubkey_unref (up->key);
+ }
+
+ g_slice_free1 (sizeof (*up), up);
+ }
+
+ return FALSE;
+}
+
+gpointer
+init_rspamd_proxy (struct rspamd_config *cfg)
+{
+ struct rspamd_proxy_ctx *ctx;
+ GQuark type;
+
+ type = g_quark_try_string ("rspamd_proxy");
+
+ ctx = g_malloc0 (sizeof (struct rspamd_proxy_ctx));
+ ctx->magic = rspamd_rspamd_proxy_magic;
+ ctx->timeout = 5.0;
+ ctx->upstreams = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
+ ctx->rotate_tm = DEFAULT_ROTATION_TIME;
+ ctx->cfg = cfg;
+
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "timeout",
+ rspamd_rcl_parse_struct_time,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_proxy_ctx,
+ timeout),
+ RSPAMD_CL_FLAG_TIME_FLOAT,
+ "IO timeout");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "rotate",
+ rspamd_rcl_parse_struct_time,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_proxy_ctx,
+ rotate_tm),
+ RSPAMD_CL_FLAG_TIME_FLOAT,
+ "Rotation keys time, default: "
+ G_STRINGIFY (DEFAULT_ROTATION_TIME) " seconds");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "keypair",
+ rspamd_rcl_parse_struct_keypair,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_proxy_ctx,
+ key),
+ 0,
+ "Server's keypair");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "upstream",
+ rspamd_proxy_parse_upstream,
+ ctx,
+ 0,
+ 0,
+ "List of upstreams");
+
+ return ctx;
+}
+
+static void
+proxy_session_cleanup (struct rspamd_proxy_session *session)
+{
+ rspamd_inet_address_destroy (session->client_addr);
+
+ if (session->backend_conn) {
+ rspamd_http_connection_unref (session->backend_conn);
+ }
+ if (session->client_conn) {
+ rspamd_http_connection_unref (session->client_conn);
+ }
+
+ close (session->backend_sock);
+ close (session->client_sock);
+
+ g_slice_free1 (sizeof (*session), session);
+}
+
+static void
+proxy_client_write_error (struct rspamd_proxy_session *session, gint code)
+{
+ struct rspamd_http_message *reply;
+
+ reply = rspamd_http_new_message (HTTP_RESPONSE);
+ reply->code = code;
+ rspamd_http_connection_write_message (session->client_conn,
+ reply, NULL, NULL, session, session->client_sock,
+ &session->ctx->io_tv, session->ev_base);
+}
+
+static void
+proxy_backend_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+ struct rspamd_proxy_session *session = conn->ud;
+
+ msg_info ("abnormally closing connection from backend: %s, error: %s",
+ rspamd_inet_address_to_string (rspamd_upstream_addr (session->up)),
+ err->message);
+ rspamd_http_connection_reset (session->backend_conn);
+ /* Terminate session immediately */
+ proxy_client_write_error (session, err->code);
+}
+
+static gint
+proxy_backend_finish_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg)
+{
+ struct rspamd_proxy_session *session = conn->ud;
+
+ rspamd_http_connection_steal_msg (session->backend_conn);
+ rspamd_http_message_remove_header (msg, "Content-Length");
+ rspamd_http_message_remove_header (msg, "Key");
+ rspamd_http_connection_reset (session->backend_conn);
+ rspamd_http_connection_write_message (session->client_conn,
+ msg, NULL, NULL, session, session->client_sock,
+ &session->ctx->io_tv, session->ev_base);
+
+ return 0;
+}
+
+static void
+proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err)
+{
+ struct rspamd_proxy_session *session = conn->ud;
+
+ msg_info ("abnormally closing connection from: %s, error: %s",
+ rspamd_inet_address_to_string (session->client_addr), err->message);
+ /* Terminate session immediately */
+ proxy_session_cleanup (session);
+}
+
+static gint
+proxy_client_finish_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg)
+{
+ struct rspamd_proxy_session *session = conn->ud;
+ struct rspamd_http_upstream *backend = NULL;
+ const rspamd_ftok_t *host;
+ gchar hostbuf[512];
+
+ if (!session->replied) {
+ host = rspamd_http_message_find_header (msg, "Host");
+
+ if (host == NULL) {
+ backend = session->ctx->default_upstream;
+ }
+ else {
+ rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf)));
+ backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf);
+
+ if (backend == NULL) {
+ backend = session->ctx->default_upstream;
+ }
+ }
+
+ if (backend == NULL) {
+ /* No backend */
+ msg_err ("cannot find upstream for %s", host ? hostbuf : "default");
+ goto err;
+ }
+ else {
+ session->up = rspamd_upstream_get (backend->u,
+ RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
+
+ if (session->up == NULL) {
+ msg_err ("cannot select upstream for %s", host ? hostbuf : "default");
+ goto err;
+ }
+
+ session->backend_sock = rspamd_inet_address_connect (
+ rspamd_upstream_addr (session->up), SOCK_STREAM, TRUE);
+
+ if (session->backend_sock == -1) {
+ msg_err ("cannot connect upstream for %s", host ? hostbuf : "default");
+ rspamd_upstream_fail (session->up);
+ goto err;
+ }
+
+ rspamd_http_connection_steal_msg (session->client_conn);
+ rspamd_http_message_remove_header (msg, "Content-Length");
+ rspamd_http_message_remove_header (msg, "Key");
+ rspamd_http_connection_reset (session->client_conn);
+ session->backend_conn = rspamd_http_connection_new (
+ NULL,
+ proxy_backend_error_handler,
+ proxy_backend_finish_handler,
+ RSPAMD_HTTP_CLIENT_SIMPLE,
+ RSPAMD_HTTP_CLIENT,
+ session->ctx->keys_cache);
+
+ rspamd_http_connection_set_key (session->backend_conn,
+ session->ctx->local_key);
+ msg->peer_key = rspamd_pubkey_ref (backend->key);
+ session->replied = TRUE;
+
+ rspamd_http_connection_write_message (session->backend_conn,
+ msg, NULL, NULL, session, session->backend_sock,
+ &session->ctx->io_tv, session->ev_base);
+ }
+ }
+ else {
+ proxy_session_cleanup (session);
+ }
+
+ return 0;
+
+err:
+ session->replied = TRUE;
+ proxy_client_write_error (session, 404);
+
+ return 0;
+}
+
+static void
+proxy_accept_socket (gint fd, short what, void *arg)
+{
+ struct rspamd_worker *worker = (struct rspamd_worker *) arg;
+ struct rspamd_proxy_ctx *ctx;
+ rspamd_inet_addr_t *addr;
+ struct rspamd_proxy_session *session;
+ gint nfd;
+
+ ctx = worker->ctx;
+
+ if ((nfd =
+ rspamd_accept_from_socket (fd, &addr)) == -1) {
+ msg_warn ("accept failed: %s", strerror (errno));
+ return;
+ }
+ /* Check for EAGAIN */
+ if (nfd == 0) {
+ return;
+ }
+
+ msg_info ("accepted connection from %s port %d",
+ rspamd_inet_address_to_string (addr),
+ rspamd_inet_address_get_port (addr));
+
+ session = g_slice_alloc0 (sizeof (*session));
+ session->client_sock = nfd;
+ session->client_addr = addr;
+
+ session->resolver = ctx->resolver;
+
+ session->client_conn = rspamd_http_connection_new (
+ NULL,
+ proxy_client_error_handler,
+ proxy_client_finish_handler,
+ 0,
+ RSPAMD_HTTP_SERVER,
+ ctx->keys_cache);
+ session->ev_base = ctx->ev_base;
+ session->ctx = ctx;
+
+ if (ctx->key) {
+ rspamd_http_connection_set_key (session->client_conn, ctx->key);
+ }
+
+ rspamd_http_connection_read_message (session->client_conn,
+ session,
+ nfd,
+ &ctx->io_tv,
+ ctx->ev_base);
+}
+
+static void
+proxy_rotate_key (gint fd, short what, void *arg)
+{
+ struct timeval rot_tv;
+ struct rspamd_proxy_ctx *ctx = arg;
+ gpointer kp;
+
+ double_to_tv (ctx->rotate_tm, &rot_tv);
+ rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec);
+ event_del (&ctx->rotate_ev);
+ event_add (&ctx->rotate_ev, &rot_tv);
+
+ kp = ctx->local_key;
+ ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
+ RSPAMD_CRYPTOBOX_MODE_25519);
+ rspamd_keypair_unref (kp);
+}
+
+void
+start_rspamd_proxy (struct rspamd_worker *worker)
+{
+ struct rspamd_proxy_ctx *ctx = worker->ctx;
+ struct timeval rot_tv;
+
+ ctx->ev_base = rspamd_prepare_worker (worker, "rspamd_proxy",
+ proxy_accept_socket);
+
+ ctx->resolver = dns_resolver_init (worker->srv->logger,
+ ctx->ev_base,
+ worker->srv->cfg);
+ double_to_tv (ctx->timeout, &ctx->io_tv);
+ rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver);
+
+ rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
+ ctx->ev_base, ctx->resolver->r);
+
+ /* XXX: stupid default */
+ ctx->keys_cache = rspamd_keypair_cache_new (256);
+ ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
+ RSPAMD_CRYPTOBOX_MODE_25519);
+
+ double_to_tv (ctx->rotate_tm, &rot_tv);
+ rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec);
+ event_set (&ctx->rotate_ev, -1, EV_TIMEOUT, proxy_rotate_key, ctx);
+ event_base_set (ctx->ev_base, &ctx->rotate_ev);
+ event_add (&ctx->rotate_ev, &rot_tv);
+
+ event_base_loop (ctx->ev_base, 0);
+ rspamd_worker_block_signals ();
+
+ g_mime_shutdown ();
+ rspamd_log_close (worker->srv->logger);
+
+ if (ctx->key) {
+ rspamd_keypair_unref (ctx->key);
+ }
+
+ rspamd_keypair_cache_destroy (ctx->keys_cache);
+
+ exit (EXIT_SUCCESS);
+}