From efe8fd4efe09768748c570fde46d1fa20bef5d1b Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 16 Apr 2018 14:58:55 +0100 Subject: [Feature] Implement cluster-aware bayes expiry --- src/plugins/lua/bayes_expiry.lua | 65 +++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 18 deletions(-) (limited to 'src/plugins/lua/bayes_expiry.lua') diff --git a/src/plugins/lua/bayes_expiry.lua b/src/plugins/lua/bayes_expiry.lua index aeedf5f17..77ec2e527 100644 --- a/src/plugins/lua/bayes_expiry.lua +++ b/src/plugins/lua/bayes_expiry.lua @@ -22,6 +22,7 @@ end local N = 'bayes_expiry' local E = {} local logger = require "rspamd_logger" +local rspamd_util = require "rspamd_util" local lutil = require "lua_util" local lredis = require "lua_redis" @@ -147,6 +148,8 @@ template.common_ttl = settings.common_ttl template.epsilon_common = settings.epsilon_common template.significant_factor = settings.significant_factor template.lazy = settings.lazy +template.expire_step = settings.interval +template.hostname = rspamd_util.get_hostname() for k,v in pairs(template) do template[k] = tostring(v) @@ -159,7 +162,28 @@ end -- returns new cursor local expiry_script = [[ local expire = math.floor(KEYS[2]) - local ret = redis.call('SCAN', KEYS[3], 'MATCH', KEYS[1], 'COUNT', '${count}') + local lock_key = redis.sha1hex(KEYS[1]) .. '_lock' -- Check locking + local lock = redis.call('GET', lock_key) + + if lock then + if lock ~= ${hostname} then + return 'locked by ' .. lock + end + end + + redis.replicate_commands() + redis.call('SETEX', lock_key, ${expire_step}, '${hostname}') + + local cursor_key = redis.sha1hex(KEYS[1]) .. '_cursor' + local cursor = redis.call('GET', cursor_key) + + if not cursor then + cursor = 0 + else + cursor = tonumber(cursor) + end + + local ret = redis.call('SCAN', cursor, 'MATCH', KEYS[1], 'COUNT', '${count}') local next = ret[1] local keys = ret[2] local nelts = 0 @@ -187,7 +211,6 @@ local expiry_script = [[ sum_squares = sum_squares + total * total nelts = nelts + 1 end - redis.replicate_commands() local mean, stddev = 0, 0 @@ -234,6 +257,9 @@ local expiry_script = [[ end end + redis.call('SETEX', cursor_key, ${expire_step} * 10, tostring(next)) + redis.call('DEL', lock_key) + return {next, nelts, extended, discriminated, mean, stddev, common, significant, infrequent, ttls_set} ]] @@ -266,22 +292,23 @@ local function expire_step(cls, ev_base, worker) local infrequent_action = (cls.expiry < 0) and 'made persistent' or 'ttls set' local d = cycle and { - 'cycle in ' .. step .. ' steps', mode, c_data[1], - c_data[7], c_data[2], significant_action, - c_data[6], c_data[3], - c_data[8], c_data[9], infrequent_action, - math.floor(.5 + c_data[4] / c_data[1]), - math.floor(.5 + math.sqrt(c_data[5] / c_data[1])) + 'cycle in ' .. step .. ' steps', mode, c_data[1], + c_data[7], c_data[2], significant_action, + c_data[6], c_data[3], + c_data[8], c_data[9], infrequent_action, + math.floor(.5 + c_data[4] / c_data[1]), + math.floor(.5 + math.sqrt(c_data[5] / c_data[1])) } or { - 'step ' .. step, mode, data[1], - data[7], data[2], significant_action, - data[6], data[3], - data[8], data[9], infrequent_action, - data[4], - data[5] + 'step ' .. step, mode, data[1], + data[7], data[2], significant_action, + data[6], data[3], + data[8], data[9], infrequent_action, + data[4], + data[5] } - logger.infox(rspamd_config, 'finished expiry %s%s: %s items checked, %s significant (%s %s), %s common (%s discriminated), %s infrequent (%s %s), %s mean, %s std', - lutil.unpack(d)) + logger.infox(rspamd_config, + [[finished expiry %s%s: %s items checked, %s significant (%s %s), %s common (%s discriminated), %s infrequent (%s %s), %s mean, %s std]], + lutil.unpack(d)) end log_stat(false) if cur == 0 then @@ -289,12 +316,14 @@ local function expire_step(cls, ev_base, worker) c_data = {0,0,0,0,0,0,0,0,0}; step = 0 end + elseif type(data) == 'string' then + logger.infox(rspamd_config, 'skip expiry step: %s', data) end end lredis.exec_redis_script(cls.script, {ev_base = ev_base, is_write = true}, redis_step_cb, - {'RS*_*', cls.expiry, cur} + {'RS*_*', cls.expiry} ) end @@ -331,7 +360,7 @@ rspamd_config:add_on_load(function (_, ev_base, worker) -- Expire tokens at regular intervals for _,cls in ipairs(settings.classifiers) do rspamd_config:add_periodic(ev_base, - settings['interval'] * (tonumber(settings.cluster_nodes) + 1), + settings['interval'], function () expire_step(cls, ev_base, worker) return true -- cgit v1.2.3