]> source.dussan.org Git - rspamd.git/commitdiff
* add more validation on empty strings, required to not face errors in saving logs...
authorDmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
Sun, 3 Nov 2024 15:00:11 +0000 (16:00 +0100)
committerDmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
Sun, 3 Nov 2024 15:00:11 +0000 (16:00 +0100)
* remove max_size as it was looking to rows elements count, not strings size in total, such check will be too much compute intensive
* increase default errors max_fail as usually elastic not recover so quickly and needs a bit more time

conf/modules.d/elastic.conf
src/plugins/lua/elastic.lua

index e4c70bc87f2c598692280f475e5d604b25e224fb..f815bc61d5735bebc30749e441a679475071062e 100644 (file)
@@ -25,7 +25,7 @@ elastic {
   use_keepalive = true;
   version = {
     autodetect_enabled = true;
-    autodetect_max_fail = 12;
+    autodetect_max_fail = 30;
     # override works only if autodetect is disabled
     override = {
       name = "opensearch";
@@ -35,8 +35,7 @@ elastic {
   limits = {
     max_rows = 500; # max logs in one bulk req to elastic and first reason to flush buffer to elastic
     max_interval = 60; # seconds, if first log in buffer older then interval - flush buffer
-    max_size = 5000000; # max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker
-    max_fail = 3;
+    max_fail = 10;
   };
   index_template = {
     managed = true;
index 28c683333767d03ed52464ec7f3f7aab70f1fa4b..d76e0d72341c0b7e0738d4bebe4024260a3e977e 100644 (file)
@@ -72,7 +72,7 @@ local settings = {
   enabled = true,
   version = {
     autodetect_enabled = true,
-    autodetect_max_fail  = 12,
+    autodetect_max_fail  = 30,
     -- override works only if autodetect is disabled
     override = {
       name = 'opensearch',
@@ -82,8 +82,7 @@ local settings = {
   limits = {
     max_rows = 500, -- max logs in one bulk req to elastic and first reason to flush buffer
     max_interval = 60, -- seconds, if first log in buffer older then interval - flush buffer
-    max_size = 5000000, -- max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker
-    max_fail = 3,
+    max_fail = 15,
   },
   index_template = {
     managed = true,
@@ -179,17 +178,6 @@ function Queue:length()
   return self.last - self.first + 1
 end
 
-function Queue:size()
-  local size = 0
-  for i = self.first, self.last do
-    local row = self.data[i]
-    if row ~= nil then
-      size = size + #row
-    end
-  end
-  return size
-end
-
 function Queue:get(index)
   local real_index = self.first + index - 1
   if real_index <= self.last then
@@ -327,6 +315,52 @@ local function get_received_delay(received_headers)
   return delay
 end
 
+local function is_empty(str)
+  -- define a pattern that includes invisible unicode characters
+  local str_cleared = str:gsub('[' ..
+    '\xC2\xA0'     .. -- U+00A0 non-breaking space
+    '\xE2\x80\x8B' .. -- U+200B zero width space
+    '\xEF\xBB\xBF' .. -- U+FEFF byte order mark (zero width no-break space)
+    '\xE2\x80\x8C' .. -- U+200C zero width non-joiner
+    '\xE2\x80\x8D' .. -- U+200D zero width joiner
+    '\xE2\x80\x8E' .. -- U+200E left-to-right mark
+    '\xE2\x80\x8F' .. -- U+200F right-to-left mark
+    '\xE2\x81\xA0' .. -- U+2060 word joiner
+    '\xE2\x80\xAA' .. -- U+202A left-to-right embedding
+    '\xE2\x80\xAB' .. -- U+202B right-to-left embedding
+    '\xE2\x80\xAC' .. -- U+202C pop directional formatting
+    '\xE2\x80\xAD' .. -- U+202D left-to-right override
+    '\xE2\x80\xAE' .. -- U+202E right-to-left override
+    '\xE2\x81\x9F' .. -- U+2061 function application
+    '\xE2\x81\xA1' .. -- U+2061 invisible separator
+    '\xE2\x81\xA2' .. -- U+2062 invisible times
+    '\xE2\x81\xA3' .. -- U+2063 invisible separator
+    '\xE2\x81\xA4' .. -- U+2064 invisible plus
+    ']', '') -- gsub replaces all matched characters with an empty string
+  if str_cleared:match('[%S]') then
+    return false
+  else
+    return true
+  end
+end
+
+local function fill_empty_strings(tbl, empty_value)
+  local filled_tbl = {}
+  for key, value in pairs(tbl) do
+    if value and type(value) == 'table' then
+      local nested_filtered = fill_empty_strings(value, empty_value)
+      if next(nested_filtered) ~= nil then
+      filled_tbl[key] = nested_filtered
+      end
+    elseif value and type(value) == 'string' and is_empty(value) then
+      filled_tbl[key] = empty_value
+    elseif value then
+      filled_tbl[key] = value
+    end
+  end
+  return filled_tbl
+end
+
 local function create_bulk_json(es_index, logs_to_send)
   local tbl = {}
   for _, row in pairs(logs_to_send) do
@@ -483,7 +517,7 @@ local function get_general_metadata(task)
   r.ip = '::'
   r.is_local = false
   local ip_addr = task:get_ip()
-  if ip_addr and ip_addr:is_valid() then
+  if ip_addr and ip_addr:is_valid() and tostring(ip_addr) ~= '...' then
     r.is_local = ip_addr:is_local()
     r.ip = tostring(ip_addr)
   end
@@ -494,7 +528,7 @@ local function get_general_metadata(task)
     origin = origin:gsub('^%[', ''):gsub('%]:[0-9]+$', ''):gsub('%]$', '')
     local rspamd_ip = require "rspamd_ip"
     local origin_ip = rspamd_ip.from_string(origin)
-    if origin_ip and origin_ip:is_valid() then
+    if origin_ip and origin_ip:is_valid() and tostring(origin_ip) ~= '...' then
       r.sender_ip = tostring(origin_ip)
     end
   end
@@ -510,7 +544,7 @@ local function get_general_metadata(task)
     local rcpt = task:get_recipients('smtp')
     local l = {}
     for _, a in ipairs(rcpt) do
-      if a['user'] and a['user'] ~= '' and a['domain'] and a['domain'] ~= '' then
+      if a['user'] and #a['user'] > 0 and a['domain'] and #a['domain'] > 0 then
         table.insert(l, a['user'] .. '@' .. a['domain']:lower())
       end
     end
@@ -528,8 +562,8 @@ local function get_general_metadata(task)
   if task:has_from('smtp') then
     local from = task:get_from({ 'smtp', 'orig' })[1]
     if from and
-      from['user'] and from['user'] ~= '' and
-      from['domain'] and from['domain'] ~= ''
+      from['user'] and #from['user'] > 0 and
+      from['domain'] and #from['domain'] > 0
     then
       r.from_user = from['user']
       r.from_domain = from['domain']:lower()
@@ -541,8 +575,8 @@ local function get_general_metadata(task)
   if task:has_from('mime') then
     local mime_from = task:get_from({ 'mime', 'orig' })[1]
     if mime_from and
-      mime_from['user'] and mime_from['user'] ~= '' and
-      mime_from['domain'] and mime_from['domain'] ~= ''
+      mime_from['user'] and #mime_from['user'] > 0 and
+      mime_from['domain'] and #mime_from['domain'] > 0
     then
       r.mime_from_user = mime_from['user']
       r.mime_from_domain = mime_from['domain']:lower()
@@ -583,10 +617,10 @@ local function get_general_metadata(task)
         end
         local header
         if settings['index_template']['headers_text_ignore_above'] ~= 0 and
-          #h.decoded >= headers_text_ignore_above
+          h.decoded and #h.decoded >= headers_text_ignore_above
         then
           header = h.decoded:sub(1, headers_text_ignore_above) .. '...'
-        elseif #h.decoded > 0 then
+        elseif h.decoded and #h.decoded > 0 then
           header = h.decoded
         else
           header = empty
@@ -649,7 +683,7 @@ local function get_general_metadata(task)
 
   r.received_delay = get_received_delay(task:get_received_headers())
 
-  return r
+  return fill_empty_strings(r, empty)
 end
 
 local function elastic_collect(task)
@@ -693,12 +727,6 @@ local function periodic_send_data(cfg, ev_base)
         rspamd_logger.infox(rspamd_config, 'flushing buffer for %s by reaching max interval, oldest log in buffer written %s sec ago',
           time_diff_sec, first_row['@timestamp'])
         flush_needed = true
-      else
-        local size = buffer['logs']:size()
-        if size >= settings['limits']['max_size'] then
-          rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max size: %s/%s', size, settings['limits']['max_size'])
-          flush_needed = true
-        end
       end
     end
   end