From 62c8e12af8299e5a97ea371c8425a8e4a0bec40e Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 9 Mar 2018 11:47:49 +0000 Subject: [PATCH] [Fix] Various fixes to elastic plugin --- src/plugins/lua/elastic.lua | 94 +++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 31 deletions(-) diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index c5919bf7f..dfe8ce84c 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -65,6 +65,8 @@ local function elastic_send_data(task) table.insert(tbl, ucl.to_format(value, 'json-compact')) end + table.insert(tbl, '') -- For last \n + local upstream = settings.upstream:get_upstream_round_robin() local ip_addr = upstream:get_addr():to_string(true) @@ -110,16 +112,22 @@ end local function get_general_metadata(task) local r = {} local ip_addr = task:get_ip() - r.ip = tostring(ip_addr) or 'unknown' + r.webmail = false - if ip_addr then + + if ip_addr and ip_addr:is_valid() then r.is_local = ip_addr:is_local() local origin = task:get_header('X-Originating-IP') if origin then - r.webmail = true - r.ip = origin + r.webmail = true + r.ip = origin + else + r.ip = tostring(ip_addr) end + else + r.ip = '127.0.0.1' end + r.direction = "Inbound" r.user = task:get_user() or 'unknown' r.qid = task:get_queue_id() or 'unknown' @@ -176,7 +184,7 @@ end local function elastic_collect(task) if not enabled then return end if rspamd_lua_utils.is_rspamc_or_controller(task) then return end - local row = {['rspam_meta'] = get_general_metadata(task), + local row = {['rspamd_meta'] = get_general_metadata(task), ['@timestamp'] = tostring(util.get_time() * 1000)} table.insert(rows, row) nrows = nrows + 1 @@ -195,29 +203,34 @@ local function check_elastic_server(cfg, ev_base, _) local ip_addr = upstream:get_addr():to_string(true) local plugins_url = connect_prefix .. ip_addr .. '/_nodes/plugins' - local function http_callback(_, _, body, _) - local parser = ucl.parser() - local res,err = parser:parse_string(body) - if not res then + local function http_callback(_, err, body, _) + if err == 200 then + local parser = ucl.parser() + local res,err = parser:parse_string(body) + if not res then rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s', - plugins_url, err) + plugins_url, err) enabled = false; return - end - local obj = parser:get_object() - for node,value in pairs(obj['nodes']) do - local plugin_found = false - for _,plugin in pairs(value['plugins']) do - if plugin['name'] == 'ingest-geoip' then - plugin_found = true - end end - if not plugin_found then - rspamd_logger.infox(rspamd_config, - 'Unable to find ingest-geoip on %1 node, disabling module', node) - enabled = false - return + local obj = parser:get_object() + for node,value in pairs(obj['nodes']) do + local plugin_found = false + for _,plugin in pairs(value['plugins']) do + if plugin['name'] == 'ingest-geoip' then + plugin_found = true + end + end + if not plugin_found then + rspamd_logger.infox(rspamd_config, + 'Unable to find ingest-geoip on %1 node, disabling module', node) + enabled = false + return + end end + else + rspamd_logger.errx('cannot get plugins from %s: %s (%s)', plugins_url, err, body) + enabled = false end end rspamd_http.request({ @@ -238,25 +251,39 @@ local function initial_setup(cfg, ev_base, worker) if enabled then -- create ingest pipeline local geoip_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip' - local function geoip_cb(_, code, _, _) + local function geoip_cb(_, code, body, _) if code ~= 200 then - rspamd_logger.errx('cannot get data from %s: %s', geoip_url, code) + rspamd_logger.errx('cannot get data from %s: %s (%s)', geoip_url, code, body) enabled = false end end + local template = { + description = "Add geoip info for rspamd", + processors = { + { + geoip = { + field = "rspamd_meta.ip", + target_field = "rspamd_meta.geoip" + } + } + } + } rspamd_http.request({ url = geoip_url, ev_base = ev_base, config = cfg, callback = geoip_cb, - body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}', + headers = { + ['Content-Type'] = 'application/json', + }, + body = ucl.to_format(template, 'json-compact'), method = 'put', }) -- create template mappings if not exist - local template_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip' - local function http_template_put_callback(_, code, _, _) + local template_url = connect_prefix .. ip_addr ..'/_template/rspamd' + local function http_template_put_callback(_, code, body, _) if code ~= 200 then - rspamd_logger.errx('cannot put template to %s: %s', template_url, code) + rspamd_logger.errx('cannot put template to %s: %s (%s)', template_url, code, body) enabled = false end end @@ -268,6 +295,9 @@ local function initial_setup(cfg, ev_base, worker) config = cfg, body = elastic_template, method = 'put', + headers = { + ['Content-Type'] = 'application/json', + }, callback = http_template_put_callback, }) end @@ -301,11 +331,13 @@ local function initial_setup(cfg, ev_base, worker) item["_id"]..'"} }') table.insert(tbl, ucl.to_format(item['_source'], 'json-compact')) end + table.insert(tbl, '') -- For last \n local kibana_url = connect_prefix .. ip_addr ..'/.kibana/_bulk' - local function kibana_template_callback(_, code, _, _) + local function kibana_template_callback(_, code, body, _) if code ~= 200 then - rspamd_logger.errx('cannot put template to %s: %s', kibana_url, code) + rspamd_logger.errx('cannot put template to %s: %s (%s)', kibana_url, + code, body) enabled = false end end -- 2.39.5