|
|
@@ -242,13 +242,14 @@ local function clickhouse_check_symbol(task, symbols, need_score) |
|
|
|
return false |
|
|
|
end |
|
|
|
|
|
|
|
local function clickhouse_send_data(task) |
|
|
|
local function clickhouse_send_data(task, ev_base) |
|
|
|
local log_object = task or rspamd_config |
|
|
|
local upstream = settings.upstream:get_upstream_round_robin() |
|
|
|
local ip_addr = upstream:get_addr():to_string(true) |
|
|
|
|
|
|
|
local function gen_success_cb(what, how_many) |
|
|
|
return function (_, _) |
|
|
|
rspamd_logger.infox(task, "sent %s rows of %s to clickhouse server %s", |
|
|
|
rspamd_logger.infox(log_object, "sent %s rows of %s to clickhouse server %s", |
|
|
|
how_many, what, ip_addr) |
|
|
|
upstream:ok() |
|
|
|
end |
|
|
@@ -256,23 +257,27 @@ local function clickhouse_send_data(task) |
|
|
|
|
|
|
|
local function gen_fail_cb(what, how_many) |
|
|
|
return function (_, err) |
|
|
|
rspamd_logger.errx(task, "cannot send %s rows of %s data to clickhouse server %s: %s", |
|
|
|
rspamd_logger.errx(log_object, "cannot send %s rows of %s data to clickhouse server %s: %s", |
|
|
|
how_many, what, ip_addr, err) |
|
|
|
upstream:fail() |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
local function send_data(what, tbl, query) |
|
|
|
local ch_params = { |
|
|
|
task = task, |
|
|
|
} |
|
|
|
local ch_params = {} |
|
|
|
if task then |
|
|
|
ch_params.task = task |
|
|
|
else |
|
|
|
ch_params.config = rspamd_config |
|
|
|
ch_params.ev_base = ev_base |
|
|
|
end |
|
|
|
|
|
|
|
local ret = lua_clickhouse.insert(upstream, settings, ch_params, |
|
|
|
query, tbl, |
|
|
|
gen_success_cb(what, #tbl), |
|
|
|
gen_fail_cb(what, #tbl)) |
|
|
|
if not ret then |
|
|
|
rspamd_logger.errx(task, "cannot send %s rows of %s data to clickhouse server %s: %s", |
|
|
|
rspamd_logger.errx(log_object, "cannot send %s rows of %s data to clickhouse server %s: %s", |
|
|
|
#tbl, what, ip_addr, 'cannot make HTTP request') |
|
|
|
end |
|
|
|
end |
|
|
@@ -927,9 +932,9 @@ if opts then |
|
|
|
priority = 10, |
|
|
|
flags = 'empty,explicit_disable,ignore_passthrough', |
|
|
|
}) |
|
|
|
rspamd_config:register_finish_script(function(task) |
|
|
|
rspamd_config:register_finish_script(function(_, ev_base, _) |
|
|
|
if nrows > 0 then |
|
|
|
clickhouse_send_data(task) |
|
|
|
clickhouse_send_data(nil, ev_base) |
|
|
|
end |
|
|
|
end) |
|
|
|
-- Create tables on load |