From: Vsevolod Stakhov Date: Thu, 1 Aug 2024 11:13:07 +0000 (+0100) Subject: [Fix] Fix Redis scripts uploading when Redis is not ready X-Git-Tag: 3.10.0~65^2~4^2~2 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=8058356806ae01c4d7e0b03d03f8d80bb2a4a56b;p=rspamd.git [Fix] Fix Redis scripts uploading when Redis is not ready Initially, there was no way to recover from Redis errors that are temporary by nature (e.g. when Redis was busy with loading database). This PR adds logic to check returned reply and adds more fine-grained errors-per-server handling. --- diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua index 818d955e9..241a58ce1 100644 --- a/lualib/lua_redis.lua +++ b/lualib/lua_redis.lua @@ -1162,8 +1162,11 @@ local function prepare_redis_call(script) end -- Call load script on each server, set loaded flag - script.in_flight = #servers - for _, s in ipairs(servers) do + if not script.servers_ready then + script.servers_ready = { } -- possible values for each server: unsent, tempfail, failed, done + end + + for idx, s in ipairs(servers) do local cur_opts = { host = s:get_addr(), timeout = script.redis_params['timeout'], @@ -1172,6 +1175,11 @@ local function prepare_redis_call(script) upstream = s } + -- By default we start from unsent status + if not script.servers_ready[idx] then + script.servers_ready[idx] = "unsent" + end + if script.redis_params['username'] then cur_opts['username'] = script.redis_params['username'] end @@ -1190,46 +1198,81 @@ local function prepare_redis_call(script) return options end +local function is_all_servers_ready(script) + for _, s in ipairs(script.servers_ready) do + if s == "unsent" or s == "tempfail" then + return false + end + end + + -- We assume that permanent errors are not recoverable, so we will just skip those servers + return true +end + +local function is_all_servers_failed(script) + for _, s in ipairs(script.servers_ready) do + if s == "unsent" or s == "tempfail" or s == "done" then + return false + end + end + + return true +end + local function load_script_task(script, task, is_write) local rspamd_redis = require "rspamd_redis" local opts = prepare_redis_call(script) - for _, opt in ipairs(opts) do - opt.task = task - opt.is_write = is_write - opt.callback = function(err, data) - if err then - logger.errx(task, 'cannot upload script to %s: %s; registered from: %s:%s', - opt.upstream:get_addr():to_string(true), - err, script.caller.short_src, script.caller.currentline) - opt.upstream:fail() - script.fatal_error = err - else - opt.upstream:ok() - logger.infox(task, - "uploaded redis script to %s %s %s, sha: %s", - opt.upstream:get_addr():to_string(true), - script.filename and "from file" or "with id", script.filename or script.id, data) - script.sha = data -- We assume that sha is the same on all servers - end - script.in_flight = script.in_flight - 1 - - if script.in_flight == 0 then - script_set_loaded(script) - end - end + for idx, opt in ipairs(opts) do + if script.servers_ready[idx] ~= 'done' then + opt.task = task + opt.is_write = is_write + opt.callback = function(err, data) + if err then + if string.match(err, 'ERR') then + logger.errx(task, 'cannot upload script to %s: %s; registered from: %s:%s', + opt.upstream:get_addr():to_string(true), + err, script.caller.short_src, script.caller.currentline) + opt.upstream:fail() + script.servers_ready[idx] = "failed" + return + else + -- Assume temporary error + logger.infox(task, 'temporary error uploading script to %s: %s; registered from: %s:%s', + opt.upstream:get_addr():to_string(true), + err, script.caller.short_src, script.caller.currentline) + script.servers_ready[idx] = "tempfail" + return + end + else + opt.upstream:ok() + logger.infox(task, + "uploaded redis script to %s %s %s, sha: %s", + opt.upstream:get_addr():to_string(true), + script.filename and "from file" or "with id", script.filename or script.id, data) + script.sha = data -- We assume that sha is the same on all servers + end + if is_all_servers_ready(script) then + script_set_loaded(script) + elseif is_all_servers_failed(script) then + script.fatal_error = "cannot upload script to any server" + end + end -- callback - local ret = rspamd_redis.make_request(opt) + local ret = rspamd_redis.make_request(opt) - if not ret then - logger.errx('cannot execute redis request to load script on %s', - opt.upstream:get_addr()) - script.in_flight = script.in_flight - 1 - opt.upstream:fail() + if not ret then + logger.errx('cannot execute redis request to load script on %s', + opt.upstream:get_addr()) + script.servers_ready[idx] = "failed" + opt.upstream:fail() + end end - if script.in_flight == 0 then + if is_all_servers_ready(script) then script_set_loaded(script) + elseif is_all_servers_failed(script) then + script.fatal_error = "cannot upload script to any server" end end end @@ -1238,44 +1281,58 @@ local function load_script_taskless(script, cfg, ev_base, is_write) local rspamd_redis = require "rspamd_redis" local opts = prepare_redis_call(script) - for _, opt in ipairs(opts) do - opt.config = cfg - opt.ev_base = ev_base - opt.is_write = is_write - opt.callback = function(err, data) - if err then - logger.errx(cfg, 'cannot upload script to %s: %s; registered from: %s:%s, filename: %s', - opt.upstream:get_addr():to_string(true), - err, script.caller.short_src, script.caller.currentline, script.filename) - opt.upstream:fail() - script.fatal_error = err - else - opt.upstream:ok() - logger.infox(cfg, - "uploaded redis script to %s %s %s, sha: %s", - opt.upstream:get_addr():to_string(true), - script.filename and "from file" or "with id", script.filename or script.id, - data) - script.sha = data -- We assume that sha is the same on all servers - script.fatal_error = nil - end - script.in_flight = script.in_flight - 1 + for idx, opt in ipairs(opts) do + if script.servers_ready[idx] ~= 'done' then + opt.config = cfg + opt.ev_base = ev_base + opt.is_write = is_write + opt.callback = function(err, data) + if err then + if string.match(err, 'ERR') then + logger.errx(cfg, 'cannot upload script to %s: %s; registered from: %s:%s', + opt.upstream:get_addr():to_string(true), + err, script.caller.short_src, script.caller.currentline) + opt.upstream:fail() + script.servers_ready[idx] = "failed" + return + else + -- Assume temporary error + logger.infox(cfg, 'temporary error uploading script to %s: %s; registered from: %s:%s', + opt.upstream:get_addr():to_string(true), + err, script.caller.short_src, script.caller.currentline) + script.servers_ready[idx] = "tempfail" + return + end + else + opt.upstream:ok() + logger.infox(cfg, + "uploaded redis script to %s %s %s, sha: %s", + opt.upstream:get_addr():to_string(true), + script.filename and "from file" or "with id", script.filename or script.id, + data) + script.sha = data -- We assume that sha is the same on all servers + end - if script.in_flight == 0 then - script_set_loaded(script) + if is_all_servers_ready(script) then + script_set_loaded(script) + elseif is_all_servers_failed(script) then + script.fatal_error = "cannot upload script to any server" + end end - end - local ret = rspamd_redis.make_request(opt) + local ret = rspamd_redis.make_request(opt) - if not ret then - logger.errx('cannot execute redis request to load script on %s', - opt.upstream:get_addr()) - script.in_flight = script.in_flight - 1 - opt.upstream:fail() + if not ret then + logger.errx('cannot execute redis request to load script on %s', + opt.upstream:get_addr()) + script.servers_ready[idx] = "failed" + opt.upstream:fail() + end end - if script.in_flight == 0 then + if is_all_servers_ready(script) then script_set_loaded(script) + elseif is_all_servers_failed(script) then + script.fatal_error = "cannot upload script to any server" end end end @@ -1306,12 +1363,17 @@ local function add_redis_script(script, redis_params, caller_level, maybe_filena rspamd_config:add_on_load(function(cfg, ev_base, worker) local mult = 0.0 rspamd_config:add_periodic(ev_base, 0.0, function() - if not new_script.sha then + if not new_script.sha and not new_script.fatal_error then load_redis_script(new_script, cfg, ev_base, worker) - mult = mult + 1 + + -- Do not wait for more than 10 seconds to retry + if mult < 10 then + mult = mult + 1 + end return 1.0 * mult -- Check one more time in one second end + -- All loaded, we are good to go return false end, false) end)