diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2023-08-07 11:41:28 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rspamd.com> | 2023-08-07 11:41:28 +0100 |
commit | 662145d0554de5e769b92dab2d41173a98adcee5 (patch) | |
tree | ec28311a0bce6181f248ba7b50304293ad764e44 /src/plugins/lua/elastic.lua | |
parent | bbd88232db43d18f5e0de5a6502848d4074621c5 (diff) | |
download | rspamd-662145d0554de5e769b92dab2d41173a98adcee5.tar.gz rspamd-662145d0554de5e769b92dab2d41173a98adcee5.zip |
[Minor] Reformat all Lua code, no functional changes
Diffstat (limited to 'src/plugins/lua/elastic.lua')
-rw-r--r-- | src/plugins/lua/elastic.lua | 91 |
1 files changed, 50 insertions, 41 deletions
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index ce2d01b80..ccbb7c198 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -43,7 +43,7 @@ local settings = { limit = 500, index_pattern = 'rspamd-%Y.%m.%d', template_file = rspamd_paths['SHAREDIR'] .. '/elastic/rspamd_template.json', - kibana_file = rspamd_paths['SHAREDIR'] ..'/elastic/kibana.json', + kibana_file = rspamd_paths['SHAREDIR'] .. '/elastic/kibana.json', key_prefix = 'elastic-', expire = 3600, timeout = 5.0, @@ -61,23 +61,25 @@ local settings = { } local function read_file(path) - local file = io.open(path, "rb") - if not file then return nil end - local content = file:read "*a" - file:close() - return content + local file = io.open(path, "rb") + if not file then + return nil + end + local content = file:read "*a" + file:close() + return content end local function elastic_send_data(task) local es_index = os.date(settings['index_pattern']) local tbl = {} - for _,value in pairs(rows) do + for _, value in pairs(rows) do if settings.elasticsearch_version >= 7 then - table.insert(tbl, '{ "index" : { "_index" : "'..es_index.. - '","pipeline": "rspamd-geoip"} }') + table.insert(tbl, '{ "index" : { "_index" : "' .. es_index .. + '","pipeline": "rspamd-geoip"} }') else - table.insert(tbl, '{ "index" : { "_index" : "'..es_index.. - '", "_type" : "_doc" ,"pipeline": "rspamd-geoip"} }') + table.insert(tbl, '{ "index" : { "_index" : "' .. es_index .. + '", "_type" : "_doc" ,"pipeline": "rspamd-geoip"} }') end table.insert(tbl, ucl.to_format(value, 'json-compact')) end @@ -87,7 +89,7 @@ local function elastic_send_data(task) local upstream = settings.upstream:get_upstream_round_robin() local ip_addr = upstream:get_addr():to_string(true) - local push_url = connect_prefix .. ip_addr .. '/'..es_index..'/_bulk' + local push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk' local bulk_json = table.concat(tbl, "\n") local function http_callback(err, code, _, _) @@ -127,7 +129,7 @@ local function get_general_metadata(task) local r = {} local ip_addr = task:get_ip() - if ip_addr and ip_addr:is_valid() then + if ip_addr and ip_addr:is_valid() then r.is_local = ip_addr:is_local() r.ip = tostring(ip_addr) else @@ -153,10 +155,10 @@ local function get_general_metadata(task) r.action = task:get_metric_action() r.rspamd_server = HOSTNAME if r.user ~= 'unknown' then - r.direction = "Outbound" + r.direction = "Outbound" end local s = task:get_metric_score()[1] - r.score = s + r.score = s local rcpt = task:get_recipients('smtp') if rcpt then @@ -169,14 +171,14 @@ local function get_general_metadata(task) r.rcpt = 'unknown' end - local from = task:get_from{'smtp', 'orig'} + local from = task:get_from { 'smtp', 'orig' } if ((from or E)[1] or E).addr then r.from = from[1].addr else r.from = 'unknown' end - local mime_from = task:get_from{'mime', 'orig'} + local mime_from = task:get_from { 'mime', 'orig' } if ((mime_from or E)[1] or E).addr then r.mime_from = mime_from[1].addr else @@ -188,7 +190,7 @@ local function get_general_metadata(task) r.asn = {} local pool = task:get_mempool() r.asn.country = pool:get_variable("country") or 'unknown' - r.asn.asn = pool:get_variable("asn") or 0 + r.asn.asn = pool:get_variable("asn") or 0 r.asn.ipnet = pool:get_variable("ipnet") or 'unknown' local function process_header(name) @@ -244,12 +246,18 @@ local function get_general_metadata(task) end local function elastic_collect(task) - if not enabled then return end - if task:has_flag('skip') then return end - if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then return end + if not enabled then + return + end + if task:has_flag('skip') then + return + end + if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then + return + end - local row = {['rspamd_meta'] = get_general_metadata(task), - ['@timestamp'] = tostring(util.get_time() * 1000)} + local row = { ['rspamd_meta'] = get_general_metadata(task), + ['@timestamp'] = tostring(util.get_time() * 1000) } table.insert(rows, row) nrows = nrows + 1 if nrows > settings['limit'] then @@ -272,7 +280,6 @@ local function elastic_collect(task) end end - local opts = rspamd_config:get_all_opt('elastic') local function check_elastic_server(cfg, ev_base, _) @@ -282,7 +289,7 @@ local function check_elastic_server(cfg, ev_base, _) local function http_callback(err, code, body, _) if code == 200 then local parser = ucl.parser() - local res,ucl_err = parser:parse_string(body) + local res, ucl_err = parser:parse_string(body) if not res then rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s', plugins_url, ucl_err) @@ -290,9 +297,9 @@ local function check_elastic_server(cfg, ev_base, _) return end local obj = parser:get_object() - for node,value in pairs(obj['nodes']) do + for node, value in pairs(obj['nodes']) do local plugin_found = false - for _,plugin in pairs(value['plugins']) do + for _, plugin in pairs(value['plugins']) do if plugin['name'] == 'ingest-geoip' then plugin_found = true lua_util.debugm(N, "ingest-geoip plugin has been found") @@ -326,7 +333,9 @@ end -- import ingest pipeline and kibana dashboard/visualization local function initial_setup(cfg, ev_base, worker) - if not worker:is_primary_controller() then return end + if not worker:is_primary_controller() then + return + end local upstream = settings.upstream:get_upstream_round_robin() local ip_addr = upstream:get_addr():to_string(true) @@ -337,7 +346,7 @@ local function initial_setup(cfg, ev_base, worker) local kibana_mappings = read_file(settings['kibana_file']) if kibana_mappings then local parser = ucl.parser() - local res,parser_err = parser:parse_string(kibana_mappings) + local res, parser_err = parser:parse_string(kibana_mappings) if not res then rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s', parser_err) @@ -347,14 +356,14 @@ local function initial_setup(cfg, ev_base, worker) end local obj = parser:get_object() local tbl = {} - for _,item in ipairs(obj) do - table.insert(tbl, '{ "index" : { "_index" : ".kibana", "_type" : "doc" ,"_id": "'.. - item['_type'] .. ':' .. item["_id"]..'"} }') + for _, item in ipairs(obj) do + table.insert(tbl, '{ "index" : { "_index" : ".kibana", "_type" : "doc" ,"_id": "' .. + item['_type'] .. ':' .. item["_id"] .. '"} }') table.insert(tbl, ucl.to_format(item['_source'], 'json-compact')) end table.insert(tbl, '') -- For last \n - local kibana_url = connect_prefix .. ip_addr ..'/.kibana/_bulk' + local kibana_url = connect_prefix .. ip_addr .. '/.kibana/_bulk' local function kibana_template_callback(err, code, body, _) if code ~= 200 then rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', kibana_url, @@ -389,7 +398,7 @@ local function initial_setup(cfg, ev_base, worker) if enabled then -- create ingest pipeline - local geoip_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip' + local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/rspamd-geoip' local function geoip_cb(err, code, body, _) if code ~= 200 then rspamd_logger.errx('cannot get data from %s: %s(%s) (%s)', @@ -425,7 +434,7 @@ local function initial_setup(cfg, ev_base, worker) timeout = settings.timeout, }) -- create template mappings if not exist - local template_url = connect_prefix .. ip_addr ..'/_template/rspamd' + local template_url = connect_prefix .. ip_addr .. '/_template/rspamd' local function http_template_put_callback(err, code, body, _) if code ~= 200 then rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', @@ -477,7 +486,7 @@ end redis_params = rspamd_redis.parse_redis_server('elastic') if redis_params and opts then - for k,v in pairs(opts) do + for k, v in pairs(opts) do settings[k] = v end @@ -494,11 +503,11 @@ if redis_params and opts then end settings.upstream = upstream_list.create(rspamd_config, - settings['server'] or settings['servers'], 9200) + settings['server'] or settings['servers'], 9200) if not settings.upstream then rspamd_logger.errx('cannot parse elastic address: %s', - settings['server'] or settings['servers']) + settings['server'] or settings['servers']) lua_util.disable_module(N, "config") return end @@ -511,7 +520,7 @@ if redis_params and opts then elastic_template = read_file(settings['template_file']); if not elastic_template then rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module', - settings['template_file']) + settings['template_file']) lua_util.disable_module(N, "config") return end @@ -521,10 +530,10 @@ if redis_params and opts then type = 'idempotent', callback = elastic_collect, flags = 'empty,explicit_disable,ignore_passthrough', - augmentations = {string.format("timeout=%f", settings.timeout)}, + augmentations = { string.format("timeout=%f", settings.timeout) }, }) - rspamd_config:add_on_load(function(cfg, ev_base,worker) + rspamd_config:add_on_load(function(cfg, ev_base, worker) if worker:is_scanner() then check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations |