--- @language SQL
local clickhouse_schema = {[[
-CREATE TABLE rspamd
+CREATE TABLE IF NOT EXISTS rspamd
(
Date Date COMMENT 'Date (used for partitioning)',
TS DateTime COMMENT 'Date and time of the request start (UTC)',
PARTITION BY toMonday(Date)
ORDER BY TS
]],
-[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
-[[INSERT INTO rspamd_version (Version) Values (${SCHEMA_VERSION})]],
+[[CREATE TABLE IF NOT EXISTS rspamd_version ( Version UInt32) ENGINE = TinyLog]],
+{[[INSERT INTO rspamd_version (Version) Values (${SCHEMA_VERSION})]], true},
}
-- This describes SQL queries to migrate between versions
return settings.retention.period
end
-local function upload_clickhouse_schema(upstream, ev_base, cfg)
+local function upload_clickhouse_schema(upstream, ev_base, cfg, initial)
local ch_params = {
ev_base = ev_base,
config = cfg,
local errored = false
- -- Apply schema sequentially
- fun.each(function(v)
+ -- Upload a single element of the schema
+ local function upload_schema_elt(v)
if errored then
rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: due to previous errors",
v, upstream:get_addr():to_string(true))
if err then
rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: %s",
- sql, upstream:get_addr():to_string(true), err)
+ sql, upstream:get_addr():to_string(true), err)
errored = true
return
end
rspamd_logger.debugm(N, rspamd_config, 'uploaded clickhouse schema element %s to %s: %s',
v, upstream:get_addr():to_string(true), reply)
- end,
- -- Also template schema version
- fun.map(function(v)
- return lua_util.template(v, {SCHEMA_VERSION = tostring(schema_version)})
- end, fun.chain(clickhouse_schema, settings.schema_additions)))
+ end
+
+ -- Process element and return nil if statement should be skipped
+ local function preprocess_schema_elt(v)
+ if type(v) == 'string' then
+ return lua_util.template(v, {SCHEMA_VERSION = tostring(schema_version)})
+ elseif type(v) == 'table' then
+ -- Pair of statement + boolean
+ if initial == v[2] then
+ return lua_util.template(v[1], {SCHEMA_VERSION = tostring(schema_version)})
+ else
+ rspamd_logger.debugm(N, rspamd_config, 'skip clickhouse schema element %s: schema already exists',
+ v)
+ end
+ end
+
+ return nil
+ end
+
+ -- Apply schema elements sequentially, users additions are concatenated to the tail
+ fun.each(upload_schema_elt,
+ -- Also template schema version
+ fun.filter(function(v) return v ~= nil end,
+ fun.map(preprocess_schema_elt,
+ fun.chain(clickhouse_schema, settings.schema_additions)
+ )
+ )
+ )
end
local function maybe_apply_migrations(upstream, ev_base, cfg, version)
if rows[1] and rows[1].result then
if tonumber(rows[1].result) == 1 then
-- Apply migration
- rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
+ upload_clickhouse_schema(upstream, ev_base, cfg, false)
+ rspamd_logger.infox(rspamd_config, 'table rspamd exists, check if we need to apply migrations')
maybe_apply_migrations(upstream, ev_base, cfg, 1)
else
-- Upload schema
rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
- upload_clickhouse_schema(upstream, ev_base, cfg)
+ upload_clickhouse_schema(upstream, ev_base, cfg, true)
end
else
rspamd_logger.errx(rspamd_config,
upstream:get_addr():to_string(true), err)
end
else
+ upload_clickhouse_schema(upstream, ev_base, cfg, false)
local version = tonumber(rows[1].v)
maybe_apply_migrations(upstream, ev_base, cfg, version)
end