]> source.dussan.org Git - rspamd.git/commitdiff
[Minor] More new features/rework for metadata exporter 1432/head
authorAndrew Lewis <nerf@judo.za.org>
Fri, 17 Feb 2017 15:08:01 +0000 (17:08 +0200)
committerAndrew Lewis <nerf@judo.za.org>
Fri, 17 Feb 2017 16:57:38 +0000 (18:57 +0200)
src/plugins/lua/metadata_exporter.lua

index 18466fa700cb4e2bf306f94417f8f0b35005f6cc..d71155f5a0247892963f02af6697daf4d696a27b 100644 (file)
@@ -17,16 +17,17 @@ limitations under the License.
 
 -- A plugin that pushes metadata (or whole messages) to external services
 
-local rspamd_http
-local rspamd_tcp
-local rspamd_util
+local redis_params
+local rspamd_http = require "rspamd_http"
+local rspamd_tcp = require "rspamd_tcp"
+local rspamd_util = require "rspamd_util"
 local rspamd_logger = require "rspamd_logger"
 local N = 'metadata_exporter'
 
 local settings = {
-  format = function(task)
-    return task:get_content()
-  end,
+  pusher_enabled = {},
+  pusher_format = {},
+  pusher_select = {},
   mime_type = 'text/plain',
   defer = false,
   mail_from = '',
@@ -43,6 +44,9 @@ Spam received from user %s on IP %s - queue ID %s]],
 }
 
 local formatters = {
+  default = function(task)
+    return task:get_content()
+  end,
   email_alert = function(task)
     local auser = task:get_user() or '[not applicable]'
     local fip = task:get_from_ip() or '[unknown]'
@@ -57,49 +61,260 @@ local formatters = {
 }
 
 local selectors = {
+  default = function(task)
+    return true
+  end,
   is_spam = function(task)
     local action = task:get_metric_action('default')
-    return (action == 'reject' or action == 'add header') and true or false
+    return (action == 'reject' or action == 'add header')
   end,
   is_spam_authed = function(task)
     if not task:get_user() then
       return false
     end
     local action = task:get_metric_action('default')
-    return (action == 'reject' or action == 'add header') and true or false
+    return (action == 'reject' or action == 'add header')
   end,
   is_reject = function(task)
     local action = task:get_metric_action('default')
-    return (action == 'reject') and true or false
+    return (action == 'reject')
   end,
   is_reject_authed = function(task)
     if not task:get_user() then
       return false
     end
     local action = task:get_metric_action('default')
-    return (action == 'reject') and true or false
+    return (action == 'reject')
+  end,
+}
+
+local function maybe_defer(task)
+  if settings.defer then
+    rspamd_logger.warnx(task, 'deferring message')
+    task:set_metric_action('default', 'soft reject')
+  end
+end
+
+local function maybe_force_action(task)
+  if settings.force_action then
+    rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action)
+    task:set_metric_action('default', settings.force_action)
+  end
+end
+
+local pushers = {
+  redis_pubsub = function(task, formatted)
+    local _,ret,upstream
+    local function redis_pub_cb(err)
+      if err then
+        rspamd_logger.errx(task, 'got error %s when publishing on server %s',
+            err, upstream:get_addr())
+        return maybe_defer(task)
+      end
+      maybe_force_action(task)
+    end
+    ret,_,upstream = rspamd_redis_make_request(task,
+      redis_params, -- connect params
+      nil, -- hash key
+      true, -- is write
+      redis_pub_cb, --callback
+      'PUBLISH', -- command
+      {settings.channel, formatted} -- arguments
+    )
+    if not ret then
+      rspamd_logger.errx(task, 'error connecting to redis')
+      maybe_defer(task)
+    end
+  end,
+  http = function(task, formatted)
+    local function http_callback(err, code)
+      if err then
+        rspamd_logger.errx(task, 'got error %s in http callback', err)
+        return maybe_defer(task)
+      end
+      if code ~= 200 then
+        rspamd_logger.errx(task, 'got unexpected http status: %s', code)
+        return maybe_defer(task)
+      end
+      maybe_force_action(task)
+    end
+    rspamd_http.request({
+      task=task,
+      url=settings.url,
+      body=formatted,
+      callback=http_callback,
+      mime_type=settings['mime_type'],
+    })
+  end,
+  send_mail = function(task, formatted)
+    local function mail_cb(err, data, conn)
+      local function no_error(merr, mdata, wantcode)
+        wantcode = wantcode or '2'
+        if merr then
+          rspamd_logger.errx(task, 'got error in tcp callback: %s', merr)
+          if conn then
+            conn:close()
+          end
+          maybe_defer(task)
+          return false
+        end
+        if mdata then
+          if type(mdata) ~= 'string' then
+            mdata = tostring(mdata)
+            end
+          if string.sub(mdata, 1, 1) ~= wantcode then
+            rspamd_logger.errx(task, 'got bad smtp response: %s', mdata)
+            if conn then
+              conn:close()
+            end
+            maybe_defer(task)
+            return false
+            end
+        else
+            rspamd_logger.errx(task, 'no data')
+          if conn then
+            conn:close()
+          end
+          maybe_defer(task)
+          return false
+        end
+        return true
+      end
+      local function all_done_cb(merr, mdata)
+        maybe_force_action(task)
+        if conn then
+          conn:close()
+        end
+      end
+      local function quit_done_cb(merr, mdata)
+        conn:add_read(all_done_cb, '\r\n')
+      end
+      local function quit_cb(merr, mdata)
+        if no_error(merr, mdata) then
+          conn:add_write(quit_done_cb, 'QUIT\r\n')
+        end
+      end
+      local function pre_quit_cb(merr, mdata)
+        if no_error(merr, '2') then
+          conn:add_read(quit_cb, '\r\n')
+        end
+      end
+      local function data_done_cb(merr, mdata)
+        if no_error(merr, mdata, '3') then
+          conn:add_write(pre_quit_cb, {formatted, '\r\n.\r\n'})
+        end
+      end
+      local function data_cb(merr, mdata)
+        if no_error(merr, '2') then
+          conn:add_read(data_done_cb, '\r\n')
+        end
+      end
+      local function rcpt_done_cb(merr, mdata)
+        if no_error(merr, mdata) then
+          conn:add_write(data_cb, 'DATA\r\n')
+        end
+      end
+      local function rcpt_cb(merr, mdata)
+        if no_error(merr, '2') then
+          conn:add_read(rcpt_done_cb, '\r\n')
+        end
+      end
+      local function from_done_cb(merr, mdata)
+        if no_error(merr, mdata) then
+          conn:add_write(rcpt_cb, 'RCPT TO: <' .. settings.mail_to .. '>\r\n')
+        end
+      end
+      local function from_cb(merr, mdata)
+        if no_error(merr, '2') then
+          conn:add_read(from_done_cb, '\r\n')
+        end
+      end
+        local function hello_done_cb(merr, mdata)
+        if no_error(merr, mdata) then
+          conn:add_write(from_cb, 'MAIL FROM: <' .. settings.mail_from .. '>\r\n')
+        end
+      end
+      local function hello_cb(merr)
+        if no_error(merr, '2') then
+          conn:add_read(hello_done_cb, '\r\n')
+        end
+      end
+      if no_error(err, data) then
+        conn:add_write(hello_cb, 'HELO ' .. settings.helo .. '\r\n')
+      end
+    end
+    rspamd_tcp.request({
+      task = task,
+      callback = mail_cb,
+      stop_pattern = '\r\n',
+      host = settings.smtp,
+      port = settings.smtp_port or 25,
+    })
   end,
 }
 
 local opts = rspamd_config:get_all_opt(N)
 if not opts then return end
