From ac9471b712a2fccd630ca42394c9d80825cda1aa Mon Sep 17 00:00:00 2001 From: Andrew Lewis Date: Fri, 13 Jan 2017 13:39:08 +0200 Subject: [PATCH] [Minor] URL reputation plugin changes - Fix plugin post changes - Fix Lua API - Limit redis queries - Ignore messages with many TLDs in dynamic reputation - Try find relevant domain in dynamic reputation --- src/lua/lua_url.c | 2 +- src/plugins/lua/url_reputation.lua | 275 ++++++++++++++++++++--------- 2 files changed, 197 insertions(+), 80 deletions(-) diff --git a/src/lua/lua_url.c b/src/lua/lua_url.c index cdc74e8d8..2f1270099 100644 --- a/src/lua/lua_url.c +++ b/src/lua/lua_url.c @@ -418,7 +418,7 @@ static gint lua_url_add_tag (lua_State *L) { struct rspamd_lua_url *url = lua_check_url (L, 1); - rspamd_mempool_t *mempool = rspamd_lua_check_mempool (L, 3); + rspamd_mempool_t *mempool = rspamd_lua_check_mempool (L, 4); const gchar *tag = luaL_checkstring (L, 2); const gchar *value; diff --git a/src/plugins/lua/url_reputation.lua b/src/plugins/lua/url_reputation.lua index 26093cd2a..7366c1f11 100644 --- a/src/plugins/lua/url_reputation.lua +++ b/src/plugins/lua/url_reputation.lua @@ -17,6 +17,7 @@ limitations under the License. -- A plugin that restores/persists URL reputation (tags) +local E = {} local N = 'url_reputation' local redis_params, redis_set_script_sha @@ -41,21 +42,33 @@ local settings = { grey = 'URL_REPUTATION_GREY', neutral = 'URL_REPUTATION_NEUTRAL', }, + foreign_symbols = { + dmarc = 'DMARC_POLICY_ALLOW', + dkim = 'R_DKIM_ALLOW', + spf = 'R_SPF_ALLOW', + }, + -- how many messages to score reputation threshold = 5, - limit = 3, + -- set reputation for only so many TLDs + update_limit = 1, + -- query dynamic reputation for up to so many TLDs + query_limit = 100, + -- try find most relevant URL + relevance = true, } local scale = { - 'white', - 'neutral', - 'grey', - 'black', + 'white', -- 1 + 'neutral', -- 2 + 'grey', -- 3 + 'black', -- 4 } local rspamd_logger = require "rspamd_logger" local rspamd_util = require "rspamd_util" local ucl = require "ucl" +-- This function is used for taskless redis requests (to load scripts) local function redis_make_request(ev_base, cfg, key, is_write, callback, command, args) if not ev_base or not redis_params or not callback or not command then return false,nil,nil @@ -107,29 +120,54 @@ local function redis_make_request(ev_base, cfg, key, is_write, callback, command return ret,conn,addr end +-- Tags are stored in format: [timestamp]|[tag1],[timestamp]|[tag2] local redis_set_script_head = 'local expiry = ' local redis_set_script_tail = [[ local now = math.floor(table.remove(ARGV)) local res = redis.call('MGET', unpack(KEYS)) for i = 1, #res do - local tmp1, tmp2 = {}, {} + local tmp1, tmp2, metatags = {}, {}, {} if res[i] then - for time, tag in string.gmatch(res[i], '(%d+)|([^,]+)') do + for goo in string.gmatch(res[i], '[^/]+') do + local time, tag, meta = string.match(goo, '(%d+)|([^|]+)|(.+)') if (time + expiry) > now then - tmp1[tag] = time + for m in string.gmatch(meta, '[^,]+') do + metatags[m] = true + end + tmp1[tag] = {time, metatags} end end end - for tag in string.gmatch(ARGV[i], '[^,]+') do - tmp1[tag] = now + local idx = string.find(ARGV[i], '|') + if not idx then + return redis.error_reply('bad arguments') end - for k in pairs(tmp1) do - table.insert(tmp2, tmp1[k] .. '|' .. k) + local t_str = string.sub(ARGV[i], 1, idx - 1) + local m_str = string.sub(ARGV[i], idx + 1) + local mm = string.gmatch(m_str, '[^,]+') + for t in string.gmatch(t_str, '[^,]+') do + if not tmp1[t] then + tmp1[t] = {now, {}} + else + tmp1[t][1] = now + end + local mt_str = mm() + for mt in string.gmatch(mt_str, '[^,]+') do + tmp1[t][2][mt] = true + end end - redis.call('SETEX', KEYS[i], expiry, table.concat(tmp2, ',')) + for k, v in pairs(tmp1) do + local meta_list = {} + for kk in pairs(v[2]) do + table.insert(meta_list, kk) + end + table.insert(tmp2, v[1] .. '|' .. k .. '|' .. table.concat(meta_list, ',')) + end + redis.call('SETEX', KEYS[i], expiry, table.concat(tmp2, '/')) end ]] +-- Function to load the script local function load_scripts(cfg, ev_base) local function redis_set_script_cb(err, data) if err then @@ -153,8 +191,10 @@ local function load_scripts(cfg, ev_base) ) end +-- Saves tags and calculates URL reputation local function tags_save(task) + -- Handle errors (reloads script if necessary) local function redis_set_cb(err) if err then rspamd_logger.errx(task, 'Redis error: %s', err) @@ -170,19 +210,86 @@ local function tags_save(task) local reputation = 2 local which + -- Save tags to redis and insert symbol + local function insert_results() + task:insert_result(settings.symbols[scale[reputation]], 1.0, which) + -- Abort if no tags were found + if not next(tags) then return end + -- Don't populate old tags + local old_tags = task:get_mempool():get_variable('urltags') + if old_tags then + local parser = ucl.parser() + local res, err = parser:parse_string(old_tags) + if not res then + rspamd_logger.errx(task, 'Parser error: %s', err) + return + end + local obj = parser:get_object() + for k, v in pairs(obj) do + if tags[k] then + for sk in pairs(v) do + tags[k][sk] = nil + end + if not next(tags[k]) then + tags[k] = nil + end + end + end + end + -- Prepare arguments to send to Redis + local redis_keys = {} + local redis_args = {} + for dom, v in pairs(tags) do + table.insert(redis_keys, settings.key_prefix_tags .. dom) + local tmp, tmp2 = {}, {} + for k, vv in pairs(v) do + table.insert(tmp, k) + for kk in pairs(vv) do + table.insert(tmp2, kk) + end + end + table.insert(redis_args, table.concat(tmp, ',') .. '|' .. table.concat(tmp2, ',')) + end + local redis_final = {redis_set_script_sha} + table.insert(redis_final, #redis_keys) + for _, k in ipairs(redis_keys) do + table.insert(redis_final, k) + end + for _, a in ipairs(redis_args) do + table.insert(redis_final, a) + end + table.insert(redis_final, rspamd_util.get_time()) + rspamd_redis_make_request(task, + redis_params, + nil, + true, -- is write + redis_set_cb, --callback + 'EVALSHA', -- command + redis_final + ) + end + + -- Dynamic reputation is used in absence of tags local function dynamic_reputation() + local subset = {} local keys = {} + + -- Spit out log if INCR fails local function redis_incr_cb(err) if err then rspamd_logger.errx(task, 'couldnt increment reputation: %s', err) end end + local function rep_get_cb(err, data) + -- Abort if we couldn't query redis for reputation info if err then rspamd_logger.errx(task, 'couldnt get dynamic reputation: %s', err) return end + + -- Try find worst reputation domain and set reputation accordingly local i, x, highest = 1, 1, 0 while(data[i]) do if type(data[i]) == 'string' then @@ -220,21 +327,65 @@ local function tags_save(task) end local rk if which then + -- Update reputation for guilty domain only rk = { settings.key_prefix_rep .. which .. '_total', settings.key_prefix_rep .. which .. '_' .. scale[reputation], } else + -- No reputation found, pick some URLs + local most_relevant + if settings.relevance then + -- XXX: blacklist for non-relevant identifiers (gmail etc) + local dmarc = ((task:get_symbol(settings.foreign_symbols['dmarc']) or E)[1] or E).options + local dkim = ((task:get_symbol(settings.foreign_symbols['dkim']) or E)[1] or E).options + local spf = task:get_symbol(settings.foreign_symbols['spf']) + local hostname = task:get_hostname() + if hostname then + hostname = rspamd_util.get_tld(hostname) + end + if spf then + local from = task:get_from(1) + if ((from or E)[1] or E).domain then + spf = rspamd_util.get_tld(from[1]['domain']) + else + local helo = task:get_helo() + if helo then + spf = rspamd_util.get_tld(helo) + end + end + end + for _, t in ipairs(tlds) do + if t == dmarc then + most_relevant = t + break + elseif t == dkim then + most_relevant = t + break + elseif t == spf then + most_relevant = t + break + elseif t == hostname then + most_relevant = t + break + end + end + end + rk = {} local added = 0 + if most_relevant then + tlds = {most_relevant} + which = most_relevant + end for t in pairs(tlds) do + if settings.update_limit and added > settings.update_limit then + rspamd_logger.warnx(task, 'Not updating reputation on all TLDs') + break + end table.insert(rk, settings.key_prefix_rep .. t .. '_total') table.insert(rk, settings.key_prefix_rep .. t .. '_' .. scale[reputation]) added = added + 1 - if added >= settings.limit then - rspamd_logger.warnx(task, 'Not setting reputation on all TLDs') - break - end end end for _, k in ipairs(rk) do @@ -250,10 +401,9 @@ local function tags_save(task) rspamd_logger.errx(task, 'couldnt schedule increment') end end - if which then - task:insert_result(settings.symbols[scale[reputation]], 1.0, which) - end + insert_results() end + local action = task:get_metric_action('default') if action == 'reject' then reputation = 4 @@ -265,16 +415,22 @@ local function tags_save(task) reputation = 1 end end - local count = 0 + + local added = 0 for k in pairs(tlds) do + if settings.query_limit and added >= settings.query_limit then + rspamd_logger.warnx(task, 'not querying reputation for all TLDs') + break + end + added = added + 1 table.insert(subset, k) table.insert(keys, settings.key_prefix_rep .. k .. '_total') table.insert(keys, settings.key_prefix_rep .. k .. '_white') table.insert(keys, settings.key_prefix_rep .. k .. '_black') table.insert(keys, settings.key_prefix_rep .. k .. '_grey') table.insert(keys, settings.key_prefix_rep .. k .. '_neutral') - count = count + 1 end + local key = keys[1] if key then rspamd_redis_make_request(task, @@ -297,13 +453,19 @@ local function tags_save(task) tld_count = tld_count + 1 end local utags = url:get_tags() - if utags[1] then + if next(utags) then local dom = url:get_tld() if not tags[dom] then tags[dom] = {} end - for _, ut in ipairs(utags) do - tags[dom][ut] = true + for ut, utv in pairs(utags) do + if tags[dom][ut] then + for _, e in ipairs(utv) do + table.insert(tags[dom][ut], e) + end + else + tags[dom][ut] = utv + end local cat = category[ut] if cat == 'black' then reputation = 4 @@ -324,58 +486,7 @@ local function tags_save(task) end return end - task:insert_result(settings.symbols[scale[reputation]], 1.0, which) - -- Abort if no tags were found - if not next(tags) then return end - -- Don't populate old tags - local old_tags = task:get_mempool():get_variable('urltags') - if old_tags then - local parser = ucl.parser() - local res, err = parser:parse_string(old_tags) - if not res then - rspamd_logger.errx(task, 'Parser error: %s', err) - return - end - local obj = parser:get_object() - for k, v in pairs(obj) do - if tags[k] then - for sk in pairs(v) do - tags[k][sk] = nil - end - if not next(tags[k]) then - tags[k] = nil - end - end - end - end - -- Prepare arguments to send to Redis - local redis_keys = {} - local redis_args = {} - for dom, v in pairs(tags) do - table.insert(redis_keys, settings.key_prefix_tags .. dom) - local tmp = {} - for k in pairs(v) do - table.insert(tmp, k) - end - table.insert(redis_args, table.concat(tmp, ',')) - end - local redis_final = {redis_set_script_sha} - table.insert(redis_final, #redis_keys) - for _, k in ipairs(redis_keys) do - table.insert(redis_final, k) - end - for _, a in ipairs(redis_args) do - table.insert(redis_final, a) - end - table.insert(redis_final, rspamd_util.get_time()) - rspamd_redis_make_request(task, - redis_params, - nil, - true, -- is write - redis_set_cb, --callback - 'EVALSHA', -- command - redis_final - ) + insert_results() end local function tags_restore(task) @@ -396,10 +507,16 @@ local function tags_restore(task) for i = 1, d_len do if type(data[i]) == 'string' then local tld = tld_reverse[i] - for time, tag in string.gmatch(data[i], '(%d+)|([^,]+)') do + for time, tag, meta in string.gmatch(data[i], '(%d+)|([^|]+)|(.+)') do if (time + settings.expire) > now then + local metatags = {} + for m in string.gmatch(meta, '[^,]+') do + table.insert(metatags, m) + end for _, idx in ipairs(tlds[tld]) do - urls[idx]:add_tag(tag, mpool) + for _, ttag in ipairs(metatags) do + urls[idx]:add_tag(tag, ttag, mpool) + end end if not tracking[tld] then tracking[tld] = {} -- 2.39.5