summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-09-26 13:54:41 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-09-26 18:21:52 +0100
commitd86d2d23789e772c66f23bdd2cb5827ce3bb7699 (patch)
tree481bbff92d785617b8641f4206a9ffea7ffda5b3 /src
parent23a55f963e6c24d9411afadb4ce42c9a1b140de5 (diff)
downloadrspamd-d86d2d23789e772c66f23bdd2cb5827ce3bb7699.tar.gz
rspamd-d86d2d23789e772c66f23bdd2cb5827ce3bb7699.zip
[Project] Add Redis logic to the clustering module
Diffstat (limited to 'src')
-rw-r--r--src/plugins/lua/clustering.lua83
1 files changed, 80 insertions, 3 deletions
diff --git a/src/plugins/lua/clustering.lua b/src/plugins/lua/clustering.lua
index cf74141e7..6a7fd2c8d 100644
--- a/src/plugins/lua/clustering.lua
+++ b/src/plugins/lua/clustering.lua
@@ -40,9 +40,9 @@ local default_rule = {
expire_overflow = 36000, -- Expire for a bucket when limit is reached
spam_mult = 1.0, -- Increase on spam hit
junk_mult = 0.5, -- Increase on junk
- ham_mult = 0.1, -- Increase on ham
+ ham_mult = -0.1, -- Increase on ham
size_mult = 0.01, -- Reaches 1.0 on `max_elts`
- rate_mult = 0.1,
+ score_mult = 0.1,
}
local rule_schema = ts.shape{
@@ -53,13 +53,87 @@ local rule_schema = ts.shape{
junk_mult = ts.number,
ham_mult = ts.number,
size_mult = ts.number,
- rate_mult = ts.number,
+ score_mult = ts.number,
source_selector = ts.string,
cluster_selector = ts.string,
symbol = ts.string:is_optional(),
prefix = ts.string:is_optional(),
}
+-- Redis scripts
+
+-- Queries for a cluster's data
+-- Arguments:
+-- 1. Source selector (string)
+-- 2. Cluster selector (string)
+-- Returns: {cur_elts, total_score, element_score}
+local query_cluster_script = [[
+local sz = redis.call('HLEN', KEYS[1])
+
+if not sz or not tonumber(sz) then
+ -- New bucket, will update on idempotent phase
+ return {0, '0', '0'}
+end
+
+local total_score = redis.call('HGET', KEYS[1], '__s')
+total_score = tonumber(total_score) or 0
+local score = redis.call('HGET', KEYS[1], KEYS[2])
+if not score or not tonumber(score) then
+ return {sz, tostring(total_score), '0'}
+end
+return {sz, tostring(total_score), tostring(score)}
+]]
+local query_cluster_id
+
+-- Updates cluster's data
+-- Arguments:
+-- 1. Source selector (string)
+-- 2. Cluster selector (string)
+-- 3. Score (number)
+-- 4. Max buckets (number)
+-- 5. Expire (number)
+-- 6. Expire overflow (number)
+-- Returns: nothing
+local update_cluster_script = [[
+local sz = redis.call('HLEN', KEYS[1])
+
+if not sz or not tonumber(sz) then
+ -- Create bucket
+ redis.call('HSET', KEYS[1], KEYS[2], math.abs(KEYS[3]))
+ redis.call('HSET', KEYS[1], '__s', KEYS[3])
+ redis.call('EXPIRE', KEYS[1], KEYS[5])
+
+ return
+end
+
+sz = tonumber(sz)
+local lim = tonumber(KEYS[4])
+
+if sz > lim then
+
+ if k then
+ -- Existing key
+ redis.call('HINCRBYFLOAT', KEYS[1], KEYS[2], math.abs(KEYS[3]))
+ end
+else
+ redis.call('HINCRBYFLOAT', KEYS[1], KEYS[2], math.abs(KEYS[3]))
+ redis.call('EXPIRE', KEYS[1], KEYS[6])
+end
+
+redis.call('HINCRBYFLOAT', KEYS[1], '__s', KEYS[3])
+redis.call('EXPIRE', KEYS[1], KEYS[5])
+]]
+local update_cluster_id
+
+-- Callbacks and logic
+
+local function clusterting_filter_cb(task, rule)
+
+end
+
+local function clusterting_idempotent_cb(task, rule)
+
+end
-- Init part
redis_params = lua_redis.parse_redis_server('clustering')
local opts = rspamd_config:get_all_opt("clustering")
@@ -100,6 +174,9 @@ if opts['rules'] then
end
if #rules > 0 then
+
+ query_cluster_id = lua_redis.add_redis_script(query_cluster_script, redis_params)
+ update_cluster_id = lua_redis.add_redis_script(update_cluster_script, redis_params)
local function callback_gen(f, rule)
return function(task) return f(task, rule) end
end