diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-07-26 16:13:20 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-07-26 16:13:20 +0100 |
commit | d23aa45104049d0a92db39299e4f35fee3e3c006 (patch) | |
tree | f5e4565ff5b52e1814048abb65d80f1d3670c176 /src | |
parent | fcf1677b2eb3955d62d657370aaad3f2d7247950 (diff) | |
parent | 1ba53e7f5aaee2ae2a5814c4b596cee58723fbf1 (diff) | |
download | rspamd-d23aa45104049d0a92db39299e4f35fee3e3c006.tar.gz rspamd-d23aa45104049d0a92db39299e4f35fee3e3c006.zip |
Merge pull request #2375 from negram/clickhouse-retention
[Feature] Clickhouse retention
Diffstat (limited to 'src')
-rw-r--r-- | src/plugins/lua/clickhouse.lua | 215 |
1 files changed, 208 insertions, 7 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 913fedf20..f93d96c59 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -18,6 +18,9 @@ local rspamd_logger = require 'rspamd_logger' local rspamd_http = require "rspamd_http" local rspamd_lua_utils = require "lua_util" local upstream_list = require "rspamd_upstream_list" +local lua_util = require "lua_util" +local ucl = require "ucl" + local N = "clickhouse" if confighelp then @@ -66,10 +69,16 @@ local settings = { user = nil, password = nil, no_ssl_verify = false, + retention = { + enable = false, + method = 'detach', + period_months = 3, + run_every = '7d', + } } local clickhouse_schema = { -rspamd = [[ +table = [[ CREATE TABLE IF NOT EXISTS ${table} ( Date Date, @@ -97,7 +106,7 @@ CREATE TABLE IF NOT EXISTS ${table} ) ENGINE = MergeTree(Date, (TS, From), 8192) ]], - attachments = [[ +attachments_table = [[ CREATE TABLE IF NOT EXISTS ${attachments_table} ( Date Date, Digest FixedString(32), @@ -108,7 +117,7 @@ CREATE TABLE IF NOT EXISTS ${attachments_table} ( ) ENGINE = MergeTree(Date, Digest, 8192) ]], - urls = [[ +urls_table = [[ CREATE TABLE IF NOT EXISTS ${urls_table} ( Date Date, Digest FixedString(32), @@ -117,7 +126,7 @@ CREATE TABLE IF NOT EXISTS ${urls_table} ( ) ENGINE = MergeTree(Date, Digest, 8192) ]], - emails = [[ +emails_table = [[ CREATE TABLE IF NOT EXISTS ${emails_table} ( Date Date, Digest FixedString(32), @@ -125,7 +134,7 @@ CREATE TABLE IF NOT EXISTS ${emails_table} ( ) ENGINE = MergeTree(Date, Digest, 8192) ]], - asn = [[ +asn_table = [[ CREATE TABLE IF NOT EXISTS ${asn_table} ( Date Date, Digest FixedString(32), @@ -135,7 +144,7 @@ CREATE TABLE IF NOT EXISTS ${asn_table} ( ) ENGINE = MergeTree(Date, Digest, 8192) ]], - symbols = [[ +symbols_table = [[ CREATE TABLE IF NOT EXISTS ${symbols_table} ( Date Date, Digest FixedString(32), @@ -741,6 +750,174 @@ local function clickhouse_collect(task) end end +local function mk_remove_http_cb(upstream, params, ok_cb) + local function do_remove_http_cb(err_message, code, data, _) + if code ~= 200 or err_message then + if not err_message then err_message = data end + local ip_addr = upstream:get_addr():to_string(true) + rspamd_logger.errx(rspamd_config, "request failed on clickhouse server %s: %s", + ip_addr, err_message) + upstream:fail() + else + upstream:ok() + if (ok_cb) then + rspamd_logger.debugm(N, rspamd_config, "do_remove_http_cb ok: %s, %s, %s, %s", err_message, code, data, _) + ok_cb(params.ev_base, params.config, data) + end + end + end + return do_remove_http_cb +end + +local function clickhouse_request(upstream, ok_cb, params) + rspamd_logger.debugm(N, rspamd_config, "clickhouse_request: %s", params.body) + + params.callback = mk_remove_http_cb(upstream, params, ok_cb) + params.gzip = settings.use_gzip + params.mime_type = 'text/plain' + params.timeout = settings['timeout'] + params.no_ssl_verify = settings.no_ssl_verify + params.user = settings.user + params.password = settings.password + if not params.url then + local ip_addr = upstream:get_addr():to_string(true) + params.url = connect_prefix .. ip_addr + end + return rspamd_http.request(params) +end + +local function do_remove_partition(ev_base, cfg, table_name, partition_id) + rspamd_logger.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition_id) + local upstream = settings.upstream:get_upstream_round_robin() + local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION ${partition_id}" + local remove_method = (settings.retention.method == 'drop') and 'DROP' or 'DETACH' + local sql_params = { + ['table_name'] = table_name, + ['remove_method'] = remove_method, + ['partition_id'] = partition_id + } + + local sql = rspamd_lua_utils.template(remove_partition_sql, sql_params) + + local ch_params = { + body = sql, + ev_base = ev_base, + cfg = cfg, + } + + if not clickhouse_request(upstream, nil, ch_params) then + rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request", + settings['server']) + end +end + +local function parse_clickhouse_response(ev_base, cfg, data) + rspamd_logger.debugm(N, rspamd_config, "got clickhouse response: %s", data) + if data == nil then + -- clickhouse returned no data (i.e. empty resultset): exiting + return + end + local function parse_string(s) + local parser = ucl.parser() + local res, err = parser:parse_string(s) + if not res then + rspamd_logger.errx(rspamd_config, 'Parser error: %s', err) + return nil + end + return parser:get_object() + end + + -- iterate over rows + local ch_rows = lua_util.str_split(data, "\n") + for _, plain_row in pairs(ch_rows) do + if plain_row and plain_row:len() > 1 then + local parsed_row = parse_string(plain_row) + do_remove_partition(rspamd_config, cfg, parsed_row.table, parsed_row.partition_id) + end + end +end + + +--[[ + nil - file is not writable, do not perform removal + 0 - it's time to perform removal + <int> - how many seconds wait until next run +]] +local function get_last_removal_ago() + local ts_file = string.format('%s/%s', rspamd_paths['DBDIR'], 'clickhouse_retention_run') + local f, err = io.open(ts_file, 'r') + local write_file + local last_ts + + if err then + rspamd_logger.debugm(N, rspamd_config, 'Failed to open %s: %s', ts_file, err) + else + last_ts = tonumber(f:read('*number')) + f:close() + end + + write_file, err = io.open(ts_file, 'w') + if err then + rspamd_logger.errx(rspamd_config, 'Failed to open %s, will not perform retention: %s', ts_file, err) + return nil + end + + local current_ts = os.time() + + if last_ts == nil or (last_ts + settings.retention.period) <= current_ts then + local res + res, err = write_file:write(tostring(current_ts)) + if err then + rspamd_logger.errx(rspamd_config, 'Failed to write %s, will not perform retention: %s', ts_file, err) + return nil + end + write_file:close() + return 0 + end + + return (last_ts + settings.retention.period) - current_ts +end + +local function clickhouse_remove_old_partitions(cfg, ev_base) + local last_time_ago = get_last_removal_ago() + if last_time_ago == nil then + rspamd_logger.errx(rspamd_config, "Failed to get last run time. Disabling retention") + return false + elseif last_time_ago ~= 0 then + return last_time_ago + end + + local upstream = settings.upstream:get_upstream_round_robin() + local ip_addr = upstream:get_addr():to_string(true) + + local partition_to_remove_sql = "SELECT distinct partition_id, table FROM system.parts WHERE table in ('${tables}') and max_date < toDate(now() - interval ${month} month);" + + local table_names = {} + for table_name,_ in pairs(clickhouse_schema) do + table.insert(table_names, settings[table_name]) + end + local tables = table.concat(table_names, "', '") + local sql_params = { + tables = tables, + month = settings.retention.period_months, + } + local sql = rspamd_lua_utils.template(partition_to_remove_sql, sql_params) + + local ch_params = { + body = sql, + url = string.format("%s%s/?default_format=JSONEachRow", connect_prefix, ip_addr), + ev_base = ev_base, + config = cfg, + } + if not clickhouse_request(upstream, parse_clickhouse_response, ch_params) then + rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request", + settings['server']) + end + + -- settings.retention.period is added on initialisation, see below + return settings.retention.period +end + local opts = rspamd_config:get_all_opt('clickhouse') if opts then for k,v in pairs(opts) do @@ -761,7 +938,7 @@ if opts then settings['server'] or settings['servers'], 8123) if not settings.upstream then - rspamd_logger.errx('cannot parse clickhouse address: %s', + rspamd_logger.errx(rspamd_config, 'cannot parse clickhouse address: %s', settings['server'] or settings['servers']) rspamd_lua_utils.disable_module(N, "config") return @@ -821,6 +998,30 @@ if opts then send_req(tab, rspamd_lua_utils.template(sql, settings)) end end + + if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then + rspamd_logger.errx(rspamd_config, "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention", + settings.retention.method) + settings.retention.enable = false + end + if settings.retention.enable and settings.retention.period_months < 1 or settings.retention.period_months > 1000 then + rspamd_logger.errx(rspamd_config, "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention", + settings.retention.period_months) + settings.retention.enable = false + end + local period = lua_util.parse_time_interval(settings.retention.run_every) + if settings.retention.enable and period == nil then + rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention", + settings.retention.run_every) + settings.retention.enable = false + end + + if settings.retention.enable then + settings.retention.period = period + rspamd_logger.infox(rspamd_config, "retention will be performed each %s seconds for %s month with method %s", + period, settings.retention.period_months, settings.retention.method) + rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false) + end end end) end |