]> source.dussan.org Git - rspamd.git/commitdiff
[Minor] Improve metric exporter scheduling mechanism 1082/head
authorAndrew Lewis <nerf@judo.za.org>
Mon, 31 Oct 2016 08:23:53 +0000 (10:23 +0200)
committerAndrew Lewis <nerf@judo.za.org>
Mon, 31 Oct 2016 09:19:03 +0000 (11:19 +0200)
src/plugins/lua/metric_exporter.lua

index 116dfbafdc039c9637292049f7a4ace1a15b76b2..c4a0cdc0e6ed5217abd27a09377aecb1f31b4a6e 100644 (file)
@@ -22,7 +22,6 @@ local tcp = require "rspamd_tcp"
 
 local pool = mempool.create()
 local settings = {
-  poll = 30,
   interval = 120,
   timeout = 15,
 }
@@ -88,7 +87,12 @@ local function graphite_config(opts)
 end
 
 local function graphite_push(kwargs)
-  local stamp = math.floor(kwargs['time'])
+  local stamp
+  if kwargs['time'] then
+    stamp = math.floor(kwargs['time'])
+  else
+    stamp = math.floor(util.get_time())
+  end
   local metrics_str = ''
   for _, v in ipairs(settings['metrics']) do
     local mname = string.format('%s.%s', settings['metric_prefix'], v:gsub(' ', '_'))
@@ -147,7 +151,9 @@ end
 if not configure_metric_exporter() then return end
 
 rspamd_config:add_on_load(function (cfg, ev_base, worker)
+  -- Exit unless we're the first 'normal' worker
   if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end
+  -- Persist mempool variable to statefile on shutdown
   rspamd_config:register_finish_script(function (task)
     local stamp = pool:get_variable(VAR_NAME, 'double')
     if not stamp then
@@ -164,27 +170,58 @@ rspamd_config:add_on_load(function (cfg, ev_base, worker)
       f:close()
     end
   end)
+  -- Push metrics to backend
+  function push_metrics(worker, time)
+    logger.infox('Pushing metrics to %s backend', settings['backend'])
+    local args = {
+      ev_base = ev_base,
+      stats = worker:get_stat(),
+    }
+    if time then
+      table.insert(args, time)
+    end
+    backends[settings['backend']]['push'](args)
+  end
+  -- Push metrics to backend and reschedule check
+  function schedule_intermediate_push(when)
+    rspamd_config:add_periodic(ev_base, when, function (cfg, ev_base)
+      push_metrics(worker)
+      schedule_regular_push()
+      return false
+    end)
+  end
+  -- Push metrics at regular intervals
+  function schedule_regular_push()
+    rspamd_config:add_periodic(ev_base, settings['interval'], function (cfg, ev_base)
+      push_metrics(worker)
+      return true
+    end)
+  end
+  -- Try read statefile on startup
+  local stamp
   local f, err = io.open(settings['statefile'], 'r')
   if err then
     logger.errx('Failed to open statefile: %s', err)
   end
   if f then
     io.input(f)
-    local stamp = tonumber(io.read())
+    stamp = tonumber(io.read())
     pool:set_variable(VAR_NAME, stamp)
   end
-  rspamd_config:add_periodic(ev_base, settings['poll'], function (cfg, ev_base)
-    logger.debug('Checking if metrics need to be pushed')
-    local last_push = pool:get_variable(VAR_NAME, 'double')
-    local time = util.get_time()
-    if (not last_push) or ((time-last_push) >= settings['interval']) then
-      logger.infox('Pushing metrics to %s backend', settings['backend'])
-      backends[settings['backend']]['push']({
-        ev_base = ev_base,
-        stats = worker:get_stat(),
-        time = time,
-      })
-    end
-    return true
-  end)
+  if not stamp then
+    logger.debug('No state found - pushing stats immediately')
+    push_metrics(worker)
+    schedule_regular_push()
+    return
+  end
+  local time = util.get_time()
+  local delta = stamp - time + settings['interval']
+  if delta <= 0 then
+    logger.debug('Last push is too old - pushing stats immediately')
+    push_metrics(worker, time)
+    schedule_regular_push()
+    return
+  end
+  logger.debugx('Scheduling next push in %s seconds', delta)
+  schedule_intermediate_push(delta)
 end)