aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/lua/elastic.lua
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2023-08-07 11:41:28 +0100
committerVsevolod Stakhov <vsevolod@rspamd.com>2023-08-07 11:41:28 +0100
commit662145d0554de5e769b92dab2d41173a98adcee5 (patch)
treeec28311a0bce6181f248ba7b50304293ad764e44 /src/plugins/lua/elastic.lua
parentbbd88232db43d18f5e0de5a6502848d4074621c5 (diff)
downloadrspamd-662145d0554de5e769b92dab2d41173a98adcee5.tar.gz
rspamd-662145d0554de5e769b92dab2d41173a98adcee5.zip
[Minor] Reformat all Lua code, no functional changes
Diffstat (limited to 'src/plugins/lua/elastic.lua')
-rw-r--r--src/plugins/lua/elastic.lua91
1 files changed, 50 insertions, 41 deletions
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
index ce2d01b80..ccbb7c198 100644
--- a/src/plugins/lua/elastic.lua
+++ b/src/plugins/lua/elastic.lua
@@ -43,7 +43,7 @@ local settings = {
limit = 500,
index_pattern = 'rspamd-%Y.%m.%d',
template_file = rspamd_paths['SHAREDIR'] .. '/elastic/rspamd_template.json',
- kibana_file = rspamd_paths['SHAREDIR'] ..'/elastic/kibana.json',
+ kibana_file = rspamd_paths['SHAREDIR'] .. '/elastic/kibana.json',
key_prefix = 'elastic-',
expire = 3600,
timeout = 5.0,
@@ -61,23 +61,25 @@ local settings = {
}
local function read_file(path)
- local file = io.open(path, "rb")
- if not file then return nil end
- local content = file:read "*a"
- file:close()
- return content
+ local file = io.open(path, "rb")
+ if not file then
+ return nil
+ end
+ local content = file:read "*a"
+ file:close()
+ return content
end
local function elastic_send_data(task)
local es_index = os.date(settings['index_pattern'])
local tbl = {}
- for _,value in pairs(rows) do
+ for _, value in pairs(rows) do
if settings.elasticsearch_version >= 7 then
- table.insert(tbl, '{ "index" : { "_index" : "'..es_index..
- '","pipeline": "rspamd-geoip"} }')
+ table.insert(tbl, '{ "index" : { "_index" : "' .. es_index ..
+ '","pipeline": "rspamd-geoip"} }')
else
- table.insert(tbl, '{ "index" : { "_index" : "'..es_index..
- '", "_type" : "_doc" ,"pipeline": "rspamd-geoip"} }')
+ table.insert(tbl, '{ "index" : { "_index" : "' .. es_index ..
+ '", "_type" : "_doc" ,"pipeline": "rspamd-geoip"} }')
end
table.insert(tbl, ucl.to_format(value, 'json-compact'))
end
@@ -87,7 +89,7 @@ local function elastic_send_data(task)
local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)
- local push_url = connect_prefix .. ip_addr .. '/'..es_index..'/_bulk'
+ local push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk'
local bulk_json = table.concat(tbl, "\n")
local function http_callback(err, code, _, _)
@@ -127,7 +129,7 @@ local function get_general_metadata(task)
local r = {}
local ip_addr = task:get_ip()
- if ip_addr and ip_addr:is_valid() then
+ if ip_addr and ip_addr:is_valid() then
r.is_local = ip_addr:is_local()
r.ip = tostring(ip_addr)
else
@@ -153,10 +155,10 @@ local function get_general_metadata(task)
r.action = task:get_metric_action()
r.rspamd_server = HOSTNAME
if r.user ~= 'unknown' then
- r.direction = "Outbound"
+ r.direction = "Outbound"
end
local s = task:get_metric_score()[1]
- r.score = s
+ r.score = s
local rcpt = task:get_recipients('smtp')
if rcpt then
@@ -169,14 +171,14 @@ local function get_general_metadata(task)
r.rcpt = 'unknown'
end
- local from = task:get_from{'smtp', 'orig'}
+ local from = task:get_from { 'smtp', 'orig' }
if ((from or E)[1] or E).addr then
r.from = from[1].addr
else
r.from = 'unknown'
end
- local mime_from = task:get_from{'mime', 'orig'}
+ local mime_from = task:get_from { 'mime', 'orig' }
if ((mime_from or E)[1] or E).addr then
r.mime_from = mime_from[1].addr
else
@@ -188,7 +190,7 @@ local function get_general_metadata(task)
r.asn = {}
local pool = task:get_mempool()
r.asn.country = pool:get_variable("country") or 'unknown'
- r.asn.asn = pool:get_variable("asn") or 0
+ r.asn.asn = pool:get_variable("asn") or 0
r.asn.ipnet = pool:get_variable("ipnet") or 'unknown'
local function process_header(name)
@@ -244,12 +246,18 @@ local function get_general_metadata(task)
end
local function elastic_collect(task)
- if not enabled then return end
- if task:has_flag('skip') then return end
- if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then return end
+ if not enabled then
+ return
+ end
+ if task:has_flag('skip') then
+ return
+ end
+ if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then
+ return
+ end
- local row = {['rspamd_meta'] = get_general_metadata(task),
- ['@timestamp'] = tostring(util.get_time() * 1000)}
+ local row = { ['rspamd_meta'] = get_general_metadata(task),
+ ['@timestamp'] = tostring(util.get_time() * 1000) }
table.insert(rows, row)
nrows = nrows + 1
if nrows > settings['limit'] then
@@ -272,7 +280,6 @@ local function elastic_collect(task)
end
end
-
local opts = rspamd_config:get_all_opt('elastic')
local function check_elastic_server(cfg, ev_base, _)
@@ -282,7 +289,7 @@ local function check_elastic_server(cfg, ev_base, _)
local function http_callback(err, code, body, _)
if code == 200 then
local parser = ucl.parser()
- local res,ucl_err = parser:parse_string(body)
+ local res, ucl_err = parser:parse_string(body)
if not res then
rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
plugins_url, ucl_err)
@@ -290,9 +297,9 @@ local function check_elastic_server(cfg, ev_base, _)
return
end
local obj = parser:get_object()
- for node,value in pairs(obj['nodes']) do
+ for node, value in pairs(obj['nodes']) do
local plugin_found = false
- for _,plugin in pairs(value['plugins']) do
+ for _, plugin in pairs(value['plugins']) do
if plugin['name'] == 'ingest-geoip' then
plugin_found = true
lua_util.debugm(N, "ingest-geoip plugin has been found")
@@ -326,7 +333,9 @@ end
-- import ingest pipeline and kibana dashboard/visualization
local function initial_setup(cfg, ev_base, worker)
- if not worker:is_primary_controller() then return end
+ if not worker:is_primary_controller() then
+ return
+ end
local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)
@@ -337,7 +346,7 @@ local function initial_setup(cfg, ev_base, worker)
local kibana_mappings = read_file(settings['kibana_file'])
if kibana_mappings then
local parser = ucl.parser()
- local res,parser_err = parser:parse_string(kibana_mappings)
+ local res, parser_err = parser:parse_string(kibana_mappings)
if not res then
rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s',
parser_err)
@@ -347,14 +356,14 @@ local function initial_setup(cfg, ev_base, worker)
end
local obj = parser:get_object()
local tbl = {}
- for _,item in ipairs(obj) do
- table.insert(tbl, '{ "index" : { "_index" : ".kibana", "_type" : "doc" ,"_id": "'..
- item['_type'] .. ':' .. item["_id"]..'"} }')
+ for _, item in ipairs(obj) do
+ table.insert(tbl, '{ "index" : { "_index" : ".kibana", "_type" : "doc" ,"_id": "' ..
+ item['_type'] .. ':' .. item["_id"] .. '"} }')
table.insert(tbl, ucl.to_format(item['_source'], 'json-compact'))
end
table.insert(tbl, '') -- For last \n
- local kibana_url = connect_prefix .. ip_addr ..'/.kibana/_bulk'
+ local kibana_url = connect_prefix .. ip_addr .. '/.kibana/_bulk'
local function kibana_template_callback(err, code, body, _)
if code ~= 200 then
rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', kibana_url,
@@ -389,7 +398,7 @@ local function initial_setup(cfg, ev_base, worker)
if enabled then
-- create ingest pipeline
- local geoip_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip'
+ local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/rspamd-geoip'
local function geoip_cb(err, code, body, _)
if code ~= 200 then
rspamd_logger.errx('cannot get data from %s: %s(%s) (%s)',
@@ -425,7 +434,7 @@ local function initial_setup(cfg, ev_base, worker)
timeout = settings.timeout,
})
-- create template mappings if not exist
- local template_url = connect_prefix .. ip_addr ..'/_template/rspamd'
+ local template_url = connect_prefix .. ip_addr .. '/_template/rspamd'
local function http_template_put_callback(err, code, body, _)
if code ~= 200 then
rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)',
@@ -477,7 +486,7 @@ end
redis_params = rspamd_redis.parse_redis_server('elastic')
if redis_params and opts then
- for k,v in pairs(opts) do
+ for k, v in pairs(opts) do
settings[k] = v
end
@@ -494,11 +503,11 @@ if redis_params and opts then
end
settings.upstream = upstream_list.create(rspamd_config,
- settings['server'] or settings['servers'], 9200)
+ settings['server'] or settings['servers'], 9200)
if not settings.upstream then
rspamd_logger.errx('cannot parse elastic address: %s',
- settings['server'] or settings['servers'])
+ settings['server'] or settings['servers'])
lua_util.disable_module(N, "config")
return
end
@@ -511,7 +520,7 @@ if redis_params and opts then
elastic_template = read_file(settings['template_file']);
if not elastic_template then
rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module',
- settings['template_file'])
+ settings['template_file'])
lua_util.disable_module(N, "config")
return
end
@@ -521,10 +530,10 @@ if redis_params and opts then
type = 'idempotent',
callback = elastic_collect,
flags = 'empty,explicit_disable,ignore_passthrough',
- augmentations = {string.format("timeout=%f", settings.timeout)},
+ augmentations = { string.format("timeout=%f", settings.timeout) },
})
- rspamd_config:add_on_load(function(cfg, ev_base,worker)
+ rspamd_config:add_on_load(function(cfg, ev_base, worker)
if worker:is_scanner() then
check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations