From d43d9b9a3b9ed044f8e208bc2cb08f4f11f61b6c Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 28 Feb 2018 13:42:53 +0000 Subject: [PATCH] [Feature] Implement buckets for Redis backend --- src/plugins/lua/reputation.lua | 122 +++++++++++++++++++++++++-------- 1 file changed, 93 insertions(+), 29 deletions(-) diff --git a/src/plugins/lua/reputation.lua b/src/plugins/lua/reputation.lua index a4511a09e..5ab554220 100644 --- a/src/plugins/lua/reputation.lua +++ b/src/plugins/lua/reputation.lua @@ -28,6 +28,7 @@ local rspamd_util = require "rspamd_util" local lua_util = require "lua_util" local lua_maps = require "lua_maps" local hash = require 'rspamd_cryptobox_hash' +local lua_redis = require "lua_redis" local fun = require "fun" local redis_params = nil local default_expiry = 864000 -- 10 day by default @@ -652,6 +653,81 @@ local function reputation_dns_get_token(task, rule, token, continuation_cb) }) end +local function reputation_redis_init(rule, cfg, ev_base, worker) + if not redis_params then + return false + end + -- Init scripts for buckets + local redis_get_script_tpl = [[ +local key = KEYS[1] .. ${name} +local vals = redis.call('HGETALL', key) +for i=1,#vals,2 do + local k = vals[i] + local v = vals[i + 1] + if scores[k] then + scores[k] = scores[k] + tonumber(v) * ${mult} + else + scores[k] = tonumber(v) * ${mult} + end +end +]] + local redis_script_tbl = {'local scores = {}'} + for _,bucket in ipairs(rule.backend.config.buckets) do + table.insert(redis_script_tbl, lua_util.template(redis_get_script_tpl, + fun.totable(fun.map(tostring, bucket)))) + end + table.insert(redis_script_tbl, [[ + local result = {} + for k,v in pairs(scores) do + table.insert(result, k) + table.insert(result, v) + end + + return result +]]) + rule.backend.script_get = lua_redis.add_redis_script(table.concat(redis_script_tbl, '\n'), + redis_params) + + redis_script_tbl = {} + local redis_set_script_tpl = [[ +local key = KEYS[1] .. ${name} +local last = tonumber(redis.call('HGET', key, 'start')) +local now = tonumber(KEYS[2]) +if not last then + last = 0 +end +local discriminate_bucket = false +if now - last > ${time} then + discriminate_bucket = true + redis.call('HSET', key, 'start', now) +end +for i=1,#ARGV,2 do + local k = ARGV[i] + local v = tonumber(ARGV[i + 1]) + + if discriminate_bucket then + local last_value = redis.call('HGET', key, k) + if last_value then + redis.call('HSET', key, k, last_value / 2.0) + end + end + redis.call('HINCRBYFLOAT', key, k, v) +end + +redis.call('EXPIRE', key, KEYS[3]) +redis.call('HSET', key, 'last', now) +]] + for _,bucket in ipairs(rule.backend.config.buckets) do + table.insert(redis_script_tbl, lua_util.template(redis_set_script_tpl, + fun.totable(fun.map(tostring, bucket)))) + end + + rule.backend.script_set = lua_redis.add_redis_script(table.concat(redis_script_tbl, '\n'), + redis_params) + + return true +end + local function reputation_redis_get_token(task, rule, token, continuation_cb) local key = gen_token_key(token, rule) @@ -681,14 +757,10 @@ local function reputation_redis_get_token(task, rule, token, continuation_cb) end end - local ret = rspamd_redis_make_request(task, - redis_params, -- connect params - key, -- hash key - false, -- is write - redis_get_cb, --callback - 'HGETALL', -- command - {key} -- arguments - ) + local ret = lua_redis.exec_redis_script(rule.backend.script_get, + {task = task, is_write = false}, + redis_get_cb, + {token}) if not ret then rspamd_logger.errx(task, 'cannot make redis request to check results') end @@ -697,8 +769,6 @@ end local function reputation_redis_set_token(task, rule, token, values, continuation_cb) local key = gen_token_key(token, rule) - local ret,conn - local function redis_set_cb(err, data) if err then rspamd_logger.errx(task, 'got error while setting reputation keys %s: %s', @@ -714,24 +784,17 @@ local function reputation_redis_set_token(task, rule, token, values, continuatio end -- We start from expiry update - ret,conn = rspamd_redis_make_request(task, - redis_params, -- connect params - nil, -- hash key - true, -- is write - redis_set_cb, --callback - 'EXPIRE', -- command - {key, tostring(rule.backend.config.expiry)} -- arguments - ) - -- Update greylisting record expire - if ret then - -- Here, we increment all hash keys that are listed in values - -- E.g. {'s': 1.0} or {'h': -1.0}, floating point allows better flexibility - fun.each(function(k, v) - conn:add_cmd('HINCRBYFLOAT', {key, tostring(k), tostring(v)}) - end, values) - -- Add last modification time (might be not very consistent between updates) - conn:add_cmd('HSET', {key, 'last', tostring(rspamd_util:get_calendar_ticks())}) - else + local args = {} + for k,v in pairs(values) do + table.insert(args, k) + table.insert(args, v) + end + local ret = lua_redis.exec_redis_script(rule.backend.script_get, + {task = task, is_write = true}, + redis_set_cb, + {token, tostring(rspamd_util:get_calendar_ticks()), + tostring(rule.backend.config.expiry)}, args) + if not ret then rspamd_logger.errx(task, 'got error while connecting to redis') end end @@ -751,11 +814,12 @@ local backends = { { time = 60 * 60, name = '1h', + mult = 1.5, }, { time = 60 * 60 * 24 * 30, name = '1m', - + mult = 1.0, } }, -- What buckets should be used, default 1h and 1month }, -- 2.39.5