enabled = true,
version = {
autodetect_enabled = true,
- autodetect_max_fail = 12,
+ autodetect_max_fail = 30,
-- override works only if autodetect is disabled
override = {
name = 'opensearch',
limits = {
max_rows = 500, -- max logs in one bulk req to elastic and first reason to flush buffer
max_interval = 60, -- seconds, if first log in buffer older then interval - flush buffer
- max_size = 5000000, -- max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker
- max_fail = 3,
+ max_fail = 15,
},
index_template = {
managed = true,
return self.last - self.first + 1
end
-function Queue:size()
- local size = 0
- for i = self.first, self.last do
- local row = self.data[i]
- if row ~= nil then
- size = size + #row
- end
- end
- return size
-end
-
function Queue:get(index)
local real_index = self.first + index - 1
if real_index <= self.last then
return delay
end
+local function is_empty(str)
+ -- define a pattern that includes invisible unicode characters
+ local str_cleared = str:gsub('[' ..
+ '\xC2\xA0' .. -- U+00A0 non-breaking space
+ '\xE2\x80\x8B' .. -- U+200B zero width space
+ '\xEF\xBB\xBF' .. -- U+FEFF byte order mark (zero width no-break space)
+ '\xE2\x80\x8C' .. -- U+200C zero width non-joiner
+ '\xE2\x80\x8D' .. -- U+200D zero width joiner
+ '\xE2\x80\x8E' .. -- U+200E left-to-right mark
+ '\xE2\x80\x8F' .. -- U+200F right-to-left mark
+ '\xE2\x81\xA0' .. -- U+2060 word joiner
+ '\xE2\x80\xAA' .. -- U+202A left-to-right embedding
+ '\xE2\x80\xAB' .. -- U+202B right-to-left embedding
+ '\xE2\x80\xAC' .. -- U+202C pop directional formatting
+ '\xE2\x80\xAD' .. -- U+202D left-to-right override
+ '\xE2\x80\xAE' .. -- U+202E right-to-left override
+ '\xE2\x81\x9F' .. -- U+2061 function application
+ '\xE2\x81\xA1' .. -- U+2061 invisible separator
+ '\xE2\x81\xA2' .. -- U+2062 invisible times
+ '\xE2\x81\xA3' .. -- U+2063 invisible separator
+ '\xE2\x81\xA4' .. -- U+2064 invisible plus
+ ']', '') -- gsub replaces all matched characters with an empty string
+ if str_cleared:match('[%S]') then
+ return false
+ else
+ return true
+ end
+end
+
+local function fill_empty_strings(tbl, empty_value)
+ local filled_tbl = {}
+ for key, value in pairs(tbl) do
+ if value and type(value) == 'table' then
+ local nested_filtered = fill_empty_strings(value, empty_value)
+ if next(nested_filtered) ~= nil then
+ filled_tbl[key] = nested_filtered
+ end
+ elseif value and type(value) == 'string' and is_empty(value) then
+ filled_tbl[key] = empty_value
+ elseif value then
+ filled_tbl[key] = value
+ end
+ end
+ return filled_tbl
+end
+
local function create_bulk_json(es_index, logs_to_send)
local tbl = {}
for _, row in pairs(logs_to_send) do
r.ip = '::'
r.is_local = false
local ip_addr = task:get_ip()
- if ip_addr and ip_addr:is_valid() then
+ if ip_addr and ip_addr:is_valid() and tostring(ip_addr) ~= '...' then
r.is_local = ip_addr:is_local()
r.ip = tostring(ip_addr)
end
origin = origin:gsub('^%[', ''):gsub('%]:[0-9]+$', ''):gsub('%]$', '')
local rspamd_ip = require "rspamd_ip"
local origin_ip = rspamd_ip.from_string(origin)
- if origin_ip and origin_ip:is_valid() then
+ if origin_ip and origin_ip:is_valid() and tostring(origin_ip) ~= '...' then
r.sender_ip = tostring(origin_ip)
end
end
local rcpt = task:get_recipients('smtp')
local l = {}
for _, a in ipairs(rcpt) do
- if a['user'] and a['user'] ~= '' and a['domain'] and a['domain'] ~= '' then
+ if a['user'] and #a['user'] > 0 and a['domain'] and #a['domain'] > 0 then
table.insert(l, a['user'] .. '@' .. a['domain']:lower())
end
end
if task:has_from('smtp') then
local from = task:get_from({ 'smtp', 'orig' })[1]
if from and
- from['user'] and from['user'] ~= '' and
- from['domain'] and from['domain'] ~= ''
+ from['user'] and #from['user'] > 0 and
+ from['domain'] and #from['domain'] > 0
then
r.from_user = from['user']
r.from_domain = from['domain']:lower()
if task:has_from('mime') then
local mime_from = task:get_from({ 'mime', 'orig' })[1]
if mime_from and
- mime_from['user'] and mime_from['user'] ~= '' and
- mime_from['domain'] and mime_from['domain'] ~= ''
+ mime_from['user'] and #mime_from['user'] > 0 and
+ mime_from['domain'] and #mime_from['domain'] > 0
then
r.mime_from_user = mime_from['user']
r.mime_from_domain = mime_from['domain']:lower()
end
local header
if settings['index_template']['headers_text_ignore_above'] ~= 0 and
- #h.decoded >= headers_text_ignore_above
+ h.decoded and #h.decoded >= headers_text_ignore_above
then
header = h.decoded:sub(1, headers_text_ignore_above) .. '...'
- elseif #h.decoded > 0 then
+ elseif h.decoded and #h.decoded > 0 then
header = h.decoded
else
header = empty
r.received_delay = get_received_delay(task:get_received_headers())
- return r
+ return fill_empty_strings(r, empty)
end
local function elastic_collect(task)
rspamd_logger.infox(rspamd_config, 'flushing buffer for %s by reaching max interval, oldest log in buffer written %s sec ago',
time_diff_sec, first_row['@timestamp'])
flush_needed = true
- else
- local size = buffer['logs']:size()
- if size >= settings['limits']['max_size'] then
- rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max size: %s/%s', size, settings['limits']['max_size'])
- flush_needed = true
- end
end
end
end