]> source.dussan.org Git - rspamd.git/commitdiff
[Minor] DMARC reporting: rework backend 1636/head
authorAndrew Lewis <nerf@judo.za.org>
Thu, 11 May 2017 13:52:55 +0000 (15:52 +0200)
committerAndrew Lewis <nerf@judo.za.org>
Thu, 11 May 2017 13:53:58 +0000 (15:53 +0200)
 - Also fix deletion
 - Support redirecting reports (useful for evaluation)
 - Deal with possibly missing SPF results

src/plugins/lua/dmarc.lua

index f7a95c5affc1c479947a38ba30da37349778779d..8f148d1b1f9fa4e52aee27576acfbbfa25c12833 100644 (file)
@@ -17,7 +17,6 @@ limitations under the License.
 
 -- Dmarc policy filter
 
-local hash = require "rspamd_cryptobox_hash"
 local rspamd_logger = require "rspamd_logger"
 local mempool = require "rspamd_mempool"
 local rspamd_tcp = require "rspamd_tcp"
@@ -38,6 +37,7 @@ local pool = mempool.create()
 
 local report_settings = {
   helo = 'rspamd',
+  hscan_count = 1000,
   smtp = '127.0.0.1',
   smtp_port = 25,
 }
@@ -96,8 +96,9 @@ local dmarc_symbols = {
 }
 
 local redis_keys = {
-  dmarc_domains = 'dmarc_domains',
-  dmarc_domain = 'dmarc_%s',
+  index_prefix = 'dmarc_idx',
+  report_prefix = 'dmarc',
+  join_char = ';',
 }
 
 local function gen_xml_grammar()
@@ -128,12 +129,14 @@ local E = {}
 
 local take_report_sha
 local take_report_script = [[
-local dmarc_domains_key = KEYS[1]
-local dmarc_domain_key = KEYS[2]
+local index_key = KEYS[1]
+local report_key = KEYS[2]
 local dmarc_domain = ARGV[1]
 local report = ARGV[2]
-redis.call('SADD', dmarc_domains_key, dmarc_domain)
-redis.call('LPUSH', dmarc_domain_key, report)
+redis.call('SADD', index_key, report_key)
+redis.call('EXPIRE', index_key, 172800)
+redis.call('HINCRBY', report_key, report, 1)
+redis.call('EXPIRE', report_key, 172800)
 ]]
 
 local function redis_make_request(ev_base, cfg, key, is_write, callback, command, args)
@@ -235,10 +238,10 @@ local function dmarc_report(task, spf_ok, dkim_ok, disposition, sampled_out, hfr
   local dkim_fail = table.concat(dres.fail or E, '|')
   local dkim_temperror = table.concat(dres.temperror or E, '|')
   local dkim_permerror = table.concat(dres.permerror or E, '|')
-  local res = string.format('%d,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s',
-    task:get_date(0), ip:to_string(), spf_ok, dkim_ok,
+  local res = table.concat({
+    ip:to_string(), spf_ok, dkim_ok,
     disposition, (sampled_out and 'sampled_out' or ''), hfromdom,
-    dkim_pass, dkim_fail, dkim_temperror, dkim_permerror, spfdom, spf_result)
+    dkim_pass, dkim_fail, dkim_temperror, dkim_permerror, spfdom or '', spf_result or ''}, ',')
 
   return res
 end
@@ -554,9 +557,11 @@ local function dmarc_callback(task)
         end
       end
       -- Prepare and send redis report element
-      local dmarc_domain_key = string.format(redis_keys.dmarc_domain, hfromdom)
+      local period = os.date('%Y%m%d', task:get_date(0))
+      local dmarc_domain_key = table.concat({redis_keys.report_prefix, hfromdom, period}, redis_keys.join_char)
       local report_data = dmarc_report(task, spf_ok and 'pass' or 'fail', dkim_ok and 'pass' or 'fail', dmarc_ok and 'pass' or 'fail', sampled_out,
         hfromdom, spf_domain, dkim_results, spf_result)
+      local idx_key = table.concat({redis_keys.index_prefix, period}, redis_keys.join_char)
 
       if report_data then
         local ret = rspamd_redis_make_request(task,
@@ -565,7 +570,7 @@ local function dmarc_callback(task)
           true, -- is write
           dmarc_report_cb, --callback
           'EVALSHA', -- command
-          {take_report_sha, 2, redis_keys.dmarc_domains, dmarc_domain_key, hfromdom, report_data} -- arguments
+          {take_report_sha, 2, idx_key, dmarc_domain_key, hfromdom, report_data} -- arguments
         )
         if not ret then
           rspamd_logger.errx(task, 'Unable to schedule redis request')
@@ -649,11 +654,12 @@ if opts['reporting'] == true then
         assert(f:write(pool:get_variable(VAR_NAME, 'double')))
         assert(f:close())
       end)
-      local get_reporting_domain, reporting_domain, report_start, report_end, report_id
+      local get_reporting_domain, reporting_domain, report_start, report_end, report_id, want_period, report_key
       local reporting_addr = {}
       local domain_policy = {}
       local to_verify = {}
