From 22e6bb543b392103486858f8ed202ff05e3c563e Mon Sep 17 00:00:00 2001 From: Andrew Lewis Date: Fri, 17 Feb 2017 17:08:01 +0200 Subject: [PATCH] [Minor] More new features/rework for metadata exporter --- src/plugins/lua/metadata_exporter.lua | 484 +++++++++++++++----------- 1 file changed, 289 insertions(+), 195 deletions(-) diff --git a/src/plugins/lua/metadata_exporter.lua b/src/plugins/lua/metadata_exporter.lua index 18466fa70..d71155f5a 100644 --- a/src/plugins/lua/metadata_exporter.lua +++ b/src/plugins/lua/metadata_exporter.lua @@ -17,16 +17,17 @@ limitations under the License. -- A plugin that pushes metadata (or whole messages) to external services -local rspamd_http -local rspamd_tcp -local rspamd_util +local redis_params +local rspamd_http = require "rspamd_http" +local rspamd_tcp = require "rspamd_tcp" +local rspamd_util = require "rspamd_util" local rspamd_logger = require "rspamd_logger" local N = 'metadata_exporter' local settings = { - format = function(task) - return task:get_content() - end, + pusher_enabled = {}, + pusher_format = {}, + pusher_select = {}, mime_type = 'text/plain', defer = false, mail_from = '', @@ -43,6 +44,9 @@ Spam received from user %s on IP %s - queue ID %s]], } local formatters = { + default = function(task) + return task:get_content() + end, email_alert = function(task) local auser = task:get_user() or '[not applicable]' local fip = task:get_from_ip() or '[unknown]' @@ -57,49 +61,260 @@ local formatters = { } local selectors = { + default = function(task) + return true + end, is_spam = function(task) local action = task:get_metric_action('default') - return (action == 'reject' or action == 'add header') and true or false + return (action == 'reject' or action == 'add header') end, is_spam_authed = function(task) if not task:get_user() then return false end local action = task:get_metric_action('default') - return (action == 'reject' or action == 'add header') and true or false + return (action == 'reject' or action == 'add header') end, is_reject = function(task) local action = task:get_metric_action('default') - return (action == 'reject') and true or false + return (action == 'reject') end, is_reject_authed = function(task) if not task:get_user() then return false end local action = task:get_metric_action('default') - return (action == 'reject') and true or false + return (action == 'reject') + end, +} + +local function maybe_defer(task) + if settings.defer then + rspamd_logger.warnx(task, 'deferring message') + task:set_metric_action('default', 'soft reject') + end +end + +local function maybe_force_action(task) + if settings.force_action then + rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action) + task:set_metric_action('default', settings.force_action) + end +end + +local pushers = { + redis_pubsub = function(task, formatted) + local _,ret,upstream + local function redis_pub_cb(err) + if err then + rspamd_logger.errx(task, 'got error %s when publishing on server %s', + err, upstream:get_addr()) + return maybe_defer(task) + end + maybe_force_action(task) + end + ret,_,upstream = rspamd_redis_make_request(task, + redis_params, -- connect params + nil, -- hash key + true, -- is write + redis_pub_cb, --callback + 'PUBLISH', -- command + {settings.channel, formatted} -- arguments + ) + if not ret then + rspamd_logger.errx(task, 'error connecting to redis') + maybe_defer(task) + end + end, + http = function(task, formatted) + local function http_callback(err, code) + if err then + rspamd_logger.errx(task, 'got error %s in http callback', err) + return maybe_defer(task) + end + if code ~= 200 then + rspamd_logger.errx(task, 'got unexpected http status: %s', code) + return maybe_defer(task) + end + maybe_force_action(task) + end + rspamd_http.request({ + task=task, + url=settings.url, + body=formatted, + callback=http_callback, + mime_type=settings['mime_type'], + }) + end, + send_mail = function(task, formatted) + local function mail_cb(err, data, conn) + local function no_error(merr, mdata, wantcode) + wantcode = wantcode or '2' + if merr then + rspamd_logger.errx(task, 'got error in tcp callback: %s', merr) + if conn then + conn:close() + end + maybe_defer(task) + return false + end + if mdata then + if type(mdata) ~= 'string' then + mdata = tostring(mdata) + end + if string.sub(mdata, 1, 1) ~= wantcode then + rspamd_logger.errx(task, 'got bad smtp response: %s', mdata) + if conn then + conn:close() + end + maybe_defer(task) + return false + end + else + rspamd_logger.errx(task, 'no data') + if conn then + conn:close() + end + maybe_defer(task) + return false + end + return true + end + local function all_done_cb(merr, mdata) + maybe_force_action(task) + if conn then + conn:close() + end + end + local function quit_done_cb(merr, mdata) + conn:add_read(all_done_cb, '\r\n') + end + local function quit_cb(merr, mdata) + if no_error(merr, mdata) then + conn:add_write(quit_done_cb, 'QUIT\r\n') + end + end + local function pre_quit_cb(merr, mdata) + if no_error(merr, '2') then + conn:add_read(quit_cb, '\r\n') + end + end + local function data_done_cb(merr, mdata) + if no_error(merr, mdata, '3') then + conn:add_write(pre_quit_cb, {formatted, '\r\n.\r\n'}) + end + end + local function data_cb(merr, mdata) + if no_error(merr, '2') then + conn:add_read(data_done_cb, '\r\n') + end + end + local function rcpt_done_cb(merr, mdata) + if no_error(merr, mdata) then + conn:add_write(data_cb, 'DATA\r\n') + end + end + local function rcpt_cb(merr, mdata) + if no_error(merr, '2') then + conn:add_read(rcpt_done_cb, '\r\n') + end + end + local function from_done_cb(merr, mdata) + if no_error(merr, mdata) then + conn:add_write(rcpt_cb, 'RCPT TO: <' .. settings.mail_to .. '>\r\n') + end + end + local function from_cb(merr, mdata) + if no_error(merr, '2') then + conn:add_read(from_done_cb, '\r\n') + end + end + local function hello_done_cb(merr, mdata) + if no_error(merr, mdata) then + conn:add_write(from_cb, 'MAIL FROM: <' .. settings.mail_from .. '>\r\n') + end + end + local function hello_cb(merr) + if no_error(merr, '2') then + conn:add_read(hello_done_cb, '\r\n') + end + end + if no_error(err, data) then + conn:add_write(hello_cb, 'HELO ' .. settings.helo .. '\r\n') + end + end + rspamd_tcp.request({ + task = task, + callback = mail_cb, + stop_pattern = '\r\n', + host = settings.smtp, + port = settings.smtp_port or 25, + }) end, } local opts = rspamd_config:get_all_opt(N) if not opts then return end -local redis_params local process_settings = { - select = function(key) - settings.select = assert(load(opts['select']))() + select = function(val) + selectors.custom = assert(load(val))() end, - format = function(key) - settings.format = assert(load(opts['format']))() + format = function(val) + formatters.custom = assert(load(val))() + end, + push = function(key, val) + pushers.custom = assert(load(val))() + end, + pusher_enabled = function(val) + if type(val) == 'string' then + if pushers[val] then + settings.pusher_enabled[val] = true + else + rspamd_logger.errx(rspamd_config, 'Pusher type: %s is invalid', val) + end + elseif type(val) == 'table' then + for _, v in ipairs(val) do + if pushers[v] then + settings.pusher_enabled[v] = true + else + rspamd_logger.errx(rspamd_config, 'Pusher type: %s is invalid', val) + end + end + end end, } for k, v in pairs(opts) do local f = process_settings[k] if f then - f() + f(opts[k]) else settings[k] = v end end +if not next(settings.pusher_enabled) then + if pushers.custom then + rspamd_logger.infox(rspamd_config, 'Custom pusher implicitly enabled') + settings.pusher_enabled.custom = true + else + -- Check legacy options + if settings.url then + rspamd_logger.warnx(rspamd_config, 'HTTP pusher implicitly enabled') + settings.pusher_enabled.http = true + end + if settings.channel then + rspamd_logger.warnx(rspamd_config, 'Redis Pubsub pusher implicitly enabled') + settings.pusher_enabled.redis_pubsub = true + end + if settings.smtp and settings.mail_to then + rspamd_logger.warnx(rspamd_config, 'Redis Pubsub pusher implicitly enabled') + settings.pusher_enabled.send_mail = true + end + end +end +if not next(settings.pusher_enabled) then + rspamd_logger.errx(rspamd_config, 'No push backend enabled') + return +end if settings.formatter then settings.format = formatters[settings.formatter] if not settings.format then @@ -114,201 +329,80 @@ if settings.selector then return end end -if not ((settings.url or settings.channel) or (settings.mail_to and settings.smtp)) then - rspamd_logger.errx(rspamd_config, 'No backends configured') - return +for k in pairs(settings.pusher_enabled) do + local formatter = settings.pusher_format[k] + local selector = settings.pusher_select[k] + if not formatter then + settings.pusher_format[k] = 'default' + rspamd_logger.infox(rspamd_config, 'Using default formatter for %s pusher', k) + else + if not formatters[formatter] then + rspamd_logger.errx(rspamd_config, 'No such formatter: %s - disabling %s', formatter, k) + settings.pusher_enabled.k = nil + end + end + if not selector then + settings.pusher_select[k] = 'default' + rspamd_logger.infox(rspamd_config, 'Using default selector for %s pusher', k) + else + if not selectors[selector] then + rspamd_logger.errx(rspamd_config, 'No such selector: %s - disabling %s', selector, k) + settings.pusher_enabled.k = nil + end + end end -if settings.channel then +if settings.pusher_enabled.redis_pubsub then redis_params = rspamd_parse_redis_server(N) if not redis_params then rspamd_logger.errx(rspamd_config, 'No redis servers are specified') - return + settings.pusher_enabled.redis_pubsub = nil end end -if settings.url then - rspamd_http = require "rspamd_http" +if settings.pusher_enabled.http then + if not settings.url then + rspamd_logger.errx(rspamd_config, 'No URL is specified') + settings.pusher_enabled.http = nil + end end -if settings.mail_to then - rspamd_tcp = require "rspamd_tcp" - rspamd_util = require "rspamd_util" +if settings.pusher_enabled.send_mail then + if not (settings.mail_to and settings.smtp) then + rspamd_logger.errx(rspamd_config, 'No mail_to and/or smtp setting is specified') + settings.pusher_enabled.send_mail = nil + end end -if opts['mime_type'] then - settings['mime_type'] = opts['mime_type'] +if not next(settings.pusher_enabled) then + rspamd_logger.errx(rspamd_config, 'No push backend enabled') + return end local function metadata_exporter(task) - local _,ret,upstream - local function maybe_defer() - if settings.defer then - rspamd_logger.warnx(task, 'deferring message') - task:set_metric_action('default', 'soft reject') + local results = { + select = {}, + format = {}, + } + for k in pairs(settings.pusher_enabled) do + local selector = settings.pusher_select[k] or 'default' + local selected = results.select[selector] + if selected == nil then + results.select[selector] = selectors[selector](task) + selected = results.select[selector] end - end - local function mail_cb(err, data, conn) - local function no_error(merr, mdata, wantcode) - wantcode = wantcode or '2' - if merr then - rspamd_logger.errx(task, 'got error in tcp callback: %s', merr) - if conn then - conn:close() - end - maybe_defer() - return false + if selected then + rspamd_logger.debugm(N, task, 'Message selected for processing') + local formatter = settings.pusher_format[k] + local formatted = results.format[k] + if formatted == nil then + results.format[formatter] = formatters[formatter](task) + formatted = results.format[formatter] end - if mdata then - if type(mdata) ~= 'string' then - mdata = tostring(mdata) - end - if string.sub(mdata, 1, 1) ~= wantcode then - rspamd_logger.errx(task, 'got bad smtp response: %s', mdata) - if conn then - conn:close() - end - maybe_defer() - return false - end + if formatted then + pushers[k](task, formatted) + elseif formatted == nil then + rspamd_logger.warnx(task, 'Formatter [%s] returned NIL', formatter) else - rspamd_logger.errx(task, 'no data') - if conn then - conn:close() - end - maybe_defer() - return false - end - return true - end - local function all_done_cb(merr, mdata) - if settings.force_action then - rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action) - task:set_metric_action('default', settings.force_action) - end - if conn then - conn:close() + rspamd_logger.debugm(N, task, 'Formatter [%s] returned non-truthy value [%s]', formatter, formatted) end end - local function quit_done_cb(merr, mdata) - conn:add_read(all_done_cb, '\r\n') - end - local function quit_cb(merr, mdata) - if no_error(merr, mdata) then - conn:add_write(quit_done_cb, 'QUIT\r\n') - end - end - local function pre_quit_cb(merr, mdata) - if no_error(merr, '2') then - conn:add_read(quit_cb, '\r\n') - end - end - local function data_done_cb(merr, mdata) - if no_error(merr, mdata, '3') then - local msg = settings.format(task) - conn:add_write(pre_quit_cb, msg .. '\r\n.\r\n') - end - end - local function data_cb(merr, mdata) - if no_error(merr, '2') then - conn:add_read(data_done_cb, '\r\n') - end - end - local function rcpt_done_cb(merr, mdata) - if no_error(merr, mdata) then - conn:add_write(data_cb, 'DATA\r\n') - end - end - local function rcpt_cb(merr, mdata) - if no_error(merr, '2') then - conn:add_read(rcpt_done_cb, '\r\n') - end - end - local function from_done_cb(merr, mdata) - if no_error(merr, mdata) then - conn:add_write(rcpt_cb, 'RCPT TO: <' .. settings.mail_to .. '>\r\n') - end - end - local function from_cb(merr, mdata) - if no_error(merr, '2') then - conn:add_read(from_done_cb, '\r\n') - end - end - local function hello_done_cb(merr, mdata) - if no_error(merr, mdata) then - conn:add_write(from_cb, 'MAIL FROM: <' .. settings.mail_from .. '>\r\n') - end - end - local function hello_cb(merr) - if no_error(merr, '2') then - conn:add_read(hello_done_cb, '\r\n') - end - end - if no_error(err, data) then - conn:add_write(hello_cb, 'HELO ' .. settings.helo .. '\r\n') - end - end - local function http_callback(err, code) - if err then - rspamd_logger.errx(task, 'got error %s in http callback', err) - return maybe_defer() - end - if code ~= 200 then - rspamd_logger.errx(task, 'got unexpected http status: %s', code) - return maybe_defer() - end - if settings.force_action then - rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action) - task:set_metric_action('default', settings.force_action) - end - end - local function redis_set_cb(err) - if err then - rspamd_logger.errx(task, 'got error %s when publishing record on server %s', - err, upstream:get_addr()) - return maybe_defer() - end - if settings.force_action then - rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action) - task:set_metric_action('default', settings.force_action) - end - end - if settings.select then - if not settings.select(task) then return end - rspamd_logger.debugm(N, task, 'Message selected for processing') - end - local data = settings.format(task) - if not data then - rspamd_logger.debugm(N, task, 'Format returned non-truthy value: %1', data) - return - end - if settings.channel then - ret,_,upstream = rspamd_redis_make_request(task, - redis_params, -- connect params - nil, -- hash key - true, -- is write - redis_set_cb, --callback - 'PUBLISH', -- command - {settings.channel, data} -- arguments - ) - if not ret then - rspamd_logger.errx(task, 'error connecting to redis') - maybe_defer() - end - end - if settings.url then - rspamd_http.request({ - task=task, - url=settings.url, - body=data, - callback=http_callback, - mime_type=settings['mime_type'], - }) - end - if (settings.mail_to and settings.smtp) then - rspamd_tcp.request({ - task = task, - callback = mail_cb, - stop_pattern = '\r\n', - host = settings.smtp, - port = settings.smtp_port or 25, - }) end end -- 2.39.5