From 7f33530b75d015c71d239dcefa66408b86a6eb66 Mon Sep 17 00:00:00 2001 From: Andrew Lewis Date: Tue, 9 May 2017 11:53:16 +0200 Subject: [PATCH] [Feature] Preliminary DMARC reporting implementation --- src/plugins/lua/dmarc.lua | 867 +++++++++++++++++++++++++++++++++++--- 1 file changed, 818 insertions(+), 49 deletions(-) diff --git a/src/plugins/lua/dmarc.lua b/src/plugins/lua/dmarc.lua index 443b31e30..e3cc09bf0 100644 --- a/src/plugins/lua/dmarc.lua +++ b/src/plugins/lua/dmarc.lua @@ -17,7 +17,11 @@ limitations under the License. -- Dmarc policy filter +local hash = require "rspamd_cryptobox_hash" local rspamd_logger = require "rspamd_logger" +local mempool = require "rspamd_mempool" +local rspamd_tcp = require "rspamd_tcp" +local rspamd_url = require "rspamd_url" local rspamd_util = require "rspamd_util" local check_local = false local check_authed = false @@ -26,6 +30,47 @@ if confighelp then return end +local N = 'dmarc' +local statefile = string.format('%s/%s', rspamd_paths['DBDIR'], 'dmarc_reports_last_sent') +local VAR_NAME = 'dmarc_reports_last_sent' +local INTERVAL = 86400 +local pool = mempool.create() + +local report_settings = { + helo = 'rspamd', + smtp = '127.0.0.1', + smtp_port = 25, +} +local report_template = [[From: "Rspamd" <%s> +To: %s +Subject: Report Domain: %s + Submitter: %s + Report-ID: <%s> +Date: %s +MIME-Version: 1.0 +Message-ID: <%s> +Content-Type: multipart/alternative; + boundary="----=_NextPart_000_024E_01CC9B0A.AFE54C00" + +This is a multipart message in MIME format. + +------=_NextPart_000_024E_01CC9B0A.AFE54C00 +Content-Type: text/plain; charset="us-ascii" +Content-Transfer-Encoding: 7bit + +This is an aggregate report from %s. + +------=_NextPart_000_024E_01CC9B0A.AFE54C00 +Content-Type: text/xml +Content-Transfer-Encoding: base64 +Content-Disposition: attachment; + filename="%s!%s!%s!%s.xml" + +]] +local report_footer = [[ + +------=_NextPart_000_024E_01CC9B0A.AFE54C00--]] + local symbols = { spf_allow_symbol = 'R_SPF_ALLOW', spf_deny_symbol = 'R_SPF_FAIL', @@ -50,16 +95,120 @@ local dmarc_symbols = { quarantine = 'DMARC_POLICY_QUARANTINE', } +local redis_keys = { + dmarc_domains = 'dmarc_domains', + dmarc_domain = 'dmarc_%s', +} + +local function gen_xml_grammar() + local lpeg = require 'lpeg' + local lt = lpeg.P('<') / '<' + local gt = lpeg.P('>') / '>' + local amp = lpeg.P('&') / '&' + local quot = lpeg.P('"') / '"' + local apos = lpeg.P("'") / ''' + local special = lt + gt + amp + quot + apos + local grammar = lpeg.Cs((special + 1)^0) + return grammar +end + +local xml_grammar = gen_xml_grammar() + +local function escape_xml(goo) + return xml_grammar:match(goo) +end + -- Default port for redis upstreams local redis_params = nil -local dmarc_redis_key_prefix = "dmarc_" -- 2 days -local dmarc_redis_key_expire = 60 * 60 * 24 * 2 local dmarc_reporting = false local dmarc_actions = {} local E = {} +local take_report_sha +local take_report_script = [[ +local dmarc_domains_key = KEYS[1] +local dmarc_domain_key = KEYS[2] +local dmarc_domain = ARGV[1] +local report = ARGV[2] +redis.call('SADD', dmarc_domains_key, dmarc_domain) +redis.call('LPUSH', dmarc_domain_key, report) +]] + +local function redis_make_request(ev_base, cfg, key, is_write, callback, command, args) + if not ev_base or not redis_params or not callback or not command then + return false,nil,nil + end + + local addr + local rspamd_redis = require "rspamd_redis" + + if key then + if is_write then + addr = redis_params['write_servers']:get_upstream_by_hash(key) + else + addr = redis_params['read_servers']:get_upstream_by_hash(key) + end + else + if is_write then + addr = redis_params['write_servers']:get_upstream_master_slave(key) + else + addr = redis_params['read_servers']:get_upstream_round_robin(key) + end + end + + if not addr then + rspamd_logger.errx(cfg, 'cannot select server to make redis request') + end + + local options = { + ev_base = ev_base, + config = cfg, + callback = callback, + host = addr:get_addr(), + timeout = redis_params['timeout'], + cmd = command, + args = args + } + + if redis_params['password'] then + options['password'] = redis_params['password'] + end + + if redis_params['db'] then + options['dbname'] = redis_params['db'] + end + + local ret,conn = rspamd_redis.make_request(options) + if not ret then + rspamd_logger.errx(cfg, 'cannot execute redis request') + end + return ret,conn,addr +end + +local function load_scripts(cfg, ev_base) + local function redis_report_script_cb(err, data) + if err then + rspamd_logger.errx(cfg, 'DMARC report script loading failed: ' .. err) + else + take_report_sha = tostring(data) + rspamd_logger.infox(cfg, 'Loaded DMARC report script with SHA %s', take_report_sha) + end + end + local ret = redis_make_request(ev_base, + rspamd_config, + nil, + true, -- is write + redis_report_script_cb, --callback + 'SCRIPT', -- command + {'LOAD', take_report_script} + ) + if not ret then + rspamd_logger.errx(cfg, 'Unable to load DMARC report script') + end +end + local function gen_dmarc_grammar() local lpeg = require "lpeg" lpeg.locale(lpeg) @@ -77,14 +226,19 @@ end local dmarc_grammar = gen_dmarc_grammar() -local function dmarc_report(task, spf_ok, dkim_ok, disposition, sampled_out) +local function dmarc_report(task, spf_ok, dkim_ok, disposition, sampled_out, hfromdom, spfdom, dres, spf_result) local ip = task:get_from_ip() if not ip:is_valid() then return nil end - local res = string.format('%d,%s,%s,%s,%s,%s', task:get_date(0), - ip:to_string(), tostring(spf_ok), tostring(dkim_ok), - disposition, (sampled_out and 'sampled_out' or '')) + local dkim_pass = table.concat(dres.pass or E, '|') + local dkim_fail = table.concat(dres.fail or E, '|') + local dkim_temperror = table.concat(dres.temperror or E, '|') + local dkim_permerror = table.concat(dres.permerror or E, '|') + local res = string.format('%d,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s', + task:get_date(0), ip:to_string(), spf_ok, dkim_ok, + disposition, (sampled_out and 'sampled_out' or ''), hfromdom, + dkim_pass, dkim_fail, dkim_temperror, dkim_permerror, spfdom, spf_result) return res end @@ -97,16 +251,18 @@ local function dmarc_callback(task) end end local from = task:get_from(2) - local dmarc_domain + local hfromdom = ((from or E)[1] or E).domain + local dmarc_domain, efromdom, spf_domain local ip_addr = task:get_ip() + local dkim_results = {} if ((not check_authed and task:get_user()) or (not check_local and ip_addr and ip_addr:is_local())) then rspamd_logger.infox(task, "skip DMARC checks for local networks and authorized users"); return end - if ((from or E)[1] or E).domain and ((from or E)[1] or E).domain ~= '' and not (from or E)[2] then - dmarc_domain = rspamd_util.get_tld(from[1]['domain']) + if hfromdom and hfromdom ~= '' and not (from or E)[2] then + dmarc_domain = rspamd_util.get_tld(hfromdom) elseif (from or E)[2] then task:insert_result(dmarc_symbols['na'], 1.0, 'Duplicate From header') return maybe_force_action('na') @@ -121,10 +277,13 @@ local function dmarc_callback(task) local function dmarc_report_cb(err) if not err then rspamd_logger.infox(task, '<%1> dmarc report saved for %2', - task:get_message_id(), from[1]['domain']) + task:get_message_id(), hfromdom) else + if string.match(err, 'NOSCRIPT') then + load_scripts(rspamd_config, task:get_ev_base()) + end rspamd_logger.errx(task, '<%1> dmarc report is not saved for %2: %3', - task:get_message_id(), from[1]['domain'], err) + task:get_message_id(), hfromdom, err) end end @@ -216,15 +375,15 @@ local function dmarc_callback(task) local subdomain_policy = elts['sp'] if subdomain_policy and lookup_domain == dmarc_domain then if (subdomain_policy == 'reject') then - if dmarc_domain ~= from[1]['domain'] then + if dmarc_domain ~= hfromdom then dmarc_policy = 'reject' end elseif (subdomain_policy == 'quarantine') then - if dmarc_domain ~= from[1]['domain'] then + if dmarc_domain ~= hfromdom then dmarc_policy = 'quarantine' end elseif (subdomain_policy == 'none') then - if dmarc_domain ~= from[1]['domain'] then + if dmarc_domain ~= hfromdom then dmarc_policy = 'none' end elseif (subdomain_policy ~= 'none') then @@ -270,16 +429,20 @@ local function dmarc_callback(task) -- Check dkim and spf symbols local spf_ok = false local dkim_ok = false + local dmarc_ok = false + if task:has_symbol(symbols['spf_allow_symbol']) then local efrom = task:get_from(1) - if ((efrom or E)[1] or E).domain then - if strict_spf and rspamd_util.strequal_caseless(efrom[1]['domain'], from[1]['domain']) then + efromdom = ((efrom or E)[1] or E).domain + spf_domain = efromdom or task:get_helo() + if efromdom then + if strict_spf and rspamd_util.strequal_caseless(spf_domain, hfromdom) then spf_ok = true elseif strict_spf then table.insert(reason, "SPF not aligned (strict)") end if not strict_spf then - local spf_tld = rspamd_util.get_tld(efrom[1]['domain']) + local spf_tld = rspamd_util.get_tld(spf_domain) if rspamd_util.strequal_caseless(spf_tld, dmarc_domain) then spf_ok = true else @@ -292,14 +455,16 @@ local function dmarc_callback(task) end local das = task:get_symbol(symbols['dkim_allow_symbol']) if ((das or E)[1] or E).options then - for _,dkim_domain in ipairs(das[1]['options']) do - if strict_dkim and rspamd_util.strequal_caseless(from[1]['domain'], dkim_domain) then + dkim_results.pass = {} + for _,domain in ipairs(das[1]['options']) do + table.insert(dkim_results.pass, domain) + if strict_dkim and rspamd_util.strequal_caseless(hfromdom, domain) then dkim_ok = true elseif strict_dkim then table.insert(reason, "DKIM not aligned (strict)") end if not strict_dkim then - local dkim_tld = rspamd_util.get_tld(dkim_domain) + local dkim_tld = rspamd_util.get_tld(domain) if rspamd_util.strequal_caseless(dkim_tld, dmarc_domain) then dkim_ok = true else @@ -313,13 +478,20 @@ local function dmarc_callback(task) local disposition = 'none' local sampled_out = false + local spf_tmpfail, dkim_tmpfail if not (spf_ok or dkim_ok) then local reason_str = table.concat(reason, ", ") res = 1.0 - local spf_tmpfail = task:get_symbol(symbols['spf_tempfail_symbol']) - local dkim_tmpfail = task:get_symbol(symbols['dkim_tempfail_symbol']) + spf_tmpfail = task:get_symbol(symbols['spf_tempfail_symbol']) + dkim_tmpfail = task:get_symbol(symbols['dkim_tempfail_symbol']) if (spf_tmpfail or dkim_tmpfail) then + if ((dkim_tmpfail or E)[1] or E).options then + dkim_results.tempfail = {} + for _,domain in ipairs(dkim_tmpfail[1]['options']) do + table.insert(dkim_results.tempfail, domain) + end + end task:insert_result(dmarc_symbols['dnsfail'], 1.0, lookup_domain .. ' : ' .. 'SPF/DKIM temp error', dmarc_policy) return maybe_force_action('dnsfail') end @@ -344,27 +516,53 @@ local function dmarc_callback(task) task:insert_result(dmarc_symbols['softfail'], res, lookup_domain .. ' : ' .. reason_str, dmarc_policy) end else + dmarc_ok = true task:insert_result(dmarc_symbols['allow'], res, lookup_domain, dmarc_policy) end if rua and redis_params and dmarc_reporting then + + local spf_result + if spf_ok then + spf_result = 'pass' + elseif spf_tmpfail then + spf_result = 'temperror' + else + if task:get_symbol(symbols.spf_deny_symbol) then + spf_result = 'fail' + elseif task:get_symbol(symbols.spf_softfail_symbol) then + spf_result = 'softfail' + elseif task:get_symbol(symbols.spf_neutral_symbol) then + spf_result = 'neutral' + elseif task:get_symbol(symbols.spf_softfail_symbol) then + spf_result = 'permerror' + else + spf_result = 'none' + end + end + local dkim_deny = ((task:get_symbol(symbols.dkim_deny_symbol) or E)[1] or E).options + if dkim_deny then + dkim_results.fail = {} + for _, domain in ipairs(dkim_deny) do + table.insert(dkim_results.fail, domain) + end + end -- Prepare and send redis report element - local redis_key = dmarc_redis_key_prefix .. from[1]['domain'] - local report_data = dmarc_report(task, spf_ok, dkim_ok, disposition, sampled_out) + local dmarc_domain_key = string.format(redis_keys.dmarc_domain, hfromdom) + local report_data = dmarc_report(task, spf_ok and 'pass' or 'fail', dkim_ok and 'pass' or 'fail', dmarc_ok and 'pass' or 'fail', sampled_out, + hfromdom, spf_domain, dkim_results, spf_result) if report_data then - local ret,conn,_ = rspamd_redis_make_request(task, + local ret = rspamd_redis_make_request(task, redis_params, -- connect params - from[1]['domain'], -- hash key + hfromdom, -- hash key true, -- is write dmarc_report_cb, --callback - 'LPUSH', -- command - {redis_key, report_data} -- arguments + 'EVALSHA', -- command + {take_report_sha, 2, redis_keys.dmarc_domains, dmarc_domain_key, hfromdom, report_data} -- arguments ) - if ret and conn then - conn:add_cmd('EXPIRE', { - redis_key, tostring(dmarc_redis_key_expire) - }) + if not ret then + rspamd_logger.errx(task, 'Unable to schedule redis request') end end end @@ -374,7 +572,7 @@ local function dmarc_callback(task) end -- Do initial request - local resolve_name = '_dmarc.' .. from[1]['domain'] + local resolve_name = '_dmarc.' .. hfromdom task:get_resolver():resolve_txt({ task=task, name = resolve_name, @@ -406,27 +604,598 @@ if opts['symbols'] then end if opts['reporting'] == true then - dmarc_reporting = true + redis_params = rspamd_parse_redis_server('dmarc') + if not redis_params then + rspamd_logger.errx(rspamd_config, 'cannot parse servers parameter') + elseif not opts['send_reports'] then + dmarc_reporting = true + rspamd_config:add_on_load(function(cfg, ev_base, worker) + if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end + load_scripts(cfg, ev_base) + end) + else + dmarc_reporting = true + if type(opts['report_settings']) == 'table' then + for k, v in pairs(opts['report_settings']) do + report_settings[k] = v + end + end + for _, e in ipairs({'email', 'domain', 'org_name'}) do + if not report_settings[e] then + rspamd_logger.errx(rspamd_config, 'Missing required setting: report_settings.%s', e) + return + end + end + rspamd_config:add_on_load(function(cfg, ev_base, worker) + if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end + load_scripts(cfg, ev_base) + rspamd_config:register_finish_script(function () + local stamp = pool:get_variable(VAR_NAME, 'double') + if not stamp then + rspamd_logger.warnx(rspamd_config, 'No last DMARC report information to persist to disk') + return + end + local f, err = io.open(statefile, 'w') + if err then + rspamd_logger.errx(rspamd_config, 'Unable to write statefile to disk: %s', err) + return + end + assert(f:write(pool:get_variable(VAR_NAME, 'double'))) + assert(f:close()) + end) + local get_reporting_domain, reporting_domain, report_start, report_end, report_id + local reporting_addr = {} + local domain_policy = {} + local to_verify = {} + local function entry_to_xml(data, count) + local buf = { + string.format( +[[ + + %s + %d + + %s + %s + %s + + + + %s + + +]], data.ip, count, data.disposition, data.dkim_disposition, data.spf_disposition, data.header_from), + } + if data.dkim_results[1] or (data.spf_result ~= '' and data.spf_domain ~= '') then + table.insert(buf, '\t\t\n') + for _, d in ipairs(data.dkim_results) do + table.insert(buf, string.format( + '\t\t\t\n\t\t\t\t%s\n\t\t\t\t%s\n\t\t\t\n', + d.domain, d.result)) + end + if (data.spf_result ~= '' and data.spf_domain ~= '') then + table.insert(buf, string.format( + '\t\t\t\n\t\t\t\t%s\n\t\t\t\t%s\n\t\t\t\n', + data.spf_domain, data.spf_result)) + end + table.insert(buf, '\t\t\n') + end + table.insert(buf, '\t\n') + return table.concat(buf) + end + local function dmarc_report_xml() + local entries = {} + report_id = string.format('%s.%d.%d', reporting_domain, report_start, report_end) + rspamd_logger.debugm(N, rspamd_config, 'new report: %s', report_id) + local actions = { + push = function(data) + local counts = {} + local split = rspamd_str_split(data, ',') + local when = tonumber(split[1]) + if when > report_end then + -- XXX: Replace data! + rspamd_logger.debugm(N, rspamd_config, 'report is newer than report_end: %1 %2', when, report_end) + return false + elseif when < report_start then + rspamd_logger.debugm(N, rspamd_config, 'report is older than report_start: %1 %2', when, report_start) + return true + elseif when >= report_start then + local h = hash.create() + for i = 2, #split do + h:update(split[i]) + end + local bin = h:bin() + if counts[bin] then + counts[bin] = counts[bin] + 1 + else + counts[bin] = 1 + end + local count = counts[bin] + local row = { + ip = split[2], + spf_disposition = split[3], + dkim_disposition = split[4], + disposition = split[5], + override = split[6], + header_from = split[7], + dkim_results = {}, + spf_domain = split[12], + spf_result = split[13], + } + if split[8] and split[8] ~= '' then + local tmp = rspamd_str_split(split[8], '|') + for _, d in ipairs(tmp) do + table.insert(row.dkim_results, {domain = d, result = 'pass'}) + end + end + if split[9] and split[9] ~= '' then + local tmp = rspamd_str_split(split[9], '|') + for _, d in ipairs(tmp) do + table.insert(row.dkim_results, {domain = d, result = 'fail'}) + end + end + if split[10] and split[10] ~= '' then + local tmp = rspamd_str_split(split[10], '|') + for _, d in ipairs(tmp) do + table.insert(row.dkim_results, {domain = d, result = 'temperror'}) + end + end + if split[11] and split[11] ~= '' then + local tmp = rspamd_str_split(split[11], '|') + for _, d in ipairs(tmp) do + table.insert(row.dkim_results, {domain = d, result = 'permerror'}) + end + end + entries[bin] = {[row] = count} + return true + end + end, + header = function() + return string.format( +[[ + + %s + %s + %s + + %d + %d + + + + %s + %s + %s +

%s

+ %s + %s +
+]], escape_xml(report_settings.org_name), escape_xml(report_settings.email), report_id, report_start, report_end, + reporting_domain, escape_xml(domain_policy.adkim), escape_xml(domain_policy.aspf), escape_xml(domain_policy.p), + escape_xml(domain_policy.sp), escape_xml(domain_policy.pct)) + end, + footer = function() + return [[
]] + end, + entries = function() + local buf = {} + for _, e in pairs(entries) do + for k, v in pairs(e) do + table.insert(buf, entry_to_xml(k, v)) + end + end + return table.concat(buf, '') + end, + } + return function(action, p) + local f = actions[action] + if not f then error('invalid action: ' .. action) end + return f(p) + end + end + local function send_report_via_email(xmlf) + local tmp_addr = reporting_addr + local encoded = rspamd_util.encode_base64(table.concat({xmlf('header'), xmlf('entries'), xmlf('footer')}, 78)) + local function mail_cb(err, data, conn) + local function no_error(merr, mdata, wantcode) + wantcode = wantcode or '2' + if merr then + rspamd_logger.errx(ev_base, 'got error in tcp callback: %s', merr) + if conn then + conn:close() + end + return false + end + if mdata then + if type(mdata) ~= 'string' then + mdata = tostring(mdata) + end + if string.sub(mdata, 1, 1) ~= wantcode then + rspamd_logger.errx(ev_base, 'got bad smtp response: %s', mdata) + if conn then + conn:close() + end + return false + end + else + rspamd_logger.errx(ev_base, 'no data') + if conn then + conn:close() + end + return false + end + return true + end + local function all_done_cb(merr, mdata) + if conn then + conn:close() + end + get_reporting_domain() + return true + end + local function quit_done_cb(merr, mdata) + conn:add_read(all_done_cb, '\r\n') + end + local function quit_cb(merr, mdata) + if no_error(merr, mdata) then + conn:add_write(quit_done_cb, 'QUIT\r\n') + end + end + local function pre_quit_cb(merr, mdata) + if no_error(merr, '2') then + conn:add_read(quit_cb, '\r\n') + end + end + local function data_done_cb(merr, mdata) + if no_error(merr, mdata, '3') then + local atmp = {} + for k in pairs(reporting_addr) do + table.insert(atmp, k) + end + local addr_string = table.concat(atmp, ', ') + local rhead = string.format(report_template, report_settings.email, addr_string, + reporting_domain, report_settings.domain, report_id, rspamd_util.time_to_string(rspamd_util.get_time()), + rspamd_util.random_hex(12) .. '@rspamd', report_settings.domain, report_settings.domain, reporting_domain, + report_start, report_end) + conn:add_write(pre_quit_cb, {rhead, encoded, report_footer, '\r\n.\r\n'}) + end + end + local function data_cb(merr, mdata) + if no_error(merr, '2') then + conn:add_read(data_done_cb, '\r\n') + end + end + local function rcpt_done_cb(merr, mdata) + if no_error(merr, mdata) then + conn:add_write(data_cb, 'DATA\r\n') + end + end + local from_done_cb + local function rcpt_cb(merr, mdata) + if no_error(merr, '2') then + if tmp_addr[1] then + conn:add_read(from_done_cb, '\r\n') + else + conn:add_read(rcpt_done_cb, '\r\n') + end + end + end + from_done_cb = function(merr, mdata) + if no_error(merr, mdata) then + conn:add_write(rcpt_cb, {'RCPT TO: <', table.remove(tmp_addr), '>\r\n'}) + end + end + local function from_cb(merr, mdata) + if no_error(merr, '2') then + conn:add_read(from_done_cb, '\r\n') + end + end + local function hello_done_cb(merr, mdata) + if no_error(merr, mdata) then + conn:add_write(from_cb, {'MAIL FROM: <', report_settings.email, '>\r\n'}) + end + end + local function hello_cb(merr) + if no_error(merr, '2') then + conn:add_read(hello_done_cb, '\r\n') + end + end + if no_error(err, data) then + conn:add_write(hello_cb, {'HELO ', report_settings.helo, '\r\n'}) + end + end + rspamd_tcp.request({ + ev_base = ev_base, + callback = mail_cb, + stop_pattern = '\r\n', + host = report_settings.smtp, + port = report_settings.smtp_port, + }) + end + local function make_report() + rspamd_logger.infox(ev_base, 'sending report for %s <%s>', reporting_domain, table.concat(reporting_addr, ',')) + local dmarc_xml = dmarc_report_xml() + local dmarc_push_cb + local dmarc_domain_key = string.format(redis_keys.dmarc_domain, reporting_domain) + dmarc_push_cb = function(err, data) + if err then + rspamd_logger.errx(ev_base, 'Redis request failed: %s', err) + -- XXX: data is orphaned; replace key or delete data + get_reporting_domain() + elseif type(data) == 'string' then + if dmarc_xml('push', data) then + local ret = redis_make_request(ev_base, + rspamd_config, + nil, + false, -- is write + dmarc_push_cb, --callback + 'LPOP', -- command + {dmarc_domain_key} + ) + if not ret then + rspamd_logger.errx(ev_base, 'Failed to schedule redis request') + -- XXX: data is orphaned; replace key or delete data + get_reporting_domain() + end + else + send_report_via_email(dmarc_xml) + end + else + send_report_via_email(dmarc_xml) + end + end + local ret = redis_make_request(ev_base, + rspamd_config, + nil, + false, -- is write + dmarc_push_cb, --callback + 'LPOP', -- command + {dmarc_domain_key} + ) + if not ret then + rspamd_logger.errx(rspamd_config, 'Failed to schedule redis request') + -- XXX: data is orphaned; replace key or delete data + get_reporting_domain() + end + end + local function delete_reports() + local function delete_reports_cb(err) + if err then + rspamd_logger.errx(rspamd_config, 'Error deleting reports: %s', err) + end + rspamd_logger.infox(rspamd_config, 'Deleted reports for %s') + get_reporting_domain() + end + local dmarc_domain_key = string.format(redis_keys.dmarc_domain, reporting_domain) + local ret = redis_make_request(ev_base, + rspamd_config, + nil, + false, -- is write + delete_reports_cb, --callback + 'DEL', -- command + {dmarc_domain_key} + ) + if not ret then + rspamd_logger.errx(rspamd_config, 'Failed to schedule redis request') + get_reporting_domain() + end + end + local function verify_reporting_address() + local function verifier(test_addr, vdom) + local function verify_cb(resolver, to_resolve, results, err, _, authenticated) + if err then + if err == 'no records with this name' or err == 'requested record is not found' then + rspamd_logger.infox(rspamd_config, 'Reports to %s for %s not authorised', test_addr, reporting_domain) + to_verify[test_addr] = nil + else + rspamd_logger.errx(rspamd_config, 'Lookup error [%s]: %s', to_resolve, err) + -- XXX: retry? + delete_reports() + end + else + local is_authed = false + -- XXX: reporting address could be overidden + for _, r in ipairs(results) do + if string.match(r, 'v=DMARC1') then + is_authed = true + break + end + end + if not is_authed then + to_verify[test_addr] = nil + rspamd_logger.infox(rspamd_config, 'Reports to %s for %s not authorised', test_addr, reporting_domain) + else + to_verify[test_addr] = nil + reporting_addr[test_addr] = true + end + if not next(to_verify) then + if next(reporting_addr) then + make_report() + else + rspamd_logger.infox(rspamd_config, 'No valid reporting addresses for %s', reporting_domain) + delete_reports() + end + end + end + end + rspamd_config:get_resolver():resolve_txt(nil, pool, + string.format('%s._report._dmarc.%s', reporting_domain, vdom), verify_cb) + end + for t, vdom in pairs(to_verify) do + verifier(t, vdom) + end + end + local function get_reporting_address() + local function check_addr_cb(resolver, to_resolve, results, err, _, authenticated) + if err then + if err == 'no records with this name' or err == 'requested record is not found' then + rspamd_logger.errx(rspamd_config, 'No DMARC record found for %s', reporting_domain) + delete_reports() + else + rspamd_logger.errx(rspamd_config, 'Lookup error [%s]: %s', to_resolve, err) + -- XXX: retry? + delete_reports() + end + else + local policy + local found_policy, failed_policy = false, false + for _, r in ipairs(results) do + local elts = dmarc_grammar:match(r) + if elts and found_policy then + failed_policy = true + elseif elts then + found_policy = true + policy = elts + end + end + if not found_policy then + rspamd_logger.errx(rspamd_config, 'No policy: %s', to_resolve) + delete_reports() + elseif failed_policy then + rspamd_logger.errx(rspamd_config, 'Duplicate policies: %s', to_resolve) + delete_reports() + elseif not policy['rua'] then + rspamd_logger.errx(rspamd_config, 'No reporting address: %s', to_resolve) + delete_reports() + else + local upool = mempool.create() + local split = rspamd_str_split(policy['rua'], ',') + for _, m in ipairs(split) do + local url = rspamd_url.create(upool, m) + if not url then + rspamd_logger.errx(rspamd_config, 'Couldnt extract reporting address: %s', policy['rua']) + else + local urlt = url:to_table() + if urlt['protocol'] ~= 'mailto' then + rspamd_logger.errx(rspamd_config, 'Invalid URL: %s', url) + else + if urlt['tld'] == rspamd_util.get_tld(reporting_domain) then + reporting_addr[string.format('%s@%s', urlt['user'], urlt['host'])] = true + else + to_verify[string.format('%s@%s', urlt['user'], urlt['host'])] = urlt['host'] + end + end + end + end + upool:destroy() + domain_policy['pct'] = policy['pct'] or 100 + domain_policy['adkim'] = policy['adkim'] or 'r' + domain_policy['aspf'] = policy['aspf'] or 'r' + domain_policy['p'] = policy['p'] or 'none' + domain_policy['sp'] = policy['sp'] or 'none' + if next(to_verify) then + verify_reporting_address() + elseif next(reporting_addr) then + make_report() + else + rspamd_logger.errx(rspamd_config, 'No reporting address for %s', reporting_domain) + delete_reports() + end + end + end + end + rspamd_config:get_resolver():resolve_txt(nil, pool, + string.format('_dmarc.%s', reporting_domain), check_addr_cb) + end + get_reporting_domain = function() + reporting_domain = nil + reporting_addr = {} + domain_policy = {} + local function get_reporting_domain_cb(err, data) + if err then + rspamd_logger.errx(cfg, 'Unable to get DMARC domain: %s', err) + else + if type(data) == 'userdata' then + reporting_domain = nil + else + reporting_domain = data + end + if not reporting_domain then + rspamd_logger.infox(cfg, 'No more domains to generate reports for') + else + get_reporting_address() + end + end + end + local ret = redis_make_request(ev_base, + rspamd_config, + nil, + false, -- is write + get_reporting_domain_cb, --callback + 'SPOP', -- command + {redis_keys.dmarc_domains} + ) + if not ret then + rspamd_logger.errx(cfg, 'Unable to get DMARC domain') + end + end + local function send_reports(time) + rspamd_logger.infox(ev_base, 'sending reports ostensibly %1', time) + pool:set_variable(VAR_NAME, time) + report_end = time + report_start = time - INTERVAL + get_reporting_domain() + end + -- Push reports at regular intervals + local function schedule_regular_send() + rspamd_config:add_periodic(ev_base, INTERVAL, function () + send_reports() + return true + end) + end + -- Push reports to backend and reschedule check + local function schedule_intermediate_send(when) + rspamd_config:add_periodic(ev_base, when, function () + schedule_regular_send() + send_reports(rspamd_util.get_time()) + return false + end) + end + -- Try read statefile on startup + local stamp + local f, err = io.open(statefile, 'r') + if err then + rspamd_logger.errx('Failed to open statefile: %s', err) + end + if f then + io.input(f) + stamp = tonumber(io.read()) + pool:set_variable(VAR_NAME, stamp) + end + local time = rspamd_util.get_time() + if not stamp then + rspamd_logger.debugm(N, rspamd_config, 'No state found - sending reports immediately') + schedule_regular_send() + send_reports(time) + return + end + local delta = stamp - time + INTERVAL + if delta <= 0 then + rspamd_logger.debugm(N, rspamd_config, 'Last send is too old - sending reports immediately') + schedule_regular_send() + send_reports(time) + return + end + rspamd_logger.debugm(N, rspamd_config, 'Scheduling next send in %s seconds', delta) + schedule_intermediate_send(delta) + end) + end end if type(opts['actions']) == 'table' then dmarc_actions = opts['actions'] end - -redis_params = rspamd_parse_redis_server('dmarc') -if not redis_params then - rspamd_logger.infox(rspamd_config, 'cannot parse servers parameter') -end - -if opts['key_prefix'] then - dmarc_redis_key_prefix = opts['key_prefix'] -end - -if opts['expire'] then - dmarc_redis_key_expire = opts['expire'] +if type(opts['report_settings']) == 'table' then + for k, v in pairs(opts['report_settings']) do + report_settings[k] = v + end end - -if opts['key_expire'] then - dmarc_redis_key_expire = opts['key_expire'] +if dmarc_reporting then + for _, e in ipairs({'email', 'domain', 'org_name'}) do + if not report_settings[e] then + rspamd_logger.errx(rspamd_config, 'Missing required setting: report_settings.%s', e) + return + end + end end -- Check spf and dkim sections for changed symbols -- 2.39.5