diff options
-rw-r--r-- | lualib/lua_redis.lua | 196 |
1 files changed, 129 insertions, 67 deletions
diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua index 818d955e9..241a58ce1 100644 --- a/lualib/lua_redis.lua +++ b/lualib/lua_redis.lua @@ -1162,8 +1162,11 @@ local function prepare_redis_call(script) end -- Call load script on each server, set loaded flag - script.in_flight = #servers - for _, s in ipairs(servers) do + if not script.servers_ready then + script.servers_ready = { } -- possible values for each server: unsent, tempfail, failed, done + end + + for idx, s in ipairs(servers) do local cur_opts = { host = s:get_addr(), timeout = script.redis_params['timeout'], @@ -1172,6 +1175,11 @@ local function prepare_redis_call(script) upstream = s } + -- By default we start from unsent status + if not script.servers_ready[idx] then + script.servers_ready[idx] = "unsent" + end + if script.redis_params['username'] then cur_opts['username'] = script.redis_params['username'] end @@ -1190,46 +1198,81 @@ local function prepare_redis_call(script) return options end +local function is_all_servers_ready(script) + for _, s in ipairs(script.servers_ready) do + if s == "unsent" or s == "tempfail" then + return false + end + end + + -- We assume that permanent errors are not recoverable, so we will just skip those servers + return true +end + +local function is_all_servers_failed(script) + for _, s in ipairs(script.servers_ready) do + if s == "unsent" or s == "tempfail" or s == "done" then + return false + end + end + + return true +end + local function load_script_task(script, task, is_write) local rspamd_redis = require "rspamd_redis" local opts = prepare_redis_call(script) - for _, opt in ipairs(opts) do - opt.task = task - opt.is_write = is_write - opt.callback = function(err, data) - if err then - logger.errx(task, 'cannot upload script to %s: %s; registered from: %s:%s', - opt.upstream:get_addr():to_string(true), - err, script.caller.short_src, script.caller.currentline) - opt.upstream:fail() - script.fatal_error = err - else - opt.upstream:ok() - logger.infox(task, - "uploaded redis script to %s %s %s, sha: %s", - opt.upstream:get_addr():to_string(true), - script.filename and "from file" or "with id", script.filename or script.id, data) - script.sha = data -- We assume that sha is the same on all servers - end - script.in_flight = script.in_flight - 1 - - if script.in_flight == 0 then - script_set_loaded(script) - end - end + for idx, opt in ipairs(opts) do + if script.servers_ready[idx] ~= 'done' then + opt.task = task + opt.is_write = is_write + opt.callback = function(err, data) + if err then + if string.match(err, 'ERR') then + logger.errx(task, 'cannot upload script to %s: %s; registered from: %s:%s', + opt.upstream:get_addr():to_string(true), + err, script.caller.short_src, script.caller.currentline) + opt.upstream:fail() + script.servers_ready[idx] = "failed" + return + else + -- Assume temporary error + logger.infox(task, 'temporary error uploading script to %s: %s; registered from: %s:%s', + opt.upstream:get_addr():to_string(true), + err, script.caller.short_src, script.caller.currentline) + script.servers_ready[idx] = "tempfail" + return + end + else + opt.upstream:ok() + logger.infox(task, + "uploaded redis script to %s %s %s, sha: %s", + opt.upstream:get_addr():to_string(true), + script.filename and "from file" or "with id", script.filename or script.id, data) + script.sha = data -- We assume that sha is the same on all servers + end + if is_all_servers_ready(script) then + script_set_loaded(script) + elseif is_all_servers_failed(script) then + script.fatal_error = "cannot upload script to any server" + end + end -- callback - local ret = rspamd_redis.make_request(opt) + local ret = rspamd_redis.make_request(opt) - if not ret then - logger.errx('cannot execute redis request to load script on %s', - opt.upstream:get_addr()) - script.in_flight = script.in_flight - 1 - opt.upstream:fail() + if not ret then + logger.errx('cannot execute redis request to load script on %s', + opt.upstream:get_addr()) + script.servers_ready[idx] = "failed" + opt.upstream:fail() + end end - if script.in_flight == 0 then + if is_all_servers_ready(script) then script_set_loaded(script) + elseif is_all_servers_failed(script) then + script.fatal_error = "cannot upload script to any server" end end end @@ -1238,44 +1281,58 @@ local function load_script_taskless(script, cfg, ev_base, is_write) local rspamd_redis = require "rspamd_redis" local opts = prepare_redis_call(script) - for _, opt in ipairs(opts) do - opt.config = cfg - opt.ev_base = ev_base - opt.is_write = is_write - opt.callback = function(err, data) - if err then - logger.errx(cfg, 'cannot upload script to %s: %s; registered from: %s:%s, filename: %s', - opt.upstream:get_addr():to_string(true), - err, script.caller.short_src, script.caller.currentline, script.filename) - opt.upstream:fail() - script.fatal_error = err - else - opt.upstream:ok() - logger.infox(cfg, - "uploaded redis script to %s %s %s, sha: %s", - opt.upstream:get_addr():to_string(true), - script.filename and "from file" or "with id", script.filename or script.id, - data) - script.sha = data -- We assume that sha is the same on all servers - script.fatal_error = nil - end - script.in_flight = script.in_flight - 1 + for idx, opt in ipairs(opts) do + if script.servers_ready[idx] ~= 'done' then + opt.config = cfg + opt.ev_base = ev_base + opt.is_write = is_write + opt.callback = function(err, data) + if err then + if string.match(err, 'ERR') then + logger.errx(cfg, 'cannot upload script to %s: %s; registered from: %s:%s', + opt.upstream:get_addr():to_string(true), + err, script.caller.short_src, script.caller.currentline) + opt.upstream:fail() + script.servers_ready[idx] = "failed" + return + else + -- Assume temporary error + logger.infox(cfg, 'temporary error uploading script to %s: %s; registered from: %s:%s', + opt.upstream:get_addr():to_string(true), + err, script.caller.short_src, script.caller.currentline) + script.servers_ready[idx] = "tempfail" + return + end + else + opt.upstream:ok() + logger.infox(cfg, + "uploaded redis script to %s %s %s, sha: %s", + opt.upstream:get_addr():to_string(true), + script.filename and "from file" or "with id", script.filename or script.id, + data) + script.sha = data -- We assume that sha is the same on all servers + end - if script.in_flight == 0 then - script_set_loaded(script) + if is_all_servers_ready(script) then + script_set_loaded(script) + elseif is_all_servers_failed(script) then + script.fatal_error = "cannot upload script to any server" + end end - end - local ret = rspamd_redis.make_request(opt) + local ret = rspamd_redis.make_request(opt) - if not ret then - logger.errx('cannot execute redis request to load script on %s', - opt.upstream:get_addr()) - script.in_flight = script.in_flight - 1 - opt.upstream:fail() + if not ret then + logger.errx('cannot execute redis request to load script on %s', + opt.upstream:get_addr()) + script.servers_ready[idx] = "failed" + opt.upstream:fail() + end end - if script.in_flight == 0 then + if is_all_servers_ready(script) then script_set_loaded(script) + elseif is_all_servers_failed(script) then + script.fatal_error = "cannot upload script to any server" end end end @@ -1306,12 +1363,17 @@ local function add_redis_script(script, redis_params, caller_level, maybe_filena rspamd_config:add_on_load(function(cfg, ev_base, worker) local mult = 0.0 rspamd_config:add_periodic(ev_base, 0.0, function() - if not new_script.sha then + if not new_script.sha and not new_script.fatal_error then load_redis_script(new_script, cfg, ev_base, worker) - mult = mult + 1 + + -- Do not wait for more than 10 seconds to retry + if mult < 10 then + mult = mult + 1 + end return 1.0 * mult -- Check one more time in one second end + -- All loaded, we are good to go return false end, false) end) |