diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2020-04-16 17:27:47 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2020-04-16 17:27:47 +0100 |
commit | 4efc7e1df9863dae9314a9914fcdf5fe47d8d258 (patch) | |
tree | a8e7a32b24b29a38773066dec2528d48add7201c | |
parent | c6822d66ed9825d317f6e59526d35974ccdaa1b2 (diff) | |
download | rspamd-4efc7e1df9863dae9314a9914fcdf5fe47d8d258.tar.gz rspamd-4efc7e1df9863dae9314a9914fcdf5fe47d8d258.zip |
[Minor] Clickhouse: Rework schema upload to make it more resilent
-rw-r--r-- | src/plugins/lua/clickhouse.lua | 53 |
1 files changed, 39 insertions, 14 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 5cce0a442..06edd33b2 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -97,7 +97,7 @@ local settings = { --- @language SQL local clickhouse_schema = {[[ -CREATE TABLE rspamd +CREATE TABLE IF NOT EXISTS rspamd ( Date Date COMMENT 'Date (used for partitioning)', TS DateTime COMMENT 'Date and time of the request start (UTC)', @@ -155,8 +155,8 @@ CREATE TABLE rspamd PARTITION BY toMonday(Date) ORDER BY TS ]], -[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]], -[[INSERT INTO rspamd_version (Version) Values (${SCHEMA_VERSION})]], +[[CREATE TABLE IF NOT EXISTS rspamd_version ( Version UInt32) ENGINE = TinyLog]], +{[[INSERT INTO rspamd_version (Version) Values (${SCHEMA_VERSION})]], true}, } -- This describes SQL queries to migrate between versions @@ -1046,7 +1046,7 @@ local function clickhouse_remove_old_partitions(cfg, ev_base) return settings.retention.period end -local function upload_clickhouse_schema(upstream, ev_base, cfg) +local function upload_clickhouse_schema(upstream, ev_base, cfg, initial) local ch_params = { ev_base = ev_base, config = cfg, @@ -1054,8 +1054,8 @@ local function upload_clickhouse_schema(upstream, ev_base, cfg) local errored = false - -- Apply schema sequentially - fun.each(function(v) + -- Upload a single element of the schema + local function upload_schema_elt(v) if errored then rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: due to previous errors", v, upstream:get_addr():to_string(true)) @@ -1066,17 +1066,40 @@ local function upload_clickhouse_schema(upstream, ev_base, cfg) if err then rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: %s", - sql, upstream:get_addr():to_string(true), err) + sql, upstream:get_addr():to_string(true), err) errored = true return end rspamd_logger.debugm(N, rspamd_config, 'uploaded clickhouse schema element %s to %s: %s', v, upstream:get_addr():to_string(true), reply) - end, - -- Also template schema version - fun.map(function(v) - return lua_util.template(v, {SCHEMA_VERSION = tostring(schema_version)}) - end, fun.chain(clickhouse_schema, settings.schema_additions))) + end + + -- Process element and return nil if statement should be skipped + local function preprocess_schema_elt(v) + if type(v) == 'string' then + return lua_util.template(v, {SCHEMA_VERSION = tostring(schema_version)}) + elseif type(v) == 'table' then + -- Pair of statement + boolean + if initial == v[2] then + return lua_util.template(v[1], {SCHEMA_VERSION = tostring(schema_version)}) + else + rspamd_logger.debugm(N, rspamd_config, 'skip clickhouse schema element %s: schema already exists', + v) + end + end + + return nil + end + + -- Apply schema elements sequentially, users additions are concatenated to the tail + fun.each(upload_schema_elt, + -- Also template schema version + fun.filter(function(v) return v ~= nil end, + fun.map(preprocess_schema_elt, + fun.chain(clickhouse_schema, settings.schema_additions) + ) + ) + ) end local function maybe_apply_migrations(upstream, ev_base, cfg, version) @@ -1190,12 +1213,13 @@ local function check_rspamd_table(upstream, ev_base, cfg) if rows[1] and rows[1].result then if tonumber(rows[1].result) == 1 then -- Apply migration - rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration') + upload_clickhouse_schema(upstream, ev_base, cfg, false) + rspamd_logger.infox(rspamd_config, 'table rspamd exists, check if we need to apply migrations') maybe_apply_migrations(upstream, ev_base, cfg, 1) else -- Upload schema rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema') - upload_clickhouse_schema(upstream, ev_base, cfg) + upload_clickhouse_schema(upstream, ev_base, cfg, true) end else rspamd_logger.errx(rspamd_config, @@ -1237,6 +1261,7 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg) upstream:get_addr():to_string(true), err) end else + upload_clickhouse_schema(upstream, ev_base, cfg, false) local version = tonumber(rows[1].v) maybe_apply_migrations(upstream, ev_base, cfg, version) end |