diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-10-29 15:55:58 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-10-29 15:55:58 +0000 |
commit | 7a7791926e0ad43ce69d3ba9cc5c73dc8fa9d3a2 (patch) | |
tree | 60584f69c26a31e028b50422172f606acd74f381 /src/plugins/lua/clickhouse.lua | |
parent | db225575e350429686fce3012cbfbb38fd8fb2a4 (diff) | |
download | rspamd-7a7791926e0ad43ce69d3ba9cc5c73dc8fa9d3a2.tar.gz rspamd-7a7791926e0ad43ce69d3ba9cc5c73dc8fa9d3a2.zip |
[Feature] Clickhouse: Rework Clickhouse collection logic
Issue: #3127
Diffstat (limited to 'src/plugins/lua/clickhouse.lua')
-rw-r--r-- | src/plugins/lua/clickhouse.lua | 304 |
1 files changed, 182 insertions, 122 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 45c555460..93165d842 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -15,7 +15,6 @@ limitations under the License. ]]-- local rspamd_logger = require 'rspamd_logger' -local rspamd_lua_utils = require "lua_util" local upstream_list = require "rspamd_upstream_list" local lua_util = require "lua_util" local lua_clickhouse = require "lua_clickhouse" @@ -31,10 +30,18 @@ end local data_rows = {} local custom_rows = {} local nrows = 0 +local used_memory = 0 +local last_collection = 0 local schema_version = 8 -- Current schema version local settings = { - limit = 1000, + limits = { -- Collection limits + max_rows = 1000, -- How many rows are allowed (0 for disable this) + max_memory = 50 * 1024 * 1024, -- How many memory should be occupied before sending collection + max_interval = 60, -- Maximum collection interval + }, + collect_garbage = false, -- Peform GC collection after sending the data + check_timeout = 10.0, -- Periodic timeout timeout = 5.0, bayes_spam_symbols = {'BAYES_SPAM'}, bayes_ham_symbols = {'BAYES_HAM'}, @@ -368,23 +375,25 @@ local function clickhouse_check_symbol(task, settings_field_name, fields_table, return false end -local function clickhouse_send_data(task, ev_base) +local function clickhouse_send_data(task, ev_base, why) local log_object = task or rspamd_config local upstream = settings.upstream:get_upstream_round_robin() local ip_addr = upstream:get_addr():to_string(true) + rspamd_logger.infox(log_object, "trying to send %s rows to clickhouse server %s; started as %s", + nrows, ip_addr, why) local function gen_success_cb(what, how_many) return function (_, _) - rspamd_logger.infox(log_object, "sent %s rows of %s to clickhouse server %s", - how_many, what, ip_addr) + rspamd_logger.infox(log_object, "sent %s rows of %s to clickhouse server %s; started as %s", + how_many, what, ip_addr, why) upstream:ok() end end local function gen_fail_cb(what, how_many) return function (_, err) - rspamd_logger.errx(log_object, "cannot send %s rows of %s data to clickhouse server %s: %s", - how_many, what, ip_addr, err) + rspamd_logger.errx(log_object, "cannot send %s rows of %s data to clickhouse server %s: %s; started as %s", + how_many, what, ip_addr, err, why) upstream:fail() end end @@ -437,7 +446,7 @@ local function clickhouse_collect(task) return end - if not settings.allow_local and rspamd_lua_utils.is_rspamc_or_controller(task) then + if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then return end @@ -828,15 +837,13 @@ local function clickhouse_collect(task) end nrows = nrows + 1 - data_rows[#data_rows + 1] = lua_clickhouse.row_to_tsv(row) - lua_util.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit) - - if nrows >= settings['limit'] then - clickhouse_send_data(task) - nrows = 0 - data_rows = {} - custom_rows = {} - end + local tsv_row = lua_clickhouse.row_to_tsv(row) + used_memory = used_memory + #tsv_row + data_rows[#data_rows + 1] = tsv_row + lua_util.debugm(N, task, + "add clickhouse row %s / %s; used memory: %s / %s", + nrows, settings.limits.max_rows, + used_memory, settings.limits.max_memory) end local function do_remove_partition(ev_base, cfg, table_name, partition_id) @@ -850,7 +857,7 @@ local function do_remove_partition(ev_base, cfg, table_name, partition_id) ['partition_id'] = partition_id } - local sql = rspamd_lua_utils.template(remove_partition_sql, sql_params) + local sql = lua_util.template(remove_partition_sql, sql_params) local ch_params = { body = sql, @@ -913,6 +920,50 @@ local function get_last_removal_ago() return (last_ts + settings.retention.period) - current_ts end +local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now) + local need_collect = false + local reason + + if nrows == 0 then + lua_util.debugm(N, cfg, "no need to send data, as there are no rows to collect") + return settings.check_timeout + end + + if settings.limits.max_rows > 0 then + if nrows > settings.max_rows then + need_collect = true + reason = 'limit of rows has been reached' + end + end + if settings.limits.max_interval > 0 then + if now - last_collection > settings.limits.max_interval then + need_collect = true + reason = 'limit of time since last collection has been reached' + end + end + if settings.limits.max_memory > 0 then + if used_memory >= settings.limits.max_memory then + need_collect = true + reason = 'limit of memory has been reached' + end + end + + if need_collect then + clickhouse_send_data(nil, ev_base, reason) + nrows = 0 + last_collection = now + used_memory = 0 + data_rows = {} + custom_rows = {} + + if settings.collect_garbadge then + collectgarbage() + end + end + + return settings.check_timeout +end + local function clickhouse_remove_old_partitions(cfg, ev_base) local last_time_ago = get_last_removal_ago() if last_time_ago == nil then @@ -932,7 +983,7 @@ local function clickhouse_remove_old_partitions(cfg, ev_base) tables = tables, month = settings.retention.period_months, } - local sql = rspamd_lua_utils.template(partition_to_remove_sql, sql_params) + local sql = lua_util.template(partition_to_remove_sql, sql_params) local ch_params = { @@ -1076,7 +1127,7 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg) -- If we have some custom rules, we just send its schema to the upstream for k,rule in pairs(settings.custom_rules) do if rule.schema then - local sql = rspamd_lua_utils.template(rule.schema, settings) + local sql = lua_util.template(rule.schema, settings) local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql) if err then rspamd_logger.errx(rspamd_config, 'cannot send custom schema %s to clickhouse server %s: ' .. @@ -1091,10 +1142,12 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg) local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql) if err then if rows and rows.code == 404 then - rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table') + rspamd_logger.infox(rspamd_config, + 'table rspamd_version does not exist, check rspamd table') check_rspamd_table(upstream, ev_base, cfg) else - rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: %s", + rspamd_logger.errx(rspamd_config, + "cannot get version on clickhouse server %s: %s", upstream:get_addr():to_string(true), err) end else @@ -1105,125 +1158,132 @@ end local opts = rspamd_config:get_all_opt('clickhouse') if opts then - for k,v in pairs(opts) do - if k == 'custom_rules' then - if not v[1] then - v = {v} - end + -- Legacy `limit` options + if opts.limit and not opts.limits then + settings.limits.max_rows = opts.limit + end + for k,v in pairs(opts) do + if k == 'custom_rules' then + if not v[1] then + v = {v} + end - for i,rule in ipairs(v) do - if rule.schema and rule.first_row and rule.get_row then - local first_row, get_row - local loadstring = loadstring or load - local ret, res_or_err = pcall(loadstring(rule.first_row)) - - if not ret or type(res_or_err) ~= 'function' then - rspamd_logger.errx(rspamd_config, 'invalid first_row (%s) - must be a function', - res_or_err) - else - first_row = res_or_err - end + for i,rule in ipairs(v) do + if rule.schema and rule.first_row and rule.get_row then + local first_row, get_row + local loadstring = loadstring or load + local ret, res_or_err = pcall(loadstring(rule.first_row)) - ret, res_or_err = pcall(loadstring(rule.get_row)) + if not ret or type(res_or_err) ~= 'function' then + rspamd_logger.errx(rspamd_config, 'invalid first_row (%s) - must be a function', + res_or_err) + else + first_row = res_or_err + end - if not ret or type(res_or_err) ~= 'function' then - rspamd_logger.errx(rspamd_config, 'invalid get_row (%s) - must be a function', - res_or_err) - else - get_row = res_or_err - end + ret, res_or_err = pcall(loadstring(rule.get_row)) - if first_row and get_row then - local name = rule.name or tostring(i) - settings.custom_rules[name] = { - schema = rule.schema, - first_row = first_row, - get_row = get_row, - } - end + if not ret or type(res_or_err) ~= 'function' then + rspamd_logger.errx(rspamd_config, + 'invalid get_row (%s) - must be a function', + res_or_err) else - rspamd_logger.errx(rspamd_config, 'custom rule has no required attributes: schema, first_row and get_row') + get_row = res_or_err end + + if first_row and get_row then + local name = rule.name or tostring(i) + settings.custom_rules[name] = { + schema = rule.schema, + first_row = first_row, + get_row = get_row, + } + end + else + rspamd_logger.errx(rspamd_config, 'custom rule has no required attributes: schema, first_row and get_row') end - else - settings[k] = v end + else + settings[k] = v end + end - if not settings['server'] and not settings['servers'] then - rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module') - rspamd_lua_utils.disable_module(N, "config") - else - settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables', + if not settings['server'] and not settings['servers'] then + rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module') + lua_util.disable_module(N, "config") + else + settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables', 'regexp', 'clickhouse specific domains') - settings.upstream = upstream_list.create(rspamd_config, + settings.upstream = upstream_list.create(rspamd_config, settings['server'] or settings['servers'], 8123) - if not settings.upstream then - rspamd_logger.errx(rspamd_config, 'cannot parse clickhouse address: %s', - settings['server'] or settings['servers']) - rspamd_lua_utils.disable_module(N, "config") - return - end + if not settings.upstream then + rspamd_logger.errx(rspamd_config, 'cannot parse clickhouse address: %s', + settings['server'] or settings['servers']) + lua_util.disable_module(N, "config") + return + end - if settings.exceptions then - local maps_expressions = require "lua_maps_expressions" + if settings.exceptions then + local maps_expressions = require "lua_maps_expressions" - settings.exceptions = maps_expressions.create(rspamd_config, - settings.exceptions, N) - end + settings.exceptions = maps_expressions.create(rspamd_config, + settings.exceptions, N) + end - rspamd_config:register_symbol({ - name = 'CLICKHOUSE_COLLECT', - type = 'idempotent', - callback = clickhouse_collect, - priority = 10, - flags = 'empty,explicit_disable,ignore_passthrough', - }) - rspamd_config:register_finish_script(function(task) - if nrows > 0 then - clickhouse_send_data(task, nil) + rspamd_config:register_symbol({ + name = 'CLICKHOUSE_COLLECT', + type = 'idempotent', + callback = clickhouse_collect, + priority = 10, + flags = 'empty,explicit_disable,ignore_passthrough', + }) + rspamd_config:register_finish_script(function(task) + if nrows > 0 then + clickhouse_send_data(task, nil, 'final collection') + end + end) + -- Create tables on load + rspamd_config:add_on_load(function(cfg, ev_base, worker) + rspamd_config:add_periodic(ev_base, 0, + clickhouse_maybe_send_data_periodic, true) + if worker:is_primary_controller() then + local upstreams = settings.upstream:all_upstreams() + + for _,up in ipairs(upstreams) do + check_clickhouse_upstream(up, ev_base, cfg) end - end) - -- Create tables on load - rspamd_config:add_on_load(function(cfg, ev_base, worker) - if worker:is_primary_controller() then - local upstreams = settings.upstream:all_upstreams() - - for _,up in ipairs(upstreams) do - check_clickhouse_upstream(up, ev_base, cfg) - end - if settings.retention.enable and settings.retention.method ~= 'drop' and - settings.retention.method ~= 'detach' then - rspamd_logger.errx(rspamd_config, - "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention", - settings.retention.method) - settings.retention.enable = false - end - if settings.retention.enable and settings.retention.period_months < 1 or - settings.retention.period_months > 1000 then - rspamd_logger.errx(rspamd_config, - "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention", - settings.retention.period_months) - settings.retention.enable = false - end - local period = lua_util.parse_time_interval(settings.retention.run_every) - if settings.retention.enable and period == nil then - rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention", - settings.retention.run_every) - settings.retention.enable = false - end + if settings.retention.enable and settings.retention.method ~= 'drop' and + settings.retention.method ~= 'detach' then + rspamd_logger.errx(rspamd_config, + "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention", + settings.retention.method) + settings.retention.enable = false + end + if settings.retention.enable and settings.retention.period_months < 1 or + settings.retention.period_months > 1000 then + rspamd_logger.errx(rspamd_config, + "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention", + settings.retention.period_months) + settings.retention.enable = false + end + local period = lua_util.parse_time_interval(settings.retention.run_every) + if settings.retention.enable and period == nil then + rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention", + settings.retention.run_every) + settings.retention.enable = false + end - if settings.retention.enable then - settings.retention.period = period - rspamd_logger.infox(rspamd_config, - "retention will be performed each %s seconds for %s month with method %s", - period, settings.retention.period_months, settings.retention.method) - rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false) - end + if settings.retention.enable then + settings.retention.period = period + rspamd_logger.infox(rspamd_config, + "retention will be performed each %s seconds for %s month with method %s", + period, settings.retention.period_months, settings.retention.method) + rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false) end - end) - end + end + end) + end end |