|
|
@@ -78,10 +78,12 @@ local function elastic_send_data(task) |
|
|
|
|
|
|
|
local push_url = connect_prefix .. ip_addr .. '/'..es_index..'/_bulk' |
|
|
|
local bulk_json = table.concat(tbl, "\n") |
|
|
|
local function http_index_data_callback(_, code, body, _) |
|
|
|
local function http_index_data_callback(err, code, body, _) |
|
|
|
-- todo error handling we may store the rows it into redis and send it again late |
|
|
|
rspamd_logger.debugm(N, task, "After create data %1", body) |
|
|
|
if code ~= 200 then |
|
|
|
rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s (%s)", |
|
|
|
push_url, err, code) |
|
|
|
if settings['failover'] then |
|
|
|
local h = hash.create() |
|
|
|
h:update(bulk_json) |
|
|
@@ -226,8 +228,8 @@ local function check_elastic_server(cfg, ev_base, _) |
|
|
|
local ip_addr = upstream:get_addr():to_string(true) |
|
|
|
|
|
|
|
local plugins_url = connect_prefix .. ip_addr .. '/_nodes/plugins' |
|
|
|
local function http_callback(_, err, body, _) |
|
|
|
if err == 200 then |
|
|
|
local function http_callback(err, code, body, _) |
|
|
|
if code == 200 then |
|
|
|
local parser = ucl.parser() |
|
|
|
local res,ucl_err = parser:parse_string(body) |
|
|
|
if not res then |
|
|
@@ -252,7 +254,8 @@ local function check_elastic_server(cfg, ev_base, _) |
|
|
|
end |
|
|
|
end |
|
|
|
else |
|
|
|
rspamd_logger.errx('cannot get plugins from %s: %s (%s)', plugins_url, err, body) |
|
|
|
rspamd_logger.errx('cannot get plugins from %s: %s(%s) (%s)', plugins_url, |
|
|
|
err, code, body) |
|
|
|
enabled = false |
|
|
|
end |
|
|
|
end |
|
|
@@ -300,10 +303,10 @@ local function initial_setup(cfg, ev_base, worker) |
|
|
|
table.insert(tbl, '') -- For last \n |
|
|
|
|
|
|
|
local kibana_url = connect_prefix .. ip_addr ..'/.kibana/_bulk' |
|
|
|
local function kibana_template_callback(_, code, body, _) |
|
|
|
local function kibana_template_callback(err, code, body, _) |
|
|
|
if code ~= 200 then |
|
|
|
rspamd_logger.errx('cannot put template to %s: %s (%s)', kibana_url, |
|
|
|
code, body) |
|
|
|
rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', kibana_url, |
|
|
|
err, code, body) |
|
|
|
enabled = false |
|
|
|
else |
|
|
|
rspamd_logger.debugm(N, 'pushed kibana template: %s', body) |
|
|
@@ -335,9 +338,10 @@ local function initial_setup(cfg, ev_base, worker) |
|
|
|
if enabled then |
|
|
|
-- create ingest pipeline |
|
|
|
local geoip_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip' |
|
|
|
local function geoip_cb(_, code, body, _) |
|
|
|
local function geoip_cb(err, code, body, _) |
|
|
|
if code ~= 200 then |
|
|
|
rspamd_logger.errx('cannot get data from %s: %s (%s)', geoip_url, code, body) |
|
|
|
rspamd_logger.errx('cannot get data from %s: %s(%s) (%s)', |
|
|
|
geoip_url, err, code, body) |
|
|
|
enabled = false |
|
|
|
end |
|
|
|
end |
|
|
@@ -370,9 +374,10 @@ local function initial_setup(cfg, ev_base, worker) |
|
|
|
}) |
|
|
|
-- create template mappings if not exist |
|
|
|
local template_url = connect_prefix .. ip_addr ..'/_template/rspamd' |
|
|
|
local function http_template_put_callback(_, code, body, _) |
|
|
|
local function http_template_put_callback(err, code, body, _) |
|
|
|
if code ~= 200 then |
|
|
|
rspamd_logger.errx('cannot put template to %s: %s (%s)', template_url, code, body) |
|
|
|
rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', |
|
|
|
template_url, err, code, body) |
|
|
|
enabled = false |
|
|
|
else |
|
|
|
rspamd_logger.debugm(N, 'pushed rspamd template: %s', body) |