-local redis_params
 local process_settings = {
-  select = function(key)
-    settings.select = assert(load(opts['select']))()
+  select = function(val)
+    selectors.custom = assert(load(val))()
   end,
-  format = function(key)
-    settings.format = assert(load(opts['format']))()
+  format = function(val)
+    formatters.custom = assert(load(val))()
+  end,
+  push = function(key, val)
+    pushers.custom = assert(load(val))()
+  end,
+  pusher_enabled = function(val)
+    if type(val) == 'string' then
+      if pushers[val] then
+        settings.pusher_enabled[val] = true
+      else
+        rspamd_logger.errx(rspamd_config, 'Pusher type: %s is invalid', val)
+      end
+    elseif type(val) == 'table' then
+      for _, v in ipairs(val) do
+        if pushers[v] then
+          settings.pusher_enabled[v] = true
+        else
+          rspamd_logger.errx(rspamd_config, 'Pusher type: %s is invalid', val)
+        end
+      end
+    end
   end,
 }
 for k, v in pairs(opts) do
   local f = process_settings[k]
   if f then
-    f()
+    f(opts[k])
   else
     settings[k] = v
   end
 end
+if not next(settings.pusher_enabled) then
+  if pushers.custom then
+    rspamd_logger.infox(rspamd_config, 'Custom pusher implicitly enabled')
+    settings.pusher_enabled.custom = true
+  else
+    -- Check legacy options
+    if settings.url then
+      rspamd_logger.warnx(rspamd_config, 'HTTP pusher implicitly enabled')
+      settings.pusher_enabled.http = true
+    end
+    if settings.channel then
+      rspamd_logger.warnx(rspamd_config, 'Redis Pubsub pusher implicitly enabled')
+      settings.pusher_enabled.redis_pubsub = true
+    end
+    if settings.smtp and settings.mail_to then
+      rspamd_logger.warnx(rspamd_config, 'Redis Pubsub pusher implicitly enabled')
+      settings.pusher_enabled.send_mail = true
+    end
+  end
+end
+if not next(settings.pusher_enabled) then
+  rspamd_logger.errx(rspamd_config, 'No push backend enabled')
+  return
+end
 if settings.formatter then
   settings.format = formatters[settings.formatter]
   if not settings.format then
