From 776d1048a93c9bb609ee4ae918eaf513ea8d7a09 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 5 Aug 2021 15:53:00 +0100 Subject: [PATCH] [Feature] Dmarc_report: allow sending reports in batches --- lualib/rspamadm/dmarc_report.lua | 105 ++++++++++++++++++++----------- 1 file changed, 70 insertions(+), 35 deletions(-) diff --git a/lualib/rspamadm/dmarc_report.lua b/lualib/rspamadm/dmarc_report.lua index 8894f23e3..fcc4f5e7f 100644 --- a/lualib/rspamadm/dmarc_report.lua +++ b/lualib/rspamadm/dmarc_report.lua @@ -49,6 +49,11 @@ parser:argument "date" :description "Date to process (today by default)" :argname "" :args "*" +parser:option "-b --batch-size" + :description "Send reports in batches up to messages" + :argname "" + :convert(tonumber) + :default "10" local report_template = [[From: "{= from_name =}" <{= from_addr =}> To: {= rcpt =} @@ -396,44 +401,66 @@ local function rcpt_list(tbl, func) end -- Synchronous smtp send function -local function send_reports_by_smtp(opts, reports) +local function send_reports_by_smtp(opts, reports, finish_cb) local lua_smtp = require "lua_smtp" - local reports_remaining = #reports local reports_failed = 0 + local reports_sent = 0 local report_settings = dmarc_settings.reporting - local function gen_sendmail_cb(report) + local function gen_sendmail_cb(report, args) return function(ret, err) - reports_remaining = reports_remaining - 1 + -- We modify this from all callbacks + args.nreports = args.nreports - 1 if not ret then logger.errx("Couldn't send mail for %s: %s", report.reporting_domain, err) reports_failed = reports_failed + 1 else + reports_sent = reports_sent + 1 lua_util.debugm(N, 'successfully sent a report for %s: %s bytes sent', report.reporting_domain, #report.message) end + + -- Tail call to the next batch or to the final function + if args.nreports == 0 then + if args.next_start > #reports then + finish_cb(reports_sent, reports_failed) + else + args.cont_func(args.next_start) + end + end end end - for _,report in ipairs(reports) do - local ret = lua_smtp.sendmail({ - ev_base = rspamadm_ev_base, - session = rspamadm_session, - config = rspamd_config, - host = report_settings.smtp, - port = report_settings.smtp_port or 25, - resolver = rspamadm_dns_resolver, - from = report_settings.email, - recipients = report.rcpts, - helo = report_settings.helo or 'rspamd.localhost', - }, report.message, gen_sendmail_cb(report)) - - if ret then - reports_remaining = reports_remaining + 1 + local function send_data_in_batches(cur_batch) + local nreports = math.min(#reports - cur_batch + 1, opts.batch_size) + local next_start = cur_batch + nreports + lua_util.debugm(N, 'send data for %s domains (from %s to %s)', + nreports, cur_batch, next_start-1) + -- Shared across all closures + local gen_args = { + cont_func = send_data_in_batches, + nreports = nreports, + next_start = next_start + } + for i=cur_batch,next_start-1 do + local report = reports[i] + lua_smtp.sendmail({ + ev_base = rspamadm_ev_base, + session = rspamadm_session, + config = rspamd_config, + host = report_settings.smtp, + port = report_settings.smtp_port or 25, + resolver = rspamadm_dns_resolver, + from = report_settings.email, + recipients = report.rcpts, + helo = report_settings.helo or 'rspamd.localhost', + }, + report.message, + gen_sendmail_cb(report, gen_args)) end end - return reports_remaining + send_data_in_batches(1) end local function prepare_report(opts, start_time, rep_key) @@ -546,7 +573,7 @@ local function process_report_date(opts, start_time, date) if not ret or not results or results == 0 then logger.messagex('No reports for %s', date) - return 0 + return {} end -- Rename index key to avoid races @@ -565,7 +592,7 @@ local function process_report_date(opts, start_time, date) {'DEL', idx_key}) end logger.messagex('Cannot get reports for %s', date) - return 0 + return {} end local reports = {} @@ -585,7 +612,7 @@ local function process_report_date(opts, start_time, date) {'DEL', idx_key}) end - return send_reports_by_smtp(opts, reports) + return reports end local function handler(args) @@ -638,26 +665,34 @@ local function handler(args) local ndates = 0 local nreports = 0 + local all_reports = {} for _,date in ipairs(opts.date) do lua_util.debugm(N, 'Process date %s', date) - local nproc = process_report_date(opts, start_time, date) - if nproc > 0 then + local reports_for_date = process_report_date(opts, start_time, date) + if #reports_for_date > 0 then ndates = ndates + 1 - nreports = nreports + nproc + nreports = nreports + #reports_for_date + + for _,r in ipairs(reports_for_date) do + table.insert(all_reports, r) + end end end - if not opts.no_opt then - lua_util.debugm(N, 'set last report date to %s', os.time()) - lua_redis.request(redis_params, redis_attrs, - {'SETEX', 'rspamd_dmarc_last_collection', dmarc_settings.reporting.keys_expire, - tostring(os.time())}) - end + local function finish_cb(nsuccess, nfail) + if not opts.no_opt then + lua_util.debugm(N, 'set last report date to %s', os.time()) + lua_redis.request(redis_params, redis_attrs, + {'SETEX', 'rspamd_dmarc_last_collection', dmarc_settings.reporting.keys_expire, + tostring(os.time())}) + end - logger.messagex('Reporting collection has finished %s dates processed, %s reports', - ndates, nreports) + logger.messagex('Reporting collection has finished %s dates processed, %s reports: %s completed, %s failed', + ndates, nreports, nsuccess, nfail) - pool:destroy() + pool:destroy() + end + send_reports_by_smtp(opts, all_reports, finish_cb) end return { -- 2.39.5