|
|
@@ -28,6 +28,7 @@ local rspamd_util = require "rspamd_util" |
|
|
|
local lua_util = require "lua_util" |
|
|
|
local lua_maps = require "lua_maps" |
|
|
|
local hash = require 'rspamd_cryptobox_hash' |
|
|
|
local lua_redis = require "lua_redis" |
|
|
|
local fun = require "fun" |
|
|
|
local redis_params = nil |
|
|
|
local default_expiry = 864000 -- 10 day by default |
|
|
@@ -652,6 +653,81 @@ local function reputation_dns_get_token(task, rule, token, continuation_cb) |
|
|
|
}) |
|
|
|
end |
|
|
|
|
|
|
|
local function reputation_redis_init(rule, cfg, ev_base, worker) |
|
|
|
if not redis_params then |
|
|
|
return false |
|
|
|
end |
|
|
|
-- Init scripts for buckets |
|
|
|
local redis_get_script_tpl = [[ |
|
|
|
local key = KEYS[1] .. ${name} |
|
|
|
local vals = redis.call('HGETALL', key) |
|
|
|
for i=1,#vals,2 do |
|
|
|
local k = vals[i] |
|
|
|
local v = vals[i + 1] |
|
|
|
if scores[k] then |
|
|
|
scores[k] = scores[k] + tonumber(v) * ${mult} |
|
|
|
else |
|
|
|
scores[k] = tonumber(v) * ${mult} |
|
|
|
end |
|
|
|
end |
|
|
|
]] |
|
|
|
local redis_script_tbl = {'local scores = {}'} |
|
|
|
for _,bucket in ipairs(rule.backend.config.buckets) do |
|
|
|
table.insert(redis_script_tbl, lua_util.template(redis_get_script_tpl, |
|
|
|
fun.totable(fun.map(tostring, bucket)))) |
|
|
|
end |
|
|
|
table.insert(redis_script_tbl, [[ |
|
|
|
local result = {} |
|
|
|
for k,v in pairs(scores) do |
|
|
|
table.insert(result, k) |
|
|
|
table.insert(result, v) |
|
|
|
end |
|
|
|
|
|
|
|
return result |
|
|
|
]]) |
|
|
|
rule.backend.script_get = lua_redis.add_redis_script(table.concat(redis_script_tbl, '\n'), |
|
|
|
redis_params) |
|
|
|
|
|
|
|
redis_script_tbl = {} |
|
|
|
local redis_set_script_tpl = [[ |
|
|
|
local key = KEYS[1] .. ${name} |
|
|
|
local last = tonumber(redis.call('HGET', key, 'start')) |
|
|
|
local now = tonumber(KEYS[2]) |
|
|
|
if not last then |
|
|
|
last = 0 |
|
|
|
end |
|
|
|
local discriminate_bucket = false |
|
|
|
if now - last > ${time} then |
|
|
|
discriminate_bucket = true |
|
|
|
redis.call('HSET', key, 'start', now) |
|
|
|
end |
|
|
|
for i=1,#ARGV,2 do |
|
|
|
local k = ARGV[i] |
|
|
|
local v = tonumber(ARGV[i + 1]) |
|
|
|
|
|
|
|
if discriminate_bucket then |
|
|
|
local last_value = redis.call('HGET', key, k) |
|
|
|
if last_value then |
|
|
|
redis.call('HSET', key, k, last_value / 2.0) |
|
|
|
end |
|
|
|
end |
|
|
|
redis.call('HINCRBYFLOAT', key, k, v) |
|
|
|
end |
|
|
|
|
|
|
|
redis.call('EXPIRE', key, KEYS[3]) |
|
|
|
redis.call('HSET', key, 'last', now) |
|
|
|
]] |
|
|
|
for _,bucket in ipairs(rule.backend.config.buckets) do |
|
|
|
table.insert(redis_script_tbl, lua_util.template(redis_set_script_tpl, |
|
|
|
fun.totable(fun.map(tostring, bucket)))) |
|
|
|
end |
|
|
|
|
|
|
|
rule.backend.script_set = lua_redis.add_redis_script(table.concat(redis_script_tbl, '\n'), |
|
|
|
redis_params) |
|
|
|
|
|
|
|
return true |
|
|
|
end |
|
|
|
|
|
|
|
local function reputation_redis_get_token(task, rule, token, continuation_cb) |
|
|
|
local key = gen_token_key(token, rule) |
|
|
|
|
|
|
@@ -681,14 +757,10 @@ local function reputation_redis_get_token(task, rule, token, continuation_cb) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
local ret = rspamd_redis_make_request(task, |
|
|
|
redis_params, -- connect params |
|
|
|
key, -- hash key |
|
|
|
false, -- is write |
|
|
|
redis_get_cb, --callback |
|
|
|
'HGETALL', -- command |
|
|
|
{key} -- arguments |
|
|
|
) |
|
|
|
local ret = lua_redis.exec_redis_script(rule.backend.script_get, |
|
|
|
{task = task, is_write = false}, |
|
|
|
redis_get_cb, |
|
|
|
{token}) |
|
|
|
if not ret then |
|
|
|
rspamd_logger.errx(task, 'cannot make redis request to check results') |
|
|
|
end |
|
|
@@ -697,8 +769,6 @@ end |
|
|
|
local function reputation_redis_set_token(task, rule, token, values, continuation_cb) |
|
|
|
local key = gen_token_key(token, rule) |
|
|
|
|
|
|
|
local ret,conn |
|
|
|
|
|
|
|
local function redis_set_cb(err, data) |
|
|
|
if err then |
|
|
|
rspamd_logger.errx(task, 'got error while setting reputation keys %s: %s', |
|
|
@@ -714,24 +784,17 @@ local function reputation_redis_set_token(task, rule, token, values, continuatio |
|
|
|
end |
|
|
|
|
|
|
|
-- We start from expiry update |
|
|
|
ret,conn = rspamd_redis_make_request(task, |
|
|
|
redis_params, -- connect params |
|
|
|
nil, -- hash key |
|
|
|
true, -- is write |
|
|
|
redis_set_cb, --callback |
|
|
|
'EXPIRE', -- command |
|
|
|
{key, tostring(rule.backend.config.expiry)} -- arguments |
|
|
|
) |
|
|
|
-- Update greylisting record expire |
|
|
|
if ret then |
|
|
|
-- Here, we increment all hash keys that are listed in values |
|
|
|
-- E.g. {'s': 1.0} or {'h': -1.0}, floating point allows better flexibility |
|
|
|
fun.each(function(k, v) |
|
|
|
conn:add_cmd('HINCRBYFLOAT', {key, tostring(k), tostring(v)}) |
|
|
|
end, values) |
|
|
|
-- Add last modification time (might be not very consistent between updates) |
|
|
|
conn:add_cmd('HSET', {key, 'last', tostring(rspamd_util:get_calendar_ticks())}) |
|
|
|
else |
|
|
|
local args = {} |
|
|
|
for k,v in pairs(values) do |
|
|
|
table.insert(args, k) |
|
|
|
table.insert(args, v) |
|
|
|
end |
|
|
|
local ret = lua_redis.exec_redis_script(rule.backend.script_get, |
|
|
|
{task = task, is_write = true}, |
|
|
|
redis_set_cb, |
|
|
|
{token, tostring(rspamd_util:get_calendar_ticks()), |
|
|
|
tostring(rule.backend.config.expiry)}, args) |
|
|
|
if not ret then |
|
|
|
rspamd_logger.errx(task, 'got error while connecting to redis') |
|
|
|
end |
|
|
|
end |
|
|
@@ -751,11 +814,12 @@ local backends = { |
|
|
|
{ |
|
|
|
time = 60 * 60, |
|
|
|
name = '1h', |
|
|
|
mult = 1.5, |
|
|
|
}, |
|
|
|
{ |
|
|
|
time = 60 * 60 * 24 * 30, |
|
|
|
name = '1m', |
|
|
|
|
|
|
|
mult = 1.0, |
|
|
|
} |
|
|
|
}, -- What buckets should be used, default 1h and 1month |
|
|
|
}, |