aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--conf/modules.d/elastic.conf5
-rw-r--r--src/plugins/lua/elastic.lua88
2 files changed, 60 insertions, 33 deletions
diff --git a/conf/modules.d/elastic.conf b/conf/modules.d/elastic.conf
index e4c70bc87..f815bc61d 100644
--- a/conf/modules.d/elastic.conf
+++ b/conf/modules.d/elastic.conf
@@ -25,7 +25,7 @@ elastic {
use_keepalive = true;
version = {
autodetect_enabled = true;
- autodetect_max_fail = 12;
+ autodetect_max_fail = 30;
# override works only if autodetect is disabled
override = {
name = "opensearch";
@@ -35,8 +35,7 @@ elastic {
limits = {
max_rows = 500; # max logs in one bulk req to elastic and first reason to flush buffer to elastic
max_interval = 60; # seconds, if first log in buffer older then interval - flush buffer
- max_size = 5000000; # max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker
- max_fail = 3;
+ max_fail = 10;
};
index_template = {
managed = true;
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
index 28c683333..d76e0d723 100644
--- a/src/plugins/lua/elastic.lua
+++ b/src/plugins/lua/elastic.lua
@@ -72,7 +72,7 @@ local settings = {
enabled = true,
version = {
autodetect_enabled = true,
- autodetect_max_fail = 12,
+ autodetect_max_fail = 30,
-- override works only if autodetect is disabled
override = {
name = 'opensearch',
@@ -82,8 +82,7 @@ local settings = {
limits = {
max_rows = 500, -- max logs in one bulk req to elastic and first reason to flush buffer
max_interval = 60, -- seconds, if first log in buffer older then interval - flush buffer
- max_size = 5000000, -- max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker
- max_fail = 3,
+ max_fail = 15,
},
index_template = {
managed = true,
@@ -179,17 +178,6 @@ function Queue:length()
return self.last - self.first + 1
end
-function Queue:size()
- local size = 0
- for i = self.first, self.last do
- local row = self.data[i]
- if row ~= nil then
- size = size + #row
- end
- end
- return size
-end
-
function Queue:get(index)
local real_index = self.first + index - 1
if real_index <= self.last then
@@ -327,6 +315,52 @@ local function get_received_delay(received_headers)
return delay
end
+local function is_empty(str)
+ -- define a pattern that includes invisible unicode characters
+ local str_cleared = str:gsub('[' ..
+ '\xC2\xA0' .. -- U+00A0 non-breaking space
+ '\xE2\x80\x8B' .. -- U+200B zero width space
+ '\xEF\xBB\xBF' .. -- U+FEFF byte order mark (zero width no-break space)
+ '\xE2\x80\x8C' .. -- U+200C zero width non-joiner
+ '\xE2\x80\x8D' .. -- U+200D zero width joiner
+ '\xE2\x80\x8E' .. -- U+200E left-to-right mark
+ '\xE2\x80\x8F' .. -- U+200F right-to-left mark
+ '\xE2\x81\xA0' .. -- U+2060 word joiner
+ '\xE2\x80\xAA' .. -- U+202A left-to-right embedding
+ '\xE2\x80\xAB' .. -- U+202B right-to-left embedding
+ '\xE2\x80\xAC' .. -- U+202C pop directional formatting
+ '\xE2\x80\xAD' .. -- U+202D left-to-right override
+ '\xE2\x80\xAE' .. -- U+202E right-to-left override
+ '\xE2\x81\x9F' .. -- U+2061 function application
+ '\xE2\x81\xA1' .. -- U+2061 invisible separator
+ '\xE2\x81\xA2' .. -- U+2062 invisible times
+ '\xE2\x81\xA3' .. -- U+2063 invisible separator
+ '\xE2\x81\xA4' .. -- U+2064 invisible plus
+ ']', '') -- gsub replaces all matched characters with an empty string
+ if str_cleared:match('[%S]') then
+ return false
+ else
+ return true
+ end
+end
+
+local function fill_empty_strings(tbl, empty_value)
+ local filled_tbl = {}
+ for key, value in pairs(tbl) do
+ if value and type(value) == 'table' then
+ local nested_filtered = fill_empty_strings(value, empty_value)
+ if next(nested_filtered) ~= nil then
+ filled_tbl[key] = nested_filtered
+ end
+ elseif value and type(value) == 'string' and is_empty(value) then
+ filled_tbl[key] = empty_value
+ elseif value then
+ filled_tbl[key] = value
+ end
+ end
+ return filled_tbl
+end
+
local function create_bulk_json(es_index, logs_to_send)
local tbl = {}
for _, row in pairs(logs_to_send) do
@@ -483,7 +517,7 @@ local function get_general_metadata(task)
r.ip = '::'
r.is_local = false
local ip_addr = task:get_ip()
- if ip_addr and ip_addr:is_valid() then
+ if ip_addr and ip_addr:is_valid() and tostring(ip_addr) ~= '...' then
r.is_local = ip_addr:is_local()
r.ip = tostring(ip_addr)
end
@@ -494,7 +528,7 @@ local function get_general_metadata(task)
origin = origin:gsub('^%[', ''):gsub('%]:[0-9]+$', ''):gsub('%]$', '')
local rspamd_ip = require "rspamd_ip"
local origin_ip = rspamd_ip.from_string(origin)
- if origin_ip and origin_ip:is_valid() then
+ if origin_ip and origin_ip:is_valid() and tostring(origin_ip) ~= '...' then
r.sender_ip = tostring(origin_ip)
end
end
@@ -510,7 +544,7 @@ local function get_general_metadata(task)
local rcpt = task:get_recipients('smtp')
local l = {}
for _, a in ipairs(rcpt) do
- if a['user'] and a['user'] ~= '' and a['domain'] and a['domain'] ~= '' then
+ if a['user'] and #a['user'] > 0 and a['domain'] and #a['domain'] > 0 then
table.insert(l, a['user'] .. '@' .. a['domain']:lower())
end
end
@@ -528,8 +562,8 @@ local function get_general_metadata(task)
if task:has_from('smtp') then
local from = task:get_from({ 'smtp', 'orig' })[1]
if from and
- from['user'] and from['user'] ~= '' and
- from['domain'] and from['domain'] ~= ''
+ from['user'] and #from['user'] > 0 and
+ from['domain'] and #from['domain'] > 0
then
r.from_user = from['user']
r.from_domain = from['domain']:lower()
@@ -541,8 +575,8 @@ local function get_general_metadata(task)
if task:has_from('mime') then
local mime_from = task:get_from({ 'mime', 'orig' })[1]
if mime_from and
- mime_from['user'] and mime_from['user'] ~= '' and
- mime_from['domain'] and mime_from['domain'] ~= ''
+ mime_from['user'] and #mime_from['user'] > 0 and
+ mime_from['domain'] and #mime_from['domain'] > 0
then
r.mime_from_user = mime_from['user']
r.mime_from_domain = mime_from['domain']:lower()
@@ -583,10 +617,10 @@ local function get_general_metadata(task)
end
local header
if settings['index_template']['headers_text_ignore_above'] ~= 0 and
- #h.decoded >= headers_text_ignore_above
+ h.decoded and #h.decoded >= headers_text_ignore_above
then
header = h.decoded:sub(1, headers_text_ignore_above) .. '...'
- elseif #h.decoded > 0 then
+ elseif h.decoded and #h.decoded > 0 then
header = h.decoded
else
header = empty
@@ -649,7 +683,7 @@ local function get_general_metadata(task)
r.received_delay = get_received_delay(task:get_received_headers())
- return r
+ return fill_empty_strings(r, empty)
end
local function elastic_collect(task)
@@ -693,12 +727,6 @@ local function periodic_send_data(cfg, ev_base)
rspamd_logger.infox(rspamd_config, 'flushing buffer for %s by reaching max interval, oldest log in buffer written %s sec ago',
time_diff_sec, first_row['@timestamp'])
flush_needed = true
- else
- local size = buffer['logs']:size()
- if size >= settings['limits']['max_size'] then
- rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max size: %s/%s', size, settings['limits']['max_size'])
- flush_needed = true
- end
end
end
end