]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Various fixes to elastic plugin
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 9 Mar 2018 11:47:49 +0000 (11:47 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 9 Mar 2018 11:47:49 +0000 (11:47 +0000)
src/plugins/lua/elastic.lua

index c5919bf7fbc87535b1a39a806b64655621559965..dfe8ce84c3a9d990248db93b03cc87002cf4def7 100644 (file)
@@ -65,6 +65,8 @@ local function elastic_send_data(task)
     table.insert(tbl, ucl.to_format(value, 'json-compact'))
   end
 
+  table.insert(tbl, '') -- For last \n
+
   local upstream = settings.upstream:get_upstream_round_robin()
   local ip_addr = upstream:get_addr():to_string(true)
 
@@ -110,16 +112,22 @@ end
 local function get_general_metadata(task)
   local r = {}
   local ip_addr = task:get_ip()
-  r.ip = tostring(ip_addr) or 'unknown'
+
   r.webmail = false
-  if ip_addr  then
+
+  if ip_addr  and ip_addr:is_valid() then
     r.is_local = ip_addr:is_local()
     local origin = task:get_header('X-Originating-IP')
     if origin then
-        r.webmail = true
-        r.ip = origin
+      r.webmail = true
+      r.ip = origin
+    else
+      r.ip = tostring(ip_addr)
     end
+  else
+    r.ip = '127.0.0.1'
   end
+
   r.direction = "Inbound"
   r.user = task:get_user() or 'unknown'
   r.qid = task:get_queue_id() or 'unknown'
@@ -176,7 +184,7 @@ end
 local function elastic_collect(task)
   if not enabled then return end
   if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
-  local row = {['rspam_meta'] = get_general_metadata(task),
+  local row = {['rspamd_meta'] = get_general_metadata(task),
     ['@timestamp'] = tostring(util.get_time() * 1000)}
   table.insert(rows, row)
   nrows = nrows + 1
@@ -195,29 +203,34 @@ local function check_elastic_server(cfg, ev_base, _)
   local ip_addr = upstream:get_addr():to_string(true)
 
   local plugins_url = connect_prefix .. ip_addr .. '/_nodes/plugins'
-  local function http_callback(_, _, body, _)
-    local parser = ucl.parser()
-    local res,err = parser:parse_string(body)
-    if not res then
+  local function http_callback(_, err, body, _)
+    if err == 200 then
+      local parser = ucl.parser()
+      local res,err = parser:parse_string(body)
+      if not res then
         rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
-          plugins_url, err)
+            plugins_url, err)
         enabled = false;
         return
-    end
-    local obj = parser:get_object()
-    for node,value in pairs(obj['nodes']) do
-      local plugin_found = false
-      for _,plugin in pairs(value['plugins']) do
-        if plugin['name'] == 'ingest-geoip' then
-          plugin_found = true
-        end
       end
-      if not plugin_found then
-        rspamd_logger.infox(rspamd_config,
-          'Unable to find ingest-geoip on %1 node, disabling module', node)
-        enabled = false
-        return
+      local obj = parser:get_object()
+      for node,value in pairs(obj['nodes']) do
+        local plugin_found = false
+        for _,plugin in pairs(value['plugins']) do
+          if plugin['name'] == 'ingest-geoip' then
+            plugin_found = true
+          end
+        end
+        if not plugin_found then
+          rspamd_logger.infox(rspamd_config,
+              'Unable to find ingest-geoip on %1 node, disabling module', node)
+          enabled = false
+          return
+        end
       end
+    else
+      rspamd_logger.errx('cannot get plugins from %s: %s (%s)', plugins_url, err, body)
+      enabled = false
     end
   end
   rspamd_http.request({
@@ -238,25 +251,39 @@ local function initial_setup(cfg, ev_base, worker)
   if enabled then
     -- create ingest pipeline
     local geoip_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip'
-    local function geoip_cb(_, code, _, _)
+    local function geoip_cb(_, code, body, _)
       if code ~= 200 then
-        rspamd_logger.errx('cannot get data from %s: %s', geoip_url, code)
+        rspamd_logger.errx('cannot get data from %s: %s (%s)', geoip_url, code, body)
         enabled = false
       end
     end
+    local template = {
+      description = "Add geoip info for rspamd",
+      processors = {
+        {
+          geoip = {
+            field = "rspamd_meta.ip",
+            target_field = "rspamd_meta.geoip"
+          }
+        }
+      }
+    }
     rspamd_http.request({
       url = geoip_url,
       ev_base = ev_base,
       config = cfg,
       callback = geoip_cb,
-      body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
+      headers = {
+        ['Content-Type'] = 'application/json',
+      },
+      body = ucl.to_format(template, 'json-compact'),
       method = 'put',
     })
     -- create template mappings if not exist
-    local template_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip'
-    local function http_template_put_callback(_, code, _, _)
+    local template_url = connect_prefix .. ip_addr ..'/_template/rspamd'
+    local function http_template_put_callback(_, code, body, _)
       if code ~= 200 then
-        rspamd_logger.errx('cannot put template to %s: %s', template_url, code)
+        rspamd_logger.errx('cannot put template to %s: %s (%s)', template_url, code, body)
         enabled = false
       end
     end
@@ -268,6 +295,9 @@ local function initial_setup(cfg, ev_base, worker)
           config = cfg,
           body = elastic_template,
           method = 'put',
+          headers = {
+            ['Content-Type'] = 'application/json',
+          },
           callback = http_template_put_callback,
         })
       end
@@ -301,11 +331,13 @@ local function initial_setup(cfg, ev_base, worker)
                 item["_id"]..'"} }')
             table.insert(tbl, ucl.to_format(item['_source'], 'json-compact'))
           end
+          table.insert(tbl, '') -- For last \n
 
           local kibana_url = connect_prefix .. ip_addr ..'/.kibana/_bulk'
-          local function kibana_template_callback(_, code, _, _)
+          local function kibana_template_callback(_, code, body, _)
             if code ~= 200 then
-              rspamd_logger.errx('cannot put template to %s: %s', kibana_url, code)
+              rspamd_logger.errx('cannot put template to %s: %s (%s)', kibana_url,
+                  code, body)
               enabled = false
             end
           end