aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-10-31 15:26:20 +0000
committerGitHub <noreply@github.com>2016-10-31 15:26:20 +0000
commit27df9aa32df070bffec7ab27ceb9069f3e193560 (patch)
treea0345eed87cc8e98a709390eab68b78f72f106e5 /src
parent569c024acdb4a7eaff33cdaaefc1ae25764c2ae2 (diff)
parent5cb8d06186a1a38f72c5be56cfb48a840e52baca (diff)
downloadrspamd-27df9aa32df070bffec7ab27ceb9069f3e193560.tar.gz
rspamd-27df9aa32df070bffec7ab27ceb9069f3e193560.zip
Merge pull request #1082 from fatalbanana/exporter
[Minor] Improve metric exporter scheduling mechanism
Diffstat (limited to 'src')
-rw-r--r--src/plugins/lua/metric_exporter.lua71
1 files changed, 54 insertions, 17 deletions
diff --git a/src/plugins/lua/metric_exporter.lua b/src/plugins/lua/metric_exporter.lua
index 116dfbafd..c4a0cdc0e 100644
--- a/src/plugins/lua/metric_exporter.lua
+++ b/src/plugins/lua/metric_exporter.lua
@@ -22,7 +22,6 @@ local tcp = require "rspamd_tcp"
local pool = mempool.create()
local settings = {
- poll = 30,
interval = 120,
timeout = 15,
}
@@ -88,7 +87,12 @@ local function graphite_config(opts)
end
local function graphite_push(kwargs)
- local stamp = math.floor(kwargs['time'])
+ local stamp
+ if kwargs['time'] then
+ stamp = math.floor(kwargs['time'])
+ else
+ stamp = math.floor(util.get_time())
+ end
local metrics_str = ''
for _, v in ipairs(settings['metrics']) do
local mname = string.format('%s.%s', settings['metric_prefix'], v:gsub(' ', '_'))
@@ -147,7 +151,9 @@ end
if not configure_metric_exporter() then return end
rspamd_config:add_on_load(function (cfg, ev_base, worker)
+ -- Exit unless we're the first 'normal' worker
if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end
+ -- Persist mempool variable to statefile on shutdown
rspamd_config:register_finish_script(function (task)
local stamp = pool:get_variable(VAR_NAME, 'double')
if not stamp then
@@ -164,27 +170,58 @@ rspamd_config:add_on_load(function (cfg, ev_base, worker)
f:close()
end
end)
+ -- Push metrics to backend
+ function push_metrics(worker, time)
+ logger.infox('Pushing metrics to %s backend', settings['backend'])
+ local args = {
+ ev_base = ev_base,
+ stats = worker:get_stat(),
+ }
+ if time then
+ table.insert(args, time)
+ end
+ backends[settings['backend']]['push'](args)
+ end
+ -- Push metrics to backend and reschedule check
+ function schedule_intermediate_push(when)
+ rspamd_config:add_periodic(ev_base, when, function (cfg, ev_base)
+ push_metrics(worker)
+ schedule_regular_push()
+ return false
+ end)
+ end
+ -- Push metrics at regular intervals
+ function schedule_regular_push()
+ rspamd_config:add_periodic(ev_base, settings['interval'], function (cfg, ev_base)
+ push_metrics(worker)
+ return true
+ end)
+ end
+ -- Try read statefile on startup
+ local stamp
local f, err = io.open(settings['statefile'], 'r')
if err then
logger.errx('Failed to open statefile: %s', err)
end
if f then
io.input(f)
- local stamp = tonumber(io.read())
+ stamp = tonumber(io.read())
pool:set_variable(VAR_NAME, stamp)
end
- rspamd_config:add_periodic(ev_base, settings['poll'], function (cfg, ev_base)
- logger.debug('Checking if metrics need to be pushed')
- local last_push = pool:get_variable(VAR_NAME, 'double')
- local time = util.get_time()
- if (not last_push) or ((time-last_push) >= settings['interval']) then
- logger.infox('Pushing metrics to %s backend', settings['backend'])
- backends[settings['backend']]['push']({
- ev_base = ev_base,
- stats = worker:get_stat(),
- time = time,
- })
- end
- return true
- end)
+ if not stamp then
+ logger.debug('No state found - pushing stats immediately')
+ push_metrics(worker)
+ schedule_regular_push()
+ return
+ end
+ local time = util.get_time()
+ local delta = stamp - time + settings['interval']
+ if delta <= 0 then
+ logger.debug('Last push is too old - pushing stats immediately')
+ push_metrics(worker, time)
+ schedule_regular_push()
+ return
+ end
+ logger.debugx('Scheduling next push in %s seconds', delta)
+ schedule_intermediate_push(delta)
end)