diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2020-02-14 13:19:54 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2020-02-14 13:19:54 +0000 |
commit | d0d0f333d3fd5d10bc6b88dd364cb792e326c8c2 (patch) | |
tree | 9989659cae68846c4f88f1d042ae7bb6175dabca /lualib/lua_redis.lua | |
parent | e36edd1c6074f882f11b5f47795dea8ba0460b4f (diff) | |
download | rspamd-d0d0f333d3fd5d10bc6b88dd364cb792e326c8c2.tar.gz rspamd-d0d0f333d3fd5d10bc6b88dd364cb792e326c8c2.zip |
[Fix] Fix sentinel connections leak by using async connections
Diffstat (limited to 'lualib/lua_redis.lua')
-rw-r--r-- | lualib/lua_redis.lua | 159 |
1 files changed, 96 insertions, 63 deletions
diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua index fb98a5b9c..017381e02 100644 --- a/lualib/lua_redis.lua +++ b/lualib/lua_redis.lua @@ -66,76 +66,115 @@ local function redis_query_sentinel(ev_base, params, initialised) local rspamd_redis = require "rspamd_redis" local addr = params.sentinels:get_upstream_round_robin() - local is_ok, connection = rspamd_redis.connect_sync({ - host = addr:get_addr(), - timeout = params.timeout, - config = rspamd_config, - ev_base = ev_base, - no_pool = true, - }) - - if not is_ok then - logger.errx(rspamd_config, 'cannot connect sentinel at address: %s', - tostring(addr:get_addr())) - addr:fail() - - return - end + local host = addr:get_addr() + local masters = {} + local process_masters -- Function that is called to process masters data - -- Get masters list - connection:add_cmd('SENTINEL', {'masters'}) + local function masters_cb(err, result) + if not err and result and type(result) == 'table' then - local ok,result = connection:exec() + local pending_subrequests = 0 - if ok and result and type(result) == 'table' then - local masters = {} - for _,m in ipairs(result) do - local master = flatten_redis_table(m) + for _,m in ipairs(result) do + local master = flatten_redis_table(m) - -- Wrap IPv6-adresses in brackets - if (master.ip:match(":")) then - master.ip = "["..master.ip.."]" - end + -- Wrap IPv6-adresses in brackets + if (master.ip:match(":")) then + master.ip = "["..master.ip.."]" + end - if params.sentinel_masters_pattern then - if master.name:match(params.sentinel_masters_pattern) then + if params.sentinel_masters_pattern then + if master.name:match(params.sentinel_masters_pattern) then + lutil.debugm(N, 'found master %s with ip %s and port %s', + master.name, master.ip, master.port) + masters[master.name] = master + else + lutil.debugm(N, 'skip master %s with ip %s and port %s, pattern %s', + master.name, master.ip, master.port, params.sentinel_masters_pattern) + end + else lutil.debugm(N, 'found master %s with ip %s and port %s', master.name, master.ip, master.port) masters[master.name] = master - else - lutil.debugm(N, 'skip master %s with ip %s and port %s, pattern %s', - master.name, master.ip, master.port, params.sentinel_masters_pattern) end - else - lutil.debugm(N, 'found master %s with ip %s and port %s', - master.name, master.ip, master.port) - masters[master.name] = master end - end - -- For each master we need to get a list of slaves - for k,v in pairs(masters) do - v.slaves = {} - local slave_result - - connection:add_cmd('SENTINEL', {'slaves', k}) - ok,slave_result = connection:exec() - - if ok then - for _,s in ipairs(slave_result) do - local slave = flatten_redis_table(s) - lutil.debugm(N, rspamd_config, - 'found slave for master %s with ip %s and port %s', - v.name, slave.ip, slave.port) - -- Wrap IPv6-adresses in brackets - if (slave.ip:match(":")) then - slave.ip = "["..slave.ip.."]" + -- For each master we need to get a list of slaves + for k,v in pairs(masters) do + v.slaves = {} + local function slaves_cb(slave_err, slave_result) + if not slave_err and type(slave_result) == 'table' then + for _,s in ipairs(slave_result) do + local slave = flatten_redis_table(s) + lutil.debugm(N, rspamd_config, + 'found slave for master %s with ip %s and port %s', + v.name, slave.ip, slave.port) + -- Wrap IPv6-adresses in brackets + if (slave.ip:match(":")) then + slave.ip = "["..slave.ip.."]" + end + v.slaves[#v.slaves + 1] = slave + end + else + logger.errx('cannot get slaves data from Redis Sentinel %s: %s', + host:to_string(true), slave_err) + addr:fail() + end + + pending_subrequests = pending_subrequests - 1 + + if pending_subrequests == 0 then + -- Finalize masters and slaves + process_masters() end - v.slaves[#v.slaves + 1] = slave + end + + local ret = rspamd_redis.make_request({ + host = addr:get_addr(), + timeout = params.timeout, + config = rspamd_config, + ev_base = ev_base, + cmd = 'SENTINEL', + args = {'slaves', k}, + no_pool = true, + callback = slaves_cb + }) + + if not ret then + logger.errx(rspamd_config, 'cannot connect sentinel when query slaves at address: %s', + host:to_string(true)) + addr:fail() + else + pending_subrequests = pending_subrequests + 1 end end + + addr:ok() + else + logger.errx('cannot get masters data from Redis Sentinel %s: %s', + host:to_string(true), err) + addr:fail() end + end + local ret = rspamd_redis.make_request({ + host = addr:get_addr(), + timeout = params.timeout, + config = rspamd_config, + ev_base = ev_base, + cmd = 'SENTINEL', + args = {'masters'}, + no_pool = true, + callback = masters_cb, + }) + + if not ret then + logger.errx(rspamd_config, 'cannot connect sentinel at address: %s', + host:to_string(true)) + addr:fail() + end + + process_masters = function() -- We now form new strings for masters and slaves local read_servers_tbl, write_servers_tbl = {}, {} @@ -175,7 +214,7 @@ local function redis_query_sentinel(ev_base, params, initialised) if read_upstreams then logger.infox(rspamd_config, 'sentinel %s: replace read servers with new list: %s', - addr:get_addr():to_string(true), read_servers_str) + host:to_string(true), read_servers_str) params.read_servers = read_upstreams params.read_servers_str = read_servers_str end @@ -189,7 +228,7 @@ local function redis_query_sentinel(ev_base, params, initialised) if write_upstreams then logger.infox(rspamd_config, 'sentinel %s: replace write servers with new list: %s', - addr:get_addr():to_string(true), write_servers_str) + host:to_string(true), write_servers_str) params.write_servers = write_upstreams params.write_servers_str = write_servers_str @@ -198,7 +237,7 @@ local function redis_query_sentinel(ev_base, params, initialised) local function monitor_failures(up, _, count) if count > params.sentinel_master_maxerrors and not queried then logger.infox(rspamd_config, 'sentinel: master with address %s, caused %s failures, try to query sentinel', - up:get_addr():to_string(true), count) + host:to_string(true), count) queried = true -- Avoid multiple checks caused by this monitor redis_query_sentinel(ev_base, params, true) end @@ -207,12 +246,6 @@ local function redis_query_sentinel(ev_base, params, initialised) write_upstreams:add_watcher('failure', monitor_failures) end end - - addr:ok() - else - logger.errx('cannot get data from Redis Sentinel %s: %s', - addr:get_addr():to_string(true), result) - addr:fail() end end |