aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/lua/clickhouse.lua
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/lua/clickhouse.lua')
-rw-r--r--src/plugins/lua/clickhouse.lua47
1 files changed, 39 insertions, 8 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 878a15064..c4e30f1db 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -32,6 +32,7 @@ local custom_rows = {}
local nrows = 0
local used_memory = 0
local last_collection = 0
+local final_call = false -- If the final collection has been started
local schema_version = 8 -- Current schema version
local settings = {
@@ -375,7 +376,7 @@ local function clickhouse_check_symbol(task, settings_field_name, fields_table,
return false
end
-local function clickhouse_send_data(task, ev_base, why)
+local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows)
local log_object = task or rspamd_config
local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)
@@ -429,11 +430,11 @@ local function clickhouse_send_data(task, ev_base, why)
clickhouse_groups_row(fields)
end
- send_data('generic data', data_rows,
+ send_data('generic data', gen_rows,
string.format('INSERT INTO rspamd (%s)',
table.concat(fields, ',')))
- for k,crows in pairs(custom_rows) do
+ for k,crows in pairs(cust_rows) do
if #crows > 1 then
send_data('custom data ('..k..')', crows,
settings.custom_rules[k].first_row())
@@ -929,19 +930,27 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
return settings.check_timeout
end
+ if final_call then
+ lua_util.debugm(N, cfg, "no need to send data, final call has been issued")
+ return 0
+ end
+
if settings.limits.max_rows > 0 then
if nrows > settings.limits.max_rows then
need_collect = true
reason = string.format('limit of rows has been reached: %d', nrows)
end
end
- if settings.limits.max_interval > 0 then
+
+ if last_collection > 0 and settings.limits.max_interval > 0 then
if now - last_collection > settings.limits.max_interval then
need_collect = true
- reason = string.format('limit of time since last collection has been reached: %d seconds passed',
- (now - last_collection) - settings.limits.max_interval)
+ reason = string.format('limit of time since last collection has been reached: %d seconds passed ' ..
+ '(%d seconds trigger)',
+ (now - last_collection), settings.limits.max_interval)
end
end
+
if settings.limits.max_memory > 0 then
if used_memory >= settings.limits.max_memory then
need_collect = true
@@ -950,14 +959,22 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
end
end
+ if last_collection == 0 then
+ last_collection = now
+ end
+
if need_collect then
- clickhouse_send_data(nil, ev_base, reason)
+ -- Do it atomic
+ local saved_rows = data_rows
+ local saved_custom = custom_rows
nrows = 0
last_collection = now
used_memory = 0
data_rows = {}
custom_rows = {}
+ clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom)
+
if settings.collect_garbadge then
collectgarbage()
end
@@ -1243,7 +1260,21 @@ if opts then
})
rspamd_config:register_finish_script(function(task)
if nrows > 0 then
- clickhouse_send_data(task, nil, 'final collection')
+ final_call = true
+ local saved_rows = data_rows
+ local saved_custom = custom_rows
+
+ nrows = 0
+ data_rows = {}
+ used_memory = 0
+ custom_rows = {}
+
+ clickhouse_send_data(task, nil, 'final collection',
+ saved_rows, saved_custom)
+
+ if settings.collect_garbadge then
+ collectgarbage()
+ end
end
end)
-- Create tables on load