From: Andrew Lewis Date: Thu, 11 May 2017 13:52:55 +0000 (+0200) Subject: [Minor] DMARC reporting: rework backend X-Git-Tag: 1.6.0~234^2 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=f41a3d0c634f7b05e76417fdf7b6067299e8ae4d;p=rspamd.git [Minor] DMARC reporting: rework backend - Also fix deletion - Support redirecting reports (useful for evaluation) - Deal with possibly missing SPF results --- diff --git a/src/plugins/lua/dmarc.lua b/src/plugins/lua/dmarc.lua index f7a95c5af..8f148d1b1 100644 --- a/src/plugins/lua/dmarc.lua +++ b/src/plugins/lua/dmarc.lua @@ -17,7 +17,6 @@ limitations under the License. -- Dmarc policy filter -local hash = require "rspamd_cryptobox_hash" local rspamd_logger = require "rspamd_logger" local mempool = require "rspamd_mempool" local rspamd_tcp = require "rspamd_tcp" @@ -38,6 +37,7 @@ local pool = mempool.create() local report_settings = { helo = 'rspamd', + hscan_count = 1000, smtp = '127.0.0.1', smtp_port = 25, } @@ -96,8 +96,9 @@ local dmarc_symbols = { } local redis_keys = { - dmarc_domains = 'dmarc_domains', - dmarc_domain = 'dmarc_%s', + index_prefix = 'dmarc_idx', + report_prefix = 'dmarc', + join_char = ';', } local function gen_xml_grammar() @@ -128,12 +129,14 @@ local E = {} local take_report_sha local take_report_script = [[ -local dmarc_domains_key = KEYS[1] -local dmarc_domain_key = KEYS[2] +local index_key = KEYS[1] +local report_key = KEYS[2] local dmarc_domain = ARGV[1] local report = ARGV[2] -redis.call('SADD', dmarc_domains_key, dmarc_domain) -redis.call('LPUSH', dmarc_domain_key, report) +redis.call('SADD', index_key, report_key) +redis.call('EXPIRE', index_key, 172800) +redis.call('HINCRBY', report_key, report, 1) +redis.call('EXPIRE', report_key, 172800) ]] local function redis_make_request(ev_base, cfg, key, is_write, callback, command, args) @@ -235,10 +238,10 @@ local function dmarc_report(task, spf_ok, dkim_ok, disposition, sampled_out, hfr local dkim_fail = table.concat(dres.fail or E, '|') local dkim_temperror = table.concat(dres.temperror or E, '|') local dkim_permerror = table.concat(dres.permerror or E, '|') - local res = string.format('%d,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s', - task:get_date(0), ip:to_string(), spf_ok, dkim_ok, + local res = table.concat({ + ip:to_string(), spf_ok, dkim_ok, disposition, (sampled_out and 'sampled_out' or ''), hfromdom, - dkim_pass, dkim_fail, dkim_temperror, dkim_permerror, spfdom, spf_result) + dkim_pass, dkim_fail, dkim_temperror, dkim_permerror, spfdom or '', spf_result or ''}, ',') return res end @@ -554,9 +557,11 @@ local function dmarc_callback(task) end end -- Prepare and send redis report element - local dmarc_domain_key = string.format(redis_keys.dmarc_domain, hfromdom) + local period = os.date('%Y%m%d', task:get_date(0)) + local dmarc_domain_key = table.concat({redis_keys.report_prefix, hfromdom, period}, redis_keys.join_char) local report_data = dmarc_report(task, spf_ok and 'pass' or 'fail', dkim_ok and 'pass' or 'fail', dmarc_ok and 'pass' or 'fail', sampled_out, hfromdom, spf_domain, dkim_results, spf_result) + local idx_key = table.concat({redis_keys.index_prefix, period}, redis_keys.join_char) if report_data then local ret = rspamd_redis_make_request(task, @@ -565,7 +570,7 @@ local function dmarc_callback(task) true, -- is write dmarc_report_cb, --callback 'EVALSHA', -- command - {take_report_sha, 2, redis_keys.dmarc_domains, dmarc_domain_key, hfromdom, report_data} -- arguments + {take_report_sha, 2, idx_key, dmarc_domain_key, hfromdom, report_data} -- arguments ) if not ret then rspamd_logger.errx(task, 'Unable to schedule redis request') @@ -649,11 +654,12 @@ if opts['reporting'] == true then assert(f:write(pool:get_variable(VAR_NAME, 'double'))) assert(f:close()) end) - local get_reporting_domain, reporting_domain, report_start, report_end, report_id + local get_reporting_domain, reporting_domain, report_start, report_end, report_id, want_period, report_key local reporting_addr = {} local domain_policy = {} local to_verify = {} - local function entry_to_xml(data, count) + local cursor = 0 + local function entry_to_xml(data) local buf = { string.format( [[ @@ -670,7 +676,7 @@ if opts['reporting'] == true then %s -]], data.ip, count, data.disposition, data.dkim_disposition, data.spf_disposition, data.header_from), +]], data.ip, data.count, data.disposition, data.dkim_disposition, data.spf_disposition, data.header_from), } if data.dkim_results[1] or (data.spf_result ~= '' and data.spf_domain ~= '') then table.insert(buf, '\t\t\n') @@ -694,67 +700,46 @@ if opts['reporting'] == true then report_id = string.format('%s.%d.%d', reporting_domain, report_start, report_end) rspamd_logger.debugm(N, rspamd_config, 'new report: %s', report_id) local actions = { - push = function(data) - local counts = {} + push = function(t) + local data = t[1] local split = rspamd_str_split(data, ',') - local when = tonumber(split[1]) - if when > report_end then - -- XXX: Replace data! - rspamd_logger.debugm(N, rspamd_config, 'report is newer than report_end: %1 %2', when, report_end) - return false - elseif when < report_start then - rspamd_logger.debugm(N, rspamd_config, 'report is older than report_start: %1 %2', when, report_start) - return true - elseif when >= report_start then - local h = hash.create() - for i = 2, #split do - h:update(split[i]) - end - local bin = h:bin() - if counts[bin] then - counts[bin] = counts[bin] + 1 - else - counts[bin] = 1 - end - local count = counts[bin] - local row = { - ip = split[2], - spf_disposition = split[3], - dkim_disposition = split[4], - disposition = split[5], - override = split[6], - header_from = split[7], - dkim_results = {}, - spf_domain = split[12], - spf_result = split[13], - } - if split[8] and split[8] ~= '' then - local tmp = rspamd_str_split(split[8], '|') - for _, d in ipairs(tmp) do - table.insert(row.dkim_results, {domain = d, result = 'pass'}) - end + local row = { + ip = split[1], + spf_disposition = split[2], + dkim_disposition = split[3], + disposition = split[4], + override = split[5], + header_from = split[6], + dkim_results = {}, + spf_domain = split[11], + spf_result = split[12], + count = t[2], + } + if split[7] and split[7] ~= '' then + local tmp = rspamd_str_split(split[7], '|') + for _, d in ipairs(tmp) do + table.insert(row.dkim_results, {domain = d, result = 'pass'}) end - if split[9] and split[9] ~= '' then - local tmp = rspamd_str_split(split[9], '|') - for _, d in ipairs(tmp) do - table.insert(row.dkim_results, {domain = d, result = 'fail'}) - end + end + if split[8] and split[8] ~= '' then + local tmp = rspamd_str_split(split[8], '|') + for _, d in ipairs(tmp) do + table.insert(row.dkim_results, {domain = d, result = 'fail'}) end - if split[10] and split[10] ~= '' then - local tmp = rspamd_str_split(split[10], '|') - for _, d in ipairs(tmp) do - table.insert(row.dkim_results, {domain = d, result = 'temperror'}) - end + end + if split[9] and split[9] ~= '' then + local tmp = rspamd_str_split(split[9], '|') + for _, d in ipairs(tmp) do + table.insert(row.dkim_results, {domain = d, result = 'temperror'}) end - if split[11] and split[11] ~= '' then - local tmp = rspamd_str_split(split[11], '|') - for _, d in ipairs(tmp) do - table.insert(row.dkim_results, {domain = d, result = 'permerror'}) - end + end + if split[10] and split[10] ~= '' then + local tmp = rspamd_str_split(split[10], '|') + for _, d in ipairs(tmp) do + table.insert(row.dkim_results, {domain = d, result = 'permerror'}) end - entries[bin] = {[row] = count} - return true end + table.insert(entries, row) end, header = function() return string.format( @@ -786,9 +771,7 @@ if opts['reporting'] == true then entries = function() local buf = {} for _, e in pairs(entries) do - for k, v in pairs(e) do - table.insert(buf, entry_to_xml(k, v)) - end + table.insert(buf, entry_to_xml(e)) end return table.concat(buf, '') end, @@ -919,35 +902,38 @@ if opts['reporting'] == true then }) end local function make_report() + if type(report_settings.override_address) == 'string' then + reporting_addr = {report_settings.override_address} + end rspamd_logger.infox(ev_base, 'sending report for %s <%s>', reporting_domain, table.concat(reporting_addr, ',')) local dmarc_xml = dmarc_report_xml() local dmarc_push_cb - local dmarc_domain_key = string.format(redis_keys.dmarc_domain, reporting_domain) dmarc_push_cb = function(err, data) if err then rspamd_logger.errx(ev_base, 'Redis request failed: %s', err) -- XXX: data is orphaned; replace key or delete data get_reporting_domain() - elseif type(data) == 'string' then - if dmarc_xml('push', data) then + elseif type(data) == 'table' then + cursor = tonumber(data[1]) + for i = 1, #data[2], 2 do + dmarc_xml('push', {data[2][i], data[2][i+1]}) + end + if cursor ~= 0 then local ret = redis_make_request(ev_base, rspamd_config, nil, false, -- is write dmarc_push_cb, --callback - 'LPOP', -- command - {dmarc_domain_key} + 'HSCAN', -- command + {report_key, cursor, 'COUNT', report_settings.hscan_count} ) if not ret then rspamd_logger.errx(ev_base, 'Failed to schedule redis request') - -- XXX: data is orphaned; replace key or delete data get_reporting_domain() end else send_report_via_email(dmarc_xml) end - else - send_report_via_email(dmarc_xml) end end local ret = redis_make_request(ev_base, @@ -955,8 +941,8 @@ if opts['reporting'] == true then nil, false, -- is write dmarc_push_cb, --callback - 'LPOP', -- command - {dmarc_domain_key} + 'HSCAN', -- command + {report_key, cursor, 'COUNT', report_settings.hscan_count} ) if not ret then rspamd_logger.errx(rspamd_config, 'Failed to schedule redis request') @@ -972,14 +958,13 @@ if opts['reporting'] == true then rspamd_logger.infox(rspamd_config, 'Deleted reports for %s') get_reporting_domain() end - local dmarc_domain_key = string.format(redis_keys.dmarc_domain, reporting_domain) local ret = redis_make_request(ev_base, rspamd_config, nil, false, -- is write delete_reports_cb, --callback 'DEL', -- command - {dmarc_domain_key} + {report_key} ) if not ret then rspamd_logger.errx(rspamd_config, 'Failed to schedule redis request') @@ -1107,6 +1092,7 @@ if opts['reporting'] == true then reporting_domain = nil reporting_addr = {} domain_policy = {} + cursor = 0 local function get_reporting_domain_cb(err, data) if err then rspamd_logger.errx(cfg, 'Unable to get DMARC domain: %s', err) @@ -1114,7 +1100,9 @@ if opts['reporting'] == true then if type(data) == 'userdata' then reporting_domain = nil else - reporting_domain = data + report_key = data + local tmp = rspamd_str_split(data, redis_keys.join_char) + reporting_domain = tmp[2] end if not reporting_domain then rspamd_logger.infox(cfg, 'No more domains to generate reports for') @@ -1123,13 +1111,14 @@ if opts['reporting'] == true then end end end + local idx_key = table.concat({redis_keys.index_prefix, want_period}, redis_keys.join_char) local ret = redis_make_request(ev_base, rspamd_config, nil, false, -- is write get_reporting_domain_cb, --callback 'SPOP', -- command - {redis_keys.dmarc_domains} + {idx_key} ) if not ret then rspamd_logger.errx(cfg, 'Unable to get DMARC domain') @@ -1138,8 +1127,15 @@ if opts['reporting'] == true then local function send_reports(time) rspamd_logger.infox(ev_base, 'sending reports ostensibly %1', time) pool:set_variable(VAR_NAME, time) - report_end = time - report_start = time - INTERVAL + local yesterday = os.date('*t', rspamd_util.get_time() - INTERVAL) + local today = os.date('*t', rspamd_util.get_time()) + report_start = os.time({year = yesterday.year, month = yesterday.month, day = yesterday.day, hour = 0}) + report_end = os.time({year = today.year, month = today.month, day = today.day, hour = 0}) + want_period = table.concat({ + yesterday.year, + string.format('%02d', yesterday.month), + string.format('%02d', yesterday.day) + }) get_reporting_domain() end -- Push reports at regular intervals