@@ -114,201 +329,80 @@ if settings.selector then
     return
   end
 end
-if not ((settings.url or settings.channel) or (settings.mail_to and settings.smtp)) then
-  rspamd_logger.errx(rspamd_config, 'No backends configured')
-  return
+for k in pairs(settings.pusher_enabled) do
+  local formatter = settings.pusher_format[k]
+  local selector = settings.pusher_select[k]
+  if not formatter then
+    settings.pusher_format[k] = 'default'
+    rspamd_logger.infox(rspamd_config, 'Using default formatter for %s pusher', k)
+  else
+    if not formatters[formatter] then
+      rspamd_logger.errx(rspamd_config, 'No such formatter: %s - disabling %s', formatter, k)
+      settings.pusher_enabled.k = nil
+    end
+  end
+  if not selector then
+    settings.pusher_select[k] = 'default'
+    rspamd_logger.infox(rspamd_config, 'Using default selector for %s pusher', k)
+  else
+    if not selectors[selector] then
+      rspamd_logger.errx(rspamd_config, 'No such selector: %s - disabling %s', selector, k)
+      settings.pusher_enabled.k = nil
+    end
+  end
 end
-if settings.channel then
+if settings.pusher_enabled.redis_pubsub then
   redis_params = rspamd_parse_redis_server(N)
   if not redis_params then
     rspamd_logger.errx(rspamd_config, 'No redis servers are specified')
-    return
+    settings.pusher_enabled.redis_pubsub = nil
   end
 end
-if settings.url then
-  rspamd_http = require "rspamd_http"
+if settings.pusher_enabled.http then
+  if not settings.url then
+    rspamd_logger.errx(rspamd_config, 'No URL is specified')
+    settings.pusher_enabled.http = nil
+  end
 end
-if settings.mail_to then
-  rspamd_tcp = require "rspamd_tcp"
-  rspamd_util = require "rspamd_util"
+if settings.pusher_enabled.send_mail then
+  if not (settings.mail_to and settings.smtp) then
+    rspamd_logger.errx(rspamd_config, 'No mail_to and/or smtp setting is specified')
+    settings.pusher_enabled.send_mail = nil
+  end
 end
-if opts['mime_type'] then
-  settings['mime_type'] = opts['mime_type']
+if not next(settings.pusher_enabled) then
+  rspamd_logger.errx(rspamd_config, 'No push backend enabled')
+  return
 end
 
 local function metadata_exporter(task)
-  local _,ret,upstream
-  local function maybe_defer()
-    if settings.defer then
-      rspamd_logger.warnx(task, 'deferring message')
-      task:set_metric_action('default', 'soft reject')
+  local results = {
+    select = {},
+    format = {},
+  }
+  for k in pairs(settings.pusher_enabled) do
+    local selector = settings.pusher_select[k] or 'default'
+    local selected = results.select[selector]
+    if selected == nil then
+      results.select[selector] = selectors[selector](task)
+      selected = results.select[selector]
     end
-  end
-  local function mail_cb(err, data, conn)
-    local function no_error(merr, mdata, wantcode)
-      wantcode = wantcode or '2'
-      if merr then
-        rspamd_logger.errx(task, 'got error in tcp callback: %s', merr)
-        if conn then
-          conn:close()
-        end
-        maybe_defer()
-        return false
+    if selected then
+      rspamd_logger.debugm(N, task, 'Message selected for processing')
+      local formatter = settings.pusher_format[k]
+      local formatted = results.format[k]
+      if formatted == nil then
+        results.format[formatter] = formatters[formatter](task)
+        formatted = results.format[formatter]
       end
-      if mdata then
-        if type(mdata) ~= 'string' then
-          mdata = tostring(mdata)
-        end
-        if string.sub(mdata, 1, 1) ~= wantcode then
-          rspamd_logger.errx(task, 'got bad smtp response: %s', mdata)
-          if conn then
-            conn:close()
-          end
-          maybe_defer()
-          return false
-        end
+      if formatted then
+        pushers[k](task, formatted)
+      elseif formatted == nil then
+        rspamd_logger.warnx(task, 'Formatter [%s] returned NIL', formatter)
       else
