From 9a90347ea82b7a18aee99ed308257d668a20e4b6 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Fri, 11 Oct 2024 18:49:58 +0200 Subject: [Rework] Breaking: Actualize elastic module, support Elastic 8 & OpenSearch 2, add index policy with logs retention and many more Signed-off-by: Dmytro Alieksieiev <1865999+dragoangel@users.noreply.github.com> --- src/plugins/lua/elastic.lua | 1753 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 1429 insertions(+), 324 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index ccbb7c198..205a5ce86 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -18,219 +18,615 @@ limitations under the License. local rspamd_logger = require 'rspamd_logger' local rspamd_http = require "rspamd_http" local lua_util = require "lua_util" -local util = require "rspamd_util" +local rspamd_util = require "rspamd_util" local ucl = require "ucl" -local rspamd_redis = require "lua_redis" local upstream_list = require "rspamd_upstream_list" -local lua_settings = require "lua_settings" if confighelp then return end -local rows = {} -local nrows = 0 -local failed_sends = 0 -local elastic_template -local redis_params local N = "elastic" -local E = {} -local HOSTNAME = util.get_hostname() local connect_prefix = 'http://' -local enabled = true -local ingest_geoip_type = 'plugins' +local rspamd_hostname = rspamd_util.get_hostname() +-- supported_distro: +-- from - minimal compatible version supported, including +-- till & till_unknown = true - yet unreleased version, so unknown compatibility, excluding +-- till & till_unknown = false - version is known to be not yet compatible with this module +local supported_distro = { + elastic = { + from = '7.8', + till = '9', + till_unknown = true, + }, + opensearch = { + from = '1', + till = '3', + till_unknown = true, + }, +} +local detected_distro = { + name = nil, + version = nil, + 
supported = false, +} +local states = { + distro = { + configured = false, + errors = 0, + }, + index_template = { + configured = false, + errors = 0, + }, + index_policy = { + configured = false, + errors = 0, + }, + geoip_pipeline = { + configured = false, + errors = 0, + }, +} local settings = { - limit = 500, - index_pattern = 'rspamd-%Y.%m.%d', - template_file = rspamd_paths['SHAREDIR'] .. '/elastic/rspamd_template.json', - kibana_file = rspamd_paths['SHAREDIR'] .. '/elastic/kibana.json', - key_prefix = 'elastic-', - expire = 3600, + enabled = true, + version = { + autodetect_enabled = true, + autodetect_max_fail = 12, + -- override works only if autodetect is disabled + override = { + name = 'opensearch', + version = '2.17', + } + }, + limits = { + max_rows = 1000, -- max logs in one bulk req to elastic and first reason to flush buffer + max_interval = 60, -- seconds, if first log in buffer older then interval - flush buffer + max_size = 5000000, -- max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker + max_fail = 3, + }, + index_template = { + managed = true, + name = 'rspamd', + pattern = '{service}-%Y.%m.%d', + priority = 0, + shards_count = 3, + replicas_count = 1, + refresh_interval = 5, -- seconds + dynamic_keyword_ignore_above = 256, + headers_text_ignore_above = 2048, -- strip headers value and add '...' 
to the end, set 0 to disable limit + symbols_nested = false, + empty_value = 'unknown', -- empty numbers, ips and ipnets are not customizable they will be always 0, :: and ::/128 respectively + }, + index_policy = { + enabled = true, + managed = true, + name = 'rspamd', -- if you want use custom lifecycle policy, change name and set managed = false + hot = { + index_priority = 100, + }, + warm = { + enabled = true, + after = '2d', + index_priority = 50, + migrate = true, -- only supported with elastic distro, will not have impact elsewhere + read_only = true, + change_replicas = false, + replicas_count = 1, + shrink = false, + shards_count = 1, + max_gb_per_shard = 0, -- zero - disabled by default, if enabled - shards_count is ignored + force_merge = true, + segments_count = 1, + }, + cold = { + enabled = true, + after = '14d', + index_priority = 0, + migrate = true, -- only supported with elastic distro, will not have impact elsewhere + read_only = true, + change_replicas = false, + replicas_count = 1, + }, + delete = { + enabled = true, + after = '30d', + }, + }, + collect_headers = { + 'From', + 'To', + 'Subject', + 'Date', + 'User-Agent', + }, + extra_collect_headers = { + -- 'List-Id', + -- 'X-Mailer', + }, + geoip = { + enabled = true, + managed = true, + pipeline_name = 'rspamd-geoip', + }, + periodic_interval = 5.0, timeout = 5.0, - failover = false, - import_kibana = false, use_https = false, + no_ssl_verify = false, use_gzip = true, + use_keepalive = true, allow_local = false, user = nil, password = nil, - no_ssl_verify = false, - max_fail = 3, - ingest_module = false, - elasticsearch_version = 6, } -local function read_file(path) - local file = io.open(path, "rb") - if not file then +local Queue = {} +Queue.__index = Queue + +function Queue:new() + local obj = {first = 1, last = 0, data = {}} + setmetatable(obj, self) + return obj +end + +function Queue:push(value) + self.last = self.last + 1 + self.data[self.last] = value +end + +function Queue:length() 
+ return self.last - self.first + 1 +end + +function Queue:size() + local size = 0 + for i = self.first, self.last do + local row = self.data[i] + if row ~= nil then + size = size + #row + end + end + return size +end + +function Queue:get(index) + local real_index = self.first + index - 1 + if real_index <= self.last then + return self.data[real_index] + else return nil end - local content = file:read "*a" - file:close() - return content end -local function elastic_send_data(task) - local es_index = os.date(settings['index_pattern']) +function Queue:get_all() + local items = {} + for i = self.first, self.last do + table.insert(items, self.data[i]) + end + return items +end + +function Queue:pop() + if self.first > self.last then + return nil + end + local value = self.data[self.first] + self.data[self.first] = nil + self.first = self.first + 1 + return value +end + +function Queue:get_first(count) + local items = {} + count = count or self:length() + local actual_end = math.min(self.first + count - 1, self.last) + for i = self.first, actual_end do + table.insert(items, self.data[i]) + end + return items +end + +function Queue:pop_first(count) + local popped_items = {} + count = count or self:length() + local actual_count = math.min(count, self:length()) + for i = 1, actual_count do + local item = self:pop() + table.insert(popped_items, item) + end + return popped_items +end + +local buffer = { + logs = Queue:new(), + errors = 0, +} + +local function contains(tbl, val) + for i=1,#tbl do + if tbl[i]:lower() == val:lower() then + return true + end + end + return false +end + +local function safe_get(table, ...) 
+ local value = table + for _, key in ipairs({...}) do + if value[key] == nil then + return nil + end + value = value[key] + end + return value +end + +local function deep_compare(t1, t2, visited) + if t1 == t2 then + return true + end + + if type(t1) ~= "table" or type(t2) ~= "table" then + return false + end + + -- use visited to keep track of already compared tables to handle cycles + visited = visited or {} + if visited[t1] and visited[t1][t2] then + return true + end + + visited[t1] = visited[t1] or {} + visited[t1][t2] = true + + -- compare the number of keys in both tables + local t1len = 0 + for _ in pairs(t1) do + t1len = t1len + 1 + end + + local t2len = 0 + for _ in pairs(t2) do + t2len = t2len + 1 + end + + if t1len ~= t2len then + return false + end + + -- recursively compare each key-value pair + for k, v1 in pairs(t1) do + local v2 = t2[k] + if v2 == nil or not deep_compare(v1, v2, visited) then + return false + end + end + return true +end + +local function compare_versions(v1, v2) + -- helper function to extract the numeric version string + local function extract_numeric_version(version) + -- remove any trailing characters that are not digits or dots + version = version:match("^([%.%d]+)") + local parts = {} + for part in string.gmatch(version or "", '([^.]+)') do + table.insert(parts, tonumber(part) or 0) + end + return parts + end + + local v1_parts = extract_numeric_version(v1) + local v2_parts = extract_numeric_version(v2) + local max_length = math.max(#v1_parts, #v2_parts) + + -- compare each part of the version strings + for i = 1, max_length do + local num1 = v1_parts[i] or 0 + local num2 = v2_parts[i] or 0 + + if num1 > num2 then + return 1 -- v1 is greater than v2 + elseif num1 < num2 then + return -1 -- v1 is less than v2 + end + -- if equal, continue to the next part + end + return 0 -- versions are equal +end + +local function handle_error(action,component,limit) + if states[component]['errors'] >= limit then + 
rspamd_logger.errx(rspamd_config, 'cannot %s elastic %s, failed attempts: %s/%s, stop trying', + action, component:gsub('_', ' '), states[component]['errors'], limit) + states[component]['configured'] = true + else + states[component]['errors'] = states[component]['errors'] + 1 + end + return true +end + +local function create_bulk_json(es_index, logs_to_send) local tbl = {} - for _, value in pairs(rows) do - if settings.elasticsearch_version >= 7 then - table.insert(tbl, '{ "index" : { "_index" : "' .. es_index .. - '","pipeline": "rspamd-geoip"} }') - else - table.insert(tbl, '{ "index" : { "_index" : "' .. es_index .. - '", "_type" : "_doc" ,"pipeline": "rspamd-geoip"} }') + for _, row in pairs(logs_to_send) do + local pipeline = '' + if settings['geoip']['enabled']then + pipeline = ',"pipeline":"'.. settings['geoip']['pipeline_name'] .. '"' end - table.insert(tbl, ucl.to_format(value, 'json-compact')) + table.insert(tbl, '{"index":{"_index":"' .. es_index .. '"' .. pipeline .. '}}') + table.insert(tbl, ucl.to_format(row, 'json-compact')) end + table.insert(tbl, '') -- for last \n + return table.concat(tbl, "\n") +end - table.insert(tbl, '') -- For last \n +local function elastic_send_data(flush_all, task, cfg, ev_base) + local log_object = task or rspamd_config + local nlogs_to_send = 0 + local es_index + local upstream + local host + local push_url + local bulk_json + local logs_to_send + if flush_all then + logs_to_send = buffer['logs']:get_all() + else + logs_to_send = buffer['logs']:get_first(settings['limits']['max_rows']) + end + nlogs_to_send = #logs_to_send -- actual size can be lower then max_rows + if nlogs_to_send > 0 then + es_index = settings['index_template']['name'] .. '-' .. 
os.date(settings['index_template']['pattern']) - local upstream = settings.upstream:get_upstream_round_robin() - local ip_addr = upstream:get_addr():to_string(true) + upstream = settings.upstream:get_upstream_round_robin() + host = upstream:get_name():gsub(":[1-9][0-9]*$", "") + local ip_addr = upstream:get_addr():to_string(true) + push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk' - local push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk' - local bulk_json = table.concat(tbl, "\n") + bulk_json = create_bulk_json(es_index, logs_to_send) + rspamd_logger.debugm(N, log_object, 'successfully composed payload with %s log lines', nlogs_to_send) + end - local function http_callback(err, code, _, _) + local function http_callback(err, code, body, _) + local push_done = false + local push_errors = false if err then - rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s", - push_url, err, failed_sends, settings.max_fail) + rspamd_logger.errx(log_object, 'cannot send logs to elastic (%s): %s; failed attempts: %s/%s', + push_url, err, buffer['errors'], settings['limits']['max_fail']) + elseif code == 200 then + local parser = ucl.parser() + local res, ucl_err = parser:parse_string(body) + if not ucl_err and res then + local obj = parser:get_object() + if not obj['errors'] then + push_done = true + rspamd_logger.debugm(N, log_object, 'successfully sent payload with %s logs', nlogs_to_send) + else + push_errors = true + for _, value in pairs(obj['items']) do + if value['index']['status'] >= 400 then + if value['index']['error'] then + if value['index']['error']['type'] and value['index']['error']['reason'] then + rspamd_logger.errx(log_object, + 'cannot send logs to elastic (%s) due to error: %s status, %s type, due to: %s; failed attempts: %s/%s', + push_url, value['index']['status'], value['index']['error']['type'], value['index']['error']['reason'], + buffer['errors'], settings['limits']['max_fail']) 
+ end + end + end + end + end + else + rspamd_logger.errx(log_object, + 'cannot parse response from elastic (%s): %s; failed attempts: %s/%s', + push_url, ucl_err, buffer['errors'], settings['limits']['max_fail']) + end else - if code ~= 200 then - rspamd_logger.infox(task, - "cannot push data to elastic backend (%s): wrong http code %s (%s); failed attempts: %s/%s", - push_url, err, code, failed_sends, settings.max_fail) + rspamd_logger.errx(log_object, + 'cannot send logs to elastic (%s) due to bad http status code: %s, response: %s; failed attempts: %s/%s', + push_url, code, body, buffer['errors'], settings['limits']['max_fail']) + end + -- proccess results + if push_done then + buffer['logs']:pop_first(nlogs_to_send) + buffer['errors'] = 0 + upstream:ok() + else + if buffer['errors'] >= settings['limits']['max_fail'] then + rspamd_logger.errx(log_object, 'failed to send %s log lines, failed attempts: %s/%s, removing failed logs from bugger', + nlogs_to_send, buffer['errors'], settings['limits']['max_fail']) + buffer['logs']:pop_first(nlogs_to_send) + buffer['errors'] = 0 + else + buffer['errors'] = buffer['errors'] + 1 + end + if push_errors then + upstream:ok() -- we not assume upstream is failed if it return errors in response body else - lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES", - nrows, #bulk_json) + upstream:fail() end end end - return rspamd_http.request({ - url = push_url, - headers = { - ['Content-Type'] = 'application/x-ndjson', - }, - body = bulk_json, - callback = http_callback, - task = task, - method = 'post', - gzip = settings.use_gzip, - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - timeout = settings.timeout, - }) + if nlogs_to_send > 0 then + if task then + return rspamd_http.request({ + url = push_url, + headers = { + ['Host'] = host, + ['Content-Type'] = 'application/x-ndjson', + }, + body = bulk_json, + task = task, + method = 'post', + callback=http_callback, + 
gzip = settings.use_gzip, + keepalive = settings.use_keepalive, + no_ssl_verify = settings.no_ssl_verify, + user = settings.user, + password = settings.password, + timeout = settings.timeout, + }) + else + return rspamd_http.request({ + url = push_url, + headers = { + ['Host'] = host, + ['Content-Type'] = 'application/x-ndjson', + }, + body = bulk_json, + ev_base = ev_base, + config = cfg, + method = 'post', + callback=http_callback, + gzip = settings.use_gzip, + keepalive = settings.use_keepalive, + no_ssl_verify = settings.no_ssl_verify, + user = settings.user, + password = settings.password, + timeout = settings.timeout, + }) + end + end +end + +local function get_header_name(name) + return 'header_' .. name:lower():gsub('[%s%-]', '_') end local function get_general_metadata(task) local r = {} - local ip_addr = task:get_ip() + local empty = settings['index_template']['empty_value'] + local user = task:get_user() + r.rspamd_server = rspamd_hostname or empty + r.action = task:get_metric_action() or empty + r.score = task:get_metric_score()[1] or 0 + r.symbols = task:get_symbols_all() + for _, symbol in ipairs(r.symbols) do + symbol.groups = nil -- we don't need groups array in elastic + if type(symbol.options) == "table" then + symbol.options = table.concat(symbol.options, "; ") + end + end + r.user = user or empty + if user then + r.direction = "Inbound" + else + r.direction = "Outbound" + end + r.qid = task:get_queue_id() or empty + r.helo = task:get_helo() or empty + r.hostname = task:get_hostname() or empty + + r.ip = '::' + r.is_local = false + local ip_addr = task:get_ip() if ip_addr and ip_addr:is_valid() then r.is_local = ip_addr:is_local() r.ip = tostring(ip_addr) - else - r.ip = '127.0.0.1' end - r.webmail = false - r.sender_ip = 'unknown' + r.sender_ip = '::' local origin = task:get_header('X-Originating-IP') if origin then origin = origin:gsub('%[', ''):gsub('%]', '') local rspamd_ip = require "rspamd_ip" local origin_ip = rspamd_ip.from_string(origin) 
if origin_ip and origin_ip:is_valid() then - r.webmail = true r.sender_ip = origin -- use string here end end - r.direction = "Inbound" - r.user = task:get_user() or 'unknown' - r.qid = task:get_queue_id() or 'unknown' - r.action = task:get_metric_action() - r.rspamd_server = HOSTNAME - if r.user ~= 'unknown' then - r.direction = "Outbound" + local message_id = task:get_message_id() + if message_id == 'undef' then + r.message_id = empty + else + r.message_id = message_id end - local s = task:get_metric_score()[1] - r.score = s - - local rcpt = task:get_recipients('smtp') - if rcpt then + if task:has_recipients('smtp') then + local rcpt = task:get_recipients('smtp') local l = {} for _, a in ipairs(rcpt) do table.insert(l, a['addr']) end r.rcpt = l else - r.rcpt = 'unknown' + r.rcpt = empty end - local from = task:get_from { 'smtp', 'orig' } - if ((from or E)[1] or E).addr then - r.from = from[1].addr - else - r.from = 'unknown' + r.from_domain = empty + r.from_user = empty + if task:has_from('smtp') then + local from = task:get_from({ 'smtp', 'orig' })[1] + if from then + r.from_domain = from['domain']:lower() + r.from_user = from['user']:lower() + end end - local mime_from = task:get_from { 'mime', 'orig' } - if ((mime_from or E)[1] or E).addr then - r.mime_from = mime_from[1].addr - else - r.mime_from = 'unknown' + r.mime_from_domain = empty + r.mime_from_user = empty + if task:has_from('mime') then + local mime_from = task:get_from({ 'mime', 'orig' })[1] + if mime_from then + r.mime_from_domain = mime_from['domain']:lower() + r.mime_from_user = mime_from['user']:lower() + end end - local syminf = task:get_symbols_all() - r.symbols = syminf + local settings_id = task:get_settings_id() + if settings_id then + -- Convert to string + local lua_settings = require "lua_settings" + settings_id = lua_settings.settings_by_id(settings_id) + if settings_id then + settings_id = settings_id.name + end + end + if not settings_id then + settings_id = empty + end + r.settings_id 
= settings_id + r.asn = {} local pool = task:get_mempool() - r.asn.country = pool:get_variable("country") or 'unknown' + r.asn.country = pool:get_variable("country") or empty r.asn.asn = pool:get_variable("asn") or 0 - r.asn.ipnet = pool:get_variable("ipnet") or 'unknown' + r.asn.ipnet = pool:get_variable("ipnet") or '::/128' local function process_header(name) local hdr = task:get_header_full(name) + local headers_text_ignore_above = settings['index_template']['headers_text_ignore_above'] - 3 if hdr then local l = {} for _, h in ipairs(hdr) do table.insert(l, h.decoded) end + if #l > headers_text_ignore_above and headers_text_ignore_above ~= -3 then + l = l:sub(1, headers_text_ignore_above) .. '...' + end return l else - return 'unknown' + return empty end end - r.header_from = process_header('from') - r.header_to = process_header('to') - r.header_subject = process_header('subject') - r.header_date = process_header('date') - r.message_id = task:get_message_id() - local hname = task:get_hostname() or 'unknown' - r.hostname = hname - - local settings_id = task:get_settings_id() - - if settings_id then - -- Convert to string - settings_id = lua_settings.settings_by_id(settings_id) - - if settings_id then - settings_id = settings_id.name + for _, header in ipairs(settings['collect_headers']) do + local header_name = get_header_name(header) + if not r[header_name] then + r[header_name] = process_header(header) end end - if not settings_id then - settings_id = '' + for _, header in ipairs(settings['extra_collect_headers']) do + local header_name = get_header_name(header) + if not r[header_name] then + r[header_name] = process_header(header) + end end - r.settings_id = settings_id - local scan_real = task:get_scan_time() scan_real = math.floor(scan_real * 1000) if scan_real < 0 then @@ -239,91 +635,295 @@ local function get_general_metadata(task) scan_real) scan_real = 0 end - r.scan_time = scan_real + local parts = task:get_text_parts() + local lang_t = {} + if parts 
then + for _, part in ipairs(parts) do + local l = part:get_language() + if l and not contains(lang_t, l) then + table.insert(lang_t, l) + end + end + if table.getn(lang_t) > 0 then + r.language = lang_t + else + r.language = empty + end + if table.getn(lang_t) == 1 and lang_t[1] == 'en' then + r.non_en = false + else + r.non_en = true + end + end + + local fuzzy_hashes = task:get_mempool():get_variable('fuzzy_hashes', 'fstrings') + if fuzzy_hashes and #fuzzy_hashes > 0 then + local l = {} + for _, h in ipairs(fuzzy_hashes) do + table.insert(l, h) + end + r.fuzzy_hashes = l + else + r.fuzzy_hashes = empty + end + + r.received_delay = 0 + if user then -- calculate received_delay only for incoming traffic + local recieved_hop = 2 + local received_headers = task:get_received_headers() + if received_headers[recieved_hop] then + if received_headers[recieved_hop]['timestamp'] then + r.received_delay = math.floor(rspamd_util.get_time()) - received_headers[recieved_hop]['timestamp'] + if r.received_delay < 0 then + r.received_delay = 0 + end + end + end + end + return r end local function elastic_collect(task) - if not enabled then - return - end if task:has_flag('skip') then return end + if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then return end - local row = { ['rspamd_meta'] = get_general_metadata(task), - ['@timestamp'] = tostring(util.get_time() * 1000) } - table.insert(rows, row) - nrows = nrows + 1 - if nrows > settings['limit'] then - lua_util.debugm(N, task, 'send elastic search rows: %s', nrows) - if elastic_send_data(task) then - nrows = 0 - rows = {} - failed_sends = 0; - else - failed_sends = failed_sends + 1 - - if failed_sends > settings.max_fail then - rspamd_logger.errx(task, 'cannot send %s rows to ES %s times, stop trying', - nrows, failed_sends) - nrows = 0 - rows = {} - failed_sends = 0; - end + if not detected_distro['supported'] then + if buffer['logs']:length() >= settings['limits']['max_rows'] then + 
buffer['logs']:pop_first(settings['limits']['max_rows']) + rspamd_logger.errx(task, + 'elastic distro not supported, deleting %s logs from buffer due to reaching max rows limit', + settings['limits']['max_rows']) end end + + local now = tostring(rspamd_util.get_time() * 1000) + local row = { ['rspamd_meta'] = get_general_metadata(task), ['@timestamp'] = now } + buffer['logs']:push(row) + rspamd_logger.debugm(N, task, 'saved log to buffer') end -local opts = rspamd_config:get_all_opt('elastic') +local function periodic_send_data(cfg, ev_base) + local now = tostring(rspamd_util.get_time() * 1000) + local flush_needed = false -local function check_elastic_server(cfg, ev_base, _) + + local nlogs_total = buffer['logs']:length() + if nlogs_total >= settings['limits']['max_rows'] then + rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max rows: %s/%s', nlogs_total, settings['limits']['max_rows']) + flush_needed = true + else + local first_row = buffer['logs']:get(1) + if first_row then + local time_diff = now - first_row['@timestamp'] + if time_diff > settings.limits.max_interval * 1000 then + rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max interval, diff: %s, current time: %s, log timestamp: %s', + time_diff, now, first_row['@timestamp']) + flush_needed = true + else + local size = buffer['logs']:size() + if size >= settings['limits']['max_size'] then + rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max size: %s/%s', size, settings['limits']['max_size']) + flush_needed = true + end + end + end + end + + if flush_needed then + elastic_send_data(false, nil, cfg, ev_base) + end +end + +local function configure_geoip_pipeline(cfg, ev_base) local upstream = settings.upstream:get_upstream_round_robin() + local host = upstream:get_name():gsub(":[1-9][0-9]*$", "") local ip_addr = upstream:get_addr():to_string(true) - local plugins_url = connect_prefix .. ip_addr .. '/_nodes/' .. 
ingest_geoip_type + local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/' .. settings['geoip']['pipeline_name'] + local geoip_pipeline = { + description = "Add geoip info for rspamd", + processors = { + { + geoip = { + field = "rspamd_meta.ip", + target_field = "rspamd_meta.geoip" + } + }, + { + geoip = { + field = "rspamd_meta.sender_ip", + target_field = "rspamd_meta.sender_geoip" + } + } + } + } + + local function geoip_cb(err, code, body, _) + if err then + rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', geoip_url, err) + upstream:fail() + elseif code == 200 then + states['geoip_pipeline']['configured'] = true + upstream:ok() + else + rspamd_logger.errx(rspamd_config, + 'cannot configure elastic geoip pipeline (%s), status code: %s, response: %s', + geoip_url, code, body) + upstream:fail() + handle_error('configure', 'geoip_pipeline', settings['limits']['max_fail']) + end + end + + rspamd_http.request({ + url = geoip_url, + ev_base = ev_base, + config = cfg, + callback = geoip_cb, + headers = { + ['Host'] = host, + ['Content-Type'] = 'application/json', + }, + body = ucl.to_format(geoip_pipeline, 'json-compact'), + method = 'put', + gzip = settings.use_gzip, + keepalive = settings.use_keepalive, + no_ssl_verify = settings.no_ssl_verify, + user = settings.user, + password = settings.password, + timeout = settings.timeout, + }) +end + +local function put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json) local function http_callback(err, code, body, _) - if code == 200 then - local parser = ucl.parser() - local res, ucl_err = parser:parse_string(body) - if not res then - rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s', - plugins_url, ucl_err) - enabled = false; - return - end - local obj = parser:get_object() - for node, value in pairs(obj['nodes']) do - local plugin_found = false - for _, plugin in pairs(value['plugins']) do - if plugin['name'] == 'ingest-geoip' then - 
plugin_found = true - lua_util.debugm(N, "ingest-geoip plugin has been found") + if err then + rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', policy_url, err) + upstream:fail() + elseif code == 200 or code == 201 then + rspamd_logger.infox(rspamd_config, 'successfully updated elastic index policy: %s', body) + states['index_policy']['configured'] = true + upstream:ok() + else + rspamd_logger.errx(rspamd_config, 'cannot configure elastic index policy (%s), status code: %s, response: %s', policy_url, code, body) + upstream:fail() + handle_error('configure', 'index_policy', settings['limits']['max_fail']) + end + end + + rspamd_http.request({ + url = policy_url, + ev_base = ev_base, + config = cfg, + body = index_policy_json, + headers = { + ['Host'] = host, + ['Content-Type'] = 'application/json', + }, + method = 'put', + callback = http_callback, + gzip = settings.use_gzip, + keepalive = settings.use_keepalive, + no_ssl_verify = settings.no_ssl_verify, + user = settings.user, + password = settings.password, + timeout = settings.timeout, + }) +end + +local function get_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json) + local function http_callback(err, code, body, _) + if err then + rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', policy_url, err) + upstream:fail() + elseif code == 404 then + put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json) + elseif code == 200 then + local remote_policy_parser = ucl.parser() + local our_policy_parser = ucl.parser() + local res, ucl_err = remote_policy_parser:parse_string(body) + if not ucl_err and res then + local res, ucl_err = our_policy_parser:parse_string(index_policy_json) + if not ucl_err and res then + local remote_policy = remote_policy_parser:get_object() + local our_policy = our_policy_parser:get_object() + local update_needed = false + if detected_distro['name'] == 'elastic' then + local index_policy_name = 
settings['index_policy']['name'] + local current_phases = safe_get(remote_policy, index_policy_name, 'policy', 'phases') + if not deep_compare(our_policy['policy']['phases'], current_phases) then + update_needed = true + end + elseif detected_distro['name'] == 'opensearch' then + local current_default_state = safe_get(remote_policy, 'policy', 'default_state') + local current_ism_index_patterns = safe_get(remote_policy, 'policy', 'ism_template', 1, 'index_patterns') + local current_states = safe_get(remote_policy, 'policy', 'states') + if not deep_compare(our_policy['policy']['default_state'], current_default_state) then + update_needed = true + elseif not deep_compare(our_policy['policy']['ism_template'][1]['index_patterns'], current_ism_index_patterns) then + update_needed = true + elseif not deep_compare(our_policy['policy']['states'], current_states) then + update_needed = true + end end + if update_needed then + if detected_distro['name'] == 'elastic' then + put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json) + elseif detected_distro['name'] == 'opensearch' then + local seq_no = remote_policy['_seq_no'] + local primary_term = remote_policy['_primary_term'] + if type(seq_no) == 'number' and type(primary_term) == 'number' then + upstream:ok() + -- adjust policy url to include seq_no with primary_term + -- https://opensearch.org/docs/2.17/im-plugin/ism/api/#update-policy + policy_url = policy_url .. '?if_seq_no=' .. seq_no .. '&if_primary_term=' .. 
primary_term + put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json) + else + rspamd_logger.errx(rspamd_config, + 'current elastic index policy (%s) not returned correct seq_no/primary_term, policy will not be updated, response: %s', + policy_url, body) + upstream:fail() + handle_error('validate current', 'index_policy', settings['limits']['max_fail']) + end + end + end + else + rspamd_logger.errx(rspamd_config, 'failed to parse our index policy for elastic: %s', ucl_err) + upstream:fail() + handle_error('parse our', 'index_policy', settings['limits']['max_fail']) end - if not plugin_found then - rspamd_logger.infox(rspamd_config, - 'Unable to find ingest-geoip on %1 node, disabling module', node) - enabled = false - return - end + else + rspamd_logger.errx(rspamd_config, 'failed to parse remote index policy from elastic: %s', ucl_err) + upstream:fail() + handle_error('parse remote', 'index_policy', settings['limits']['max_fail']) end else - rspamd_logger.errx('cannot get plugins from %s: %s(%s) (%s)', plugins_url, - err, code, body) - enabled = false + rspamd_logger.errx(rspamd_config, + 'cannot get current elastic index policy (%s), status code: %s, response: %s', + policy_url, code, body) + handle_error('get current', 'index_policy', settings['limits']['max_fail']) + upstream:fail() end end + rspamd_http.request({ - url = plugins_url, + url = policy_url, ev_base = ev_base, config = cfg, + headers = { + ['Host'] = host, + ['Content-Type'] = 'application/json', + }, method = 'get', callback = http_callback, + gzip = settings.use_gzip, + keepalive = settings.use_keepalive, no_ssl_verify = settings.no_ssl_verify, user = settings.user, password = settings.password, @@ -331,165 +931,622 @@ local function check_elastic_server(cfg, ev_base, _) }) end --- import ingest pipeline and kibana dashboard/visualization -local function initial_setup(cfg, ev_base, worker) - if not worker:is_primary_controller() then - return - end - +local function 
configure_index_policy(cfg, ev_base) local upstream = settings.upstream:get_upstream_round_robin() + local host = upstream:get_name():gsub(":[1-9][0-9]*$", "") local ip_addr = upstream:get_addr():to_string(true) + local index_policy_path = nil + local index_policy = {} + if detected_distro['name'] == 'elastic' then + index_policy_path = '/_ilm/policy/' + elseif detected_distro['name'] == 'opensearch' then + index_policy_path = '/_plugins/_ism/policies/' + end + local policy_url = connect_prefix .. ip_addr .. index_policy_path .. settings['index_policy']['name'] - local function push_kibana_template() - -- add kibana dashboard and visualizations - if settings['import_kibana'] then - local kibana_mappings = read_file(settings['kibana_file']) - if kibana_mappings then - local parser = ucl.parser() - local res, parser_err = parser:parse_string(kibana_mappings) - if not res then - rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s', - parser_err) - enabled = false - - return - end - local obj = parser:get_object() - local tbl = {} - for _, item in ipairs(obj) do - table.insert(tbl, '{ "index" : { "_index" : ".kibana", "_type" : "doc" ,"_id": "' .. - item['_type'] .. ':' .. item["_id"] .. '"} }') - table.insert(tbl, ucl.to_format(item['_source'], 'json-compact')) - end - table.insert(tbl, '') -- For last \n - - local kibana_url = connect_prefix .. ip_addr .. 
'/.kibana/_bulk' - local function kibana_template_callback(err, code, body, _) - if code ~= 200 then - rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', kibana_url, - err, code, body) - enabled = false - else - lua_util.debugm(N, 'pushed kibana template: %s', body) - end - end + -- ucl.to_format(obj, 'json') can't manage empty {} objects, it will be treat them as [] in json as result, + -- so we write {} as '{emty_object}', which allows us to replace '"{emty_object}"' string after convertion to json to '{}' + local index_policy_json = '' - rspamd_http.request({ - url = kibana_url, - ev_base = ev_base, - config = cfg, - headers = { - ['Content-Type'] = 'application/x-ndjson', + -- elastic lifecycle policy with hot state + if detected_distro['name'] == 'elastic' then + index_policy = { + policy = { + phases = { + hot = { + min_age = '0ms', + actions = { + set_priority = { + priority = settings['index_policy']['hot']['index_priority'], + }, + }, }, - body = table.concat(tbl, "\n"), - method = 'post', - gzip = settings.use_gzip, - callback = kibana_template_callback, - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - timeout = settings.timeout, - }) - else - rspamd_logger.infox(rspamd_config, 'kibana template file %s not found', settings['kibana_file']) + }, + }, + } + -- elastic lifecycle warm + if settings['index_policy']['warm']['enabled'] then + local warm_obj = {} + warm_obj['min_age'] = settings['index_policy']['warm']['after'] + warm_obj['actions'] = { + set_priority = { + priority = settings['index_policy']['warm']['index_priority'], + }, + } + if not settings['index_policy']['warm']['migrate'] then + warm_obj['actions']['migrate'] = { enabled = false } + end + if settings['index_policy']['warm']['read_only'] then + warm_obj['actions']['readonly'] = '{empty_object}' end + if settings['index_policy']['warm']['change_replicas'] then + warm_obj['actions']['allocate'] = { + number_of_replicas = 
settings['index_policy']['warm']['replicas_count'], + } + end + if settings['index_policy']['warm']['shrink'] then + if settings['index_policy']['warm']['max_gb_per_shard'] then + warm_obj['actions']['shrink'] = { + max_primary_shard_size = settings['index_policy']['warm']['max_gb_per_shard'] .. 'gb', + } + else + warm_obj['actions']['shrink'] = { + number_of_shards = settings['index_policy']['warm']['shards_count'], + } + end + end + if settings['index_policy']['warm']['force_merge'] then + warm_obj['actions']['forcemerge'] = { + max_num_segments = settings['index_policy']['warm']['segments_count'], + } + end + index_policy['policy']['phases']['warm'] = warm_obj end - end - - if enabled then - -- create ingest pipeline - local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/rspamd-geoip' - local function geoip_cb(err, code, body, _) - if code ~= 200 then - rspamd_logger.errx('cannot get data from %s: %s(%s) (%s)', - geoip_url, err, code, body) - enabled = false + -- elastic lifecycle cold + if settings['index_policy']['cold']['enabled'] then + local cold_obj = {} + cold_obj['min_age'] = settings['index_policy']['cold']['after'] + cold_obj['actions'] = { + set_priority = { + priority = settings['index_policy']['cold']['index_priority'], + }, + } + if not settings['index_policy']['cold']['migrate'] then + cold_obj['actions']['migrate'] = { enabled = false } + end + if settings['index_policy']['cold']['read_only'] then + cold_obj['actions']['readonly'] = '{empty_object}' + end + if settings['index_policy']['cold']['change_replicas'] then + cold_obj['actions']['allocate'] = { + number_of_replicas = settings['index_policy']['cold']['replicas_count'], + } + end + index_policy['policy']['phases']['cold'] = cold_obj + end + -- elastic lifecycle delete + if settings['index_policy']['delete']['enabled'] then + local delete_obj = {} + delete_obj['min_age'] = settings['index_policy']['delete']['after'] + delete_obj['actions'] = { + delete = { 
delete_searchable_snapshot = true }, + } + index_policy['policy']['phases']['delete'] = delete_obj + end + -- opensearch state policy with hot state + elseif detected_distro['name'] == 'opensearch' then + local retry = { + count = 3, + backoff = 'exponential', + delay = '1m', + } + index_policy = { + policy = { + description = 'Rspamd index state policy', + ism_template = { + { + index_patterns = { settings['index_template']['name'] .. '-*' }, + priority = 100, + }, + }, + default_state = 'hot', + states = { + { + name = 'hot', + actions = { + { + index_priority = { + priority = settings['index_policy']['hot']['index_priority'], + }, + retry = retry, + }, + }, + transitions = {}, + }, + }, + }, + } + local state_id = 1 -- includes hot state + -- opensearch state warm + if settings['index_policy']['warm']['enabled'] then + local prev_state_id = state_id + state_id = state_id + 1 + index_policy['policy']['states'][prev_state_id]['transitions'] = { + { + state_name = 'warm', + conditions = { + min_index_age = settings['index_policy']['warm']['after'] + }, + }, + } + local warm_obj = { + name = 'warm', + actions = { + { + index_priority = { + priority = settings['index_policy']['warm']['index_priority'], + }, + retry = retry, + }, + }, + transitions = {}, + } + table.insert(index_policy['policy']['states'], warm_obj) + if settings['index_policy']['warm']['read_only'] then + local read_only = { + read_only = '{empty_object}', + retry = retry, + } + table.insert(index_policy['policy']['states'][state_id]['actions'], read_only) + end + if settings['index_policy']['warm']['change_replicas'] then + local change_replicas = { + replica_count = { + number_of_replicas = settings['index_policy']['warm']['replicas_count'], + }, + retry = retry, + } + table.insert(index_policy['policy']['states'][state_id]['actions'], change_replicas) + end + if settings['index_policy']['warm']['shrink'] then + local shrink = { + shrink = {}, + retry = retry, + } + if 
settings['index_policy']['warm']['max_gb_per_shard'] then + shrink['shrink']['max_shard_size'] = settings['index_policy']['warm']['max_gb_per_shard'] .. 'gb' + else + shrink['shrink']['num_new_shards'] = settings['index_policy']['warm']['shards_count'] + end + shrink['shrink']['switch_aliases'] = false + table.insert(index_policy['policy']['states'][state_id]['actions'], shrink) + end + if settings['index_policy']['warm']['force_merge'] then + local force_merge = { + force_merge = { + max_num_segments = settings['index_policy']['warm']['segments_count'], + }, + retry = retry, + } + table.insert(index_policy['policy']['states'][state_id]['actions'], force_merge) end end - local template = { - description = "Add geoip info for rspamd", - processors = { + -- opensearch state cold + if settings['index_policy']['cold']['enabled'] then + local prev_state_id = state_id + state_id = state_id + 1 + index_policy['policy']['states'][prev_state_id]['transitions'] = { { - geoip = { - field = "rspamd_meta.ip", - target_field = "rspamd_meta.geoip" - } + state_name = 'cold', + conditions = { + min_index_age = settings['index_policy']['cold']['after'] + }, + }, + } + local cold_obj = { + name = 'cold', + actions = { + { + index_priority = { + priority = settings['index_policy']['cold']['index_priority'], + }, + retry = retry, + }, + }, + transitions = {}, + } + table.insert(index_policy['policy']['states'], cold_obj) + if settings['index_policy']['cold']['read_only'] then + local read_only = { + read_only = '{empty_object}', + retry = retry, } + table.insert(index_policy['policy']['states'][state_id]['actions'], read_only) + end + if settings['index_policy']['cold']['change_replicas'] then + local change_replicas = { + replica_count = { + number_of_replicas = settings['index_policy']['cold']['replicas_count'], + }, + retry = retry, + } + table.insert(index_policy['policy']['states'][state_id]['actions'], change_replicas) + end + end + -- opensearch state delete + if 
settings['index_policy']['delete']['enabled'] then + local prev_state_id = state_id + state_id = state_id + 1 + index_policy['policy']['states'][prev_state_id]['transitions'] = { + { + state_name = 'delete', + conditions = { + min_index_age = settings['index_policy']['delete']['after'] + }, + }, } - } - rspamd_http.request({ - url = geoip_url, - ev_base = ev_base, - config = cfg, - callback = geoip_cb, - headers = { - ['Content-Type'] = 'application/json', + local delete_obj = { + name = 'delete', + actions = { + { + delete = '{empty_object}', + retry = retry, + }, + }, + transitions = {}, + } + table.insert(index_policy['policy']['states'], delete_obj) + end + end + + -- finish rendering index policy, will now get current version and update it if neeeded + index_policy_json = ucl.to_format(index_policy, 'json-compact'):gsub('"{empty_object}"', '{}') + get_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json) +end + +local function configure_index_template(cfg, ev_base) + local upstream = settings.upstream:get_upstream_round_robin() + local host = upstream:get_name():gsub(":[1-9][0-9]*$", "") + local ip_addr = upstream:get_addr():to_string(true) + local template_url = connect_prefix .. ip_addr .. '/_index_template/' .. 
settings['index_template']['name'] + + -- common data types + local t_boolean_nil_true = { type = 'boolean', null_value = true } + local t_boolean_nil_false = { type = 'boolean', null_value = false } + local t_date = { type = 'date' } + local t_long = { type = 'long', null_value = 0 } + local t_float = { type = 'float', null_value = 0 } + local t_double = { type = 'double', null_value = 0 } + local t_ip = { type = 'ip', null_value = '::' } + local t_geo_point = { type = 'geo_point' } + local t_keyword = { type = 'keyword', null_value = settings['index_template']['empty_value'] } + local t_text = { type = 'text' } + local t_text_with_keyword = { + type = 'text', + fields = { + keyword = { + type = 'keyword', + ignore_above = settings['index_template']['dynamic_keyword_ignore_above'], }, - gzip = settings.use_gzip, - body = ucl.to_format(template, 'json-compact'), - method = 'put', - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - timeout = settings.timeout, - }) - -- create template mappings if not exist - local template_url = connect_prefix .. ip_addr .. 
'/_template/rspamd' - local function http_template_put_callback(err, code, body, _) - if code ~= 200 then - rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', - template_url, err, code, body) - enabled = false + }, + } + + -- common objects types + local geoip_obj = { + dynamic = false, + type = 'object', + properties = { + continent_name = t_text, + region_iso_code = t_keyword, + city_name = t_text, + country_iso_code = t_keyword, + country_name = t_text, + location = t_geo_point, + region_name = t_text, + }, + } + local asn_obj = { + dynamic = false, + type = 'object', + properties = { + country = t_keyword, + asn = t_long, + ipnet = t_keyword, -- do not use ip_range type, it's not usable for search + }, + } + local symbols_obj = { + dynamic = false, + type = 'object', + properties = { + name = t_keyword, + group = t_keyword, + options = t_text_with_keyword, + score = t_double, + weight = t_double, + }, + } + if settings['index_template']['symbols_nested'] then + symbols_obj['type'] = 'nested' + end + + -- dynamic templates + local dynamic_templates_obj = {} + local dynamic_strings = { + strings = { + match_mapping_type = 'string', + mapping = { + type = 'text', + fields = { + keyword = { + type = 'keyword', + ignore_above = settings['index_template']['dynamic_keyword_ignore_above'], + }, + }, + }, + }, + } + table.insert(dynamic_templates_obj, dynamic_strings) + + -- index template rendering + local index_template = { + index_patterns = { settings['index_template']['name'] .. '-*', }, + priority = settings['index_template']['priority'], + template = { + settings = { + index = { + number_of_shards = settings['index_template']['shards_count'], + number_of_replicas = settings['index_template']['replicas_count'], + refresh_interval = settings['index_template']['refresh_interval'] .. 
's', + }, + }, + mappings = { + dynamic = false, + dynamic_templates = dynamic_templates_obj, + properties = { + ['@timestamp'] = t_date, + rspamd_meta = { + dynamic = true, + type = 'object', + properties = { + rspamd_server = t_keyword, + action = t_keyword, + score = t_double, + symbols = symbols_obj, + user = t_keyword, + direction = t_keyword, + qid = t_keyword, + helo = t_text_with_keyword, + hostname = t_text_with_keyword, + ip = t_ip, + is_local = t_boolean_nil_false, + sender_ip = t_ip, + message_id = t_text_with_keyword, + rcpt = t_text_with_keyword, + from_domain = t_keyword, + from_user = t_keyword, + mime_from_domain = t_keyword, + mime_from_user = t_keyword, + settings_id = t_keyword, + asn = asn_obj, + scan_time = t_float, + language = t_text, + non_en = t_boolean_nil_true, + fuzzy_hashes = t_text, + received_delay = t_long, + }, + }, + }, + }, + }, + } + + -- render index lifecycle policy + if detected_distro['name'] == 'elastic' and settings['index_policy']['enabled'] then + index_template['template']['settings']['index']['lifecycle'] = { + name = settings['index_policy']['name'] + } + end + + -- render geoip mappings + if settings['geoip']['enabled'] then + index_template['template']['mappings']['properties']['rspamd_meta']['properties']['geoip'] = geoip_obj + index_template['template']['mappings']['properties']['rspamd_meta']['properties']['sender_geoip'] = geoip_obj + end + + -- render collect_headers and extra_collect_headers mappings + for _, header in ipairs(settings['collect_headers']) do + local header_name = get_header_name(header) + if not index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] then + index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] = t_text_with_keyword + end + end + for _, header in ipairs(settings['extra_collect_headers']) do + local header_name = get_header_name(header) + if not 
index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] then + index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] = t_text_with_keyword + end + end + + local function http_callback(err, code, body, _) + if err then + rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', template_url, err) + upstream:fail() + elseif code == 200 then + rspamd_logger.infox(rspamd_config, 'successfully updated elastic index template: %s', body) + states['index_template']['configured'] = true + upstream:ok() + else + rspamd_logger.errx(rspamd_config, 'cannot configure elastic index template (%s), status code: %s, response: %s', + template_url, code, body) + upstream:fail() + handle_error('configure', 'index_template', settings['limits']['max_fail']) + end + end + + rspamd_http.request({ + url = template_url, + ev_base = ev_base, + config = cfg, + body = ucl.to_format(index_template, 'json-compact'), + headers = { + ['Host'] = host, + ['Content-Type'] = 'application/json', + }, + method = 'put', + callback = http_callback, + gzip = settings.use_gzip, + keepalive = settings.use_keepalive, + no_ssl_verify = settings.no_ssl_verify, + user = settings.user, + password = settings.password, + timeout = settings.timeout, + }) +end + +local function verify_distro(manual) + local detected_distro_name = detected_distro['name'] + local detected_distro_version = detected_distro['version'] + local valid = true + local valid_unknown = false + + -- check that detected_distro_name is valid + if not detected_distro_name then + rspamd_logger.errx(rspamd_config, 'failed to detect elastic distribution') + valid = false + elseif not supported_distro[detected_distro_name] then + rspamd_logger.errx(rspamd_config, 'unsupported elastic distribution: %s', detected_distro_name) + valid = false + else + local supported_distro_info = supported_distro[detected_distro_name] + -- check that detected_distro_version is 
valid + if not detected_distro_version or type(detected_distro_version) ~= 'string' then + rspamd_logger.errx(rspamd_config, 'elastic version should be a string, but we received: %s', type(detected_distro_version)) + valid = false + elseif detected_distro_version == '' then + rspamd_logger.errx(rspamd_config, 'unsupported elastic version: empty string') + valid = false + else + -- compare versions using compare_versions + local cmp_from = compare_versions(detected_distro_version, supported_distro_info['from']) + if cmp_from == -1 then + rspamd_logger.errx(rspamd_config, 'unsupported elastic version: %s, minimal supported version of %s is %s', + detected_distro_version, detected_distro_name, supported_distro_info['from']) + valid = false else - lua_util.debugm(N, 'pushed rspamd template: %s', body) - push_kibana_template() + local cmp_till = compare_versions(detected_distro_version, supported_distro_info['till']) + if (cmp_till >= 0) and not supported_distro_info['till_unknown'] then + rspamd_logger.errx(rspamd_config, 'unsupported elastic version: %s, maximum supported version of %s is less than %s', + detected_distro_version, detected_distro_name, supported_distro_info['till']) + valid = false + elseif (cmp_till >= 0) and supported_distro_info['till_unknown'] then + rspamd_logger.warnx(rspamd_config, + 'compatibility of elastic version: %s is unknown, maximum known supported version of %s is less than %s, use at your own risk', + detected_distro_version, detected_distro_name, supported_distro_info['till']) + valid_unknown = true + end end end - local function http_template_exist_callback(_, code, _, _) - if code ~= 200 then - rspamd_http.request({ - url = template_url, - ev_base = ev_base, - config = cfg, - body = elastic_template, - method = 'put', - headers = { - ['Content-Type'] = 'application/json', - }, - gzip = settings.use_gzip, - callback = http_template_put_callback, - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = 
settings.password, - timeout = settings.timeout, - }) + end + + if valid_unknown then + detected_distro['supported'] = true + else + if valid and manual then + rspamd_logger.infox( + rspamd_config, 'assuming elastic distro: %s, version: %s', detected_distro_name, detected_distro_version) + detected_distro['supported'] = true + elseif valid and not manual then + rspamd_logger.infox(rspamd_config, 'successfully connected to elastic distro: %s, version: %s', + detected_distro_name, detected_distro_version) + detected_distro['supported'] = true + else + handle_error('configure','distro',settings['version']['autodetect_max_fail']) + end + end +end + +local function configure_distro(cfg, ev_base) + if not settings['version']['autodetect_enabled'] then + detected_distro['name'] = settings['version']['override']['name'] + detected_distro['version'] = settings['version']['override']['version'] + rspamd_logger.infox(rspamd_config, 'automatic detection of elastic distro and version is disabled, taking configuration from settings') + verify_distro(true) + end + + local upstream = settings.upstream:get_upstream_round_robin() + local host = upstream:get_name():gsub(":[1-9][0-9]*$", "") + local ip_addr = upstream:get_addr():to_string(true) + local root_url = connect_prefix .. ip_addr .. 
'/' + local function http_callback(err, code, body, _) + if err then + rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', root_url, err) + upstream:fail() + elseif code ~= 200 then + rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s), status code: %s, response: %s', root_url, code, body) + upstream:fail() + else + local parser = ucl.parser() + local res, ucl_err = parser:parse_string(body) + if not res then + rspamd_logger.errx(rspamd_config, 'failed to parse reply from elastic (%s): %s', root_url, ucl_err) + upstream:fail() else - push_kibana_template() + local obj = parser:get_object() + if obj['tagline'] == "The OpenSearch Project: https://opensearch.org/" then + detected_distro['name'] = 'opensearch' + end + if obj['tagline'] == "You Know, for Search" then + detected_distro['name'] = 'elastic' + end + if obj['version'] then + if obj['version']['number'] then + detected_distro['version'] = obj['version']['number'] + end + if not detected_distro['name'] and obj['version']['distribution'] then + detected_distro['name'] = obj['version']['distribution'] + end + end + verify_distro() + if detected_distro['supported'] then + upstream:ok() + end end end + end + if settings['version']['autodetect_enabled'] then rspamd_http.request({ - url = template_url, + url = root_url, ev_base = ev_base, config = cfg, - method = 'head', - callback = http_template_exist_callback, + headers = { + ['Host'] = host, + ['Content-Type'] = 'application/json', + }, + method = 'get', + callback = http_callback, + gzip = settings.use_gzip, + keepalive = settings.use_keepalive, no_ssl_verify = settings.no_ssl_verify, user = settings.user, password = settings.password, timeout = settings.timeout, }) - end end -redis_params = rspamd_redis.parse_redis_server('elastic') +local opts = rspamd_config:get_all_opt('elastic') -if redis_params and opts then - for k, v in pairs(opts) do +if opts then + for k,v in pairs(opts) do settings[k] = v end + if not 
settings['enabled'] then + rspamd_logger.infox(rspamd_config, 'module disabled in config') + lua_util.disable_module(N, "config") + end + if not settings['server'] and not settings['servers'] then rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module') lua_util.disable_module(N, "config") @@ -498,29 +1555,10 @@ if redis_params and opts then connect_prefix = 'https://' end - if settings.ingest_module then - ingest_geoip_type = 'modules' - end - - settings.upstream = upstream_list.create(rspamd_config, - settings['server'] or settings['servers'], 9200) + settings.upstream = upstream_list.create(rspamd_config, settings['server'] or settings['servers'], 9200) if not settings.upstream then - rspamd_logger.errx('cannot parse elastic address: %s', - settings['server'] or settings['servers']) - lua_util.disable_module(N, "config") - return - end - if not settings['template_file'] then - rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module') - lua_util.disable_module(N, "config") - return - end - - elastic_template = read_file(settings['template_file']); - if not elastic_template then - rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module', - settings['template_file']) + rspamd_logger.errx(rspamd_config, 'cannot parse elastic address: %s', settings['server'] or settings['servers']) lua_util.disable_module(N, "config") return end @@ -533,12 +1571,79 @@ if redis_params and opts then augmentations = { string.format("timeout=%f", settings.timeout) }, }) + -- send tail of data if worker going to stop + rspamd_config:register_finish_script(function(task) + local nlogs_total = buffer['logs']:length() + if nlogs_total > 0 then + rspamd_logger.debugm(N, task, 'flushing buffer on shutdown, buffer size: %s', nlogs_total) + elastic_send_data(true, task) + end + end) + rspamd_config:add_on_load(function(cfg, ev_base, worker) if worker:is_scanner() then - check_elastic_server(cfg, ev_base, worker) -- 
check for elasticsearch requirements - initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations + rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base) + if not detected_distro['supported'] then + if states['distro']['configured'] then + return false -- stop running periodic job + else + configure_distro(cfg, ev_base) + return true -- continue running periodic job + end + end + end) + -- send data periodically if any of limits reached + rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base) + if detected_distro['supported'] then + periodic_send_data(cfg, ev_base) + end + return true + end) + end + if worker:is_primary_controller() then + rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base) + if not settings['index_template']['managed'] then + return false + elseif not detected_distro['supported'] then + return true + else + if states['index_template']['configured'] then + return false + else + configure_index_template(cfg, ev_base) + return true + end + end + end) + rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base) + if not settings['index_policy']['enabled'] or not settings['index_policy']['managed'] then + return false + elseif not detected_distro['supported'] then + return true + else + if states['index_policy']['configured'] then + return false + else + configure_index_policy(cfg, ev_base) + return true + end + end + end) + rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base) + if not settings['geoip']['enabled'] or not settings['geoip']['managed'] then + return false + elseif not detected_distro['supported'] then + return true + else + if states['geoip_pipeline']['configured'] then + return false + else + configure_geoip_pipeline(cfg, ev_base) + return true + end + end + end) end end) end - end -- cgit v1.2.3 From e660799311c2ebc60eca0f83e80fdd9394344420 Mon Sep 17 
00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 14 Oct 2024 14:08:52 +0200 Subject: Update src/plugins/lua/elastic.lua Co-authored-by: Vsevolod Stakhov --- src/plugins/lua/elastic.lua | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 205a5ce86..4303ce6cc 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -659,15 +659,7 @@ local function get_general_metadata(task) end local fuzzy_hashes = task:get_mempool():get_variable('fuzzy_hashes', 'fstrings') - if fuzzy_hashes and #fuzzy_hashes > 0 then - local l = {} - for _, h in ipairs(fuzzy_hashes) do - table.insert(l, h) - end - r.fuzzy_hashes = l - else - r.fuzzy_hashes = empty - end +r.fuzzy_hashes = fuzzy_hashes or empty r.received_delay = 0 if user then -- calculate received_delay only for incoming traffic -- cgit v1.2.3 From 4a704531cac168af1dd903825b6fc53fa43e7cfe Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 14 Oct 2024 14:09:05 +0200 Subject: Update src/plugins/lua/elastic.lua Co-authored-by: Vsevolod Stakhov --- src/plugins/lua/elastic.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 4303ce6cc..334402a7e 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -651,7 +651,7 @@ local function get_general_metadata(task) else r.language = empty end - if table.getn(lang_t) == 1 and lang_t[1] == 'en' then + if #lang_t == 1 and lang_t[1] == 'en' then r.non_en = false else r.non_en = true -- cgit v1.2.3 From 77ff75a78d99c073edc0f002af2d8a89d5b44288 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 14 Oct 2024 14:09:12 +0200 Subject: Update src/plugins/lua/elastic.lua Co-authored-by: Vsevolod Stakhov --- 
src/plugins/lua/elastic.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 334402a7e..d553a65c6 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -646,7 +646,7 @@ local function get_general_metadata(task) table.insert(lang_t, l) end end - if table.getn(lang_t) > 0 then + if #lang_t > 0 then r.language = lang_t else r.language = empty -- cgit v1.2.3 From 8b46d65e54b0447459edd1f7c2c586325a6350e5 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 14 Oct 2024 14:34:37 +0200 Subject: resolve linter issues and remove custom deep_compare function in favor of lua_util.table_cmp --- src/plugins/lua/elastic.lua | 83 ++++++++++++--------------------------------- 1 file changed, 21 insertions(+), 62 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index d553a65c6..b2c6d36ba 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -230,9 +230,11 @@ function Queue:pop_first(count) local popped_items = {} count = count or self:length() local actual_count = math.min(count, self:length()) - for i = 1, actual_count do + local n = 0 + while n < actual_count do local item = self:pop() table.insert(popped_items, item) + n = n + 1 end return popped_items end @@ -262,49 +264,6 @@ local function safe_get(table, ...) 
return value end -local function deep_compare(t1, t2, visited) - if t1 == t2 then - return true - end - - if type(t1) ~= "table" or type(t2) ~= "table" then - return false - end - - -- use visited to keep track of already compared tables to handle cycles - visited = visited or {} - if visited[t1] and visited[t1][t2] then - return true - end - - visited[t1] = visited[t1] or {} - visited[t1][t2] = true - - -- compare the number of keys in both tables - local t1len = 0 - for _ in pairs(t1) do - t1len = t1len + 1 - end - - local t2len = 0 - for _ in pairs(t2) do - t2len = t2len + 1 - end - - if t1len ~= t2len then - return false - end - - -- recursively compare each key-value pair - for k, v1 in pairs(t1) do - local v2 = t2[k] - if v2 == nil or not deep_compare(v1, v2, visited) then - return false - end - end - return true -end - local function compare_versions(v1, v2) -- helper function to extract the numeric version string local function extract_numeric_version(version) @@ -839,28 +798,28 @@ local function get_index_policy(cfg, ev_base, upstream, host, policy_url, index_ elseif code == 200 then local remote_policy_parser = ucl.parser() local our_policy_parser = ucl.parser() - local res, ucl_err = remote_policy_parser:parse_string(body) - if not ucl_err and res then - local res, ucl_err = our_policy_parser:parse_string(index_policy_json) - if not ucl_err and res then + local rp_res, rp_ucl_err = remote_policy_parser:parse_string(body) + if rp_res and not rp_ucl_err then + local op_res, op_ucl_err = our_policy_parser:parse_string(index_policy_json) + if op_res and not op_ucl_err then local remote_policy = remote_policy_parser:get_object() local our_policy = our_policy_parser:get_object() local update_needed = false if detected_distro['name'] == 'elastic' then local index_policy_name = settings['index_policy']['name'] local current_phases = safe_get(remote_policy, index_policy_name, 'policy', 'phases') - if not deep_compare(our_policy['policy']['phases'], 
current_phases) then + if not lua_util.table_cmp(our_policy['policy']['phases'], current_phases) then update_needed = true end elseif detected_distro['name'] == 'opensearch' then local current_default_state = safe_get(remote_policy, 'policy', 'default_state') local current_ism_index_patterns = safe_get(remote_policy, 'policy', 'ism_template', 1, 'index_patterns') local current_states = safe_get(remote_policy, 'policy', 'states') - if not deep_compare(our_policy['policy']['default_state'], current_default_state) then + if not lua_util.table_cmp(our_policy['policy']['default_state'], current_default_state) then update_needed = true - elseif not deep_compare(our_policy['policy']['ism_template'][1]['index_patterns'], current_ism_index_patterns) then + elseif not lua_util.table_cmp(our_policy['policy']['ism_template'][1]['index_patterns'], current_ism_index_patterns) then update_needed = true - elseif not deep_compare(our_policy['policy']['states'], current_states) then + elseif not lua_util.table_cmp(our_policy['policy']['states'], current_states) then update_needed = true end end @@ -1574,26 +1533,26 @@ if opts then rspamd_config:add_on_load(function(cfg, ev_base, worker) if worker:is_scanner() then - rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base) + rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base) if not detected_distro['supported'] then if states['distro']['configured'] then return false -- stop running periodic job else - configure_distro(cfg, ev_base) + configure_distro(p_cfg, p_ev_base) return true -- continue running periodic job end end end) -- send data periodically if any of limits reached - rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base) + rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base) if detected_distro['supported'] then - periodic_send_data(cfg, ev_base) + periodic_send_data(p_cfg, p_ev_base) 
end return true end) end if worker:is_primary_controller() then - rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base) + rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base) if not settings['index_template']['managed'] then return false elseif not detected_distro['supported'] then @@ -1602,12 +1561,12 @@ if opts then if states['index_template']['configured'] then return false else - configure_index_template(cfg, ev_base) + configure_index_template(p_cfg, p_ev_base) return true end end end) - rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base) + rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base) if not settings['index_policy']['enabled'] or not settings['index_policy']['managed'] then return false elseif not detected_distro['supported'] then @@ -1616,12 +1575,12 @@ if opts then if states['index_policy']['configured'] then return false else - configure_index_policy(cfg, ev_base) + configure_index_policy(p_cfg, p_ev_base) return true end end end) - rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base) + rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base) if not settings['geoip']['enabled'] or not settings['geoip']['managed'] then return false elseif not detected_distro['supported'] then @@ -1630,7 +1589,7 @@ if opts then if states['geoip_pipeline']['configured'] then return false else - configure_geoip_pipeline(cfg, ev_base) + configure_geoip_pipeline(p_cfg, p_ev_base) return true end end -- cgit v1.2.3 From 28b772738c09661bb9845cb7330c2381eb143404 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 14 Oct 2024 14:35:24 +0200 Subject: add missing change in configured state when no update is needed on index_policy --- src/plugins/lua/elastic.lua | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) 
(limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index b2c6d36ba..b117d6e4c 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -823,7 +823,10 @@ local function get_index_policy(cfg, ev_base, upstream, host, policy_url, index_ update_needed = true end end - if update_needed then + if not update_needed then + rspamd_logger.infox(rspamd_config, 'elastic index policy is up-to-date') + states['index_policy']['configured'] = true + else if detected_distro['name'] == 'elastic' then put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json) elseif detected_distro['name'] == 'opensearch' then -- cgit v1.2.3 From bcc4e26d8c695ab30fb3e4037fd116dac17bb06d Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 14 Oct 2024 14:49:31 +0200 Subject: align conf and module settings in lua --- src/plugins/lua/elastic.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index b117d6e4c..66199db78 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -80,7 +80,7 @@ local settings = { } }, limits = { - max_rows = 1000, -- max logs in one bulk req to elastic and first reason to flush buffer + max_rows = 500, -- max logs in one bulk req to elastic and first reason to flush buffer max_interval = 60, -- seconds, if first log in buffer older then interval - flush buffer max_size = 5000000, -- max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker max_fail = 3, -- cgit v1.2.3 From beff1ed34a04a9886f349bfb7ab2abf913bec51a Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 14 Oct 2024 15:23:36 +0200 Subject: fix tabulation --- src/plugins/lua/elastic.lua | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src') diff --git 
a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 66199db78..368ef5b74 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -618,7 +618,7 @@ local function get_general_metadata(task) end local fuzzy_hashes = task:get_mempool():get_variable('fuzzy_hashes', 'fstrings') -r.fuzzy_hashes = fuzzy_hashes or empty + r.fuzzy_hashes = fuzzy_hashes or empty r.received_delay = 0 if user then -- calculate received_delay only for incoming traffic @@ -665,7 +665,6 @@ local function periodic_send_data(cfg, ev_base) local now = tostring(rspamd_util.get_time() * 1000) local flush_needed = false - local nlogs_total = buffer['logs']:length() if nlogs_total >= settings['limits']['max_rows'] then rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max rows: %s/%s', nlogs_total, settings['limits']['max_rows']) -- cgit v1.2.3 From 6387273113ff352d6036dbcde445e196c7db3a08 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 14 Oct 2024 16:01:35 +0200 Subject: reuse http_request data --- src/plugins/lua/elastic.lua | 55 +++++++++++++++++---------------------------- 1 file changed, 20 insertions(+), 35 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 368ef5b74..45c5c2edf 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -409,44 +409,29 @@ local function elastic_send_data(flush_all, task, cfg, ev_base) end if nlogs_to_send > 0 then + local http_request = { + url = push_url, + headers = { + ['Host'] = host, + ['Content-Type'] = 'application/x-ndjson', + }, + body = bulk_json, + method = 'post', + callback=http_callback, + gzip = settings.use_gzip, + keepalive = settings.use_keepalive, + no_ssl_verify = settings.no_ssl_verify, + user = settings.user, + password = settings.password, + timeout = settings.timeout, + } if task then - return rspamd_http.request({ - url = push_url, - headers = { - ['Host'] = 
host, - ['Content-Type'] = 'application/x-ndjson', - }, - body = bulk_json, - task = task, - method = 'post', - callback=http_callback, - gzip = settings.use_gzip, - keepalive = settings.use_keepalive, - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - timeout = settings.timeout, - }) + http_request['task'] = task else - return rspamd_http.request({ - url = push_url, - headers = { - ['Host'] = host, - ['Content-Type'] = 'application/x-ndjson', - }, - body = bulk_json, - ev_base = ev_base, - config = cfg, - method = 'post', - callback=http_callback, - gzip = settings.use_gzip, - keepalive = settings.use_keepalive, - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - timeout = settings.timeout, - }) + http_request['ev_base'] = ev_base + http_request['config'] = cfg end + return rspamd_http.request(http_request) end end -- cgit v1.2.3 From 0a73f356e6af18d1f78534667a4bd298bb08ce19 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 14 Oct 2024 16:16:44 +0200 Subject: round time_diff to seconds on interval check, improve log message readability --- src/plugins/lua/elastic.lua | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 45c5c2edf..18c29711f 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -658,9 +658,10 @@ local function periodic_send_data(cfg, ev_base) local first_row = buffer['logs']:get(1) if first_row then local time_diff = now - first_row['@timestamp'] - if time_diff > settings.limits.max_interval * 1000 then - rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max interval, diff: %s, current time: %s, log timestamp: %s', - time_diff, now, first_row['@timestamp']) + local time_diff_sec = lua_util.round((time_diff / 1000), 1) + if time_diff_sec > 
settings.limits.max_interval then + rspamd_logger.infox(rspamd_config, 'flushing buffer for %s by reaching max interval, oldest log in buffer written %s sec ago', + time_diff_sec, first_row['@timestamp']) flush_needed = true else local size = buffer['logs']:size() -- cgit v1.2.3 From 5196d84d421e3e17d7ef4d3a690bb1ba6988a838 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 14 Oct 2024 17:32:16 +0200 Subject: Use received for all logs, fix direction --- src/plugins/lua/elastic.lua | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 18c29711f..c5e24d773 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -306,6 +306,25 @@ local function handle_error(action,component,limit) return true end +local function get_received_delay(received_headers) + local now = math.floor(rspamd_util.get_time()) + local timestamp = 0 + local delay = 0 + for _, received_header in ipairs(received_headers) do + if received_header['timestamp'] and received_header['timestamp'] > 0 then + timestamp = received_header['timestamp'] + break + end + end + if timestamp > 0 then + delay = now - timestamp + if delay < 0 then + delay = 0 + end + end + return delay +end + local function create_bulk_json(es_index, logs_to_send) local tbl = {} for _, row in pairs(logs_to_send) do @@ -456,9 +475,9 @@ local function get_general_metadata(task) end r.user = user or empty if user then - r.direction = "Inbound" - else r.direction = "Outbound" + else + r.direction = "Inbound" end r.qid = task:get_queue_id() or empty r.helo = task:get_helo() or empty @@ -605,19 +624,7 @@ local function get_general_metadata(task) local fuzzy_hashes = task:get_mempool():get_variable('fuzzy_hashes', 'fstrings') r.fuzzy_hashes = fuzzy_hashes or empty - r.received_delay = 0 - if user then -- calculate received_delay only 
for incoming traffic - local recieved_hop = 2 - local received_headers = task:get_received_headers() - if received_headers[recieved_hop] then - if received_headers[recieved_hop]['timestamp'] then - r.received_delay = math.floor(rspamd_util.get_time()) - received_headers[recieved_hop]['timestamp'] - if r.received_delay < 0 then - r.received_delay = 0 - end - end - end - end + r.received_delay = get_received_delay(task:get_received_headers()) return r end -- cgit v1.2.3 From a36862365709e1bad71b1612d01ea5e3b47cc5bc Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 14 Oct 2024 19:42:00 +0200 Subject: We need to take the 2nd hop --- src/plugins/lua/elastic.lua | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index c5e24d773..f35de152b 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -310,8 +310,9 @@ local function get_received_delay(received_headers) local now = math.floor(rspamd_util.get_time()) local timestamp = 0 local delay = 0 - for _, received_header in ipairs(received_headers) do - if received_header['timestamp'] and received_header['timestamp'] > 0 then + for i, received_header in ipairs(received_headers) do + -- skip first received_header as its own relay + if i > 1 and received_header['timestamp'] and received_header['timestamp'] > 0 then timestamp = received_header['timestamp'] break end -- cgit v1.2.3 From 5f26066f137bc7130b205dba51effa43b42dc0d9 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Tue, 15 Oct 2024 21:31:02 +0200 Subject: do not retry on errors in body as it will create duplicated logs --- conf/modules.d/elastic.conf | 2 +- src/plugins/lua/elastic.lua | 30 ++++++------------------------ 2 files changed, 7 insertions(+), 25 deletions(-) (limited to 'src') diff --git a/conf/modules.d/elastic.conf
b/conf/modules.d/elastic.conf index 6255d528f..aafa8cbdf 100644 --- a/conf/modules.d/elastic.conf +++ b/conf/modules.d/elastic.conf @@ -69,7 +69,7 @@ elastic { shrink = false; shards_count = 1; max_gb_per_shard = 0; # zero - disabled by default, if enabled - shards_count is ignored - force_merge = true; + force_merge = false; segments_count = 1; }; cold = { diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index f35de152b..9e3288304 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -116,7 +116,7 @@ local settings = { shrink = false, shards_count = 1, max_gb_per_shard = 0, -- zero - disabled by default, if enabled - shards_count is ignored - force_merge = true, + force_merge = false, segments_count = 1, }, cold = { @@ -369,7 +369,6 @@ local function elastic_send_data(flush_all, task, cfg, ev_base) local function http_callback(err, code, body, _) local push_done = false - local push_errors = false if err then rspamd_logger.errx(log_object, 'cannot send logs to elastic (%s): %s; failed attempts: %s/%s', push_url, err, buffer['errors'], settings['limits']['max_fail']) @@ -378,23 +377,10 @@ local function elastic_send_data(flush_all, task, cfg, ev_base) local res, ucl_err = parser:parse_string(body) if not ucl_err and res then local obj = parser:get_object() - if not obj['errors'] then - push_done = true - rspamd_logger.debugm(N, log_object, 'successfully sent payload with %s logs', nlogs_to_send) - else - push_errors = true - for _, value in pairs(obj['items']) do - if value['index']['status'] >= 400 then - if value['index']['error'] then - if value['index']['error']['type'] and value['index']['error']['reason'] then - rspamd_logger.errx(log_object, - 'cannot send logs to elastic (%s) due to error: %s status, %s type, due to: %s; failed attempts: %s/%s', - push_url, value['index']['status'], value['index']['error']['type'], value['index']['error']['reason'], - buffer['errors'], settings['limits']['max_fail']) - end - end 
- end - end + push_done = true + rspamd_logger.debugm(N, log_object, 'successfully sent payload with %s logs', nlogs_to_send) + if obj['errors'] then + rspamd_logger.debugm(N, log_object, 'faced errors while pushing logs to elastic (%s): %s', obj['errors']) end else rspamd_logger.errx(log_object, @@ -412,6 +398,7 @@ local function elastic_send_data(flush_all, task, cfg, ev_base) buffer['errors'] = 0 upstream:ok() else + upstream:fail() if buffer['errors'] >= settings['limits']['max_fail'] then rspamd_logger.errx(log_object, 'failed to send %s log lines, failed attempts: %s/%s, removing failed logs from bugger', nlogs_to_send, buffer['errors'], settings['limits']['max_fail']) @@ -420,11 +407,6 @@ local function elastic_send_data(flush_all, task, cfg, ev_base) else buffer['errors'] = buffer['errors'] + 1 end - if push_errors then - upstream:ok() -- we not assume upstream is failed if it return errors in response body - else - upstream:fail() - end end end -- cgit v1.2.3 From 2f9eb903ad049da6a07f35e7515a454b22aa6655 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Tue, 15 Oct 2024 21:40:52 +0200 Subject: Fix from and rcpt --- src/plugins/lua/elastic.lua | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 9e3288304..e90c70260 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -495,7 +495,7 @@ local function get_general_metadata(task) local rcpt = task:get_recipients('smtp') local l = {} for _, a in ipairs(rcpt) do - table.insert(l, a['addr']) + table.insert(l, a['user'] .. '@' .. 
a['domain']:lower()) end r.rcpt = l else @@ -507,8 +507,8 @@ local function get_general_metadata(task) if task:has_from('smtp') then local from = task:get_from({ 'smtp', 'orig' })[1] if from then + r.from_user = from['user'] r.from_domain = from['domain']:lower() - r.from_user = from['user']:lower() end end @@ -517,8 +517,8 @@ local function get_general_metadata(task) if task:has_from('mime') then local mime_from = task:get_from({ 'mime', 'orig' })[1] if mime_from then + r.mime_from_user = mime_from['user'] r.mime_from_domain = mime_from['domain']:lower() - r.mime_from_user = mime_from['user']:lower() end end -- cgit v1.2.3 From ffda608b2e73179e3af67f783f84eb9eb31390c3 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Tue, 15 Oct 2024 23:50:48 +0200 Subject: return item status and reasons on bulk push error --- src/plugins/lua/elastic.lua | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index e90c70260..ae7e577f3 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -380,7 +380,17 @@ local function elastic_send_data(flush_all, task, cfg, ev_base) push_done = true rspamd_logger.debugm(N, log_object, 'successfully sent payload with %s logs', nlogs_to_send) if obj['errors'] then - rspamd_logger.debugm(N, log_object, 'faced errors while pushing logs to elastic (%s): %s', obj['errors']) + for _, value in pairs(obj['items']) do + if value['index'] and value['index']['status'] >= 400 then + local status = value['index']['status'] + local index = safe_get(value, 'index', '_index') or '' + local error_type = safe_get(value, 'index', 'error', 'type') or '' + local error_reason = safe_get(value, 'index', 'error', 'reason') or '' + rspamd_logger.warnx(log_object, + 'error while pushing logs to elastic, status: %s, index: %s, type: %s, reason: %s', + status, index, error_type, error_reason) + end + 
end end else rspamd_logger.errx(log_object, -- cgit v1.2.3 From 69099098799749cf6499cf0a37e04af7fa7731ac Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Thu, 17 Oct 2024 14:42:02 +0200 Subject: fix variables for ucl errors --- src/plugins/lua/elastic.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index ae7e577f3..d4a5fdf85 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -833,12 +833,12 @@ local function get_index_policy(cfg, ev_base, upstream, host, policy_url, index_ end end else - rspamd_logger.errx(rspamd_config, 'failed to parse our index policy for elastic: %s', ucl_err) + rspamd_logger.errx(rspamd_config, 'failed to parse our index policy for elastic: %s', op_ucl_err) upstream:fail() handle_error('parse our', 'index_policy', settings['limits']['max_fail']) end else - rspamd_logger.errx(rspamd_config, 'failed to parse remote index policy from elastic: %s', ucl_err) + rspamd_logger.errx(rspamd_config, 'failed to parse remote index policy from elastic: %s', rp_ucl_err) upstream:fail() handle_error('parse remote', 'index_policy', settings['limits']['max_fail']) end -- cgit v1.2.3 From af7e5e4b3567a93e42e70de33e46e6e67792553a Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Thu, 17 Oct 2024 15:35:02 +0200 Subject: round scores and weight --- src/plugins/lua/elastic.lua | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index d4a5fdf85..c1935b1f0 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -462,15 +462,21 @@ local function get_general_metadata(task) r.symbols = task:get_symbols_all() for _, symbol in ipairs(r.symbols) do symbol.groups = nil -- we don't need groups array in elastic - if 
type(symbol.options) == "table" then - symbol.options = table.concat(symbol.options, "; ") + if type(symbol.score) == 'number' then + symbol.score = lua_util.round(symbol.score, 3) + end + if type(symbol.weight) == 'number' then + symbol.weight = lua_util.round(symbol.weight, 3) + end + if type(symbol.options) == 'table' then + symbol.options = table.concat(symbol.options, '; ') end end r.user = user or empty if user then - r.direction = "Outbound" + r.direction = 'Outbound' else - r.direction = "Inbound" + r.direction = 'Inbound' end r.qid = task:get_queue_id() or empty r.helo = task:get_helo() or empty -- cgit v1.2.3 From 96f30a7662a6702c75474b04fa46bff0300fba19 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 21 Oct 2024 13:33:48 +0200 Subject: do not allow empty from --- src/plugins/lua/elastic.lua | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index c1935b1f0..ea6d069c2 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -518,23 +518,23 @@ local function get_general_metadata(task) r.rcpt = empty end - r.from_domain = empty r.from_user = empty + r.from_domain = empty if task:has_from('smtp') then local from = task:get_from({ 'smtp', 'orig' })[1] if from then - r.from_user = from['user'] - r.from_domain = from['domain']:lower() + r.from_user = from['user'] or empty + r.from_domain = from['domain']:lower() or empty end end - r.mime_from_domain = empty r.mime_from_user = empty + r.mime_from_domain = empty if task:has_from('mime') then local mime_from = task:get_from({ 'mime', 'orig' })[1] if mime_from then - r.mime_from_user = mime_from['user'] - r.mime_from_domain = mime_from['domain']:lower() + r.mime_from_user = mime_from['user'] or empty + r.mime_from_domain = mime_from['domain']:lower() or empty end end @@ -1267,10 +1267,10 @@ local function 
configure_index_template(cfg, ev_base) sender_ip = t_ip, message_id = t_text_with_keyword, rcpt = t_text_with_keyword, - from_domain = t_keyword, from_user = t_keyword, - mime_from_domain = t_keyword, + from_domain = t_keyword, mime_from_user = t_keyword, + mime_from_domain = t_keyword, settings_id = t_keyword, asn = asn_obj, scan_time = t_float, -- cgit v1.2.3 From 0945152bc4f99edddc926ed554b6ab6f9ed887a8 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 21 Oct 2024 15:36:03 +0200 Subject: do not allow empty headers, fix stripping of headers by limit of symbols count and add headers count limit --- conf/modules.d/elastic.conf | 3 ++- src/plugins/lua/elastic.lua | 27 +++++++++++++++++++-------- 2 files changed, 21 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/conf/modules.d/elastic.conf b/conf/modules.d/elastic.conf index aafa8cbdf..e4c70bc87 100644 --- a/conf/modules.d/elastic.conf +++ b/conf/modules.d/elastic.conf @@ -47,7 +47,8 @@ elastic { replicas_count = 1; refresh_interval = 5; # seconds dynamic_keyword_ignore_above = 256; - headers_text_ignore_above = 2048; # strip headers value and add "..." to the end; set 0 to disable limit + headers_count_ignore_above = 5; # record only N first same named headers, add "ignored above..." if reached, set 0 to disable limit + headers_text_ignore_above = 2048; # strip specific header value and add "..." to the end; set 0 to disable limit symbols_nested = false; empty_value = "unknown"; # empty numbers, ips and ipnets are not customizable they will be always 0, :: and ::/128 respectively }; diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index ea6d069c2..373733f11 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -94,7 +94,8 @@ local settings = { replicas_count = 1, refresh_interval = 5, -- seconds dynamic_keyword_ignore_above = 256, - headers_text_ignore_above = 2048, -- strip headers value and add '...' 
to the end, set 0 to disable limit + headers_count_ignore_above = 5, -- record only N first same named headers, add 'ignored above...' if reached, set 0 to disable limit + headers_text_ignore_above = 2048, -- strip specific header value and add '...' to the end, set 0 to disable limit symbols_nested = false, empty_value = 'unknown', -- empty numbers, ips and ipnets are not customizable they will be always 0, :: and ::/128 respectively }, @@ -468,9 +469,6 @@ local function get_general_metadata(task) if type(symbol.weight) == 'number' then symbol.weight = lua_util.round(symbol.weight, 3) end - if type(symbol.options) == 'table' then - symbol.options = table.concat(symbol.options, '; ') - end end r.user = user or empty if user then @@ -564,10 +562,23 @@ local function get_general_metadata(task) if hdr then local l = {} for _, h in ipairs(hdr) do - table.insert(l, h.decoded) - end - if #l > headers_text_ignore_above and headers_text_ignore_above ~= -3 then - l = l:sub(1, headers_text_ignore_above) .. '...' + if settings['index_template']['headers_count_ignore_above'] ~= 0 and + #l >= settings['index_template']['headers_count_ignore_above'] + then + table.insert(l, 'ignored above...') + break + end + local header + if settings['index_template']['headers_text_ignore_above'] ~= 0 and + #h.decoded >= headers_text_ignore_above + then + header = h.decoded:sub(1, headers_text_ignore_above) .. '...' 
+ elseif #h.decoded > 0 then + header = h.decoded + else + header = empty + end + table.insert(l, header) end return l else -- cgit v1.2.3 From 275879825c151abb2b2f65fbe2011f89019dd56e Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 21 Oct 2024 17:22:28 +0200 Subject: strip also port from sender_ip --- src/plugins/lua/elastic.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 373733f11..e0e941b60 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -491,7 +491,7 @@ local function get_general_metadata(task) r.sender_ip = '::' local origin = task:get_header('X-Originating-IP') if origin then - origin = origin:gsub('%[', ''):gsub('%]', '') + origin = origin:gsub('^%[', ''):gsub('%]:[0-9]+$', ''):gsub('%]$', '') local rspamd_ip = require "rspamd_ip" local origin_ip = rspamd_ip.from_string(origin) if origin_ip and origin_ip:is_valid() then -- cgit v1.2.3 From 49539ea290046901d1110be2138e71b51857415e Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Mon, 21 Oct 2024 18:35:03 +0200 Subject: use rspamd_ip object in sender_ip to avoid posting ipv4:port --- src/plugins/lua/elastic.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index e0e941b60..f525461ae 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -495,7 +495,7 @@ local function get_general_metadata(task) local rspamd_ip = require "rspamd_ip" local origin_ip = rspamd_ip.from_string(origin) if origin_ip and origin_ip:is_valid() then - r.sender_ip = origin -- use string here + r.sender_ip = tostring(origin_ip) end end -- cgit v1.2.3 From 87b9a614a4e35b4dea75c8ec51b071ab13dbccb0 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev 
<1865999+dragoangel@users.noreply.github.com> Date: Tue, 22 Oct 2024 12:02:45 +0200 Subject: better check from to exclude cases when it's empty --- src/plugins/lua/elastic.lua | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index f525461ae..28c683333 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -505,13 +505,20 @@ local function get_general_metadata(task) else r.message_id = message_id end + if task:has_recipients('smtp') then local rcpt = task:get_recipients('smtp') local l = {} for _, a in ipairs(rcpt) do - table.insert(l, a['user'] .. '@' .. a['domain']:lower()) + if a['user'] and a['user'] ~= '' and a['domain'] and a['domain'] ~= '' then + table.insert(l, a['user'] .. '@' .. a['domain']:lower()) + end + end + if #l > 0 then + r.rcpt = l + else + r.rcpt = empty end - r.rcpt = l else r.rcpt = empty end @@ -520,9 +527,12 @@ local function get_general_metadata(task) r.from_user = empty r.from_domain = empty if task:has_from('smtp') then local from = task:get_from({ 'smtp', 'orig' })[1] - if from then - r.from_user = from['user'] or empty - r.from_domain = from['domain']:lower() or empty + if from and + from['user'] and from['user'] ~= '' and + from['domain'] and from['domain'] ~= '' + then + r.from_user = from['user'] + r.from_domain = from['domain']:lower() end end @@ -530,9 +540,12 @@ local function get_general_metadata(task) r.mime_from_user = empty r.mime_from_domain = empty if task:has_from('mime') then local mime_from = task:get_from({ 'mime', 'orig' })[1] - if mime_from then - r.mime_from_user = mime_from['user'] or empty - r.mime_from_domain = mime_from['domain']:lower() or empty + if mime_from and + mime_from['user'] and mime_from['user'] ~= '' and + mime_from['domain'] and mime_from['domain'] ~= '' + then + r.mime_from_user = mime_from['user'] + r.mime_from_domain = mime_from['domain']:lower() end end @@ -559,7 +572,7 @@ local function
get_general_metadata(task) local function process_header(name) local hdr = task:get_header_full(name) local headers_text_ignore_above = settings['index_template']['headers_text_ignore_above'] - 3 - if hdr then + if hdr and #hdr > 0 then local l = {} for _, h in ipairs(hdr) do if settings['index_template']['headers_count_ignore_above'] ~= 0 and -- cgit v1.2.3 From f7ba1730b0579ebb0b3a41280f6d64b5e02c09e3 Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Sun, 3 Nov 2024 16:00:11 +0100 Subject: * add more validation on empty strings, required to not face errors in saving logs to elastic * remove max_size as it was looking to rows elements count, not strings size in total, such check will be too much compute intensive * increase default errors max_fail as usually elastic not recover so quickly and needs a bit more time --- conf/modules.d/elastic.conf | 5 ++- src/plugins/lua/elastic.lua | 88 +++++++++++++++++++++++++++++---------------- 2 files changed, 60 insertions(+), 33 deletions(-) (limited to 'src') diff --git a/conf/modules.d/elastic.conf b/conf/modules.d/elastic.conf index e4c70bc87..f815bc61d 100644 --- a/conf/modules.d/elastic.conf +++ b/conf/modules.d/elastic.conf @@ -25,7 +25,7 @@ elastic { use_keepalive = true; version = { autodetect_enabled = true; - autodetect_max_fail = 12; + autodetect_max_fail = 30; # override works only if autodetect is disabled override = { name = "opensearch"; @@ -35,8 +35,7 @@ elastic { limits = { max_rows = 500; # max logs in one bulk req to elastic and first reason to flush buffer to elastic max_interval = 60; # seconds, if first log in buffer older then interval - flush buffer - max_size = 5000000; # max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker - max_fail = 3; + max_fail = 10; }; index_template = { managed = true; diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index 28c683333..d76e0d723 100644 --- 
a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -72,7 +72,7 @@ local settings = { enabled = true, version = { autodetect_enabled = true, - autodetect_max_fail = 12, + autodetect_max_fail = 30, -- override works only if autodetect is disabled override = { name = 'opensearch', @@ -82,8 +82,7 @@ local settings = { limits = { max_rows = 500, -- max logs in one bulk req to elastic and first reason to flush buffer max_interval = 60, -- seconds, if first log in buffer older then interval - flush buffer - max_size = 5000000, -- max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker - max_fail = 3, + max_fail = 15, }, index_template = { managed = true, @@ -179,17 +178,6 @@ function Queue:length() return self.last - self.first + 1 end -function Queue:size() - local size = 0 - for i = self.first, self.last do - local row = self.data[i] - if row ~= nil then - size = size + #row - end - end - return size -end - function Queue:get(index) local real_index = self.first + index - 1 if real_index <= self.last then @@ -327,6 +315,52 @@ local function get_received_delay(received_headers) return delay end +local function is_empty(str) + -- define a pattern that includes invisible unicode characters + local str_cleared = str:gsub('[' .. + '\xC2\xA0' .. -- U+00A0 non-breaking space + '\xE2\x80\x8B' .. -- U+200B zero width space + '\xEF\xBB\xBF' .. -- U+FEFF byte order mark (zero width no-break space) + '\xE2\x80\x8C' .. -- U+200C zero width non-joiner + '\xE2\x80\x8D' .. -- U+200D zero width joiner + '\xE2\x80\x8E' .. -- U+200E left-to-right mark + '\xE2\x80\x8F' .. -- U+200F right-to-left mark + '\xE2\x81\xA0' .. -- U+2060 word joiner + '\xE2\x80\xAA' .. -- U+202A left-to-right embedding + '\xE2\x80\xAB' .. -- U+202B right-to-left embedding + '\xE2\x80\xAC' .. -- U+202C pop directional formatting + '\xE2\x80\xAD' .. -- U+202D left-to-right override + '\xE2\x80\xAE' .. -- U+202E right-to-left override + '\xE2\x81\x9F' .. 
-- U+205F medium mathematical space + '\xE2\x81\xA1' .. -- U+2061 function application + '\xE2\x81\xA2' .. -- U+2062 invisible times + '\xE2\x81\xA3' .. -- U+2063 invisible separator + '\xE2\x81\xA4' .. -- U+2064 invisible plus + ']', '') -- gsub replaces all matched characters with an empty string + if str_cleared:match('[%S]') then + return false + else + return true + end +end + +local function fill_empty_strings(tbl, empty_value) + local filled_tbl = {} + for key, value in pairs(tbl) do + if value and type(value) == 'table' then + local nested_filtered = fill_empty_strings(value, empty_value) + if next(nested_filtered) ~= nil then + filled_tbl[key] = nested_filtered + end + elseif value and type(value) == 'string' and is_empty(value) then + filled_tbl[key] = empty_value + elseif value then + filled_tbl[key] = value + end + end + return filled_tbl +end + local function create_bulk_json(es_index, logs_to_send) local tbl = {} for _, row in pairs(logs_to_send) do @@ -483,7 +517,7 @@ local function get_general_metadata(task) r.ip = '::' r.is_local = false local ip_addr = task:get_ip() - if ip_addr and ip_addr:is_valid() then + if ip_addr and ip_addr:is_valid() and tostring(ip_addr) ~= '...' then r.is_local = ip_addr:is_local() r.ip = tostring(ip_addr) end @@ -494,7 +528,7 @@ local function get_general_metadata(task) origin = origin:gsub('^%[', ''):gsub('%]:[0-9]+$', ''):gsub('%]$', '') local rspamd_ip = require "rspamd_ip" local origin_ip = rspamd_ip.from_string(origin) - if origin_ip and origin_ip:is_valid() then + if origin_ip and origin_ip:is_valid() and tostring(origin_ip) ~= '...' then r.sender_ip = tostring(origin_ip) end end @@ -510,7 +544,7 @@ local function get_general_metadata(task) local rcpt = task:get_recipients('smtp') local l = {} for _, a in ipairs(rcpt) do - if a['user'] and a['user'] ~= '' and a['domain'] and a['domain'] ~= '' then + if a['user'] and #a['user'] > 0 and a['domain'] and #a['domain'] > 0 then table.insert(l, a['user'] .. '@' ..
a['domain']:lower()) end end @@ -528,8 +562,8 @@ local function get_general_metadata(task) if task:has_from('smtp') then local from = task:get_from({ 'smtp', 'orig' })[1] if from and - from['user'] and from['user'] ~= '' and - from['domain'] and from['domain'] ~= '' + from['user'] and #from['user'] > 0 and + from['domain'] and #from['domain'] > 0 then r.from_user = from['user'] r.from_domain = from['domain']:lower() @@ -541,8 +575,8 @@ local function get_general_metadata(task) if task:has_from('mime') then local mime_from = task:get_from({ 'mime', 'orig' })[1] if mime_from and - mime_from['user'] and mime_from['user'] ~= '' and - mime_from['domain'] and mime_from['domain'] ~= '' + mime_from['user'] and #mime_from['user'] > 0 and + mime_from['domain'] and #mime_from['domain'] > 0 then r.mime_from_user = mime_from['user'] r.mime_from_domain = mime_from['domain']:lower() @@ -583,10 +617,10 @@ local function get_general_metadata(task) end local header if settings['index_template']['headers_text_ignore_above'] ~= 0 and - #h.decoded >= headers_text_ignore_above + h.decoded and #h.decoded >= headers_text_ignore_above then header = h.decoded:sub(1, headers_text_ignore_above) .. '...' 
- elseif #h.decoded > 0 then + elseif h.decoded and #h.decoded > 0 then header = h.decoded else header = empty @@ -649,7 +683,7 @@ local function get_general_metadata(task) r.received_delay = get_received_delay(task:get_received_headers()) - return r + return fill_empty_strings(r, empty) end local function elastic_collect(task) @@ -693,12 +727,6 @@ local function periodic_send_data(cfg, ev_base) rspamd_logger.infox(rspamd_config, 'flushing buffer for %s by reaching max interval, oldest log in buffer written %s sec ago', time_diff_sec, first_row['@timestamp']) flush_needed = true - else - local size = buffer['logs']:size() - if size >= settings['limits']['max_size'] then - rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max size: %s/%s', size, settings['limits']['max_size']) - flush_needed = true - end end end end -- cgit v1.2.3 From 53cc23ffc2b15045b72980181811914394393b4f Mon Sep 17 00:00:00 2001 From: Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com> Date: Fri, 8 Nov 2024 12:30:16 +0100 Subject: Fix missing non_en false due to fill_empty_strings function --- src/plugins/lua/elastic.lua | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src') diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index d76e0d723..f3eb3cc4f 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -352,6 +352,8 @@ local function fill_empty_strings(tbl, empty_value) if next(nested_filtered) ~= nil then filled_tbl[key] = nested_filtered end + elseif type(value) == 'boolean' then + filled_tbl[key] = value elseif value and type(value) == 'string' and is_empty(value) then filled_tbl[key] = empty_value elseif value then -- cgit v1.2.3