From: Vsevolod Stakhov Date: Wed, 8 Aug 2018 12:31:13 +0000 (+0100) Subject: [Project] Implement Clickhouse migrations X-Git-Tag: 1.8.0~296 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=dfac7cd80b6050cfb291e57be2e9ee44c6a68e30;p=rspamd.git [Project] Implement Clickhouse migrations --- diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua index 3f3c4de40..01d1f0b2b 100644 --- a/lualib/lua_clickhouse.lua +++ b/lualib/lua_clickhouse.lua @@ -76,7 +76,6 @@ local function parse_clickhouse_response(params, data) local lua_util = require "lua_util" local ucl = require "ucl" - rspamd_logger.debugm(N, params.log_obj, "got clickhouse response: %s", data) if data == nil then -- clickhouse returned no data (i.e. empty result set): exiting return {} @@ -113,27 +112,35 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb) if code ~= 200 or err_message then if not err_message then err_message = data end local ip_addr = upstream:get_addr():to_string(true) - rspamd_logger.errx(params.log_obj, - "request failed on clickhouse server %s: %s", - ip_addr, err_message) if fail_cb then fail_cb(params, err_message, data) + else + rspamd_logger.errx(params.log_obj, + "request failed on clickhouse server %s: %s", + ip_addr, err_message) end upstream:fail() else upstream:ok() - rspamd_logger.debugm(N, params.log_obj, - "http_cb ok: %s, %s, %s, %s", err_message, code, data, _) local rows = parse_clickhouse_response(params, data) if rows then if ok_cb then ok_cb(params, rows) + else + rspamd_logger.debugm(N, params.log_obj, + "http_select_cb ok: %s, %s, %s, %s", err_message, code, + data:gsub('[\n%s]+', ' '), _) end else if fail_cb then fail_cb(params, 'failed to parse reply', data) + else + local ip_addr = upstream:get_addr():to_string(true) + rspamd_logger.errx(params.log_obj, + "request failed on clickhouse server %s: %s", + ip_addr, 'failed to parse reply') end end end @@ -148,21 +155,24 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb) if code ~= 200 or err_message then if not err_message then err_message = data end local ip_addr = upstream:get_addr():to_string(true) - rspamd_logger.errx(params.log_obj, - "request failed on clickhouse server %s: %s", - ip_addr, err_message) if fail_cb then fail_cb(params, err_message, data) + else + rspamd_logger.errx(params.log_obj, + "request failed on clickhouse server %s: %s", + ip_addr, err_message) end upstream:fail() else upstream:ok() - rspamd_logger.debugm(N, params.log_obj, - "http_cb ok: %s, %s, %s, %s", err_message, code, data, _) if ok_cb then ok_cb(params, data) + else + rspamd_logger.debugm(N, params.log_obj, + "http_insert_cb ok: %s, %s, %s, %s", err_message, code, + data:gsub('[\n%s]+', ' '), _) end end end @@ -204,7 +214,7 @@ exports.select = function (upstream, settings, params, query, ok_cb, fail_cb) http_params.body = query http_params.log_obj = params.task or params.config - rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body) + rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body) if not http_params.url then local connect_prefix = "http://" @@ -306,6 +316,7 @@ exports.generic = function (upstream, settings, params, query, http_params.user = settings.user http_params.password = settings.password http_params.log_obj = params.task or params.config + http_params.body = query if not http_params.url then local connect_prefix = "http://" diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 444465814..54128235b 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -37,6 +37,7 @@ local asn_rows = {} local symbols_rows = {} local custom_rows = {} local nrows = 0 +local schema_version = 2 -- Current schema version local connect_prefix = 'http://' local settings = { @@ -52,12 +53,6 @@ local settings = { dmarc_allow_symbols = {'DMARC_POLICY_ALLOW'}, dmarc_reject_symbols = {'DMARC_POLICY_REJECT', 'DMARC_POLICY_QUARANTINE'}, stop_symbols = {}, - table = 'rspamd', - attachments_table = 'rspamd_attachments', - urls_table = 'rspamd_urls', - emails_table = 'rspamd_emails', - symbols_table = 'rspamd_symbols', - asn_table = 'rspamd_asn', ipmask = 19, ipmask6 = 48, full_urls = false, @@ -78,9 +73,9 @@ local settings = { } } -local clickhouse_schema = { -table = [[ -CREATE TABLE IF NOT EXISTS ${table} +--- @language SQL +local clickhouse_schema = {[[ +CREATE TABLE rspamd ( Date Date, TS DateTime, @@ -103,59 +98,51 @@ CREATE TABLE IF NOT EXISTS ${table} RcptUser String, RcptDomain String, ListId String, - Digest FixedString(32) -) ENGINE = MergeTree(Date, (TS, From), 8192) -]], - -attachments_table = [[ -CREATE TABLE IF NOT EXISTS ${attachments_table} ( - Date Date, - Digest FixedString(32), `Attachments.FileName` Array(String), `Attachments.ContentType` Array(String), `Attachments.Length` Array(UInt32), - `Attachments.Digest` Array(FixedString(16)) -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -urls_table = [[ -CREATE TABLE IF NOT EXISTS ${urls_table} ( - Date Date, - Digest FixedString(32), + `Attachments.Digest` Array(FixedString(16)), `Urls.Tld` Array(String), - `Urls.Url` Array(String) -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -emails_table = [[ -CREATE TABLE IF NOT EXISTS ${emails_table} ( - Date Date, - Digest FixedString(32), - Emails Array(String) -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -asn_table = [[ -CREATE TABLE IF NOT EXISTS ${asn_table} ( - Date Date, - Digest FixedString(32), + `Urls.Url` Array(String), + Emails Array(String), ASN String, Country FixedString(2), - IPNet String -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -symbols_table = [[ -CREATE TABLE IF NOT EXISTS ${symbols_table} ( - Date Date, - Digest FixedString(32), + IPNet String, `Symbols.Names` Array(String), `Symbols.Scores` Array(Float64), - `Symbols.Options` Array(String) -) ENGINE = MergeTree(Date, Digest, 8192) -]] + `Symbols.Options` Array(String), + Digest FixedString(32) +) ENGINE = MergeTree(Date, (TS, From), 8192) +]], +[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]], +[[INSERT INTO rspamd_version (Version) Values (2)]], } +-- This describes SQL queries to migrate between versions +local migrations = { + [1] = { + -- Move to a wide fat table + [[ALTER TABLE rspamd + ADD COLUMN `Attachments.FileName` Array(String) AFTER ListId, + ADD COLUMN `Attachments.ContentType` Array(String) AFTER `Attachments.FileName`, + ADD COLUMN `Attachments.Length` Array(UInt32) AFTER `Attachments.ContentType`, + ADD COLUMN `Attachments.Digest` Array(FixedString(16)) AFTER `Attachments.Length`, + ADD COLUMN `Urls.Tld` Array(String) AFTER `Attachments.Digest`, + ADD COLUMN `Urls.Url` Array(String) AFTER `Urls.Tld`, + ADD COLUMN Emails Array(String) AFTER `Urls.Url`, + ADD COLUMN ASN String AFTER Emails, + ADD COLUMN Country FixedString(2) AFTER ASN, + ADD COLUMN IPNet String AFTER Country, + ADD COLUMN `Symbols.Names` Array(String) AFTER IPNet, + ADD COLUMN `Symbols.Scores` Array(Float64) AFTER `Symbols.Names`, + ADD COLUMN `Symbols.Options` Array(String) AFTER `Symbols.Scores`]], + -- Add explicit version + [[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]], + [[INSERT INTO rspamd_version (Version) Values (2)]], + } +} + + local function clickhouse_main_row(tname) local fields = { 'Date', @@ -803,6 +790,163 @@ local function clickhouse_remove_old_partitions(cfg, ev_base) return settings.retention.period end +local function upload_clickhouse_schema(upstream, ev_base, cfg) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + -- Apply schema sequentially + local function sql_recursor(i) + if clickhouse_schema[i] then + local sql = clickhouse_schema[i] + local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, + function(_, _) + rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s', + i, upstream:get_addr():to_string(true)) + sql_recursor(i + 1) + end, + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot upload schema '%s' on clickhouse server %s: %s", + sql, upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: cannot make request", + sql, upstream:get_addr():to_string(true)) + end + end + end + + sql_recursor(1) +end + +local function maybe_apply_migrations(upstream, ev_base, cfg, version) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + -- Apply migrations sequentially + local function migration_recursor(i) + if i < schema_version then + if migrations[i] then + -- We also need to apply statements sequentially + local function sql_recursor(j) + if migrations[i][j] then + local sql = migrations[i][j] + local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, + function(_, _) + rspamd_logger.infox(rspamd_config, + 'applied migration to version %s from version %s: %s', + i + 1, version, sql:gsub('[\n%s]+', ' ')) + if j == #migrations[i] then + -- Go to the next migration + migration_recursor(i + 1) + else + -- Apply the next statement + sql_recursor(j + 1) + end + end , + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot apply migration %s: '%s' on clickhouse server %s: %s", + i, sql, upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, + "cannot apply migration %s: '%s' on clickhouse server %s: cannot make request", + i, sql, upstream:get_addr():to_string(true)) + end + end + end + + sql_recursor(1) + else + -- Try another migration + migration_recursor(i + 1) + end + end + end + + migration_recursor(version) +end + +local function check_rspamd_table(upstream, ev_base, cfg) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + local sql = [[EXISTS TABLE rspamd]] + local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, + function(_, rows) + if rows[1] and rows[1].result then + if tonumber(rows[1].result) == 1 then + -- Apply migration + rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration') + maybe_apply_migrations(upstream, ev_base, cfg, 1) + else + -- Upload schema + rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema') + upload_clickhouse_schema(upstream, ev_base, cfg) + end + else + rspamd_logger.errx(rspamd_config, + "unexpected reply on EXISTS command from server %s: %s", + upstream:get_addr():to_string(true), rows) + end + end , + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot check if rspamd table exists on clickhouse server %s: %s", + upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: cannot make request", + upstream:get_addr():to_string(true)) + end +end + + +local function check_clickhouse_upstream(upstream, ev_base, cfg) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + -- If we have some custom rules, we just send its schema to the upstream + for k,rule in pairs(settings.custom_rules) do + if rule.schema then + local sql = rspamd_lua_utils.template(rule.schema, settings) + local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, + nil, + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot send custom schema %s to clickhouse server %s: %s", + k, upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request", + k, upstream:get_addr():to_string(true)) + end + end + end + + -- Now check the main schema and apply migrations if needed + local sql = [[SELECT MAX(Version) as v FROM rspamd_version]] + local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, + function(_, rows) + local version = tonumber(rows[1].v) + maybe_apply_migrations(upstream, ev_base, cfg, version) + end, + function(_, err) + -- It might be either no rspamd table or version 1 + rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table') + check_rspamd_table(upstream, ev_base, cfg) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request", + k, upstream:get_addr():to_string(true)) + end +end + local opts = rspamd_config:get_all_opt('clickhouse') if opts then for k,v in pairs(opts) do @@ -888,46 +1032,7 @@ if opts then local upstreams = settings.upstream:all_upstreams() for _,up in ipairs(upstreams) do - local ip_addr = up:get_addr():to_string(true) - - local function send_req(elt, sql) - local function http_cb(err_message, code, data, _) - if code ~= 200 or err_message then - if not err_message then err_message = data end - rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: %s", - elt, ip_addr, err_message) - up:fail() - else - up:ok() - end - end - - if not rspamd_http.request({ - ev_base = ev_base, - config = cfg, - url = connect_prefix .. ip_addr, - body = sql, - callback = http_cb, - mime_type = 'text/plain', - timeout = settings['timeout'], - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - }) then - rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request", - elt, ip_addr) - end - end - - for tab,sql in pairs(clickhouse_schema) do - send_req(tab, rspamd_lua_utils.template(sql, settings)) - end - - for k,rule in pairs(settings.custom_rules) do - if rule.schema then - send_req(k, rspamd_lua_utils.template(rule.schema, settings)) - end - end + check_clickhouse_upstream(up, ev_base, cfg) end if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then