From 1c1a742de5ad4b01b3c0d997ff275b3ea70bdf46 Mon Sep 17 00:00:00 2001 From: Andrew Lewis Date: Thu, 3 Nov 2016 12:58:04 +0200 Subject: [PATCH] [Feature] Utility to convert fuzzy storage from sqlite to redis --- src/rspamadm/CMakeLists.txt | 1 + src/rspamadm/commands.c | 2 + src/rspamadm/fuzzy_convert.c | 139 +++++++++++++++++++++++ src/rspamadm/fuzzy_convert.lua | 198 +++++++++++++++++++++++++++++++++ 4 files changed, 340 insertions(+) create mode 100644 src/rspamadm/fuzzy_convert.c create mode 100644 src/rspamadm/fuzzy_convert.lua diff --git a/src/rspamadm/CMakeLists.txt b/src/rspamadm/CMakeLists.txt index 4fcf3fc16..8a016a497 100644 --- a/src/rspamadm/CMakeLists.txt +++ b/src/rspamadm/CMakeLists.txt @@ -3,6 +3,7 @@ SET(RSPAMADMSRC rspamadm.c pw.c keypair.c configtest.c + fuzzy_convert.c fuzzy_merge.c configdump.c control.c diff --git a/src/rspamadm/commands.c b/src/rspamadm/commands.c index 6f3e62251..300fc3dbe 100644 --- a/src/rspamadm/commands.c +++ b/src/rspamadm/commands.c @@ -23,6 +23,7 @@ extern struct rspamadm_command configdump_command; extern struct rspamadm_command control_command; extern struct rspamadm_command confighelp_command; extern struct rspamadm_command statconvert_command; +extern struct rspamadm_command fuzzyconvert_command; extern struct rspamadm_command signtool_command; extern struct rspamadm_command lua_command; extern struct rspamadm_command dkim_keygen_command; @@ -37,6 +38,7 @@ const struct rspamadm_command *commands[] = { &control_command, &confighelp_command, &statconvert_command, + &fuzzyconvert_command, &signtool_command, &lua_command, &dkim_keygen_command, diff --git a/src/rspamadm/fuzzy_convert.c b/src/rspamadm/fuzzy_convert.c new file mode 100644 index 000000000..6e31d9aad --- /dev/null +++ b/src/rspamadm/fuzzy_convert.c @@ -0,0 +1,139 @@ +/*- + * Copyright (c) 2016, Andrew Lewis + * Copyright (c) 2016, Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rspamadm.h" +#include "lua/lua_common.h" +#include "fuzzy_convert.lua.h" + +static gchar *source_db = NULL; +static gchar *redis_host = NULL; +static gchar *redis_db = NULL; +static gchar *redis_password = NULL; +static int64_t fuzzy_expiry = NULL; + +static void rspamadm_fuzzyconvert (gint argc, gchar **argv); +static const char *rspamadm_fuzzyconvert_help (gboolean full_help); + +struct rspamadm_command fuzzyconvert_command = { + .name = "fuzzyconvert", + .flags = 0, + .help = rspamadm_fuzzyconvert_help, + .run = rspamadm_fuzzyconvert +}; + +static GOptionEntry entries[] = { + {"database", 'd', 0, G_OPTION_ARG_FILENAME, &source_db, + "Input sqlite", NULL}, + {"expiry", 'e', 0, G_OPTION_ARG_INT, &fuzzy_expiry, + "Time in seconds after which hashes should be expired", NULL}, + {"host", 'h', 0, G_OPTION_ARG_STRING, &redis_host, + "Output redis ip (in format ip:port)", NULL}, + {"dbname", 'D', 0, G_OPTION_ARG_STRING, &redis_db, + "Database in redis (should be numeric)", NULL}, + {"password", 'p', 0, G_OPTION_ARG_STRING, &redis_password, + "Password to connect to redis", NULL}, + {NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL} +}; + + +static const char * +rspamadm_fuzzyconvert_help (gboolean full_help) +{ + const char *help_str; + + if (full_help) { + help_str = "Convert statistics from sqlite3 to redis\n\n" + "Usage: rspamadm fuzzyconvert -d -h \n" + "Where options are:\n\n" + "-d: input sqlite\n" + "-h: output redis ip (in format ip:port)\n" + "-D: output redis database\n" + "-p: redis password\n"; + } + else { + help_str = "Convert statistics from sqlite3 to redis"; + } + + return help_str; +} + +static void +rspamadm_fuzzyconvert (gint argc, gchar **argv) +{ + GOptionContext *context; + GError *error = NULL; + lua_State *L; + ucl_object_t *obj; + + context = g_option_context_new ( + "fuzzyconvert - converts statistics from sqlite3 to redis"); + g_option_context_set_summary (context, + "Summary:\n Rspamd administration utility version " + RVERSION + "\n Release id: " + RID); + g_option_context_add_main_entries (context, entries, NULL); + g_option_context_set_ignore_unknown_options (context, TRUE); + + if (!g_option_context_parse (context, &argc, &argv, &error)) { + rspamd_fprintf (stderr, "option parsing failed: %s\n", error->message); + g_error_free (error); + exit (1); + } + + if (!source_db) { + rspamd_fprintf (stderr, "source db is missing\n"); + exit (1); + } + if (!redis_host) { + rspamd_fprintf (stderr, "redis host is missing\n"); + exit (1); + } + if (!fuzzy_expiry) { + rspamd_fprintf (stderr, "expiry is missing\n"); + exit (1); + } + + L = rspamd_lua_init (); + + obj = ucl_object_typed_new (UCL_OBJECT); + ucl_object_insert_key (obj, ucl_object_fromstring (source_db), + "source_db", 0, false); + ucl_object_insert_key (obj, ucl_object_fromstring (redis_host), + "redis_host", 0, false); + ucl_object_insert_key (obj, ucl_object_fromint (fuzzy_expiry), + "expiry", 0, false); + + if (redis_password) { + ucl_object_insert_key (obj, ucl_object_fromstring (redis_password), + "redis_password", 0, false); + } + + if (redis_db) { + ucl_object_insert_key (obj, ucl_object_fromstring (redis_db), + "redis_db", 0, false); + } + + rspamadm_execute_lua_ucl_subr (L, + argc, + argv, + obj, + rspamadm_script_fuzzy_convert); + + lua_close (L); + ucl_object_unref (obj); +} diff --git a/src/rspamadm/fuzzy_convert.lua b/src/rspamadm/fuzzy_convert.lua new file mode 100644 index 000000000..53a44354c --- /dev/null +++ b/src/rspamadm/fuzzy_convert.lua @@ -0,0 +1,198 @@ +local sqlite3 = require "rspamd_sqlite3" +local redis = require "rspamd_redis" +local util = require "rspamd_util" + +local function connect_redis(server, password, db) + local ret + local conn, err = redis.connect_sync({ + host = server, + }) + + if not conn then + print('Cannot connect to ' .. server .. ' error: ' .. err) + return nil, err + end + + if password then + ret = conn:add_cmd('AUTH', {password}) + if not ret then + print('Cannot queue command. Error: ' .. err) + return nil, err + end + end + if db then + ret = conn:add_cmd('SELECT', {db}) + if not ret then + print('Cannot queue command. Error: ' .. err) + return nil, err + end + end + + if password or db then + ret, err = conn:exec() + if not ret then + print('Cannot send commands to ' .. server .. ' error: ' .. err) + return nil, err + end + end + + return conn, nil +end + +local function send_digests(digests, redis_host, redis_password, redis_db) + local conn, err = connect_redis(redis_host, redis_password, redis_db) + if not conn then return false end + local _, v + for _, v in ipairs(digests) do + local ret, err = conn:add_cmd('HMSET', { + 'fuzzy' .. v[1], + 'F', v[2], + 'V', v[3], + }) + if not ret then + print('Cannot batch HMSET command: ' .. err) + return false + end + ret, err = conn:add_cmd('EXPIRE', { + 'fuzzy' .. v[1], + tostring(v[4]), + }) + if not ret then + print('Cannot batch EXPIRE command: ' .. err) + return false + end + end + ret, err = conn:exec() + if not ret then + print('Cannot execute batched commands: ' .. err) + return false + end + return true +end + +local function send_shingles(shingles, redis_host, redis_password, redis_db) + local conn, err = connect_redis(redis_host, redis_password, redis_db) + if not conn then return false end + local _, v + for _, v in ipairs(shingles) do + local ret, err = conn:add_cmd('SET', { + 'fuzzy_' .. v[2] .. '_' .. v[1], + v[4], + }) + if not ret then + print('Cannot batch SET command: ' .. err) + return false + end + ret, err = conn:add_cmd('EXPIRE', { + 'fuzzy_' .. v[2] .. '_' .. v[1], + tostring(v[3]), + }) + if not ret then + print('Cannot batch EXPIRE command: ' .. err) + return false + end + end + ret, err = conn:exec() + if not ret then + print('Cannot execute batched commands: ' .. err) + return false + end + return true +end + +local function update_counters(total, redis_host, redis_password, redis_db) + local conn, err = connect_redis(redis_host, redis_password, redis_db) + if not conn then return false end + local ret, err = conn:add_cmd('SET', { + 'fuzzylocal', + total, + }) + if not ret then + print('Cannot batch SET command: ' .. err) + return false + end + local ret, err = conn:add_cmd('SET', { + 'fuzzy_count', + total, + }) + if not ret then + print('Cannot batch SET command: ' .. err) + return false + end + ret, err = conn:exec() + if not ret then + print('Cannot execute batched commands: ' .. err) + return false + end + return true +end + +return function (args, res) + local db = sqlite3.open(res['source_db']) + local shingles = {} + local digests = {} + local num_batch_digests = 0 + local num_batch_shingles = 0 + local total_digests = 0 + local total_shingles = 0 + local lim_batch = 1000 -- Update each 1000 entries + local redis_password = res['redis_password'] + local redis_db = nil + + if res['redis_db'] then + redis_db = tostring(res['redis_db']) + end + + if not db then + print('Cannot open source db: ' .. res['source_db']) + return + end + + for row in db:rows('SELECT id, flag, digest, value, time FROM digests') do + + local expire_in = math.floor(util:get_time() - row.time + res['expiry']) + if expire_in >= 1 then + table.insert(digests, {row.digest, row.flag, row.value, expire_in}) + num_batch_digests = num_batch_digests + 1 + total_digests = total_digests + 1 + for srow in db:rows('SELECT value, number FROM shingles WHERE digest_id = ' .. row.id) do + table.insert(shingles, {srow.value, srow.number, expire_in, row.digest}) + total_shingles = total_shingles + 1 + num_batch_shingles = num_batch_shingles + 1 + end + end + if num_batch_digests >= lim_batch then + if not send_digests(digests, res['redis_host'], redis_password, redis_db) then + return + end + num_batch_digests = 0 + digests = {} + end + if num_batch_shingles >= lim_batch then + if not send_shingles(shingles, res['redis_host'], redis_password, redis_db) then + return + end + num_batch_shingles = 0 + shingles = {} + end + end + if digests[1] then + if not send_digests(digests, res['redis_host'], redis_password, redis_db) then + return + end + end + if shingles[1] then + if not send_shingles(shingles, res['redis_host'], redis_password, redis_db) then + return + end + end + + local message = string.format( + 'Migrated %d digests and %d shingles', + total_digests, total_shingles + ) + if not update_counters(total_digests, res['redis_host'], redis_password, redis_db) then + message = message .. ' but failed to update counters' + end + print(message) +end -- 2.39.5