aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-06 16:31:04 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-06 18:00:34 +0100
commita59fb5bf8b4d6ee0900d3c74286d46e30ed2b560 (patch)
tree0576c78d90e08d5544bf3c43948b0f9531ee0b27
parentc0dd861ca9f579379ab9957ef98c1aaf20f672b3 (diff)
downloadrspamd-a59fb5bf8b4d6ee0900d3c74286d46e30ed2b560.tar.gz
rspamd-a59fb5bf8b4d6ee0900d3c74286d46e30ed2b560.zip
[Minor] Convert retention logic in Clickhouse module
-rw-r--r--src/plugins/lua/clickhouse.lua113
1 files changed, 37 insertions, 76 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index f3f77849d..a4fd07034 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -19,7 +19,8 @@ local rspamd_http = require "rspamd_http"
local rspamd_lua_utils = require "lua_util"
local upstream_list = require "rspamd_upstream_list"
local lua_util = require "lua_util"
-local ucl = require "ucl"
+local lua_clickhouse = require "lua_clickhouse"
+local fun = require "fun"
local N = "clickhouse"
@@ -29,7 +30,7 @@ end
local E = {}
-local rows = {}
+local main_rows = {}
local attachment_rows = {}
local urls_rows = {}
local emails_rows = {}
@@ -252,7 +253,7 @@ local function clickhouse_asn_row(tname)
end
local function clickhouse_first_row()
- table.insert(rows, clickhouse_main_row(settings['table']))
+ table.insert(main_rows, clickhouse_main_row(settings['table']))
if settings['attachments_table'] then
table.insert(attachment_rows,
clickhouse_attachments_row(settings['attachments_table']))
@@ -315,12 +316,12 @@ local function clickhouse_send_data(task)
end
end
- local body = table.concat(rows, ' ')
+ local body = table.concat(main_rows, ' ')
if not rspamd_http.request({
task = task,
url = connect_prefix .. ip_addr,
body = body,
- callback = gen_http_cb('generic data', #rows),
+ callback = gen_http_cb('generic data', #main_rows),
gzip = settings.use_gzip,
mime_type = 'text/plain',
timeout = settings['timeout'],
@@ -622,7 +623,7 @@ local function clickhouse_collect(task)
clickhouse_quote(from_user), clickhouse_quote(mime_user),
clickhouse_quote(rcpt_user), clickhouse_quote(rcpt_domain),
clickhouse_quote(list_id), task:get_digest())
- table.insert(rows, elt)
+ table.insert(main_rows, elt)
if settings['from_map'] and dkim == 'allow' then
-- Use dkim
@@ -773,7 +774,7 @@ local function clickhouse_collect(task)
if nrows > settings['limit'] then
clickhouse_send_data(task)
nrows = 0
- rows = {}
+ main_rows = {}
attachment_rows = {}
urls_rows = {}
emails_rows = {}
@@ -785,42 +786,6 @@ local function clickhouse_collect(task)
end
end
-local function mk_remove_http_cb(upstream, params, ok_cb)
- local function do_remove_http_cb(err_message, code, data, _)
- if code ~= 200 or err_message then
- if not err_message then err_message = data end
- local ip_addr = upstream:get_addr():to_string(true)
- rspamd_logger.errx(rspamd_config, "request failed on clickhouse server %s: %s",
- ip_addr, err_message)
- upstream:fail()
- else
- upstream:ok()
- if (ok_cb) then
- rspamd_logger.debugm(N, rspamd_config, "do_remove_http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
- ok_cb(params.ev_base, params.config, data)
- end
- end
- end
- return do_remove_http_cb
-end
-
-local function clickhouse_request(upstream, ok_cb, params)
- rspamd_logger.debugm(N, rspamd_config, "clickhouse_request: %s", params.body)
-
- params.callback = mk_remove_http_cb(upstream, params, ok_cb)
- params.gzip = settings.use_gzip
- params.mime_type = 'text/plain'
- params.timeout = settings['timeout']
- params.no_ssl_verify = settings.no_ssl_verify
- params.user = settings.user
- params.password = settings.password
- if not params.url then
- local ip_addr = upstream:get_addr():to_string(true)
- params.url = connect_prefix .. ip_addr
- end
- return rspamd_http.request(params)
-end
-
local function do_remove_partition(ev_base, cfg, table_name, partition_id)
rspamd_logger.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition_id)
local upstream = settings.upstream:get_upstream_round_robin()
@@ -840,39 +805,27 @@ local function do_remove_partition(ev_base, cfg, table_name, partition_id)
cfg = cfg,
}
- if not clickhouse_request(upstream, nil, ch_params) then
- rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
+ local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+ function(_, rows)
+ rspamd_logger.infox(rspamd_config,
+ 'detached partition %s:%s on server %s', table_name, partition_id,
settings['server'])
- end
-end
-
-local function parse_clickhouse_response(ev_base, cfg, data)
- rspamd_logger.debugm(N, rspamd_config, "got clickhouse response: %s", data)
- if data == nil then
- -- clickhouse returned no data (i.e. empty resultset): exiting
- return
- end
- local function parse_string(s)
- local parser = ucl.parser()
- local res, err = parser:parse_string(s)
- if not res then
- rspamd_logger.errx(rspamd_config, 'Parser error: %s', err)
- return nil
- end
- return parser:get_object()
- end
+ end,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot detach partition %s:%s from server %s: %s",
+ table_name, partition_id,
+ settings['server'], err)
+ end)
- -- iterate over rows
- local ch_rows = lua_util.str_split(data, "\n")
- for _, plain_row in pairs(ch_rows) do
- if plain_row and plain_row:len() > 1 then
- local parsed_row = parse_string(plain_row)
- do_remove_partition(rspamd_config, cfg, parsed_row.table, parsed_row.partition)
- end
+ if not ret then
+ rspamd_logger.errx(rspamd_config,
+ "cannot detach partition %s:%s from server %s: cannot make request",
+ table_name, partition_id,
+ settings['server'])
end
end
-
--[[
nil - file is not writable, do not perform removal
0 - it's time to perform removal
@@ -923,8 +876,6 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
end
local upstream = settings.upstream:get_upstream_round_robin()
- local ip_addr = upstream:get_addr():to_string(true)
-
local partition_to_remove_sql = "SELECT distinct partition, table FROM system.parts WHERE table in ('${tables}') and max_date <= toDate(now() - interval ${month} month);"
local table_names = {}
@@ -938,13 +889,23 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
}
local sql = rspamd_lua_utils.template(partition_to_remove_sql, sql_params)
+
local ch_params = {
- body = sql,
- url = string.format("%s%s/?default_format=JSONEachRow", connect_prefix, ip_addr),
ev_base = ev_base,
config = cfg,
}
- if not clickhouse_request(upstream, parse_clickhouse_response, ch_params) then
+ local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+ function(_, rows)
+ fun.each(function(row)
+ do_remove_partition(ev_base, cfg, row.table, row.partition)
+ end, rows)
+ end,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot send data to clickhouse server %s: %s",
+ settings['server'], err)
+ end)
+ if not ret then
rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
settings['server'])
end