aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAndrew Lewis <nerf@judo.za.org>2016-11-03 12:58:04 +0200
committerAndrew Lewis <nerf@judo.za.org>2016-11-03 15:21:27 +0200
commit1c1a742de5ad4b01b3c0d997ff275b3ea70bdf46 (patch)
tree78cf156998089a5f7dc82efb3a43b7b48270e1f5 /src
parent02a2e48fb60ec36b317497e21cf8df56b40aa73c (diff)
downloadrspamd-1c1a742de5ad4b01b3c0d997ff275b3ea70bdf46.tar.gz
rspamd-1c1a742de5ad4b01b3c0d997ff275b3ea70bdf46.zip
[Feature] Utility to convert fuzzy storage from sqlite to redis
Diffstat (limited to 'src')
-rw-r--r--src/rspamadm/CMakeLists.txt1
-rw-r--r--src/rspamadm/commands.c2
-rw-r--r--src/rspamadm/fuzzy_convert.c139
-rw-r--r--src/rspamadm/fuzzy_convert.lua198
4 files changed, 340 insertions, 0 deletions
diff --git a/src/rspamadm/CMakeLists.txt b/src/rspamadm/CMakeLists.txt
index 4fcf3fc16..8a016a497 100644
--- a/src/rspamadm/CMakeLists.txt
+++ b/src/rspamadm/CMakeLists.txt
@@ -3,6 +3,7 @@ SET(RSPAMADMSRC rspamadm.c
pw.c
keypair.c
configtest.c
+ fuzzy_convert.c
fuzzy_merge.c
configdump.c
control.c
diff --git a/src/rspamadm/commands.c b/src/rspamadm/commands.c
index 6f3e62251..300fc3dbe 100644
--- a/src/rspamadm/commands.c
+++ b/src/rspamadm/commands.c
@@ -23,6 +23,7 @@ extern struct rspamadm_command configdump_command;
extern struct rspamadm_command control_command;
extern struct rspamadm_command confighelp_command;
extern struct rspamadm_command statconvert_command;
+extern struct rspamadm_command fuzzyconvert_command;
extern struct rspamadm_command signtool_command;
extern struct rspamadm_command lua_command;
extern struct rspamadm_command dkim_keygen_command;
@@ -37,6 +38,7 @@ const struct rspamadm_command *commands[] = {
&control_command,
&confighelp_command,
&statconvert_command,
+ &fuzzyconvert_command,
&signtool_command,
&lua_command,
&dkim_keygen_command,
diff --git a/src/rspamadm/fuzzy_convert.c b/src/rspamadm/fuzzy_convert.c
new file mode 100644
index 000000000..6e31d9aad
--- /dev/null
+++ b/src/rspamadm/fuzzy_convert.c
@@ -0,0 +1,139 @@
+/*-
+ * Copyright (c) 2016, Andrew Lewis <nerf@judo.za.org>
+ * Copyright (c) 2016, Vsevolod Stakhov <vsevolod@highsecure.ru>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rspamadm.h"
+#include "lua/lua_common.h"
+#include "fuzzy_convert.lua.h"
+
+static gchar *source_db = NULL;
+static gchar *redis_host = NULL;
+static gchar *redis_db = NULL;
+static gchar *redis_password = NULL;
+static int64_t fuzzy_expiry = NULL;
+
+static void rspamadm_fuzzyconvert (gint argc, gchar **argv);
+static const char *rspamadm_fuzzyconvert_help (gboolean full_help);
+
+struct rspamadm_command fuzzyconvert_command = {
+ .name = "fuzzyconvert",
+ .flags = 0,
+ .help = rspamadm_fuzzyconvert_help,
+ .run = rspamadm_fuzzyconvert
+};
+
+static GOptionEntry entries[] = {
+ {"database", 'd', 0, G_OPTION_ARG_FILENAME, &source_db,
+ "Input sqlite", NULL},
+ {"expiry", 'e', 0, G_OPTION_ARG_INT, &fuzzy_expiry,
+ "Time in seconds after which hashes should be expired", NULL},
+ {"host", 'h', 0, G_OPTION_ARG_STRING, &redis_host,
+ "Output redis ip (in format ip:port)", NULL},
+ {"dbname", 'D', 0, G_OPTION_ARG_STRING, &redis_db,
+ "Database in redis (should be numeric)", NULL},
+ {"password", 'p', 0, G_OPTION_ARG_STRING, &redis_password,
+ "Password to connect to redis", NULL},
+ {NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL}
+};
+
+
+static const char *
+rspamadm_fuzzyconvert_help (gboolean full_help)
+{
+ const char *help_str;
+
+ if (full_help) {
+ help_str = "Convert statistics from sqlite3 to redis\n\n"
+ "Usage: rspamadm fuzzyconvert -d <sqlite_db> -h <redis_ip>\n"
+ "Where options are:\n\n"
+ "-d: input sqlite\n"
+ "-h: output redis ip (in format ip:port)\n"
+ "-D: output redis database\n"
+ "-p: redis password\n";
+ }
+ else {
+ help_str = "Convert statistics from sqlite3 to redis";
+ }
+
+ return help_str;
+}
+
+static void
+rspamadm_fuzzyconvert (gint argc, gchar **argv)
+{
+ GOptionContext *context;
+ GError *error = NULL;
+ lua_State *L;
+ ucl_object_t *obj;
+
+ context = g_option_context_new (
+ "fuzzyconvert - converts statistics from sqlite3 to redis");
+ g_option_context_set_summary (context,
+ "Summary:\n Rspamd administration utility version "
+ RVERSION
+ "\n Release id: "
+ RID);
+ g_option_context_add_main_entries (context, entries, NULL);
+ g_option_context_set_ignore_unknown_options (context, TRUE);
+
+ if (!g_option_context_parse (context, &argc, &argv, &error)) {
+ rspamd_fprintf (stderr, "option parsing failed: %s\n", error->message);
+ g_error_free (error);
+ exit (1);
+ }
+
+ if (!source_db) {
+ rspamd_fprintf (stderr, "source db is missing\n");
+ exit (1);
+ }
+ if (!redis_host) {
+ rspamd_fprintf (stderr, "redis host is missing\n");
+ exit (1);
+ }
+ if (!fuzzy_expiry) {
+ rspamd_fprintf (stderr, "expiry is missing\n");
+ exit (1);
+ }
+
+ L = rspamd_lua_init ();
+
+ obj = ucl_object_typed_new (UCL_OBJECT);
+ ucl_object_insert_key (obj, ucl_object_fromstring (source_db),
+ "source_db", 0, false);
+ ucl_object_insert_key (obj, ucl_object_fromstring (redis_host),
+ "redis_host", 0, false);
+ ucl_object_insert_key (obj, ucl_object_fromint (fuzzy_expiry),
+ "expiry", 0, false);
+
+ if (redis_password) {
+ ucl_object_insert_key (obj, ucl_object_fromstring (redis_password),
+ "redis_password", 0, false);
+ }
+
+ if (redis_db) {
+ ucl_object_insert_key (obj, ucl_object_fromstring (redis_db),
+ "redis_db", 0, false);
+ }
+
+ rspamadm_execute_lua_ucl_subr (L,
+ argc,
+ argv,
+ obj,
+ rspamadm_script_fuzzy_convert);
+
+ lua_close (L);
+ ucl_object_unref (obj);
+}
diff --git a/src/rspamadm/fuzzy_convert.lua b/src/rspamadm/fuzzy_convert.lua
new file mode 100644
index 000000000..53a44354c
--- /dev/null
+++ b/src/rspamadm/fuzzy_convert.lua
@@ -0,0 +1,198 @@
+local sqlite3 = require "rspamd_sqlite3"
+local redis = require "rspamd_redis"
+local util = require "rspamd_util"
+
+local function connect_redis(server, password, db)
+ local ret
+ local conn, err = redis.connect_sync({
+ host = server,
+ })
+
+ if not conn then
+ print('Cannot connect to ' .. server .. ' error: ' .. err)
+ return nil, err
+ end
+
+ if password then
+ ret = conn:add_cmd('AUTH', {password})
+ if not ret then
+ print('Cannot queue command. Error: ' .. err)
+ return nil, err
+ end
+ end
+ if db then
+ ret = conn:add_cmd('SELECT', {db})
+ if not ret then
+ print('Cannot queue command. Error: ' .. err)
+ return nil, err
+ end
+ end
+
+ if password or db then
+ ret, err = conn:exec()
+ if not ret then
+ print('Cannot send commands to ' .. server .. ' error: ' .. err)
+ return nil, err
+ end
+ end
+
+ return conn, nil
+end
+
+local function send_digests(digests, redis_host, redis_password, redis_db)
+ local conn, err = connect_redis(redis_host, redis_password, redis_db)
+ if not conn then return false end
+ local _, v
+ for _, v in ipairs(digests) do
+ local ret, err = conn:add_cmd('HMSET', {
+ 'fuzzy' .. v[1],
+ 'F', v[2],
+ 'V', v[3],
+ })
+ if not ret then
+ print('Cannot batch HMSET command: ' .. err)
+ return false
+ end
+ ret, err = conn:add_cmd('EXPIRE', {
+ 'fuzzy' .. v[1],
+ tostring(v[4]),
+ })
+ if not ret then
+ print('Cannot batch EXPIRE command: ' .. err)
+ return false
+ end
+ end
+ ret, err = conn:exec()
+ if not ret then
+ print('Cannot execute batched commands: ' .. err)
+ return false
+ end
+ return true
+end
+
+local function send_shingles(shingles, redis_host, redis_password, redis_db)
+ local conn, err = connect_redis(redis_host, redis_password, redis_db)
+ if not conn then return false end
+ local _, v
+ for _, v in ipairs(shingles) do
+ local ret, err = conn:add_cmd('SET', {
+ 'fuzzy_' .. v[2] .. '_' .. v[1],
+ v[4],
+ })
+ if not ret then
+ print('Cannot batch SET command: ' .. err)
+ return false
+ end
+ ret, err = conn:add_cmd('EXPIRE', {
+ 'fuzzy_' .. v[2] .. '_' .. v[1],
+ tostring(v[3]),
+ })
+ if not ret then
+ print('Cannot batch EXPIRE command: ' .. err)
+ return false
+ end
+ end
+ ret, err = conn:exec()
+ if not ret then
+ print('Cannot execute batched commands: ' .. err)
+ return false
+ end
+ return true
+end
+
+local function update_counters(total, redis_host, redis_password, redis_db)
+ local conn, err = connect_redis(redis_host, redis_password, redis_db)
+ if not conn then return false end
+ local ret, err = conn:add_cmd('SET', {
+ 'fuzzylocal',
+ total,
+ })
+ if not ret then
+ print('Cannot batch SET command: ' .. err)
+ return false
+ end
+ local ret, err = conn:add_cmd('SET', {
+ 'fuzzy_count',
+ total,
+ })
+ if not ret then
+ print('Cannot batch SET command: ' .. err)
+ return false
+ end
+ ret, err = conn:exec()
+ if not ret then
+ print('Cannot execute batched commands: ' .. err)
+ return false
+ end
+ return true
+end
+
+return function (args, res)
+ local db = sqlite3.open(res['source_db'])
+ local shingles = {}
+ local digests = {}
+ local num_batch_digests = 0
+ local num_batch_shingles = 0
+ local total_digests = 0
+ local total_shingles = 0
+ local lim_batch = 1000 -- Update each 1000 entries
+ local redis_password = res['redis_password']
+ local redis_db = nil
+
+ if res['redis_db'] then
+ redis_db = tostring(res['redis_db'])
+ end
+
+ if not db then
+ print('Cannot open source db: ' .. res['source_db'])
+ return
+ end
+
+ for row in db:rows('SELECT id, flag, digest, value, time FROM digests') do
+
+ local expire_in = math.floor(util:get_time() - row.time + res['expiry'])
+ if expire_in >= 1 then
+ table.insert(digests, {row.digest, row.flag, row.value, expire_in})
+ num_batch_digests = num_batch_digests + 1
+ total_digests = total_digests + 1
+ for srow in db:rows('SELECT value, number FROM shingles WHERE digest_id = ' .. row.id) do
+ table.insert(shingles, {srow.value, srow.number, expire_in, row.digest})
+ total_shingles = total_shingles + 1
+ num_batch_shingles = num_batch_shingles + 1
+ end
+ end
+ if num_batch_digests >= lim_batch then
+ if not send_digests(digests, res['redis_host'], redis_password, redis_db) then
+ return
+ end
+ num_batch_digests = 0
+ digests = {}
+ end
+ if num_batch_shingles >= lim_batch then
+ if not send_shingles(shingles, res['redis_host'], redis_password, redis_db) then
+ return
+ end
+ num_batch_shingles = 0
+ shingles = {}
+ end
+ end
+ if digests[1] then
+ if not send_digests(digests, res['redis_host'], redis_password, redis_db) then
+ return
+ end
+ end
+ if shingles[1] then
+ if not send_shingles(shingles, res['redis_host'], redis_password, redis_db) then
+ return
+ end
+ end
+
+ local message = string.format(
+ 'Migrated %d digests and %d shingles',
+ total_digests, total_shingles
+ )
+ if not update_counters(total_digests, res['redis_host'], redis_password, redis_db) then
+ message = message .. ' but failed to update counters'
+ end
+ print(message)
+end