]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Implement cluster-aware bayes expiry
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 16 Apr 2018 13:58:55 +0000 (14:58 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 16 Apr 2018 13:58:55 +0000 (14:58 +0100)
src/plugins/lua/bayes_expiry.lua

index aeedf5f17397f887b0849d9589547efcdad75dc8..77ec2e52708c12b2a7e996abffd6e794918d0449 100644 (file)
@@ -22,6 +22,7 @@ end
 local N = 'bayes_expiry'
 local E = {}
 local logger = require "rspamd_logger"
+local rspamd_util = require "rspamd_util"
 local lutil = require "lua_util"
 local lredis = require "lua_redis"
 
@@ -147,6 +148,8 @@ template.common_ttl = settings.common_ttl
 template.epsilon_common = settings.epsilon_common
 template.significant_factor = settings.significant_factor
 template.lazy = settings.lazy
+template.expire_step = settings.interval
+template.hostname = rspamd_util.get_hostname()
 
 for k,v in pairs(template) do
   template[k] = tostring(v)
@@ -159,7 +162,28 @@ end
 -- returns new cursor
 local expiry_script = [[
   local expire = math.floor(KEYS[2])
-  local ret = redis.call('SCAN', KEYS[3], 'MATCH', KEYS[1], 'COUNT', '${count}')
+  local lock_key = redis.sha1hex(KEYS[1]) .. '_lock' -- Check locking
+  local lock = redis.call('GET', lock_key)
+
+  if lock then
+    if lock ~= ${hostname} then
+      return 'locked by ' .. lock
+    end
+  end
+
+  redis.replicate_commands()
+  redis.call('SETEX', lock_key, ${expire_step}, '${hostname}')
+
+  local cursor_key = redis.sha1hex(KEYS[1]) .. '_cursor'
+  local cursor = redis.call('GET', cursor_key)
+
+  if not cursor then
+    cursor = 0
+  else
+    cursor = tonumber(cursor)
+  end
+
+  local ret = redis.call('SCAN', cursor, 'MATCH', KEYS[1], 'COUNT', '${count}')
   local next = ret[1]
   local keys = ret[2]
   local nelts = 0
@@ -187,7 +211,6 @@ local expiry_script = [[
     sum_squares = sum_squares + total * total
     nelts = nelts + 1
   end
-  redis.replicate_commands()
 
   local mean, stddev = 0, 0
 
@@ -234,6 +257,9 @@ local expiry_script = [[
     end
   end
 
+  redis.call('SETEX', cursor_key, ${expire_step} * 10, tostring(next))
+  redis.call('DEL', lock_key)
+
   return {next, nelts, extended, discriminated, mean, stddev, common, significant, infrequent, ttls_set}
 ]]
 
@@ -266,22 +292,23 @@ local function expire_step(cls, ev_base, worker)
         local infrequent_action = (cls.expiry < 0) and 'made persistent' or 'ttls set'
 
         local d = cycle and {
-            'cycle in ' .. step .. ' steps', mode, c_data[1],
-            c_data[7], c_data[2], significant_action,
-            c_data[6], c_data[3],
-            c_data[8], c_data[9], infrequent_action,
-            math.floor(.5 + c_data[4] / c_data[1]),
-            math.floor(.5 + math.sqrt(c_data[5] / c_data[1]))
+          'cycle in ' .. step .. ' steps', mode, c_data[1],
+          c_data[7], c_data[2], significant_action,
+          c_data[6], c_data[3],
+          c_data[8], c_data[9], infrequent_action,
+          math.floor(.5 + c_data[4] / c_data[1]),
+          math.floor(.5 + math.sqrt(c_data[5] / c_data[1]))
         } or {
-            'step ' .. step, mode, data[1],
-            data[7], data[2], significant_action,
-            data[6], data[3],
-            data[8], data[9], infrequent_action,
-            data[4],
-            data[5]
+          'step ' .. step, mode, data[1],
+          data[7], data[2], significant_action,
+          data[6], data[3],
+          data[8], data[9], infrequent_action,
+          data[4],
+          data[5]
         }
-        logger.infox(rspamd_config, 'finished expiry %s%s: %s items checked, %s significant (%s %s), %s common (%s discriminated), %s infrequent (%s %s), %s mean, %s std',
-            lutil.unpack(d))
+        logger.infox(rspamd_config,
+                [[finished expiry %s%s: %s items checked, %s significant (%s %s), %s common (%s discriminated), %s infrequent (%s %s), %s mean, %s std]],
+                lutil.unpack(d))
       end
       log_stat(false)
       if cur == 0 then
@@ -289,12 +316,14 @@ local function expire_step(cls, ev_base, worker)
         c_data = {0,0,0,0,0,0,0,0,0};
         step = 0
       end
+    elseif type(data) == 'string' then
+      logger.infox(rspamd_config, 'skip expiry step: %s', data)
     end
   end
   lredis.exec_redis_script(cls.script,
       {ev_base = ev_base, is_write = true},
       redis_step_cb,
-      {'RS*_*', cls.expiry, cur}
+      {'RS*_*', cls.expiry}
   )
 end
 
@@ -331,7 +360,7 @@ rspamd_config:add_on_load(function (_, ev_base, worker)
   -- Expire tokens at regular intervals
   for _,cls in ipairs(settings.classifiers) do
     rspamd_config:add_periodic(ev_base,
-        settings['interval'] * (tonumber(settings.cluster_nodes) + 1),
+        settings['interval'],
         function ()
           expire_step(cls, ev_base, worker)
           return true