]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Rework elasticsearch plugin
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 18 Feb 2018 16:43:50 +0000 (16:43 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 18 Feb 2018 16:43:50 +0000 (16:43 +0000)
CMakeLists.txt
src/plugins/lua/dynamic_conf.lua
src/plugins/lua/elastic.lua

index dcfc35cbb83c493b88f2d83ff937e5966b5e7bdc..1ba6e767261d81c7c4d064daad709cefd3468425 100644 (file)
@@ -1364,6 +1364,9 @@ IF(INSTALL_WEBUI MATCHES "ON")
        INSTALL(DIRECTORY "interface/" DESTINATION ${WWWDIR} PATTERN ".git" EXCLUDE)
 ENDIF(INSTALL_WEBUI MATCHES "ON")
 
+
+INSTALL(DIRECTORY "contrib/elastic/" DESTINATION "${PLUGINSDIR}/elastic" PATTERN ".git" EXCLUDE)
+
 ADD_CUSTOM_TARGET(dist ${CMAKE_SOURCE_DIR}/dist.sh
        "${CMAKE_BINARY_DIR}/rspamd-${RSPAMD_VERSION}.tar.xz" "${TAR}"
        COMMENT "Create source distribution"
index 569d00cbd01169c97962bd5189f2212d7e984223..a4b7527a52a61573ee1e04e4dba5398a323c9ad7 100644 (file)
@@ -232,7 +232,7 @@ end
 
 local section = rspamd_config:get_all_opt("dynamic_conf")
 if section then
-  redis_params = rspamd_parse_redis_server('dynamic_conf')
+  redis_params = rspamd_redis.parse_redis_server('dynamic_conf')
   if not redis_params then
     rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
     return
index f3bf69ead26ff0345a5e26256f1dfc4dc5284fcc..ae789f0c5b76753398417777b2734ec29ea04530 100644 (file)
@@ -18,31 +18,34 @@ limitations under the License.
 local rspamd_logger = require 'rspamd_logger'
 local rspamd_http = require "rspamd_http"
 local rspamd_lua_utils = require "lua_util"
+local util = require "rspamd_util"
 local ucl = require "ucl"
 local hash = require "rspamd_cryptobox_hash"
+local rspamd_redis = require "lua_redis"
+local upstream_list = require "rspamd_upstream_list"
 
 if confighelp then
   return
 end
 
-local redis_params
-redis_params = rspamd_parse_redis_server('elastic')
-
 local rows = {}
 local nrows = 0
 local elastic_template
 local redis_params
+local N = "elastic"
+local E = {}
+local connect_prefix = 'http://'
+local enabled = true
 local settings = {
   limit = 10,
   index_pattern = 'rspamd-%Y.%m.%d',
-  server = 'localhost:9200',
   template_file = '/etc/rspamd/rspamd_template.json',
   kibana_file = '/etc/rspamd/kibana.json',
   key_prefix = 'elastic-',
   expire = 3600,
-  debug = false,
   failover = false,
   import_kibana = false,
+  use_https = false,
 }
 
 local function read_file(path)
@@ -52,30 +55,36 @@ local function read_file(path)
     file:close()
     return content
 end
+
 local function elastic_send_data(task)
   local es_index = os.date(settings['index_pattern'])
-  local bulk_json = ""
-  for key,value in pairs(rows) do
-    bulk_json = bulk_json..'{ "index" : { "_index" : "'..es_index..'", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }'.."\n"
-    bulk_json = bulk_json..ucl.to_format(value, 'json-compact').."\n"
+  local tbl = {}
+  for _,value in pairs(rows) do
+    table.insert(tbl, '{ "index" : { "_index" : "'..es_index..
+        '", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }')
+    table.insert(tbl, ucl.to_format(value, 'json-compact'))
   end
-  local function http_index_data_callback(err, code, body, headers)
-    -- todo error handling we may store the rows it into redis and send it again later
-    if settings['debug'] then
-      rspamd_logger.infox(task, "After create data %1",body)
-    end
-    if code ~= 200 or err_message then
+
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local ip_addr = upstream:get_addr():to_string(true)
+
+  local push_url = connect_prefix .. ip_addr .. '/'..es_index..'/_bulk'
+  local bulk_json = table.concat(tbl, "\n")
+  local function http_index_data_callback(_, code, body, _)
+    -- todo error handling we may store the rows it into redis and send it again late
+    rspamd_logger.debugm(N, task, "After create data %1",body)
+    if code ~= 200 then
       if settings['failover'] then
         local h = hash.create()
         h:update(bulk_json)
         local key = settings['key_prefix'] ..es_index..":".. h:base32():sub(1, 20)
-        local data = rspamd_util.zstd_compress(bulk_json)
+        local data = util.zstd_compress(bulk_json)
         local function redis_set_cb(err)
           if err ~=nil then
             rspamd_logger.errx(task, 'redis_set_cb received error: %1', err)
           end
         end
-        rspamd_redis_make_request(task,
+        rspamd_redis.make_request(task,
           redis_params, -- connect params
           key, -- hash key
           true, -- is write
@@ -87,7 +96,7 @@ local function elastic_send_data(task)
     end
   end
   rspamd_http.request({
-    url = 'http://'..settings['server']..'/'..es_index..'/_bulk',
+    url = push_url,
     headers = {
       ['Content-Type'] = 'application/x-ndjson',
     },
@@ -167,7 +176,8 @@ end
 local function elastic_collect(task)
   if not enabled then return end
   if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
-  local row = {['rspam_meta'] = get_general_metadata(task), ['@timestamp'] = os.time(os.date("*t")).."000"}
+  local row = {['rspam_meta'] = get_general_metadata(task),
+    ['@timestamp'] = tostring(util.get_time()).."000"}
   table.insert(rows, row)
   nrows = nrows + 1
   if nrows > settings['limit'] then
@@ -176,131 +186,139 @@ local function elastic_collect(task)
     rows = {}
   end
 end
+
+
 local opts = rspamd_config:get_all_opt('elastic')
-local enabled = true;
 
-local function check_elastic_server(ev_base)
-  local function http_callback(err, code, body, headers)
+local function check_elastic_server(cfg, ev_base, _)
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local ip_addr = upstream:get_addr():to_string(true)
+
+  local plugins_url = connect_prefix .. ip_addr .. '/_nodes/plugins'
+  local function http_callback(_, _, body, _)
     local parser = ucl.parser()
     local res,err = parser:parse_string(body)
     if not res then
-        rspamd_logger.infox(rspamd_config, 'failed to query elasticsearch server %1, disabling module',settings['server'])
+        rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
+          plugins_url, err)
         enabled = false;
         return
     end
     local obj = parser:get_object()
     for node,value in pairs(obj['nodes']) do
       local plugin_found = false
-      for i,plugin in pairs(value['plugins']) do
+      for _,plugin in pairs(value['plugins']) do
         if plugin['name'] == 'ingest-geoip' then
           plugin_found = true
         end
       end
       if not plugin_found then
-        rspamd_logger.infox(rspamd_config, 'Unable to find ingest-geoip on %1 node, disabling module',node)
+        rspamd_logger.infox(rspamd_config,
+          'Unable to find ingest-geoip on %1 node, disabling module', node)
         enabled = false
         return
       end
     end
   end
   rspamd_http.request({
-    url = 'http://'..settings['server']..'/_nodes/plugins',
+    url = plugins_url,
     ev_base = ev_base,
+    config = cfg,
     method = 'get',
     callback = http_callback
   })
-  if enabled then
-    local function http_ingest_callback(err, code, body, headers)
-      if code ~= 200 or err_message then
-        -- pipeline not exist
-      end
-    end
-    rspamd_http.request({
-      url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
-      ev_base = ev_base,
-      method = 'get',
-      callback = http_ingest_callback
-    })
-    -- lets try to create ingest pipeline if not exist
-    rspamd_http.request({
-      url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
-      task = task,
-      body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
-      method = 'put',
-    })
-  end
-  local function http_template_create_callback(err, code, body, headers)
-  end
-  local function http_template_exist_callback(err, code, body, headers)
-    if code ~= 200 or err_message then
-      rspamd_http.request({
-        url = 'http://'..settings['server']..'/_template/rspamd',
-        task = task,
-        body = elastic_template,
-        method = 'put',
-        callback = http_template_create_callback
-      })
-    end
-  end
-  rspamd_http.request({
-    url = 'http://'..settings['server']..'/_template/rspamd',
-    task = task,
-    method = 'head',
-    callback = http_template_exist_callback
-  })
 end
+
 -- import ingest pipeline and kibana dashboard/visualization
-local function initial_setup(worker)
-  if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end
+local function initial_setup(cfg, ev_base, worker)
+  if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local ip_addr = upstream:get_addr():to_string(true)
   if enabled then
     -- create ingest pipeline
+    local geoip_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip'
+    local function geoip_cb(_, code, _, _)
+      if code ~= 200 then
+        rspamd_logger.errx('cannot get data from %s: %s', geoip_url, code)
+        enabled = false
+      end
+    end
     rspamd_http.request({
-      url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
-      task = task,
+      url = geoip_url,
+      ev_base = ev_base,
+      config = cfg,
+      callback = geoip_cb,
       body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
       method = 'put',
     })
     -- create template mappings if not exist
-    local function http_template_exist_callback(err, code, body, headers)
-      if code ~= 200 or err_message then
+    local template_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip'
+    local function http_template_put_callback(_, code, _, _)
+      if code ~= 200 then
+        rspamd_logger.errx('cannot put template to %s: %s', template_url, code)
+        enabled = false
+      end
+    end
+    local function http_template_exist_callback(_, code, _, _)
+      if code ~= 200 then
         rspamd_http.request({
-          url = 'http://'..settings['server']..'/_template/rspamd',
-          task = task,
+          url = template_url,
+          ev_base = ev_base,
+          config = cfg,
           body = elastic_template,
           method = 'put',
+          callback = http_template_put_callback,
         })
       end
     end
+
     rspamd_http.request({
-      url = 'http://'..settings['server']..'/_template/rspamd',
-      task = task,
+      url = template_url,
+      ev_base = ev_base,
+      config = cfg,
       method = 'head',
       callback = http_template_exist_callback
     })
     -- add kibana dashboard and visualizations
-    local kibana_mappings = read_file(settings['kibana_file']);
     if enabled and settings['import_kibana'] then
-        local kibana_mappings = read_file(settings['kibana_file']);
+        local kibana_mappings = read_file(settings['kibana_file'])
         if kibana_mappings then
           local parser = ucl.parser()
           local res,err = parser:parse_string(kibana_mappings)
           if not res then
+            rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s',
+              err)
+            enabled = false
+
             return
           end
           local obj = parser:get_object()
-          local bulk_json = ""
+          local tbl = {}
           for _,item in ipairs(obj) do
-            bulk_json = bulk_json..'{ "index" : { "_index" : ".kibana", "_type" : "'..item["_type"]..'" ,"_id": "'..item["_id"]..'"} }'.."\n"
-            bulk_json = bulk_json..ucl.to_format(item['_source'], 'json-compact').."\n"
+            table.insert(tbl, '{ "index" : { "_index" : ".kibana", "_type" : "'..
+                item["_type"]..'" ,"_id": "'..
+                item["_id"]..'"} }')
+            table.insert(tbl, ucl.to_format(item['_source'], 'json-compact'))
+          end
+
+          local kibana_url = connect_prefix .. ip_addr ..'/.kibana/_bulk'
+          local function kibana_template_callback(_, code, _, _)
+            if code ~= 200 then
+              rspamd_logger.errx('cannot put template to %s: %s', kibana_url, code)
+              enabled = false
+            end
           end
           rspamd_http.request({
-            url = 'http://'..settings['server']..'/.kibana/_bulk',
+            url = kibana_url,
+            ev_base = ev_base,
+            config = cfg,
             headers = {
               ['Content-Type'] = 'application/x-ndjson',
             },
-            body = bulk_json,
-            task = task,
-            method = 'post'
+            body = table.concat(tbl, "\n"),
+            method = 'post',
+            callback = kibana_template_callback
           })
         else
           rspamd_logger.infox(rspamd_config, 'kibana templatefile not found')
@@ -308,38 +326,56 @@ local function initial_setup(worker)
     end
   end
 end
-rspamd_config:add_on_load(function(cfg, ev_base,worker)
-  if opts then
-      for k,v in pairs(opts) do
-        settings[k] = v
-      end
-      if not settings['server'] then
-        rspamd_logger.infox(rspamd_config, 'no elastic servers are specified, disabling module')
-        enabled = false
-        return
-      end
-      if not settings['template_file'] then
-          rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
-          enabled = false
-          return
-      end
+
+redis_params = rspamd_redis.parse_redis_server('elastic')
+
+if redis_params and opts then
+  for k,v in pairs(opts) do
+    settings[k] = v
   end
-  elastic_template = read_file(settings['template_file']);
-  if not elastic_template then
-        rspamd_logger.infox(rspamd_config, 'elastic unable to read mappings, disabling module')
-        enabled = false
-        return
+
+  if not settings['server'] and not settings['servers'] then
+    rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
+    rspamd_lua_utils.disable_module(N, "config")
+  else
+    if settings.use_https then
+      connect_prefix = 'https://'
+    end
+
+    settings.upstream = upstream_list.create(rspamd_config,
+      settings['server'] or settings['servers'], 9200)
+
+    if not settings.upstream then
+      rspamd_logger.errx('cannot parse elastic address: %s',
+        settings['server'] or settings['servers'])
+      rspamd_lua_utils.disable_module(N, "config")
+      return
+    end
+    if not settings['template_file'] then
+      rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
+      rspamd_lua_utils.disable_module(N, "config")
+      return
+    end
+
+    elastic_template = read_file(settings['template_file']);
+    if not elastic_template then
+      rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module',
+        settings['template_file'])
+      rspamd_lua_utils.disable_module(N, "config")
+      return
+    end
+
+    rspamd_config:register_symbol({
+      name = 'ELASTIC_COLLECT',
+      type = 'idempotent',
+      callback = elastic_collect,
+      priority = 10
+    })
+
+    rspamd_config:add_on_load(function(cfg, ev_base,worker)
+      check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
+      initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
+    end)
   end
-  redis_params = rspamd_parse_redis_server('elastic')
-  check_elastic_server(ev_base) -- check for elasticsearch requirements
-  initial_setup(worker) -- import mappings pipeline and visualizations
-end)
 
-if enabled == true then
-  rspamd_config:register_symbol({
-    name = 'ELASTIC_COLLECT',
-    type = 'postfilter',
-    callback = elastic_collect,
-    priority = 10
-  })
 end