diff options
-rw-r--r-- | src/plugins/lua/elastic.lua | 76 |
1 files changed, 41 insertions, 35 deletions
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index c7f9f392e..aa3702112 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -20,7 +20,6 @@ local rspamd_http = require "rspamd_http" local lua_util = require "lua_util" local util = require "rspamd_util" local ucl = require "ucl" -local hash = require "rspamd_cryptobox_hash" local rspamd_redis = require "lua_redis" local upstream_list = require "rspamd_upstream_list" @@ -30,6 +29,7 @@ end local rows = {} local nrows = 0 +local failed_sends = 0 local elastic_template local redis_params local N = "elastic" @@ -37,7 +37,7 @@ local E = {} local connect_prefix = 'http://' local enabled = true local settings = { - limit = 10, + limit = 500, index_pattern = 'rspamd-%Y.%m.%d', template_file = rspamd_paths['PLUGINSDIR'] .. '/elastic/rspamd_template.json', kibana_file = rspamd_paths['PLUGINSDIR'] ..'/elastic/kibana.json', @@ -52,6 +52,7 @@ local settings = { user = nil, password = nil, no_ssl_verify = false, + max_fail = 3, } local function read_file(path) @@ -78,34 +79,7 @@ local function elastic_send_data(task) local push_url = connect_prefix .. ip_addr .. '/'..es_index..'/_bulk' local bulk_json = table.concat(tbl, "\n") - local function http_index_data_callback(err, code, body, _) - -- todo error handling we may store the rows it into redis and send it again late - lua_util.debugm(N, task, "After create data %1", body) - if code ~= 200 then - rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s (%s)", - push_url, err, code) - if settings['failover'] then - local h = hash.create() - h:update(bulk_json) - local key = settings['key_prefix'] ..es_index..":".. h:base32():sub(1, 20) - local data = util.zstd_compress(bulk_json) - local function redis_set_cb(rerr) - if rerr ~=nil then - rspamd_logger.errx(task, 'redis_set_cb received error: %1', rerr) - end - end - rspamd_redis.make_request(task, - redis_params, -- connect params - key, -- hash key - true, -- is write - redis_set_cb, --callback - 'SETEX', -- command - {key, tostring(settings['expire']), data} -- arguments - ) - end - end - end - rspamd_http.request({ + local err, response = rspamd_http.request({ url = push_url, headers = { ['Content-Type'] = 'application/x-ndjson', @@ -117,11 +91,27 @@ local function elastic_send_data(task) no_ssl_verify = settings.no_ssl_verify, user = settings.user, password = settings.password, - callback = http_index_data_callback, timeout = settings.timeout, }) + if err then + rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s", + push_url, err, failed_sends, settings.max_fail) + else + if response.code ~= 200 then + rspamd_logger.infox(task, "cannot push data to elastic backend (%s): wrong http code %s (%s); failed attempts: %s/%s", + push_url, err, response.code, failed_sends, settings.max_fail) + else + lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES", + nrows, #bulk_json) + + return true + end + end + + return false end + local function get_general_metadata(task) local r = {} local ip_addr = task:get_ip() @@ -175,6 +165,7 @@ local function get_general_metadata(task) else r.from = 'unknown' end + local syminf = task:get_symbols_all() r.symbols = syminf r.asn = {} @@ -182,6 +173,7 @@ local function get_general_metadata(task) r.asn.country = pool:get_variable("country") or 'unknown' r.asn.asn = pool:get_variable("asn") or 0 r.asn.ipnet = pool:get_variable("ipnet") or 'unknown' + local function process_header(name) local hdr = task:get_header_full(name) if hdr then @@ -216,9 +208,22 @@ local function elastic_collect(task) table.insert(rows, row) nrows = nrows + 1 if nrows > settings['limit'] then - elastic_send_data(task) - nrows = 0 - rows = {} + lua_util.debugm(N, task, 'send elastic search rows: %s', nrows) + if elastic_send_data(task) then + nrows = 0 + rows = {} + failed_sends = 0; + else + failed_sends = failed_sends + 1 + + if failed_sends > settings.max_fail then + rspamd_logger.errx(task, 'cannot send %s rows to ES %s times, stop trying', + nrows, failed_sends) + nrows = 0 + rows = {} + failed_sends = 0; + end + end end end @@ -246,6 +251,7 @@ local function check_elastic_server(cfg, ev_base, _) for _,plugin in pairs(value['plugins']) do if plugin['name'] == 'ingest-geoip' then plugin_found = true + lua_util.debugm(N, "ingest-geoip plugin has been found") end end if not plugin_found then @@ -276,7 +282,7 @@ end -- import ingest pipeline and kibana dashboard/visualization local function initial_setup(cfg, ev_base, worker) - if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end + if not worker:is_primary_controller() then return end local upstream = settings.upstream:get_upstream_round_robin() local ip_addr = upstream:get_addr():to_string(true) |