aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2020-04-16 17:27:47 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2020-04-16 17:27:47 +0100
commit4efc7e1df9863dae9314a9914fcdf5fe47d8d258 (patch)
treea8e7a32b24b29a38773066dec2528d48add7201c
parentc6822d66ed9825d317f6e59526d35974ccdaa1b2 (diff)
downloadrspamd-4efc7e1df9863dae9314a9914fcdf5fe47d8d258.tar.gz
rspamd-4efc7e1df9863dae9314a9914fcdf5fe47d8d258.zip
[Minor] Clickhouse: Rework schema upload to make it more resilent
-rw-r--r--src/plugins/lua/clickhouse.lua53
1 files changed, 39 insertions, 14 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 5cce0a442..06edd33b2 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -97,7 +97,7 @@ local settings = {
--- @language SQL
local clickhouse_schema = {[[
-CREATE TABLE rspamd
+CREATE TABLE IF NOT EXISTS rspamd
(
Date Date COMMENT 'Date (used for partitioning)',
TS DateTime COMMENT 'Date and time of the request start (UTC)',
@@ -155,8 +155,8 @@ CREATE TABLE rspamd
PARTITION BY toMonday(Date)
ORDER BY TS
]],
-[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
-[[INSERT INTO rspamd_version (Version) Values (${SCHEMA_VERSION})]],
+[[CREATE TABLE IF NOT EXISTS rspamd_version ( Version UInt32) ENGINE = TinyLog]],
+{[[INSERT INTO rspamd_version (Version) Values (${SCHEMA_VERSION})]], true},
}
-- This describes SQL queries to migrate between versions
@@ -1046,7 +1046,7 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
return settings.retention.period
end
-local function upload_clickhouse_schema(upstream, ev_base, cfg)
+local function upload_clickhouse_schema(upstream, ev_base, cfg, initial)
local ch_params = {
ev_base = ev_base,
config = cfg,
@@ -1054,8 +1054,8 @@ local function upload_clickhouse_schema(upstream, ev_base, cfg)
local errored = false
- -- Apply schema sequentially
- fun.each(function(v)
+ -- Upload a single element of the schema
+ local function upload_schema_elt(v)
if errored then
rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: due to previous errors",
v, upstream:get_addr():to_string(true))
@@ -1066,17 +1066,40 @@ local function upload_clickhouse_schema(upstream, ev_base, cfg)
if err then
rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: %s",
- sql, upstream:get_addr():to_string(true), err)
+ sql, upstream:get_addr():to_string(true), err)
errored = true
return
end
rspamd_logger.debugm(N, rspamd_config, 'uploaded clickhouse schema element %s to %s: %s',
v, upstream:get_addr():to_string(true), reply)
- end,
- -- Also template schema version
- fun.map(function(v)
- return lua_util.template(v, {SCHEMA_VERSION = tostring(schema_version)})
- end, fun.chain(clickhouse_schema, settings.schema_additions)))
+ end
+
+ -- Process element and return nil if statement should be skipped
+ local function preprocess_schema_elt(v)
+ if type(v) == 'string' then
+ return lua_util.template(v, {SCHEMA_VERSION = tostring(schema_version)})
+ elseif type(v) == 'table' then
+ -- Pair of statement + boolean
+ if initial == v[2] then
+ return lua_util.template(v[1], {SCHEMA_VERSION = tostring(schema_version)})
+ else
+ rspamd_logger.debugm(N, rspamd_config, 'skip clickhouse schema element %s: schema already exists',
+ v)
+ end
+ end
+
+ return nil
+ end
+
+ -- Apply schema elements sequentially, users additions are concatenated to the tail
+ fun.each(upload_schema_elt,
+ -- Also template schema version
+ fun.filter(function(v) return v ~= nil end,
+ fun.map(preprocess_schema_elt,
+ fun.chain(clickhouse_schema, settings.schema_additions)
+ )
+ )
+ )
end
local function maybe_apply_migrations(upstream, ev_base, cfg, version)
@@ -1190,12 +1213,13 @@ local function check_rspamd_table(upstream, ev_base, cfg)
if rows[1] and rows[1].result then
if tonumber(rows[1].result) == 1 then
-- Apply migration
- rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
+ upload_clickhouse_schema(upstream, ev_base, cfg, false)
+ rspamd_logger.infox(rspamd_config, 'table rspamd exists, check if we need to apply migrations')
maybe_apply_migrations(upstream, ev_base, cfg, 1)
else
-- Upload schema
rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
- upload_clickhouse_schema(upstream, ev_base, cfg)
+ upload_clickhouse_schema(upstream, ev_base, cfg, true)
end
else
rspamd_logger.errx(rspamd_config,
@@ -1237,6 +1261,7 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg)
upstream:get_addr():to_string(true), err)
end
else
+ upload_clickhouse_schema(upstream, ev_base, cfg, false)
local version = tonumber(rows[1].v)
maybe_apply_migrations(upstream, ev_base, cfg, version)
end