aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/plugins/lua/elastic.lua76
1 files changed, 41 insertions, 35 deletions
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
index c7f9f392e..aa3702112 100644
--- a/src/plugins/lua/elastic.lua
+++ b/src/plugins/lua/elastic.lua
@@ -20,7 +20,6 @@ local rspamd_http = require "rspamd_http"
local lua_util = require "lua_util"
local util = require "rspamd_util"
local ucl = require "ucl"
-local hash = require "rspamd_cryptobox_hash"
local rspamd_redis = require "lua_redis"
local upstream_list = require "rspamd_upstream_list"
@@ -30,6 +29,7 @@ end
local rows = {}
local nrows = 0
+local failed_sends = 0
local elastic_template
local redis_params
local N = "elastic"
@@ -37,7 +37,7 @@ local E = {}
local connect_prefix = 'http://'
local enabled = true
local settings = {
- limit = 10,
+ limit = 500,
index_pattern = 'rspamd-%Y.%m.%d',
template_file = rspamd_paths['PLUGINSDIR'] .. '/elastic/rspamd_template.json',
kibana_file = rspamd_paths['PLUGINSDIR'] ..'/elastic/kibana.json',
@@ -52,6 +52,7 @@ local settings = {
user = nil,
password = nil,
no_ssl_verify = false,
+ max_fail = 3,
}
local function read_file(path)
@@ -78,34 +79,7 @@ local function elastic_send_data(task)
local push_url = connect_prefix .. ip_addr .. '/'..es_index..'/_bulk'
local bulk_json = table.concat(tbl, "\n")
- local function http_index_data_callback(err, code, body, _)
- -- todo error handling we may store the rows it into redis and send it again late
- lua_util.debugm(N, task, "After create data %1", body)
- if code ~= 200 then
- rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s (%s)",
- push_url, err, code)
- if settings['failover'] then
- local h = hash.create()
- h:update(bulk_json)
- local key = settings['key_prefix'] ..es_index..":".. h:base32():sub(1, 20)
- local data = util.zstd_compress(bulk_json)
- local function redis_set_cb(rerr)
- if rerr ~=nil then
- rspamd_logger.errx(task, 'redis_set_cb received error: %1', rerr)
- end
- end
- rspamd_redis.make_request(task,
- redis_params, -- connect params
- key, -- hash key
- true, -- is write
- redis_set_cb, --callback
- 'SETEX', -- command
- {key, tostring(settings['expire']), data} -- arguments
- )
- end
- end
- end
- rspamd_http.request({
+ local err, response = rspamd_http.request({
url = push_url,
headers = {
['Content-Type'] = 'application/x-ndjson',
@@ -117,11 +91,27 @@ local function elastic_send_data(task)
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
- callback = http_index_data_callback,
timeout = settings.timeout,
})
+ if err then
+ rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s",
+ push_url, err, failed_sends, settings.max_fail)
+ else
+ if response.code ~= 200 then
+ rspamd_logger.infox(task, "cannot push data to elastic backend (%s): wrong http code %s (%s); failed attempts: %s/%s",
+ push_url, err, response.code, failed_sends, settings.max_fail)
+ else
+ lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES",
+ nrows, #bulk_json)
+
+ return true
+ end
+ end
+
+ return false
end
+
local function get_general_metadata(task)
local r = {}
local ip_addr = task:get_ip()
@@ -175,6 +165,7 @@ local function get_general_metadata(task)
else
r.from = 'unknown'
end
+
local syminf = task:get_symbols_all()
r.symbols = syminf
r.asn = {}
@@ -182,6 +173,7 @@ local function get_general_metadata(task)
r.asn.country = pool:get_variable("country") or 'unknown'
r.asn.asn = pool:get_variable("asn") or 0
r.asn.ipnet = pool:get_variable("ipnet") or 'unknown'
+
local function process_header(name)
local hdr = task:get_header_full(name)
if hdr then
@@ -216,9 +208,22 @@ local function elastic_collect(task)
table.insert(rows, row)
nrows = nrows + 1
if nrows > settings['limit'] then
- elastic_send_data(task)
- nrows = 0
- rows = {}
+ lua_util.debugm(N, task, 'send elastic search rows: %s', nrows)
+ if elastic_send_data(task) then
+ nrows = 0
+ rows = {}
+ failed_sends = 0;
+ else
+ failed_sends = failed_sends + 1
+
+ if failed_sends > settings.max_fail then
+ rspamd_logger.errx(task, 'cannot send %s rows to ES %s times, stop trying',
+ nrows, failed_sends)
+ nrows = 0
+ rows = {}
+ failed_sends = 0;
+ end
+ end
end
end
@@ -246,6 +251,7 @@ local function check_elastic_server(cfg, ev_base, _)
for _,plugin in pairs(value['plugins']) do
if plugin['name'] == 'ingest-geoip' then
plugin_found = true
+ lua_util.debugm(N, "ingest-geoip plugin has been found")
end
end
if not plugin_found then
@@ -276,7 +282,7 @@ end
-- import ingest pipeline and kibana dashboard/visualization
local function initial_setup(cfg, ev_base, worker)
- if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+ if not worker:is_primary_controller() then return end
local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)