aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-07-26 16:13:20 +0100
committerGitHub <noreply@github.com>2018-07-26 16:13:20 +0100
commitd23aa45104049d0a92db39299e4f35fee3e3c006 (patch)
treef5e4565ff5b52e1814048abb65d80f1d3670c176 /src
parentfcf1677b2eb3955d62d657370aaad3f2d7247950 (diff)
parent1ba53e7f5aaee2ae2a5814c4b596cee58723fbf1 (diff)
downloadrspamd-d23aa45104049d0a92db39299e4f35fee3e3c006.tar.gz
rspamd-d23aa45104049d0a92db39299e4f35fee3e3c006.zip
Merge pull request #2375 from negram/clickhouse-retention
[Feature] Clickhouse retention
Diffstat (limited to 'src')
-rw-r--r--src/plugins/lua/clickhouse.lua215
1 files changed, 208 insertions, 7 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 913fedf20..f93d96c59 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -18,6 +18,9 @@ local rspamd_logger = require 'rspamd_logger'
local rspamd_http = require "rspamd_http"
local rspamd_lua_utils = require "lua_util"
local upstream_list = require "rspamd_upstream_list"
+local lua_util = require "lua_util"
+local ucl = require "ucl"
+
local N = "clickhouse"
if confighelp then
@@ -66,10 +69,16 @@ local settings = {
user = nil,
password = nil,
no_ssl_verify = false,
+ retention = {
+ enable = false,
+ method = 'detach',
+ period_months = 3,
+ run_every = '7d',
+ }
}
local clickhouse_schema = {
-rspamd = [[
+table = [[
CREATE TABLE IF NOT EXISTS ${table}
(
Date Date,
@@ -97,7 +106,7 @@ CREATE TABLE IF NOT EXISTS ${table}
) ENGINE = MergeTree(Date, (TS, From), 8192)
]],
- attachments = [[
+attachments_table = [[
CREATE TABLE IF NOT EXISTS ${attachments_table} (
Date Date,
Digest FixedString(32),
@@ -108,7 +117,7 @@ CREATE TABLE IF NOT EXISTS ${attachments_table} (
) ENGINE = MergeTree(Date, Digest, 8192)
]],
- urls = [[
+urls_table = [[
CREATE TABLE IF NOT EXISTS ${urls_table} (
Date Date,
Digest FixedString(32),
@@ -117,7 +126,7 @@ CREATE TABLE IF NOT EXISTS ${urls_table} (
) ENGINE = MergeTree(Date, Digest, 8192)
]],
- emails = [[
+emails_table = [[
CREATE TABLE IF NOT EXISTS ${emails_table} (
Date Date,
Digest FixedString(32),
@@ -125,7 +134,7 @@ CREATE TABLE IF NOT EXISTS ${emails_table} (
) ENGINE = MergeTree(Date, Digest, 8192)
]],
- asn = [[
+asn_table = [[
CREATE TABLE IF NOT EXISTS ${asn_table} (
Date Date,
Digest FixedString(32),
@@ -135,7 +144,7 @@ CREATE TABLE IF NOT EXISTS ${asn_table} (
) ENGINE = MergeTree(Date, Digest, 8192)
]],
- symbols = [[
+symbols_table = [[
CREATE TABLE IF NOT EXISTS ${symbols_table} (
Date Date,
Digest FixedString(32),
@@ -741,6 +750,174 @@ local function clickhouse_collect(task)
end
end
+--- Build the HTTP completion callback used by retention requests.
+-- @param upstream upstream the request was sent to; `ok()`/`fail()` is called on it
+-- @param params request parameters; `params.ev_base` and `params.config` are handed to `ok_cb`
+-- @param ok_cb optional function(ev_base, config, data) called after a successful reply
+-- @return function(err_message, code, data, _) to be used as the request callback
+local function mk_remove_http_cb(upstream, params, ok_cb)
+  return function(err_message, code, data, _)
+    -- Success means: no transport error and HTTP 200
+    if not err_message and code == 200 then
+      upstream:ok()
+      if ok_cb then
+        rspamd_logger.debugm(N, rspamd_config, "do_remove_http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
+        ok_cb(params.ev_base, params.config, data)
+      end
+      return
+    end
+
+    -- Without a transport error the reply body is used as the failure reason
+    local reason = err_message or data
+    local server_addr = upstream:get_addr():to_string(true)
+    rspamd_logger.errx(rspamd_config, "request failed on clickhouse server %s: %s",
+        server_addr, reason)
+    upstream:fail()
+  end
+end
+
+--- Send a request to ClickHouse, filling connection options from `settings`.
+-- The `params` table is modified in place: callback, gzip, mime type, timeout,
+-- SSL verification flag and credentials are always overwritten; `url` is only
+-- set when the caller did not provide one.
+-- @param upstream upstream to send the request to
+-- @param ok_cb optional success callback, see `mk_remove_http_cb`
+-- @param params request parameters table (must carry `body`)
+-- @return whatever `rspamd_http.request` returns
+local function clickhouse_request(upstream, ok_cb, params)
+  rspamd_logger.debugm(N, rspamd_config, "clickhouse_request: %s", params.body)
+
+  params.callback = mk_remove_http_cb(upstream, params, ok_cb)
+  params.gzip = settings.use_gzip
+  params.mime_type = 'text/plain'
+  params.timeout = settings.timeout
+  params.no_ssl_verify = settings.no_ssl_verify
+  params.user = settings.user
+  params.password = settings.password
+
+  -- Fall back to the plain upstream address when no explicit URL was given
+  if not params.url then
+    params.url = connect_prefix .. upstream:get_addr():to_string(true)
+  end
+
+  return rspamd_http.request(params)
+end
+
+--- Issue an `ALTER TABLE ... DROP|DETACH PARTITION` statement for one partition.
+-- The removal method is taken from `settings.retention.method`: 'drop' yields
+-- DROP, anything else yields DETACH.
+-- @param ev_base event base to run the HTTP request on
+-- @param cfg rspamd config object passed to the HTTP request
+-- @param table_name name of the table owning the partition
+-- @param partition_id partition identifier as reported by ClickHouse
+local function do_remove_partition(ev_base, cfg, table_name, partition_id)
+  rspamd_logger.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition_id)
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION ${partition_id}"
+  local remove_method = (settings.retention.method == 'drop') and 'DROP' or 'DETACH'
+  local sql_params = {
+    table_name = table_name,
+    remove_method = remove_method,
+    partition_id = partition_id,
+  }
+
+  local sql = rspamd_lua_utils.template(remove_partition_sql, sql_params)
+
+  local ch_params = {
+    body = sql,
+    ev_base = ev_base,
+    -- The key must be `config`: that is the name the success callback reads
+    -- and the name used by the other request built in this file.
+    config = cfg,
+  }
+
+  if not clickhouse_request(upstream, nil, ch_params) then
+    rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
+        settings['server'])
+  end
+end
+
+--- Handle the reply to the "partitions to remove" query.
+-- The reply is split on newlines, each non-trivial line is parsed with UCL
+-- and a partition removal is scheduled for every row that parsed correctly.
+-- @param ev_base event base forwarded to the removal requests
+-- @param cfg rspamd config object forwarded to the removal requests
+-- @param data raw response body, or nil when ClickHouse returned nothing
+local function parse_clickhouse_response(ev_base, cfg, data)
+  rspamd_logger.debugm(N, rspamd_config, "got clickhouse response: %s", data)
+  if data == nil then
+    -- clickhouse returned no data (i.e. empty resultset): exiting
+    return
+  end
+
+  -- Parse a single row; returns nil (after logging) on a parse error
+  local function parse_string(s)
+    local parser = ucl.parser()
+    local res, err = parser:parse_string(s)
+    if not res then
+      rspamd_logger.errx(rspamd_config, 'Parser error: %s', err)
+      return nil
+    end
+    return parser:get_object()
+  end
+
+  -- iterate over rows
+  local ch_rows = lua_util.str_split(data, "\n")
+  for _, plain_row in ipairs(ch_rows) do
+    if plain_row and plain_row:len() > 1 then
+      local parsed_row = parse_string(plain_row)
+      -- A malformed row must not abort processing of the remaining ones
+      if type(parsed_row) == 'table' and parsed_row.table and parsed_row.partition_id then
+        -- Pass the real event base through (not the global config object)
+        do_remove_partition(ev_base, cfg, parsed_row.table, parsed_row.partition_id)
+      end
+    end
+  end
+end
+
+
+--[[
+  Decide whether the retention job is due, based on the timestamp stored in
+  `<DBDIR>/clickhouse_retention_run`.
+
+  Returns:
+  nil - the timestamp file could not be written, do not perform removal
+  0 - it's time to perform removal (the file now holds the current time)
+  <int> - how many seconds to wait until the next run
+
+  The file is only opened for writing once a run is actually due: opening it
+  in 'w' mode truncates it, which would otherwise erase the stored timestamp
+  on every "not yet due" check.
+]]
+local function get_last_removal_ago()
+  local ts_file = string.format('%s/%s', rspamd_paths['DBDIR'], 'clickhouse_retention_run')
+  local last_ts
+
+  local f, err = io.open(ts_file, 'r')
+  if not f then
+    rspamd_logger.debugm(N, rspamd_config, 'Failed to open %s: %s', ts_file, err)
+  else
+    last_ts = tonumber(f:read('*number'))
+    f:close()
+  end
+
+  local current_ts = os.time()
+
+  if last_ts ~= nil then
+    local next_run = last_ts + settings.retention.period
+    if next_run > current_ts then
+      -- Not due yet: leave the stored timestamp untouched
+      return next_run - current_ts
+    end
+  end
+
+  local write_file
+  write_file, err = io.open(ts_file, 'w')
+  if not write_file then
+    rspamd_logger.errx(rspamd_config, 'Failed to open %s, will not perform retention: %s', ts_file, err)
+    return nil
+  end
+
+  local ok
+  ok, err = write_file:write(tostring(current_ts))
+  -- Close on both the success and the failure path so the handle never leaks
+  write_file:close()
+  if not ok then
+    rspamd_logger.errx(rspamd_config, 'Failed to write %s, will not perform retention: %s', ts_file, err)
+    return nil
+  end
+
+  return 0
+end
+
+--- Periodic job: look up partitions older than the retention period and
+-- schedule their removal.
+-- @param cfg rspamd config object forwarded to the HTTP request
+-- @param ev_base event base forwarded to the HTTP request
+-- @return false when the last-run timestamp cannot be tracked; otherwise the
+-- number of seconds after which the job should be invoked again
+local function clickhouse_remove_old_partitions(cfg, ev_base)
+  local wait_for = get_last_removal_ago()
+  if wait_for == nil then
+    rspamd_logger.errx(rspamd_config, "Failed to get last run time. Disabling retention")
+    return false
+  end
+  if wait_for ~= 0 then
+    return wait_for
+  end
+
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local ip_addr = upstream:get_addr():to_string(true)
+
+  -- Every key of the schema map is also the settings key holding the table name
+  local table_names = {}
+  for settings_key in pairs(clickhouse_schema) do
+    table_names[#table_names + 1] = settings[settings_key]
+  end
+
+  local partition_to_remove_sql = "SELECT distinct partition_id, table FROM system.parts WHERE table in ('${tables}') and max_date < toDate(now() - interval ${month} month);"
+  local sql = rspamd_lua_utils.template(partition_to_remove_sql, {
+    tables = table.concat(table_names, "', '"),
+    month = settings.retention.period_months,
+  })
+
+  local request_sent = clickhouse_request(upstream, parse_clickhouse_response, {
+    body = sql,
+    url = string.format("%s%s/?default_format=JSONEachRow", connect_prefix, ip_addr),
+    ev_base = ev_base,
+    config = cfg,
+  })
+  if not request_sent then
+    rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
+        settings['server'])
+  end
+
+  -- settings.retention.period is added on initialisation, see below
+  return settings.retention.period
+end
+
local opts = rspamd_config:get_all_opt('clickhouse')
if opts then
for k,v in pairs(opts) do
@@ -761,7 +938,7 @@ if opts then
settings['server'] or settings['servers'], 8123)
if not settings.upstream then
- rspamd_logger.errx('cannot parse clickhouse address: %s',
+ rspamd_logger.errx(rspamd_config, 'cannot parse clickhouse address: %s',
settings['server'] or settings['servers'])
rspamd_lua_utils.disable_module(N, "config")
return
@@ -821,6 +998,30 @@ if opts then
send_req(tab, rspamd_lua_utils.template(sql, settings))
end
end
+
+      -- Validate retention settings; any invalid value disables retention
+      if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then
+        rspamd_logger.errx(rspamd_config, "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention",
+            settings.retention.method)
+        settings.retention.enable = false
+      end
+      -- Parenthesised on purpose: `and` binds tighter than `or`, so without the
+      -- parentheses the upper-bound check would fire even with retention disabled
+      if settings.retention.enable and
+          (settings.retention.period_months < 1 or settings.retention.period_months > 1000) then
+        rspamd_logger.errx(rspamd_config, "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention",
+            settings.retention.period_months)
+        settings.retention.enable = false
+      end
+      local period = lua_util.parse_time_interval(settings.retention.run_every)
+      if settings.retention.enable and period == nil then
+        rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention",
+            settings.retention.run_every)
+        settings.retention.enable = false
+      end
+
+      if settings.retention.enable then
+        -- Stored for the periodic job, which reads settings.retention.period
+        settings.retention.period = period
+        rspamd_logger.infox(rspamd_config, "retention will be performed each %s seconds for %s month with method %s",
+            period, settings.retention.period_months, settings.retention.method)
+        rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false)
+      end
end
end)
end