Преглед изворни кода

[Project] Implement Clickhouse migrations

tags/1.8.0
Vsevolod Stakhov пре 5 година
родитељ
комит
dfac7cd80b
2 измењених фајлова са 220 додато и 104 уклоњено
  1. 23
    12
      lualib/lua_clickhouse.lua
  2. 197
    92
      src/plugins/lua/clickhouse.lua

+ 23
- 12
lualib/lua_clickhouse.lua Прегледај датотеку

@@ -76,7 +76,6 @@ local function parse_clickhouse_response(params, data)
local lua_util = require "lua_util"
local ucl = require "ucl"

rspamd_logger.debugm(N, params.log_obj, "got clickhouse response: %s", data)
if data == nil then
-- clickhouse returned no data (i.e. empty result set): exiting
return {}
@@ -113,27 +112,35 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
if code ~= 200 or err_message then
if not err_message then err_message = data end
local ip_addr = upstream:get_addr():to_string(true)
rspamd_logger.errx(params.log_obj,
"request failed on clickhouse server %s: %s",
ip_addr, err_message)

if fail_cb then
fail_cb(params, err_message, data)
else
rspamd_logger.errx(params.log_obj,
"request failed on clickhouse server %s: %s",
ip_addr, err_message)
end
upstream:fail()
else
upstream:ok()
rspamd_logger.debugm(N, params.log_obj,
"http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
local rows = parse_clickhouse_response(params, data)

if rows then
if ok_cb then
ok_cb(params, rows)
else
rspamd_logger.debugm(N, params.log_obj,
"http_select_cb ok: %s, %s, %s, %s", err_message, code,
data:gsub('[\n%s]+', ' '), _)
end
else
if fail_cb then
fail_cb(params, 'failed to parse reply', data)
else
local ip_addr = upstream:get_addr():to_string(true)
rspamd_logger.errx(params.log_obj,
"request failed on clickhouse server %s: %s",
ip_addr, 'failed to parse reply')
end
end
end
@@ -148,21 +155,24 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
if code ~= 200 or err_message then
if not err_message then err_message = data end
local ip_addr = upstream:get_addr():to_string(true)
rspamd_logger.errx(params.log_obj,
"request failed on clickhouse server %s: %s",
ip_addr, err_message)

if fail_cb then
fail_cb(params, err_message, data)
else
rspamd_logger.errx(params.log_obj,
"request failed on clickhouse server %s: %s",
ip_addr, err_message)
end
upstream:fail()
else
upstream:ok()
rspamd_logger.debugm(N, params.log_obj,
"http_cb ok: %s, %s, %s, %s", err_message, code, data, _)

if ok_cb then
ok_cb(params, data)
else
rspamd_logger.debugm(N, params.log_obj,
"http_insert_cb ok: %s, %s, %s, %s", err_message, code,
data:gsub('[\n%s]+', ' '), _)
end
end
end
@@ -204,7 +214,7 @@ exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
http_params.body = query
http_params.log_obj = params.task or params.config

rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body)
rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)