-      local function entry_to_xml(data, count)
+      local cursor = 0
+      local function entry_to_xml(data)
         local buf = {
           string.format(
 [[     <record>
@@ -670,7 +676,7 @@ if opts['reporting'] == true then
                        <header_from>%s</header_from>
                </identifiers>
        </record>
-]], data.ip, count, data.disposition, data.dkim_disposition, data.spf_disposition, data.header_from),
+]], data.ip, data.count, data.disposition, data.dkim_disposition, data.spf_disposition, data.header_from),
         }
         if data.dkim_results[1] or (data.spf_result ~= '' and data.spf_domain ~= '') then
           table.insert(buf, '\t\t<auth_results>\n')
@@ -694,67 +700,46 @@ if opts['reporting'] == true then
         report_id = string.format('%s.%d.%d', reporting_domain, report_start, report_end)
         rspamd_logger.debugm(N, rspamd_config, 'new report: %s', report_id)
         local actions = {
-          push = function(data)
-            local counts = {}
+          push = function(t)
+            local data = t[1]
             local split = rspamd_str_split(data, ',')
-            local when = tonumber(split[1])
-            if when > report_end then
-              -- XXX: Replace data!
-              rspamd_logger.debugm(N, rspamd_config, 'report is newer than report_end: %1 %2', when, report_end)
-              return false
-            elseif when < report_start then
-              rspamd_logger.debugm(N, rspamd_config, 'report is older than report_start: %1 %2', when, report_start)
-              return true
-            elseif when >= report_start then
-              local h = hash.create()
-              for i = 2, #split do
-                h:update(split[i])
-              end
-              local bin = h:bin()
-              if counts[bin] then
-                counts[bin] = counts[bin] + 1
-              else
-                counts[bin] = 1
-              end
-              local count = counts[bin]
-              local row = {
-                ip = split[2],
-                spf_disposition = split[3],
-                dkim_disposition = split[4],
-                disposition = split[5],
-                override = split[6],
-                header_from = split[7],
-                dkim_results = {},
-                spf_domain = split[12],
-                spf_result = split[13],
-              }
-              if split[8] and split[8] ~= '' then
-                local tmp = rspamd_str_split(split[8], '|')
-                for _, d in ipairs(tmp) do
-                  table.insert(row.dkim_results, {domain = d, result = 'pass'})
-                end
+            local row = {
+              ip = split[1],
+              spf_disposition = split[2],
+              dkim_disposition = split[3],
+              disposition = split[4],
+              override = split[5],
+              header_from = split[6],
+              dkim_results = {},
+              spf_domain = split[11],
+              spf_result = split[12],
+              count = t[2],
+            }
+            if split[7] and split[7] ~= '' then
+              local tmp = rspamd_str_split(split[7], '|')
+              for _, d in ipairs(tmp) do
+                table.insert(row.dkim_results, {domain = d, result = 'pass'})
               end
-              if split[9] and split[9] ~= '' then
-                local tmp = rspamd_str_split(split[9], '|')
-                for _, d in ipairs(tmp) do
-                  table.insert(row.dkim_results, {domain = d, result = 'fail'})
-                end
+            end
+            if split[8] and split[8] ~= '' then
+              local tmp = rspamd_str_split(split[8], '|')
+              for _, d in ipairs(tmp) do
+                table.insert(row.dkim_results, {domain = d, result = 'fail'})
               end
-              if split[10] and split[10] ~= '' then
-                local tmp = rspamd_str_split(split[10], '|')
-                for _, d in ipairs(tmp) do
-                  table.insert(row.dkim_results, {domain = d, result = 'temperror'})
-                end
+            end
+            if split[9] and split[9] ~= '' then
+              local tmp = rspamd_str_split(split[9], '|')
+              for _, d in ipairs(tmp) do
+                table.insert(row.dkim_results, {domain = d, result = 'temperror'})
               end
-              if split[11] and split[11] ~= '' then
-                local tmp = rspamd_str_split(split[11], '|')
-                for _, d in ipairs(tmp) do
-                  table.insert(row.dkim_results, {domain = d, result = 'permerror'})
-                end
+            end
+            if split[10] and split[10] ~= '' then
+              local tmp = rspamd_str_split(split[10], '|')
+              for _, d in ipairs(tmp) do
+                table.insert(row.dkim_results, {domain = d, result = 'permerror'})
               end
-              entries[bin] = {[row] = count}
-              return true
             end
+            table.insert(entries, row)
           end,
           header = function()
             return string.format(
@@ -786,9 +771,7 @@ if opts['reporting'] == true then
           entries = function()
             local buf = {}
             for _, e in pairs(entries) do
-              for k, v in pairs(e) do
-                table.insert(buf, entry_to_xml(k, v))
-              end
+              table.insert(buf, entry_to_xml(e))
             end
             return table.concat(buf, '')
           end,
@@ -919,35 +902,38 @@ if opts['reporting'] == true then
         })
       end
       local function make_report()
