]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Preliminary DMARC reporting implementation 1624/head
authorAndrew Lewis <nerf@judo.za.org>
Tue, 9 May 2017 09:53:16 +0000 (11:53 +0200)
committerAndrew Lewis <nerf@judo.za.org>
Tue, 9 May 2017 09:53:16 +0000 (11:53 +0200)
src/plugins/lua/dmarc.lua

index 443b31e30f09ef3e725559fd6c80de4ad51e9e55..e3cc09bf08d18606e202f04923422f298b204796 100644 (file)
@@ -17,7 +17,11 @@ limitations under the License.
 
 -- Dmarc policy filter
 
+local hash = require "rspamd_cryptobox_hash"
 local rspamd_logger = require "rspamd_logger"
+local mempool = require "rspamd_mempool"
+local rspamd_tcp = require "rspamd_tcp"
+local rspamd_url = require "rspamd_url"
 local rspamd_util = require "rspamd_util"
 local check_local = false
 local check_authed = false
@@ -26,6 +30,47 @@ if confighelp then
   return
 end
 
+local N = 'dmarc'
+local statefile = string.format('%s/%s', rspamd_paths['DBDIR'], 'dmarc_reports_last_sent')
+local VAR_NAME = 'dmarc_reports_last_sent'
+local INTERVAL = 86400
+local pool = mempool.create()
+
+local report_settings = {
+  helo = 'rspamd',
+  smtp = '127.0.0.1',
+  smtp_port = 25,
+}
+local report_template = [[From: "Rspamd" <%s>
+To: %s
+Subject: Report Domain: %s
+       Submitter: %s
+       Report-ID: <%s>
+Date: %s
+MIME-Version: 1.0
+Message-ID: <%s>
+Content-Type: multipart/alternative;
+       boundary="----=_NextPart_000_024E_01CC9B0A.AFE54C00"
+
+This is a multipart message in MIME format.
+
+------=_NextPart_000_024E_01CC9B0A.AFE54C00
+Content-Type: text/plain; charset="us-ascii"
+Content-Transfer-Encoding: 7bit
+
+This is an aggregate report from %s.
+
+------=_NextPart_000_024E_01CC9B0A.AFE54C00
+Content-Type: text/xml
+Content-Transfer-Encoding: base64
+Content-Disposition: attachment;
+       filename="%s!%s!%s!%s.xml"
+
+]]
+local report_footer = [[
+
+------=_NextPart_000_024E_01CC9B0A.AFE54C00--]]
+
 local symbols = {
   spf_allow_symbol = 'R_SPF_ALLOW',
   spf_deny_symbol = 'R_SPF_FAIL',
@@ -50,16 +95,120 @@ local dmarc_symbols = {
   quarantine = 'DMARC_POLICY_QUARANTINE',
 }
 
+local redis_keys = {
+  dmarc_domains = 'dmarc_domains',
+  dmarc_domain = 'dmarc_%s',
+}
+
+local function gen_xml_grammar()
+  local lpeg = require 'lpeg'
+  local lt = lpeg.P('<') / '&lt;'
+  local gt = lpeg.P('>') / '&gt;'
+  local amp = lpeg.P('&') / '&amp;'
+  local quot = lpeg.P('"') / '&quot;'
+  local apos = lpeg.P("'") / '&apos;'
+  local special = lt + gt + amp + quot + apos
+  local grammar = lpeg.Cs((special + 1)^0)
+  return grammar
+end
+
+local xml_grammar = gen_xml_grammar()
+
+local function escape_xml(goo)
+  return xml_grammar:match(goo)
+end
+
 -- Default port for redis upstreams
 local redis_params = nil
-local dmarc_redis_key_prefix = "dmarc_"
 -- 2 days
-local dmarc_redis_key_expire = 60 * 60 * 24 * 2
 local dmarc_reporting = false
 local dmarc_actions = {}
 
 local E = {}
 
+local take_report_sha
+local take_report_script = [[
+local dmarc_domains_key = KEYS[1]
+local dmarc_domain_key = KEYS[2]
+local dmarc_domain = ARGV[1]
+local report = ARGV[2]
+redis.call('SADD', dmarc_domains_key, dmarc_domain)
+redis.call('LPUSH', dmarc_domain_key, report)
+]]
+
+local function redis_make_request(ev_base, cfg, key, is_write, callback, command, args)
+  if not ev_base or not redis_params or not callback or not command then
+    return false,nil,nil
+  end
+
+  local addr
+  local rspamd_redis = require "rspamd_redis"
+
+  if key then
+    if is_write then
+      addr = redis_params['write_servers']:get_upstream_by_hash(key)
+    else
+      addr = redis_params['read_servers']:get_upstream_by_hash(key)
+    end
+  else
+    if is_write then
+      addr = redis_params['write_servers']:get_upstream_master_slave(key)
+    else
+      addr = redis_params['read_servers']:get_upstream_round_robin(key)
+    end
+  end
+
+  if not addr then
+    rspamd_logger.errx(cfg, 'cannot select server to make redis request')
+  end
+
+  local options = {
+    ev_base = ev_base,
+    config = cfg,
+    callback = callback,
+    host = addr:get_addr(),
+    timeout = redis_params['timeout'],
+    cmd = command,
+    args = args
+  }
+
+  if redis_params['password'] then
+    options['password'] = redis_params['password']
+  end
+
+  if redis_params['db'] then
+    options['dbname'] = redis_params['db']
+  end
+
+  local ret,conn = rspamd_redis.make_request(options)
+  if not ret then
+    rspamd_logger.errx(cfg, 'cannot execute redis request')
+  end
+  return ret,conn,addr
+end
+
+local function load_scripts(cfg, ev_base)
+  local function redis_report_script_cb(err, data)
+    if err then
+      rspamd_logger.errx(cfg, 'DMARC report script loading failed: ' .. err)
+    else
+      take_report_sha = tostring(data)
+      rspamd_logger.infox(cfg, 'Loaded DMARC report script with SHA %s', take_report_sha)
+    end
+  end
+  local ret = redis_make_request(ev_base,
+    rspamd_config,
+    nil,
+    true, -- is write
+    redis_report_script_cb, --callback
+    'SCRIPT', -- command
+    {'LOAD', take_report_script}
+  )
+  if not ret then
+    rspamd_logger.errx(cfg, 'Unable to load DMARC report script')
+  end
+end
+
 local function gen_dmarc_grammar()
   local lpeg = require "lpeg"
   lpeg.locale(lpeg)
@@ -77,14 +226,19 @@ end
 
 local dmarc_grammar = gen_dmarc_grammar()
 
-local function dmarc_report(task, spf_ok, dkim_ok, disposition, sampled_out)
+local function dmarc_report(task, spf_ok, dkim_ok, disposition, sampled_out, hfromdom, spfdom, dres, spf_result)
   local ip = task:get_from_ip()
   if not ip:is_valid() then
     return nil
   end
-  local res = string.format('%d,%s,%s,%s,%s,%s', task:get_date(0),
-    ip:to_string(), tostring(spf_ok), tostring(dkim_ok),
-    disposition, (sampled_out and 'sampled_out' or ''))
+  local dkim_pass = table.concat(dres.pass or E, '|')
+  local dkim_fail = table.concat(dres.fail or E, '|')
+  local dkim_temperror = table.concat(dres.temperror or E, '|')
+  local dkim_permerror = table.concat(dres.permerror or E, '|')
+  local res = string.format('%d,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s',
+    task:get_date(0), ip:to_string(), spf_ok, dkim_ok,
+    disposition, (sampled_out and 'sampled_out' or ''), hfromdom,
+    dkim_pass, dkim_fail, dkim_temperror, dkim_permerror, spfdom, spf_result)
 
   return res
 end
@@ -97,16 +251,18 @@ local function dmarc_callback(task)
     end
   end
   local from = task:get_from(2)
-  local dmarc_domain
+  local hfromdom = ((from or E)[1] or E).domain
+  local dmarc_domain, efromdom, spf_domain
   local ip_addr = task:get_ip()
+  local dkim_results = {}
 
   if ((not check_authed and task:get_user()) or
       (not check_local and ip_addr and ip_addr:is_local())) then
     rspamd_logger.infox(task, "skip DMARC checks for local networks and authorized users");
     return
   end
-  if ((from or E)[1] or E).domain and ((from or E)[1] or E).domain ~= '' and not (from or E)[2] then
-    dmarc_domain = rspamd_util.get_tld(from[1]['domain'])
+  if hfromdom and hfromdom ~= '' and not (from or E)[2] then
+    dmarc_domain = rspamd_util.get_tld(hfromdom)
   elseif (from or E)[2] then
     task:insert_result(dmarc_symbols['na'], 1.0, 'Duplicate From header')
     return maybe_force_action('na')
@@ -121,10 +277,13 @@ local function dmarc_callback(task)
   local function dmarc_report_cb(err)
     if not err then
       rspamd_logger.infox(task, '<%1> dmarc report saved for %2',
-        task:get_message_id(), from[1]['domain'])
+        task:get_message_id(), hfromdom)
     else