if not http_params.url then
local connect_prefix = "http://"
@@ -306,6 +316,7 @@ exports.generic = function (upstream, settings, params, query,
http_params.user = settings.user
http_params.password = settings.password
http_params.log_obj = params.task or params.config
http_params.body = query

if not http_params.url then
local connect_prefix = "http://"

+ 197
- 92
src/plugins/lua/clickhouse.lua Прегледај датотеку

@@ -37,6 +37,7 @@ local asn_rows = {}
local symbols_rows = {}
local custom_rows = {}
local nrows = 0
local schema_version = 2 -- Current schema version
local connect_prefix = 'http://'

local settings = {
@@ -52,12 +53,6 @@ local settings = {
dmarc_allow_symbols = {'DMARC_POLICY_ALLOW'},
dmarc_reject_symbols = {'DMARC_POLICY_REJECT', 'DMARC_POLICY_QUARANTINE'},
stop_symbols = {},
table = 'rspamd',
attachments_table = 'rspamd_attachments',
urls_table = 'rspamd_urls',
emails_table = 'rspamd_emails',
symbols_table = 'rspamd_symbols',
asn_table = 'rspamd_asn',
ipmask = 19,
ipmask6 = 48,
full_urls = false,
@@ -78,9 +73,9 @@ local settings = {
}
}

local clickhouse_schema = {
table = [[
CREATE TABLE IF NOT EXISTS ${table}
--- @language SQL
local clickhouse_schema = {[[
CREATE TABLE rspamd
(
Date Date,
TS DateTime,
@@ -103,59 +98,51 @@ CREATE TABLE IF NOT EXISTS ${table}
RcptUser String,
RcptDomain String,
ListId String,
Digest FixedString(32)
) ENGINE = MergeTree(Date, (TS, From), 8192)
]],

attachments_table = [[
CREATE TABLE IF NOT EXISTS ${attachments_table} (
Date Date,
Digest FixedString(32),
`Attachments.FileName` Array(String),
`Attachments.ContentType` Array(String),
`Attachments.Length` Array(UInt32),
`Attachments.Digest` Array(FixedString(16))
) ENGINE = MergeTree(Date, Digest, 8192)
]],

urls_table = [[
CREATE TABLE IF NOT EXISTS ${urls_table} (
Date Date,
Digest FixedString(32),
`Attachments.Digest` Array(FixedString(16)),
`Urls.Tld` Array(String),
`Urls.Url` Array(String)
) ENGINE = MergeTree(Date, Digest, 8192)
]],

emails_table = [[
CREATE TABLE IF NOT EXISTS ${emails_table} (
Date Date,
Digest FixedString(32),
Emails Array(String)
) ENGINE = MergeTree(Date, Digest, 8192)
]],

asn_table = [[
CREATE TABLE IF NOT EXISTS ${asn_table} (
Date Date,
Digest FixedString(32),
`Urls.Url` Array(String),
Emails Array(String),
ASN String,
Country FixedString(2),
IPNet String
) ENGINE = MergeTree(Date, Digest, 8192)
]],

symbols_table = [[
CREATE TABLE IF NOT EXISTS ${symbols_table} (
Date Date,
Digest FixedString(32),
IPNet String,
`Symbols.Names` Array(String),
`Symbols.Scores` Array(Float64),
`Symbols.Options` Array(String)
) ENGINE = MergeTree(Date, Digest, 8192)
]]
`Symbols.Options` Array(String),
Digest FixedString(32)
) ENGINE = MergeTree(Date, (TS, From), 8192)
]],
[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
[[INSERT INTO rspamd_version (Version) Values (2)]],
}

-- This describes SQL queries to migrate between versions
-- The table is keyed by the schema version being migrated FROM; each entry is
-- an ordered list of SQL statements. maybe_apply_migrations walks the keys
-- from the version found on the server up to (but not including)
-- `schema_version` and sends the statements of each entry one after another.
local migrations = {
-- Version 1 -> version 2
[1] = {
-- Move to a wide fat table
-- (adds the attachments, urls, emails, ASN/country/IP net and symbols columns
-- to the main `rspamd` table, placed after the `ListId` column)
[[ALTER TABLE rspamd
ADD COLUMN `Attachments.FileName` Array(String) AFTER ListId,
ADD COLUMN `Attachments.ContentType` Array(String) AFTER `Attachments.FileName`,
ADD COLUMN `Attachments.Length` Array(UInt32) AFTER `Attachments.ContentType`,
ADD COLUMN `Attachments.Digest` Array(FixedString(16)) AFTER `Attachments.Length`,
ADD COLUMN `Urls.Tld` Array(String) AFTER `Attachments.Digest`,
ADD COLUMN `Urls.Url` Array(String) AFTER `Urls.Tld`,
ADD COLUMN Emails Array(String) AFTER `Urls.Url`,
ADD COLUMN ASN String AFTER Emails,
ADD COLUMN Country FixedString(2) AFTER ASN,
ADD COLUMN IPNet String AFTER Country,
ADD COLUMN `Symbols.Names` Array(String) AFTER IPNet,
ADD COLUMN `Symbols.Scores` Array(Float64) AFTER `Symbols.Names`,
ADD COLUMN `Symbols.Options` Array(String) AFTER `Symbols.Scores`]],
-- Add explicit version
-- (creates the `rspamd_version` table and records version 2 in it, i.e. the
-- version this migration leads to)
[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
[[INSERT INTO rspamd_version (Version) Values (2)]],
}
}


local function clickhouse_main_row(tname)
local fields = {
'Date',
@@ -803,6 +790,163 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
return settings.retention.period
end

--- Sends every statement of `clickhouse_schema` to one upstream, in order.
-- A statement is only sent once the previous one has been acknowledged by the
-- server; the chain stops at the first failure (which is logged).
-- @param upstream upstream object the requests are sent to
-- @param ev_base event base forwarded in the request parameters
-- @param cfg config object forwarded in the request parameters
local function upload_clickhouse_schema(upstream, ev_base, cfg)
  local request_params = {
    ev_base = ev_base,
    config = cfg,
  }

  -- Sends schema element number `idx`, then chains to `idx + 1` on success
  local function send_element(idx)
    local statement = clickhouse_schema[idx]
    if not statement then
      -- Ran past the last schema element: nothing left to upload
      return
    end

    local function on_uploaded(_, _)
      rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s',
        idx, upstream:get_addr():to_string(true))
      send_element(idx + 1)
    end

    local function on_failed(_, err)
      rspamd_logger.errx(rspamd_config,
        "cannot upload schema '%s' on clickhouse server %s: %s",
        statement, upstream:get_addr():to_string(true), err)
    end

    local sent = lua_clickhouse.generic(upstream, settings, request_params,
      statement, on_uploaded, on_failed)
    if not sent then
      rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: cannot make request",
        statement, upstream:get_addr():to_string(true))
    end
  end

  send_element(1)
end

--- Applies all pending schema migrations to one upstream, strictly in order.
-- Starting at `version`, every entry `migrations[i]` with `i < schema_version`
-- is applied; the statements inside an entry are sent one by one, each only
-- after the previous one succeeded. Versions without an entry are skipped.
-- The chain stops at the first failed statement (which is logged).
-- @param upstream upstream object the requests are sent to
-- @param ev_base event base forwarded in the request parameters
-- @param cfg config object forwarded in the request parameters
-- @param version schema version currently present on the server (number)
local function maybe_apply_migrations(upstream, ev_base, cfg, version)
  -- A non-numeric version (e.g. nil from a failed tonumber() in the caller)
  -- would raise on the `i < schema_version` comparison below, so refuse early
  if type(version) ~= 'number' then
    rspamd_logger.errx(rspamd_config,
      "cannot apply migrations on clickhouse server %s: invalid schema version: %s",
      upstream:get_addr():to_string(true), version)
    return
  end

  local ch_params = {
    ev_base = ev_base,
    config = cfg,
  }
  -- Apply migrations sequentially
  local function migration_recursor(i)
    if i < schema_version then
      if migrations[i] then
        -- We also need to apply statements sequentially
        local function sql_recursor(j)
          if migrations[i][j] then
            local sql = migrations[i][j]
            local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
              function(_, _)
                -- Extra parentheses keep only the substituted string and drop
                -- the replacement count that gsub returns as a second value
                rspamd_logger.infox(rspamd_config,
                  'applied migration to version %s from version %s: %s',
                  i + 1, version, (sql:gsub('[\n%s]+', ' ')))
                if j == #migrations[i] then
                  -- Go to the next migration
                  migration_recursor(i + 1)
                else
                  -- Apply the next statement
                  sql_recursor(j + 1)
                end
              end,
              function(_, err)
                rspamd_logger.errx(rspamd_config,
                  "cannot apply migration %s: '%s' on clickhouse server %s: %s",
                  i, sql, upstream:get_addr():to_string(true), err)
              end)
            if not ret then
              rspamd_logger.errx(rspamd_config,
                "cannot apply migration %s: '%s' on clickhouse server %s: cannot make request",
                i, sql, upstream:get_addr():to_string(true))
            end
          end
        end

        sql_recursor(1)
      else
        -- Try another migration
        migration_recursor(i + 1)
      end
    end
  end

  migration_recursor(version)
end

--- Decides between migrating and uploading the schema from scratch.
-- Asks the server whether the `rspamd` table exists: if it does, migrations
-- are applied starting from version 1; otherwise the full schema is uploaded.
-- @param upstream upstream object the request is sent to
-- @param ev_base event base forwarded in the request parameters
-- @param cfg config object forwarded in the request parameters
local function check_rspamd_table(upstream, ev_base, cfg)
  local request_params = {
    ev_base = ev_base,
    config = cfg,
  }
  local query = [[EXISTS TABLE rspamd]]

  local function on_reply(_, rows)
    local first_row = rows[1]
    if not (first_row and first_row.result) then
      rspamd_logger.errx(rspamd_config,
        "unexpected reply on EXISTS command from server %s: %s",
        upstream:get_addr():to_string(true), rows)
      return
    end

    if tonumber(first_row.result) == 1 then
      -- Table is there but carries no version marker: treat it as version 1
      rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
      maybe_apply_migrations(upstream, ev_base, cfg, 1)
    else
      -- Nothing on the server yet: create everything
      rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
      upload_clickhouse_schema(upstream, ev_base, cfg)
    end
  end

  local function on_error(_, err)
    rspamd_logger.errx(rspamd_config,
      "cannot check if rspamd table exists on clickhouse server %s: %s",
      upstream:get_addr():to_string(true), err)
  end

  local sent = lua_clickhouse.select(upstream, settings, request_params, query,
    on_reply, on_error)
  if not sent then
    rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: cannot make request",
      upstream:get_addr():to_string(true))
  end
end


--- Prepares one clickhouse upstream: custom schemas first, then the main one.
-- 1. Every custom rule that defines a `schema` has it templated with
--    `settings` and sent to the upstream (failures are only logged).
-- 2. The schema version stored in `rspamd_version` is queried; on success the
--    pending migrations are applied, on failure the `rspamd` table itself is
--    checked to choose between migration from version 1 and a full upload.
-- @param upstream upstream object the requests are sent to
-- @param ev_base event base forwarded in the request parameters
-- @param cfg config object forwarded in the request parameters
local function check_clickhouse_upstream(upstream, ev_base, cfg)
  local ch_params = {
    ev_base = ev_base,
    config = cfg,
  }
  -- If we have some custom rules, we just send its schema to the upstream
  for k,rule in pairs(settings.custom_rules) do
    if rule.schema then
      local sql = rspamd_lua_utils.template(rule.schema, settings)
      local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
        nil,
        function(_, err)
          rspamd_logger.errx(rspamd_config,
            "cannot send custom schema %s to clickhouse server %s: %s",
            k, upstream:get_addr():to_string(true), err)
        end)
      if not ret then
        rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request",
          k, upstream:get_addr():to_string(true))
      end
    end
  end

  -- Now check the main schema and apply migrations if needed
  local sql = [[SELECT MAX(Version) as v FROM rspamd_version]]
  local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
    function(_, rows)
      -- Guard against an empty result set or a non-numeric value instead of
      -- raising inside the HTTP callback
      local version = rows[1] and tonumber(rows[1].v)
      if not version then
        rspamd_logger.errx(rspamd_config,
          "unexpected reply on version query from clickhouse server %s: %s",
          upstream:get_addr():to_string(true), rows)
        return
      end
      maybe_apply_migrations(upstream, ev_base, cfg, version)
    end,
    function(_, _)
      -- It might be either no rspamd table or version 1
      -- NOTE(review): any failure of the version query ends up here, not only
      -- a missing table; the error text itself is not inspected
      rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table')
      check_rspamd_table(upstream, ev_base, cfg)
    end)
  if not ret then
    -- The loop variable `k` is not in scope here, and this request is the
    -- version query rather than a custom schema upload
    rspamd_logger.errx(rspamd_config,
      "cannot get schema version from clickhouse server %s: cannot make request",
      upstream:get_addr():to_string(true))
  end
end

local opts = rspamd_config:get_all_opt('clickhouse')
if opts then
for k,v in pairs(opts) do
@@ -888,46 +1032,7 @@ if opts then
local upstreams = settings.upstream:all_upstreams()

for _,up in ipairs(upstreams) do
local ip_addr = up:get_addr():to_string(true)

local function send_req(elt, sql)
local function http_cb(err_message, code, data, _)
if code ~= 200 or err_message then
if not err_message then err_message = data end
rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: %s",
elt, ip_addr, err_message)
up:fail()
else
up:ok()
end
end

if not rspamd_http.request({
ev_base = ev_base,
config = cfg,
url = connect_prefix .. ip_addr,
body = sql,
callback = http_cb,
mime_type = 'text/plain',
timeout = settings['timeout'],
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
}) then
rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request",
elt, ip_addr)
end
end

for tab,sql in pairs(clickhouse_schema) do
send_req(tab, rspamd_lua_utils.template(sql, settings))
end

for k,rule in pairs(settings.custom_rules) do
if rule.schema then
send_req(k, rspamd_lua_utils.template(rule.schema, settings))
end
end
check_clickhouse_upstream(up, ev_base, cfg)
end

if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then

Loading…
Откажи
Сачувај