+        if type(report_settings.override_address) == 'string' then
+          reporting_addr = {report_settings.override_address}
+        end
         rspamd_logger.infox(ev_base, 'sending report for %s <%s>', reporting_domain, table.concat(reporting_addr, ','))
         local dmarc_xml = dmarc_report_xml()
         local dmarc_push_cb
-        local dmarc_domain_key = string.format(redis_keys.dmarc_domain, reporting_domain)
         dmarc_push_cb = function(err, data)
           if err then
             rspamd_logger.errx(ev_base, 'Redis request failed: %s', err)
             -- XXX: data is orphaned; replace key or delete data
             get_reporting_domain()
-          elseif type(data) == 'string' then
-            if dmarc_xml('push', data) then
+          elseif type(data) == 'table' then
+            cursor = tonumber(data[1])
+            for i = 1, #data[2], 2 do
+              dmarc_xml('push', {data[2][i], data[2][i+1]})
+            end
+            if cursor ~= 0 then
               local ret = redis_make_request(ev_base,
                 rspamd_config,
                 nil,
                 false, -- is write
                 dmarc_push_cb, --callback
-                'LPOP', -- command
-                {dmarc_domain_key}
+                'HSCAN', -- command
+                {report_key, cursor, 'COUNT', report_settings.hscan_count}
               )
               if not ret then
                 rspamd_logger.errx(ev_base, 'Failed to schedule redis request')
-                -- XXX: data is orphaned; replace key or delete data
                 get_reporting_domain()
               end
             else
               send_report_via_email(dmarc_xml)
             end
-          else
-            send_report_via_email(dmarc_xml)
           end
         end
         local ret = redis_make_request(ev_base,
@@ -955,8 +941,8 @@ if opts['reporting'] == true then
           nil,
           false, -- is write
           dmarc_push_cb, --callback
-          'LPOP', -- command
-          {dmarc_domain_key}
+          'HSCAN', -- command
+          {report_key, cursor, 'COUNT', report_settings.hscan_count}
         )
         if not ret then
           rspamd_logger.errx(rspamd_config, 'Failed to schedule redis request')
@@ -972,14 +958,13 @@ if opts['reporting'] == true then
           rspamd_logger.infox(rspamd_config, 'Deleted reports for %s')
           get_reporting_domain()
         end
-        local dmarc_domain_key = string.format(redis_keys.dmarc_domain, reporting_domain)
         local ret = redis_make_request(ev_base,
           rspamd_config,
           nil,
           false, -- is write
           delete_reports_cb, --callback
           'DEL', -- command
-          {dmarc_domain_key}
+          {report_key}
         )
         if not ret then
           rspamd_logger.errx(rspamd_config, 'Failed to schedule redis request')
@@ -1107,6 +1092,7 @@ if opts['reporting'] == true then
         reporting_domain = nil
         reporting_addr = {}
         domain_policy = {}
+        cursor = 0
         local function get_reporting_domain_cb(err, data)
           if err then
             rspamd_logger.errx(cfg, 'Unable to get DMARC domain: %s', err)
@@ -1114,7 +1100,9 @@ if opts['reporting'] == true then
             if type(data) == 'userdata' then
               reporting_domain = nil
             else
-              reporting_domain = data
+              report_key = data
+              local tmp = rspamd_str_split(data, redis_keys.join_char)
+              reporting_domain = tmp[2]
             end
             if not reporting_domain then
               rspamd_logger.infox(cfg, 'No more domains to generate reports for')
@@ -1123,13 +1111,14 @@ if opts['reporting'] == true then
             end
           end
         end
+        local idx_key = table.concat({redis_keys.index_prefix, want_period}, redis_keys.join_char)
         local ret = redis_make_request(ev_base,
           rspamd_config,
           nil,
           false, -- is write
           get_reporting_domain_cb, --callback
           'SPOP', -- command
-          {redis_keys.dmarc_domains}
+          {idx_key}
         )
         if not ret then
           rspamd_logger.errx(cfg, 'Unable to get DMARC domain')
@@ -1138,8 +1127,15 @@ if opts['reporting'] == true then
       local function send_reports(time)
         rspamd_logger.infox(ev_base, 'sending reports ostensibly %1', time)
         pool:set_variable(VAR_NAME, time)
-        report_end = time
-        report_start = time - INTERVAL
+        local yesterday = os.date('*t', rspamd_util.get_time() - INTERVAL)
+        local today = os.date('*t', rspamd_util.get_time())
+        report_start = os.time({year = yesterday.year, month = yesterday.month, day = yesterday.day, hour = 0})
+        report_end = os.time({year = today.year, month = today.month, day = today.day, hour = 0})
+        want_period = table.concat({
+          yesterday.year,
+          string.format('%02d', yesterday.month),
+          string.format('%02d', yesterday.day)
+        })
         get_reporting_domain()
       end
       -- Push reports at regular intervals