+      if string.match(err, 'NOSCRIPT') then
+        load_scripts(rspamd_config, task:get_ev_base())
+      end
       rspamd_logger.errx(task, '<%1> dmarc report is not saved for %2: %3',
-        task:get_message_id(), from[1]['domain'], err)
+        task:get_message_id(), hfromdom, err)
     end
   end
 
@@ -216,15 +375,15 @@ local function dmarc_callback(task)
           local subdomain_policy = elts['sp']
           if subdomain_policy and lookup_domain == dmarc_domain then
             if (subdomain_policy == 'reject') then
-              if dmarc_domain ~= from[1]['domain'] then
+              if dmarc_domain ~= hfromdom then
                 dmarc_policy = 'reject'
               end
             elseif (subdomain_policy == 'quarantine') then
-              if dmarc_domain ~= from[1]['domain'] then
+              if dmarc_domain ~= hfromdom then
                 dmarc_policy = 'quarantine'
               end
             elseif (subdomain_policy == 'none') then
-              if dmarc_domain ~= from[1]['domain'] then
+              if dmarc_domain ~= hfromdom then
                 dmarc_policy = 'none'
               end
             elseif (subdomain_policy ~= 'none') then
@@ -270,16 +429,20 @@ local function dmarc_callback(task)
     -- Check dkim and spf symbols
     local spf_ok = false
     local dkim_ok = false
