local lua_util = require "lua_util"
local ucl = require "ucl"
- rspamd_logger.debugm(N, params.log_obj, "got clickhouse response: %s", data)
if data == nil then
-- clickhouse returned no data (i.e. empty result set): exiting
return {}
if code ~= 200 or err_message then
if not err_message then err_message = data end
local ip_addr = upstream:get_addr():to_string(true)
- rspamd_logger.errx(params.log_obj,
- "request failed on clickhouse server %s: %s",
- ip_addr, err_message)
if fail_cb then
fail_cb(params, err_message, data)
+ else
+ rspamd_logger.errx(params.log_obj,
+ "request failed on clickhouse server %s: %s",
+ ip_addr, err_message)
end
upstream:fail()
else
upstream:ok()
- rspamd_logger.debugm(N, params.log_obj,
- "http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
local rows = parse_clickhouse_response(params, data)
if rows then
if ok_cb then
ok_cb(params, rows)
+ else
+ rspamd_logger.debugm(N, params.log_obj,
+ "http_select_cb ok: %s, %s, %s, %s", err_message, code,
+ data:gsub('[\n%s]+', ' '), _)
end
else
if fail_cb then
fail_cb(params, 'failed to parse reply', data)
+ else
+ local ip_addr = upstream:get_addr():to_string(true)
+ rspamd_logger.errx(params.log_obj,
+ "request failed on clickhouse server %s: %s",
+ ip_addr, 'failed to parse reply')
end
end
end
if code ~= 200 or err_message then
if not err_message then err_message = data end
local ip_addr = upstream:get_addr():to_string(true)
- rspamd_logger.errx(params.log_obj,
- "request failed on clickhouse server %s: %s",
- ip_addr, err_message)
if fail_cb then
fail_cb(params, err_message, data)
+ else
+ rspamd_logger.errx(params.log_obj,
+ "request failed on clickhouse server %s: %s",
+ ip_addr, err_message)
end
upstream:fail()
else
upstream:ok()
- rspamd_logger.debugm(N, params.log_obj,
- "http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
if ok_cb then
ok_cb(params, data)
+ else
+ rspamd_logger.debugm(N, params.log_obj,
+ "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
+ data:gsub('[\n%s]+', ' '), _)
end
end
end
http_params.body = query
http_params.log_obj = params.task or params.config
- rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body)
+ rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)
if not http_params.url then
local connect_prefix = "http://"
http_params.user = settings.user
http_params.password = settings.password
http_params.log_obj = params.task or params.config
+ http_params.body = query
if not http_params.url then
local connect_prefix = "http://"
local symbols_rows = {}
local custom_rows = {}
local nrows = 0
+local schema_version = 2 -- Current schema version
local connect_prefix = 'http://'
local settings = {
dmarc_allow_symbols = {'DMARC_POLICY_ALLOW'},
dmarc_reject_symbols = {'DMARC_POLICY_REJECT', 'DMARC_POLICY_QUARANTINE'},
stop_symbols = {},
- table = 'rspamd',
- attachments_table = 'rspamd_attachments',
- urls_table = 'rspamd_urls',
- emails_table = 'rspamd_emails',
- symbols_table = 'rspamd_symbols',
- asn_table = 'rspamd_asn',
ipmask = 19,
ipmask6 = 48,
full_urls = false,
}
}
-local clickhouse_schema = {
-table = [[
-CREATE TABLE IF NOT EXISTS ${table}
+--- @language SQL
+local clickhouse_schema = {[[
+CREATE TABLE rspamd
(
Date Date,
TS DateTime,
RcptUser String,
RcptDomain String,
ListId String,
- Digest FixedString(32)
-) ENGINE = MergeTree(Date, (TS, From), 8192)
-]],
-
-attachments_table = [[
-CREATE TABLE IF NOT EXISTS ${attachments_table} (
- Date Date,
- Digest FixedString(32),
`Attachments.FileName` Array(String),
`Attachments.ContentType` Array(String),
`Attachments.Length` Array(UInt32),
- `Attachments.Digest` Array(FixedString(16))
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-urls_table = [[
-CREATE TABLE IF NOT EXISTS ${urls_table} (
- Date Date,
- Digest FixedString(32),
+ `Attachments.Digest` Array(FixedString(16)),
`Urls.Tld` Array(String),
- `Urls.Url` Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-emails_table = [[
-CREATE TABLE IF NOT EXISTS ${emails_table} (
- Date Date,
- Digest FixedString(32),
- Emails Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-asn_table = [[
-CREATE TABLE IF NOT EXISTS ${asn_table} (
- Date Date,
- Digest FixedString(32),
+ `Urls.Url` Array(String),
+ Emails Array(String),
ASN String,
Country FixedString(2),
- IPNet String
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-symbols_table = [[
-CREATE TABLE IF NOT EXISTS ${symbols_table} (
- Date Date,
- Digest FixedString(32),
+ IPNet String,
`Symbols.Names` Array(String),
`Symbols.Scores` Array(Float64),
- `Symbols.Options` Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]]
+ `Symbols.Options` Array(String),
+ Digest FixedString(32)
+) ENGINE = MergeTree(Date, (TS, From), 8192)
+]],
+[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
+[[INSERT INTO rspamd_version (Version) Values (2)]],
}
+-- This describes SQL queries to migrate between versions
+local migrations = {
+ [1] = {
+ -- Move to a wide fat table
+ [[ALTER TABLE rspamd
+ ADD COLUMN `Attachments.FileName` Array(String) AFTER ListId,
+ ADD COLUMN `Attachments.ContentType` Array(String) AFTER `Attachments.FileName`,
+ ADD COLUMN `Attachments.Length` Array(UInt32) AFTER `Attachments.ContentType`,
+ ADD COLUMN `Attachments.Digest` Array(FixedString(16)) AFTER `Attachments.Length`,
+ ADD COLUMN `Urls.Tld` Array(String) AFTER `Attachments.Digest`,
+ ADD COLUMN `Urls.Url` Array(String) AFTER `Urls.Tld`,
+ ADD COLUMN Emails Array(String) AFTER `Urls.Url`,
+ ADD COLUMN ASN String AFTER Emails,
+ ADD COLUMN Country FixedString(2) AFTER ASN,
+ ADD COLUMN IPNet String AFTER Country,
+ ADD COLUMN `Symbols.Names` Array(String) AFTER IPNet,
+ ADD COLUMN `Symbols.Scores` Array(Float64) AFTER `Symbols.Names`,
+ ADD COLUMN `Symbols.Options` Array(String) AFTER `Symbols.Scores`]],
+ -- Add explicit version
+ [[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
+ [[INSERT INTO rspamd_version (Version) Values (2)]],
+ }
+}
+
+
local function clickhouse_main_row(tname)
local fields = {
'Date',
return settings.retention.period
end
+local function upload_clickhouse_schema(upstream, ev_base, cfg)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ -- Apply schema sequentially
+ local function sql_recursor(i)
+ if clickhouse_schema[i] then
+ local sql = clickhouse_schema[i]
+ local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+ function(_, _)
+ rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s',
+ i, upstream:get_addr():to_string(true))
+ sql_recursor(i + 1)
+ end,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot upload schema '%s' on clickhouse server %s: %s",
+ sql, upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: cannot make request",
+ sql, upstream:get_addr():to_string(true))
+ end
+ end
+ end
+
+ sql_recursor(1)
+end
+
+local function maybe_apply_migrations(upstream, ev_base, cfg, version)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ -- Apply migrations sequentially
+ local function migration_recursor(i)
+ if i < schema_version then
+ if migrations[i] then
+ -- We also need to apply statements sequentially
+ local function sql_recursor(j)
+ if migrations[i][j] then
+ local sql = migrations[i][j]
+ local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+ function(_, _)
+ rspamd_logger.infox(rspamd_config,
+ 'applied migration to version %s from version %s: %s',
+ i + 1, version, sql:gsub('[\n%s]+', ' '))
+ if j == #migrations[i] then
+ -- Go to the next migration
+ migration_recursor(i + 1)
+ else
+ -- Apply the next statement
+ sql_recursor(j + 1)
+ end
+ end ,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot apply migration %s: '%s' on clickhouse server %s: %s",
+ i, sql, upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config,
+ "cannot apply migration %s: '%s' on clickhouse server %s: cannot make request",
+ i, sql, upstream:get_addr():to_string(true))
+ end
+ end
+ end
+
+ sql_recursor(1)
+ else
+ -- Try another migration
+ migration_recursor(i + 1)
+ end
+ end
+ end
+
+ migration_recursor(version)
+end
+
+local function check_rspamd_table(upstream, ev_base, cfg)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ local sql = [[EXISTS TABLE rspamd]]
+ local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+ function(_, rows)
+ if rows[1] and rows[1].result then
+ if tonumber(rows[1].result) == 1 then
+ -- Apply migration
+ rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
+ maybe_apply_migrations(upstream, ev_base, cfg, 1)
+ else
+ -- Upload schema
+ rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
+ upload_clickhouse_schema(upstream, ev_base, cfg)
+ end
+ else
+ rspamd_logger.errx(rspamd_config,
+ "unexpected reply on EXISTS command from server %s: %s",
+ upstream:get_addr():to_string(true), rows)
+ end
+ end ,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot check if rspamd table exists on clickhouse server %s: %s",
+ upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: cannot make request",
+ upstream:get_addr():to_string(true))
+ end
+end
+
+
+local function check_clickhouse_upstream(upstream, ev_base, cfg)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ -- If we have some custom rules, we just send its schema to the upstream
+ for k,rule in pairs(settings.custom_rules) do
+ if rule.schema then
+ local sql = rspamd_lua_utils.template(rule.schema, settings)
+ local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+ nil,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot send custom schema %s to clickhouse server %s: %s",
+ k, upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request",
+ k, upstream:get_addr():to_string(true))
+ end
+ end
+ end
+
+ -- Now check the main schema and apply migrations if needed
+ local sql = [[SELECT MAX(Version) as v FROM rspamd_version]]
+ local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+ function(_, rows)
+ local version = tonumber(rows[1].v)
+ maybe_apply_migrations(upstream, ev_base, cfg, version)
+ end,
+ function(_, err)
+ -- It might be either no rspamd table or version 1
+ rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table')
+ check_rspamd_table(upstream, ev_base, cfg)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request",
+ k, upstream:get_addr():to_string(true))
+ end
+end
+
local opts = rspamd_config:get_all_opt('clickhouse')
if opts then
for k,v in pairs(opts) do
local upstreams = settings.upstream:all_upstreams()
for _,up in ipairs(upstreams) do
- local ip_addr = up:get_addr():to_string(true)
-
- local function send_req(elt, sql)
- local function http_cb(err_message, code, data, _)
- if code ~= 200 or err_message then
- if not err_message then err_message = data end
- rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: %s",
- elt, ip_addr, err_message)
- up:fail()
- else
- up:ok()
- end
- end
-
- if not rspamd_http.request({
- ev_base = ev_base,
- config = cfg,
- url = connect_prefix .. ip_addr,
- body = sql,
- callback = http_cb,
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- }) then
- rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request",
- elt, ip_addr)
- end
- end
-
- for tab,sql in pairs(clickhouse_schema) do
- send_req(tab, rspamd_lua_utils.template(sql, settings))
- end
-
- for k,rule in pairs(settings.custom_rules) do
- if rule.schema then
- send_req(k, rspamd_lua_utils.template(rule.schema, settings))
- end
- end
+ check_clickhouse_upstream(up, ev_base, cfg)
end
if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then