aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2024-08-01 12:13:07 +0100
committerVsevolod Stakhov <vsevolod@rspamd.com>2024-08-01 12:13:07 +0100
commit8058356806ae01c4d7e0b03d03f8d80bb2a4a56b (patch)
tree499eb6f842e6336d73abadfcb8d2bbbdad202849
parent0b547ff8e801979b2f1ea42b1b686476f10341cf (diff)
downloadrspamd-8058356806ae01c4d7e0b03d03f8d80bb2a4a56b.tar.gz
rspamd-8058356806ae01c4d7e0b03d03f8d80bb2a4a56b.zip
[Fix] Fix Redis scripts uploading when Redis is not ready
Initially, there was no way to recover from Redis errors that are temporary by nature (e.g. when Redis was busy with loading database). This PR adds logic to check returned reply and adds more fine-grained errors-per-server handling.
-rw-r--r--lualib/lua_redis.lua196
1 files changed, 129 insertions, 67 deletions
diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua
index 818d955e9..241a58ce1 100644
--- a/lualib/lua_redis.lua
+++ b/lualib/lua_redis.lua
@@ -1162,8 +1162,11 @@ local function prepare_redis_call(script)
end
-- Call load script on each server, set loaded flag
- script.in_flight = #servers
- for _, s in ipairs(servers) do
+ if not script.servers_ready then
+ script.servers_ready = { } -- possible values for each server: unsent, tempfail, failed, done
+ end
+
+ for idx, s in ipairs(servers) do
local cur_opts = {
host = s:get_addr(),
timeout = script.redis_params['timeout'],
@@ -1172,6 +1175,11 @@ local function prepare_redis_call(script)
upstream = s
}
+ -- By default we start from unsent status
+ if not script.servers_ready[idx] then
+ script.servers_ready[idx] = "unsent"
+ end
+
if script.redis_params['username'] then
cur_opts['username'] = script.redis_params['username']
end
@@ -1190,46 +1198,81 @@ local function prepare_redis_call(script)
return options
end
+local function is_all_servers_ready(script)
+ for _, s in ipairs(script.servers_ready) do
+ if s == "unsent" or s == "tempfail" then
+ return false
+ end
+ end
+
+ -- We assume that permanent errors are not recoverable, so we will just skip those servers
+ return true
+end
+
+local function is_all_servers_failed(script)
+ for _, s in ipairs(script.servers_ready) do
+ if s == "unsent" or s == "tempfail" or s == "done" then
+ return false
+ end
+ end
+
+ return true
+end
+
local function load_script_task(script, task, is_write)
local rspamd_redis = require "rspamd_redis"
local opts = prepare_redis_call(script)
- for _, opt in ipairs(opts) do
- opt.task = task
- opt.is_write = is_write
- opt.callback = function(err, data)
- if err then
- logger.errx(task, 'cannot upload script to %s: %s; registered from: %s:%s',
- opt.upstream:get_addr():to_string(true),
- err, script.caller.short_src, script.caller.currentline)
- opt.upstream:fail()
- script.fatal_error = err
- else
- opt.upstream:ok()
- logger.infox(task,
- "uploaded redis script to %s %s %s, sha: %s",
- opt.upstream:get_addr():to_string(true),
- script.filename and "from file" or "with id", script.filename or script.id, data)
- script.sha = data -- We assume that sha is the same on all servers
- end
- script.in_flight = script.in_flight - 1
-
- if script.in_flight == 0 then
- script_set_loaded(script)
- end
- end
+ for idx, opt in ipairs(opts) do
+ if script.servers_ready[idx] ~= 'done' then
+ opt.task = task
+ opt.is_write = is_write
+ opt.callback = function(err, data)
+ if err then
+ if string.match(err, 'ERR') then
+ logger.errx(task, 'cannot upload script to %s: %s; registered from: %s:%s',
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ opt.upstream:fail()
+ script.servers_ready[idx] = "failed"
+ return
+ else
+ -- Assume temporary error
+ logger.infox(task, 'temporary error uploading script to %s: %s; registered from: %s:%s',
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ script.servers_ready[idx] = "tempfail"
+ return
+ end
+ else
+ opt.upstream:ok()
+ logger.infox(task,
+ "uploaded redis script to %s %s %s, sha: %s",
+ opt.upstream:get_addr():to_string(true),
+ script.filename and "from file" or "with id", script.filename or script.id, data)
+ script.sha = data -- We assume that sha is the same on all servers
+ end
+ if is_all_servers_ready(script) then
+ script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script to any server"
+ end
+ end -- callback
- local ret = rspamd_redis.make_request(opt)
+ local ret = rspamd_redis.make_request(opt)
- if not ret then
- logger.errx('cannot execute redis request to load script on %s',
- opt.upstream:get_addr())
- script.in_flight = script.in_flight - 1
- opt.upstream:fail()
+ if not ret then
+ logger.errx('cannot execute redis request to load script on %s',
+ opt.upstream:get_addr())
+ script.servers_ready[idx] = "failed"
+ opt.upstream:fail()
+ end
end
- if script.in_flight == 0 then
+ if is_all_servers_ready(script) then
script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script to any server"
end
end
end
@@ -1238,44 +1281,58 @@ local function load_script_taskless(script, cfg, ev_base, is_write)
local rspamd_redis = require "rspamd_redis"
local opts = prepare_redis_call(script)
- for _, opt in ipairs(opts) do
- opt.config = cfg
- opt.ev_base = ev_base
- opt.is_write = is_write
- opt.callback = function(err, data)
- if err then
- logger.errx(cfg, 'cannot upload script to %s: %s; registered from: %s:%s, filename: %s',
- opt.upstream:get_addr():to_string(true),
- err, script.caller.short_src, script.caller.currentline, script.filename)
- opt.upstream:fail()
- script.fatal_error = err
- else
- opt.upstream:ok()
- logger.infox(cfg,
- "uploaded redis script to %s %s %s, sha: %s",
- opt.upstream:get_addr():to_string(true),
- script.filename and "from file" or "with id", script.filename or script.id,
- data)
- script.sha = data -- We assume that sha is the same on all servers
- script.fatal_error = nil
- end
- script.in_flight = script.in_flight - 1
+ for idx, opt in ipairs(opts) do
+ if script.servers_ready[idx] ~= 'done' then
+ opt.config = cfg
+ opt.ev_base = ev_base
+ opt.is_write = is_write
+ opt.callback = function(err, data)
+ if err then
+ if string.match(err, 'ERR') then
+ logger.errx(cfg, 'cannot upload script to %s: %s; registered from: %s:%s',
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ opt.upstream:fail()
+ script.servers_ready[idx] = "failed"
+ return
+ else
+ -- Assume temporary error
+ logger.infox(cfg, 'temporary error uploading script to %s: %s; registered from: %s:%s',
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ script.servers_ready[idx] = "tempfail"
+ return
+ end
+ else
+ opt.upstream:ok()
+ logger.infox(cfg,
+ "uploaded redis script to %s %s %s, sha: %s",
+ opt.upstream:get_addr():to_string(true),
+ script.filename and "from file" or "with id", script.filename or script.id,
+ data)
+ script.sha = data -- We assume that sha is the same on all servers
+ end
- if script.in_flight == 0 then
- script_set_loaded(script)
+ if is_all_servers_ready(script) then
+ script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script to any server"
+ end
end
- end
- local ret = rspamd_redis.make_request(opt)
+ local ret = rspamd_redis.make_request(opt)
- if not ret then
- logger.errx('cannot execute redis request to load script on %s',
- opt.upstream:get_addr())
- script.in_flight = script.in_flight - 1
- opt.upstream:fail()
+ if not ret then
+ logger.errx('cannot execute redis request to load script on %s',
+ opt.upstream:get_addr())
+ script.servers_ready[idx] = "failed"
+ opt.upstream:fail()
+ end
end
- if script.in_flight == 0 then
+ if is_all_servers_ready(script) then
script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script to any server"
end
end
end
@@ -1306,12 +1363,17 @@ local function add_redis_script(script, redis_params, caller_level, maybe_filena
rspamd_config:add_on_load(function(cfg, ev_base, worker)
local mult = 0.0
rspamd_config:add_periodic(ev_base, 0.0, function()
- if not new_script.sha then
+ if not new_script.sha and not new_script.fatal_error then
load_redis_script(new_script, cfg, ev_base, worker)
- mult = mult + 1
+
+ -- Do not wait for more than 10 seconds to retry
+ if mult < 10 then
+ mult = mult + 1
+ end
return 1.0 * mult -- Check one more time in one second
end
+ -- All loaded, we are good to go
return false
end, false)
end)