summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Vsevolod Stakhov <vsevolod@highsecure.ru> 2018-04-16 14:58:55 +0100
committer Vsevolod Stakhov <vsevolod@highsecure.ru> 2018-04-16 14:58:55 +0100
commit efe8fd4efe09768748c570fde46d1fa20bef5d1b (patch)
tree 07bdaca72faff5db34330483c3d5832000dcdfad
parent a3d5365b47f31be10fa4703e365e83d7f467e962 (diff)
download rspamd-efe8fd4efe09768748c570fde46d1fa20bef5d1b.tar.gz
rspamd-efe8fd4efe09768748c570fde46d1fa20bef5d1b.zip
[Feature] Implement cluster-aware bayes expiry
-rw-r--r-- src/plugins/lua/bayes_expiry.lua 65
1 files changed, 47 insertions, 18 deletions
diff --git a/src/plugins/lua/bayes_expiry.lua b/src/plugins/lua/bayes_expiry.lua
index aeedf5f17..77ec2e527 100644
--- a/src/plugins/lua/bayes_expiry.lua
+++ b/src/plugins/lua/bayes_expiry.lua
@@ -22,6 +22,7 @@ end
local N = 'bayes_expiry'
local E = {}
local logger = require "rspamd_logger"
+local rspamd_util = require "rspamd_util"
local lutil = require "lua_util"
local lredis = require "lua_redis"
@@ -147,6 +148,8 @@ template.common_ttl = settings.common_ttl
template.epsilon_common = settings.epsilon_common
template.significant_factor = settings.significant_factor
template.lazy = settings.lazy
+template.expire_step = settings.interval
+template.hostname = rspamd_util.get_hostname()
for k,v in pairs(template) do
template[k] = tostring(v)
@@ -159,7 +162,28 @@ end
-- returns new cursor
local expiry_script = [[
local expire = math.floor(KEYS[2])
- local ret = redis.call('SCAN', KEYS[3], 'MATCH', KEYS[1], 'COUNT', '${count}')
+ local lock_key = redis.sha1hex(KEYS[1]) .. '_lock' -- Check locking
+ local lock = redis.call('GET', lock_key)
+
+ -- Skip this step when another node holds the lock. ${hostname} is filled in
+ -- by template substitution, so it is quoted to compare against a string literal.
+ if lock and lock ~= '${hostname}' then
+ return 'locked by ' .. lock
+ end
+
+ redis.replicate_commands()
+ redis.call('SETEX', lock_key, ${expire_step}, '${hostname}')
+
+ local cursor_key = redis.sha1hex(KEYS[1]) .. '_cursor'
+ local cursor = redis.call('GET', cursor_key)
+
+ if not cursor then
+ cursor = 0
+ else
+ cursor = tonumber(cursor)
+ end
+
+ local ret = redis.call('SCAN', cursor, 'MATCH', KEYS[1], 'COUNT', '${count}')
local next = ret[1]
local keys = ret[2]
local nelts = 0
@@ -187,7 +211,6 @@ local expiry_script = [[
sum_squares = sum_squares + total * total
nelts = nelts + 1
end
- redis.replicate_commands()
local mean, stddev = 0, 0
@@ -234,6 +257,9 @@ local expiry_script = [[
end
end
+ redis.call('SETEX', cursor_key, ${expire_step} * 10, tostring(next))
+ redis.call('DEL', lock_key)
+
return {next, nelts, extended, discriminated, mean, stddev, common, significant, infrequent, ttls_set}
]]
@@ -266,22 +292,23 @@ local function expire_step(cls, ev_base, worker)
local infrequent_action = (cls.expiry < 0) and 'made persistent' or 'ttls set'
local d = cycle and {
- 'cycle in ' .. step .. ' steps', mode, c_data[1],
- c_data[7], c_data[2], significant_action,
- c_data[6], c_data[3],
- c_data[8], c_data[9], infrequent_action,
- math.floor(.5 + c_data[4] / c_data[1]),
- math.floor(.5 + math.sqrt(c_data[5] / c_data[1]))
+ 'cycle in ' .. step .. ' steps', mode, c_data[1],
+ c_data[7], c_data[2], significant_action,
+ c_data[6], c_data[3],
+ c_data[8], c_data[9], infrequent_action,
+ math.floor(.5 + c_data[4] / c_data[1]),
+ math.floor(.5 + math.sqrt(c_data[5] / c_data[1]))
} or {
- 'step ' .. step, mode, data[1],
- data[7], data[2], significant_action,
- data[6], data[3],
- data[8], data[9], infrequent_action,
- data[4],
- data[5]
+ 'step ' .. step, mode, data[1],
+ data[7], data[2], significant_action,
+ data[6], data[3],
+ data[8], data[9], infrequent_action,
+ data[4],
+ data[5]
}
- logger.infox(rspamd_config, 'finished expiry %s%s: %s items checked, %s significant (%s %s), %s common (%s discriminated), %s infrequent (%s %s), %s mean, %s std',
- lutil.unpack(d))
+ logger.infox(rspamd_config,
+ [[finished expiry %s%s: %s items checked, %s significant (%s %s), %s common (%s discriminated), %s infrequent (%s %s), %s mean, %s std]],
+ lutil.unpack(d))
end
log_stat(false)
if cur == 0 then
@@ -289,12 +316,14 @@ local function expire_step(cls, ev_base, worker)
c_data = {0,0,0,0,0,0,0,0,0};
step = 0
end
+ elseif type(data) == 'string' then
+ logger.infox(rspamd_config, 'skip expiry step: %s', data)
end
end
lredis.exec_redis_script(cls.script,
{ev_base = ev_base, is_write = true},
redis_step_cb,
- {'RS*_*', cls.expiry, cur}
+ {'RS*_*', cls.expiry}
)
end
@@ -331,7 +360,7 @@ rspamd_config:add_on_load(function (_, ev_base, worker)
-- Expire tokens at regular intervals
for _,cls in ipairs(settings.classifiers) do
rspamd_config:add_periodic(ev_base,
- settings['interval'] * (tonumber(settings.cluster_nodes) + 1),
+ settings['interval'],
function ()
expire_step(cls, ev_base, worker)
return true