]> source.dussan.org Git - rspamd.git/commitdiff
[Minor] Clickhouse: Rework schema upload to make it more resilent
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 16 Apr 2020 16:27:47 +0000 (17:27 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 16 Apr 2020 16:27:47 +0000 (17:27 +0100)
src/plugins/lua/clickhouse.lua

index 5cce0a442c1b15f46167b1953f8f16b2258230ff..06edd33b27a53611fa18808122a56202ce447bd2 100644 (file)
@@ -97,7 +97,7 @@ local settings = {
 
 --- @language SQL
 local clickhouse_schema = {[[
-CREATE TABLE rspamd
+CREATE TABLE IF NOT EXISTS rspamd
 (
     Date Date COMMENT 'Date (used for partitioning)',
     TS DateTime COMMENT 'Date and time of the request start (UTC)',
@@ -155,8 +155,8 @@ CREATE TABLE rspamd
 PARTITION BY toMonday(Date)
 ORDER BY TS
 ]],
-[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
-[[INSERT INTO rspamd_version (Version) Values (${SCHEMA_VERSION})]],
+[[CREATE TABLE IF NOT EXISTS rspamd_version ( Version UInt32) ENGINE = TinyLog]],
+{[[INSERT INTO rspamd_version (Version) Values (${SCHEMA_VERSION})]], true},
 }
 
 -- This describes SQL queries to migrate between versions
@@ -1046,7 +1046,7 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
   return settings.retention.period
 end
 
-local function upload_clickhouse_schema(upstream, ev_base, cfg)
+local function upload_clickhouse_schema(upstream, ev_base, cfg, initial)
   local ch_params = {
     ev_base = ev_base,
     config = cfg,
@@ -1054,8 +1054,8 @@ local function upload_clickhouse_schema(upstream, ev_base, cfg)
 
   local errored = false
 
-  -- Apply schema sequentially
-  fun.each(function(v)
+  -- Upload a single element of the schema
+  local function upload_schema_elt(v)
     if errored then
       rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: due to previous errors",
           v, upstream:get_addr():to_string(true))
@@ -1066,17 +1066,40 @@ local function upload_clickhouse_schema(upstream, ev_base, cfg)
 
     if err then
       rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: %s",
-        sql, upstream:get_addr():to_string(true), err)
+          sql, upstream:get_addr():to_string(true), err)
       errored = true
       return
     end
     rspamd_logger.debugm(N, rspamd_config, 'uploaded clickhouse schema element %s to %s: %s',
         v, upstream:get_addr():to_string(true), reply)
-  end,
-      -- Also template schema version
-      fun.map(function(v)
-        return lua_util.template(v, {SCHEMA_VERSION = tostring(schema_version)})
-      end, fun.chain(clickhouse_schema, settings.schema_additions)))
+  end
+
+  -- Process element and return nil if statement should be skipped
+  local function preprocess_schema_elt(v)
+    if type(v) == 'string' then
+      return lua_util.template(v, {SCHEMA_VERSION = tostring(schema_version)})
+    elseif type(v) == 'table' then
+      -- Pair of statement + boolean
+      if initial == v[2] then
+        return lua_util.template(v[1], {SCHEMA_VERSION = tostring(schema_version)})
+      else
+        rspamd_logger.debugm(N, rspamd_config, 'skip clickhouse schema element %s: schema already exists',
+            v)
+      end
+    end
+
+    return nil
+  end
+
+  -- Apply schema elements sequentially, users additions are concatenated to the tail
+  fun.each(upload_schema_elt,
+    -- Also template schema version
+    fun.filter(function(v) return v ~= nil end,
+      fun.map(preprocess_schema_elt,
+        fun.chain(clickhouse_schema, settings.schema_additions)
+      )
+    )
+  )
 end
 
 local function maybe_apply_migrations(upstream, ev_base, cfg, version)
@@ -1190,12 +1213,13 @@ local function check_rspamd_table(upstream, ev_base, cfg)
   if rows[1] and rows[1].result then
     if tonumber(rows[1].result) == 1 then
       -- Apply migration
-      rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
+      upload_clickhouse_schema(upstream, ev_base, cfg, false)
+      rspamd_logger.infox(rspamd_config, 'table rspamd exists, check if we need to apply migrations')
       maybe_apply_migrations(upstream, ev_base, cfg, 1)
     else
       -- Upload schema
       rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
-      upload_clickhouse_schema(upstream, ev_base, cfg)
+      upload_clickhouse_schema(upstream, ev_base, cfg, true)
     end
   else
     rspamd_logger.errx(rspamd_config,
@@ -1237,6 +1261,7 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg)
         upstream:get_addr():to_string(true), err)
     end
   else
+    upload_clickhouse_schema(upstream, ev_base, cfg, false)
     local version = tonumber(rows[1].v)
     maybe_apply_migrations(upstream, ev_base, cfg, version)
   end