summaryrefslogtreecommitdiffstats
path: root/lualib/lua_redis.lua
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2020-02-14 13:19:54 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2020-02-14 13:19:54 +0000
commitd0d0f333d3fd5d10bc6b88dd364cb792e326c8c2 (patch)
tree9989659cae68846c4f88f1d042ae7bb6175dabca /lualib/lua_redis.lua
parente36edd1c6074f882f11b5f47795dea8ba0460b4f (diff)
downloadrspamd-d0d0f333d3fd5d10bc6b88dd364cb792e326c8c2.tar.gz
rspamd-d0d0f333d3fd5d10bc6b88dd364cb792e326c8c2.zip
[Fix] Fix sentinel connections leak by using async connections
Diffstat (limited to 'lualib/lua_redis.lua')
-rw-r--r--lualib/lua_redis.lua159
1 files changed, 96 insertions, 63 deletions
diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua
index fb98a5b9c..017381e02 100644
--- a/lualib/lua_redis.lua
+++ b/lualib/lua_redis.lua
@@ -66,76 +66,115 @@ local function redis_query_sentinel(ev_base, params, initialised)
local rspamd_redis = require "rspamd_redis"
local addr = params.sentinels:get_upstream_round_robin()
- local is_ok, connection = rspamd_redis.connect_sync({
- host = addr:get_addr(),
- timeout = params.timeout,
- config = rspamd_config,
- ev_base = ev_base,
- no_pool = true,
- })
-
- if not is_ok then
- logger.errx(rspamd_config, 'cannot connect sentinel at address: %s',
- tostring(addr:get_addr()))
- addr:fail()
-
- return
- end
+ local host = addr:get_addr()
+ local masters = {}
+ local process_masters -- Function that is called to process masters data
- -- Get masters list
- connection:add_cmd('SENTINEL', {'masters'})
+ local function masters_cb(err, result)
+ if not err and result and type(result) == 'table' then
- local ok,result = connection:exec()
+ local pending_subrequests = 0
- if ok and result and type(result) == 'table' then
- local masters = {}
- for _,m in ipairs(result) do
- local master = flatten_redis_table(m)
+ for _,m in ipairs(result) do
+ local master = flatten_redis_table(m)
- -- Wrap IPv6-adresses in brackets
- if (master.ip:match(":")) then
- master.ip = "["..master.ip.."]"
- end
+ -- Wrap IPv6-adresses in brackets
+ if (master.ip:match(":")) then
+ master.ip = "["..master.ip.."]"
+ end
- if params.sentinel_masters_pattern then
- if master.name:match(params.sentinel_masters_pattern) then
+ if params.sentinel_masters_pattern then
+ if master.name:match(params.sentinel_masters_pattern) then
+ lutil.debugm(N, 'found master %s with ip %s and port %s',
+ master.name, master.ip, master.port)
+ masters[master.name] = master
+ else
+ lutil.debugm(N, 'skip master %s with ip %s and port %s, pattern %s',
+ master.name, master.ip, master.port, params.sentinel_masters_pattern)
+ end
+ else
lutil.debugm(N, 'found master %s with ip %s and port %s',
master.name, master.ip, master.port)
masters[master.name] = master
- else
- lutil.debugm(N, 'skip master %s with ip %s and port %s, pattern %s',
- master.name, master.ip, master.port, params.sentinel_masters_pattern)
end
- else
- lutil.debugm(N, 'found master %s with ip %s and port %s',
- master.name, master.ip, master.port)
- masters[master.name] = master
end
- end
- -- For each master we need to get a list of slaves
- for k,v in pairs(masters) do
- v.slaves = {}
- local slave_result
-
- connection:add_cmd('SENTINEL', {'slaves', k})
- ok,slave_result = connection:exec()
-
- if ok then
- for _,s in ipairs(slave_result) do
- local slave = flatten_redis_table(s)
- lutil.debugm(N, rspamd_config,
- 'found slave for master %s with ip %s and port %s',
- v.name, slave.ip, slave.port)
- -- Wrap IPv6-adresses in brackets
- if (slave.ip:match(":")) then
- slave.ip = "["..slave.ip.."]"
+ -- For each master we need to get a list of slaves
+ for k,v in pairs(masters) do
+ v.slaves = {}
+ local function slaves_cb(slave_err, slave_result)
+ if not slave_err and type(slave_result) == 'table' then
+ for _,s in ipairs(slave_result) do
+ local slave = flatten_redis_table(s)
+ lutil.debugm(N, rspamd_config,
+ 'found slave for master %s with ip %s and port %s',
+ v.name, slave.ip, slave.port)
+ -- Wrap IPv6-adresses in brackets
+ if (slave.ip:match(":")) then
+ slave.ip = "["..slave.ip.."]"
+ end
+ v.slaves[#v.slaves + 1] = slave
+ end
+ else
+ logger.errx('cannot get slaves data from Redis Sentinel %s: %s',
+ host:to_string(true), slave_err)
+ addr:fail()
+ end
+
+ pending_subrequests = pending_subrequests - 1
+
+ if pending_subrequests == 0 then
+ -- Finalize masters and slaves
+ process_masters()
end
- v.slaves[#v.slaves + 1] = slave
+ end
+
+ local ret = rspamd_redis.make_request({
+ host = addr:get_addr(),
+ timeout = params.timeout,
+ config = rspamd_config,
+ ev_base = ev_base,
+ cmd = 'SENTINEL',
+ args = {'slaves', k},
+ no_pool = true,
+ callback = slaves_cb
+ })
+
+ if not ret then
+ logger.errx(rspamd_config, 'cannot connect sentinel when query slaves at address: %s',
+ host:to_string(true))
+ addr:fail()
+ else
+ pending_subrequests = pending_subrequests + 1
end
end
+
+ addr:ok()
+ else
+ logger.errx('cannot get masters data from Redis Sentinel %s: %s',
+ host:to_string(true), err)
+ addr:fail()
end
+ end
+ local ret = rspamd_redis.make_request({
+ host = addr:get_addr(),
+ timeout = params.timeout,
+ config = rspamd_config,
+ ev_base = ev_base,
+ cmd = 'SENTINEL',
+ args = {'masters'},
+ no_pool = true,
+ callback = masters_cb,
+ })
+
+ if not ret then
+ logger.errx(rspamd_config, 'cannot connect sentinel at address: %s',
+ host:to_string(true))
+ addr:fail()
+ end
+
+ process_masters = function()
-- We now form new strings for masters and slaves
local read_servers_tbl, write_servers_tbl = {}, {}
@@ -175,7 +214,7 @@ local function redis_query_sentinel(ev_base, params, initialised)
if read_upstreams then
logger.infox(rspamd_config, 'sentinel %s: replace read servers with new list: %s',
- addr:get_addr():to_string(true), read_servers_str)
+ host:to_string(true), read_servers_str)
params.read_servers = read_upstreams
params.read_servers_str = read_servers_str
end
@@ -189,7 +228,7 @@ local function redis_query_sentinel(ev_base, params, initialised)
if write_upstreams then
logger.infox(rspamd_config, 'sentinel %s: replace write servers with new list: %s',
- addr:get_addr():to_string(true), write_servers_str)
+ host:to_string(true), write_servers_str)
params.write_servers = write_upstreams
params.write_servers_str = write_servers_str
@@ -198,7 +237,7 @@ local function redis_query_sentinel(ev_base, params, initialised)
local function monitor_failures(up, _, count)
if count > params.sentinel_master_maxerrors and not queried then
logger.infox(rspamd_config, 'sentinel: master with address %s, caused %s failures, try to query sentinel',
- up:get_addr():to_string(true), count)
+ host:to_string(true), count)
queried = true -- Avoid multiple checks caused by this monitor
redis_query_sentinel(ev_base, params, true)
end
@@ -207,12 +246,6 @@ local function redis_query_sentinel(ev_base, params, initialised)
write_upstreams:add_watcher('failure', monitor_failures)
end
end
-
- addr:ok()
- else
- logger.errx('cannot get data from Redis Sentinel %s: %s',
- addr:get_addr():to_string(true), result)
- addr:fail()
end
end