diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-05-06 13:28:56 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-05-06 13:28:56 +0100 |
commit | ad2f39226b42f7100899bd7f00f1b690bfc57083 (patch) | |
tree | b39a5477c82d510d02a6336925ffbd00e1d1f5c9 /src/rspamd_proxy.c | |
parent | e7434337e6282ab12fb5637124fb4a3773bfb755 (diff) | |
download | rspamd-ad2f39226b42f7100899bd7f00f1b690bfc57083.tar.gz rspamd-ad2f39226b42f7100899bd7f00f1b690bfc57083.zip |
[Rework] Rename http proxy to rspamd proxy
Diffstat (limited to 'src/rspamd_proxy.c')
-rw-r--r-- | src/rspamd_proxy.c | 505 |
1 files changed, 505 insertions, 0 deletions
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c new file mode 100644 index 000000000..f7b41907c --- /dev/null +++ b/src/rspamd_proxy.c @@ -0,0 +1,505 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "libutil/util.h" +#include "libutil/map.h" +#include "libutil/upstream.h" +#include "libserver/protocol.h" +#include "libserver/cfg_file.h" +#include "libserver/url.h" +#include "libserver/dns.h" +#include "libmime/message.h" +#include "rspamd.h" +#include "libserver/worker_util.h" +#include "keypairs_cache.h" +#include "ottery.h" +#include "unix-std.h" + +/* Rotate keys each minute by default */ +#define DEFAULT_ROTATION_TIME 60.0 + +gpointer init_rspamd_proxy (struct rspamd_config *cfg); +void start_rspamd_proxy (struct rspamd_worker *worker); + +worker_t rspamd_proxy_worker = { + "rspamd_proxy", /* Name */ + init_rspamd_proxy, /* Init function */ + start_rspamd_proxy, /* Start function */ + RSPAMD_WORKER_HAS_SOCKET | RSPAMD_WORKER_KILLABLE, + SOCK_STREAM, /* TCP socket */ + RSPAMD_WORKER_VER +}; + +struct rspamd_http_upstream { + gchar *name; + struct upstream_list *u; + struct rspamd_cryptobox_pubkey *key; +}; + +static const guint64 rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL; + +struct rspamd_proxy_ctx { + guint64 magic; + gdouble timeout; + struct timeval io_tv; + struct rspamd_config *cfg; + /* DNS resolver */ + struct rspamd_dns_resolver *resolver; + /* Events base */ + struct event_base *ev_base; + /* Encryption key for clients */ + struct rspamd_cryptobox_keypair *key; + /* Keys cache */ + struct rspamd_keypair_cache *keys_cache; + /* Upstreams to use */ + GHashTable *upstreams; + /* Default upstream */ + struct rspamd_http_upstream *default_upstream; + /* Local rotating keypair for upstreams */ + struct rspamd_cryptobox_keypair *local_key; + struct event rotate_ev; + gdouble rotate_tm; +}; + +struct rspamd_proxy_session { + struct rspamd_proxy_ctx *ctx; + struct event_base *ev_base; + struct rspamd_cryptobox_keypair *local_key; + struct rspamd_cryptobox_pubkey *remote_key; + struct upstream *up; + gint client_sock; + gint backend_sock; + rspamd_inet_addr_t *client_addr; + struct rspamd_http_connection *client_conn; + struct rspamd_http_connection *backend_conn; + struct rspamd_dns_resolver *resolver; + gboolean replied; +}; + +static GQuark +rspamd_proxy_quark (void) +{ + return g_quark_from_static_string ("http-proxy"); +} + +static gboolean +rspamd_proxy_parse_upstream (rspamd_mempool_t *pool, + const ucl_object_t *obj, + gpointer ud, + struct rspamd_rcl_section *section, + GError **err) +{ + const ucl_object_t *elt; + struct rspamd_http_upstream *up = NULL; + struct rspamd_proxy_ctx *ctx; + struct rspamd_rcl_struct_parser *pd = ud; + + ctx = pd->user_struct; + + if (ucl_object_type (obj) != UCL_OBJECT) { + g_set_error (err, rspamd_proxy_quark (), 100, + "upstream option must be an object"); + + return FALSE; + } + + elt = ucl_object_lookup (obj, "name"); + if (elt == NULL) { + g_set_error (err, rspamd_proxy_quark (), 100, + "upstream option must have some name definition"); + + return FALSE; + } + + up = g_slice_alloc0 (sizeof (*up)); + up->name = g_strdup (ucl_object_tostring (elt)); + + elt = ucl_object_lookup (obj, "key"); + if (elt != NULL) { + up->key = rspamd_pubkey_from_base32 (ucl_object_tostring (elt), 0, + RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519); + + if (up->key == NULL) { + g_set_error (err, rspamd_proxy_quark (), 100, + "cannot read upstream key"); + + goto err; + } + } + + elt = ucl_object_lookup (obj, "hosts"); + + if (elt == NULL) { + g_set_error (err, rspamd_proxy_quark (), 100, + "upstream option must have some hosts definition"); + + goto err; + } + + up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx); + if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) { + g_set_error (err, rspamd_proxy_quark (), 100, + "upstream has bad hosts definition"); + + goto err; + } + + elt = ucl_object_lookup (obj, "default"); + if (elt && ucl_object_toboolean (elt)) { + ctx->default_upstream = up; + } + + g_hash_table_insert (ctx->upstreams, up->name, up); + + return TRUE; + +err: + + if (up) { + g_free (up->name); + rspamd_upstreams_destroy (up->u); + + if (up->key) { + rspamd_pubkey_unref (up->key); + } + + g_slice_free1 (sizeof (*up), up); + } + + return FALSE; +} + +gpointer +init_rspamd_proxy (struct rspamd_config *cfg) +{ + struct rspamd_proxy_ctx *ctx; + GQuark type; + + type = g_quark_try_string ("rspamd_proxy"); + + ctx = g_malloc0 (sizeof (struct rspamd_proxy_ctx)); + ctx->magic = rspamd_rspamd_proxy_magic; + ctx->timeout = 5.0; + ctx->upstreams = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal); + ctx->rotate_tm = DEFAULT_ROTATION_TIME; + ctx->cfg = cfg; + + rspamd_rcl_register_worker_option (cfg, + type, + "timeout", + rspamd_rcl_parse_struct_time, + ctx, + G_STRUCT_OFFSET (struct rspamd_proxy_ctx, + timeout), + RSPAMD_CL_FLAG_TIME_FLOAT, + "IO timeout"); + rspamd_rcl_register_worker_option (cfg, + type, + "rotate", + rspamd_rcl_parse_struct_time, + ctx, + G_STRUCT_OFFSET (struct rspamd_proxy_ctx, + rotate_tm), + RSPAMD_CL_FLAG_TIME_FLOAT, + "Rotation keys time, default: " + G_STRINGIFY (DEFAULT_ROTATION_TIME) " seconds"); + rspamd_rcl_register_worker_option (cfg, + type, + "keypair", + rspamd_rcl_parse_struct_keypair, + ctx, + G_STRUCT_OFFSET (struct rspamd_proxy_ctx, + key), + 0, + "Server's keypair"); + rspamd_rcl_register_worker_option (cfg, + type, + "upstream", + rspamd_proxy_parse_upstream, + ctx, + 0, + 0, + "List of upstreams"); + + return ctx; +} + +static void +proxy_session_cleanup (struct rspamd_proxy_session *session) +{ + rspamd_inet_address_destroy (session->client_addr); + + if (session->backend_conn) { + rspamd_http_connection_unref (session->backend_conn); + } + if (session->client_conn) { + rspamd_http_connection_unref (session->client_conn); + } + + close (session->backend_sock); + close (session->client_sock); + + g_slice_free1 (sizeof (*session), session); +} + +static void +proxy_client_write_error (struct rspamd_proxy_session *session, gint code) +{ + struct rspamd_http_message *reply; + + reply = rspamd_http_new_message (HTTP_RESPONSE); + reply->code = code; + rspamd_http_connection_write_message (session->client_conn, + reply, NULL, NULL, session, session->client_sock, + &session->ctx->io_tv, session->ev_base); +} + +static void +proxy_backend_error_handler (struct rspamd_http_connection *conn, GError *err) +{ + struct rspamd_proxy_session *session = conn->ud; + + msg_info ("abnormally closing connection from backend: %s, error: %s", + rspamd_inet_address_to_string (rspamd_upstream_addr (session->up)), + err->message); + rspamd_http_connection_reset (session->backend_conn); + /* Terminate session immediately */ + proxy_client_write_error (session, err->code); +} + +static gint +proxy_backend_finish_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg) +{ + struct rspamd_proxy_session *session = conn->ud; + + rspamd_http_connection_steal_msg (session->backend_conn); + rspamd_http_message_remove_header (msg, "Content-Length"); + rspamd_http_message_remove_header (msg, "Key"); + rspamd_http_connection_reset (session->backend_conn); + rspamd_http_connection_write_message (session->client_conn, + msg, NULL, NULL, session, session->client_sock, + &session->ctx->io_tv, session->ev_base); + + return 0; +} + +static void +proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err) +{ + struct rspamd_proxy_session *session = conn->ud; + + msg_info ("abnormally closing connection from: %s, error: %s", + rspamd_inet_address_to_string (session->client_addr), err->message); + /* Terminate session immediately */ + proxy_session_cleanup (session); +} + +static gint +proxy_client_finish_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg) +{ + struct rspamd_proxy_session *session = conn->ud; + struct rspamd_http_upstream *backend = NULL; + const rspamd_ftok_t *host; + gchar hostbuf[512]; + + if (!session->replied) { + host = rspamd_http_message_find_header (msg, "Host"); + + if (host == NULL) { + backend = session->ctx->default_upstream; + } + else { + rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf))); + backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf); + + if (backend == NULL) { + backend = session->ctx->default_upstream; + } + } + + if (backend == NULL) { + /* No backend */ + msg_err ("cannot find upstream for %s", host ? hostbuf : "default"); + goto err; + } + else { + session->up = rspamd_upstream_get (backend->u, + RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); + + if (session->up == NULL) { + msg_err ("cannot select upstream for %s", host ? hostbuf : "default"); + goto err; + } + + session->backend_sock = rspamd_inet_address_connect ( + rspamd_upstream_addr (session->up), SOCK_STREAM, TRUE); + + if (session->backend_sock == -1) { + msg_err ("cannot connect upstream for %s", host ? hostbuf : "default"); + rspamd_upstream_fail (session->up); + goto err; + } + + rspamd_http_connection_steal_msg (session->client_conn); + rspamd_http_message_remove_header (msg, "Content-Length"); + rspamd_http_message_remove_header (msg, "Key"); + rspamd_http_connection_reset (session->client_conn); + session->backend_conn = rspamd_http_connection_new ( + NULL, + proxy_backend_error_handler, + proxy_backend_finish_handler, + RSPAMD_HTTP_CLIENT_SIMPLE, + RSPAMD_HTTP_CLIENT, + session->ctx->keys_cache); + + rspamd_http_connection_set_key (session->backend_conn, + session->ctx->local_key); + msg->peer_key = rspamd_pubkey_ref (backend->key); + session->replied = TRUE; + + rspamd_http_connection_write_message (session->backend_conn, + msg, NULL, NULL, session, session->backend_sock, + &session->ctx->io_tv, session->ev_base); + } + } + else { + proxy_session_cleanup (session); + } + + return 0; + +err: + session->replied = TRUE; + proxy_client_write_error (session, 404); + + return 0; +} + +static void +proxy_accept_socket (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *) arg; + struct rspamd_proxy_ctx *ctx; + rspamd_inet_addr_t *addr; + struct rspamd_proxy_session *session; + gint nfd; + + ctx = worker->ctx; + + if ((nfd = + rspamd_accept_from_socket (fd, &addr)) == -1) { + msg_warn ("accept failed: %s", strerror (errno)); + return; + } + /* Check for EAGAIN */ + if (nfd == 0) { + return; + } + + msg_info ("accepted connection from %s port %d", + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + session = g_slice_alloc0 (sizeof (*session)); + session->client_sock = nfd; + session->client_addr = addr; + + session->resolver = ctx->resolver; + + session->client_conn = rspamd_http_connection_new ( + NULL, + proxy_client_error_handler, + proxy_client_finish_handler, + 0, + RSPAMD_HTTP_SERVER, + ctx->keys_cache); + session->ev_base = ctx->ev_base; + session->ctx = ctx; + + if (ctx->key) { + rspamd_http_connection_set_key (session->client_conn, ctx->key); + } + + rspamd_http_connection_read_message (session->client_conn, + session, + nfd, + &ctx->io_tv, + ctx->ev_base); +} + +static void +proxy_rotate_key (gint fd, short what, void *arg) +{ + struct timeval rot_tv; + struct rspamd_proxy_ctx *ctx = arg; + gpointer kp; + + double_to_tv (ctx->rotate_tm, &rot_tv); + rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec); + event_del (&ctx->rotate_ev); + event_add (&ctx->rotate_ev, &rot_tv); + + kp = ctx->local_key; + ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX, + RSPAMD_CRYPTOBOX_MODE_25519); + rspamd_keypair_unref (kp); +} + +void +start_rspamd_proxy (struct rspamd_worker *worker) +{ + struct rspamd_proxy_ctx *ctx = worker->ctx; + struct timeval rot_tv; + + ctx->ev_base = rspamd_prepare_worker (worker, "rspamd_proxy", + proxy_accept_socket); + + ctx->resolver = dns_resolver_init (worker->srv->logger, + ctx->ev_base, + worker->srv->cfg); + double_to_tv (ctx->timeout, &ctx->io_tv); + rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver); + + rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx, + ctx->ev_base, ctx->resolver->r); + + /* XXX: stupid default */ + ctx->keys_cache = rspamd_keypair_cache_new (256); + ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX, + RSPAMD_CRYPTOBOX_MODE_25519); + + double_to_tv (ctx->rotate_tm, &rot_tv); + rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec); + event_set (&ctx->rotate_ev, -1, EV_TIMEOUT, proxy_rotate_key, ctx); + event_base_set (ctx->ev_base, &ctx->rotate_ev); + event_add (&ctx->rotate_ev, &rot_tv); + + event_base_loop (ctx->ev_base, 0); + rspamd_worker_block_signals (); + + g_mime_shutdown (); + rspamd_log_close (worker->srv->logger); + + if (ctx->key) { + rspamd_keypair_unref (ctx->key); + } + + rspamd_keypair_cache_destroy (ctx->keys_cache); + + exit (EXIT_SUCCESS); +} |