aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/lua/metadata_exporter.lua
diff options
context:
space:
mode:
authorAndrew Lewis <nerf@judo.za.org>2017-02-17 17:08:01 +0200
committerAndrew Lewis <nerf@judo.za.org>2017-02-17 18:57:38 +0200
commit22e6bb543b392103486858f8ed202ff05e3c563e (patch)
tree9c683e1456723111d5971fce3b9e2c042872d852 /src/plugins/lua/metadata_exporter.lua
parent8b18feb6bc086a9ae7f2eef31f78325221707476 (diff)
downloadrspamd-22e6bb543b392103486858f8ed202ff05e3c563e.tar.gz
rspamd-22e6bb543b392103486858f8ed202ff05e3c563e.zip
[Minor] More new features/rework for metadata exporter
Diffstat (limited to 'src/plugins/lua/metadata_exporter.lua')
-rw-r--r--src/plugins/lua/metadata_exporter.lua484
1 files changed, 289 insertions, 195 deletions
diff --git a/src/plugins/lua/metadata_exporter.lua b/src/plugins/lua/metadata_exporter.lua
index 18466fa70..d71155f5a 100644
--- a/src/plugins/lua/metadata_exporter.lua
+++ b/src/plugins/lua/metadata_exporter.lua
@@ -17,16 +17,17 @@ limitations under the License.
-- A plugin that pushes metadata (or whole messages) to external services
-local rspamd_http
-local rspamd_tcp
-local rspamd_util
+local redis_params
+local rspamd_http = require "rspamd_http"
+local rspamd_tcp = require "rspamd_tcp"
+local rspamd_util = require "rspamd_util"
local rspamd_logger = require "rspamd_logger"
local N = 'metadata_exporter'
local settings = {
- format = function(task)
- return task:get_content()
- end,
+ pusher_enabled = {},
+ pusher_format = {},
+ pusher_select = {},
mime_type = 'text/plain',
defer = false,
mail_from = '',
@@ -43,6 +44,9 @@ Spam received from user %s on IP %s - queue ID %s]],
}
local formatters = {
+ default = function(task)
+ return task:get_content()
+ end,
email_alert = function(task)
local auser = task:get_user() or '[not applicable]'
local fip = task:get_from_ip() or '[unknown]'
@@ -57,49 +61,260 @@ local formatters = {
}
local selectors = {
+ default = function(task)
+ return true
+ end,
is_spam = function(task)
local action = task:get_metric_action('default')
- return (action == 'reject' or action == 'add header') and true or false
+ return (action == 'reject' or action == 'add header')
end,
is_spam_authed = function(task)
if not task:get_user() then
return false
end
local action = task:get_metric_action('default')
- return (action == 'reject' or action == 'add header') and true or false
+ return (action == 'reject' or action == 'add header')
end,
is_reject = function(task)
local action = task:get_metric_action('default')
- return (action == 'reject') and true or false
+ return (action == 'reject')
end,
is_reject_authed = function(task)
if not task:get_user() then
return false
end
local action = task:get_metric_action('default')
- return (action == 'reject') and true or false
+ return (action == 'reject')
+ end,
+}
+
+local function maybe_defer(task)
+ if settings.defer then
+ rspamd_logger.warnx(task, 'deferring message')
+ task:set_metric_action('default', 'soft reject')
+ end
+end
+
+local function maybe_force_action(task)
+ if settings.force_action then
+ rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action)
+ task:set_metric_action('default', settings.force_action)
+ end
+end
+
+local pushers = {
+ redis_pubsub = function(task, formatted)
+ local _,ret,upstream
+ local function redis_pub_cb(err)
+ if err then
+ rspamd_logger.errx(task, 'got error %s when publishing on server %s',
+ err, upstream:get_addr())
+ return maybe_defer(task)
+ end
+ maybe_force_action(task)
+ end
+ ret,_,upstream = rspamd_redis_make_request(task,
+ redis_params, -- connect params
+ nil, -- hash key
+ true, -- is write
+ redis_pub_cb, --callback
+ 'PUBLISH', -- command
+ {settings.channel, formatted} -- arguments
+ )
+ if not ret then
+ rspamd_logger.errx(task, 'error connecting to redis')
+ maybe_defer(task)
+ end
+ end,
+ http = function(task, formatted)
+ local function http_callback(err, code)
+ if err then
+ rspamd_logger.errx(task, 'got error %s in http callback', err)
+ return maybe_defer(task)
+ end
+ if code ~= 200 then
+ rspamd_logger.errx(task, 'got unexpected http status: %s', code)
+ return maybe_defer(task)
+ end
+ maybe_force_action(task)
+ end
+ rspamd_http.request({
+ task=task,
+ url=settings.url,
+ body=formatted,
+ callback=http_callback,
+ mime_type=settings['mime_type'],
+ })
+ end,
+ send_mail = function(task, formatted)
+ local function mail_cb(err, data, conn)
+ local function no_error(merr, mdata, wantcode)
+ wantcode = wantcode or '2'
+ if merr then
+ rspamd_logger.errx(task, 'got error in tcp callback: %s', merr)
+ if conn then
+ conn:close()
+ end
+ maybe_defer(task)
+ return false
+ end
+ if mdata then
+ if type(mdata) ~= 'string' then
+ mdata = tostring(mdata)
+ end
+ if string.sub(mdata, 1, 1) ~= wantcode then
+ rspamd_logger.errx(task, 'got bad smtp response: %s', mdata)
+ if conn then
+ conn:close()
+ end
+ maybe_defer(task)
+ return false
+ end
+ else
+ rspamd_logger.errx(task, 'no data')
+ if conn then
+ conn:close()
+ end
+ maybe_defer(task)
+ return false
+ end
+ return true
+ end
+ local function all_done_cb(merr, mdata)
+ maybe_force_action(task)
+ if conn then
+ conn:close()
+ end
+ end
+ local function quit_done_cb(merr, mdata)
+ conn:add_read(all_done_cb, '\r\n')
+ end
+ local function quit_cb(merr, mdata)
+ if no_error(merr, mdata) then
+ conn:add_write(quit_done_cb, 'QUIT\r\n')
+ end
+ end
+ local function pre_quit_cb(merr, mdata)
+ if no_error(merr, '2') then
+ conn:add_read(quit_cb, '\r\n')
+ end
+ end
+ local function data_done_cb(merr, mdata)
+ if no_error(merr, mdata, '3') then
+ conn:add_write(pre_quit_cb, {formatted, '\r\n.\r\n'})
+ end
+ end
+ local function data_cb(merr, mdata)
+ if no_error(merr, '2') then
+ conn:add_read(data_done_cb, '\r\n')
+ end
+ end
+ local function rcpt_done_cb(merr, mdata)
+ if no_error(merr, mdata) then
+ conn:add_write(data_cb, 'DATA\r\n')
+ end
+ end
+ local function rcpt_cb(merr, mdata)
+ if no_error(merr, '2') then
+ conn:add_read(rcpt_done_cb, '\r\n')
+ end
+ end
+ local function from_done_cb(merr, mdata)
+ if no_error(merr, mdata) then
+ conn:add_write(rcpt_cb, 'RCPT TO: <' .. settings.mail_to .. '>\r\n')
+ end
+ end
+ local function from_cb(merr, mdata)
+ if no_error(merr, '2') then
+ conn:add_read(from_done_cb, '\r\n')
+ end
+ end
+ local function hello_done_cb(merr, mdata)
+ if no_error(merr, mdata) then
+ conn:add_write(from_cb, 'MAIL FROM: <' .. settings.mail_from .. '>\r\n')
+ end
+ end
+ local function hello_cb(merr)
+ if no_error(merr, '2') then
+ conn:add_read(hello_done_cb, '\r\n')
+ end
+ end
+ if no_error(err, data) then
+ conn:add_write(hello_cb, 'HELO ' .. settings.helo .. '\r\n')
+ end
+ end
+ rspamd_tcp.request({
+ task = task,
+ callback = mail_cb,
+ stop_pattern = '\r\n',
+ host = settings.smtp,
+ port = settings.smtp_port or 25,
+ })
end,
}
local opts = rspamd_config:get_all_opt(N)
if not opts then return end
-local redis_params
local process_settings = {
- select = function(key)
- settings.select = assert(load(opts['select']))()
+ select = function(val)
+ selectors.custom = assert(load(val))()
end,
- format = function(key)
- settings.format = assert(load(opts['format']))()
+ format = function(val)
+ formatters.custom = assert(load(val))()
+ end,
+ push = function(key, val)
+ pushers.custom = assert(load(val))()
+ end,
+ pusher_enabled = function(val)
+ if type(val) == 'string' then
+ if pushers[val] then
+ settings.pusher_enabled[val] = true
+ else
+ rspamd_logger.errx(rspamd_config, 'Pusher type: %s is invalid', val)
+ end
+ elseif type(val) == 'table' then
+ for _, v in ipairs(val) do
+ if pushers[v] then
+ settings.pusher_enabled[v] = true
+ else
+ rspamd_logger.errx(rspamd_config, 'Pusher type: %s is invalid', val)
+ end
+ end
+ end
end,
}
for k, v in pairs(opts) do
local f = process_settings[k]
if f then
- f()
+ f(opts[k])
else
settings[k] = v
end
end
+if not next(settings.pusher_enabled) then
+ if pushers.custom then
+ rspamd_logger.infox(rspamd_config, 'Custom pusher implicitly enabled')
+ settings.pusher_enabled.custom = true
+ else
+ -- Check legacy options
+ if settings.url then
+ rspamd_logger.warnx(rspamd_config, 'HTTP pusher implicitly enabled')
+ settings.pusher_enabled.http = true
+ end
+ if settings.channel then
+ rspamd_logger.warnx(rspamd_config, 'Redis Pubsub pusher implicitly enabled')
+ settings.pusher_enabled.redis_pubsub = true
+ end
+ if settings.smtp and settings.mail_to then
+ rspamd_logger.warnx(rspamd_config, 'Redis Pubsub pusher implicitly enabled')
+ settings.pusher_enabled.send_mail = true
+ end
+ end
+end
+if not next(settings.pusher_enabled) then
+ rspamd_logger.errx(rspamd_config, 'No push backend enabled')
+ return
+end
if settings.formatter then
settings.format = formatters[settings.formatter]
if not settings.format then
@@ -114,201 +329,80 @@ if settings.selector then
return
end
end
-if not ((settings.url or settings.channel) or (settings.mail_to and settings.smtp)) then
- rspamd_logger.errx(rspamd_config, 'No backends configured')
- return
+for k in pairs(settings.pusher_enabled) do
+ local formatter = settings.pusher_format[k]
+ local selector = settings.pusher_select[k]
+ if not formatter then
+ settings.pusher_format[k] = 'default'
+ rspamd_logger.infox(rspamd_config, 'Using default formatter for %s pusher', k)
+ else
+ if not formatters[formatter] then
+ rspamd_logger.errx(rspamd_config, 'No such formatter: %s - disabling %s', formatter, k)
+ settings.pusher_enabled.k = nil
+ end
+ end
+ if not selector then
+ settings.pusher_select[k] = 'default'
+ rspamd_logger.infox(rspamd_config, 'Using default selector for %s pusher', k)
+ else
+ if not selectors[selector] then
+ rspamd_logger.errx(rspamd_config, 'No such selector: %s - disabling %s', selector, k)
+ settings.pusher_enabled.k = nil
+ end
+ end
end
-if settings.channel then
+if settings.pusher_enabled.redis_pubsub then
redis_params = rspamd_parse_redis_server(N)
if not redis_params then
rspamd_logger.errx(rspamd_config, 'No redis servers are specified')
- return
+ settings.pusher_enabled.redis_pubsub = nil
end
end
-if settings.url then
- rspamd_http = require "rspamd_http"
+if settings.pusher_enabled.http then
+ if not settings.url then
+ rspamd_logger.errx(rspamd_config, 'No URL is specified')
+ settings.pusher_enabled.http = nil
+ end
end
-if settings.mail_to then
- rspamd_tcp = require "rspamd_tcp"
- rspamd_util = require "rspamd_util"
+if settings.pusher_enabled.send_mail then
+ if not (settings.mail_to and settings.smtp) then
+ rspamd_logger.errx(rspamd_config, 'No mail_to and/or smtp setting is specified')
+ settings.pusher_enabled.send_mail = nil
+ end
end
-if opts['mime_type'] then
- settings['mime_type'] = opts['mime_type']
+if not next(settings.pusher_enabled) then
+ rspamd_logger.errx(rspamd_config, 'No push backend enabled')
+ return
end
local function metadata_exporter(task)
- local _,ret,upstream
- local function maybe_defer()
- if settings.defer then
- rspamd_logger.warnx(task, 'deferring message')
- task:set_metric_action('default', 'soft reject')
+ local results = {
+ select = {},
+ format = {},
+ }
+ for k in pairs(settings.pusher_enabled) do
+ local selector = settings.pusher_select[k] or 'default'
+ local selected = results.select[selector]
+ if selected == nil then
+ results.select[selector] = selectors[selector](task)
+ selected = results.select[selector]
end
- end
- local function mail_cb(err, data, conn)
- local function no_error(merr, mdata, wantcode)
- wantcode = wantcode or '2'
- if merr then
- rspamd_logger.errx(task, 'got error in tcp callback: %s', merr)
- if conn then
- conn:close()
- end
- maybe_defer()
- return false
+ if selected then
+ rspamd_logger.debugm(N, task, 'Message selected for processing')
+ local formatter = settings.pusher_format[k]
+ local formatted = results.format[k]
+ if formatted == nil then
+ results.format[formatter] = formatters[formatter](task)
+ formatted = results.format[formatter]
end
- if mdata then
- if type(mdata) ~= 'string' then
- mdata = tostring(mdata)
- end
- if string.sub(mdata, 1, 1) ~= wantcode then
- rspamd_logger.errx(task, 'got bad smtp response: %s', mdata)
- if conn then
- conn:close()
- end
- maybe_defer()
- return false
- end
+ if formatted then
+ pushers[k](task, formatted)
+ elseif formatted == nil then
+ rspamd_logger.warnx(task, 'Formatter [%s] returned NIL', formatter)
else
- rspamd_logger.errx(task, 'no data')
- if conn then
- conn:close()
- end
- maybe_defer()
- return false
- end
- return true
- end
- local function all_done_cb(merr, mdata)
- if settings.force_action then
- rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action)
- task:set_metric_action('default', settings.force_action)
- end
- if conn then
- conn:close()
+ rspamd_logger.debugm(N, task, 'Formatter [%s] returned non-truthy value [%s]', formatter, formatted)
end
end
- local function quit_done_cb(merr, mdata)
- conn:add_read(all_done_cb, '\r\n')
- end
- local function quit_cb(merr, mdata)
- if no_error(merr, mdata) then
- conn:add_write(quit_done_cb, 'QUIT\r\n')
- end
- end
- local function pre_quit_cb(merr, mdata)
- if no_error(merr, '2') then
- conn:add_read(quit_cb, '\r\n')
- end
- end
- local function data_done_cb(merr, mdata)
- if no_error(merr, mdata, '3') then
- local msg = settings.format(task)
- conn:add_write(pre_quit_cb, msg .. '\r\n.\r\n')
- end
- end
- local function data_cb(merr, mdata)
- if no_error(merr, '2') then
- conn:add_read(data_done_cb, '\r\n')
- end
- end
- local function rcpt_done_cb(merr, mdata)
- if no_error(merr, mdata) then
- conn:add_write(data_cb, 'DATA\r\n')
- end
- end
- local function rcpt_cb(merr, mdata)
- if no_error(merr, '2') then
- conn:add_read(rcpt_done_cb, '\r\n')
- end
- end
- local function from_done_cb(merr, mdata)
- if no_error(merr, mdata) then
- conn:add_write(rcpt_cb, 'RCPT TO: <' .. settings.mail_to .. '>\r\n')
- end
- end
- local function from_cb(merr, mdata)
- if no_error(merr, '2') then
- conn:add_read(from_done_cb, '\r\n')
- end
- end
- local function hello_done_cb(merr, mdata)
- if no_error(merr, mdata) then
- conn:add_write(from_cb, 'MAIL FROM: <' .. settings.mail_from .. '>\r\n')
- end
- end
- local function hello_cb(merr)
- if no_error(merr, '2') then
- conn:add_read(hello_done_cb, '\r\n')
- end
- end
- if no_error(err, data) then
- conn:add_write(hello_cb, 'HELO ' .. settings.helo .. '\r\n')
- end
- end
- local function http_callback(err, code)
- if err then
- rspamd_logger.errx(task, 'got error %s in http callback', err)
- return maybe_defer()
- end
- if code ~= 200 then
- rspamd_logger.errx(task, 'got unexpected http status: %s', code)
- return maybe_defer()
- end
- if settings.force_action then
- rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action)
- task:set_metric_action('default', settings.force_action)
- end
- end
- local function redis_set_cb(err)
- if err then
- rspamd_logger.errx(task, 'got error %s when publishing record on server %s',
- err, upstream:get_addr())
- return maybe_defer()
- end
- if settings.force_action then
- rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action)
- task:set_metric_action('default', settings.force_action)
- end
- end
- if settings.select then
- if not settings.select(task) then return end
- rspamd_logger.debugm(N, task, 'Message selected for processing')
- end
- local data = settings.format(task)
- if not data then
- rspamd_logger.debugm(N, task, 'Format returned non-truthy value: %1', data)
- return
- end
- if settings.channel then
- ret,_,upstream = rspamd_redis_make_request(task,
- redis_params, -- connect params
- nil, -- hash key
- true, -- is write
- redis_set_cb, --callback
- 'PUBLISH', -- command
- {settings.channel, data} -- arguments
- )
- if not ret then
- rspamd_logger.errx(task, 'error connecting to redis')
- maybe_defer()
- end
- end
- if settings.url then
- rspamd_http.request({
- task=task,
- url=settings.url,
- body=data,
- callback=http_callback,
- mime_type=settings['mime_type'],
- })
- end
- if (settings.mail_to and settings.smtp) then
- rspamd_tcp.request({
- task = task,
- callback = mail_cb,
- stop_pattern = '\r\n',
- host = settings.smtp,
- port = settings.smtp_port or 25,
- })
end
end