]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Implement Clickhouse migrations
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 8 Aug 2018 12:31:13 +0000 (13:31 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 8 Aug 2018 13:01:37 +0000 (14:01 +0100)
lualib/lua_clickhouse.lua
src/plugins/lua/clickhouse.lua

index 3f3c4de4045ea488f5d9b4ea222919b997f10f9b..01d1f0b2bb2386445d15691136cebf3c6b38eb62 100644 (file)
@@ -76,7 +76,6 @@ local function parse_clickhouse_response(params, data)
   local lua_util = require "lua_util"
   local ucl = require "ucl"
 
-  rspamd_logger.debugm(N, params.log_obj, "got clickhouse response: %s", data)
   if data == nil then
     -- clickhouse returned no data (i.e. empty result set): exiting
     return {}
@@ -113,27 +112,35 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
     if code ~= 200 or err_message then
       if not err_message then err_message = data end
       local ip_addr = upstream:get_addr():to_string(true)
-      rspamd_logger.errx(params.log_obj,
-          "request failed on clickhouse server %s: %s",
-          ip_addr, err_message)
 
       if fail_cb then
         fail_cb(params, err_message, data)
+      else
+        rspamd_logger.errx(params.log_obj,
+            "request failed on clickhouse server %s: %s",
+            ip_addr, err_message)
       end
       upstream:fail()
     else
       upstream:ok()
-      rspamd_logger.debugm(N, params.log_obj,
-          "http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
       local rows = parse_clickhouse_response(params, data)
 
       if rows then
         if ok_cb then
           ok_cb(params, rows)
+        else
+          rspamd_logger.debugm(N, params.log_obj,
+              "http_select_cb ok: %s, %s, %s, %s", err_message, code,
+              data:gsub('[\n%s]+', ' '), _)
         end
       else
         if fail_cb then
           fail_cb(params, 'failed to parse reply', data)
+        else
+          local ip_addr = upstream:get_addr():to_string(true)
+          rspamd_logger.errx(params.log_obj,
+            "request failed on clickhouse server %s: %s",
+            ip_addr, 'failed to parse reply')
         end
       end
     end
@@ -148,21 +155,24 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
     if code ~= 200 or err_message then
       if not err_message then err_message = data end
       local ip_addr = upstream:get_addr():to_string(true)
-      rspamd_logger.errx(params.log_obj,
-          "request failed on clickhouse server %s: %s",
-          ip_addr, err_message)
 
       if fail_cb then
         fail_cb(params, err_message, data)
+      else
+        rspamd_logger.errx(params.log_obj,
+            "request failed on clickhouse server %s: %s",
+            ip_addr, err_message)
       end
       upstream:fail()
     else
       upstream:ok()
-      rspamd_logger.debugm(N, params.log_obj,
-          "http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
 
       if ok_cb then
         ok_cb(params, data)
+      else
+        rspamd_logger.debugm(N, params.log_obj,
+            "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
+            data:gsub('[\n%s]+', ' '), _)
       end
     end
   end
@@ -204,7 +214,7 @@ exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
   http_params.body = query
   http_params.log_obj = params.task or params.config
 
-  rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body)
+  rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)
 
   if not http_params.url then
     local connect_prefix = "http://"
@@ -306,6 +316,7 @@ exports.generic = function (upstream, settings, params, query,
   http_params.user = settings.user
   http_params.password = settings.password
   http_params.log_obj = params.task or params.config
+  http_params.body = query
 
   if not http_params.url then
     local connect_prefix = "http://"
index 444465814b69363f8a9d0a00e252b853c78bffbb..54128235b520d4be2c778ee8a19873106c32ffe7 100644 (file)
@@ -37,6 +37,7 @@ local asn_rows = {}
 local symbols_rows = {}
 local custom_rows = {}
 local nrows = 0
+local schema_version = 2 -- Current schema version
 local connect_prefix = 'http://'
 
 local settings = {
@@ -52,12 +53,6 @@ local settings = {
   dmarc_allow_symbols = {'DMARC_POLICY_ALLOW'},
   dmarc_reject_symbols = {'DMARC_POLICY_REJECT', 'DMARC_POLICY_QUARANTINE'},
   stop_symbols = {},
-  table = 'rspamd',
-  attachments_table = 'rspamd_attachments',
-  urls_table = 'rspamd_urls',
-  emails_table = 'rspamd_emails',
-  symbols_table = 'rspamd_symbols',
-  asn_table = 'rspamd_asn',
   ipmask = 19,
   ipmask6 = 48,
   full_urls = false,
@@ -78,9 +73,9 @@ local settings = {
   }
 }
 
-local clickhouse_schema = {
-table = [[
-CREATE TABLE IF NOT EXISTS ${table}
+--- @language SQL
+local clickhouse_schema = {[[
+CREATE TABLE rspamd
 (
     Date Date,
     TS DateTime,
@@ -103,59 +98,51 @@ CREATE TABLE IF NOT EXISTS ${table}
     RcptUser String,
     RcptDomain String,
     ListId String,
-    Digest FixedString(32)
-) ENGINE = MergeTree(Date, (TS, From), 8192)
-]],
-
-attachments_table = [[
-CREATE TABLE IF NOT EXISTS ${attachments_table} (
-    Date Date,
-    Digest FixedString(32),
     `Attachments.FileName` Array(String),
     `Attachments.ContentType` Array(String),
     `Attachments.Length` Array(UInt32),
-    `Attachments.Digest` Array(FixedString(16))
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-urls_table = [[
-CREATE TABLE IF NOT EXISTS ${urls_table} (
-    Date Date,
-    Digest FixedString(32),
+    `Attachments.Digest` Array(FixedString(16)),
     `Urls.Tld` Array(String),
-    `Urls.Url` Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-emails_table = [[
-CREATE TABLE IF NOT EXISTS ${emails_table} (
-    Date Date,
-    Digest FixedString(32),
-    Emails Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-asn_table = [[
-CREATE TABLE IF NOT EXISTS ${asn_table} (
-    Date Date,
-    Digest FixedString(32),
+    `Urls.Url` Array(String),
+    Emails Array(String),
     ASN String,
     Country FixedString(2),
-    IPNet String
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-symbols_table = [[
-CREATE TABLE IF NOT EXISTS ${symbols_table} (
-    Date Date,
-    Digest FixedString(32),
+    IPNet String,
     `Symbols.Names` Array(String),
     `Symbols.Scores` Array(Float64),
-    `Symbols.Options` Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]]
+    `Symbols.Options` Array(String),
+    Digest FixedString(32)
+) ENGINE = MergeTree(Date, (TS, From), 8192)
+]],
+[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
+[[INSERT INTO rspamd_version (Version) Values (2)]],
 }
 
+-- This describes SQL queries to migrate between versions
+local migrations = {
+  [1] = {
+    -- Move to a wide fat table
+    [[ALTER TABLE rspamd
+      ADD COLUMN `Attachments.FileName` Array(String) AFTER ListId,
+      ADD COLUMN `Attachments.ContentType` Array(String) AFTER `Attachments.FileName`,
+      ADD COLUMN `Attachments.Length` Array(UInt32) AFTER `Attachments.ContentType`,
+      ADD COLUMN `Attachments.Digest` Array(FixedString(16)) AFTER `Attachments.Length`,
+      ADD COLUMN `Urls.Tld` Array(String) AFTER `Attachments.Digest`,
+      ADD COLUMN `Urls.Url` Array(String) AFTER `Urls.Tld`,
+      ADD COLUMN Emails Array(String) AFTER `Urls.Url`,
+      ADD COLUMN ASN String AFTER Emails,
+      ADD COLUMN Country FixedString(2) AFTER ASN,
+      ADD COLUMN IPNet String AFTER Country,
+      ADD COLUMN `Symbols.Names` Array(String) AFTER IPNet,
+      ADD COLUMN `Symbols.Scores` Array(Float64) AFTER `Symbols.Names`,
+      ADD COLUMN `Symbols.Options` Array(String) AFTER `Symbols.Scores`]],
+    -- Add explicit version
+    [[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
+    [[INSERT INTO rspamd_version (Version) Values (2)]],
+  }
+}
+
+
 local function clickhouse_main_row(tname)
   local fields = {
     'Date',
@@ -803,6 +790,163 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
   return settings.retention.period
 end
 
+local function upload_clickhouse_schema(upstream, ev_base, cfg)
+  local ch_params = {
+    ev_base = ev_base,
+    config = cfg,
+  }
+  -- Apply schema sequentially
+  local function sql_recursor(i)
+    if clickhouse_schema[i] then
+      local sql = clickhouse_schema[i]
+      local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+          function(_, _)
+            rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s',
+                i, upstream:get_addr():to_string(true))
+            sql_recursor(i + 1)
+          end,
+          function(_, err)
+            rspamd_logger.errx(rspamd_config,
+                "cannot upload schema '%s' on clickhouse server %s: %s",
+                sql, upstream:get_addr():to_string(true), err)
+          end)
+      if not ret then
+        rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: cannot make request",
+            sql, upstream:get_addr():to_string(true))
+      end
+    end
+  end
+
+  sql_recursor(1)
+end
+
+local function maybe_apply_migrations(upstream, ev_base, cfg, version)
+  local ch_params = {
+    ev_base = ev_base,
+    config = cfg,
+  }
+  -- Apply migrations sequentially
+  local function migration_recursor(i)
+    if i < schema_version  then
+      if migrations[i] then
+        -- We also need to apply statements sequentially
+        local function sql_recursor(j)
+          if migrations[i][j] then
+            local sql = migrations[i][j]
+            local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+                function(_, _)
+                  rspamd_logger.infox(rspamd_config,
+                      'applied migration to version %s from version %s: %s',
+                      i + 1, version, sql:gsub('[\n%s]+', ' '))
+                  if j == #migrations[i] then
+                    -- Go to the next migration
+                    migration_recursor(i + 1)
+                  else
+                    -- Apply the next statement
+                    sql_recursor(j + 1)
+                  end
+                end ,
+                function(_, err)
+                  rspamd_logger.errx(rspamd_config,
+                      "cannot apply migration %s: '%s' on clickhouse server %s: %s",
+                      i, sql, upstream:get_addr():to_string(true), err)
+                end)
+            if not ret then
+              rspamd_logger.errx(rspamd_config,
+                  "cannot apply migration %s: '%s' on clickhouse server %s: cannot make request",
+                  i, sql, upstream:get_addr():to_string(true))
+            end
+          end
+        end
+
+        sql_recursor(1)
+      else
+        -- Try another migration
+        migration_recursor(i + 1)
+      end
+    end
+  end
+
+  migration_recursor(version)
+end
+
+local function check_rspamd_table(upstream, ev_base, cfg)
+  local ch_params = {
+    ev_base = ev_base,
+    config = cfg,
+  }
+  local sql = [[EXISTS TABLE rspamd]]
+  local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+      function(_, rows)
+        if rows[1] and rows[1].result then
+          if tonumber(rows[1].result) == 1 then
+            -- Apply migration
+            rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
+            maybe_apply_migrations(upstream, ev_base, cfg, 1)
+          else
+            -- Upload schema
+            rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
+            upload_clickhouse_schema(upstream, ev_base, cfg)
+          end
+        else
+          rspamd_logger.errx(rspamd_config,
+              "unexpected reply on EXISTS command from server %s: %s",
+              upstream:get_addr():to_string(true), rows)
+        end
+      end ,
+      function(_, err)
+        rspamd_logger.errx(rspamd_config,
+            "cannot check if rspamd table exists on clickhouse server %s: %s",
+            upstream:get_addr():to_string(true), err)
+      end)
+  if not ret then
+    rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: cannot make request",
+        upstream:get_addr():to_string(true))
+  end
+end
+
+
+local function check_clickhouse_upstream(upstream, ev_base, cfg)
+  local ch_params = {
+    ev_base = ev_base,
+    config = cfg,
+  }
+  -- If we have some custom rules, we just send its schema to the upstream
+  for k,rule in pairs(settings.custom_rules) do
+    if rule.schema then
+      local sql = rspamd_lua_utils.template(rule.schema, settings)
+      local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+          nil,
+          function(_, err)
+            rspamd_logger.errx(rspamd_config,
+                "cannot send custom schema %s to clickhouse server %s: %s",
+                k, upstream:get_addr():to_string(true), err)
+          end)
+      if not ret then
+        rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request",
+            k, upstream:get_addr():to_string(true))
+      end
+    end
+  end
+
+  -- Now check the main schema and apply migrations if needed
+  local sql = [[SELECT MAX(Version) as v FROM rspamd_version]]
+  local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+      function(_, rows)
+        local version = tonumber(rows[1].v)
+        maybe_apply_migrations(upstream, ev_base, cfg, version)
+      end,
+      function(_, err)
+        -- It might be either no rspamd table or version 1
+        rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table')
+        check_rspamd_table(upstream, ev_base, cfg)
+      end)
+  if not ret then
+    rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request",
+        k, upstream:get_addr():to_string(true))
+  end
+end
+
 local opts = rspamd_config:get_all_opt('clickhouse')
 if opts then
     for k,v in pairs(opts) do
@@ -888,46 +1032,7 @@ if opts then
           local upstreams = settings.upstream:all_upstreams()
 
           for _,up in ipairs(upstreams) do
-            local ip_addr = up:get_addr():to_string(true)
-
-            local function send_req(elt, sql)
-              local function http_cb(err_message, code, data, _)
-                if code ~= 200 or err_message then
-                  if not err_message then err_message = data end
-                  rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: %s",
-                      elt, ip_addr, err_message)
-                  up:fail()
-                else
-                  up:ok()
-                end
-              end
-
-              if not rspamd_http.request({
-                ev_base = ev_base,
-                config = cfg,
-                url = connect_prefix .. ip_addr,
-                body = sql,
-                callback = http_cb,
-                mime_type = 'text/plain',
-                timeout = settings['timeout'],
-                no_ssl_verify = settings.no_ssl_verify,
-                user = settings.user,
-                password = settings.password,
-              }) then
-                rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request",
-                    elt, ip_addr)
-              end
-            end
-
-            for tab,sql in pairs(clickhouse_schema) do
-              send_req(tab, rspamd_lua_utils.template(sql, settings))
-            end
-
-            for k,rule in pairs(settings.custom_rules) do
-              if rule.schema then
-                send_req(k, rspamd_lua_utils.template(rule.schema, settings))
-              end
-            end
+            check_clickhouse_upstream(up, ev_base, cfg)
           end
 
           if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then