|
|
@@ -32,6 +32,7 @@ local custom_rows = {} |
|
|
|
local nrows = 0 |
|
|
|
local used_memory = 0 |
|
|
|
local last_collection = 0 |
|
|
|
local final_call = false -- If the final collection has been started |
|
|
|
local schema_version = 8 -- Current schema version |
|
|
|
|
|
|
|
local settings = { |
|
|
@@ -375,7 +376,7 @@ local function clickhouse_check_symbol(task, settings_field_name, fields_table, |
|
|
|
return false |
|
|
|
end |
|
|
|
|
|
|
|
local function clickhouse_send_data(task, ev_base, why) |
|
|
|
local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows) |
|
|
|
local log_object = task or rspamd_config |
|
|
|
local upstream = settings.upstream:get_upstream_round_robin() |
|
|
|
local ip_addr = upstream:get_addr():to_string(true) |
|
|
@@ -429,11 +430,11 @@ local function clickhouse_send_data(task, ev_base, why) |
|
|
|
clickhouse_groups_row(fields) |
|
|
|
end |
|
|
|
|
|
|
|
send_data('generic data', data_rows, |
|
|
|
send_data('generic data', gen_rows, |
|
|
|
string.format('INSERT INTO rspamd (%s)', |
|
|
|
table.concat(fields, ','))) |
|
|
|
|
|
|
|
for k,crows in pairs(custom_rows) do |
|
|
|
for k,crows in pairs(cust_rows) do |
|
|
|
if #crows > 1 then |
|
|
|
send_data('custom data ('..k..')', crows, |
|
|
|
settings.custom_rules[k].first_row()) |
|
|
@@ -929,19 +930,26 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now) |
|
|
|
return settings.check_timeout |
|
|
|
end |
|
|
|
|
|
|
|
if final_call then |
|
|
|
lua_util.debugm(N, cfg, "no need to send data, final call has been issued") |
|
|
|
return 0 |
|
|
|
end |
|
|
|
|
|
|
|
if settings.limits.max_rows > 0 then |
|
|
|
if nrows > settings.limits.max_rows then |
|
|
|
need_collect = true |
|
|
|
reason = string.format('limit of rows has been reached: %d', nrows) |
|
|
|
end |
|
|
|
end |
|
|
|
if settings.limits.max_interval > 0 then |
|
|
|
|
|
|
|
if last_collection > 0 and settings.limits.max_interval > 0 then |
|
|
|
if now - last_collection > settings.limits.max_interval then |
|
|
|
need_collect = true |
|
|
|
reason = string.format('limit of time since last collection has been reached: %d seconds passed', |
|
|
|
(now - last_collection) - settings.limits.max_interval) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
if settings.limits.max_memory > 0 then |
|
|
|
if used_memory >= settings.limits.max_memory then |
|
|
|
need_collect = true |
|
|
@@ -950,14 +958,22 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
if last_collection == 0 then |
|
|
|
last_collection = now |
|
|
|
end |
|
|
|
|
|
|
|
if need_collect then |
|
|
|
clickhouse_send_data(nil, ev_base, reason) |
|
|
|
-- Do it atomic |
|
|
|
local saved_rows = data_rows |
|
|
|
local saved_custom = custom_rows |
|
|
|
nrows = 0 |
|
|
|
last_collection = now |
|
|
|
used_memory = 0 |
|
|
|
data_rows = {} |
|
|
|
custom_rows = {} |
|
|
|
|
|
|
|
clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom) |
|
|
|
|
|
|
|
if settings.collect_garbadge then |
|
|
|
collectgarbage() |
|
|
|
end |
|
|
@@ -1243,7 +1259,21 @@ if opts then |
|
|
|
}) |
|
|
|
rspamd_config:register_finish_script(function(task) |
|
|
|
if nrows > 0 then |
|
|
|
clickhouse_send_data(task, nil, 'final collection') |
|
|
|
final_call = true |
|
|
|
local saved_rows = data_rows |
|
|
|
local saved_custom = custom_rows |
|
|
|
|
|
|
|
nrows = 0 |
|
|
|
data_rows = {} |
|
|
|
used_memory = 0 |
|
|
|
custom_rows = {} |
|
|
|
|
|
|
|
clickhouse_send_data(task, nil, 'final collection', |
|
|
|
saved_rows, saved_custom) |
|
|
|
|
|
|
|
if settings.collect_garbadge then |
|
|
|
collectgarbage() |
|
|
|
end |
|
|
|
end |
|
|
|
end) |
|
|
|
-- Create tables on load |