From d86d2d23789e772c66f23bdd2cb5827ce3bb7699 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 26 Sep 2018 13:54:41 +0100 Subject: [Project] Add Redis logic to the clustering module --- src/plugins/lua/clustering.lua | 83 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/clustering.lua b/src/plugins/lua/clustering.lua index cf74141e7..6a7fd2c8d 100644 --- a/src/plugins/lua/clustering.lua +++ b/src/plugins/lua/clustering.lua @@ -40,9 +40,9 @@ local default_rule = { expire_overflow = 36000, -- Expire for a bucket when limit is reached spam_mult = 1.0, -- Increase on spam hit junk_mult = 0.5, -- Increase on junk - ham_mult = 0.1, -- Increase on ham + ham_mult = -0.1, -- Increase on ham size_mult = 0.01, -- Reaches 1.0 on `max_elts` - rate_mult = 0.1, + score_mult = 0.1, } local rule_schema = ts.shape{ @@ -53,13 +53,87 @@ local rule_schema = ts.shape{ junk_mult = ts.number, ham_mult = ts.number, size_mult = ts.number, - rate_mult = ts.number, + score_mult = ts.number, source_selector = ts.string, cluster_selector = ts.string, symbol = ts.string:is_optional(), prefix = ts.string:is_optional(), } +-- Redis scripts + +-- Queries for a cluster's data +-- Arguments: +-- 1. Source selector (string) +-- 2. Cluster selector (string) +-- Returns: {cur_elts, total_score, element_score} +local query_cluster_script = [[ +local sz = redis.call('HLEN', KEYS[1]) + +if not sz or not tonumber(sz) then + -- New bucket, will update on idempotent phase + return {0, '0', '0'} +end + +local total_score = redis.call('HGET', KEYS[1], '__s') +total_score = tonumber(total_score) or 0 +local score = redis.call('HGET', KEYS[1], KEYS[2]) +if not score or not tonumber(score) then + return {sz, tostring(total_score), '0'} +end +return {sz, tostring(total_score), tostring(score)} +]] +local query_cluster_id + +-- Updates cluster's data +-- Arguments: +-- 1. Source selector (string) +-- 2. Cluster selector (string) +-- 3. Score (number) +-- 4. Max buckets (number) +-- 5. Expire (number) +-- 6. Expire overflow (number) +-- Returns: nothing +local update_cluster_script = [[ +local sz = redis.call('HLEN', KEYS[1]) + +if not sz or not tonumber(sz) then + -- Create bucket + redis.call('HSET', KEYS[1], KEYS[2], math.abs(KEYS[3])) + redis.call('HSET', KEYS[1], '__s', KEYS[3]) + redis.call('EXPIRE', KEYS[1], KEYS[5]) + + return +end + +sz = tonumber(sz) +local lim = tonumber(KEYS[4]) + +if sz > lim then + + if k then + -- Existing key + redis.call('HINCRBYFLOAT', KEYS[1], KEYS[2], math.abs(KEYS[3])) + end +else + redis.call('HINCRBYFLOAT', KEYS[1], KEYS[2], math.abs(KEYS[3])) + redis.call('EXPIRE', KEYS[1], KEYS[6]) +end + +redis.call('HINCRBYFLOAT', KEYS[1], '__s', KEYS[3]) +redis.call('EXPIRE', KEYS[1], KEYS[5]) +]] +local update_cluster_id + +-- Callbacks and logic + +local function clusterting_filter_cb(task, rule) + +end + +local function clusterting_idempotent_cb(task, rule) + +end -- Init part redis_params = lua_redis.parse_redis_server('clustering') local opts = rspamd_config:get_all_opt("clustering") @@ -100,6 +174,9 @@ if opts['rules'] then end if #rules > 0 then + + query_cluster_id = lua_redis.add_redis_script(query_cluster_script, redis_params) + update_cluster_id = lua_redis.add_redis_script(update_cluster_script, redis_params) local function callback_gen(f, rule) return function(task) return f(task, rule) end end -- cgit v1.2.3