aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/plugins/lua/elastic.lua83
1 files changed, 21 insertions, 62 deletions
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
index d553a65c6..b2c6d36ba 100644
--- a/src/plugins/lua/elastic.lua
+++ b/src/plugins/lua/elastic.lua
@@ -230,9 +230,11 @@ function Queue:pop_first(count)
local popped_items = {}
count = count or self:length()
local actual_count = math.min(count, self:length())
- for i = 1, actual_count do
+ local n = 0
+ while n < actual_count do
local item = self:pop()
table.insert(popped_items, item)
+ n = n + 1
end
return popped_items
end
@@ -262,49 +264,6 @@ local function safe_get(table, ...)
return value
end
-local function deep_compare(t1, t2, visited)
- if t1 == t2 then
- return true
- end
-
- if type(t1) ~= "table" or type(t2) ~= "table" then
- return false
- end
-
- -- use visited to keep track of already compared tables to handle cycles
- visited = visited or {}
- if visited[t1] and visited[t1][t2] then
- return true
- end
-
- visited[t1] = visited[t1] or {}
- visited[t1][t2] = true
-
- -- compare the number of keys in both tables
- local t1len = 0
- for _ in pairs(t1) do
- t1len = t1len + 1
- end
-
- local t2len = 0
- for _ in pairs(t2) do
- t2len = t2len + 1
- end
-
- if t1len ~= t2len then
- return false
- end
-
- -- recursively compare each key-value pair
- for k, v1 in pairs(t1) do
- local v2 = t2[k]
- if v2 == nil or not deep_compare(v1, v2, visited) then
- return false
- end
- end
- return true
-end
-
local function compare_versions(v1, v2)
-- helper function to extract the numeric version string
local function extract_numeric_version(version)
@@ -839,28 +798,28 @@ local function get_index_policy(cfg, ev_base, upstream, host, policy_url, index_
elseif code == 200 then
local remote_policy_parser = ucl.parser()
local our_policy_parser = ucl.parser()
- local res, ucl_err = remote_policy_parser:parse_string(body)
- if not ucl_err and res then
- local res, ucl_err = our_policy_parser:parse_string(index_policy_json)
- if not ucl_err and res then
+ local rp_res, rp_ucl_err = remote_policy_parser:parse_string(body)
+ if rp_res and not rp_ucl_err then
+ local op_res, op_ucl_err = our_policy_parser:parse_string(index_policy_json)
+ if op_res and not op_ucl_err then
local remote_policy = remote_policy_parser:get_object()
local our_policy = our_policy_parser:get_object()
local update_needed = false
if detected_distro['name'] == 'elastic' then
local index_policy_name = settings['index_policy']['name']
local current_phases = safe_get(remote_policy, index_policy_name, 'policy', 'phases')
- if not deep_compare(our_policy['policy']['phases'], current_phases) then
+ if not lua_util.table_cmp(our_policy['policy']['phases'], current_phases) then
update_needed = true
end
elseif detected_distro['name'] == 'opensearch' then
local current_default_state = safe_get(remote_policy, 'policy', 'default_state')
local current_ism_index_patterns = safe_get(remote_policy, 'policy', 'ism_template', 1, 'index_patterns')
local current_states = safe_get(remote_policy, 'policy', 'states')
- if not deep_compare(our_policy['policy']['default_state'], current_default_state) then
+ if not lua_util.table_cmp(our_policy['policy']['default_state'], current_default_state) then
update_needed = true
- elseif not deep_compare(our_policy['policy']['ism_template'][1]['index_patterns'], current_ism_index_patterns) then
+ elseif not lua_util.table_cmp(our_policy['policy']['ism_template'][1]['index_patterns'], current_ism_index_patterns) then
update_needed = true
- elseif not deep_compare(our_policy['policy']['states'], current_states) then
+ elseif not lua_util.table_cmp(our_policy['policy']['states'], current_states) then
update_needed = true
end
end
@@ -1574,26 +1533,26 @@ if opts then
rspamd_config:add_on_load(function(cfg, ev_base, worker)
if worker:is_scanner() then
- rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base)
+ rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base)
if not detected_distro['supported'] then
if states['distro']['configured'] then
return false -- stop running periodic job
else
- configure_distro(cfg, ev_base)
+ configure_distro(p_cfg, p_ev_base)
return true -- continue running periodic job
end
end
end)
-- send data periodically if any of limits reached
- rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base)
+ rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base)
if detected_distro['supported'] then
- periodic_send_data(cfg, ev_base)
+ periodic_send_data(p_cfg, p_ev_base)
end
return true
end)
end
if worker:is_primary_controller() then
- rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base)
+ rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base)
if not settings['index_template']['managed'] then
return false
elseif not detected_distro['supported'] then
@@ -1602,12 +1561,12 @@ if opts then
if states['index_template']['configured'] then
return false
else
- configure_index_template(cfg, ev_base)
+ configure_index_template(p_cfg, p_ev_base)
return true
end
end
end)
- rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base)
+ rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base)
if not settings['index_policy']['enabled'] or not settings['index_policy']['managed'] then
return false
elseif not detected_distro['supported'] then
@@ -1616,12 +1575,12 @@ if opts then
if states['index_policy']['configured'] then
return false
else
- configure_index_policy(cfg, ev_base)
+ configure_index_policy(p_cfg, p_ev_base)
return true
end
end
end)
- rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base)
+ rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(p_cfg, p_ev_base)
if not settings['geoip']['enabled'] or not settings['geoip']['managed'] then
return false
elseif not detected_distro['supported'] then
@@ -1630,7 +1589,7 @@ if opts then
if states['geoip_pipeline']['configured'] then
return false
else
- configure_geoip_pipeline(cfg, ev_base)
+ configure_geoip_pipeline(p_cfg, p_ev_base)
return true
end
end