]]--
local rspamd_logger = require 'rspamd_logger'
-local rspamd_lua_utils = require "lua_util"
local upstream_list = require "rspamd_upstream_list"
local lua_util = require "lua_util"
local lua_clickhouse = require "lua_clickhouse"
local data_rows = {}
local custom_rows = {}
local nrows = 0
+local used_memory = 0
+local last_collection = 0
local schema_version = 8 -- Current schema version
local settings = {
- limit = 1000,
+ limits = { -- Collection limits
+ max_rows = 1000, -- How many rows are allowed (0 for disable this)
+ max_memory = 50 * 1024 * 1024, -- How many memory should be occupied before sending collection
+ max_interval = 60, -- Maximum collection interval
+ },
+ collect_garbage = false, -- Peform GC collection after sending the data
+ check_timeout = 10.0, -- Periodic timeout
timeout = 5.0,
bayes_spam_symbols = {'BAYES_SPAM'},
bayes_ham_symbols = {'BAYES_HAM'},
return false
end
-local function clickhouse_send_data(task, ev_base)
+local function clickhouse_send_data(task, ev_base, why)
local log_object = task or rspamd_config
local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)
+ rspamd_logger.infox(log_object, "trying to send %s rows to clickhouse server %s; started as %s",
+ nrows, ip_addr, why)
local function gen_success_cb(what, how_many)
return function (_, _)
- rspamd_logger.infox(log_object, "sent %s rows of %s to clickhouse server %s",
- how_many, what, ip_addr)
+ rspamd_logger.infox(log_object, "sent %s rows of %s to clickhouse server %s; started as %s",
+ how_many, what, ip_addr, why)
upstream:ok()
end
end
local function gen_fail_cb(what, how_many)
return function (_, err)
- rspamd_logger.errx(log_object, "cannot send %s rows of %s data to clickhouse server %s: %s",
- how_many, what, ip_addr, err)
+ rspamd_logger.errx(log_object, "cannot send %s rows of %s data to clickhouse server %s: %s; started as %s",
+ how_many, what, ip_addr, err, why)
upstream:fail()
end
end
return
end
- if not settings.allow_local and rspamd_lua_utils.is_rspamc_or_controller(task) then
+ if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then
return
end
end
nrows = nrows + 1
- data_rows[#data_rows + 1] = lua_clickhouse.row_to_tsv(row)
- lua_util.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit)
-
- if nrows >= settings['limit'] then
- clickhouse_send_data(task)
- nrows = 0
- data_rows = {}
- custom_rows = {}
- end
+ local tsv_row = lua_clickhouse.row_to_tsv(row)
+ used_memory = used_memory + #tsv_row
+ data_rows[#data_rows + 1] = tsv_row
+ lua_util.debugm(N, task,
+ "add clickhouse row %s / %s; used memory: %s / %s",
+ nrows, settings.limits.max_rows,
+ used_memory, settings.limits.max_memory)
end
local function do_remove_partition(ev_base, cfg, table_name, partition_id)
['partition_id'] = partition_id
}
- local sql = rspamd_lua_utils.template(remove_partition_sql, sql_params)
+ local sql = lua_util.template(remove_partition_sql, sql_params)
local ch_params = {
body = sql,
return (last_ts + settings.retention.period) - current_ts
end
+local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
+ local need_collect = false
+ local reason
+
+ if nrows == 0 then
+ lua_util.debugm(N, cfg, "no need to send data, as there are no rows to collect")
+ return settings.check_timeout
+ end
+
+ if settings.limits.max_rows > 0 then
+ if nrows > settings.max_rows then
+ need_collect = true
+ reason = 'limit of rows has been reached'
+ end
+ end
+ if settings.limits.max_interval > 0 then
+ if now - last_collection > settings.limits.max_interval then
+ need_collect = true
+ reason = 'limit of time since last collection has been reached'
+ end
+ end
+ if settings.limits.max_memory > 0 then
+ if used_memory >= settings.limits.max_memory then
+ need_collect = true
+ reason = 'limit of memory has been reached'
+ end
+ end
+
+ if need_collect then
+ clickhouse_send_data(nil, ev_base, reason)
+ nrows = 0
+ last_collection = now
+ used_memory = 0
+ data_rows = {}
+ custom_rows = {}
+
+ if settings.collect_garbadge then
+ collectgarbage()
+ end
+ end
+
+ return settings.check_timeout
+end
+
local function clickhouse_remove_old_partitions(cfg, ev_base)
local last_time_ago = get_last_removal_ago()
if last_time_ago == nil then
tables = tables,
month = settings.retention.period_months,
}
- local sql = rspamd_lua_utils.template(partition_to_remove_sql, sql_params)
+ local sql = lua_util.template(partition_to_remove_sql, sql_params)
local ch_params = {
-- If we have some custom rules, we just send its schema to the upstream
for k,rule in pairs(settings.custom_rules) do
if rule.schema then
- local sql = rspamd_lua_utils.template(rule.schema, settings)
+ local sql = lua_util.template(rule.schema, settings)
local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
if err then
rspamd_logger.errx(rspamd_config, 'cannot send custom schema %s to clickhouse server %s: ' ..
local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql)
if err then
if rows and rows.code == 404 then
- rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table')
+ rspamd_logger.infox(rspamd_config,
+ 'table rspamd_version does not exist, check rspamd table')
check_rspamd_table(upstream, ev_base, cfg)
else
- rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: %s",
+ rspamd_logger.errx(rspamd_config,
+ "cannot get version on clickhouse server %s: %s",
upstream:get_addr():to_string(true), err)
end
else
local opts = rspamd_config:get_all_opt('clickhouse')
if opts then
- for k,v in pairs(opts) do
- if k == 'custom_rules' then
- if not v[1] then
- v = {v}
- end
+ -- Legacy `limit` options
+ if opts.limit and not opts.limits then
+ settings.limits.max_rows = opts.limit
+ end
+ for k,v in pairs(opts) do
+ if k == 'custom_rules' then
+ if not v[1] then
+ v = {v}
+ end
- for i,rule in ipairs(v) do
- if rule.schema and rule.first_row and rule.get_row then
- local first_row, get_row
- local loadstring = loadstring or load
- local ret, res_or_err = pcall(loadstring(rule.first_row))
-
- if not ret or type(res_or_err) ~= 'function' then
- rspamd_logger.errx(rspamd_config, 'invalid first_row (%s) - must be a function',
- res_or_err)
- else
- first_row = res_or_err
- end
+ for i,rule in ipairs(v) do
+ if rule.schema and rule.first_row and rule.get_row then
+ local first_row, get_row
+ local loadstring = loadstring or load
+ local ret, res_or_err = pcall(loadstring(rule.first_row))
- ret, res_or_err = pcall(loadstring(rule.get_row))
+ if not ret or type(res_or_err) ~= 'function' then
+ rspamd_logger.errx(rspamd_config, 'invalid first_row (%s) - must be a function',
+ res_or_err)
+ else
+ first_row = res_or_err
+ end
- if not ret or type(res_or_err) ~= 'function' then
- rspamd_logger.errx(rspamd_config, 'invalid get_row (%s) - must be a function',
- res_or_err)
- else
- get_row = res_or_err
- end
+ ret, res_or_err = pcall(loadstring(rule.get_row))
- if first_row and get_row then
- local name = rule.name or tostring(i)
- settings.custom_rules[name] = {
- schema = rule.schema,
- first_row = first_row,
- get_row = get_row,
- }
- end
+ if not ret or type(res_or_err) ~= 'function' then
+ rspamd_logger.errx(rspamd_config,
+ 'invalid get_row (%s) - must be a function',
+ res_or_err)
else
- rspamd_logger.errx(rspamd_config, 'custom rule has no required attributes: schema, first_row and get_row')
+ get_row = res_or_err
end
+
+ if first_row and get_row then
+ local name = rule.name or tostring(i)
+ settings.custom_rules[name] = {
+ schema = rule.schema,
+ first_row = first_row,
+ get_row = get_row,
+ }
+ end
+ else
+ rspamd_logger.errx(rspamd_config, 'custom rule has no required attributes: schema, first_row and get_row')
end
- else
- settings[k] = v
end
+ else
+ settings[k] = v
end
+ end
- if not settings['server'] and not settings['servers'] then
- rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
- rspamd_lua_utils.disable_module(N, "config")
- else
- settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables',
+ if not settings['server'] and not settings['servers'] then
+ rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
+ lua_util.disable_module(N, "config")
+ else
+ settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables',
'regexp', 'clickhouse specific domains')
- settings.upstream = upstream_list.create(rspamd_config,
+ settings.upstream = upstream_list.create(rspamd_config,
settings['server'] or settings['servers'], 8123)
- if not settings.upstream then
- rspamd_logger.errx(rspamd_config, 'cannot parse clickhouse address: %s',
- settings['server'] or settings['servers'])
- rspamd_lua_utils.disable_module(N, "config")
- return
- end
+ if not settings.upstream then
+ rspamd_logger.errx(rspamd_config, 'cannot parse clickhouse address: %s',
+ settings['server'] or settings['servers'])
+ lua_util.disable_module(N, "config")
+ return
+ end
- if settings.exceptions then
- local maps_expressions = require "lua_maps_expressions"
+ if settings.exceptions then
+ local maps_expressions = require "lua_maps_expressions"
- settings.exceptions = maps_expressions.create(rspamd_config,
- settings.exceptions, N)
- end
+ settings.exceptions = maps_expressions.create(rspamd_config,
+ settings.exceptions, N)
+ end
- rspamd_config:register_symbol({
- name = 'CLICKHOUSE_COLLECT',
- type = 'idempotent',
- callback = clickhouse_collect,
- priority = 10,
- flags = 'empty,explicit_disable,ignore_passthrough',
- })
- rspamd_config:register_finish_script(function(task)
- if nrows > 0 then
- clickhouse_send_data(task, nil)
+ rspamd_config:register_symbol({
+ name = 'CLICKHOUSE_COLLECT',
+ type = 'idempotent',
+ callback = clickhouse_collect,
+ priority = 10,
+ flags = 'empty,explicit_disable,ignore_passthrough',
+ })
+ rspamd_config:register_finish_script(function(task)
+ if nrows > 0 then
+ clickhouse_send_data(task, nil, 'final collection')
+ end
+ end)
+ -- Create tables on load
+ rspamd_config:add_on_load(function(cfg, ev_base, worker)
+ rspamd_config:add_periodic(ev_base, 0,
+ clickhouse_maybe_send_data_periodic, true)
+ if worker:is_primary_controller() then
+ local upstreams = settings.upstream:all_upstreams()
+
+ for _,up in ipairs(upstreams) do
+ check_clickhouse_upstream(up, ev_base, cfg)
end
- end)
- -- Create tables on load
- rspamd_config:add_on_load(function(cfg, ev_base, worker)
- if worker:is_primary_controller() then
- local upstreams = settings.upstream:all_upstreams()
-
- for _,up in ipairs(upstreams) do
- check_clickhouse_upstream(up, ev_base, cfg)
- end
- if settings.retention.enable and settings.retention.method ~= 'drop' and
- settings.retention.method ~= 'detach' then
- rspamd_logger.errx(rspamd_config,
- "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention",
- settings.retention.method)
- settings.retention.enable = false
- end
- if settings.retention.enable and settings.retention.period_months < 1 or
- settings.retention.period_months > 1000 then
- rspamd_logger.errx(rspamd_config,
- "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention",
- settings.retention.period_months)
- settings.retention.enable = false
- end
- local period = lua_util.parse_time_interval(settings.retention.run_every)
- if settings.retention.enable and period == nil then
- rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention",
- settings.retention.run_every)
- settings.retention.enable = false
- end
+ if settings.retention.enable and settings.retention.method ~= 'drop' and
+ settings.retention.method ~= 'detach' then
+ rspamd_logger.errx(rspamd_config,
+ "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention",
+ settings.retention.method)
+ settings.retention.enable = false
+ end
+ if settings.retention.enable and settings.retention.period_months < 1 or
+ settings.retention.period_months > 1000 then
+ rspamd_logger.errx(rspamd_config,
+ "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention",
+ settings.retention.period_months)
+ settings.retention.enable = false
+ end
+ local period = lua_util.parse_time_interval(settings.retention.run_every)
+ if settings.retention.enable and period == nil then
+ rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention",
+ settings.retention.run_every)
+ settings.retention.enable = false
+ end
- if settings.retention.enable then
- settings.retention.period = period
- rspamd_logger.infox(rspamd_config,
- "retention will be performed each %s seconds for %s month with method %s",
- period, settings.retention.period_months, settings.retention.method)
- rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false)
- end
+ if settings.retention.enable then
+ settings.retention.period = period
+ rspamd_logger.infox(rspamd_config,
+ "retention will be performed each %s seconds for %s month with method %s",
+ period, settings.retention.period_months, settings.retention.method)
+ rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false)
end
- end)
- end
+ end
+ end)
+ end
end