-        rspamd_logger.errx(task, 'no data')
-        if conn then
-          conn:close()
-        end
-        maybe_defer()
-        return false
-      end
-      return true
-    end
-    local function all_done_cb(merr, mdata)
-      if settings.force_action then
-        rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action)
-        task:set_metric_action('default', settings.force_action)
-      end
-      if conn then
-        conn:close()
+        rspamd_logger.debugm(N, task, 'Formatter [%s] returned non-truthy value [%s]', formatter, formatted)
       end
     end
-    local function quit_done_cb(merr, mdata)
-      conn:add_read(all_done_cb, '\r\n')
-    end
-    local function quit_cb(merr, mdata)
-      if no_error(merr, mdata) then
-        conn:add_write(quit_done_cb, 'QUIT\r\n')
-      end
-    end
-    local function pre_quit_cb(merr, mdata)
-      if no_error(merr, '2') then
-        conn:add_read(quit_cb, '\r\n')
-      end
-    end
-    local function data_done_cb(merr, mdata)
-      if no_error(merr, mdata, '3') then
-        local msg = settings.format(task)
-        conn:add_write(pre_quit_cb, msg .. '\r\n.\r\n')
-      end
-    end
-    local function data_cb(merr, mdata)
-      if no_error(merr, '2') then
-        conn:add_read(data_done_cb, '\r\n')
-      end
-    end
-    local function rcpt_done_cb(merr, mdata)
-      if no_error(merr, mdata) then
-        conn:add_write(data_cb, 'DATA\r\n')
-      end
-    end
-    local function rcpt_cb(merr, mdata)
-      if no_error(merr, '2') then
-        conn:add_read(rcpt_done_cb, '\r\n')
-      end
-    end
-    local function from_done_cb(merr, mdata)
-      if no_error(merr, mdata) then
-        conn:add_write(rcpt_cb, 'RCPT TO: <' .. settings.mail_to .. '>\r\n')
-      end
-    end
-    local function from_cb(merr, mdata)
-      if no_error(merr, '2') then
-        conn:add_read(from_done_cb, '\r\n')
-      end
-    end
-    local function hello_done_cb(merr, mdata)
-      if no_error(merr, mdata) then
-        conn:add_write(from_cb, 'MAIL FROM: <' .. settings.mail_from .. '>\r\n')
-      end
-    end
-    local function hello_cb(merr)
-      if no_error(merr, '2') then
-        conn:add_read(hello_done_cb, '\r\n')
-      end
-    end
-    if no_error(err, data) then
-      conn:add_write(hello_cb, 'HELO ' .. settings.helo .. '\r\n')
-    end
-  end
-  local function http_callback(err, code)
-    if err then
-      rspamd_logger.errx(task, 'got error %s in http callback', err)
-      return maybe_defer()
-    end
-    if code ~= 200 then
-      rspamd_logger.errx(task, 'got unexpected http status: %s', code)
-      return maybe_defer()
-    end
-    if settings.force_action then
-      rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action)
-      task:set_metric_action('default', settings.force_action)
-    end
-  end
-  local function redis_set_cb(err)
-    if err then
-      rspamd_logger.errx(task, 'got error %s when publishing record on server %s',
-          err, upstream:get_addr())
-      return maybe_defer()
-    end
-    if settings.force_action then
-      rspamd_logger.warnx(task, 'forcing action: %s', settings.force_action)
-      task:set_metric_action('default', settings.force_action)
-    end
-  end
-  if settings.select then
-    if not settings.select(task) then return end
-    rspamd_logger.debugm(N, task, 'Message selected for processing')
-  end
-  local data = settings.format(task)
-  if not data then
-    rspamd_logger.debugm(N, task, 'Format returned non-truthy value: %1', data)
-    return
-  end
-  if settings.channel then
-    ret,_,upstream = rspamd_redis_make_request(task,
-      redis_params, -- connect params
-      nil, -- hash key
-      true, -- is write
-      redis_set_cb, --callback
-      'PUBLISH', -- command
-      {settings.channel, data} -- arguments
-    )
-    if not ret then
-      rspamd_logger.errx(task, 'error connecting to redis')
-      maybe_defer()
-    end
-  end
-  if settings.url then
-    rspamd_http.request({
-      task=task,
-      url=settings.url,
-      body=data,
-      callback=http_callback,
-      mime_type=settings['mime_type'],
-    })
-  end
-  if (settings.mail_to and settings.smtp) then
-    rspamd_tcp.request({
-      task = task,
-      callback = mail_cb,
-      stop_pattern = '\r\n',
-      host = settings.smtp,
-      port = settings.smtp_port or 25,
-    })
   end
 end