aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/lua/elastic.lua
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-03-09 11:47:49 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-03-09 11:47:49 +0000
commit62c8e12af8299e5a97ea371c8425a8e4a0bec40e (patch)
treec01c9c5df65bb0f6f8eda191f13d90c16c001494 /src/plugins/lua/elastic.lua
parent028b3714ba75df76a489aa1347f9b47c59e6f692 (diff)
downloadrspamd-62c8e12af8299e5a97ea371c8425a8e4a0bec40e.tar.gz
rspamd-62c8e12af8299e5a97ea371c8425a8e4a0bec40e.zip
[Fix] Various fixes to elastic plugin
Diffstat (limited to 'src/plugins/lua/elastic.lua')
-rw-r--r--src/plugins/lua/elastic.lua94
1 files changed, 63 insertions, 31 deletions
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
index c5919bf7f..dfe8ce84c 100644
--- a/src/plugins/lua/elastic.lua
+++ b/src/plugins/lua/elastic.lua
@@ -65,6 +65,8 @@ local function elastic_send_data(task)
table.insert(tbl, ucl.to_format(value, 'json-compact'))
end
+ table.insert(tbl, '') -- For last \n
+
local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)
@@ -110,16 +112,22 @@ end
local function get_general_metadata(task)
local r = {}
local ip_addr = task:get_ip()
- r.ip = tostring(ip_addr) or 'unknown'
+
r.webmail = false
- if ip_addr then
+
+ if ip_addr and ip_addr:is_valid() then
r.is_local = ip_addr:is_local()
local origin = task:get_header('X-Originating-IP')
if origin then
- r.webmail = true
- r.ip = origin
+ r.webmail = true
+ r.ip = origin
+ else
+ r.ip = tostring(ip_addr)
end
+ else
+ r.ip = '127.0.0.1'
end
+
r.direction = "Inbound"
r.user = task:get_user() or 'unknown'
r.qid = task:get_queue_id() or 'unknown'
@@ -176,7 +184,7 @@ end
local function elastic_collect(task)
if not enabled then return end
if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
- local row = {['rspam_meta'] = get_general_metadata(task),
+ local row = {['rspamd_meta'] = get_general_metadata(task),
['@timestamp'] = tostring(util.get_time() * 1000)}
table.insert(rows, row)
nrows = nrows + 1
@@ -195,29 +203,34 @@ local function check_elastic_server(cfg, ev_base, _)
local ip_addr = upstream:get_addr():to_string(true)
local plugins_url = connect_prefix .. ip_addr .. '/_nodes/plugins'
- local function http_callback(_, _, body, _)
- local parser = ucl.parser()
- local res,err = parser:parse_string(body)
- if not res then
+ local function http_callback(_, err, body, _)
+ if err == 200 then
+ local parser = ucl.parser()
+ local res,err = parser:parse_string(body)
+ if not res then
rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
- plugins_url, err)
+ plugins_url, err)
enabled = false;
return
- end
- local obj = parser:get_object()
- for node,value in pairs(obj['nodes']) do
- local plugin_found = false
- for _,plugin in pairs(value['plugins']) do
- if plugin['name'] == 'ingest-geoip' then
- plugin_found = true
- end
end
- if not plugin_found then
- rspamd_logger.infox(rspamd_config,
- 'Unable to find ingest-geoip on %1 node, disabling module', node)
- enabled = false
- return
+ local obj = parser:get_object()
+ for node,value in pairs(obj['nodes']) do
+ local plugin_found = false
+ for _,plugin in pairs(value['plugins']) do
+ if plugin['name'] == 'ingest-geoip' then
+ plugin_found = true
+ end
+ end
+ if not plugin_found then
+ rspamd_logger.infox(rspamd_config,
+ 'Unable to find ingest-geoip on %1 node, disabling module', node)
+ enabled = false
+ return
+ end
end
+ else
+ rspamd_logger.errx('cannot get plugins from %s: %s (%s)', plugins_url, err, body)
+ enabled = false
end
end
rspamd_http.request({
@@ -238,25 +251,39 @@ local function initial_setup(cfg, ev_base, worker)
if enabled then
-- create ingest pipeline
local geoip_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip'
- local function geoip_cb(_, code, _, _)
+ local function geoip_cb(_, code, body, _)
if code ~= 200 then
- rspamd_logger.errx('cannot get data from %s: %s', geoip_url, code)
+ rspamd_logger.errx('cannot get data from %s: %s (%s)', geoip_url, code, body)
enabled = false
end
end
+ local template = {
+ description = "Add geoip info for rspamd",
+ processors = {
+ {
+ geoip = {
+ field = "rspamd_meta.ip",
+ target_field = "rspamd_meta.geoip"
+ }
+ }
+ }
+ }
rspamd_http.request({
url = geoip_url,
ev_base = ev_base,
config = cfg,
callback = geoip_cb,
- body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
+ headers = {
+ ['Content-Type'] = 'application/json',
+ },
+ body = ucl.to_format(template, 'json-compact'),
method = 'put',
})
-- create template mappings if not exist
- local template_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip'
- local function http_template_put_callback(_, code, _, _)
+ local template_url = connect_prefix .. ip_addr ..'/_template/rspamd'
+ local function http_template_put_callback(_, code, body, _)
if code ~= 200 then
- rspamd_logger.errx('cannot put template to %s: %s', template_url, code)
+ rspamd_logger.errx('cannot put template to %s: %s (%s)', template_url, code, body)
enabled = false
end
end
@@ -268,6 +295,9 @@ local function initial_setup(cfg, ev_base, worker)
config = cfg,
body = elastic_template,
method = 'put',
+ headers = {
+ ['Content-Type'] = 'application/json',
+ },
callback = http_template_put_callback,
})
end
@@ -301,11 +331,13 @@ local function initial_setup(cfg, ev_base, worker)
item["_id"]..'"} }')
table.insert(tbl, ucl.to_format(item['_source'], 'json-compact'))
end
+ table.insert(tbl, '') -- For last \n
local kibana_url = connect_prefix .. ip_addr ..'/.kibana/_bulk'
- local function kibana_template_callback(_, code, _, _)
+ local function kibana_template_callback(_, code, body, _)
if code ~= 200 then
- rspamd_logger.errx('cannot put template to %s: %s', kibana_url, code)
+ rspamd_logger.errx('cannot put template to %s: %s (%s)', kibana_url,
+ code, body)
enabled = false
end
end