diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-08-06 16:31:04 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-08-06 18:00:34 +0100 |
commit | a59fb5bf8b4d6ee0900d3c74286d46e30ed2b560 (patch) | |
tree | 0576c78d90e08d5544bf3c43948b0f9531ee0b27 | |
parent | c0dd861ca9f579379ab9957ef98c1aaf20f672b3 (diff) | |
download | rspamd-a59fb5bf8b4d6ee0900d3c74286d46e30ed2b560.tar.gz rspamd-a59fb5bf8b4d6ee0900d3c74286d46e30ed2b560.zip |
[Minor] Convert retention logic in Clickhouse module
-rw-r--r-- | src/plugins/lua/clickhouse.lua | 113 |
1 files changed, 37 insertions, 76 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index f3f77849d..a4fd07034 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -19,7 +19,8 @@ local rspamd_http = require "rspamd_http" local rspamd_lua_utils = require "lua_util" local upstream_list = require "rspamd_upstream_list" local lua_util = require "lua_util" -local ucl = require "ucl" +local lua_clickhouse = require "lua_clickhouse" +local fun = require "fun" local N = "clickhouse" @@ -29,7 +30,7 @@ end local E = {} -local rows = {} +local main_rows = {} local attachment_rows = {} local urls_rows = {} local emails_rows = {} @@ -252,7 +253,7 @@ local function clickhouse_asn_row(tname) end local function clickhouse_first_row() - table.insert(rows, clickhouse_main_row(settings['table'])) + table.insert(main_rows, clickhouse_main_row(settings['table'])) if settings['attachments_table'] then table.insert(attachment_rows, clickhouse_attachments_row(settings['attachments_table'])) @@ -315,12 +316,12 @@ local function clickhouse_send_data(task) end end - local body = table.concat(rows, ' ') + local body = table.concat(main_rows, ' ') if not rspamd_http.request({ task = task, url = connect_prefix .. ip_addr, body = body, - callback = gen_http_cb('generic data', #rows), + callback = gen_http_cb('generic data', #main_rows), gzip = settings.use_gzip, mime_type = 'text/plain', timeout = settings['timeout'], @@ -622,7 +623,7 @@ local function clickhouse_collect(task) clickhouse_quote(from_user), clickhouse_quote(mime_user), clickhouse_quote(rcpt_user), clickhouse_quote(rcpt_domain), clickhouse_quote(list_id), task:get_digest()) - table.insert(rows, elt) + table.insert(main_rows, elt) if settings['from_map'] and dkim == 'allow' then -- Use dkim @@ -773,7 +774,7 @@ local function clickhouse_collect(task) if nrows > settings['limit'] then clickhouse_send_data(task) nrows = 0 - rows = {} + main_rows = {} attachment_rows = {} urls_rows = {} emails_rows = {} @@ -785,42 +786,6 @@ local function clickhouse_collect(task) end end -local function mk_remove_http_cb(upstream, params, ok_cb) - local function do_remove_http_cb(err_message, code, data, _) - if code ~= 200 or err_message then - if not err_message then err_message = data end - local ip_addr = upstream:get_addr():to_string(true) - rspamd_logger.errx(rspamd_config, "request failed on clickhouse server %s: %s", - ip_addr, err_message) - upstream:fail() - else - upstream:ok() - if (ok_cb) then - rspamd_logger.debugm(N, rspamd_config, "do_remove_http_cb ok: %s, %s, %s, %s", err_message, code, data, _) - ok_cb(params.ev_base, params.config, data) - end - end - end - return do_remove_http_cb -end - -local function clickhouse_request(upstream, ok_cb, params) - rspamd_logger.debugm(N, rspamd_config, "clickhouse_request: %s", params.body) - - params.callback = mk_remove_http_cb(upstream, params, ok_cb) - params.gzip = settings.use_gzip - params.mime_type = 'text/plain' - params.timeout = settings['timeout'] - params.no_ssl_verify = settings.no_ssl_verify - params.user = settings.user - params.password = settings.password - if not params.url then - local ip_addr = upstream:get_addr():to_string(true) - params.url = connect_prefix .. ip_addr - end - return rspamd_http.request(params) -end - local function do_remove_partition(ev_base, cfg, table_name, partition_id) rspamd_logger.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition_id) local upstream = settings.upstream:get_upstream_round_robin() @@ -840,39 +805,27 @@ local function do_remove_partition(ev_base, cfg, table_name, partition_id) cfg = cfg, } - if not clickhouse_request(upstream, nil, ch_params) then - rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request", + local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, + function(_, rows) + rspamd_logger.infox(rspamd_config, + 'detached partition %s:%s on server %s', table_name, partition_id, settings['server']) - end -end - -local function parse_clickhouse_response(ev_base, cfg, data) - rspamd_logger.debugm(N, rspamd_config, "got clickhouse response: %s", data) - if data == nil then - -- clickhouse returned no data (i.e. empty resultset): exiting - return - end - local function parse_string(s) - local parser = ucl.parser() - local res, err = parser:parse_string(s) - if not res then - rspamd_logger.errx(rspamd_config, 'Parser error: %s', err) - return nil - end - return parser:get_object() - end + end, + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot detach partition %s:%s from server %s: %s", + table_name, partition_id, + settings['server'], err) + end) - -- iterate over rows - local ch_rows = lua_util.str_split(data, "\n") - for _, plain_row in pairs(ch_rows) do - if plain_row and plain_row:len() > 1 then - local parsed_row = parse_string(plain_row) - do_remove_partition(rspamd_config, cfg, parsed_row.table, parsed_row.partition) - end + if not ret then + rspamd_logger.errx(rspamd_config, + "cannot detach partition %s:%s from server %s: cannot make request", + table_name, partition_id, + settings['server']) end end - --[[ nil - file is not writable, do not perform removal 0 - it's time to perform removal @@ -923,8 +876,6 @@ local function clickhouse_remove_old_partitions(cfg, ev_base) end local upstream = settings.upstream:get_upstream_round_robin() - local ip_addr = upstream:get_addr():to_string(true) - local partition_to_remove_sql = "SELECT distinct partition, table FROM system.parts WHERE table in ('${tables}') and max_date <= toDate(now() - interval ${month} month);" local table_names = {} @@ -938,13 +889,23 @@ local function clickhouse_remove_old_partitions(cfg, ev_base) } local sql = rspamd_lua_utils.template(partition_to_remove_sql, sql_params) + local ch_params = { - body = sql, - url = string.format("%s%s/?default_format=JSONEachRow", connect_prefix, ip_addr), ev_base = ev_base, config = cfg, } - if not clickhouse_request(upstream, parse_clickhouse_response, ch_params) then + local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, + function(_, rows) + fun.each(function(row) + do_remove_partition(ev_base, cfg, row.table, row.partition) + end, rows) + end, + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot send data to clickhouse server %s: %s", + settings['server'], err) + end) + if not ret then rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request", settings['server']) end |