+    local dmarc_ok = false
+
     if task:has_symbol(symbols['spf_allow_symbol']) then
       local efrom = task:get_from(1)
-      if ((efrom or E)[1] or E).domain then
-        if strict_spf and rspamd_util.strequal_caseless(efrom[1]['domain'], from[1]['domain']) then
+      efromdom = ((efrom or E)[1] or E).domain
+      spf_domain = efromdom or task:get_helo()
+      if efromdom then
+        if strict_spf and rspamd_util.strequal_caseless(spf_domain, hfromdom) then
           spf_ok = true
         elseif strict_spf then
           table.insert(reason, "SPF not aligned (strict)")
         end
         if not strict_spf then
-          local spf_tld = rspamd_util.get_tld(efrom[1]['domain'])
+          local spf_tld = rspamd_util.get_tld(spf_domain)
           if rspamd_util.strequal_caseless(spf_tld, dmarc_domain) then
             spf_ok = true
           else
@@ -292,14 +455,16 @@ local function dmarc_callback(task)
     end
     local das = task:get_symbol(symbols['dkim_allow_symbol'])
     if ((das or E)[1] or E).options then
-      for _,dkim_domain in ipairs(das[1]['options']) do
-        if strict_dkim and rspamd_util.strequal_caseless(from[1]['domain'], dkim_domain) then
+      dkim_results.pass = {}
+      for _,domain in ipairs(das[1]['options']) do
+        table.insert(dkim_results.pass, domain)
+        if strict_dkim and rspamd_util.strequal_caseless(hfromdom, domain) then
           dkim_ok = true
         elseif strict_dkim then
           table.insert(reason, "DKIM not aligned (strict)")
         end
         if not strict_dkim then
-          local dkim_tld = rspamd_util.get_tld(dkim_domain)
+          local dkim_tld = rspamd_util.get_tld(domain)
           if rspamd_util.strequal_caseless(dkim_tld, dmarc_domain) then
             dkim_ok = true
           else
@@ -313,13 +478,20 @@ local function dmarc_callback(task)
 
     local disposition = 'none'
     local sampled_out = false
+    local spf_tmpfail, dkim_tmpfail
 
     if not (spf_ok or dkim_ok) then
       local reason_str = table.concat(reason, ", ")
       res = 1.0
-      local spf_tmpfail = task:get_symbol(symbols['spf_tempfail_symbol'])
-      local dkim_tmpfail = task:get_symbol(symbols['dkim_tempfail_symbol'])
+      spf_tmpfail = task:get_symbol(symbols['spf_tempfail_symbol'])
+      dkim_tmpfail = task:get_symbol(symbols['dkim_tempfail_symbol'])
       if (spf_tmpfail or dkim_tmpfail) then
+        if ((dkim_tmpfail or E)[1] or E).options then
+          dkim_results.tempfail = {}
+          for _,domain in ipairs(dkim_tmpfail[1]['options']) do
+            table.insert(dkim_results.tempfail, domain)
+          end
+        end
         task:insert_result(dmarc_symbols['dnsfail'], 1.0, lookup_domain .. ' : ' .. 'SPF/DKIM temp error', dmarc_policy)
         return maybe_force_action('dnsfail')
       end
@@ -344,27 +516,53 @@ local function dmarc_callback(task)
         task:insert_result(dmarc_symbols['softfail'], res, lookup_domain .. ' : ' .. reason_str, dmarc_policy)
       end
     else
+      dmarc_ok = true
       task:insert_result(dmarc_symbols['allow'], res, lookup_domain, dmarc_policy)
     end
 
     if rua and redis_params and dmarc_reporting then
+
+      local spf_result
+      if spf_ok then
+        spf_result = 'pass'
+      elseif spf_tmpfail then
+        spf_result = 'temperror'
+      else
+        if task:get_symbol(symbols.spf_deny_symbol) then
+          spf_result = 'fail'
+        elseif task:get_symbol(symbols.spf_softfail_symbol) then
+          spf_result = 'softfail'
+        elseif task:get_symbol(symbols.spf_neutral_symbol) then
+          spf_result = 'neutral'
+        elseif task:get_symbol(symbols.spf_softfail_symbol) then
+          spf_result = 'permerror'
+        else
+          spf_result = 'none'
+        end
+      end
+      local dkim_deny = ((task:get_symbol(symbols.dkim_deny_symbol) or E)[1] or E).options
+      if dkim_deny then
+        dkim_results.fail = {}
+        for _, domain in ipairs(dkim_deny) do
+          table.insert(dkim_results.fail, domain)
+        end
+      end
       -- Prepare and send redis report element
