From ad7d5442c5aa124a4ba2cc7b4698b18faf20b60f Mon Sep 17 00:00:00 2001
From: Mikhail Galanin
Date: Thu, 26 Jul 2018 11:52:09 +0100
Subject: [PATCH] Clickhouse tables retention

---
 conf/modules.d/clickhouse.conf |  12 ++
 src/plugins/lua/clickhouse.lua | 245 ++++++++++++++++++++++++++++++++-
 2 files changed, 251 insertions(+), 6 deletions(-)

diff --git a/conf/modules.d/clickhouse.conf b/conf/modules.d/clickhouse.conf
index 3dbc3b60c..c35352a03 100644
--- a/conf/modules.d/clickhouse.conf
+++ b/conf/modules.d/clickhouse.conf
@@ -51,6 +51,18 @@ clickhouse {
   #dmarc_allow_symbols = ["DMARC_POLICY_ALLOW"];
   #dmarc_reject_symbols = ["DMARC_POLICY_REJECT", "DMARC_POLICY_QUARANTINE"];
 
+  #retention {
+  #  # disabled by default
+  #  enable = true;
+  #  # drop | detach, please refer to ClickHouse docs for details
+  #  # http://clickhouse-docs.readthedocs.io/en/latest/query_language/queries.html#manipulations-with-partitions-and-parts
+  #  method = "drop";
+  #  # how many months the data should be kept in ClickHouse
+  #  period_months = 3;
+  #  # how often to run the cleanup process
+  #  run_every = "7d";
+  #}
+
   .include(try=true,priority=5) "${DBDIR}/dynamic/clickhouse.conf"
   .include(try=true,priority=1,duplicate=merge) "$LOCAL_CONFDIR/local.d/clickhouse.conf"
   .include(try=true,priority=10) "$LOCAL_CONFDIR/override.d/clickhouse.conf"
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 913fedf20..2d32982bd 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -18,6 +18,9 @@ local rspamd_logger = require 'rspamd_logger'
 local rspamd_http = require "rspamd_http"
 local rspamd_lua_utils = require "lua_util"
 local upstream_list = require "rspamd_upstream_list"
+local lua_util = require "lua_util"
+local ucl = require "ucl"
+
 local N = "clickhouse"
 
 if confighelp then
@@ -66,10 +69,16 @@ local settings = {
   user = nil,
   password = nil,
   no_ssl_verify = false,
+  retention = {
+    enable = false,
+    method = 'detach',
+    period_months = 3,
+    run_every = '7d',
+  }
 }
 
 local clickhouse_schema = {
-rspamd = [[
+table = [[
 CREATE TABLE IF NOT EXISTS ${table}
 (
   Date Date,
@@ -97,7 +106,7 @@ CREATE TABLE IF NOT EXISTS ${table}
 ) ENGINE = MergeTree(Date, (TS, From), 8192)
 ]],
 
-  attachments = [[
+attachments_table = [[
 CREATE TABLE IF NOT EXISTS ${attachments_table} (
   Date Date,
   Digest FixedString(32),
@@ -108,7 +117,7 @@ CREATE TABLE IF NOT EXISTS ${attachments_table} (
 ) ENGINE = MergeTree(Date, Digest, 8192)
 ]],
 
-  urls = [[
+urls_table = [[
 CREATE TABLE IF NOT EXISTS ${urls_table} (
   Date Date,
   Digest FixedString(32),
@@ -117,7 +126,7 @@ CREATE TABLE IF NOT EXISTS ${urls_table} (
 ) ENGINE = MergeTree(Date, Digest, 8192)
 ]],
 
-  emails = [[
+emails_table = [[
 CREATE TABLE IF NOT EXISTS ${emails_table} (
   Date Date,
   Digest FixedString(32),
@@ -125,7 +134,7 @@ CREATE TABLE IF NOT EXISTS ${emails_table} (
 ) ENGINE = MergeTree(Date, Digest, 8192)
 ]],
 
-  asn = [[
+asn_table = [[
 CREATE TABLE IF NOT EXISTS ${asn_table} (
   Date Date,
   Digest FixedString(32),
@@ -135,7 +144,7 @@ CREATE TABLE IF NOT EXISTS ${asn_table} (
 ) ENGINE = MergeTree(Date, Digest, 8192)
 ]],
 
-  symbols = [[
+symbols_table = [[
 CREATE TABLE IF NOT EXISTS ${symbols_table} (
   Date Date,
   Digest FixedString(32),
@@ -741,6 +750,204 @@ local function clickhouse_collect(task)
   end
 end
 
+-- Builds the rspamd_http callback for a retention request sent to `upstream`.
+-- On HTTP 200 the upstream is marked as ok and ok_cb (optional) is invoked as
+-- ok_cb(params.ev_base, params.config, data); otherwise the error is logged
+-- and the upstream is marked as failed.
+local function mk_remove_http_cb(upstream, params, ok_cb)
+  local function do_remove_http_cb(err_message, code, data, _)
+    if code ~= 200 or err_message then
+      -- no explicit error message: fall back to the returned data as the error text
+      if not err_message then err_message = data end
+      local ip_addr = upstream:get_addr():to_string(true)
+      rspamd_logger.errx(rspamd_config, "request failed on clickhouse server %s: %s",
+        ip_addr, err_message)
+      upstream:fail()
+    else
+      upstream:ok()
+      if (ok_cb) then
+        rspamd_logger.debugm(N, params.ev_base, "do_remove_http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
+        ok_cb(params.ev_base, params.config, data)
+      end
+    end
+  end
+  return do_remove_http_cb
+end
+
+-- Sends params.body to ClickHouse over HTTP. Connection options (gzip, timeout,
+-- credentials, SSL verification) are copied from the module settings; params.url
+-- defaults to the address of `upstream`. Returns the result of rspamd_http.request.
+local function clickhouse_request(upstream, ok_cb, params)
+  rspamd_logger.debugm(N, rspamd_config, "clickhouse_request: %s", params.body)
+
+  params.callback = mk_remove_http_cb(upstream, params, ok_cb)
+  params.gzip = settings.use_gzip
+  params.mime_type = 'text/plain'
+  params.timeout = settings['timeout']
+  params.no_ssl_verify = settings.no_ssl_verify
+  params.user = settings.user
+  params.password = settings.password
+  if not params.url then
+    local ip_addr = upstream:get_addr():to_string(true)
+    params.url = connect_prefix .. ip_addr
+  end
+  return rspamd_http.request(params)
+end
+
+-- Issues "ALTER TABLE ${table_name} DROP|DETACH PARTITION ${partition_id}",
+-- choosing the verb from settings.retention.method.
+local function do_remove_partition(ev_base, cfg, table_name, partition_id)
+  rspamd_logger.debugm(N, ev_base, "removing partition %s.%s", table_name, partition_id)
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION ${partition_id}"
+  local remove_method = (settings.retention.method == 'drop') and 'DROP' or 'DETACH'
+  local sql_params = {
+    ['table_name'] = table_name,
+    ['remove_method'] = remove_method,
+    ['partition_id'] = partition_id
+  }
+
+  local sql = rspamd_lua_utils.template(remove_partition_sql, sql_params)
+
+  local ch_params = {
+    body = sql,
+    ev_base = ev_base,
+    -- the key must be `config`: that is what mk_remove_http_cb reads back
+    config = cfg,
+  }
+
+  if not clickhouse_request(upstream, nil, ch_params) then
+    rspamd_logger.errx(ev_base, "cannot send data to clickhouse server %s: cannot make request",
+        settings['server'])
+  end
+end
+
+-- Success callback for the "partitions to remove" query: `data` is split into
+-- lines, each line is parsed with UCL and its table/partition_id pair is removed.
+local function parse_clickhouse_response(ev_base, cfg, data)
+  rspamd_logger.debugm(N, ev_base, "got clickhouse response: %s", data)
+  if data == nil then
+    -- clickhouse returned no data: exiting
+    return
+  end
+  local function parse_string(s)
+    local parser = ucl.parser()
+    local res, err = parser:parse_string(s)
+    if not res then
+      rspamd_logger.errx(ev_base, 'Parser error: %s', err)
+      return nil
+    end
+    return parser:get_object()
+  end
+
+  -- iterate over rows
+  local ch_rows = lua_util.str_split(data, "\n")
+  for _, plain_row in pairs(ch_rows) do
+    if plain_row and plain_row:len() > 1 then
+      local parsed_row = parse_string(plain_row)
+      -- a row that failed to parse has been logged already: skip it
+      if parsed_row then
+        do_remove_partition(ev_base, cfg, parsed_row.table, parsed_row.partition_id)
+      end
+    end
+  end
+end
+
+
+--[[
+  Tells whether the retention job is due, using the time of the previous run
+  stored in the 'clickhouse_retention_run' file under DBDIR. Returns:
+  nil - file is not writable, do not perform removal
+  0 - it's time to perform removal (the current time has been stored)
+  number - how many seconds to wait until the next run
+]]
+local function get_last_removal_ago(ev_base)
+  local ts_file = string.format('%s/%s', rspamd_paths['DBDIR'], 'clickhouse_retention_run')
+  local last_ts
+
+  local f, err = io.open(ts_file, 'r')
+  if not f then
+    rspamd_logger.debugm(N, ev_base, 'Failed to open %s: %s', ts_file, err)
+  else
+    last_ts = tonumber(f:read('*number'))
+    f:close()
+  end
+
+  local current_ts = os.time()
+
+  if last_ts ~= nil then
+    local next_run_ts = last_ts + settings.retention.period
+    if next_run_ts > current_ts then
+      -- not due yet: leave the file alone, opening it with 'w' would truncate
+      -- it and lose the stored timestamp
+      return next_run_ts - current_ts
+    end
+  end
+
+  local write_file
+  write_file, err = io.open(ts_file, 'w')
+  if not write_file then
+    rspamd_logger.errx(ev_base, 'Failed to open %s, will not perform retention: %s', ts_file, err)
+    return nil
+  end
+
+  local res
+  res, err = write_file:write(tostring(current_ts))
+  -- close on both paths so the handle does not leak when the write fails
+  write_file:close()
+  if not res then
+    rspamd_logger.errx(ev_base, 'Failed to write %s, will not perform retention: %s', ts_file, err)
+    return nil
+  end
+
+  return 0
+end
+
+-- Periodic callback: when the retention job is due, asks ClickHouse for the
+-- partitions older than settings.retention.period_months and removes them.
+-- Returns the delay in seconds before the next run, or false when the last
+-- run time cannot be obtained (retention is then disabled, see the log message).
+local function clickhouse_remove_old_partitions(cfg, ev_base)
+  local last_time_ago = get_last_removal_ago(ev_base)
+  if last_time_ago == nil then
+    rspamd_logger.errx(ev_base, "Failed to get last run time. Disabling retention")
+    return false
+  elseif last_time_ago ~= 0 then
+    return last_time_ago
+  end
+
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local ip_addr = upstream:get_addr():to_string(true)
+
+  local partition_to_remove_sql = "SELECT distinct partition_id, table FROM system.parts WHERE table in ('${tables}') and max_date < toDate(now() - interval ${month} month);"
+
+  local table_names = {}
+  -- the keys of clickhouse_schema are the settings keys holding the table names
+  for table_name,_ in pairs(clickhouse_schema) do
+    table.insert(table_names, settings[table_name])
+  end
+  local tables = table.concat(table_names, "', '")
+  local sql_params = {
+    tables = tables,
+    month = settings.retention.period_months,
+  }
+  local sql = rspamd_lua_utils.template(partition_to_remove_sql, sql_params)
+
+  local ch_params = {
+    body = sql,
+    url = string.format("%s%s/?default_format=JSONEachRow", connect_prefix, ip_addr),
+    ev_base = ev_base,
+    config = cfg,
+  }
+  if not clickhouse_request(upstream, parse_clickhouse_response, ch_params) then
+    rspamd_logger.errx(ev_base, "cannot send data to clickhouse server %s: cannot make request",
+        settings['server'])
+  end
+
+  -- settings.retention.period is added on initialisation, see below
+  return settings.retention.period
+end
+
 local opts = rspamd_config:get_all_opt('clickhouse')
 if opts then
   for k,v in pairs(opts) do
@@ -821,6 +1028,32 @@ if opts then
           send_req(tab, rspamd_lua_utils.template(sql, settings))
         end
       end
+
+      -- Validate the retention settings; any invalid value disables retention.
+      if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then
+        rspamd_logger.errx(rspamd_config, "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention",
+            settings.retention.method)
+        settings.retention.enable = false
+      end
+      -- the parentheses are required: `and` binds tighter than `or`
+      if settings.retention.enable and (settings.retention.period_months < 1 or settings.retention.period_months > 1000) then
+        rspamd_logger.errx(rspamd_config, "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention",
+            settings.retention.period_months)
+        settings.retention.enable = false
+      end
+      local period = lua_util.parse_time_interval(settings.retention.run_every)
+      if settings.retention.enable and period == nil then
+        rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention",
+            settings.retention.run_every)
+        settings.retention.enable = false
+      end
+
+      if settings.retention.enable then
+        settings.retention.period = period
+        rspamd_logger.infox(rspamd_config, "retention will be performed each %s seconds for %s month with method %s",
+            period, settings.retention.period_months, settings.retention.method)
+        rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false)
+      end
     end
   end)
 end
-- 
2.39.5