diff options
Diffstat (limited to 'src/plugins/lua')
-rw-r--r-- | src/plugins/lua/clickhouse.lua | 47 |
1 files changed, 39 insertions, 8 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 878a15064..c4e30f1db 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -32,6 +32,7 @@ local custom_rows = {} local nrows = 0 local used_memory = 0 local last_collection = 0 +local final_call = false -- If the final collection has been started local schema_version = 8 -- Current schema version local settings = { @@ -375,7 +376,7 @@ local function clickhouse_check_symbol(task, settings_field_name, fields_table, return false end -local function clickhouse_send_data(task, ev_base, why) +local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows) local log_object = task or rspamd_config local upstream = settings.upstream:get_upstream_round_robin() local ip_addr = upstream:get_addr():to_string(true) @@ -429,11 +430,11 @@ local function clickhouse_send_data(task, ev_base, why) clickhouse_groups_row(fields) end - send_data('generic data', data_rows, + send_data('generic data', gen_rows, string.format('INSERT INTO rspamd (%s)', table.concat(fields, ','))) - for k,crows in pairs(custom_rows) do + for k,crows in pairs(cust_rows) do if #crows > 1 then send_data('custom data ('..k..')', crows, settings.custom_rules[k].first_row()) @@ -929,19 +930,27 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now) return settings.check_timeout end + if final_call then + lua_util.debugm(N, cfg, "no need to send data, final call has been issued") + return 0 + end + if settings.limits.max_rows > 0 then if nrows > settings.limits.max_rows then need_collect = true reason = string.format('limit of rows has been reached: %d', nrows) end end - if settings.limits.max_interval > 0 then + + if last_collection > 0 and settings.limits.max_interval > 0 then if now - last_collection > settings.limits.max_interval then need_collect = true - reason = string.format('limit of time since last collection has been reached: %d seconds passed', - (now - last_collection) - settings.limits.max_interval) + reason = string.format('limit of time since last collection has been reached: %d seconds passed ' .. + '(%d seconds trigger)', + (now - last_collection), settings.limits.max_interval) end end + if settings.limits.max_memory > 0 then if used_memory >= settings.limits.max_memory then need_collect = true @@ -950,14 +959,22 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now) end end + if last_collection == 0 then + last_collection = now + end + if need_collect then - clickhouse_send_data(nil, ev_base, reason) + -- Do it atomic + local saved_rows = data_rows + local saved_custom = custom_rows nrows = 0 last_collection = now used_memory = 0 data_rows = {} custom_rows = {} + clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom) + if settings.collect_garbadge then collectgarbage() end @@ -1243,7 +1260,21 @@ if opts then }) rspamd_config:register_finish_script(function(task) if nrows > 0 then - clickhouse_send_data(task, nil, 'final collection') + final_call = true + local saved_rows = data_rows + local saved_custom = custom_rows + + nrows = 0 + data_rows = {} + used_memory = 0 + custom_rows = {} + + clickhouse_send_data(task, nil, 'final collection', + saved_rows, saved_custom) + + if settings.collect_garbadge then + collectgarbage() + end end end) -- Create tables on load |