aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lualib/lua_redis.lua196
1 files changed, 129 insertions, 67 deletions
diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua
index 818d955e9..241a58ce1 100644
--- a/lualib/lua_redis.lua
+++ b/lualib/lua_redis.lua
@@ -1162,8 +1162,11 @@ local function prepare_redis_call(script)
end
-- Call load script on each server, set loaded flag
- script.in_flight = #servers
- for _, s in ipairs(servers) do
+ if not script.servers_ready then
+ script.servers_ready = { } -- possible values for each server: unsent, tempfail, failed, done
+ end
+
+ for idx, s in ipairs(servers) do
local cur_opts = {
host = s:get_addr(),
timeout = script.redis_params['timeout'],
@@ -1172,6 +1175,11 @@ local function prepare_redis_call(script)
upstream = s
}
+ -- By default we start from unsent status
+ if not script.servers_ready[idx] then
+ script.servers_ready[idx] = "unsent"
+ end
+
if script.redis_params['username'] then
cur_opts['username'] = script.redis_params['username']
end
@@ -1190,46 +1198,81 @@ local function prepare_redis_call(script)
return options
end
+local function is_all_servers_ready(script)
+ for _, s in ipairs(script.servers_ready) do
+ if s == "unsent" or s == "tempfail" then
+ return false
+ end
+ end
+
+ -- We assume that permanent errors are not recoverable, so we will just skip those servers
+ return true
+end
+
+local function is_all_servers_failed(script)
+ for _, s in ipairs(script.servers_ready) do
+ if s == "unsent" or s == "tempfail" or s == "done" then
+ return false
+ end
+ end
+
+ return true
+end
+
local function load_script_task(script, task, is_write)
local rspamd_redis = require "rspamd_redis"
local opts = prepare_redis_call(script)
- for _, opt in ipairs(opts) do
- opt.task = task
- opt.is_write = is_write
- opt.callback = function(err, data)
- if err then
- logger.errx(task, 'cannot upload script to %s: %s; registered from: %s:%s',
- opt.upstream:get_addr():to_string(true),
- err, script.caller.short_src, script.caller.currentline)
- opt.upstream:fail()
- script.fatal_error = err
- else
- opt.upstream:ok()
- logger.infox(task,
- "uploaded redis script to %s %s %s, sha: %s",
- opt.upstream:get_addr():to_string(true),
- script.filename and "from file" or "with id", script.filename or script.id, data)
- script.sha = data -- We assume that sha is the same on all servers
- end
- script.in_flight = script.in_flight - 1
-
- if script.in_flight == 0 then
- script_set_loaded(script)
- end
- end
+ for idx, opt in ipairs(opts) do
+ if script.servers_ready[idx] ~= 'done' then
+ opt.task = task
+ opt.is_write = is_write
+ opt.callback = function(err, data)
+ if err then
+ if string.match(err, 'ERR') then
+ logger.errx(task, 'cannot upload script to %s: %s; registered from: %s:%s',
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ opt.upstream:fail()
+ script.servers_ready[idx] = "failed"
+ return
+ else
+ -- Assume temporary error
+ logger.infox(task, 'temporary error uploading script to %s: %s; registered from: %s:%s',
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ script.servers_ready[idx] = "tempfail"
+ return
+ end
+ else
+ opt.upstream:ok()
+ logger.infox(task,
+ "uploaded redis script to %s %s %s, sha: %s",
+ opt.upstream:get_addr():to_string(true),
+ script.filename and "from file" or "with id", script.filename or script.id, data)
+ script.sha = data -- We assume that sha is the same on all servers
+ end
+ if is_all_servers_ready(script) then
+ script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script to any server"
+ end
+ end -- callback
- local ret = rspamd_redis.make_request(opt)
+ local ret = rspamd_redis.make_request(opt)
- if not ret then
- logger.errx('cannot execute redis request to load script on %s',
- opt.upstream:get_addr())
- script.in_flight = script.in_flight - 1
- opt.upstream:fail()
+ if not ret then
+ logger.errx('cannot execute redis request to load script on %s',
+ opt.upstream:get_addr())
+ script.servers_ready[idx] = "failed"
+ opt.upstream:fail()
+ end
end
- if script.in_flight == 0 then
+ if is_all_servers_ready(script) then
script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script to any server"
end
end
end
@@ -1238,44 +1281,58 @@ local function load_script_taskless(script, cfg, ev_base, is_write)
local rspamd_redis = require "rspamd_redis"
local opts = prepare_redis_call(script)
- for _, opt in ipairs(opts) do
- opt.config = cfg
- opt.ev_base = ev_base
- opt.is_write = is_write
- opt.callback = function(err, data)
- if err then
- logger.errx(cfg, 'cannot upload script to %s: %s; registered from: %s:%s, filename: %s',
- opt.upstream:get_addr():to_string(true),
- err, script.caller.short_src, script.caller.currentline, script.filename)
- opt.upstream:fail()
- script.fatal_error = err
- else
- opt.upstream:ok()
- logger.infox(cfg,
- "uploaded redis script to %s %s %s, sha: %s",
- opt.upstream:get_addr():to_string(true),
- script.filename and "from file" or "with id", script.filename or script.id,
- data)
- script.sha = data -- We assume that sha is the same on all servers
- script.fatal_error = nil
- end
- script.in_flight = script.in_flight - 1
+ for idx, opt in ipairs(opts) do
+ if script.servers_ready[idx] ~= 'done' then
+ opt.config = cfg
+ opt.ev_base = ev_base
+ opt.is_write = is_write
+ opt.callback = function(err, data)
+ if err then
+ if string.match(err, 'ERR') then
+ logger.errx(cfg, 'cannot upload script to %s: %s; registered from: %s:%s',
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ opt.upstream:fail()
+ script.servers_ready[idx] = "failed"
+ return
+ else
+ -- Assume temporary error
+ logger.infox(cfg, 'temporary error uploading script to %s: %s; registered from: %s:%s',
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ script.servers_ready[idx] = "tempfail"
+ return
+ end
+ else
+ opt.upstream:ok()
+ logger.infox(cfg,
+ "uploaded redis script to %s %s %s, sha: %s",
+ opt.upstream:get_addr():to_string(true),
+ script.filename and "from file" or "with id", script.filename or script.id,
+ data)
+ script.sha = data -- We assume that sha is the same on all servers
+ end
- if script.in_flight == 0 then
- script_set_loaded(script)
+ if is_all_servers_ready(script) then
+ script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script to any server"
+ end
end
- end
- local ret = rspamd_redis.make_request(opt)
+ local ret = rspamd_redis.make_request(opt)
- if not ret then
- logger.errx('cannot execute redis request to load script on %s',
- opt.upstream:get_addr())
- script.in_flight = script.in_flight - 1
- opt.upstream:fail()
+ if not ret then
+ logger.errx('cannot execute redis request to load script on %s',
+ opt.upstream:get_addr())
+ script.servers_ready[idx] = "failed"
+ opt.upstream:fail()
+ end
end
- if script.in_flight == 0 then
+ if is_all_servers_ready(script) then
script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script to any server"
end
end
end
@@ -1306,12 +1363,17 @@ local function add_redis_script(script, redis_params, caller_level, maybe_filena
rspamd_config:add_on_load(function(cfg, ev_base, worker)
local mult = 0.0
rspamd_config:add_periodic(ev_base, 0.0, function()
- if not new_script.sha then
+ if not new_script.sha and not new_script.fatal_error then
load_redis_script(new_script, cfg, ev_base, worker)
- mult = mult + 1
+
+ -- Do not wait for more than 10 seconds to retry
+ if mult < 10 then
+ mult = mult + 1
+ end
return 1.0 * mult -- Check one more time in one second
end
+ -- All loaded, we are good to go
return false
end, false)
end)