diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-10-31 15:26:20 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-10-31 15:26:20 +0000 |
commit | 27df9aa32df070bffec7ab27ceb9069f3e193560 (patch) | |
tree | a0345eed87cc8e98a709390eab68b78f72f106e5 /src | |
parent | 569c024acdb4a7eaff33cdaaefc1ae25764c2ae2 (diff) | |
parent | 5cb8d06186a1a38f72c5be56cfb48a840e52baca (diff) | |
download | rspamd-27df9aa32df070bffec7ab27ceb9069f3e193560.tar.gz rspamd-27df9aa32df070bffec7ab27ceb9069f3e193560.zip |
Merge pull request #1082 from fatalbanana/exporter
[Minor] Improve metric exporter scheduling mechanism
Diffstat (limited to 'src')
-rw-r--r-- | src/plugins/lua/metric_exporter.lua | 71 |
1 files changed, 54 insertions, 17 deletions
diff --git a/src/plugins/lua/metric_exporter.lua b/src/plugins/lua/metric_exporter.lua index 116dfbafd..c4a0cdc0e 100644 --- a/src/plugins/lua/metric_exporter.lua +++ b/src/plugins/lua/metric_exporter.lua @@ -22,7 +22,6 @@ local tcp = require "rspamd_tcp" local pool = mempool.create() local settings = { - poll = 30, interval = 120, timeout = 15, } @@ -88,7 +87,12 @@ local function graphite_config(opts) end local function graphite_push(kwargs) - local stamp = math.floor(kwargs['time']) + local stamp + if kwargs['time'] then + stamp = math.floor(kwargs['time']) + else + stamp = math.floor(util.get_time()) + end local metrics_str = '' for _, v in ipairs(settings['metrics']) do local mname = string.format('%s.%s', settings['metric_prefix'], v:gsub(' ', '_')) @@ -147,7 +151,9 @@ end if not configure_metric_exporter() then return end rspamd_config:add_on_load(function (cfg, ev_base, worker) + -- Exit unless we're the first 'normal' worker if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end + -- Persist mempool variable to statefile on shutdown rspamd_config:register_finish_script(function (task) local stamp = pool:get_variable(VAR_NAME, 'double') if not stamp then @@ -164,27 +170,58 @@ rspamd_config:add_on_load(function (cfg, ev_base, worker) f:close() end end) + -- Push metrics to backend + function push_metrics(worker, time) + logger.infox('Pushing metrics to %s backend', settings['backend']) + local args = { + ev_base = ev_base, + stats = worker:get_stat(), + } + if time then + table.insert(args, time) + end + backends[settings['backend']]['push'](args) + end + -- Push metrics to backend and reschedule check + function schedule_intermediate_push(when) + rspamd_config:add_periodic(ev_base, when, function (cfg, ev_base) + push_metrics(worker) + schedule_regular_push() + return false + end) + end + -- Push metrics at regular intervals + function schedule_regular_push() + rspamd_config:add_periodic(ev_base, settings['interval'], function (cfg, ev_base) + push_metrics(worker) + return true + end) + end + -- Try read statefile on startup + local stamp local f, err = io.open(settings['statefile'], 'r') if err then logger.errx('Failed to open statefile: %s', err) end if f then io.input(f) - local stamp = tonumber(io.read()) + stamp = tonumber(io.read()) pool:set_variable(VAR_NAME, stamp) end - rspamd_config:add_periodic(ev_base, settings['poll'], function (cfg, ev_base) - logger.debug('Checking if metrics need to be pushed') - local last_push = pool:get_variable(VAR_NAME, 'double') - local time = util.get_time() - if (not last_push) or ((time-last_push) >= settings['interval']) then - logger.infox('Pushing metrics to %s backend', settings['backend']) - backends[settings['backend']]['push']({ - ev_base = ev_base, - stats = worker:get_stat(), - time = time, - }) - end - return true - end) + if not stamp then + logger.debug('No state found - pushing stats immediately') + push_metrics(worker) + schedule_regular_push() + return + end + local time = util.get_time() + local delta = stamp - time + settings['interval'] + if delta <= 0 then + logger.debug('Last push is too old - pushing stats immediately') + push_metrics(worker, time) + schedule_regular_push() + return + end + logger.debugx('Scheduling next push in %s seconds', delta) + schedule_intermediate_push(delta) end) |