if code ~= 200 or err_message then
if not err_message then err_message = data end
local ip_addr = upstream:get_addr():to_string(true)
- rspamd_logger.errx(params.ev_base, "request failed on clickhouse server %s: %s",
+ rspamd_logger.errx(rspamd_config, "request failed on clickhouse server %s: %s",
ip_addr, err_message)
upstream:fail()
else
upstream:ok()
if (ok_cb) then
- rspamd_logger.debugm(N, params.ev_base, "do_remove_http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
+ rspamd_logger.debugm(N, rspamd_config, "do_remove_http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
ok_cb(params.ev_base, params.config, data)
end
end
end
local function do_remove_partition(ev_base, cfg, table_name, partition_id)
- rspamd_logger.debugm(N, ev_base, "removing partition %s.%s", table_name, partition_id)
+ rspamd_logger.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition_id)
local upstream = settings.upstream:get_upstream_round_robin()
local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION ${partition_id}"
local remove_method = (settings.retention.method == 'drop') and 'DROP' or 'DETACH'
}
if not clickhouse_request(upstream, nil, ch_params) then
- rspamd_logger.errx(ev_base, "cannot send data to clickhouse server %s: cannot make request",
+ rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
settings['server'])
end
end
local function parse_clickhouse_response(ev_base, cfg, data)
- rspamd_logger.debugm(N, ev_base, "got clickhouse response: %s", data)
+ rspamd_logger.debugm(N, rspamd_config, "got clickhouse response: %s", data)
if data == nil then
-- clickhouse returned no data (i.e. empty resultset): exiting
return
local parser = ucl.parser()
local res, err = parser:parse_string(s)
if not res then
- rspamd_logger.errx(ev_base, 'Parser error: %s', err)
+ rspamd_logger.errx(rspamd_config, 'Parser error: %s', err)
return nil
end
return parser:get_object()
for _, plain_row in pairs(ch_rows) do
if plain_row and plain_row:len() > 1 then
local parsed_row = parse_string(plain_row)
- do_remove_partition(ev_base, cfg, parsed_row.table, parsed_row.partition_id)
+ do_remove_partition(rspamd_config, cfg, parsed_row.table, parsed_row.partition_id)
end
end
end
0 - it's time to perform removal
<int> - how many seconds wait until next run
]]
-local function get_last_removal_ago(ev_base)
+local function get_last_removal_ago()
local ts_file = string.format('%s/%s', rspamd_paths['DBDIR'], 'clickhouse_retention_run')
local f, err = io.open(ts_file, 'r')
local write_file
local last_ts
if err then
- rspamd_logger.debugm(N, ev_base, 'Failed to open %s: %s', ts_file, err)
+ rspamd_logger.debugm(N, rspamd_config, 'Failed to open %s: %s', ts_file, err)
else
last_ts = tonumber(f:read('*number'))
f:close()
write_file, err = io.open(ts_file, 'w')
if err then
- rspamd_logger.errx(ev_base, 'Failed to open %s, will not perform retention: %s', ts_file, err)
+ rspamd_logger.errx(rspamd_config, 'Failed to open %s, will not perform retention: %s', ts_file, err)
return nil
end
local res
res, err = write_file:write(tostring(current_ts))
if err then
- rspamd_logger.errx(ev_base, 'Failed to write %s, will not perform retention: %s', ts_file, err)
+ rspamd_logger.errx(rspamd_config, 'Failed to write %s, will not perform retention: %s', ts_file, err)
return nil
end
write_file:close()
end
local function clickhouse_remove_old_partitions(cfg, ev_base)
- local last_time_ago = get_last_removal_ago(ev_base)
+ local last_time_ago = get_last_removal_ago()
if last_time_ago == nil then
- rspamd_logger.errx(ev_base, "Failed to get last run time. Disabling retention")
+ rspamd_logger.errx(rspamd_config, "Failed to get last run time. Disabling retention")
return false
elseif last_time_ago ~= 0 then
return last_time_ago
config = cfg,
}
if not clickhouse_request(upstream, parse_clickhouse_response, ch_params) then
- rspamd_logger.errx(ev_base, "cannot send data to clickhouse server %s: cannot make request",
+ rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
settings['server'])
end
settings['server'] or settings['servers'], 8123)
if not settings.upstream then
- rspamd_logger.errx('cannot parse clickhouse address: %s',
+ rspamd_logger.errx(rspamd_config, 'cannot parse clickhouse address: %s',
settings['server'] or settings['servers'])
rspamd_lua_utils.disable_module(N, "config")
return