diff options
-rw-r--r-- | conf/modules.d/elastic.conf | 5 | ||||
-rw-r--r-- | src/plugins/lua/elastic.lua | 88 |
2 files changed, 60 insertions, 33 deletions
diff --git a/conf/modules.d/elastic.conf b/conf/modules.d/elastic.conf index e4c70bc87..f815bc61d 100644 --- a/conf/modules.d/elastic.conf +++ b/conf/modules.d/elastic.conf @@ -25,7 +25,7 @@ elastic { use_keepalive = true; version = { autodetect_enabled = true; - autodetect_max_fail = 12; + autodetect_max_fail = 30; # override works only if autodetect is disabled override = { name = "opensearch"; @@ -35,8 +35,7 @@ elastic { limits = { max_rows = 500; # max logs in one bulk req to elastic and first reason to flush buffer to elastic max_interval = 60; # seconds, if first log in buffer older then interval - flush buffer - max_size = 5000000; # max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker - max_fail = 3; + max_fail = 10; }; index_template = { managed = true; diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 28c683333..d76e0d723 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -72,7 +72,7 @@ local settings = { enabled = true, version = { autodetect_enabled = true, - autodetect_max_fail = 12, + autodetect_max_fail = 30, -- override works only if autodetect is disabled override = { name = 'opensearch', @@ -82,8 +82,7 @@ local settings = { limits = { max_rows = 500, -- max logs in one bulk req to elastic and first reason to flush buffer max_interval = 60, -- seconds, if first log in buffer older then interval - flush buffer - max_size = 5000000, -- max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker - max_fail = 3, + max_fail = 15, }, index_template = { managed = true, @@ -179,17 +178,6 @@ function Queue:length() return self.last - self.first + 1 end -function Queue:size() - local size = 0 - for i = self.first, self.last do - local row = self.data[i] - if row ~= nil then - size = size + #row - end - end - return size -end - function Queue:get(index) local real_index = self.first + index - 1 if real_index <= self.last then @@ -327,6 +315,52 @@ local function get_received_delay(received_headers) return delay end +local function is_empty(str) + -- define a pattern that includes invisible unicode characters + local str_cleared = str:gsub('[' .. + '\xC2\xA0' .. -- U+00A0 non-breaking space + '\xE2\x80\x8B' .. -- U+200B zero width space + '\xEF\xBB\xBF' .. -- U+FEFF byte order mark (zero width no-break space) + '\xE2\x80\x8C' .. -- U+200C zero width non-joiner + '\xE2\x80\x8D' .. -- U+200D zero width joiner + '\xE2\x80\x8E' .. -- U+200E left-to-right mark + '\xE2\x80\x8F' .. -- U+200F right-to-left mark + '\xE2\x81\xA0' .. -- U+2060 word joiner + '\xE2\x80\xAA' .. -- U+202A left-to-right embedding + '\xE2\x80\xAB' .. -- U+202B right-to-left embedding + '\xE2\x80\xAC' .. -- U+202C pop directional formatting + '\xE2\x80\xAD' .. -- U+202D left-to-right override + '\xE2\x80\xAE' .. -- U+202E right-to-left override + '\xE2\x81\x9F' .. -- U+2061 function application + '\xE2\x81\xA1' .. -- U+2061 invisible separator + '\xE2\x81\xA2' .. -- U+2062 invisible times + '\xE2\x81\xA3' .. -- U+2063 invisible separator + '\xE2\x81\xA4' .. -- U+2064 invisible plus + ']', '') -- gsub replaces all matched characters with an empty string + if str_cleared:match('[%S]') then + return false + else + return true + end +end + +local function fill_empty_strings(tbl, empty_value) + local filled_tbl = {} + for key, value in pairs(tbl) do + if value and type(value) == 'table' then + local nested_filtered = fill_empty_strings(value, empty_value) + if next(nested_filtered) ~= nil then + filled_tbl[key] = nested_filtered + end + elseif value and type(value) == 'string' and is_empty(value) then + filled_tbl[key] = empty_value + elseif value then + filled_tbl[key] = value + end + end + return filled_tbl +end + local function create_bulk_json(es_index, logs_to_send) local tbl = {} for _, row in pairs(logs_to_send) do @@ -483,7 +517,7 @@ local function get_general_metadata(task) r.ip = '::' r.is_local = false local ip_addr = task:get_ip() - if ip_addr and ip_addr:is_valid() then + if ip_addr and ip_addr:is_valid() and tostring(ip_addr) ~= '...' then r.is_local = ip_addr:is_local() r.ip = tostring(ip_addr) end @@ -494,7 +528,7 @@ local function get_general_metadata(task) origin = origin:gsub('^%[', ''):gsub('%]:[0-9]+$', ''):gsub('%]$', '') local rspamd_ip = require "rspamd_ip" local origin_ip = rspamd_ip.from_string(origin) - if origin_ip and origin_ip:is_valid() then + if origin_ip and origin_ip:is_valid() and tostring(origin_ip) ~= '...' then r.sender_ip = tostring(origin_ip) end end @@ -510,7 +544,7 @@ local function get_general_metadata(task) local rcpt = task:get_recipients('smtp') local l = {} for _, a in ipairs(rcpt) do - if a['user'] and a['user'] ~= '' and a['domain'] and a['domain'] ~= '' then + if a['user'] and #a['user'] > 0 and a['domain'] and #a['domain'] > 0 then table.insert(l, a['user'] .. '@' .. a['domain']:lower()) end end @@ -528,8 +562,8 @@ local function get_general_metadata(task) if task:has_from('smtp') then local from = task:get_from({ 'smtp', 'orig' })[1] if from and - from['user'] and from['user'] ~= '' and - from['domain'] and from['domain'] ~= '' + from['user'] and #from['user'] > 0 and + from['domain'] and #from['domain'] > 0 then r.from_user = from['user'] r.from_domain = from['domain']:lower() @@ -541,8 +575,8 @@ local function get_general_metadata(task) if task:has_from('mime') then local mime_from = task:get_from({ 'mime', 'orig' })[1] if mime_from and - mime_from['user'] and mime_from['user'] ~= '' and - mime_from['domain'] and mime_from['domain'] ~= '' + mime_from['user'] and #mime_from['user'] > 0 and + mime_from['domain'] and #mime_from['domain'] > 0 then r.mime_from_user = mime_from['user'] r.mime_from_domain = mime_from['domain']:lower() @@ -583,10 +617,10 @@ local function get_general_metadata(task) end local header if settings['index_template']['headers_text_ignore_above'] ~= 0 and - #h.decoded >= headers_text_ignore_above + h.decoded and #h.decoded >= headers_text_ignore_above then header = h.decoded:sub(1, headers_text_ignore_above) .. '...' - elseif #h.decoded > 0 then + elseif h.decoded and #h.decoded > 0 then header = h.decoded else header = empty @@ -649,7 +683,7 @@ local function get_general_metadata(task) r.received_delay = get_received_delay(task:get_received_headers()) - return r + return fill_empty_strings(r, empty) end local function elastic_collect(task) @@ -693,12 +727,6 @@ local function periodic_send_data(cfg, ev_base) rspamd_logger.infox(rspamd_config, 'flushing buffer for %s by reaching max interval, oldest log in buffer written %s sec ago', time_diff_sec, first_row['@timestamp']) flush_needed = true - else - local size = buffer['logs']:size() - if size >= settings['limits']['max_size'] then - rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max size: %s/%s', size, settings['limits']['max_size']) - flush_needed = true - end end end end |