aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/lua/elastic.lua
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/lua/elastic.lua')
-rw-r--r--src/plugins/lua/elastic.lua1768
1 files changed, 1436 insertions, 332 deletions
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
index ccbb7c198..f3eb3cc4f 100644
--- a/src/plugins/lua/elastic.lua
+++ b/src/plugins/lua/elastic.lua
@@ -18,219 +18,637 @@ limitations under the License.
local rspamd_logger = require 'rspamd_logger'
local rspamd_http = require "rspamd_http"
local lua_util = require "lua_util"
-local util = require "rspamd_util"
+local rspamd_util = require "rspamd_util"
local ucl = require "ucl"
-local rspamd_redis = require "lua_redis"
local upstream_list = require "rspamd_upstream_list"
-local lua_settings = require "lua_settings"
if confighelp then
return
end
-local rows = {}
-local nrows = 0
-local failed_sends = 0
-local elastic_template
-local redis_params
local N = "elastic"
-local E = {}
-local HOSTNAME = util.get_hostname()
local connect_prefix = 'http://'
-local enabled = true
-local ingest_geoip_type = 'plugins'
+local rspamd_hostname = rspamd_util.get_hostname()
+-- supported_distro:
+-- from - minimal compatible version supported, including
+-- till & till_unknown = true - yet unreleased version, so unknown compitability, excluding
+-- hill & till_unknown = false - version is known to be not yet compatible with this module
+local supported_distro = {
+ elastic = {
+ from = '7.8',
+ till = '9',
+ till_unknown = true,
+ },
+ opensearch = {
+ from = '1',
+ till = '3',
+ till_unknown = true,
+ },
+}
+local detected_distro = {
+ name = nil,
+ version = nil,
+ supported = false,
+}
+local states = {
+ distro = {
+ configured = false,
+ errors = 0,
+ },
+ index_template = {
+ configured = false,
+ errors = 0,
+ },
+ index_policy = {
+ configured = false,
+ errors = 0,
+ },
+ geoip_pipeline = {
+ configured = false,
+ errors = 0,
+ },
+}
local settings = {
- limit = 500,
- index_pattern = 'rspamd-%Y.%m.%d',
- template_file = rspamd_paths['SHAREDIR'] .. '/elastic/rspamd_template.json',
- kibana_file = rspamd_paths['SHAREDIR'] .. '/elastic/kibana.json',
- key_prefix = 'elastic-',
- expire = 3600,
+ enabled = true,
+ version = {
+ autodetect_enabled = true,
+ autodetect_max_fail = 30,
+ -- override works only if autodetect is disabled
+ override = {
+ name = 'opensearch',
+ version = '2.17',
+ }
+ },
+ limits = {
+ max_rows = 500, -- max logs in one bulk req to elastic and first reason to flush buffer
+ max_interval = 60, -- seconds, if first log in buffer older then interval - flush buffer
+ max_fail = 15,
+ },
+ index_template = {
+ managed = true,
+ name = 'rspamd',
+ pattern = '{service}-%Y.%m.%d',
+ priority = 0,
+ shards_count = 3,
+ replicas_count = 1,
+ refresh_interval = 5, -- seconds
+ dynamic_keyword_ignore_above = 256,
+ headers_count_ignore_above = 5, -- record only N first same named headers, add 'ignored above...' if reached, set 0 to disable limit
+ headers_text_ignore_above = 2048, -- strip specific header value and add '...' to the end, set 0 to disable limit
+ symbols_nested = false,
+ empty_value = 'unknown', -- empty numbers, ips and ipnets are not customizable they will be always 0, :: and ::/128 respectively
+ },
+ index_policy = {
+ enabled = true,
+ managed = true,
+ name = 'rspamd', -- if you want use custom lifecycle policy, change name and set managed = false
+ hot = {
+ index_priority = 100,
+ },
+ warm = {
+ enabled = true,
+ after = '2d',
+ index_priority = 50,
+ migrate = true, -- only supported with elastic distro, will not have impact elsewhere
+ read_only = true,
+ change_replicas = false,
+ replicas_count = 1,
+ shrink = false,
+ shards_count = 1,
+ max_gb_per_shard = 0, -- zero - disabled by default, if enabled - shards_count is ignored
+ force_merge = false,
+ segments_count = 1,
+ },
+ cold = {
+ enabled = true,
+ after = '14d',
+ index_priority = 0,
+ migrate = true, -- only supported with elastic distro, will not have impact elsewhere
+ read_only = true,
+ change_replicas = false,
+ replicas_count = 1,
+ },
+ delete = {
+ enabled = true,
+ after = '30d',
+ },
+ },
+ collect_headers = {
+ 'From',
+ 'To',
+ 'Subject',
+ 'Date',
+ 'User-Agent',
+ },
+ extra_collect_headers = {
+ -- 'List-Id',
+ -- 'X-Mailer',
+ },
+ geoip = {
+ enabled = true,
+ managed = true,
+ pipeline_name = 'rspamd-geoip',
+ },
+ periodic_interval = 5.0,
timeout = 5.0,
- failover = false,
- import_kibana = false,
use_https = false,
+ no_ssl_verify = false,
use_gzip = true,
+ use_keepalive = true,
allow_local = false,
user = nil,
password = nil,
- no_ssl_verify = false,
- max_fail = 3,
- ingest_module = false,
- elasticsearch_version = 6,
}
-local function read_file(path)
- local file = io.open(path, "rb")
- if not file then
+local Queue = {}
+Queue.__index = Queue
+
+function Queue:new()
+ local obj = {first = 1, last = 0, data = {}}
+ setmetatable(obj, self)
+ return obj
+end
+
+function Queue:push(value)
+ self.last = self.last + 1
+ self.data[self.last] = value
+end
+
+function Queue:length()
+ return self.last - self.first + 1
+end
+
+function Queue:get(index)
+ local real_index = self.first + index - 1
+ if real_index <= self.last then
+ return self.data[real_index]
+ else
return nil
end
- local content = file:read "*a"
- file:close()
- return content
end
-local function elastic_send_data(task)
- local es_index = os.date(settings['index_pattern'])
+function Queue:get_all()
+ local items = {}
+ for i = self.first, self.last do
+ table.insert(items, self.data[i])
+ end
+ return items
+end
+
+function Queue:pop()
+ if self.first > self.last then
+ return nil
+ end
+ local value = self.data[self.first]
+ self.data[self.first] = nil
+ self.first = self.first + 1
+ return value
+end
+
+function Queue:get_first(count)
+ local items = {}
+ count = count or self:length()
+ local actual_end = math.min(self.first + count - 1, self.last)
+ for i = self.first, actual_end do
+ table.insert(items, self.data[i])
+ end
+ return items
+end
+
+function Queue:pop_first(count)
+ local popped_items = {}
+ count = count or self:length()
+ local actual_count = math.min(count, self:length())
+ local n = 0
+ while n < actual_count do
+ local item = self:pop()
+ table.insert(popped_items, item)
+ n = n + 1
+ end
+ return popped_items
+end
+
+local buffer = {
+ logs = Queue:new(),
+ errors = 0,
+}
+
+local function contains(tbl, val)
+ for i=1,#tbl do
+ if tbl[i]:lower() == val:lower() then
+ return true
+ end
+ end
+ return false
+end
+
+local function safe_get(table, ...)
+ local value = table
+ for _, key in ipairs({...}) do
+ if value[key] == nil then
+ return nil
+ end
+ value = value[key]
+ end
+ return value
+end
+
+local function compare_versions(v1, v2)
+ -- helper function to extract the numeric version string
+ local function extract_numeric_version(version)
+ -- remove any trailing characters that are not digits or dots
+ version = version:match("^([%.%d]+)")
+ local parts = {}
+ for part in string.gmatch(version or "", '([^.]+)') do
+ table.insert(parts, tonumber(part) or 0)
+ end
+ return parts
+ end
+
+ local v1_parts = extract_numeric_version(v1)
+ local v2_parts = extract_numeric_version(v2)
+ local max_length = math.max(#v1_parts, #v2_parts)
+
+ -- compare each part of the version strings
+ for i = 1, max_length do
+ local num1 = v1_parts[i] or 0
+ local num2 = v2_parts[i] or 0
+
+ if num1 > num2 then
+ return 1 -- v1 is greater than v2
+ elseif num1 < num2 then
+ return -1 -- v1 is less than v2
+ end
+ -- if equal, continue to the next part
+ end
+ return 0 -- versions are equal
+end
+
+local function handle_error(action,component,limit)
+ if states[component]['errors'] >= limit then
+ rspamd_logger.errx(rspamd_config, 'cannot %s elastic %s, failed attempts: %s/%s, stop trying',
+ action, component:gsub('_', ' '), states[component]['errors'], limit)
+ states[component]['configured'] = true
+ else
+ states[component]['errors'] = states[component]['errors'] + 1
+ end
+ return true
+end
+
+local function get_received_delay(received_headers)
+ local now = math.floor(rspamd_util.get_time())
+ local timestamp = 0
+ local delay = 0
+ for i, received_header in ipairs(received_headers) do
+ -- skip first received_header as it's own relay
+ if i > 1 and received_header['timestamp'] and received_header['timestamp'] > 0 then
+ timestamp = received_header['timestamp']
+ break
+ end
+ end
+ if timestamp > 0 then
+ delay = now - timestamp
+ if delay < 0 then
+ delay = 0
+ end
+ end
+ return delay
+end
+
+local function is_empty(str)
+ -- define a pattern that includes invisible unicode characters
+ local str_cleared = str:gsub('[' ..
+ '\xC2\xA0' .. -- U+00A0 non-breaking space
+ '\xE2\x80\x8B' .. -- U+200B zero width space
+ '\xEF\xBB\xBF' .. -- U+FEFF byte order mark (zero width no-break space)
+ '\xE2\x80\x8C' .. -- U+200C zero width non-joiner
+ '\xE2\x80\x8D' .. -- U+200D zero width joiner
+ '\xE2\x80\x8E' .. -- U+200E left-to-right mark
+ '\xE2\x80\x8F' .. -- U+200F right-to-left mark
+ '\xE2\x81\xA0' .. -- U+2060 word joiner
+ '\xE2\x80\xAA' .. -- U+202A left-to-right embedding
+ '\xE2\x80\xAB' .. -- U+202B right-to-left embedding
+ '\xE2\x80\xAC' .. -- U+202C pop directional formatting
+ '\xE2\x80\xAD' .. -- U+202D left-to-right override
+ '\xE2\x80\xAE' .. -- U+202E right-to-left override
+ '\xE2\x81\x9F' .. -- U+2061 function application
+ '\xE2\x81\xA1' .. -- U+2061 invisible separator
+ '\xE2\x81\xA2' .. -- U+2062 invisible times
+ '\xE2\x81\xA3' .. -- U+2063 invisible separator
+ '\xE2\x81\xA4' .. -- U+2064 invisible plus
+ ']', '') -- gsub replaces all matched characters with an empty string
+ if str_cleared:match('[%S]') then
+ return false
+ else
+ return true
+ end
+end
+
+local function fill_empty_strings(tbl, empty_value)
+ local filled_tbl = {}
+ for key, value in pairs(tbl) do
+ if value and type(value) == 'table' then
+ local nested_filtered = fill_empty_strings(value, empty_value)
+ if next(nested_filtered) ~= nil then
+ filled_tbl[key] = nested_filtered
+ end
+ elseif type(value) == 'boolean' then
+ filled_tbl[key] = value
+ elseif value and type(value) == 'string' and is_empty(value) then
+ filled_tbl[key] = empty_value
+ elseif value then
+ filled_tbl[key] = value
+ end
+ end
+ return filled_tbl
+end
+
+local function create_bulk_json(es_index, logs_to_send)
local tbl = {}
- for _, value in pairs(rows) do
- if settings.elasticsearch_version >= 7 then
- table.insert(tbl, '{ "index" : { "_index" : "' .. es_index ..
- '","pipeline": "rspamd-geoip"} }')
- else
- table.insert(tbl, '{ "index" : { "_index" : "' .. es_index ..
- '", "_type" : "_doc" ,"pipeline": "rspamd-geoip"} }')
+ for _, row in pairs(logs_to_send) do
+ local pipeline = ''
+ if settings['geoip']['enabled']then
+ pipeline = ',"pipeline":"'.. settings['geoip']['pipeline_name'] .. '"'
end
- table.insert(tbl, ucl.to_format(value, 'json-compact'))
+ table.insert(tbl, '{"index":{"_index":"' .. es_index .. '"' .. pipeline .. '}}')
+ table.insert(tbl, ucl.to_format(row, 'json-compact'))
end
+ table.insert(tbl, '') -- for last \n
+ return table.concat(tbl, "\n")
+end
- table.insert(tbl, '') -- For last \n
+local function elastic_send_data(flush_all, task, cfg, ev_base)
+ local log_object = task or rspamd_config
+ local nlogs_to_send = 0
+ local es_index
+ local upstream
+ local host
+ local push_url
+ local bulk_json
+ local logs_to_send
+ if flush_all then
+ logs_to_send = buffer['logs']:get_all()
+ else
+ logs_to_send = buffer['logs']:get_first(settings['limits']['max_rows'])
+ end
+ nlogs_to_send = #logs_to_send -- actual size can be lower then max_rows
+ if nlogs_to_send > 0 then
+ es_index = settings['index_template']['name'] .. '-' .. os.date(settings['index_template']['pattern'])
- local upstream = settings.upstream:get_upstream_round_robin()
- local ip_addr = upstream:get_addr():to_string(true)
+ upstream = settings.upstream:get_upstream_round_robin()
+ host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
+ local ip_addr = upstream:get_addr():to_string(true)
+ push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk'
- local push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk'
- local bulk_json = table.concat(tbl, "\n")
+ bulk_json = create_bulk_json(es_index, logs_to_send)
+ rspamd_logger.debugm(N, log_object, 'successfully composed payload with %s log lines', nlogs_to_send)
+ end
- local function http_callback(err, code, _, _)
+ local function http_callback(err, code, body, _)
+ local push_done = false
if err then
- rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s",
- push_url, err, failed_sends, settings.max_fail)
+ rspamd_logger.errx(log_object, 'cannot send logs to elastic (%s): %s; failed attempts: %s/%s',
+ push_url, err, buffer['errors'], settings['limits']['max_fail'])
+ elseif code == 200 then
+ local parser = ucl.parser()
+ local res, ucl_err = parser:parse_string(body)
+ if not ucl_err and res then
+ local obj = parser:get_object()
+ push_done = true
+ rspamd_logger.debugm(N, log_object, 'successfully sent payload with %s logs', nlogs_to_send)
+ if obj['errors'] then
+ for _, value in pairs(obj['items']) do
+ if value['index'] and value['index']['status'] >= 400 then
+ local status = value['index']['status']
+ local index = safe_get(value, 'index', '_index') or ''
+ local error_type = safe_get(value, 'index', 'error', 'type') or ''
+ local error_reason = safe_get(value, 'index', 'error', 'reason') or ''
+ rspamd_logger.warnx(log_object,
+ 'error while pushing logs to elastic, status: %s, index: %s, type: %s, reason: %s',
+ status, index, error_type, error_reason)
+ end
+ end
+ end
+ else
+ rspamd_logger.errx(log_object,
+ 'cannot parse response from elastic (%s): %s; failed attempts: %s/%s',
+ push_url, ucl_err, buffer['errors'], settings['limits']['max_fail'])
+ end
else
- if code ~= 200 then
- rspamd_logger.infox(task,
- "cannot push data to elastic backend (%s): wrong http code %s (%s); failed attempts: %s/%s",
- push_url, err, code, failed_sends, settings.max_fail)
+ rspamd_logger.errx(log_object,
+ 'cannot send logs to elastic (%s) due to bad http status code: %s, response: %s; failed attempts: %s/%s',
+ push_url, code, body, buffer['errors'], settings['limits']['max_fail'])
+ end
+ -- proccess results
+ if push_done then
+ buffer['logs']:pop_first(nlogs_to_send)
+ buffer['errors'] = 0
+ upstream:ok()
+ else
+ upstream:fail()
+ if buffer['errors'] >= settings['limits']['max_fail'] then
+ rspamd_logger.errx(log_object, 'failed to send %s log lines, failed attempts: %s/%s, removing failed logs from bugger',
+ nlogs_to_send, buffer['errors'], settings['limits']['max_fail'])
+ buffer['logs']:pop_first(nlogs_to_send)
+ buffer['errors'] = 0
else
- lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES",
- nrows, #bulk_json)
+ buffer['errors'] = buffer['errors'] + 1
end
end
end
- return rspamd_http.request({
- url = push_url,
- headers = {
- ['Content-Type'] = 'application/x-ndjson',
- },
- body = bulk_json,
- callback = http_callback,
- task = task,
- method = 'post',
- gzip = settings.use_gzip,
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- timeout = settings.timeout,
- })
+ if nlogs_to_send > 0 then
+ local http_request = {
+ url = push_url,
+ headers = {
+ ['Host'] = host,
+ ['Content-Type'] = 'application/x-ndjson',
+ },
+ body = bulk_json,
+ method = 'post',
+ callback=http_callback,
+ gzip = settings.use_gzip,
+ keepalive = settings.use_keepalive,
+ no_ssl_verify = settings.no_ssl_verify,
+ user = settings.user,
+ password = settings.password,
+ timeout = settings.timeout,
+ }
+ if task then
+ http_request['task'] = task
+ else
+ http_request['ev_base'] = ev_base
+ http_request['config'] = cfg
+ end
+ return rspamd_http.request(http_request)
+ end
+end
+
+local function get_header_name(name)
+ return 'header_' .. name:lower():gsub('[%s%-]', '_')
end
local function get_general_metadata(task)
local r = {}
- local ip_addr = task:get_ip()
+ local empty = settings['index_template']['empty_value']
+ local user = task:get_user()
+ r.rspamd_server = rspamd_hostname or empty
+
+ r.action = task:get_metric_action() or empty
+ r.score = task:get_metric_score()[1] or 0
+ r.symbols = task:get_symbols_all()
+ for _, symbol in ipairs(r.symbols) do
+ symbol.groups = nil -- we don't need groups array in elastic
+ if type(symbol.score) == 'number' then
+ symbol.score = lua_util.round(symbol.score, 3)
+ end
+ if type(symbol.weight) == 'number' then
+ symbol.weight = lua_util.round(symbol.weight, 3)
+ end
+ end
+ r.user = user or empty
+ if user then
+ r.direction = 'Outbound'
+ else
+ r.direction = 'Inbound'
+ end
+ r.qid = task:get_queue_id() or empty
+ r.helo = task:get_helo() or empty
+ r.hostname = task:get_hostname() or empty
- if ip_addr and ip_addr:is_valid() then
+ r.ip = '::'
+ r.is_local = false
+ local ip_addr = task:get_ip()
+ if ip_addr and ip_addr:is_valid() and tostring(ip_addr) ~= '...' then
r.is_local = ip_addr:is_local()
r.ip = tostring(ip_addr)
- else
- r.ip = '127.0.0.1'
end
- r.webmail = false
- r.sender_ip = 'unknown'
+ r.sender_ip = '::'
local origin = task:get_header('X-Originating-IP')
if origin then
- origin = origin:gsub('%[', ''):gsub('%]', '')
+ origin = origin:gsub('^%[', ''):gsub('%]:[0-9]+$', ''):gsub('%]$', '')
local rspamd_ip = require "rspamd_ip"
local origin_ip = rspamd_ip.from_string(origin)
- if origin_ip and origin_ip:is_valid() then
- r.webmail = true
- r.sender_ip = origin -- use string here
+ if origin_ip and origin_ip:is_valid() and tostring(origin_ip) ~= '...' then
+ r.sender_ip = tostring(origin_ip)
end
end
- r.direction = "Inbound"
- r.user = task:get_user() or 'unknown'
- r.qid = task:get_queue_id() or 'unknown'
- r.action = task:get_metric_action()
- r.rspamd_server = HOSTNAME
- if r.user ~= 'unknown' then
- r.direction = "Outbound"
+ local message_id = task:get_message_id()
+ if message_id == 'undef' then
+ r.message_id = empty
+ else
+ r.message_id = message_id
end
- local s = task:get_metric_score()[1]
- r.score = s
- local rcpt = task:get_recipients('smtp')
- if rcpt then
+ if task:has_recipients('smtp') then
+ local rcpt = task:get_recipients('smtp')
local l = {}
for _, a in ipairs(rcpt) do
- table.insert(l, a['addr'])
+ if a['user'] and #a['user'] > 0 and a['domain'] and #a['domain'] > 0 then
+ table.insert(l, a['user'] .. '@' .. a['domain']:lower())
+ end
+ end
+ if #l > 0 then
+ r.rcpt = l
+ else
+ r.rcpt = empty
end
- r.rcpt = l
else
- r.rcpt = 'unknown'
+ r.rcpt = empty
end
- local from = task:get_from { 'smtp', 'orig' }
- if ((from or E)[1] or E).addr then
- r.from = from[1].addr
- else
- r.from = 'unknown'
+ r.from_user = empty
+ r.from_domain = empty
+ if task:has_from('smtp') then
+ local from = task:get_from({ 'smtp', 'orig' })[1]
+ if from and
+ from['user'] and #from['user'] > 0 and
+ from['domain'] and #from['domain'] > 0
+ then
+ r.from_user = from['user']
+ r.from_domain = from['domain']:lower()
+ end
end
- local mime_from = task:get_from { 'mime', 'orig' }
- if ((mime_from or E)[1] or E).addr then
- r.mime_from = mime_from[1].addr
- else
- r.mime_from = 'unknown'
+ r.mime_from_user = empty
+ r.mime_from_domain = empty
+ if task:has_from('mime') then
+ local mime_from = task:get_from({ 'mime', 'orig' })[1]
+ if mime_from and
+ mime_from['user'] and #mime_from['user'] > 0 and
+ mime_from['domain'] and #mime_from['domain'] > 0
+ then
+ r.mime_from_user = mime_from['user']
+ r.mime_from_domain = mime_from['domain']:lower()
+ end
+ end
+
+ local settings_id = task:get_settings_id()
+ if settings_id then
+ -- Convert to string
+ local lua_settings = require "lua_settings"
+ settings_id = lua_settings.settings_by_id(settings_id)
+ if settings_id then
+ settings_id = settings_id.name
+ end
+ end
+ if not settings_id then
+ settings_id = empty
end
+ r.settings_id = settings_id
- local syminf = task:get_symbols_all()
- r.symbols = syminf
r.asn = {}
local pool = task:get_mempool()
- r.asn.country = pool:get_variable("country") or 'unknown'
+ r.asn.country = pool:get_variable("country") or empty
r.asn.asn = pool:get_variable("asn") or 0
- r.asn.ipnet = pool:get_variable("ipnet") or 'unknown'
+ r.asn.ipnet = pool:get_variable("ipnet") or '::/128'
local function process_header(name)
local hdr = task:get_header_full(name)
- if hdr then
+ local headers_text_ignore_above = settings['index_template']['headers_text_ignore_above'] - 3
+ if hdr and #hdr > 0 then
local l = {}
for _, h in ipairs(hdr) do
- table.insert(l, h.decoded)
+ if settings['index_template']['headers_count_ignore_above'] ~= 0 and
+ #l >= settings['index_template']['headers_count_ignore_above']
+ then
+ table.insert(l, 'ignored above...')
+ break
+ end
+ local header
+ if settings['index_template']['headers_text_ignore_above'] ~= 0 and
+ h.decoded and #h.decoded >= headers_text_ignore_above
+ then
+ header = h.decoded:sub(1, headers_text_ignore_above) .. '...'
+ elseif h.decoded and #h.decoded > 0 then
+ header = h.decoded
+ else
+ header = empty
+ end
+ table.insert(l, header)
end
return l
else
- return 'unknown'
+ return empty
end
end
- r.header_from = process_header('from')
- r.header_to = process_header('to')
- r.header_subject = process_header('subject')
- r.header_date = process_header('date')
- r.message_id = task:get_message_id()
- local hname = task:get_hostname() or 'unknown'
- r.hostname = hname
-
- local settings_id = task:get_settings_id()
-
- if settings_id then
- -- Convert to string
- settings_id = lua_settings.settings_by_id(settings_id)
-
- if settings_id then
- settings_id = settings_id.name
+ for _, header in ipairs(settings['collect_headers']) do
+ local header_name = get_header_name(header)
+ if not r[header_name] then
+ r[header_name] = process_header(header)
end
end
- if not settings_id then
- settings_id = ''
+ for _, header in ipairs(settings['extra_collect_headers']) do
+ local header_name = get_header_name(header)
+ if not r[header_name] then
+ r[header_name] = process_header(header)
+ end
end
- r.settings_id = settings_id
-
local scan_real = task:get_scan_time()
scan_real = math.floor(scan_real * 1000)
if scan_real < 0 then
@@ -239,91 +657,272 @@ local function get_general_metadata(task)
scan_real)
scan_real = 0
end
-
r.scan_time = scan_real
- return r
+ local parts = task:get_text_parts()
+ local lang_t = {}
+ if parts then
+ for _, part in ipairs(parts) do
+ local l = part:get_language()
+ if l and not contains(lang_t, l) then
+ table.insert(lang_t, l)
+ end
+ end
+ if #lang_t > 0 then
+ r.language = lang_t
+ else
+ r.language = empty
+ end
+ if #lang_t == 1 and lang_t[1] == 'en' then
+ r.non_en = false
+ else
+ r.non_en = true
+ end
+ end
+
+ local fuzzy_hashes = task:get_mempool():get_variable('fuzzy_hashes', 'fstrings')
+ r.fuzzy_hashes = fuzzy_hashes or empty
+
+ r.received_delay = get_received_delay(task:get_received_headers())
+
+ return fill_empty_strings(r, empty)
end
local function elastic_collect(task)
- if not enabled then
- return
- end
if task:has_flag('skip') then
return
end
+
if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then
return
end
- local row = { ['rspamd_meta'] = get_general_metadata(task),
- ['@timestamp'] = tostring(util.get_time() * 1000) }
- table.insert(rows, row)
- nrows = nrows + 1
- if nrows > settings['limit'] then
- lua_util.debugm(N, task, 'send elastic search rows: %s', nrows)
- if elastic_send_data(task) then
- nrows = 0
- rows = {}
- failed_sends = 0;
- else
- failed_sends = failed_sends + 1
-
- if failed_sends > settings.max_fail then
- rspamd_logger.errx(task, 'cannot send %s rows to ES %s times, stop trying',
- nrows, failed_sends)
- nrows = 0
- rows = {}
- failed_sends = 0;
- end
+ if not detected_distro['supported'] then
+ if buffer['logs']:length() >= settings['limits']['max_rows'] then
+ buffer['logs']:pop_first(settings['limits']['max_rows'])
+ rspamd_logger.errx(task,
+ 'elastic distro not supported, deleting %s logs from buffer due to reaching max rows limit',
+ settings['limits']['max_rows'])
end
end
+
+ local now = tostring(rspamd_util.get_time() * 1000)
+ local row = { ['rspamd_meta'] = get_general_metadata(task), ['@timestamp'] = now }
+ buffer['logs']:push(row)
+ rspamd_logger.debugm(N, task, 'saved log to buffer')
end
-local opts = rspamd_config:get_all_opt('elastic')
+local function periodic_send_data(cfg, ev_base)
+ local now = tostring(rspamd_util.get_time() * 1000)
+ local flush_needed = false
-local function check_elastic_server(cfg, ev_base, _)
+ local nlogs_total = buffer['logs']:length()
+ if nlogs_total >= settings['limits']['max_rows'] then
+ rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max rows: %s/%s', nlogs_total, settings['limits']['max_rows'])
+ flush_needed = true
+ else
+ local first_row = buffer['logs']:get(1)
+ if first_row then
+ local time_diff = now - first_row['@timestamp']
+ local time_diff_sec = lua_util.round((time_diff / 1000), 1)
+ if time_diff_sec > settings.limits.max_interval then
+ rspamd_logger.infox(rspamd_config, 'flushing buffer for %s by reaching max interval, oldest log in buffer written %s sec ago',
+ time_diff_sec, first_row['@timestamp'])
+ flush_needed = true
+ end
+ end
+ end
+
+ if flush_needed then
+ elastic_send_data(false, nil, cfg, ev_base)
+ end
+end
+
+local function configure_geoip_pipeline(cfg, ev_base)
local upstream = settings.upstream:get_upstream_round_robin()
+ local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
local ip_addr = upstream:get_addr():to_string(true)
- local plugins_url = connect_prefix .. ip_addr .. '/_nodes/' .. ingest_geoip_type
+ local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/' .. settings['geoip']['pipeline_name']
+ local geoip_pipeline = {
+ description = "Add geoip info for rspamd",
+ processors = {
+ {
+ geoip = {
+ field = "rspamd_meta.ip",
+ target_field = "rspamd_meta.geoip"
+ }
+ },
+ {
+ geoip = {
+ field = "rspamd_meta.sender_ip",
+ target_field = "rspamd_meta.sender_geoip"
+ }
+ }
+ }
+ }
+
+ local function geoip_cb(err, code, body, _)
+ if err then
+ rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', geoip_url, err)
+ upstream:fail()
+ elseif code == 200 then
+ states['geoip_pipeline']['configured'] = true
+ upstream:ok()
+ else
+ rspamd_logger.errx(rspamd_config,
+ 'cannot configure elastic geoip pipeline (%s), status code: %s, response: %s',
+ geoip_url, code, body)
+ upstream:fail()
+ handle_error('configure', 'geoip_pipeline', settings['limits']['max_fail'])
+ end
+ end
+
+ rspamd_http.request({
+ url = geoip_url,
+ ev_base = ev_base,
+ config = cfg,
+ callback = geoip_cb,
+ headers = {
+ ['Host'] = host,
+ ['Content-Type'] = 'application/json',
+ },
+ body = ucl.to_format(geoip_pipeline, 'json-compact'),
+ method = 'put',
+ gzip = settings.use_gzip,
+ keepalive = settings.use_keepalive,
+ no_ssl_verify = settings.no_ssl_verify,
+ user = settings.user,
+ password = settings.password,
+ timeout = settings.timeout,
+ })
+end
+
+local function put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
local function http_callback(err, code, body, _)
- if code == 200 then
- local parser = ucl.parser()
- local res, ucl_err = parser:parse_string(body)
- if not res then
- rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
- plugins_url, ucl_err)
- enabled = false;
- return
- end
- local obj = parser:get_object()
- for node, value in pairs(obj['nodes']) do
- local plugin_found = false
- for _, plugin in pairs(value['plugins']) do
- if plugin['name'] == 'ingest-geoip' then
- plugin_found = true
- lua_util.debugm(N, "ingest-geoip plugin has been found")
+ if err then
+ rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', policy_url, err)
+ upstream:fail()
+ elseif code == 200 or code == 201 then
+ rspamd_logger.infox(rspamd_config, 'successfully updated elastic index policy: %s', body)
+ states['index_policy']['configured'] = true
+ upstream:ok()
+ else
+ rspamd_logger.errx(rspamd_config, 'cannot configure elastic index policy (%s), status code: %s, response: %s', policy_url, code, body)
+ upstream:fail()
+ handle_error('configure', 'index_policy', settings['limits']['max_fail'])
+ end
+ end
+
+ rspamd_http.request({
+ url = policy_url,
+ ev_base = ev_base,
+ config = cfg,
+ body = index_policy_json,
+ headers = {
+ ['Host'] = host,
+ ['Content-Type'] = 'application/json',
+ },
+ method = 'put',
+ callback = http_callback,
+ gzip = settings.use_gzip,
+ keepalive = settings.use_keepalive,
+ no_ssl_verify = settings.no_ssl_verify,
+ user = settings.user,
+ password = settings.password,
+ timeout = settings.timeout,
+ })
+end
+
+local function get_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
+ local function http_callback(err, code, body, _)
+ if err then
+ rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', policy_url, err)
+ upstream:fail()
+ elseif code == 404 then
+ put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
+ elseif code == 200 then
+ local remote_policy_parser = ucl.parser()
+ local our_policy_parser = ucl.parser()
+ local rp_res, rp_ucl_err = remote_policy_parser:parse_string(body)
+ if rp_res and not rp_ucl_err then
+ local op_res, op_ucl_err = our_policy_parser:parse_string(index_policy_json)
+ if op_res and not op_ucl_err then
+ local remote_policy = remote_policy_parser:get_object()
+ local our_policy = our_policy_parser:get_object()
+ local update_needed = false
+ if detected_distro['name'] == 'elastic' then
+ local index_policy_name = settings['index_policy']['name']
+ local current_phases = safe_get(remote_policy, index_policy_name, 'policy', 'phases')
+ if not lua_util.table_cmp(our_policy['policy']['phases'], current_phases) then
+ update_needed = true
+ end
+ elseif detected_distro['name'] == 'opensearch' then
+ local current_default_state = safe_get(remote_policy, 'policy', 'default_state')
+ local current_ism_index_patterns = safe_get(remote_policy, 'policy', 'ism_template', 1, 'index_patterns')
+ local current_states = safe_get(remote_policy, 'policy', 'states')
+ if not lua_util.table_cmp(our_policy['policy']['default_state'], current_default_state) then
+ update_needed = true
+ elseif not lua_util.table_cmp(our_policy['policy']['ism_template'][1]['index_patterns'], current_ism_index_patterns) then
+ update_needed = true
+ elseif not lua_util.table_cmp(our_policy['policy']['states'], current_states) then
+ update_needed = true
+ end
end
+ if not update_needed then
+ rspamd_logger.infox(rspamd_config, 'elastic index policy is up-to-date')
+ states['index_policy']['configured'] = true
+ else
+ if detected_distro['name'] == 'elastic' then
+ put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
+ elseif detected_distro['name'] == 'opensearch' then
+ local seq_no = remote_policy['_seq_no']
+ local primary_term = remote_policy['_primary_term']
+ if type(seq_no) == 'number' and type(primary_term) == 'number' then
+ upstream:ok()
+ -- adjust policy url to include seq_no with primary_term
+ -- https://opensearch.org/docs/2.17/im-plugin/ism/api/#update-policy
+ policy_url = policy_url .. '?if_seq_no=' .. seq_no .. '&if_primary_term=' .. primary_term
+ put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
+ else
+ rspamd_logger.errx(rspamd_config,
+ 'current elastic index policy (%s) not returned correct seq_no/primary_term, policy will not be updated, response: %s',
+ policy_url, body)
+ upstream:fail()
+ handle_error('validate current', 'index_policy', settings['limits']['max_fail'])
+ end
+ end
+ end
+ else
+ rspamd_logger.errx(rspamd_config, 'failed to parse our index policy for elastic: %s', op_ucl_err)
+ upstream:fail()
+ handle_error('parse our', 'index_policy', settings['limits']['max_fail'])
end
- if not plugin_found then
- rspamd_logger.infox(rspamd_config,
- 'Unable to find ingest-geoip on %1 node, disabling module', node)
- enabled = false
- return
- end
+ else
+ rspamd_logger.errx(rspamd_config, 'failed to parse remote index policy from elastic: %s', rp_ucl_err)
+ upstream:fail()
+ handle_error('parse remote', 'index_policy', settings['limits']['max_fail'])
end
else
- rspamd_logger.errx('cannot get plugins from %s: %s(%s) (%s)', plugins_url,
- err, code, body)
- enabled = false
+ rspamd_logger.errx(rspamd_config,
+ 'cannot get current elastic index policy (%s), status code: %s, response: %s',
+ policy_url, code, body)
+ handle_error('get current', 'index_policy', settings['limits']['max_fail'])
+ upstream:fail()
end
end
+
rspamd_http.request({
- url = plugins_url,
+ url = policy_url,
ev_base = ev_base,
config = cfg,
+ headers = {
+ ['Host'] = host,
+ ['Content-Type'] = 'application/json',
+ },
method = 'get',
callback = http_callback,
+ gzip = settings.use_gzip,
+ keepalive = settings.use_keepalive,
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
@@ -331,165 +930,622 @@ local function check_elastic_server(cfg, ev_base, _)
})
end
--- import ingest pipeline and kibana dashboard/visualization
-local function initial_setup(cfg, ev_base, worker)
- if not worker:is_primary_controller() then
- return
- end
-
+local function configure_index_policy(cfg, ev_base)
local upstream = settings.upstream:get_upstream_round_robin()
+ local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
local ip_addr = upstream:get_addr():to_string(true)
+ local index_policy_path = nil
+ local index_policy = {}
+ if detected_distro['name'] == 'elastic' then
+ index_policy_path = '/_ilm/policy/'
+ elseif detected_distro['name'] == 'opensearch' then
+ index_policy_path = '/_plugins/_ism/policies/'
+ end
+ local policy_url = connect_prefix .. ip_addr .. index_policy_path .. settings['index_policy']['name']
- local function push_kibana_template()
- -- add kibana dashboard and visualizations
- if settings['import_kibana'] then
- local kibana_mappings = read_file(settings['kibana_file'])
- if kibana_mappings then
- local parser = ucl.parser()
- local res, parser_err = parser:parse_string(kibana_mappings)
- if not res then
- rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s',
- parser_err)
- enabled = false
-
- return
- end
- local obj = parser:get_object()
- local tbl = {}
- for _, item in ipairs(obj) do
- table.insert(tbl, '{ "index" : { "_index" : ".kibana", "_type" : "doc" ,"_id": "' ..
- item['_type'] .. ':' .. item["_id"] .. '"} }')
- table.insert(tbl, ucl.to_format(item['_source'], 'json-compact'))
- end
- table.insert(tbl, '') -- For last \n
-
- local kibana_url = connect_prefix .. ip_addr .. '/.kibana/_bulk'
- local function kibana_template_callback(err, code, body, _)
- if code ~= 200 then
- rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', kibana_url,
- err, code, body)
- enabled = false
- else
- lua_util.debugm(N, 'pushed kibana template: %s', body)
- end
- end
+ -- ucl.to_format(obj, 'json') can't manage empty {} objects, it will be treat them as [] in json as result,
+ -- so we write {} as '{emty_object}', which allows us to replace '"{emty_object}"' string after convertion to json to '{}'
+ local index_policy_json = ''
- rspamd_http.request({
- url = kibana_url,
- ev_base = ev_base,
- config = cfg,
- headers = {
- ['Content-Type'] = 'application/x-ndjson',
+ -- elastic lifecycle policy with hot state
+ if detected_distro['name'] == 'elastic' then
+ index_policy = {
+ policy = {
+ phases = {
+ hot = {
+ min_age = '0ms',
+ actions = {
+ set_priority = {
+ priority = settings['index_policy']['hot']['index_priority'],
+ },
+ },
},
- body = table.concat(tbl, "\n"),
- method = 'post',
- gzip = settings.use_gzip,
- callback = kibana_template_callback,
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- timeout = settings.timeout,
- })
- else
- rspamd_logger.infox(rspamd_config, 'kibana template file %s not found', settings['kibana_file'])
+ },
+ },
+ }
+ -- elastic lifecycle warm
+ if settings['index_policy']['warm']['enabled'] then
+ local warm_obj = {}
+ warm_obj['min_age'] = settings['index_policy']['warm']['after']
+ warm_obj['actions'] = {
+ set_priority = {
+ priority = settings['index_policy']['warm']['index_priority'],
+ },
+ }
+ if not settings['index_policy']['warm']['migrate'] then
+ warm_obj['actions']['migrate'] = { enabled = false }
+ end
+ if settings['index_policy']['warm']['read_only'] then
+ warm_obj['actions']['readonly'] = '{empty_object}'
+ end
+ if settings['index_policy']['warm']['change_replicas'] then
+ warm_obj['actions']['allocate'] = {
+ number_of_replicas = settings['index_policy']['warm']['replicas_count'],
+ }
+ end
+ if settings['index_policy']['warm']['shrink'] then
+ if settings['index_policy']['warm']['max_gb_per_shard'] then
+ warm_obj['actions']['shrink'] = {
+ max_primary_shard_size = settings['index_policy']['warm']['max_gb_per_shard'] .. 'gb',
+ }
+ else
+ warm_obj['actions']['shrink'] = {
+ number_of_shards = settings['index_policy']['warm']['shards_count'],
+ }
+ end
end
+ if settings['index_policy']['warm']['force_merge'] then
+ warm_obj['actions']['forcemerge'] = {
+ max_num_segments = settings['index_policy']['warm']['segments_count'],
+ }
+ end
+ index_policy['policy']['phases']['warm'] = warm_obj
end
- end
-
- if enabled then
- -- create ingest pipeline
- local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/rspamd-geoip'
- local function geoip_cb(err, code, body, _)
- if code ~= 200 then
- rspamd_logger.errx('cannot get data from %s: %s(%s) (%s)',
- geoip_url, err, code, body)
- enabled = false
+ -- elastic lifecycle cold
+ if settings['index_policy']['cold']['enabled'] then
+ local cold_obj = {}
+ cold_obj['min_age'] = settings['index_policy']['cold']['after']
+ cold_obj['actions'] = {
+ set_priority = {
+ priority = settings['index_policy']['cold']['index_priority'],
+ },
+ }
+ if not settings['index_policy']['cold']['migrate'] then
+ cold_obj['actions']['migrate'] = { enabled = false }
+ end
+ if settings['index_policy']['cold']['read_only'] then
+ cold_obj['actions']['readonly'] = '{empty_object}'
+ end
+ if settings['index_policy']['cold']['change_replicas'] then
+ cold_obj['actions']['allocate'] = {
+ number_of_replicas = settings['index_policy']['cold']['replicas_count'],
+ }
+ end
+ index_policy['policy']['phases']['cold'] = cold_obj
+ end
+ -- elastic lifecycle delete
+ if settings['index_policy']['delete']['enabled'] then
+ local delete_obj = {}
+ delete_obj['min_age'] = settings['index_policy']['delete']['after']
+ delete_obj['actions'] = {
+ delete = { delete_searchable_snapshot = true },
+ }
+ index_policy['policy']['phases']['delete'] = delete_obj
+ end
+ -- opensearch state policy with hot state
+ elseif detected_distro['name'] == 'opensearch' then
+ local retry = {
+ count = 3,
+ backoff = 'exponential',
+ delay = '1m',
+ }
+ index_policy = {
+ policy = {
+ description = 'Rspamd index state policy',
+ ism_template = {
+ {
+ index_patterns = { settings['index_template']['name'] .. '-*' },
+ priority = 100,
+ },
+ },
+ default_state = 'hot',
+ states = {
+ {
+ name = 'hot',
+ actions = {
+ {
+ index_priority = {
+ priority = settings['index_policy']['hot']['index_priority'],
+ },
+ retry = retry,
+ },
+ },
+ transitions = {},
+ },
+ },
+ },
+ }
+ local state_id = 1 -- includes hot state
+ -- opensearch state warm
+ if settings['index_policy']['warm']['enabled'] then
+ local prev_state_id = state_id
+ state_id = state_id + 1
+ index_policy['policy']['states'][prev_state_id]['transitions'] = {
+ {
+ state_name = 'warm',
+ conditions = {
+ min_index_age = settings['index_policy']['warm']['after']
+ },
+ },
+ }
+ local warm_obj = {
+ name = 'warm',
+ actions = {
+ {
+ index_priority = {
+ priority = settings['index_policy']['warm']['index_priority'],
+ },
+ retry = retry,
+ },
+ },
+ transitions = {},
+ }
+ table.insert(index_policy['policy']['states'], warm_obj)
+ if settings['index_policy']['warm']['read_only'] then
+ local read_only = {
+ read_only = '{empty_object}',
+ retry = retry,
+ }
+ table.insert(index_policy['policy']['states'][state_id]['actions'], read_only)
+ end
+ if settings['index_policy']['warm']['change_replicas'] then
+ local change_replicas = {
+ replica_count = {
+ number_of_replicas = settings['index_policy']['warm']['replicas_count'],
+ },
+ retry = retry,
+ }
+ table.insert(index_policy['policy']['states'][state_id]['actions'], change_replicas)
+ end
+ if settings['index_policy']['warm']['shrink'] then
+ local shrink = {
+ shrink = {},
+ retry = retry,
+ }
+ if settings['index_policy']['warm']['max_gb_per_shard'] then
+ shrink['shrink']['max_shard_size'] = settings['index_policy']['warm']['max_gb_per_shard'] .. 'gb'
+ else
+ shrink['shrink']['num_new_shards'] = settings['index_policy']['warm']['shards_count']
+ end
+ shrink['shrink']['switch_aliases'] = false
+ table.insert(index_policy['policy']['states'][state_id]['actions'], shrink)
+ end
+ if settings['index_policy']['warm']['force_merge'] then
+ local force_merge = {
+ force_merge = {
+ max_num_segments = settings['index_policy']['warm']['segments_count'],
+ },
+ retry = retry,
+ }
+ table.insert(index_policy['policy']['states'][state_id]['actions'], force_merge)
end
end
- local template = {
- description = "Add geoip info for rspamd",
- processors = {
+ -- opensearch state cold
+ if settings['index_policy']['cold']['enabled'] then
+ local prev_state_id = state_id
+ state_id = state_id + 1
+ index_policy['policy']['states'][prev_state_id]['transitions'] = {
{
- geoip = {
- field = "rspamd_meta.ip",
- target_field = "rspamd_meta.geoip"
- }
+ state_name = 'cold',
+ conditions = {
+ min_index_age = settings['index_policy']['cold']['after']
+ },
+ },
+ }
+ local cold_obj = {
+ name = 'cold',
+ actions = {
+ {
+ index_priority = {
+ priority = settings['index_policy']['cold']['index_priority'],
+ },
+ retry = retry,
+ },
+ },
+ transitions = {},
+ }
+ table.insert(index_policy['policy']['states'], cold_obj)
+ if settings['index_policy']['cold']['read_only'] then
+ local read_only = {
+ read_only = '{empty_object}',
+ retry = retry,
+ }
+ table.insert(index_policy['policy']['states'][state_id]['actions'], read_only)
+ end
+ if settings['index_policy']['cold']['change_replicas'] then
+ local change_replicas = {
+ replica_count = {
+ number_of_replicas = settings['index_policy']['cold']['replicas_count'],
+ },
+ retry = retry,
}
+ table.insert(index_policy['policy']['states'][state_id]['actions'], change_replicas)
+ end
+ end
+ -- opensearch state delete
+ if settings['index_policy']['delete']['enabled'] then
+ local prev_state_id = state_id
+ state_id = state_id + 1
+ index_policy['policy']['states'][prev_state_id]['transitions'] = {
+ {
+ state_name = 'delete',
+ conditions = {
+ min_index_age = settings['index_policy']['delete']['after']
+ },
+ },
}
- }
- rspamd_http.request({
- url = geoip_url,
- ev_base = ev_base,
- config = cfg,
- callback = geoip_cb,
- headers = {
- ['Content-Type'] = 'application/json',
+ local delete_obj = {
+ name = 'delete',
+ actions = {
+ {
+ delete = '{empty_object}',
+ retry = retry,
+ },
+ },
+ transitions = {},
+ }
+ table.insert(index_policy['policy']['states'], delete_obj)
+ end
+ end
+
+ -- finish rendering index policy, will now get current version and update it if neeeded
+ index_policy_json = ucl.to_format(index_policy, 'json-compact'):gsub('"{empty_object}"', '{}')
+ get_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
+end
+
+local function configure_index_template(cfg, ev_base)
+ local upstream = settings.upstream:get_upstream_round_robin()
+ local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
+ local ip_addr = upstream:get_addr():to_string(true)
+ local template_url = connect_prefix .. ip_addr .. '/_index_template/' .. settings['index_template']['name']
+
+ -- common data types
+ local t_boolean_nil_true = { type = 'boolean', null_value = true }
+ local t_boolean_nil_false = { type = 'boolean', null_value = false }
+ local t_date = { type = 'date' }
+ local t_long = { type = 'long', null_value = 0 }
+ local t_float = { type = 'float', null_value = 0 }
+ local t_double = { type = 'double', null_value = 0 }
+ local t_ip = { type = 'ip', null_value = '::' }
+ local t_geo_point = { type = 'geo_point' }
+ local t_keyword = { type = 'keyword', null_value = settings['index_template']['empty_value'] }
+ local t_text = { type = 'text' }
+ local t_text_with_keyword = {
+ type = 'text',
+ fields = {
+ keyword = {
+ type = 'keyword',
+ ignore_above = settings['index_template']['dynamic_keyword_ignore_above'],
},
- gzip = settings.use_gzip,
- body = ucl.to_format(template, 'json-compact'),
- method = 'put',
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- timeout = settings.timeout,
- })
- -- create template mappings if not exist
- local template_url = connect_prefix .. ip_addr .. '/_template/rspamd'
- local function http_template_put_callback(err, code, body, _)
- if code ~= 200 then
- rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)',
- template_url, err, code, body)
- enabled = false
+ },
+ }
+
+ -- common objects types
+ local geoip_obj = {
+ dynamic = false,
+ type = 'object',
+ properties = {
+ continent_name = t_text,
+ region_iso_code = t_keyword,
+ city_name = t_text,
+ country_iso_code = t_keyword,
+ country_name = t_text,
+ location = t_geo_point,
+ region_name = t_text,
+ },
+ }
+ local asn_obj = {
+ dynamic = false,
+ type = 'object',
+ properties = {
+ country = t_keyword,
+ asn = t_long,
+ ipnet = t_keyword, -- do not use ip_range type, it's not usable for search
+ },
+ }
+ local symbols_obj = {
+ dynamic = false,
+ type = 'object',
+ properties = {
+ name = t_keyword,
+ group = t_keyword,
+ options = t_text_with_keyword,
+ score = t_double,
+ weight = t_double,
+ },
+ }
+ if settings['index_template']['symbols_nested'] then
+ symbols_obj['type'] = 'nested'
+ end
+
+ -- dynamic templates
+ local dynamic_templates_obj = {}
+ local dynamic_strings = {
+ strings = {
+ match_mapping_type = 'string',
+ mapping = {
+ type = 'text',
+ fields = {
+ keyword = {
+ type = 'keyword',
+ ignore_above = settings['index_template']['dynamic_keyword_ignore_above'],
+ },
+ },
+ },
+ },
+ }
+ table.insert(dynamic_templates_obj, dynamic_strings)
+
+ -- index template rendering
+ local index_template = {
+ index_patterns = { settings['index_template']['name'] .. '-*', },
+ priority = settings['index_template']['priority'],
+ template = {
+ settings = {
+ index = {
+ number_of_shards = settings['index_template']['shards_count'],
+ number_of_replicas = settings['index_template']['replicas_count'],
+ refresh_interval = settings['index_template']['refresh_interval'] .. 's',
+ },
+ },
+ mappings = {
+ dynamic = false,
+ dynamic_templates = dynamic_templates_obj,
+ properties = {
+ ['@timestamp'] = t_date,
+ rspamd_meta = {
+ dynamic = true,
+ type = 'object',
+ properties = {
+ rspamd_server = t_keyword,
+ action = t_keyword,
+ score = t_double,
+ symbols = symbols_obj,
+ user = t_keyword,
+ direction = t_keyword,
+ qid = t_keyword,
+ helo = t_text_with_keyword,
+ hostname = t_text_with_keyword,
+ ip = t_ip,
+ is_local = t_boolean_nil_false,
+ sender_ip = t_ip,
+ message_id = t_text_with_keyword,
+ rcpt = t_text_with_keyword,
+ from_user = t_keyword,
+ from_domain = t_keyword,
+ mime_from_user = t_keyword,
+ mime_from_domain = t_keyword,
+ settings_id = t_keyword,
+ asn = asn_obj,
+ scan_time = t_float,
+ language = t_text,
+ non_en = t_boolean_nil_true,
+ fuzzy_hashes = t_text,
+ received_delay = t_long,
+ },
+ },
+ },
+ },
+ },
+ }
+
+ -- render index lifecycle policy
+ if detected_distro['name'] == 'elastic' and settings['index_policy']['enabled'] then
+ index_template['template']['settings']['index']['lifecycle'] = {
+ name = settings['index_policy']['name']
+ }
+ end
+
+ -- render geoip mappings
+ if settings['geoip']['enabled'] then
+ index_template['template']['mappings']['properties']['rspamd_meta']['properties']['geoip'] = geoip_obj
+ index_template['template']['mappings']['properties']['rspamd_meta']['properties']['sender_geoip'] = geoip_obj
+ end
+
+ -- render collect_headers and extra_collect_headers mappings
+ for _, header in ipairs(settings['collect_headers']) do
+ local header_name = get_header_name(header)
+ if not index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] then
+ index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] = t_text_with_keyword
+ end
+ end
+ for _, header in ipairs(settings['extra_collect_headers']) do
+ local header_name = get_header_name(header)
+ if not index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] then
+ index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] = t_text_with_keyword
+ end
+ end
+
+ local function http_callback(err, code, body, _)
+ if err then
+ rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', template_url, err)
+ upstream:fail()
+ elseif code == 200 then
+ rspamd_logger.infox(rspamd_config, 'successfully updated elastic index template: %s', body)
+ states['index_template']['configured'] = true
+ upstream:ok()
+ else
+ rspamd_logger.errx(rspamd_config, 'cannot configure elastic index template (%s), status code: %s, response: %s',
+ template_url, code, body)
+ upstream:fail()
+ handle_error('configure', 'index_template', settings['limits']['max_fail'])
+ end
+ end
+
+ rspamd_http.request({
+ url = template_url,
+ ev_base = ev_base,
+ config = cfg,
+ body = ucl.to_format(index_template, 'json-compact'),
+ headers = {
+ ['Host'] = host,
+ ['Content-Type'] = 'application/json',
+ },
+ method = 'put',
+ callback = http_callback,
+ gzip = settings.use_gzip,
+ keepalive = settings.use_keepalive,
+ no_ssl_verify = settings.no_ssl_verify,
+ user = settings.user,
+ password = settings.password,
+ timeout = settings.timeout,
+ })
+end
+
+local function verify_distro(manual)
+ local detected_distro_name = detected_distro['name']
+ local detected_distro_version = detected_distro['version']
+ local valid = true
+ local valid_unknown = false
+
+ -- check that detected_distro_name is valid
+ if not detected_distro_name then
+ rspamd_logger.errx(rspamd_config, 'failed to detect elastic distribution')
+ valid = false
+ elseif not supported_distro[detected_distro_name] then
+ rspamd_logger.errx(rspamd_config, 'unsupported elastic distribution: %s', detected_distro_name)
+ valid = false
+ else
+ local supported_distro_info = supported_distro[detected_distro_name]
+ -- check that detected_distro_version is valid
+ if not detected_distro_version or type(detected_distro_version) ~= 'string' then
+ rspamd_logger.errx(rspamd_config, 'elastic version should be a string, but we received: %s', type(detected_distro_version))
+ valid = false
+ elseif detected_distro_version == '' then
+ rspamd_logger.errx(rspamd_config, 'unsupported elastic version: empty string')
+ valid = false
+ else
+ -- compare versions using compare_versions
+ local cmp_from = compare_versions(detected_distro_version, supported_distro_info['from'])
+ if cmp_from == -1 then
+ rspamd_logger.errx(rspamd_config, 'unsupported elastic version: %s, minimal supported version of %s is %s',
+ detected_distro_version, detected_distro_name, supported_distro_info['from'])
+ valid = false
else
- lua_util.debugm(N, 'pushed rspamd template: %s', body)
- push_kibana_template()
+ local cmp_till = compare_versions(detected_distro_version, supported_distro_info['till'])
+ if (cmp_till >= 0) and not supported_distro_info['till_unknown'] then
+ rspamd_logger.errx(rspamd_config, 'unsupported elastic version: %s, maximum supported version of %s is less than %s',
+ detected_distro_version, detected_distro_name, supported_distro_info['till'])
+ valid = false
+ elseif (cmp_till >= 0) and supported_distro_info['till_unknown'] then
+ rspamd_logger.warnx(rspamd_config,
+ 'compatibility of elastic version: %s is unknown, maximum known supported version of %s is less than %s, use at your own risk',
+ detected_distro_version, detected_distro_name, supported_distro_info['till'])
+ valid_unknown = true
+ end
end
end
- local function http_template_exist_callback(_, code, _, _)
- if code ~= 200 then
- rspamd_http.request({
- url = template_url,
- ev_base = ev_base,
- config = cfg,
- body = elastic_template,
- method = 'put',
- headers = {
- ['Content-Type'] = 'application/json',
- },
- gzip = settings.use_gzip,
- callback = http_template_put_callback,
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- timeout = settings.timeout,
- })
+ end
+
+ if valid_unknown then
+ detected_distro['supported'] = true
+ else
+ if valid and manual then
+ rspamd_logger.infox(
+ rspamd_config, 'assuming elastic distro: %s, version: %s', detected_distro_name, detected_distro_version)
+ detected_distro['supported'] = true
+ elseif valid and not manual then
+ rspamd_logger.infox(rspamd_config, 'successfully connected to elastic distro: %s, version: %s',
+ detected_distro_name, detected_distro_version)
+ detected_distro['supported'] = true
+ else
+ handle_error('configure','distro',settings['version']['autodetect_max_fail'])
+ end
+ end
+end
+
+local function configure_distro(cfg, ev_base)
+ if not settings['version']['autodetect_enabled'] then
+ detected_distro['name'] = settings['version']['override']['name']
+ detected_distro['version'] = settings['version']['override']['version']
+ rspamd_logger.infox(rspamd_config, 'automatic detection of elastic distro and version is disabled, taking configuration from settings')
+ verify_distro(true)
+ end
+
+ local upstream = settings.upstream:get_upstream_round_robin()
+ local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
+ local ip_addr = upstream:get_addr():to_string(true)
+ local root_url = connect_prefix .. ip_addr .. '/'
+ local function http_callback(err, code, body, _)
+ if err then
+ rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', root_url, err)
+ upstream:fail()
+ elseif code ~= 200 then
+ rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s), status code: %s, response: %s', root_url, code, body)
+ upstream:fail()
+ else
+ local parser = ucl.parser()
+ local res, ucl_err = parser:parse_string(body)
+ if not res then
+ rspamd_logger.errx(rspamd_config, 'failed to parse reply from elastic (%s): %s', root_url, ucl_err)
+ upstream:fail()
else
- push_kibana_template()
+ local obj = parser:get_object()
+ if obj['tagline'] == "The OpenSearch Project: https://opensearch.org/" then
+ detected_distro['name'] = 'opensearch'
+ end
+ if obj['tagline'] == "You Know, for Search" then
+ detected_distro['name'] = 'elastic'
+ end
+ if obj['version'] then
+ if obj['version']['number'] then
+ detected_distro['version'] = obj['version']['number']
+ end
+ if not detected_distro['name'] and obj['version']['distribution'] then
+ detected_distro['name'] = obj['version']['distribution']
+ end
+ end
+ verify_distro()
+ if detected_distro['supported'] then
+ upstream:ok()
+ end
end
end
+ end
+ if settings['version']['autodetect_enabled'] then
rspamd_http.request({
- url = template_url,
+ url = root_url,
ev_base = ev_base,
config = cfg,
- method = 'head',
- callback = http_template_exist_callback,
+ headers = {
+ ['Host'] = host,
+ ['Content-Type'] = 'application/json',
+ },
+ method = 'get',
+ callback = http_callback,
+ gzip = settings.use_gzip,
+ keepalive = settings.use_keepalive,
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
timeout = settings.timeout,
})
-
end
end
-redis_params = rspamd_redis.parse_redis_server('elastic')
+local opts = rspamd_config:get_all_opt('elastic')
-if redis_params and opts then
- for k, v in pairs(opts) do
+if opts then
+ for k,v in pairs(opts) do
settings[k] = v
end
+ if not settings['enabled'] then
+ rspamd_logger.infox(rspamd_config, 'module disabled in config')
+ lua_util.disable_module(N, "config")
+ end
+
if not settings['server'] and not settings['servers'] then
rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
lua_util.disable_module(N, "config")
@@ -498,29 +1554,10 @@ if redis_params and opts then
connect_prefix = 'https://'
end
- if settings.ingest_module then
- ingest_geoip_type = 'modules'
- end
-
- settings.upstream = upstream_list.create(rspamd_config,
- settings['server'] or settings['servers'], 9200)
+ settings.upstream = upstream_list.create(rspamd_config, settings['server'] or settings['servers'], 9200)
if not settings.upstream then
- rspamd_logger.errx('cannot parse elastic address: %s',
- settings['server'] or settings['servers'])
- lua_util.disable_module(N, "config")
- return
- end
- if not settings['template_file'] then
- rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
- lua_util.disable_module(N, "config")
- return
- end
-
- elastic_template = read_file(settings['template_file']);
- if not elastic_template then
- rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module',
- settings['template_file'])
+ rspamd_logger.errx(rspamd_config, 'cannot parse elastic address: %s', settings['server'] or settings['servers'])
lua_util.disable_module(N, "config")
return
end
@@ -533,12 +1570,79 @@ if redis_params and opts then
augmentations = { string.format("timeout=%f", settings.timeout) },
})
+ -- send tail of data if worker going to stop
+ rspamd_config:register_finish_script(function(task)
+ local nlogs_total = buffer['logs']:length()
+ if nlogs_total > 0 then
+ rspamd_logger.debugm(N, task, 'flushing buffer on shutdown, buffer size: %s', nlogs_total)
+ elastic_send_data(true, task)
+ end
+ end)
+
rspamd_config:add_on_load(function(cfg, ev_base, worker)
if worker:is_scanner() then
- check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
- initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
+ rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base)
+ if not detected_distro['supported'] then
+ if states['distro']['configured'] then
+ return false -- stop running periodic job
+ else
+ configure_distro(p_cfg, p_ev_base)
+ return true -- continue running periodic job
+ end
+ end
+ end)
+ -- send data periodically if any of limits reached
+ rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base)
+ if detected_distro['supported'] then
+ periodic_send_data(p_cfg, p_ev_base)
+ end
+ return true
+ end)
+ end
+ if worker:is_primary_controller() then
+ rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base)
+ if not settings['index_template']['managed'] then
+ return false
+ elseif not detected_distro['supported'] then
+ return true
+ else
+ if states['index_template']['configured'] then
+ return false
+ else
+ configure_index_template(p_cfg, p_ev_base)
+ return true
+ end
+ end
+ end)
+ rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base)
+ if not settings['index_policy']['enabled'] or not settings['index_policy']['managed'] then
+ return false
+ elseif not detected_distro['supported'] then
+ return true
+ else
+ if states['index_policy']['configured'] then
+ return false
+ else
+ configure_index_policy(p_cfg, p_ev_base)
+ return true
+ end
+ end
+ end)
+ rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base)
+ if not settings['geoip']['enabled'] or not settings['geoip']['managed'] then
+ return false
+ elseif not detected_distro['supported'] then
+ return true
+ else
+ if states['geoip_pipeline']['configured'] then
+ return false
+ else
+ configure_geoip_pipeline(p_cfg, p_ev_base)
+ return true
+ end
+ end
+ end)
end
end)
end
-
end