local nrows = 0
local used_memory = 0
local last_collection = 0
+local final_call = false -- If the final collection has been started
local schema_version = 8 -- Current schema version
local settings = {
return false
end
-local function clickhouse_send_data(task, ev_base, why)
+local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows)
local log_object = task or rspamd_config
local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)
clickhouse_groups_row(fields)
end
- send_data('generic data', data_rows,
+ send_data('generic data', gen_rows,
string.format('INSERT INTO rspamd (%s)',
table.concat(fields, ',')))
- for k,crows in pairs(custom_rows) do
+ for k,crows in pairs(cust_rows) do
if #crows > 1 then
send_data('custom data ('..k..')', crows,
settings.custom_rules[k].first_row())
return settings.check_timeout
end
+ if final_call then
+ lua_util.debugm(N, cfg, "no need to send data, final call has been issued")
+ return 0
+ end
+
if settings.limits.max_rows > 0 then
if nrows > settings.limits.max_rows then
need_collect = true
reason = string.format('limit of rows has been reached: %d', nrows)
end
end
- if settings.limits.max_interval > 0 then
+
+ if last_collection > 0 and settings.limits.max_interval > 0 then
if now - last_collection > settings.limits.max_interval then
need_collect = true
reason = string.format('limit of time since last collection has been reached: %d seconds passed',
(now - last_collection) - settings.limits.max_interval)
end
end
+
if settings.limits.max_memory > 0 then
if used_memory >= settings.limits.max_memory then
need_collect = true
end
end
+ if last_collection == 0 then
+ last_collection = now
+ end
+
if need_collect then
- clickhouse_send_data(nil, ev_base, reason)
+ -- Do it atomic
+ local saved_rows = data_rows
+ local saved_custom = custom_rows
nrows = 0
last_collection = now
used_memory = 0
data_rows = {}
custom_rows = {}
+ clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom)
+
if settings.collect_garbadge then
collectgarbage()
end
})
rspamd_config:register_finish_script(function(task)
if nrows > 0 then
- clickhouse_send_data(task, nil, 'final collection')
+ final_call = true
+ local saved_rows = data_rows
+ local saved_custom = custom_rows
+
+ nrows = 0
+ data_rows = {}
+ used_memory = 0
+ custom_rows = {}
+
+ clickhouse_send_data(task, nil, 'final collection',
+ saved_rows, saved_custom)
+
+ if settings.collect_garbadge then
+ collectgarbage()
+ end
end
end)
-- Create tables on load