Browse Source

[Minor] Convert retention logic in Clickhouse module

tags/1.8.0
Vsevolod Stakhov 5 years ago
parent
commit
a59fb5bf8b
1 changed files with 37 additions and 76 deletions
  1. 37
    76
      src/plugins/lua/clickhouse.lua

+ 37
- 76
src/plugins/lua/clickhouse.lua View File

@@ -19,7 +19,8 @@ local rspamd_http = require "rspamd_http"
local rspamd_lua_utils = require "lua_util"
local upstream_list = require "rspamd_upstream_list"
local lua_util = require "lua_util"
local ucl = require "ucl"
local lua_clickhouse = require "lua_clickhouse"
local fun = require "fun"

local N = "clickhouse"

@@ -29,7 +30,7 @@ end

local E = {}

local rows = {}
local main_rows = {}
local attachment_rows = {}
local urls_rows = {}
local emails_rows = {}
@@ -252,7 +253,7 @@ local function clickhouse_asn_row(tname)
end

local function clickhouse_first_row()
table.insert(rows, clickhouse_main_row(settings['table']))
table.insert(main_rows, clickhouse_main_row(settings['table']))
if settings['attachments_table'] then
table.insert(attachment_rows,
clickhouse_attachments_row(settings['attachments_table']))
@@ -315,12 +316,12 @@ local function clickhouse_send_data(task)
end
end

local body = table.concat(rows, ' ')
local body = table.concat(main_rows, ' ')
if not rspamd_http.request({
task = task,
url = connect_prefix .. ip_addr,
body = body,
callback = gen_http_cb('generic data', #rows),
callback = gen_http_cb('generic data', #main_rows),
gzip = settings.use_gzip,
mime_type = 'text/plain',
timeout = settings['timeout'],
@@ -622,7 +623,7 @@ local function clickhouse_collect(task)
clickhouse_quote(from_user), clickhouse_quote(mime_user),
clickhouse_quote(rcpt_user), clickhouse_quote(rcpt_domain),
clickhouse_quote(list_id), task:get_digest())
table.insert(rows, elt)
table.insert(main_rows, elt)

if settings['from_map'] and dkim == 'allow' then
-- Use dkim
@@ -773,7 +774,7 @@ local function clickhouse_collect(task)
if nrows > settings['limit'] then
clickhouse_send_data(task)
nrows = 0
rows = {}
main_rows = {}
attachment_rows = {}
urls_rows = {}
emails_rows = {}
@@ -785,42 +786,6 @@ local function clickhouse_collect(task)
end
end

local function mk_remove_http_cb(upstream, params, ok_cb)
local function do_remove_http_cb(err_message, code, data, _)
if code ~= 200 or err_message then
if not err_message then err_message = data end
local ip_addr = upstream:get_addr():to_string(true)
rspamd_logger.errx(rspamd_config, "request failed on clickhouse server %s: %s",
ip_addr, err_message)
upstream:fail()
else
upstream:ok()
if (ok_cb) then
rspamd_logger.debugm(N, rspamd_config, "do_remove_http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
ok_cb(params.ev_base, params.config, data)
end
end
end
return do_remove_http_cb
end

local function clickhouse_request(upstream, ok_cb, params)
rspamd_logger.debugm(N, rspamd_config, "clickhouse_request: %s", params.body)

params.callback = mk_remove_http_cb(upstream, params, ok_cb)
params.gzip = settings.use_gzip
params.mime_type = 'text/plain'
params.timeout = settings['timeout']
params.no_ssl_verify = settings.no_ssl_verify
params.user = settings.user
params.password = settings.password
if not params.url then
local ip_addr = upstream:get_addr():to_string(true)
params.url = connect_prefix .. ip_addr
end
return rspamd_http.request(params)
end

local function do_remove_partition(ev_base, cfg, table_name, partition_id)
rspamd_logger.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition_id)
local upstream = settings.upstream:get_upstream_round_robin()
@@ -840,39 +805,27 @@ local function do_remove_partition(ev_base, cfg, table_name, partition_id)
cfg = cfg,
}

if not clickhouse_request(upstream, nil, ch_params) then
rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
function(_, rows)
rspamd_logger.infox(rspamd_config,
'detached partition %s:%s on server %s', table_name, partition_id,
settings['server'])
end
end

local function parse_clickhouse_response(ev_base, cfg, data)
rspamd_logger.debugm(N, rspamd_config, "got clickhouse response: %s", data)
if data == nil then
-- clickhouse returned no data (i.e. empty resultset): exiting
return
end
local function parse_string(s)
local parser = ucl.parser()
local res, err = parser:parse_string(s)
if not res then
rspamd_logger.errx(rspamd_config, 'Parser error: %s', err)
return nil
end
return parser:get_object()
end
end,
function(_, err)
rspamd_logger.errx(rspamd_config,
"cannot detach partition %s:%s from server %s: %s",
table_name, partition_id,
settings['server'], err)
end)

-- iterate over rows
local ch_rows = lua_util.str_split(data, "\n")
for _, plain_row in pairs(ch_rows) do
if plain_row and plain_row:len() > 1 then
local parsed_row = parse_string(plain_row)
do_remove_partition(rspamd_config, cfg, parsed_row.table, parsed_row.partition)
end
if not ret then
rspamd_logger.errx(rspamd_config,
"cannot detach partition %s:%s from server %s: cannot make request",
table_name, partition_id,
settings['server'])
end
end


--[[
nil - file is not writable, do not perform removal
0 - it's time to perform removal
@@ -923,8 +876,6 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
end

local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)

local partition_to_remove_sql = "SELECT distinct partition, table FROM system.parts WHERE table in ('${tables}') and max_date <= toDate(now() - interval ${month} month);"

local table_names = {}
@@ -938,13 +889,23 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
}
local sql = rspamd_lua_utils.template(partition_to_remove_sql, sql_params)


local ch_params = {
body = sql,
url = string.format("%s%s/?default_format=JSONEachRow", connect_prefix, ip_addr),
ev_base = ev_base,
config = cfg,
}
if not clickhouse_request(upstream, parse_clickhouse_response, ch_params) then
local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
function(_, rows)
fun.each(function(row)
do_remove_partition(ev_base, cfg, row.table, row.partition)
end, rows)
end,
function(_, err)
rspamd_logger.errx(rspamd_config,
"cannot send data to clickhouse server %s: %s",
settings['server'], err)
end)
if not ret then
rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
settings['server'])
end

Loading…
Cancel
Save