]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Add mirrors feature
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 6 May 2016 14:09:21 +0000 (15:09 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 6 May 2016 14:09:21 +0000 (15:09 +0100)
src/rspamd_proxy.c

index 79adc93c7434ac43b7c4e25c0dcaeb7f4adb2378..626cfdee1fba35de22fb08beac6756a4c6adc249 100644 (file)
@@ -24,6 +24,7 @@
 #include "libmime/message.h"
 #include "rspamd.h"
 #include "libserver/worker_util.h"
+#include "lua/lua_common.h"
 #include "keypairs_cache.h"
 #include "ottery.h"
 #include "unix-std.h"
@@ -66,6 +67,14 @@ struct rspamd_http_upstream {
        struct rspamd_cryptobox_pubkey *key;
 };
 
+struct rspamd_http_mirror {
+       gchar *name;
+       struct upstream_list *u;
+       struct rspamd_cryptobox_pubkey *key;
+       gdouble prob;
+       gint parser_ref;
+};
+
 static const guint64 rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL;
 
 struct rspamd_proxy_ctx {
@@ -83,12 +92,15 @@ struct rspamd_proxy_ctx {
        struct rspamd_keypair_cache *keys_cache;
        /* Upstreams to use */
        GHashTable *upstreams;
+       /* Mirrors to send traffic to */
+       GPtrArray *mirrors;
        /* Default upstream */
        struct rspamd_http_upstream *default_upstream;
        /* Local rotating keypair for upstreams */
        struct rspamd_cryptobox_keypair *local_key;
        struct event rotate_ev;
        gdouble rotate_tm;
+       lua_State *lua_state;
 };
 
 struct rspamd_proxy_session {
@@ -203,6 +215,160 @@ err:
        return FALSE;
 }
 
+static gboolean
+rspamd_proxy_parse_mirror (rspamd_mempool_t *pool,
+       const ucl_object_t *obj,
+       gpointer ud,
+       struct rspamd_rcl_section *section,
+       GError **err)
+{
+       const ucl_object_t *elt;
+       struct rspamd_http_mirror *up = NULL;
+       struct rspamd_proxy_ctx *ctx;
+       struct rspamd_rcl_struct_parser *pd = ud;
+       lua_State *L;
+
+       ctx = pd->user_struct;
+       L = ctx->lua_state;
+
+       if (ucl_object_type (obj) != UCL_OBJECT) {
+               g_set_error (err, rspamd_proxy_quark (), 100,
+                               "upstream option must be an object");
+
+               return FALSE;
+       }
+
+       elt = ucl_object_lookup (obj, "name");
+       if (elt == NULL) {
+               g_set_error (err, rspamd_proxy_quark (), 100,
+                               "upstream option must have some name definition");
+
+               return FALSE;
+       }
+
+       up = g_slice_alloc0 (sizeof (*up));
+       up->name = g_strdup (ucl_object_tostring (elt));
+       up->parser_ref = -1;
+
+       elt = ucl_object_lookup (obj, "key");
+       if (elt != NULL) {
+               up->key = rspamd_pubkey_from_base32 (ucl_object_tostring (elt), 0,
+                               RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519);
+
+               if (up->key == NULL) {
+                       g_set_error (err, rspamd_proxy_quark (), 100,
+                                       "cannot read upstream key");
+
+                       goto err;
+               }
+       }
+
+       elt = ucl_object_lookup (obj, "hosts");
+
+       if (elt == NULL) {
+               g_set_error (err, rspamd_proxy_quark (), 100,
+                               "upstream option must have some hosts definition");
+
+               goto err;
+       }
+
+       up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
+       if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
+               g_set_error (err, rspamd_proxy_quark (), 100,
+                               "upstream has bad hosts definition");
+
+               goto err;
+       }
+
+       elt = ucl_object_lookup_any (obj, "probability", "prob", NULL);
+       if (elt) {
+               up->prob = ucl_object_todouble (elt);
+       }
+       else {
+               up->prob = 1.0;
+       }
+
+       /*
+        * Accept lua function here in form
+        * fun :: String -> UCL
+        */
+       elt = ucl_object_lookup (obj, "parser");
+       if (elt) {
+               const gchar *lua_script;
+               gsize slen;
+               gint err_idx, ref_idx;
+               GString *tb = NULL;
+
+               lua_script = ucl_object_tolstring (elt, &slen);
+               lua_pushcfunction (L, &rspamd_lua_traceback);
+               err_idx = lua_gettop (L);
+
+               /* Load data */
+               if (luaL_loadbuffer (L, lua_script, slen, "proxy parser") != 0) {
+                       g_set_error (err,
+                                       rspamd_proxy_quark (),
+                                       EINVAL,
+                                       "cannot load lua parser script: %s",
+                                       lua_tostring (L, -1));
+                       lua_settop (L, 0); /* Error function */
+
+                       goto err;
+               }
+
+               /* Now do it */
+               if (lua_pcall (L, 0, 1, err_idx) != 0) {
+                       tb = lua_touserdata (L, -1);
+                       g_set_error (err,
+                                       rspamd_proxy_quark (),
+                                       EINVAL,
+                                       "cannot init lua parser script: %s",
+                                       tb->str);
+                       g_string_free (tb, TRUE);
+                       lua_settop (L, 0);
+
+                       goto err;
+               }
+
+               if (!lua_isfunction (L, -1)) {
+                       g_set_error (err,
+                                       rspamd_proxy_quark (),
+                                       EINVAL,
+                                       "cannot init lua parser script: "
+                                       "must return function");
+                       lua_settop (L, 0);
+
+                       return FALSE;
+               }
+
+               ref_idx = luaL_ref (L, LUA_REGISTRYINDEX);
+               up->parser_ref = ref_idx;
+               lua_settop (L, 0);
+       }
+
+       g_ptr_array_add (ctx->mirrors, up);
+
+       return TRUE;
+
+err:
+
+       if (up) {
+               g_free (up->name);
+               rspamd_upstreams_destroy (up->u);
+
+               if (up->key) {
+                       rspamd_pubkey_unref (up->key);
+               }
+
+               if (up->parser_ref != -1) {
+                       luaL_unref (L, LUA_REGISTRYINDEX, up->parser_ref);
+               }
+
+               g_slice_free1 (sizeof (*up), up);
+       }
+
+       return FALSE;
+}
+
 gpointer
 init_rspamd_proxy (struct rspamd_config *cfg)
 {
@@ -215,6 +381,7 @@ init_rspamd_proxy (struct rspamd_config *cfg)
        ctx->magic = rspamd_rspamd_proxy_magic;
        ctx->timeout = 5.0;
        ctx->upstreams = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
+       ctx->mirrors = g_ptr_array_new ();
        ctx->rotate_tm = DEFAULT_ROTATION_TIME;
        ctx->cfg = cfg;
 
@@ -254,6 +421,14 @@ init_rspamd_proxy (struct rspamd_config *cfg)
                        0,
                        0,
                        "List of upstreams");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "mirror",
+                       rspamd_proxy_parse_mirror,
+                       ctx,
+                       0,
+                       RSPAMD_CL_FLAG_MULTIPLE,
+                       "List of mirrors");
 
        return ctx;
 }
@@ -590,6 +765,7 @@ start_rspamd_proxy (struct rspamd_worker *worker)
        ctx->resolver = dns_resolver_init (worker->srv->logger,
                        ctx->ev_base,
                        worker->srv->cfg);
+       ctx->lua_state = worker->srv->cfg->lua_state;
        double_to_tv (ctx->timeout, &ctx->io_tv);
        rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver);