Browse Source

[Fix] Various fixes to elastic plugin

tags/1.7.0
Vsevolod Stakhov 6 years ago
parent
commit
62c8e12af8
1 changed files with 63 additions and 31 deletions
  1. 63
    31
      src/plugins/lua/elastic.lua

+ 63
- 31
src/plugins/lua/elastic.lua View File

@@ -65,6 +65,8 @@ local function elastic_send_data(task)
table.insert(tbl, ucl.to_format(value, 'json-compact'))
end

table.insert(tbl, '') -- For last \n

local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)

@@ -110,16 +112,22 @@ end
local function get_general_metadata(task)
local r = {}
local ip_addr = task:get_ip()
r.ip = tostring(ip_addr) or 'unknown'
r.webmail = false
if ip_addr then

if ip_addr and ip_addr:is_valid() then
r.is_local = ip_addr:is_local()
local origin = task:get_header('X-Originating-IP')
if origin then
r.webmail = true
r.ip = origin
r.webmail = true
r.ip = origin
else
r.ip = tostring(ip_addr)
end
else
r.ip = '127.0.0.1'
end

r.direction = "Inbound"
r.user = task:get_user() or 'unknown'
r.qid = task:get_queue_id() or 'unknown'
@@ -176,7 +184,7 @@ end
local function elastic_collect(task)
if not enabled then return end
if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
local row = {['rspam_meta'] = get_general_metadata(task),
local row = {['rspamd_meta'] = get_general_metadata(task),
['@timestamp'] = tostring(util.get_time() * 1000)}
table.insert(rows, row)
nrows = nrows + 1
@@ -195,29 +203,34 @@ local function check_elastic_server(cfg, ev_base, _)
local ip_addr = upstream:get_addr():to_string(true)

local plugins_url = connect_prefix .. ip_addr .. '/_nodes/plugins'
local function http_callback(_, _, body, _)
local parser = ucl.parser()
local res,err = parser:parse_string(body)
if not res then
local function http_callback(_, err, body, _)
if err == 200 then
local parser = ucl.parser()
local res,err = parser:parse_string(body)
if not res then
rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
plugins_url, err)
plugins_url, err)
enabled = false;
return
end
local obj = parser:get_object()
for node,value in pairs(obj['nodes']) do
local plugin_found = false
for _,plugin in pairs(value['plugins']) do
if plugin['name'] == 'ingest-geoip' then
plugin_found = true
end
end
if not plugin_found then
rspamd_logger.infox(rspamd_config,
'Unable to find ingest-geoip on %1 node, disabling module', node)
enabled = false
return
local obj = parser:get_object()
for node,value in pairs(obj['nodes']) do
local plugin_found = false
for _,plugin in pairs(value['plugins']) do
if plugin['name'] == 'ingest-geoip' then
plugin_found = true
end
end
if not plugin_found then
rspamd_logger.infox(rspamd_config,
'Unable to find ingest-geoip on %1 node, disabling module', node)
enabled = false
return
end
end
else
rspamd_logger.errx('cannot get plugins from %s: %s (%s)', plugins_url, err, body)
enabled = false
end
end
rspamd_http.request({
@@ -238,25 +251,39 @@ local function initial_setup(cfg, ev_base, worker)
if enabled then
-- create ingest pipeline
local geoip_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip'
local function geoip_cb(_, code, _, _)
local function geoip_cb(_, code, body, _)
if code ~= 200 then
rspamd_logger.errx('cannot get data from %s: %s', geoip_url, code)
rspamd_logger.errx('cannot get data from %s: %s (%s)', geoip_url, code, body)
enabled = false
end
end
local template = {
description = "Add geoip info for rspamd",
processors = {
{
geoip = {
field = "rspamd_meta.ip",
target_field = "rspamd_meta.geoip"
}
}
}
}
rspamd_http.request({
url = geoip_url,
ev_base = ev_base,
config = cfg,
callback = geoip_cb,
body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
headers = {
['Content-Type'] = 'application/json',
},
body = ucl.to_format(template, 'json-compact'),
method = 'put',
})
-- create template mappings if not exist
local template_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip'
local function http_template_put_callback(_, code, _, _)
local template_url = connect_prefix .. ip_addr ..'/_template/rspamd'
local function http_template_put_callback(_, code, body, _)
if code ~= 200 then
rspamd_logger.errx('cannot put template to %s: %s', template_url, code)
rspamd_logger.errx('cannot put template to %s: %s (%s)', template_url, code, body)
enabled = false
end
end
@@ -268,6 +295,9 @@ local function initial_setup(cfg, ev_base, worker)
config = cfg,
body = elastic_template,
method = 'put',
headers = {
['Content-Type'] = 'application/json',
},
callback = http_template_put_callback,
})
end
@@ -301,11 +331,13 @@ local function initial_setup(cfg, ev_base, worker)
item["_id"]..'"} }')
table.insert(tbl, ucl.to_format(item['_source'], 'json-compact'))
end
table.insert(tbl, '') -- For last \n

local kibana_url = connect_prefix .. ip_addr ..'/.kibana/_bulk'
local function kibana_template_callback(_, code, _, _)
local function kibana_template_callback(_, code, body, _)
if code ~= 200 then
rspamd_logger.errx('cannot put template to %s: %s', kibana_url, code)
rspamd_logger.errx('cannot put template to %s: %s (%s)', kibana_url,
code, body)
enabled = false
end
end

Loading…
Cancel
Save