aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rspamd.com>2024-08-01 20:48:50 +0600
committerGitHub <noreply@github.com>2024-08-01 20:48:50 +0600
commite7a400d2eee707f2615009bc748bd9e21428cd9c (patch)
treecc2db267d3e5e15667a4a53fb180e970ae881409
parent9265435a9b7fdd132c00767331c85b60dedd7ecf (diff)
parente7dea536e351caa1b97baa3dd5c298ae30461242 (diff)
downloadrspamd-e7a400d2eee707f2615009bc748bd9e21428cd9c.tar.gz
rspamd-e7a400d2eee707f2615009bc748bd9e21428cd9c.zip
Merge pull request #5091 from rspamd/vstakhov-redis-fix-scripts-loading
[Fix] Fix Redis scripts uploading when Redis is not ready
-rw-r--r--lualib/lua_redis.lua214
1 files changed, 146 insertions, 68 deletions
diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua
index 818d955e9..2c77c100a 100644
--- a/lualib/lua_redis.lua
+++ b/lualib/lua_redis.lua
@@ -1162,8 +1162,11 @@ local function prepare_redis_call(script)
end
-- Call load script on each server, set loaded flag
- script.in_flight = #servers
- for _, s in ipairs(servers) do
+ if not script.servers_ready then
+ script.servers_ready = { } -- possible values for each server: unsent, tempfail, failed, done
+ end
+
+ for idx, s in ipairs(servers) do
local cur_opts = {
host = s:get_addr(),
timeout = script.redis_params['timeout'],
@@ -1172,6 +1175,11 @@ local function prepare_redis_call(script)
upstream = s
}
+ -- By default we start from unsent status
+ if not script.servers_ready[idx] then
+ script.servers_ready[idx] = "unsent"
+ end
+
if script.redis_params['username'] then
cur_opts['username'] = script.redis_params['username']
end
@@ -1190,46 +1198,89 @@ local function prepare_redis_call(script)
return options
end
+local function is_all_servers_ready(script)
+ for _, s in ipairs(script.servers_ready) do
+ if s == "unsent" or s == "tempfail" then
+ return false
+ end
+ end
+
+ -- We assume that permanent errors are not recoverable, so we will just skip those servers
+ return true
+end
+
+local function is_all_servers_failed(script)
+ for _, s in ipairs(script.servers_ready) do
+ if s == "unsent" or s == "tempfail" or s == "done" then
+ return false
+ end
+ end
+
+ return true
+end
+
+local function script_description(script)
+ return script.filename and ("from file: " .. script.filename)
+ or ("with id: " .. script.id)
+end
+
local function load_script_task(script, task, is_write)
local rspamd_redis = require "rspamd_redis"
local opts = prepare_redis_call(script)
- for _, opt in ipairs(opts) do
- opt.task = task
- opt.is_write = is_write
- opt.callback = function(err, data)
- if err then
- logger.errx(task, 'cannot upload script to %s: %s; registered from: %s:%s',
- opt.upstream:get_addr():to_string(true),
- err, script.caller.short_src, script.caller.currentline)
- opt.upstream:fail()
- script.fatal_error = err
- else
- opt.upstream:ok()
- logger.infox(task,
- "uploaded redis script to %s %s %s, sha: %s",
- opt.upstream:get_addr():to_string(true),
- script.filename and "from file" or "with id", script.filename or script.id, data)
- script.sha = data -- We assume that sha is the same on all servers
- end
- script.in_flight = script.in_flight - 1
-
- if script.in_flight == 0 then
- script_set_loaded(script)
- end
- end
+ for idx, opt in ipairs(opts) do
+ if script.servers_ready[idx] ~= 'done' then
+ opt.task = task
+ opt.is_write = is_write
+ opt.callback = function(err, data)
+ if err then
+ if string.match(err, 'ERR') then
+ logger.errx(task, 'cannot upload script %s to %s: %s; registered from: %s:%s',
+ script_description(script),
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ opt.upstream:fail()
+ script.servers_ready[idx] = "failed"
+ return
+ else
+ -- Assume temporary error
+ logger.infox(task, 'temporary error uploading script %s to %s: %s; registered from: %s:%s',
+ script_description(script),
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ script.servers_ready[idx] = "tempfail"
+ return
+ end
+ else
+ opt.upstream:ok()
+ logger.infox(task,
+ "uploaded redis script to %s %s, sha: %s",
+ opt.upstream:get_addr():to_string(true),
+ script_description(script), data)
+ script.sha = data -- We assume that sha is the same on all servers
+ script.servers_ready[idx] = "done"
+ end
+ if is_all_servers_ready(script) then
+ script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script to any server"
+ end
+ end -- callback
- local ret = rspamd_redis.make_request(opt)
+ local ret = rspamd_redis.make_request(opt)
- if not ret then
- logger.errx('cannot execute redis request to load script on %s',
- opt.upstream:get_addr())
- script.in_flight = script.in_flight - 1
- opt.upstream:fail()
+ if not ret then
+ logger.errx('cannot execute redis request to load script on %s',
+ opt.upstream:get_addr())
+ script.servers_ready[idx] = "failed"
+ opt.upstream:fail()
+ end
end
- if script.in_flight == 0 then
+ if is_all_servers_ready(script) then
script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script to any server"
end
end
end
@@ -1238,44 +1289,62 @@ local function load_script_taskless(script, cfg, ev_base, is_write)
local rspamd_redis = require "rspamd_redis"
local opts = prepare_redis_call(script)
- for _, opt in ipairs(opts) do
- opt.config = cfg
- opt.ev_base = ev_base
- opt.is_write = is_write
- opt.callback = function(err, data)
- if err then
- logger.errx(cfg, 'cannot upload script to %s: %s; registered from: %s:%s, filename: %s',
- opt.upstream:get_addr():to_string(true),
- err, script.caller.short_src, script.caller.currentline, script.filename)
- opt.upstream:fail()
- script.fatal_error = err
- else
- opt.upstream:ok()
- logger.infox(cfg,
- "uploaded redis script to %s %s %s, sha: %s",
- opt.upstream:get_addr():to_string(true),
- script.filename and "from file" or "with id", script.filename or script.id,
- data)
- script.sha = data -- We assume that sha is the same on all servers
- script.fatal_error = nil
- end
- script.in_flight = script.in_flight - 1
+ for idx, opt in ipairs(opts) do
+ if script.servers_ready[idx] ~= 'done' then
+ opt.config = cfg
+ opt.ev_base = ev_base
+ opt.is_write = is_write
+ opt.callback = function(err, data)
+ if err then
+ if string.match(err, 'ERR') then
+ logger.errx(cfg, 'cannot upload script %s to %s: %s; registered from: %s:%s',
+ script_description(script),
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ opt.upstream:fail()
+ script.servers_ready[idx] = "failed"
+ return
+ else
+ -- Assume temporary error
+ logger.infox(cfg, 'temporary error uploading script %s to %s: %s; registered from: %s:%s',
+ script_description(script),
+ opt.upstream:get_addr():to_string(true),
+ err, script.caller.short_src, script.caller.currentline)
+ script.servers_ready[idx] = "tempfail"
+ return
+ end
+ else
+ opt.upstream:ok()
+ logger.infox(cfg,
+ "uploaded redis script %s to %s, sha: %s",
+ script_description(script),
+ opt.upstream:get_addr():to_string(true),
+ data)
+ script.sha = data -- We assume that sha is the same on all servers
+ script.servers_ready[idx] = "done"
+ end
- if script.in_flight == 0 then
- script_set_loaded(script)
+ if is_all_servers_ready(script) then
+ script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script to any server"
+ end
end
- end
- local ret = rspamd_redis.make_request(opt)
+ local ret = rspamd_redis.make_request(opt)
- if not ret then
- logger.errx('cannot execute redis request to load script on %s',
- opt.upstream:get_addr())
- script.in_flight = script.in_flight - 1
- opt.upstream:fail()
+ if not ret then
+ logger.errx('cannot execute redis request to load script %s on %s',
+ script_description(script),
+ opt.upstream:get_addr())
+ script.servers_ready[idx] = "failed"
+ opt.upstream:fail()
+ end
end
- if script.in_flight == 0 then
+ if is_all_servers_ready(script) then
script_set_loaded(script)
+ elseif is_all_servers_failed(script) then
+ script.fatal_error = "cannot upload script " .. script_description(script) .. " to any server"
end
end
end
@@ -1306,12 +1375,17 @@ local function add_redis_script(script, redis_params, caller_level, maybe_filena
rspamd_config:add_on_load(function(cfg, ev_base, worker)
local mult = 0.0
rspamd_config:add_periodic(ev_base, 0.0, function()
- if not new_script.sha then
+ if not new_script.sha and not new_script.fatal_error then
load_redis_script(new_script, cfg, ev_base, worker)
- mult = mult + 1
+
+ -- Do not wait for more than 10 seconds to retry
+ if mult < 10 then
+ mult = mult + 1
+ end
return 1.0 * mult -- Check one more time in one second
end
+ -- All loaded, we are good to go
return false
end, false)
end)
@@ -1386,10 +1460,12 @@ local function exec_redis_script(id, params, callback, keys, args)
callback(err, data)
elseif string.match(err, 'NOSCRIPT') then
-- Schedule restart
+ logger.infox(params.task, 'redis script %s is not loaded (NOSCRIPT returned), scheduling reload',
+ script_description(script))
script.sha = nil
if can_reload then
table.insert(script.waitq, do_call)
- if script.in_flight == 0 then
+ if not script.servers_ready then
-- Reload scripts if this has not been initiated yet
if params.task then
load_script_task(script, params.task)
@@ -1437,6 +1513,8 @@ local function exec_redis_script(id, params, callback, keys, args)
do_call(true)
else
-- Delayed until scripts are loaded
+ logger.infox(params.task or rspamd_config, 'redis script %s is not loaded, trying to load',
+ script_description(script))
if not params.task then
table.insert(script.waitq, do_call)
else