]> source.dussan.org Git - rspamd.git/commitdiff
[Fix] Fix Redis scripts uploading when Redis is not ready
authorVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 1 Aug 2024 11:13:07 +0000 (12:13 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Thu, 1 Aug 2024 11:13:07 +0000 (12:13 +0100)
Initially, there was no way to recover from Redis errors that are temporary
by nature (e.g. when Redis was busy with loading database).

This PR adds logic to check returned reply and adds more fine-grained
errors-per-server handling.

lualib/lua_redis.lua

index 818d955e95e14802e629b340fece598729223dc0..241a58ce1b90d2c8d03bcc8ebe5da2dc3abab110 100644 (file)
@@ -1162,8 +1162,11 @@ local function prepare_redis_call(script)
   end
 
   -- Call load script on each server, set loaded flag
-  script.in_flight = #servers
-  for _, s in ipairs(servers) do
+  if not script.servers_ready then
+    script.servers_ready = { } -- possible values for each server: unsent, tempfail, failed, done
+  end
+
+  for idx, s in ipairs(servers) do
     local cur_opts = {
       host = s:get_addr(),
       timeout = script.redis_params['timeout'],
@@ -1172,6 +1175,11 @@ local function prepare_redis_call(script)
       upstream = s
     }
 
+    -- By default we start from unsent status
+    if not script.servers_ready[idx] then
+      script.servers_ready[idx] = "unsent"
+    end
+
     if script.redis_params['username'] then
       cur_opts['username'] = script.redis_params['username']
     end
@@ -1190,46 +1198,81 @@ local function prepare_redis_call(script)
   return options
 end
 
+local function is_all_servers_ready(script)
+  for _, s in ipairs(script.servers_ready) do
+    if s == "unsent" or s == "tempfail" then
+      return false
+    end
+  end
+
+  -- We assume that permanent errors are not recoverable, so we will just skip those servers
+  return true
+end
+
+local function is_all_servers_failed(script)
+  for _, s in ipairs(script.servers_ready) do
+    if s == "unsent" or s == "tempfail" or s == "done" then
+      return false
+    end
+  end
+
+  return true
+end
+
 local function load_script_task(script, task, is_write)
   local rspamd_redis = require "rspamd_redis"
   local opts = prepare_redis_call(script)
 
-  for _, opt in ipairs(opts) do
-    opt.task = task
-    opt.is_write = is_write
-    opt.callback = function(err, data)
-      if err then
-        logger.errx(task, 'cannot upload script to %s: %s; registered from: %s:%s',
-            opt.upstream:get_addr():to_string(true),
-            err, script.caller.short_src, script.caller.currentline)
-        opt.upstream:fail()
-        script.fatal_error = err
-      else
-        opt.upstream:ok()
-        logger.infox(task,
-            "uploaded redis script to %s %s %s, sha: %s",
-            opt.upstream:get_addr():to_string(true),
-            script.filename and "from file" or "with id", script.filename or script.id, data)
-        script.sha = data -- We assume that sha is the same on all servers
-      end
-      script.in_flight = script.in_flight - 1
-
-      if script.in_flight == 0 then
-        script_set_loaded(script)
-      end
-    end
+  for idx, opt in ipairs(opts) do
+    if script.servers_ready[idx] ~= 'done' then
+      opt.task = task
+      opt.is_write = is_write
+      opt.callback = function(err, data)
+        if err then
+          if string.match(err, 'ERR') then
+            logger.errx(task, 'cannot upload script to %s: %s; registered from: %s:%s',
+                opt.upstream:get_addr():to_string(true),
+                err, script.caller.short_src, script.caller.currentline)
+            opt.upstream:fail()
+            script.servers_ready[idx] = "failed"
+            return
+          else
+            -- Assume temporary error
+            logger.infox(task, 'temporary error uploading script to %s: %s; registered from: %s:%s',
+                opt.upstream:get_addr():to_string(true),
+                err, script.caller.short_src, script.caller.currentline)
+            script.servers_ready[idx] = "tempfail"
+            return
+          end
+        else
+          opt.upstream:ok()
+          logger.infox(task,
+              "uploaded redis script to %s %s %s, sha: %s",
+              opt.upstream:get_addr():to_string(true),
+              script.filename and "from file" or "with id", script.filename or script.id, data)
+          script.sha = data -- We assume that sha is the same on all servers
+        end
+        if is_all_servers_ready(script) then
+          script_set_loaded(script)
+        elseif is_all_servers_failed(script) then
+          script.fatal_error = "cannot upload script to any server"
+        end
+      end -- callback
 
-    local ret = rspamd_redis.make_request(opt)
+      local ret = rspamd_redis.make_request(opt)
 
-    if not ret then
-      logger.errx('cannot execute redis request to load script on %s',
-          opt.upstream:get_addr())
-      script.in_flight = script.in_flight - 1
-      opt.upstream:fail()
+      if not ret then
+        logger.errx('cannot execute redis request to load script on %s',
+            opt.upstream:get_addr())
+        script.servers_ready[idx] = "failed"
+        opt.upstream:fail()
+      end
     end
 
-    if script.in_flight == 0 then
+    if is_all_servers_ready(script) then
       script_set_loaded(script)
+    elseif is_all_servers_failed(script) then
+      script.fatal_error = "cannot upload script to any server"
     end
   end
 end
@@ -1238,44 +1281,58 @@ local function load_script_taskless(script, cfg, ev_base, is_write)
   local rspamd_redis = require "rspamd_redis"
   local opts = prepare_redis_call(script)
 
-  for _, opt in ipairs(opts) do
-    opt.config = cfg
-    opt.ev_base = ev_base
-    opt.is_write = is_write
-    opt.callback = function(err, data)
-      if err then
-        logger.errx(cfg, 'cannot upload script to %s: %s; registered from: %s:%s, filename: %s',
-            opt.upstream:get_addr():to_string(true),
-            err, script.caller.short_src, script.caller.currentline, script.filename)
-        opt.upstream:fail()
-        script.fatal_error = err
-      else
-        opt.upstream:ok()
-        logger.infox(cfg,
-            "uploaded redis script to %s %s %s, sha: %s",
-            opt.upstream:get_addr():to_string(true),
-            script.filename and "from file" or "with id", script.filename or script.id,
-            data)
-        script.sha = data -- We assume that sha is the same on all servers
-        script.fatal_error = nil
-      end
-      script.in_flight = script.in_flight - 1
+  for idx, opt in ipairs(opts) do
+    if script.servers_ready[idx] ~= 'done' then
+      opt.config = cfg
+      opt.ev_base = ev_base
+      opt.is_write = is_write
+      opt.callback = function(err, data)
+        if err then
+          if string.match(err, 'ERR') then
+            logger.errx(cfg, 'cannot upload script to %s: %s; registered from: %s:%s',
+                opt.upstream:get_addr():to_string(true),
+                err, script.caller.short_src, script.caller.currentline)
+            opt.upstream:fail()
+            script.servers_ready[idx] = "failed"
+            return
+          else
+            -- Assume temporary error
+            logger.infox(cfg, 'temporary error uploading script to %s: %s; registered from: %s:%s',
+                opt.upstream:get_addr():to_string(true),
+                err, script.caller.short_src, script.caller.currentline)
+            script.servers_ready[idx] = "tempfail"
+            return
+          end
+        else
+          opt.upstream:ok()
+          logger.infox(cfg,
+              "uploaded redis script to %s %s %s, sha: %s",
+              opt.upstream:get_addr():to_string(true),
+              script.filename and "from file" or "with id", script.filename or script.id,
+              data)
+          script.sha = data -- We assume that sha is the same on all servers
+        end
 
-      if script.in_flight == 0 then
-        script_set_loaded(script)
+        if is_all_servers_ready(script) then
+          script_set_loaded(script)
+        elseif is_all_servers_failed(script) then
+          script.fatal_error = "cannot upload script to any server"
+        end
       end
-    end
-    local ret = rspamd_redis.make_request(opt)
+      local ret = rspamd_redis.make_request(opt)
 
-    if not ret then
-      logger.errx('cannot execute redis request to load script on %s',
-          opt.upstream:get_addr())
-      script.in_flight = script.in_flight - 1
-      opt.upstream:fail()
+      if not ret then
+        logger.errx('cannot execute redis request to load script on %s',
+            opt.upstream:get_addr())
+        script.servers_ready[idx] = "failed"
+        opt.upstream:fail()
+      end
     end
 
-    if script.in_flight == 0 then
+    if is_all_servers_ready(script) then
       script_set_loaded(script)
+    elseif is_all_servers_failed(script) then
+      script.fatal_error = "cannot upload script to any server"
     end
   end
 end
@@ -1306,12 +1363,17 @@ local function add_redis_script(script, redis_params, caller_level, maybe_filena
   rspamd_config:add_on_load(function(cfg, ev_base, worker)
     local mult = 0.0
     rspamd_config:add_periodic(ev_base, 0.0, function()
-      if not new_script.sha then
+      if not new_script.sha and not new_script.fatal_error then
         load_redis_script(new_script, cfg, ev_base, worker)
-        mult = mult + 1
+
+        -- Do not wait for more than 10 seconds to retry
+        if mult < 10 then
+          mult = mult + 1
+        end
         return 1.0 * mult -- Check one more time in one second
       end
 
+      -- All loaded, we are good to go
       return false
     end, false)
   end)