aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-05-06 15:09:21 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-05-06 15:09:21 +0100
commitce904384d6711d0d04b43e051c597a11d74ffc36 (patch)
tree3fe5ed8c94cc77cefe064a2918b5b06831a096f9
parent3c92cf1de7fe84941f0a1f7362b221b5a951eec6 (diff)
downloadrspamd-ce904384d6711d0d04b43e051c597a11d74ffc36.tar.gz
rspamd-ce904384d6711d0d04b43e051c597a11d74ffc36.zip
[Feature] Add mirrors feature
-rw-r--r--src/rspamd_proxy.c176
1 files changed, 176 insertions, 0 deletions
diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c
index 79adc93c7..626cfdee1 100644
--- a/src/rspamd_proxy.c
+++ b/src/rspamd_proxy.c
@@ -24,6 +24,7 @@
#include "libmime/message.h"
#include "rspamd.h"
#include "libserver/worker_util.h"
+#include "lua/lua_common.h"
#include "keypairs_cache.h"
#include "ottery.h"
#include "unix-std.h"
@@ -66,6 +67,14 @@ struct rspamd_http_upstream {
struct rspamd_cryptobox_pubkey *key;
};
+struct rspamd_http_mirror {
+ gchar *name;
+ struct upstream_list *u;
+ struct rspamd_cryptobox_pubkey *key;
+ gdouble prob;
+ gint parser_ref;
+};
+
static const guint64 rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL;
struct rspamd_proxy_ctx {
@@ -83,12 +92,15 @@ struct rspamd_proxy_ctx {
struct rspamd_keypair_cache *keys_cache;
/* Upstreams to use */
GHashTable *upstreams;
+ /* Mirrors to send traffic to */
+ GPtrArray *mirrors;
/* Default upstream */
struct rspamd_http_upstream *default_upstream;
/* Local rotating keypair for upstreams */
struct rspamd_cryptobox_keypair *local_key;
struct event rotate_ev;
gdouble rotate_tm;
+ lua_State *lua_state;
};
struct rspamd_proxy_session {
@@ -203,6 +215,160 @@ err:
return FALSE;
}
+static gboolean
+rspamd_proxy_parse_mirror (rspamd_mempool_t *pool,
+ const ucl_object_t *obj,
+ gpointer ud,
+ struct rspamd_rcl_section *section,
+ GError **err)
+{
+ const ucl_object_t *elt;
+ struct rspamd_http_mirror *up = NULL;
+ struct rspamd_proxy_ctx *ctx;
+ struct rspamd_rcl_struct_parser *pd = ud;
+ lua_State *L;
+
+ ctx = pd->user_struct;
+ L = ctx->lua_state;
+
+ if (ucl_object_type (obj) != UCL_OBJECT) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "upstream option must be an object");
+
+ return FALSE;
+ }
+
+ elt = ucl_object_lookup (obj, "name");
+ if (elt == NULL) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "upstream option must have some name definition");
+
+ return FALSE;
+ }
+
+ up = g_slice_alloc0 (sizeof (*up));
+ up->name = g_strdup (ucl_object_tostring (elt));
+ up->parser_ref = -1;
+
+ elt = ucl_object_lookup (obj, "key");
+ if (elt != NULL) {
+ up->key = rspamd_pubkey_from_base32 (ucl_object_tostring (elt), 0,
+ RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519);
+
+ if (up->key == NULL) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "cannot read upstream key");
+
+ goto err;
+ }
+ }
+
+ elt = ucl_object_lookup (obj, "hosts");
+
+ if (elt == NULL) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "upstream option must have some hosts definition");
+
+ goto err;
+ }
+
+ up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
+ if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
+ g_set_error (err, rspamd_proxy_quark (), 100,
+ "upstream has bad hosts definition");
+
+ goto err;
+ }
+
+ elt = ucl_object_lookup_any (obj, "probability", "prob", NULL);
+ if (elt) {
+ up->prob = ucl_object_todouble (elt);
+ }
+ else {
+ up->prob = 1.0;
+ }
+
+ /*
+ * Accept lua function here in form
+ * fun :: String -> UCL
+ */
+ elt = ucl_object_lookup (obj, "parser");
+ if (elt) {
+ const gchar *lua_script;
+ gsize slen;
+ gint err_idx, ref_idx;
+ GString *tb = NULL;
+
+ lua_script = ucl_object_tolstring (elt, &slen);
+ lua_pushcfunction (L, &rspamd_lua_traceback);
+ err_idx = lua_gettop (L);
+
+ /* Load data */
+ if (luaL_loadbuffer (L, lua_script, slen, "proxy parser") != 0) {
+ g_set_error (err,
+ rspamd_proxy_quark (),
+ EINVAL,
+ "cannot load lua parser script: %s",
+ lua_tostring (L, -1));
+ lua_settop (L, 0); /* Error function */
+
+ goto err;
+ }
+
+ /* Now do it */
+ if (lua_pcall (L, 0, 1, err_idx) != 0) {
+ tb = lua_touserdata (L, -1);
+ g_set_error (err,
+ rspamd_proxy_quark (),
+ EINVAL,
+ "cannot init lua parser script: %s",
+ tb->str);
+ g_string_free (tb, TRUE);
+ lua_settop (L, 0);
+
+ goto err;
+ }
+
+ if (!lua_isfunction (L, -1)) {
+ g_set_error (err,
+ rspamd_proxy_quark (),
+ EINVAL,
+ "cannot init lua parser script: "
+ "must return function");
+ lua_settop (L, 0);
+
+ return FALSE;
+ }
+
+ ref_idx = luaL_ref (L, LUA_REGISTRYINDEX);
+ up->parser_ref = ref_idx;
+ lua_settop (L, 0);
+ }
+
+ g_ptr_array_add (ctx->mirrors, up);
+
+ return TRUE;
+
+err:
+
+ if (up) {
+ g_free (up->name);
+ rspamd_upstreams_destroy (up->u);
+
+ if (up->key) {
+ rspamd_pubkey_unref (up->key);
+ }
+
+ if (up->parser_ref != -1) {
+ luaL_unref (L, LUA_REGISTRYINDEX, up->parser_ref);
+ }
+
+ g_slice_free1 (sizeof (*up), up);
+ }
+
+ return FALSE;
+}
+
gpointer
init_rspamd_proxy (struct rspamd_config *cfg)
{
@@ -215,6 +381,7 @@ init_rspamd_proxy (struct rspamd_config *cfg)
ctx->magic = rspamd_rspamd_proxy_magic;
ctx->timeout = 5.0;
ctx->upstreams = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
+ ctx->mirrors = g_ptr_array_new ();
ctx->rotate_tm = DEFAULT_ROTATION_TIME;
ctx->cfg = cfg;
@@ -254,6 +421,14 @@ init_rspamd_proxy (struct rspamd_config *cfg)
0,
0,
"List of upstreams");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "mirror",
+ rspamd_proxy_parse_mirror,
+ ctx,
+ 0,
+ RSPAMD_CL_FLAG_MULTIPLE,
+ "List of mirrors");
return ctx;
}
@@ -590,6 +765,7 @@ start_rspamd_proxy (struct rspamd_worker *worker)
ctx->resolver = dns_resolver_init (worker->srv->logger,
ctx->ev_base,
worker->srv->cfg);
+ ctx->lua_state = worker->srv->cfg->lua_state;
double_to_tv (ctx->timeout, &ctx->io_tv);
rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver);