--[[
-Copyright (c) 2011-2015, Vsevolod Stakhov <vsevolod@highsecure.ru>
+Copyright (c) 2011-2017, Vsevolod Stakhov <vsevolod@highsecure.ru>
+Copyright (c) 2016-2017, Andrew Lewis <nerf@judo.za.org>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
return
end
--- A plugin that implements ratelimits using redis or kvstorage server
+-- A plugin that implements ratelimits using redis
-local E = {}
-
--- Default settings for limits, 1-st member is burst, second is rate and the third is numeric type
-local settings = {
-}
+local E, settings = {}, {}
+local N = 'ratelimit'
-- Senders that are considered as bounce
local bounce_senders = {'postmaster', 'mailer-daemon', '', 'null', 'fetchmail-daemon', 'mdaemon'}
-- Do not check ratelimits for these recipients
local redis_params
local ratelimit_symbol
-- Do not delay mail after 1 day
-local max_delay = 24 * 3600
local use_ip_score = false
-local rl_prefix = 'rl'
+local rl_prefix = 'RL'
local ip_score_lower_bound = 10
local ip_score_ham_multiplier = 1.1
local ip_score_spam_divisor = 1.1
local rspamd_logger = require "rspamd_logger"
local rspamd_util = require "rspamd_util"
local rspamd_lua_utils = require "lua_util"
+local lua_redis = require "lua_redis"
local fun = require "fun"
local user_keywords = {'user'}
+local redis_script_sha
+local redis_script = [[local bucket
+local limited = false
+local buckets = {}
+local queue_id = table.remove(ARGV)
+local now = table.remove(ARGV)
+
+local argi = 0
+for i = 1, #KEYS do
+ local key = KEYS[i]
+ local period = tonumber(ARGV[argi+1])
+ local limit = tonumber(ARGV[argi+2])
+ if not buckets[key] then
+ buckets[key] = {
+ max_period = period,
+ limits = { {period, limit} },
+ }
+ else
+ table.insert(buckets[key].limits, {period, limit})
+ if period > buckets[key].max_period then
+ buckets[key].max_period = period
+ end
+ end
+ argi = argi + 2
+end
+
+for k, v in pairs(buckets) do
+ local maxp = v.max_period
+ redis.call('ZREMRANGEBYSCORE', k, '-inf', now - maxp)
+ for _, lim in ipairs(v.limits) do
+ local period = lim[1]
+ local limit = lim[2]
+ local rate
+ if period == maxp then
+ rate = redis.call('ZCARD', k)
+ else
+ rate = redis.call('ZCOUNT', k, now - period, '+inf')
+ end
+ if rate and rate >= limit then
+ limited = true
+ bucket = k
+ end
+ end
+ redis.call('EXPIRE', k, maxp)
+ if limited then break end
+end
+
+if not limited then
+ for k in pairs(buckets) do
+ redis.call('ZADD', k, now, queue_id)
+ end
+end
+
+return {limited, bucket}]]
+
+local redis_script_symbol = [[local limited = false
+local buckets, results = {}, {}
+local queue_id = table.remove(ARGV)
+local now = table.remove(ARGV)
+
+local argi = 0
+for i = 1, #KEYS do
+ local key = KEYS[i]
+ local period = tonumber(ARGV[argi+1])
+ local limit = tonumber(ARGV[argi+2])
+ if not buckets[key] then
+ buckets[key] = {
+ max_period = period,
+ limits = { {period, limit} },
+ }
+ else
+ table.insert(buckets[key].limits, {period, limit})
+ if period > buckets[key].max_period then
+ buckets[key].max_period = period
+ end
+ end
+ argi = argi + 2
+end
+
+for k, v in pairs(buckets) do
+ local maxp = v.max_period
+ redis.call('ZREMRANGEBYSCORE', k, '-inf', now - maxp)
+ for _, lim in ipairs(v.limits) do
+ local period = lim[1]
+ local limit = lim[2]
+ local rate
+ if period == maxp then
+ rate = redis.call('ZCARD', k)
+ else
+ rate = redis.call('ZCOUNT', k, now - period, '+inf')
+ end
+ if rate then
+ local mult = 2 * math.tanh(rate / (limit * 2))
+ if mult >= 0.5 then
+ table.insert(results, {k, tostring(mult)})
+ end
+ end
+ end
+ redis.call('ZADD', k, now, queue_id)
+ redis.call('EXPIRE', k, maxp)
+end
+
+return results]]
+
+local function load_scripts(cfg, ev_base)
+ local function rl_script_cb(err, data)
+ if err then
+ rspamd_logger.errx(cfg, 'Script loading failed: ' .. err)
+ elseif type(data) == 'string' then
+ redis_script_sha = data
+ end
+ end
+ local script
+ if ratelimit_symbol then
+ script = redis_script_symbol
+ else
+ script = redis_script
+ end
+ lua_redis.redis_make_request_taskless(
+ ev_base,
+ cfg,
+ redis_params,
+ nil, -- key
+ true, -- is write
+ rl_script_cb, --callback
+ 'SCRIPT', -- command
+ {'LOAD', script}
+ )
+end
+
local limit_parser
local function parse_string_limit(lim)
local function parse_time_suffix(s)
local t = lpeg.match(limit_parser.limit, lim)
if t and t[1] and t[2] and t[2] ~= 0 then
- return t[1] / t[2], t[1]
+ return t[2], t[1]
end
rspamd_logger.errx(rspamd_config, 'bad limit: %s', lim)
return nil
end
---- Parse atime and bucket of limit
-local function parse_limits(data)
- local function parse_limit_elt(str)
- local elts = rspamd_str_split(str, ':')
- if not elts or #elts < 2 then
- return {0, 0, 0}
- else
- local atime = tonumber(elts[1])
- local bucket = tonumber(elts[2])
- local ctime = atime
-
- if elts[3] then
- ctime = tonumber(elts[3])
- end
-
- if not ctime then
- ctime = atime
- end
-
- return {atime,bucket,ctime}
- end
- end
-
- return fun.iter(data):map(function(e)
- if type(e) == 'string' then
- return parse_limit_elt(e)
- else
- return {0, 0, 0}
- end
- end):totable()
-end
-
local function resize_element(x_score, x_total, element)
local x_ip_score
if not x_total then x_total = 0 end
end
end
---- Check specific limit inside redis
-local function check_limits(task, args)
-
- local key = fun.foldl(function(acc, k) return acc .. k[2] end, '', args)
- local ret
- --- Called when value is got from server
- local function rate_get_cb(err, data)
- if err then
- rspamd_logger.infox(task, 'got error while getting limit: %1', err)
- end
- if not data then return end
- local ntime = rspamd_util.get_time()
- local asn_score,total_asn,
- country_score,total_country,
- ipnet_score,total_ipnet,
- ip_score, total_ip
- if use_ip_score then
- asn_score,total_asn,
- country_score,total_country,
- ipnet_score,total_ipnet,
- ip_score, total_ip = task:get_mempool():get_variable('ip_score',
- 'double,double,double,double,double,double,double,double')
- end
-
- fun.each(function(elt, limit, rtype)
- local bucket = elt[2]
- local rate = limit[2]
- local threshold = limit[1]
- local atime = elt[1]
- local ctime = elt[3]
-
- if atime == 0 then return end
-
- if use_ip_score then
- local key_keywords = rspamd_str_split(rtype, '_')
- local has_asn, has_ip = false, false
- for _, v in ipairs(key_keywords) do
- if v == "asn" then has_asn = true end
- if v == "ip" then has_ip = true end
- if has_ip and has_asn then break end
- end
- if has_asn and not has_ip then
- bucket = resize_element(asn_score, total_asn, bucket)
- rate = resize_element(asn_score, total_asn, rate)
- elseif has_ip then
- if total_ip and total_ip > ip_score_lower_bound then
- bucket = resize_element(ip_score, total_ip, bucket)
- rate = resize_element(ip_score, total_ip, rate)
- elseif total_ipnet and total_ipnet > ip_score_lower_bound then
- bucket = resize_element(ipnet_score, total_ipnet, bucket)
- rate = resize_element(ipnet_score, total_ipnet, rate)
- elseif total_asn and total_asn > ip_score_lower_bound then
- bucket = resize_element(asn_score, total_asn, bucket)
- rate = resize_element(asn_score, total_asn, rate)
- elseif total_country and total_country > ip_score_lower_bound then
- bucket = resize_element(country_score, total_country, bucket)
- rate = resize_element(country_score, total_country, rate)
- else
- bucket = resize_element(ip_score, total_ip, bucket)
- rate = resize_element(ip_score, total_ip, rate)
- end
- end
- end
-
- if atime - ctime > max_delay then
- rspamd_logger.infox(task, 'limit is too old: %1 seconds; ignore it',
- atime - ctime)
- else
- bucket = bucket - rate * (ntime - atime);
- if bucket > 0 then
- if ratelimit_symbol then
- local mult = 2 * rspamd_util.tanh(bucket / (threshold * 2))
-
- if mult > 0.5 then
- task:insert_result(ratelimit_symbol, mult,
- rtype .. ':' .. string.format('%.2f', mult))
- end
- else
- if bucket > threshold then
- rspamd_logger.infox(task,
- 'ratelimit "%s" exceeded: %s elements with %s limit',
- rtype, bucket, threshold)
- task:set_pre_result('soft reject',
- message_func(task, rtype, bucket, threshold))
- end
- end
- end
- end
- end, fun.zip(parse_limits(data), fun.map(function(a) return a[1] end, args),
- fun.map(function(a) return rspamd_str_split(a[2], ":")[2] end, args)))
- end
-
- ret = rspamd_redis_make_request(task,
- redis_params, -- connect params
- key, -- hash key
- false, -- is write
- rate_get_cb, --callback
- 'mget', -- command
- fun.totable(fun.map(function(l) return l[2] end, args)) -- arguments
- )
- if not ret then
- rspamd_logger.errx(task, 'got error connecting to redis')
- end
-end
-
---- Set specific limit inside redis
-local function set_limits(task, args)
- local key = fun.foldl(function(acc, k) return acc .. k[2] end, '', args)
- local ret, upstream
-
- local function rate_set_cb(err)
- if err then
- rspamd_logger.infox(task, 'got error %s when setting ratelimit record on server %s',
- err, upstream:get_addr())
- end
- end
- local function rate_get_cb(err, data)
- if err then
- rspamd_logger.infox(task, 'got error while setting limit: %1', err)
- end
- if not data then return end
- local ntime = rspamd_util.get_time()
- local values = {}
- fun.each(function(elt, limit)
- local bucket = elt[2]
- local rate = limit[1][2]
- local atime = elt[1]
- local ctime = elt[3]
-
- if atime - ctime > max_delay then
- rspamd_logger.infox(task, 'limit is too old: %1 seconds; start it over',
- atime - ctime)
- bucket = 1
- ctime = ntime
- else
- if bucket > 0 then
- bucket = bucket - rate * (ntime - atime) + 1;
- if bucket < 0 then
- bucket = 1
- end
- else
- bucket = 1
- end
- end
-
- if ctime == 0 then ctime = ntime end
-
- local lstr = string.format('%.3f:%.3f:%.3f', ntime, bucket, ctime)
- table.insert(values, {limit[2], max_delay, lstr})
- end, fun.zip(parse_limits(data), fun.iter(args)))
-
- if #values > 0 then
- local conn
- ret,conn,upstream = rspamd_redis_make_request(task,
- redis_params, -- connect params
- key, -- hash key
- true, -- is write
- rate_set_cb, --callback
- 'setex', -- command
- values[1] -- arguments
- )
-
- if conn then
- fun.each(function(v)
- conn:add_cmd('setex', v)
- end, fun.drop_n(1, values))
- else
- rspamd_logger.errx(task, 'got error while connecting to redis')
- end
- end
- end
-
- local _
- ret,_,upstream = rspamd_redis_make_request(task,
- redis_params, -- connect params
- key, -- hash key
- false, -- is write
- rate_get_cb, --callback
- 'mget', -- command
- fun.totable(fun.map(function(l) return l[2] end, args)) -- arguments
- )
- if not ret then
- rspamd_logger.errx(task, 'got error connecting to redis')
- end
-end
-
---- Check or update ratelimit
-local function rate_test_set(task, func)
+local function get_buckets(task)
local args = {}
-- Get initial task data
local ip = task:get_from_ip()
if rate_key then
if type(rate_key) == 'table' then
for _, rk in ipairs(rate_key) do
- if type(settings[k]) == 'table' then
- table.insert(args, {settings[k], rk})
- elseif type(settings[k]) == 'string' and
+ if type(settings[k]) == 'string' and
(custom_keywords[settings[k]] and type(custom_keywords[settings[k]]['get_limit']) == 'function') then
local res = custom_keywords[settings[k]]['get_limit'](task)
- if type(res) == 'table' then
- table.insert(args, {res, rate_key})
- elseif type(res) == 'string' then
- local plim, size = parse_string_limit(res)
+ if type(res) == 'string' then res = {res} end
+ for _, r in ipairs(res) do
+ local plim, size = parse_string_limit(r)
if plim then
- table.insert(args, {{size, plim, 1}, rate_key})
+ table.insert(args, {{plim, size}, rk})
end
end
end
end
else
- if type(settings[k]) == 'table' then
- table.insert(args, {settings[k], rate_key})
- elseif type(settings[k]) == 'string' and
- (custom_keywords[settings[k]] and type(custom_keywords[settings[k]]['get_limit']) == 'function') then
+ if type(settings[k]) == 'string' and
+ (custom_keywords[settings[k]] and type(custom_keywords[settings[k]]['get_limit']) == 'function') then
local res = custom_keywords[settings[k]]['get_limit'](task)
- if type(res) == 'table' then
- table.insert(args, {res, rate_key})
- elseif type(res) == 'string' then
- local plim, size = parse_string_limit(res)
+ if type(res) == 'string' then res = {res} end
+ for _, r in ipairs(res) do
+ local plim, size = parse_string_limit(r)
if plim then
- table.insert(args, {{size, plim, 1}, rate_key})
+ table.insert(args, {{plim, size}, rate_key})
end
end
+ elseif type(settings[k]) == 'table' then
+ for _, rl in ipairs(settings[k]) do
+ table.insert(args, {{rl[1], rl[2]}, rate_key})
+ end
end
end
end
end
- if #args > 0 then
- func(task, args)
- end
+ return args
end
---- Check limit
-local function rate_test(task)
+local function ratelimit_cb(task)
if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
- rate_test_set(task, check_limits)
-end
---- Update limit
-local function rate_set(task)
- local action = task:get_metric_action('default')
-
- if action ~= 'soft reject' then
- if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
- rate_test_set(task, set_limits)
+ local function rl_redis_cb(err, data)
+ if err then
+ rspamd_logger.infox(task, 'got error while setting limit: %1', err)
+ end
+ if not data then return end
+ if data[1] == 1 then
+ rspamd_logger.infox(task,
+ 'ratelimit "%s" exceeded',
+ data[2])
+ task:set_pre_result('soft reject',
+ message_func(task, data[2]))
+ end
end
-end
-
-
---- Parse a single limit description
-local function parse_limit(str)
- local params = rspamd_str_split(str, ':')
-
- local function set_limit(limit, burst, rate)
- limit[1] = tonumber(burst)
- limit[2] = tonumber(rate)
+ local function rl_symbol_redis_cb(err, data)
+ if err then
+ rspamd_logger.infox(task, 'got error while setting limit: %1', err)
+ end
+ if not data then return end
+ for i, b in ipairs(data) do
+ task:insert_result(ratelimit_symbol, b[2], string.format('%s:%s:%s', i, b[1], b[2]))
+ end
end
-
- if #params ~= 3 then
- rspamd_logger.errx(rspamd_config, 'invalid limit definition: ' .. str)
- return
+ local redis_cb = rl_redis_cb
+ if ratelimit_symbol then redis_cb = rl_symbol_redis_cb end
+ local buckets = get_buckets(task)
+ if not buckets then return end
+ local args = {redis_script_sha, #buckets}
+ for _, bucket in ipairs(buckets) do
+ table.insert(args, bucket[2])
end
-
- local key_keywords = rspamd_str_split(params[1], '_')
- for _, k in ipairs(key_keywords) do
- if (custom_keywords[k] and type(custom_keywords[k]['get_value']) == 'function') or
- (keywords[k] and type(keywords[k]['get_value']) == 'function') then
- set_limit(settings[params[1]], params[2], params[3])
- else
- rspamd_logger.errx(rspamd_config, 'invalid limit type: ' .. params[1])
+ for _, bucket in ipairs(buckets) do
+ if use_ip_score then
+ local asn_score,total_asn,
+ country_score,total_country,
+ ipnet_score,total_ipnet,
+ ip_score, total_ip = task:get_mempool():get_variable('ip_score',
+ 'double,double,double,double,double,double,double,double')
+ local key_keywords = rspamd_str_split(bucket[2], '_')
+ local has_asn, has_ip = false, false
+ for _, v in ipairs(key_keywords) do
+ if v == "asn" then has_asn = true end
+ if v == "ip" then has_ip = true end
+ if has_ip and has_asn then break end
+ end
+ if has_asn and not has_ip then
+ bucket[1][2] = resize_element(asn_score, total_asn, bucket[1][2])
+ elseif has_ip then
+ if total_ip and total_ip > ip_score_lower_bound then
+ bucket[1][2] = resize_element(ip_score, total_ip, bucket[1][2])
+ elseif total_ipnet and total_ipnet > ip_score_lower_bound then
+ bucket[1][2] = resize_element(ipnet_score, total_ipnet, bucket[1][2])
+ elseif total_asn and total_asn > ip_score_lower_bound then
+ bucket[1][2] = resize_element(asn_score, total_asn, bucket[1][2])
+ elseif total_country and total_country > ip_score_lower_bound then
+ bucket[1][2] = resize_element(country_score, total_country, bucket[1][2])
+ else
+ bucket[1][2] = resize_element(ip_score, total_ip, bucket[1][2])
+ end
+ end
end
+ table.insert(args, bucket[1][1])
+ table.insert(args, bucket[1][2])
+ end
+ table.insert(args, rspamd_util.get_time())
+ table.insert(args, task:get_queue_id() or task:get_uid())
+ local ret = rspamd_redis_make_request(task,
+ redis_params, -- connect params
+ nil, -- hash key
+ true, -- is write
+ redis_cb, --callback
+ 'evalsha', -- command
+ args -- arguments
+ )
+ if not ret then
+ rspamd_logger.errx(task, 'got error connecting to redis')
end
end
-local opts = rspamd_config:get_all_opt('ratelimit')
+local opts = rspamd_config:get_all_opt(N)
if opts then
- local rates = opts['limit']
- if rates and type(rates) == 'table' then
- fun.each(parse_limit, rates)
- elseif rates and type(rates) == 'string' then
- parse_limit(rates)
+ if opts['limit'] then
+ rspamd_logger.errx(rspamd_config, 'Legacy ratelimit config format no longer supported')
end
if opts['rates'] and type(opts['rates']) == 'table' then
-- new way of setting limits
fun.each(function(t, lim)
if type(lim) == 'table' then
- settings[t] = lim
+ settings[t] = {}
+ fun.each(function(l)
+ local plim, size = parse_string_limit(l)
+ if plim then
+ table.insert(settings[t], {plim, size})
+ end
+ end, lim)
elseif type(lim) == 'string' then
local plim, size = parse_string_limit(lim)
if plim then
- settings[t] = {size, plim, 1}
+ settings[t] = { {plim, size} }
end
end
end, opts['rates'])
local enabled_limits = fun.totable(fun.map(function(t)
return t
- end, fun.filter(function(_, lim)
- return type(lim) == 'string' or
- (type(lim) == 'table' and type(lim[1]) == 'number' and lim[1] > 0)
- or (type(lim) == 'table' and (lim[3]))
- end, settings)))
+ end, settings))
rspamd_logger.infox(rspamd_config, 'enabled rate buckets: [%1]', table.concat(enabled_limits, ','))
if opts['whitelisted_rcpts'] and type(opts['whitelisted_rcpts']) == 'string' then
max_rcpt = tonumber(opts['max_rcpt'])
end
- if opts['max_delay'] then
- max_rcpt = tonumber(opts['max_delay'])
- end
-
if opts['use_ip_score'] then
use_ip_score = true
local ip_score_opts = rspamd_config:get_all_opt('ip_score')
if not redis_params then
rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
else
- if not ratelimit_symbol and not use_ip_score then
- rspamd_config:register_symbol({
- name = 'RATELIMIT_CHECK',
- callback = rate_test,
- type = 'prefilter,nostat',
- priority = 4,
- })
- else
- local symbol
- if not ratelimit_symbol then
- symbol = 'RATELIMIT_CHECK'
- else
- symbol = ratelimit_symbol
- end
- local id = rspamd_config:register_symbol({
- name = symbol,
- callback = rate_test,
- type = 'normal,nostat'
- })
- if use_ip_score then
- rspamd_config:register_dependency(id, 'IP_SCORE')
- end
+ local s = {
+ type = 'prefilter,nostat',
+ name = 'RATELIMIT_CHECK',
+ priority = 4,
+ callback = ratelimit_cb,
+ }
+ if use_ip_score then
+ s.type = 'normal'
+ end
+ if ratelimit_symbol then
+ s.name = ratelimit_symbol
+ end
+ local id = rspamd_config:register_symbol(s)
+ if use_ip_score then
+ rspamd_config:register_dependency(id, 'IP_SCORE')
end
- rspamd_config:register_symbol({
- name = 'RATELIMIT_SET',
- type = 'idempotent',
- priority = 5,
- callback = rate_set,
- })
for _, v in pairs(custom_keywords) do
if type(v) == 'table' and type(v['init']) == 'function' then
v['init']()
end
end
end
-
+rspamd_config:add_on_load(function(cfg, ev_base, worker)
+ load_scripts(cfg, ev_base)
+end)