]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Implement buckets for Redis backend
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 28 Feb 2018 13:42:53 +0000 (13:42 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 28 Feb 2018 13:43:14 +0000 (13:43 +0000)
src/plugins/lua/reputation.lua

index a4511a09e402df223e8f9c3e857331d2b1fcf09f..5ab55422031cf957aae6d8833b0a5296e01851cf 100644 (file)
@@ -28,6 +28,7 @@ local rspamd_util = require "rspamd_util"
 local lua_util = require "lua_util"
 local lua_maps = require "lua_maps"
 local hash = require 'rspamd_cryptobox_hash'
+local lua_redis = require "lua_redis"
 local fun = require "fun"
 local redis_params = nil
 local default_expiry = 864000 -- 10 day by default
@@ -652,6 +653,81 @@ local function reputation_dns_get_token(task, rule, token, continuation_cb)
   })
 end
 
+local function reputation_redis_init(rule, cfg, ev_base, worker)
+  if not redis_params then
+    return false
+  end
+  -- Init scripts for buckets
+  local redis_get_script_tpl = [[
+local key = KEYS[1] .. ${name}
+local vals = redis.call('HGETALL', key)
+for i=1,#vals,2 do
+  local k = vals[i]
+  local v = vals[i + 1]
+  if scores[k] then
+    scores[k] = scores[k] + tonumber(v) * ${mult}
+  else
+    scores[k] = tonumber(v) * ${mult}
+  end
+end
+]]
+  local redis_script_tbl = {'local scores = {}'}
+  for _,bucket in ipairs(rule.backend.config.buckets) do
+    table.insert(redis_script_tbl, lua_util.template(redis_get_script_tpl,
+        fun.totable(fun.map(tostring, bucket))))
+  end
+  table.insert(redis_script_tbl, [[
+  local result = {}
+  for k,v in pairs(scores) do
+   table.insert(result, k)
+   table.insert(result, v)
+  end
+
+  return result
+]])
+  rule.backend.script_get = lua_redis.add_redis_script(table.concat(redis_script_tbl, '\n'),
+      redis_params)
+
+  redis_script_tbl = {}
+  local redis_set_script_tpl = [[
+local key = KEYS[1] .. ${name}
+local last = tonumber(redis.call('HGET', key, 'start'))
+local now = tonumber(KEYS[2])
+if not last then
+  last = 0
+end
+local discriminate_bucket = false
+if now - last > ${time} then
+  discriminate_bucket = true
+  redis.call('HSET', key, 'start', now)
+end
+for i=1,#ARGV,2 do
+  local k = ARGV[i]
+  local v = tonumber(ARGV[i + 1])
+
+  if discriminate_bucket then
+    local last_value = redis.call('HGET', key, k)
+    if last_value then
+      redis.call('HSET', key, k, last_value / 2.0)
+    end
+  end
+  redis.call('HINCRBYFLOAT', key, k, v)
+end
+
+redis.call('EXPIRE', key, KEYS[3])
+redis.call('HSET', key, 'last', now)
+]]
+  for _,bucket in ipairs(rule.backend.config.buckets) do
+    table.insert(redis_script_tbl, lua_util.template(redis_set_script_tpl,
+        fun.totable(fun.map(tostring, bucket))))
+  end
+
+  rule.backend.script_set = lua_redis.add_redis_script(table.concat(redis_script_tbl, '\n'),
+      redis_params)
+
+  return true
+end
+
 local function reputation_redis_get_token(task, rule, token, continuation_cb)
   local key = gen_token_key(token, rule)
 
@@ -681,14 +757,10 @@ local function reputation_redis_get_token(task, rule, token, continuation_cb)
     end
   end
 
-  local ret = rspamd_redis_make_request(task,
-    redis_params, -- connect params
-    key, -- hash key
-    false, -- is write
-    redis_get_cb, --callback
-    'HGETALL', -- command
-    {key} -- arguments
-  )
+  local ret = lua_redis.exec_redis_script(rule.backend.script_get,
+      {task = task, is_write = false},
+      redis_get_cb,
+      {token})
   if not ret then
     rspamd_logger.errx(task, 'cannot make redis request to check results')
   end
@@ -697,8 +769,6 @@ end
 local function reputation_redis_set_token(task, rule, token, values, continuation_cb)
   local key = gen_token_key(token, rule)
 
-  local ret,conn
-
   local function redis_set_cb(err, data)
     if err then
       rspamd_logger.errx(task, 'got error while setting reputation keys %s: %s',
@@ -714,24 +784,17 @@ local function reputation_redis_set_token(task, rule, token, values, continuatio
   end
 
   -- We start from expiry update
-  ret,conn = rspamd_redis_make_request(task,
-    redis_params, -- connect params
-    nil, -- hash key
-    true, -- is write
-    redis_set_cb, --callback
-    'EXPIRE', -- command
-    {key, tostring(rule.backend.config.expiry)} -- arguments
-  )
-  -- Update greylisting record expire
-  if ret then
-    -- Here, we increment all hash keys that are listed in values
-    -- E.g. {'s': 1.0} or {'h': -1.0}, floating point allows better flexibility
-    fun.each(function(k, v)
-      conn:add_cmd('HINCRBYFLOAT', {key, tostring(k), tostring(v)})
-    end, values)
-    -- Add last modification time (might be not very consistent between updates)
-    conn:add_cmd('HSET', {key, 'last', tostring(rspamd_util:get_calendar_ticks())})
-  else
+  local args = {}
+  for k,v in pairs(values) do
+    table.insert(args, k)
+    table.insert(args, v)
+  end
+  local ret = lua_redis.exec_redis_script(rule.backend.script_get,
+      {task = task, is_write = true},
+      redis_set_cb,
+      {token, tostring(rspamd_util:get_calendar_ticks()),
+       tostring(rule.backend.config.expiry)}, args)
+  if not ret then
     rspamd_logger.errx(task, 'got error while connecting to redis')
   end
 end
@@ -751,11 +814,12 @@ local backends = {
         {
           time = 60 * 60,
           name = '1h',
+          mult = 1.5,
         },
         {
           time = 60 * 60 * 24 * 30,
           name = '1m',
-
+          mult = 1.0,
         }
       }, -- What buckets should be used, default 1h and 1month
     },