local rspamd_http = require "rspamd_http"
local rspamd_lua_utils = require "lua_util"
local upstream_list = require "rspamd_upstream_list"
+local lua_util = require "lua_util"
+local ucl = require "ucl"
+
local N = "clickhouse"
if confighelp then
user = nil,
password = nil,
no_ssl_verify = false,
+ retention = {
+ enable = false,
+ method = 'detach',
+ period_months = 3,
+ run_every = '7d',
+ }
}
local clickhouse_schema = {
-rspamd = [[
+table = [[
CREATE TABLE IF NOT EXISTS ${table}
(
Date Date,
) ENGINE = MergeTree(Date, (TS, From), 8192)
]],
- attachments = [[
+attachments_table = [[
CREATE TABLE IF NOT EXISTS ${attachments_table} (
Date Date,
Digest FixedString(32),
) ENGINE = MergeTree(Date, Digest, 8192)
]],
- urls = [[
+urls_table = [[
CREATE TABLE IF NOT EXISTS ${urls_table} (
Date Date,
Digest FixedString(32),
) ENGINE = MergeTree(Date, Digest, 8192)
]],
- emails = [[
+emails_table = [[
CREATE TABLE IF NOT EXISTS ${emails_table} (
Date Date,
Digest FixedString(32),
) ENGINE = MergeTree(Date, Digest, 8192)
]],
- asn = [[
+asn_table = [[
CREATE TABLE IF NOT EXISTS ${asn_table} (
Date Date,
Digest FixedString(32),
) ENGINE = MergeTree(Date, Digest, 8192)
]],
- symbols = [[
+symbols_table = [[
CREATE TABLE IF NOT EXISTS ${symbols_table} (
Date Date,
Digest FixedString(32),
end
end
+-- Build the HTTP completion callback used for retention requests.
+-- upstream: upstream object that is marked failed/ok depending on the reply
+-- params:   request table; its `ev_base` and `config` fields are forwarded to ok_cb
+-- ok_cb:    optional function(ev_base, config, data) invoked on a 200 reply
+-- Returns a function(err_message, code, data, conn) suitable as `callback`.
+local function mk_remove_http_cb(upstream, params, ok_cb)
+  return function(err_message, code, data, conn)
+    if err_message or code ~= 200 then
+      -- Without a transport-level error the response body is used as the reason
+      local reason = err_message or data
+      local server_addr = upstream:get_addr():to_string(true)
+      rspamd_logger.errx(rspamd_config, "request failed on clickhouse server %s: %s",
+        server_addr, reason)
+      upstream:fail()
+      return
+    end
+
+    upstream:ok()
+    if not ok_cb then
+      return
+    end
+
+    rspamd_logger.debugm(N, params.ev_base, "do_remove_http_cb ok: %s, %s, %s, %s", err_message, code, data, conn)
+    ok_cb(params.ev_base, params.config, data)
+  end
+end
+
+-- Send a request to ClickHouse through rspamd_http.
+-- upstream: upstream object used for the default URL and for ok/fail accounting
+-- ok_cb:    optional callback passed through to mk_remove_http_cb
+-- params:   request table (must carry `body`); it is completed in place with
+--           the connection options taken from `settings`
+-- Returns whatever rspamd_http.request returns.
+local function clickhouse_request(upstream, ok_cb, params)
+  rspamd_logger.infox(rspamd_config, "clickhouse_request: %s", params.body) -- TODO: make debugm
+
+  params.callback = mk_remove_http_cb(upstream, params, ok_cb)
+
+  -- Transport options
+  params.mime_type = 'text/plain'
+  params.gzip = settings.use_gzip
+  params.timeout = settings['timeout']
+  params.no_ssl_verify = settings.no_ssl_verify
+
+  -- Credentials
+  params.user = settings.user
+  params.password = settings.password
+
+  -- A caller-supplied URL wins; otherwise target the upstream address
+  if not params.url then
+    params.url = connect_prefix .. upstream:get_addr():to_string(true)
+  end
+
+  return rspamd_http.request(params)
+end
+
+-- Ask ClickHouse to drop or detach a single partition of a table.
+-- ev_base:      event base passed on to the HTTP request
+-- cfg:          config object passed on to the HTTP request
+-- table_name:   table owning the partition
+-- partition_id: partition identifier as reported by system.parts
+-- The statement uses DROP when settings.retention.method is 'drop',
+-- DETACH otherwise. Nothing is returned; a failure to start the request
+-- is only logged.
+local function do_remove_partition(ev_base, cfg, table_name, partition_id)
+  rspamd_logger.debugm(N, ev_base, "removing partition %s.%s", table_name, partition_id)
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION ${partition_id}"
+  local remove_method = (settings.retention.method == 'drop') and 'DROP' or 'DETACH'
+  local sql_params = {
+    ['table_name'] = table_name,
+    ['remove_method'] = remove_method,
+    ['partition_id'] = partition_id
+  }
+
+  local sql = rspamd_lua_utils.template(remove_partition_sql, sql_params)
+
+  local ch_params = {
+    body = sql,
+    ev_base = ev_base,
+    -- The key must be `config`: that is the name the HTTP callback reads
+    -- and the one the partition-listing request uses as well.
+    config = cfg,
+  }
+
+  if not clickhouse_request(upstream, nil, ch_params) then
+    rspamd_logger.errx(ev_base, "cannot send data to clickhouse server %s: cannot make request",
+      settings['server'])
+  end
+end
+
+-- Handle the reply of the "partitions to remove" query.
+-- ev_base: event base forwarded to do_remove_partition
+-- cfg:     config object forwarded to do_remove_partition
+-- data:    response body, one object per line (the query is sent with
+--          default_format=JSONEachRow); nil means there is nothing to do
+-- Each row that parses and carries both `table` and `partition_id`
+-- triggers do_remove_partition; other rows are skipped.
+local function parse_clickhouse_response(ev_base, cfg, data)
+  rspamd_logger.debugm(N, ev_base, "got clickhouse response: %s", data)
+  if data == nil then
+    -- clickhouse returned no data: exiting
+    return
+  end
+
+  -- Parse a single row with UCL; returns nil (after logging) on failure
+  local function parse_string(s)
+    local parser = ucl.parser()
+    local res, err = parser:parse_string(s)
+    if not res then
+      rspamd_logger.errx(ev_base, 'Parser error: %s', err)
+      return nil
+    end
+    return parser:get_object()
+  end
+
+  -- iterate over rows
+  local ch_rows = lua_util.str_split(data, "\n")
+  for _, plain_row in ipairs(ch_rows) do
+    if plain_row and plain_row:len() > 1 then
+      local parsed_row = parse_string(plain_row)
+      -- A malformed row must not abort the whole run: skip it instead of
+      -- indexing a nil value
+      if type(parsed_row) == 'table' and parsed_row.table and parsed_row.partition_id then
+        do_remove_partition(ev_base, cfg, parsed_row.table, parsed_row.partition_id)
+      end
+    end
+  end
+end
+
+
+--[[
+  Decide whether retention is due, based on the timestamp stored in
+  <DBDIR>/clickhouse_retention_run.
+
+  Returns:
+  nil   - timestamp file cannot be written, do not perform removal
+  0     - it's time to perform removal (the stored timestamp was updated)
+  <int> - how many seconds to wait until the next run
+]]
+local function get_last_removal_ago(ev_base)
+  local ts_file = string.format('%s/%s', rspamd_paths['DBDIR'], 'clickhouse_retention_run')
+  local last_ts
+
+  local f, err = io.open(ts_file, 'r')
+  if not f then
+    rspamd_logger.debugm(N, ev_base, 'Failed to open %s: %s', ts_file, err)
+  else
+    last_ts = tonumber(f:read('*number'))
+    f:close()
+  end
+
+  local current_ts = os.time()
+  local period = settings.retention.period
+
+  if last_ts ~= nil and (last_ts + period) > current_ts then
+    -- Not due yet. The file is deliberately left untouched: opening it
+    -- with 'w' here would truncate it and lose the stored timestamp.
+    return (last_ts + period) - current_ts
+  end
+
+  -- Due (or never run): record the current time before reporting 0
+  local write_file
+  write_file, err = io.open(ts_file, 'w')
+  if not write_file then
+    rspamd_logger.errx(ev_base, 'Failed to open %s, will not perform retention: %s', ts_file, err)
+    return nil
+  end
+
+  local res
+  res, err = write_file:write(tostring(current_ts))
+  -- Close on both the success and the error path so the handle never leaks
+  write_file:close()
+  if not res then
+    rspamd_logger.errx(ev_base, 'Failed to write %s, will not perform retention: %s', ts_file, err)
+    return nil
+  end
+
+  return 0
+end
+
+-- Periodic retention task: queries system.parts for partitions older than
+-- settings.retention.period_months and hands the reply to
+-- parse_clickhouse_response.
+-- cfg:     config object stored in the request table
+-- ev_base: event base used for the timestamp check and the request
+-- Returns false when the last run time cannot be determined, the number
+-- of seconds left when the run is not due yet, and
+-- settings.retention.period after a request has been attempted.
+local function clickhouse_remove_old_partitions(cfg, ev_base)
+  local wait_for = get_last_removal_ago(ev_base)
+
+  if wait_for == nil then
+    rspamd_logger.errx(ev_base, "Failed to get last run time. Disabling retention")
+    return false
+  end
+  if wait_for ~= 0 then
+    return wait_for
+  end
+
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local ip_addr = upstream:get_addr():to_string(true)
+
+  -- Schema keys double as the names of the settings holding table names
+  local table_names = {}
+  for schema_key in pairs(clickhouse_schema) do
+    table.insert(table_names, settings[schema_key])
+  end
+
+  local partition_to_remove_sql = "SELECT distinct partition_id, table FROM system.parts WHERE table in ('${tables}') and max_date < toDate(now() - interval ${month} month);"
+  local sql = rspamd_lua_utils.template(partition_to_remove_sql, {
+    tables = table.concat(table_names, "', '"),
+    month = settings.retention.period_months,
+  })
+
+  local sent = clickhouse_request(upstream, parse_clickhouse_response, {
+    body = sql,
+    url = string.format("%s%s/?default_format=JSONEachRow", connect_prefix, ip_addr),
+    ev_base = ev_base,
+    config = cfg,
+  })
+  if not sent then
+    rspamd_logger.errx(ev_base, "cannot send data to clickhouse server %s: cannot make request",
+      settings['server'])
+  end
+
+  -- settings.retention.period is added on initialisation, see below
+  return settings.retention.period
+end
+
local opts = rspamd_config:get_all_opt('clickhouse')
if opts then
for k,v in pairs(opts) do
send_req(tab, rspamd_lua_utils.template(sql, settings))
end
end
+
+    -- Validate the retention settings; any invalid value disables retention
+    -- instead of aborting the plugin.
+    if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then
+      rspamd_logger.errx(rspamd_config, "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention",
+        settings.retention.method)
+      settings.retention.enable = false
+    end
+    -- The range test is parenthesised: `a and b or c` parses as
+    -- `(a and b) or c`, which would report an error for period_months > 1000
+    -- even when retention is disabled.
+    if settings.retention.enable and
+        (settings.retention.period_months < 1 or settings.retention.period_months > 1000) then
+      rspamd_logger.errx(rspamd_config, "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention",
+        settings.retention.period_months)
+      settings.retention.enable = false
+    end
+    local period = lua_util.parse_time_interval(settings.retention.run_every)
+    if settings.retention.enable and period == nil then
+      rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention",
+        settings.retention.run_every)
+      settings.retention.enable = false
+    end
+
+    if settings.retention.enable then
+      -- Parsed interval in seconds, read later by the periodic retention task
+      settings.retention.period = period
+      rspamd_logger.infox(rspamd_config, "retention will be performed each %s seconds for %s month with method %s",
+        period, settings.retention.period_months, settings.retention.method)
+      rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false)
+    end
end
end)
end