-      local redis_key = dmarc_redis_key_prefix .. from[1]['domain']
-      local report_data = dmarc_report(task, spf_ok, dkim_ok, disposition, sampled_out)
+      local dmarc_domain_key = string.format(redis_keys.dmarc_domain, hfromdom)
+      local report_data = dmarc_report(task, spf_ok and 'pass' or 'fail', dkim_ok and 'pass' or 'fail', dmarc_ok and 'pass' or 'fail', sampled_out,
+        hfromdom, spf_domain, dkim_results, spf_result)
 
       if report_data then
-        local ret,conn,_ = rspamd_redis_make_request(task,
+        local ret = rspamd_redis_make_request(task,
           redis_params, -- connect params
-          from[1]['domain'], -- hash key
+          hfromdom, -- hash key
           true, -- is write
           dmarc_report_cb, --callback
-          'LPUSH', -- command
-          {redis_key, report_data} -- arguments
+          'EVALSHA', -- command
+          {take_report_sha, 2, redis_keys.dmarc_domains, dmarc_domain_key, hfromdom, report_data} -- arguments
         )
-        if ret and conn then
-          conn:add_cmd('EXPIRE', {
-            redis_key, tostring(dmarc_redis_key_expire)
-          })
+        if not ret then
+          rspamd_logger.errx(task, 'Unable to schedule redis request')
         end
       end
     end
@@ -374,7 +572,7 @@ local function dmarc_callback(task)
   end
 
   -- Do initial request
-  local resolve_name = '_dmarc.' .. from[1]['domain']
+  local resolve_name = '_dmarc.' .. hfromdom
   task:get_resolver():resolve_txt({
     task=task,
     name = resolve_name,
@@ -406,27 +604,598 @@ if opts['symbols'] then
 end
 
 if opts['reporting'] == true then
-  dmarc_reporting = true
+  redis_params = rspamd_parse_redis_server('dmarc')
+  if not redis_params then
+    rspamd_logger.errx(rspamd_config, 'cannot parse servers parameter')
+  elseif not opts['send_reports'] then
+    dmarc_reporting = true
+    rspamd_config:add_on_load(function(cfg, ev_base, worker)
+      if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end
+      load_scripts(cfg, ev_base)
+    end)
+  else
+    dmarc_reporting = true
+    if type(opts['report_settings']) == 'table' then
+      for k, v in pairs(opts['report_settings']) do
+        report_settings[k] = v
+      end
+    end
+    for _, e in ipairs({'email', 'domain', 'org_name'}) do
+      if not report_settings[e] then
+        rspamd_logger.errx(rspamd_config, 'Missing required setting: report_settings.%s', e)
+        return
+      end
+    end
+    rspamd_config:add_on_load(function(cfg, ev_base, worker)
+      if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end
+      load_scripts(cfg, ev_base)
+      rspamd_config:register_finish_script(function ()
+        local stamp = pool:get_variable(VAR_NAME, 'double')
+        if not stamp then
+          rspamd_logger.warnx(rspamd_config, 'No last DMARC report information to persist to disk')
+          return
+        end
+        local f, err = io.open(statefile, 'w')
+        if err then
+          rspamd_logger.errx(rspamd_config, 'Unable to write statefile to disk: %s', err)
+          return
+        end
+        assert(f:write(pool:get_variable(VAR_NAME, 'double')))
+        assert(f:close())
+      end)
+      local get_reporting_domain, reporting_domain, report_start, report_end, report_id
+      local reporting_addr = {}
+      local domain_policy = {}
+      local to_verify = {}
+      local function entry_to_xml(data, count)
+        local buf = {
+          string.format(
+[[     <record>
+               <row>
+                       <source_ip>%s</source_ip>
+                       <count>%d</count>
+                       <policy_evaluated>
+                               <disposition>%s</disposition>
+                               <dkim>%s</dkim>
+                               <spf>%s</spf>
+                       </policy_evaluated>
+               </row>
+               <identifiers>
+                       <header_from>%s</header_from>
+               </identifiers>
+       </record>
+]], data.ip, count, data.disposition, data.dkim_disposition, data.spf_disposition, data.header_from),
+        }
+        if data.dkim_results[1] or (data.spf_result ~= '' and data.spf_domain ~= '') then
+          table.insert(buf, '\t\t<auth_results>\n')
+          for _, d in ipairs(data.dkim_results) do
+            table.insert(buf, string.format(
+              '\t\t\t<dkim>\n\t\t\t\t<domain>%s</domain>\n\t\t\t\t<result>%s</result>\n\t\t\t</dkim>\n',
+              d.domain, d.result))
+          end
+          if (data.spf_result ~= '' and data.spf_domain ~= '') then
+            table.insert(buf, string.format(
+              '\t\t\t<spf>\n\t\t\t\t<domain>%s</domain>\n\t\t\t\t<result>%s</result>\n\t\t\t</spf>\n',
+              data.spf_domain, data.spf_result))
+          end
+          table.insert(buf, '\t\t</auth_results>\n')
+        end
+        table.insert(buf, '\t<record>\n')
+        return table.concat(buf)
+      end
+      local function dmarc_report_xml()
+        local entries = {}
+        report_id = string.format('%s.%d.%d', reporting_domain, report_start, report_end)
+        rspamd_logger.debugm(N, rspamd_config, 'new report: %s', report_id)
+        local actions = {
+          push = function(data)
+            local counts = {}
+            local split = rspamd_str_split(data, ',')
+            local when = tonumber(split[1])
+            if when > report_end then
+              -- XXX: Replace data!
+              rspamd_logger.debugm(N, rspamd_config, 'report is newer than report_end: %1 %2', when, report_end)
+              return false
+            elseif when < report_start then
+              rspamd_logger.debugm(N, rspamd_config, 'report is older than report_start: %1 %2', when, report_start)
+              return true
+            elseif when >= report_start then
+              local h = hash.create()
+              for i = 2, #split do
+                h:update(split[i])
+              end
+              local bin = h:bin()
+              if counts[bin] then
+                counts[bin] = counts[bin] + 1
+              else
+                counts[bin] = 1
+              end
+              local count = counts[bin]
+              local row = {
+                ip = split[2],
+                spf_disposition = split[3],
+                dkim_disposition = split[4],
+                disposition = split[5],
+                override = split[6],
+                header_from = split[7],
+                dkim_results = {},
+                spf_domain = split[12],
+                spf_result = split[13],
+              }
+              if split[8] and split[8] ~= '' then
+                local tmp = rspamd_str_split(split[8], '|')
+                for _, d in ipairs(tmp) do
+                  table.insert(row.dkim_results, {domain = d, result = 'pass'})
+                end
+              end
+              if split[9] and split[9] ~= '' then
+                local tmp = rspamd_str_split(split[9], '|')
+                for _, d in ipairs(tmp) do
+                  table.insert(row.dkim_results, {domain = d, result = 'fail'})
+                end
+              end
+              if split[10] and split[10] ~= '' then
+                local tmp = rspamd_str_split(split[10], '|')
+                for _, d in ipairs(tmp) do
+                  table.insert(row.dkim_results, {domain = d, result = 'temperror'})
+                end
+              end
+              if split[11] and split[11] ~= '' then
+                local tmp = rspamd_str_split(split[11], '|')
+                for _, d in ipairs(tmp) do
+                  table.insert(row.dkim_results, {domain = d, result = 'permerror'})
+                end
+              end
+              entries[bin] = {[row] = count}
+              return true
+            end
+          end,
+          header = function()
+            return string.format(
+[[<?xml version="1.0" encoding="utf-8"?><feedback>
+       <report_metadata>
+               <org_name>%s</org_name>
+               <email>%s</email>
+               <report_id>%s</report_id>
+               <date_range>
+                       <begin>%d</begin>
+                       <end>%d</end>
+               </date_range>
+       </report_metadata>
+       <policy_published>
+               <domain>%s</domain>
+               <adkim>%s</adkim>
+               <aspf>%s</aspf>
+               <p>%s</p>
+               <sp>%s</sp>
+               <pct>%s</pct>
+       </policy_published>
+]], escape_xml(report_settings.org_name), escape_xml(report_settings.email), report_id, report_start, report_end,
+    reporting_domain, escape_xml(domain_policy.adkim), escape_xml(domain_policy.aspf), escape_xml(domain_policy.p),
+    escape_xml(domain_policy.sp), escape_xml(domain_policy.pct))
+          end,
+          footer = function()
+            return [[</feedback>]]
+          end,
+          entries = function()
+            local buf = {}
+            for _, e in pairs(entries) do
+              for k, v in pairs(e) do
+                table.insert(buf, entry_to_xml(k, v))
+              end
+            end
+            return table.concat(buf, '')
+          end,
+        }
+        return function(action, p)
+          local f = actions[action]
+          if not f then error('invalid action: ' .. action) end
+          return f(p)
+        end
+      end
+      local function send_report_via_email(xmlf)
+        local tmp_addr = reporting_addr
+        local encoded = rspamd_util.encode_base64(table.concat({xmlf('header'), xmlf('entries'), xmlf('footer')}, 78))
+        local function mail_cb(err, data, conn)
+          local function no_error(merr, mdata, wantcode)
+            wantcode = wantcode or '2'
+            if merr then
+              rspamd_logger.errx(ev_base, 'got error in tcp callback: %s', merr)
+              if conn then
+                conn:close()
+              end
+              return false
+            end
+            if mdata then
+              if type(mdata) ~= 'string' then
+                mdata = tostring(mdata)
+              end
+              if string.sub(mdata, 1, 1) ~= wantcode then
+                rspamd_logger.errx(ev_base, 'got bad smtp response: %s', mdata)
+                if conn then
+                  conn:close()
+                end
+                return false
+              end
+            else
+              rspamd_logger.errx(ev_base, 'no data')
+              if conn then
+                conn:close()
+              end
+              return false
+            end
+            return true
+          end
+          local function all_done_cb(merr, mdata)
+            if conn then
+              conn:close()
+            end
+            get_reporting_domain()
+            return true
+          end
+          local function quit_done_cb(merr, mdata)
+            conn:add_read(all_done_cb, '\r\n')
+          end
+          local function quit_cb(merr, mdata)
+            if no_error(merr, mdata) then
+              conn:add_write(quit_done_cb, 'QUIT\r\n')
+            end
+          end
+          local function pre_quit_cb(merr, mdata)
+            if no_error(merr, '2') then
+              conn:add_read(quit_cb, '\r\n')
+            end
+          end
+          local function data_done_cb(merr, mdata)
+            if no_error(merr, mdata, '3') then
+              local atmp = {}
+              for k in pairs(reporting_addr) do
+                table.insert(atmp, k)
+              end
+              local addr_string = table.concat(atmp, ', ')
+              local rhead = string.format(report_template, report_settings.email, addr_string,
+                reporting_domain, report_settings.domain, report_id, rspamd_util.time_to_string(rspamd_util.get_time()),
+                rspamd_util.random_hex(12) .. '@rspamd', report_settings.domain, report_settings.domain, reporting_domain,
+                report_start, report_end)
+              conn:add_write(pre_quit_cb, {rhead, encoded, report_footer, '\r\n.\r\n'})
+            end
+          end
+          local function data_cb(merr, mdata)
+            if no_error(merr, '2') then
+              conn:add_read(data_done_cb, '\r\n')
+            end
+          end
+          local function rcpt_done_cb(merr, mdata)
+            if no_error(merr, mdata) then
+              conn:add_write(data_cb, 'DATA\r\n')
+            end
+          end
+          local from_done_cb
+          local function rcpt_cb(merr, mdata)
+            if no_error(merr, '2') then
+              if tmp_addr[1] then
+                conn:add_read(from_done_cb, '\r\n')
+              else
+                conn:add_read(rcpt_done_cb, '\r\n')
+              end
+            end
+          end
+          from_done_cb = function(merr, mdata)
+            if no_error(merr, mdata) then
+              conn:add_write(rcpt_cb, {'RCPT TO: <', table.remove(tmp_addr), '>\r\n'})
+            end
+          end
+          local function from_cb(merr, mdata)
+            if no_error(merr, '2') then
+              conn:add_read(from_done_cb, '\r\n')
+            end
+          end
+            local function hello_done_cb(merr, mdata)
+            if no_error(merr, mdata) then
+              conn:add_write(from_cb, {'MAIL FROM: <', report_settings.email, '>\r\n'})
+            end
+          end
+          local function hello_cb(merr)
+            if no_error(merr, '2') then
+              conn:add_read(hello_done_cb, '\r\n')
+            end
+          end
+          if no_error(err, data) then
+            conn:add_write(hello_cb, {'HELO ', report_settings.helo, '\r\n'})
+          end
+        end
+        rspamd_tcp.request({
+          ev_base = ev_base,
+          callback = mail_cb,
+          stop_pattern = '\r\n',
+          host = report_settings.smtp,
+          port = report_settings.smtp_port,
+        })
+      end
+      local function make_report()
+        rspamd_logger.infox(ev_base, 'sending report for %s <%s>', reporting_domain, table.concat(reporting_addr, ','))
+        local dmarc_xml = dmarc_report_xml()
+        local dmarc_push_cb
+        local dmarc_domain_key = string.format(redis_keys.dmarc_domain, reporting_domain)
+        dmarc_push_cb = function(err, data)
+          if err then
+            rspamd_logger.errx(ev_base, 'Redis request failed: %s', err)
+            -- XXX: data is orphaned; replace key or delete data
+            get_reporting_domain()
+          elseif type(data) == 'string' then
+            if dmarc_xml('push', data) then
+              local ret = redis_make_request(ev_base,
+                rspamd_config,
+                nil,
+                false, -- is write
+                dmarc_push_cb, --callback
+                'LPOP', -- command
+                {dmarc_domain_key}
+              )
+              if not ret then
+                rspamd_logger.errx(ev_base, 'Failed to schedule redis request')
+                -- XXX: data is orphaned; replace key or delete data
+                get_reporting_domain()
+              end
+            else
+              send_report_via_email(dmarc_xml)
+            end
+          else
+            send_report_via_email(dmarc_xml)
+          end
+        end
+        local ret = redis_make_request(ev_base,
+          rspamd_config,
+          nil,
+          false, -- is write
+          dmarc_push_cb, --callback
+          'LPOP', -- command
+          {dmarc_domain_key}
+        )
+        if not ret then
+          rspamd_logger.errx(rspamd_config, 'Failed to schedule redis request')
+          -- XXX: data is orphaned; replace key or delete data
+          get_reporting_domain()
+        end
+      end
+      local function delete_reports()
+        local function delete_reports_cb(err)
+          if err then
+            rspamd_logger.errx(rspamd_config, 'Error deleting reports: %s', err)
+          end
+          rspamd_logger.infox(rspamd_config, 'Deleted reports for %s')
+          get_reporting_domain()
+        end
+        local dmarc_domain_key = string.format(redis_keys.dmarc_domain, reporting_domain)
+        local ret = redis_make_request(ev_base,
+          rspamd_config,
+          nil,
+          false, -- is write
+          delete_reports_cb, --callback
+          'DEL', -- command
+          {dmarc_domain_key}
+        )
+        if not ret then
+          rspamd_logger.errx(rspamd_config, 'Failed to schedule redis request')
+          get_reporting_domain()
+        end
+      end
+      local function verify_reporting_address()
+        local function verifier(test_addr, vdom)
+          local function verify_cb(resolver, to_resolve, results, err, _, authenticated)
+            if err then
+              if err == 'no records with this name' or err == 'requested record is not found' then
+                rspamd_logger.infox(rspamd_config, 'Reports to %s for %s not authorised', test_addr, reporting_domain)
+                to_verify[test_addr] = nil
+              else
+                rspamd_logger.errx(rspamd_config, 'Lookup error [%s]: %s', to_resolve, err)
+                -- XXX: retry?
+                delete_reports()
+              end
+            else
+              local is_authed = false
+              -- XXX: reporting address could be overidden
+              for _, r in ipairs(results) do
+                if string.match(r, 'v=DMARC1') then
+                  is_authed = true
+                  break
+                end
+              end
+              if not is_authed then
+                to_verify[test_addr] = nil
+                rspamd_logger.infox(rspamd_config, 'Reports to %s for %s not authorised', test_addr, reporting_domain)
+              else
+                to_verify[test_addr] = nil
+                reporting_addr[test_addr] = true
+              end
+              if not next(to_verify) then
+                if next(reporting_addr) then
+                  make_report()
+                else
+                  rspamd_logger.infox(rspamd_config, 'No valid reporting addresses for %s', reporting_domain)
+                  delete_reports()
+                end
+              end
+            end
+          end
+          rspamd_config:get_resolver():resolve_txt(nil, pool,
+            string.format('%s._report._dmarc.%s', reporting_domain, vdom), verify_cb)
+        end
+        for t, vdom in pairs(to_verify) do
+          verifier(t, vdom)
+        end
+      end
+      local function get_reporting_address()
+        local function check_addr_cb(resolver, to_resolve, results, err, _, authenticated)
+          if err then
+            if err == 'no records with this name' or err == 'requested record is not found' then
+              rspamd_logger.errx(rspamd_config, 'No DMARC record found for %s', reporting_domain)
+              delete_reports()
+            else
+              rspamd_logger.errx(rspamd_config, 'Lookup error [%s]: %s', to_resolve, err)
+              -- XXX: retry?
+              delete_reports()
+            end
+          else
+            local policy
+            local found_policy, failed_policy = false, false
+            for _, r in ipairs(results) do
+              local elts = dmarc_grammar:match(r)
+              if elts and found_policy then
+                failed_policy = true
+              elseif elts then
+                found_policy = true
+                policy = elts
+              end
+            end
+            if not found_policy then
+              rspamd_logger.errx(rspamd_config, 'No policy: %s', to_resolve)
+              delete_reports()
+            elseif failed_policy then
+              rspamd_logger.errx(rspamd_config, 'Duplicate policies: %s', to_resolve)
+              delete_reports()
+            elseif not policy['rua'] then
+              rspamd_logger.errx(rspamd_config, 'No reporting address: %s', to_resolve)
+              delete_reports()
+            else
+              local upool = mempool.create()
+              local split = rspamd_str_split(policy['rua'], ',')
+              for _, m in ipairs(split) do
+                local url = rspamd_url.create(upool, m)
+                if not url then
+                  rspamd_logger.errx(rspamd_config, 'Couldnt extract reporting address: %s', policy['rua'])
+                else
+                  local urlt = url:to_table()
+                  if urlt['protocol'] ~= 'mailto' then
+                    rspamd_logger.errx(rspamd_config, 'Invalid URL: %s', url)
+                  else
+                    if urlt['tld'] == rspamd_util.get_tld(reporting_domain) then
+                      reporting_addr[string.format('%s@%s', urlt['user'], urlt['host'])] = true
+                    else
+                      to_verify[string.format('%s@%s', urlt['user'], urlt['host'])] = urlt['host']
+                    end
+                  end
+                end
+              end
+              upool:destroy()
+              domain_policy['pct'] = policy['pct'] or 100
+              domain_policy['adkim'] = policy['adkim'] or 'r'
+              domain_policy['aspf'] = policy['aspf'] or 'r'
+              domain_policy['p'] = policy['p'] or 'none'
+              domain_policy['sp'] = policy['sp'] or 'none'
+              if next(to_verify) then
+                verify_reporting_address()
+              elseif next(reporting_addr) then
+                make_report()
+              else
+                rspamd_logger.errx(rspamd_config, 'No reporting address for %s', reporting_domain)
+                delete_reports()
+              end
+            end
+          end
+        end
+        rspamd_config:get_resolver():resolve_txt(nil, pool,
+          string.format('_dmarc.%s', reporting_domain), check_addr_cb)
+      end
+      get_reporting_domain = function()
+        reporting_domain = nil
+        reporting_addr = {}
+        domain_policy = {}
+        local function get_reporting_domain_cb(err, data)
+          if err then
+            rspamd_logger.errx(cfg, 'Unable to get DMARC domain: %s', err)
+          else
+            if type(data) == 'userdata' then
+              reporting_domain = nil
+            else
+              reporting_domain = data
+            end
+            if not reporting_domain then
+              rspamd_logger.infox(cfg, 'No more domains to generate reports for')
+            else
+              get_reporting_address()
+            end
+          end
+        end
+        local ret = redis_make_request(ev_base,
+          rspamd_config,
+          nil,
+          false, -- is write
+          get_reporting_domain_cb, --callback
+          'SPOP', -- command
+          {redis_keys.dmarc_domains}
+        )
+        if not ret then
+          rspamd_logger.errx(cfg, 'Unable to get DMARC domain')
+        end
+      end
+      local function send_reports(time)
+        rspamd_logger.infox(ev_base, 'sending reports ostensibly %1', time)
+        pool:set_variable(VAR_NAME, time)
+        report_end = time
+        report_start = time - INTERVAL
+        get_reporting_domain()
+      end
+      -- Push reports at regular intervals
+      local function schedule_regular_send()
+        rspamd_config:add_periodic(ev_base, INTERVAL, function ()
+          send_reports()
+          return true
+        end)
+      end
+      -- Push reports to backend and reschedule check
+      local function schedule_intermediate_send(when)
+        rspamd_config:add_periodic(ev_base, when, function ()
+          schedule_regular_send()
+          send_reports(rspamd_util.get_time())
+          return false
+        end)
+      end
+      -- Try read statefile on startup
+      local stamp
+      local f, err = io.open(statefile, 'r')
+      if err then
+        rspamd_logger.errx('Failed to open statefile: %s', err)
+      end
+      if f then
+        io.input(f)
+        stamp = tonumber(io.read())
+        pool:set_variable(VAR_NAME, stamp)
+      end
+      local time = rspamd_util.get_time()
+      if not stamp then
+        rspamd_logger.debugm(N, rspamd_config, 'No state found - sending reports immediately')
+        schedule_regular_send()
+        send_reports(time)
+        return
+      end
+      local delta = stamp - time + INTERVAL
+      if delta <= 0 then
+        rspamd_logger.debugm(N, rspamd_config, 'Last send is too old - sending reports immediately')
+        schedule_regular_send()
+        send_reports(time)
+        return
+      end
+      rspamd_logger.debugm(N, rspamd_config, 'Scheduling next send in %s seconds', delta)
+      schedule_intermediate_send(delta)
+    end)
+  end
 end
 if type(opts['actions']) == 'table' then
   dmarc_actions = opts['actions']
 end
-
-redis_params = rspamd_parse_redis_server('dmarc')
-if not redis_params then
-  rspamd_logger.infox(rspamd_config, 'cannot parse servers parameter')
-end
-
-if opts['key_prefix'] then
-  dmarc_redis_key_prefix = opts['key_prefix']
-end
-
-if opts['expire'] then
-  dmarc_redis_key_expire = opts['expire']
+if type(opts['report_settings']) == 'table' then
+  for k, v in pairs(opts['report_settings']) do
+    report_settings[k] = v
+  end
 end
-
-if opts['key_expire'] then
-  dmarc_redis_key_expire = opts['key_expire']
+if dmarc_reporting then
+  for _, e in ipairs({'email', 'domain', 'org_name'}) do
+    if not report_settings[e] then
+      rspamd_logger.errx(rspamd_config, 'Missing required setting: report_settings.%s', e)
+      return
+    end
+  end
 end
 
 -- Check spf and dkim sections for changed symbols