From ce904384d6711d0d04b43e051c597a11d74ffc36 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 6 May 2016 15:09:21 +0100 Subject: [PATCH] [Feature] Add mirrors feature --- src/rspamd_proxy.c | 176 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index 79adc93c7..626cfdee1 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -24,6 +24,7 @@ #include "libmime/message.h" #include "rspamd.h" #include "libserver/worker_util.h" +#include "lua/lua_common.h" #include "keypairs_cache.h" #include "ottery.h" #include "unix-std.h" @@ -66,6 +67,14 @@ struct rspamd_http_upstream { struct rspamd_cryptobox_pubkey *key; }; +struct rspamd_http_mirror { + gchar *name; + struct upstream_list *u; + struct rspamd_cryptobox_pubkey *key; + gdouble prob; + gint parser_ref; +}; + static const guint64 rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL; struct rspamd_proxy_ctx { @@ -83,12 +92,15 @@ struct rspamd_proxy_ctx { struct rspamd_keypair_cache *keys_cache; /* Upstreams to use */ GHashTable *upstreams; + /* Mirrors to send traffic to */ + GPtrArray *mirrors; /* Default upstream */ struct rspamd_http_upstream *default_upstream; /* Local rotating keypair for upstreams */ struct rspamd_cryptobox_keypair *local_key; struct event rotate_ev; gdouble rotate_tm; + lua_State *lua_state; }; struct rspamd_proxy_session { @@ -203,6 +215,160 @@ err: return FALSE; } +static gboolean +rspamd_proxy_parse_mirror (rspamd_mempool_t *pool, + const ucl_object_t *obj, + gpointer ud, + struct rspamd_rcl_section *section, + GError **err) +{ + const ucl_object_t *elt; + struct rspamd_http_mirror *up = NULL; + struct rspamd_proxy_ctx *ctx; + struct rspamd_rcl_struct_parser *pd = ud; + lua_State *L; + + ctx = pd->user_struct; + L = ctx->lua_state; + + if (ucl_object_type (obj) != UCL_OBJECT) { + g_set_error (err, rspamd_proxy_quark (), 100, + "upstream option must be an object"); + + return FALSE; + } + + elt = ucl_object_lookup (obj, "name"); + if (elt == NULL) { + g_set_error (err, rspamd_proxy_quark (), 100, + "upstream option must have some name definition"); + + return FALSE; + } + + up = g_slice_alloc0 (sizeof (*up)); + up->name = g_strdup (ucl_object_tostring (elt)); + up->parser_ref = -1; + + elt = ucl_object_lookup (obj, "key"); + if (elt != NULL) { + up->key = rspamd_pubkey_from_base32 (ucl_object_tostring (elt), 0, + RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519); + + if (up->key == NULL) { + g_set_error (err, rspamd_proxy_quark (), 100, + "cannot read upstream key"); + + goto err; + } + } + + elt = ucl_object_lookup (obj, "hosts"); + + if (elt == NULL) { + g_set_error (err, rspamd_proxy_quark (), 100, + "upstream option must have some hosts definition"); + + goto err; + } + + up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx); + if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) { + g_set_error (err, rspamd_proxy_quark (), 100, + "upstream has bad hosts definition"); + + goto err; + } + + elt = ucl_object_lookup_any (obj, "probability", "prob", NULL); + if (elt) { + up->prob = ucl_object_todouble (elt); + } + else { + up->prob = 1.0; + } + + /* + * Accept lua function here in form + * fun :: String -> UCL + */ + elt = ucl_object_lookup (obj, "parser"); + if (elt) { + const gchar *lua_script; + gsize slen; + gint err_idx, ref_idx; + GString *tb = NULL; + + lua_script = ucl_object_tolstring (elt, &slen); + lua_pushcfunction (L, &rspamd_lua_traceback); + err_idx = lua_gettop (L); + + /* Load data */ + if (luaL_loadbuffer (L, lua_script, slen, "proxy parser") != 0) { + g_set_error (err, + rspamd_proxy_quark (), + EINVAL, + "cannot load lua parser script: %s", + lua_tostring (L, -1)); + lua_settop (L, 0); /* Error function */ + + goto err; + } + + /* Now do it */ + if (lua_pcall (L, 0, 1, err_idx) != 0) { + tb = lua_touserdata (L, -1); + g_set_error (err, + rspamd_proxy_quark (), + EINVAL, + "cannot init lua parser script: %s", + tb->str); + g_string_free (tb, TRUE); + lua_settop (L, 0); + + goto err; + } + + if (!lua_isfunction (L, -1)) { + g_set_error (err, + rspamd_proxy_quark (), + EINVAL, + "cannot init lua parser script: " + "must return function"); + lua_settop (L, 0); + + return FALSE; + } + + ref_idx = luaL_ref (L, LUA_REGISTRYINDEX); + up->parser_ref = ref_idx; + lua_settop (L, 0); + } + + g_ptr_array_add (ctx->mirrors, up); + + return TRUE; + +err: + + if (up) { + g_free (up->name); + rspamd_upstreams_destroy (up->u); + + if (up->key) { + rspamd_pubkey_unref (up->key); + } + + if (up->parser_ref != -1) { + luaL_unref (L, LUA_REGISTRYINDEX, up->parser_ref); + } + + g_slice_free1 (sizeof (*up), up); + } + + return FALSE; +} + gpointer init_rspamd_proxy (struct rspamd_config *cfg) { @@ -215,6 +381,7 @@ init_rspamd_proxy (struct rspamd_config *cfg) ctx->magic = rspamd_rspamd_proxy_magic; ctx->timeout = 5.0; ctx->upstreams = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal); + ctx->mirrors = g_ptr_array_new (); ctx->rotate_tm = DEFAULT_ROTATION_TIME; ctx->cfg = cfg; @@ -254,6 +421,14 @@ init_rspamd_proxy (struct rspamd_config *cfg) 0, 0, "List of upstreams"); + rspamd_rcl_register_worker_option (cfg, + type, + "mirror", + rspamd_proxy_parse_mirror, + ctx, + 0, + RSPAMD_CL_FLAG_MULTIPLE, + "List of mirrors"); return ctx; } @@ -590,6 +765,7 @@ start_rspamd_proxy (struct rspamd_worker *worker) ctx->resolver = dns_resolver_init (worker->srv->logger, ctx->ev_base, worker->srv->cfg); + ctx->lua_state = worker->srv->cfg->lua_state; double_to_tv (ctx->timeout, &ctx->io_tv); rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver); -- 2.39.5