source.dussan.org Git - rspamd.git/commitdiff
[Feature] Utility to convert fuzzy storage from sqlite to redis 1096/head
author Andrew Lewis <nerf@judo.za.org>
Thu, 3 Nov 2016 10:58:04 +0000 (12:58 +0200)
committer Andrew Lewis <nerf@judo.za.org>
Thu, 3 Nov 2016 13:21:27 +0000 (15:21 +0200)
src/rspamadm/CMakeLists.txt
src/rspamadm/commands.c
src/rspamadm/fuzzy_convert.c [new file with mode: 0644]
src/rspamadm/fuzzy_convert.lua [new file with mode: 0644]

index 4fcf3fc16bccc3ba5e92634644c83687fc9c5e27..8a016a497f3139e08e3faced14c6ed3c1d6f58aa 100644 (file)
@@ -3,6 +3,7 @@ SET(RSPAMADMSRC rspamadm.c
         pw.c
         keypair.c
         configtest.c
+        fuzzy_convert.c
         fuzzy_merge.c
         configdump.c
         control.c
index 6f3e62251e11c60de359169f00fe9beeea3681a0..300fc3dbe2e056ccb06e068383b60c5cbf6cdb55 100644 (file)
@@ -23,6 +23,7 @@ extern struct rspamadm_command configdump_command;
 extern struct rspamadm_command control_command;
 extern struct rspamadm_command confighelp_command;
 extern struct rspamadm_command statconvert_command;
+extern struct rspamadm_command fuzzyconvert_command;
 extern struct rspamadm_command signtool_command;
 extern struct rspamadm_command lua_command;
 extern struct rspamadm_command dkim_keygen_command;
@@ -37,6 +38,7 @@ const struct rspamadm_command *commands[] = {
        &control_command,
        &confighelp_command,
        &statconvert_command,
+       &fuzzyconvert_command,
        &signtool_command,
        &lua_command,
        &dkim_keygen_command,
diff --git a/src/rspamadm/fuzzy_convert.c b/src/rspamadm/fuzzy_convert.c
new file mode 100644 (file)
index 0000000..6e31d9a
--- /dev/null
@@ -0,0 +1,139 @@
+/*-
+ * Copyright (c) 2016, Andrew Lewis <nerf@judo.za.org>
+ * Copyright (c) 2016, Vsevolod Stakhov <vsevolod@highsecure.ru>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rspamadm.h"
+#include "lua/lua_common.h"
+#include "fuzzy_convert.lua.h"
+
+/* Command line values, filled in by the GOption parser via `entries` */
+static gchar *source_db = NULL;
+static gchar *redis_host = NULL;
+static gchar *redis_db = NULL;
+static gchar *redis_password = NULL;
+/*
+ * Parsed with G_OPTION_ARG_INT, which stores a gint through the pointer it is
+ * given; the variable therefore has to be a gint (not a wider integer type)
+ * and is zero-initialised so that "not supplied" can be detected.
+ */
+static gint fuzzy_expiry = 0;
+
+/* Handlers referenced by the command descriptor below */
+static const char *rspamadm_fuzzyconvert_help (gboolean full_help);
+static void rspamadm_fuzzyconvert (gint argc, gchar **argv);
+
+/* Descriptor of the `fuzzyconvert` subcommand (listed in commands.c) */
+struct rspamadm_command fuzzyconvert_command = {
+       .name = "fuzzyconvert",
+       .flags = 0,
+       .help = rspamadm_fuzzyconvert_help,
+       .run = rspamadm_fuzzyconvert
+};
+
+/* Options understood by `rspamadm fuzzyconvert`; terminated by a NULL entry */
+static GOptionEntry entries[] = {
+       {
+               "database", 'd', 0, G_OPTION_ARG_FILENAME, &source_db,
+               "Input sqlite", NULL
+       },
+       {
+               "expiry", 'e', 0, G_OPTION_ARG_INT, &fuzzy_expiry,
+               "Time in seconds after which hashes should be expired", NULL
+       },
+       {
+               "host", 'h', 0, G_OPTION_ARG_STRING, &redis_host,
+               "Output redis ip (in format ip:port)", NULL
+       },
+       {
+               "dbname", 'D', 0, G_OPTION_ARG_STRING, &redis_db,
+               "Database in redis (should be numeric)", NULL
+       },
+       {
+               "password", 'p', 0, G_OPTION_ARG_STRING, &redis_password,
+               "Password to connect to redis", NULL
+       },
+       {
+               NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL
+       }
+};
+
+
+/*
+ * Returns the help text of the `fuzzyconvert` command.
+ *
+ * full_help: when TRUE the usage line and the per-option description are
+ *            returned, otherwise only a one-line summary.
+ *
+ * The returned string is a static literal and must not be freed.
+ */
+static const char *
+rspamadm_fuzzyconvert_help (gboolean full_help)
+{
+       if (full_help) {
+               /* -e is listed because the command refuses to run without it */
+               return "Convert fuzzy hashes from sqlite3 to redis\n\n"
+                               "Usage: rspamadm fuzzyconvert -d <sqlite_db> -h <redis_ip> -e <expiry>\n"
+                               "Where options are:\n\n"
+                               "-d: input sqlite\n"
+                               "-e: time in seconds after which hashes should be expired\n"
+                               "-h: output redis ip (in format ip:port)\n"
+                               "-D: output redis database\n"
+                               "-p: redis password\n";
+       }
+
+       return "Convert fuzzy hashes from sqlite3 to redis";
+}
+
+/*
+ * Entry point of `rspamadm fuzzyconvert`.
+ *
+ * Parses the command line, validates the mandatory options (source database,
+ * redis host and a positive expiry), packs the settings into a UCL object and
+ * hands it to the embedded Lua script that performs the conversion.
+ *
+ * Exits the process with status 1 on a parse error or a missing option.
+ */
+static void
+rspamadm_fuzzyconvert (gint argc, gchar **argv)
+{
+       GOptionContext *context;
+       GError *error = NULL;
+       lua_State *L;
+       ucl_object_t *obj;
+
+       context = g_option_context_new (
+                       "fuzzyconvert - converts fuzzy hashes from sqlite3 to redis");
+       g_option_context_set_summary (context,
+                       "Summary:\n  Rspamd administration utility version "
+                                       RVERSION
+                                       "\n  Release id: "
+                                       RID);
+       g_option_context_add_main_entries (context, entries, NULL);
+       g_option_context_set_ignore_unknown_options (context, TRUE);
+
+       if (!g_option_context_parse (context, &argc, &argv, &error)) {
+               rspamd_fprintf (stderr, "option parsing failed: %s\n", error->message);
+               g_error_free (error);
+               g_option_context_free (context);
+               exit (1);
+       }
+
+       /* The parsed values live in the static variables, not in the context */
+       g_option_context_free (context);
+
+       if (!source_db) {
+               rspamd_fprintf (stderr, "source db is missing\n");
+               exit (1);
+       }
+       if (!redis_host) {
+               rspamd_fprintf (stderr, "redis host is missing\n");
+               exit (1);
+       }
+       /* Zero means "not supplied"; a negative lifetime is meaningless too */
+       if (fuzzy_expiry <= 0) {
+               rspamd_fprintf (stderr, "expiry is missing or not positive\n");
+               exit (1);
+       }
+
+       L = rspamd_lua_init ();
+
+       /* Settings passed to the Lua script as its second argument */
+       obj = ucl_object_typed_new (UCL_OBJECT);
+       ucl_object_insert_key (obj, ucl_object_fromstring (source_db),
+                       "source_db", 0, false);
+       ucl_object_insert_key (obj, ucl_object_fromstring (redis_host),
+                       "redis_host", 0, false);
+       ucl_object_insert_key (obj, ucl_object_fromint (fuzzy_expiry),
+                       "expiry", 0, false);
+
+       /* Optional settings are only present when given on the command line */
+       if (redis_password) {
+               ucl_object_insert_key (obj, ucl_object_fromstring (redis_password),
+                               "redis_password", 0, false);
+       }
+
+       if (redis_db) {
+               ucl_object_insert_key (obj, ucl_object_fromstring (redis_db),
+                               "redis_db", 0, false);
+       }
+
+       rspamadm_execute_lua_ucl_subr (L,
+                       argc,
+                       argv,
+                       obj,
+                       rspamadm_script_fuzzy_convert);
+
+       lua_close (L);
+       ucl_object_unref (obj);
+}
diff --git a/src/rspamadm/fuzzy_convert.lua b/src/rspamadm/fuzzy_convert.lua
new file mode 100644 (file)
index 0000000..53a4435
--- /dev/null
@@ -0,0 +1,198 @@
+local sqlite3 = require "rspamd_sqlite3"
+local redis = require "rspamd_redis"
+local util = require "rspamd_util"
+
+--- Open a synchronous redis connection, optionally authenticating and
+-- selecting a database.
+-- @param server address passed as `host` to redis.connect_sync
+-- @param password optional; when set an AUTH command is sent
+-- @param db optional; when set a SELECT command is sent
+-- @return the connection on success, or nil plus the error on failure
+local function connect_redis(server, password, db)
+  local conn, err = redis.connect_sync({
+    host = server,
+  })
+
+  if not conn then
+    print('Cannot connect to ' .. server .. ' error: ' .. tostring(err))
+    return nil, err
+  end
+
+  local ret
+  if password then
+    -- Take the error from add_cmd itself: `err` is nil after a successful
+    -- connect, and concatenating nil would raise instead of reporting
+    ret, err = conn:add_cmd('AUTH', {password})
+    if not ret then
+      print('Cannot queue command. Error: ' .. tostring(err))
+      return nil, err
+    end
+  end
+  if db then
+    ret, err = conn:add_cmd('SELECT', {db})
+    if not ret then
+      print('Cannot queue command. Error: ' .. tostring(err))
+      return nil, err
+    end
+  end
+
+  -- Nothing was queued when neither a password nor a db was given
+  if password or db then
+    ret, err = conn:exec()
+    if not ret then
+      print('Cannot send commands to ' .. server .. ' error: ' .. tostring(err))
+      return nil, err
+    end
+  end
+
+  return conn, nil
+end
+
+--- Write one batch of digests to redis.
+-- Each element of `digests` is {digest, flag, value, ttl}; it is stored as a
+-- hash under 'fuzzy' .. digest with fields F and V, and expired after ttl
+-- seconds.
+-- @param digests array of digest tuples
+-- @param redis_host redis address
+-- @param redis_password optional redis password
+-- @param redis_db optional redis database
+-- @return true when the whole batch was executed, false otherwise
+local function send_digests(digests, redis_host, redis_password, redis_db)
+  local conn = connect_redis(redis_host, redis_password, redis_db)
+  if not conn then return false end
+
+  -- Declared once for the whole function so that the final exec() does not
+  -- assign to a global `ret`
+  local ret, err
+  for _, v in ipairs(digests) do
+    local key = 'fuzzy' .. v[1]
+    ret, err = conn:add_cmd('HMSET', {
+      key,
+      'F', v[2],
+      'V', v[3],
+    })
+    if not ret then
+      print('Cannot batch HMSET command: ' .. tostring(err))
+      return false
+    end
+    ret, err = conn:add_cmd('EXPIRE', {
+      key,
+      tostring(v[4]),
+    })
+    if not ret then
+      print('Cannot batch EXPIRE command: ' .. tostring(err))
+      return false
+    end
+  end
+
+  ret, err = conn:exec()
+  if not ret then
+    print('Cannot execute batched commands: ' .. tostring(err))
+    return false
+  end
+  return true
+end
+
+--- Write one batch of shingles to redis.
+-- Each element of `shingles` is {value, number, ttl, digest}; the digest is
+-- stored under 'fuzzy_' .. number .. '_' .. value and expired after ttl
+-- seconds.
+-- @param shingles array of shingle tuples
+-- @param redis_host redis address
+-- @param redis_password optional redis password
+-- @param redis_db optional redis database
+-- @return true when the whole batch was executed, false otherwise
+local function send_shingles(shingles, redis_host, redis_password, redis_db)
+  local conn = connect_redis(redis_host, redis_password, redis_db)
+  if not conn then return false end
+
+  -- Declared once for the whole function so that the final exec() does not
+  -- assign to a global `ret`
+  local ret, err
+  for _, v in ipairs(shingles) do
+    local key = 'fuzzy_' .. v[2] .. '_' .. v[1]
+    ret, err = conn:add_cmd('SET', {
+      key,
+      v[4],
+    })
+    if not ret then
+      print('Cannot batch SET command: ' .. tostring(err))
+      return false
+    end
+    ret, err = conn:add_cmd('EXPIRE', {
+      key,
+      tostring(v[3]),
+    })
+    if not ret then
+      print('Cannot batch EXPIRE command: ' .. tostring(err))
+      return false
+    end
+  end
+
+  ret, err = conn:exec()
+  if not ret then
+    print('Cannot execute batched commands: ' .. tostring(err))
+    return false
+  end
+  return true
+end
+
+--- Store the number of migrated digests in the 'fuzzylocal' and
+-- 'fuzzy_count' keys.
+-- @param total value written to both keys
+-- @param redis_host redis address
+-- @param redis_password optional redis password
+-- @param redis_db optional redis database
+-- @return true when both SET commands were executed, false otherwise
+local function update_counters(total, redis_host, redis_password, redis_db)
+  local conn = connect_redis(redis_host, redis_password, redis_db)
+  if not conn then
+    return false
+  end
+
+  -- Queue the same value under both counter keys, in this order
+  local counter_keys = {'fuzzylocal', 'fuzzy_count'}
+  for i = 1, #counter_keys do
+    local queued, queue_err = conn:add_cmd('SET', {counter_keys[i], total})
+    if not queued then
+      print('Cannot batch SET command: ' .. queue_err)
+      return false
+    end
+  end
+
+  local done, exec_err = conn:exec()
+  if done then
+    return true
+  end
+
+  print('Cannot execute batched commands: ' .. exec_err)
+  return false
+end
+
+--- Script entry point: copy digests and shingles from the sqlite database
+-- into redis.
+-- @param args command line arguments (unused here)
+-- @param res settings table with the keys source_db, redis_host, expiry and
+--   optionally redis_password and redis_db
+return function (args, res)
+  local db = sqlite3.open(res['source_db'])
+  if not db then
+    print('Cannot open source db: ' .. res['source_db'])
+    return
+  end
+
+  local redis_host = res['redis_host']
+  local redis_password = res['redis_password']
+  local redis_db = nil
+  if res['redis_db'] then
+    redis_db = tostring(res['redis_db'])
+  end
+
+  local expiry = res['expiry']
+  local lim_batch = 1000 -- Update each 1000 entries
+  local shingles = {}
+  local digests = {}
+  local num_batch_digests = 0
+  local num_batch_shingles = 0
+  local total_digests = 0
+  local total_shingles = 0
+
+  -- get_time is a plain function of the util module, not a method
+  local now = util.get_time()
+
+  for row in db:rows('SELECT id, flag, digest, value, time FROM digests') do
+    -- Remaining lifetime: the hash dies `expiry` seconds after row.time.
+    -- (Adding the age instead of subtracting it would give old hashes the
+    -- longest TTL and make the check below always pass.)
+    local expire_in = math.floor(row.time + expiry - now)
+    if expire_in >= 1 then
+      digests[#digests + 1] = {row.digest, row.flag, row.value, expire_in}
+      num_batch_digests = num_batch_digests + 1
+      total_digests = total_digests + 1
+      -- Shingles inherit the TTL of the digest they belong to
+      for srow in db:rows('SELECT value, number FROM shingles WHERE digest_id = ' .. row.id) do
+        shingles[#shingles + 1] = {srow.value, srow.number, expire_in, row.digest}
+        total_shingles = total_shingles + 1
+        num_batch_shingles = num_batch_shingles + 1
+      end
+    end
+    if num_batch_digests >= lim_batch then
+      if not send_digests(digests, redis_host, redis_password, redis_db) then
+        return
+      end
+      num_batch_digests = 0
+      digests = {}
+    end
+    if num_batch_shingles >= lim_batch then
+      if not send_shingles(shingles, redis_host, redis_password, redis_db) then
+        return
+      end
+      num_batch_shingles = 0
+      shingles = {}
+    end
+  end
+
+  -- Flush whatever is left in the partially filled batches
+  if digests[1] then
+    if not send_digests(digests, redis_host, redis_password, redis_db) then
+      return
+    end
+  end
+  if shingles[1] then
+    if not send_shingles(shingles, redis_host, redis_password, redis_db) then
+      return
+    end
+  end
+
+  local message = string.format(
+    'Migrated %d digests and %d shingles',
+    total_digests, total_shingles
+  )
+  if not update_counters(total_digests, redis_host, redis_password, redis_db) then
+    message = message .. ' but failed to update counters'
+  end
+  print(message)
+end