aboutsummaryrefslogtreecommitdiffstats
path: root/lualib/lua_redis.lua
diff options
context:
space:
mode:
Diffstat (limited to 'lualib/lua_redis.lua')
-rw-r--r--lualib/lua_redis.lua44
1 files changed, 27 insertions, 17 deletions
diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua
index 85f5ebc7a..195b7759f 100644
--- a/lualib/lua_redis.lua
+++ b/lualib/lua_redis.lua
@@ -26,7 +26,7 @@ local N = "lua_redis"
local db_schema = (ts.number / tostring + ts.string):is_optional():describe("Database number")
local common_schema = {
- timeout = (ts.number + ts.string / lutil.parse_time_interval):is_optional():describe("Connection timeout"),
+ timeout = (ts.number + ts.string / lutil.parse_time_interval):is_optional():describe("Connection timeout (seconds)"),
db = db_schema,
database = db_schema,
dbname = db_schema,
@@ -40,6 +40,7 @@ local common_schema = {
sentinel_master_maxerrors = (ts.number + ts.string / tonumber):is_optional():describe("Sentinel master max errors"),
sentinel_username = ts.string:is_optional():describe("Sentinel username"),
sentinel_password = ts.string:is_optional():describe("Sentinel password"),
+ redis_version = (ts.number + ts.string / tonumber):is_optional():describe("Redis server version (6 or 7)"),
}
local read_schema = lutil.table_merge({
@@ -357,6 +358,10 @@ local function process_redis_opts(options, redis_params)
redis_params['prefix'] = options['prefix']
end
+ if options['redis_version'] and not redis_params['redis_version'] then
+ redis_params['redis_version'] = tonumber(options['redis_version'])
+ end
+
if type(options['expand_keys']) == 'boolean' then
redis_params['expand_keys'] = options['expand_keys']
else
@@ -1124,9 +1129,9 @@ local function redis_make_request_taskless(ev_base, cfg, redis_params, key,
end
--[[[
--- @function lua_redis.redis_make_request_taskless(ev_base, redis_params, key, is_write, callback, command, args)
+-- @function lua_redis.redis_make_request_taskless(ev_base, cfg, redis_params, key, is_write, callback, command, args)
-- Sends a request to Redis in context where `task` is not available for some specific use-cases
--- Identical to redis_make_request() except in that first parameter is an `event base` object
+-- Identical to redis_make_request() except in that first parameter is an `event base` object and the second one is the 'config' object
--]]
exports.rspamd_redis_make_request_taskless = redis_make_request_taskless
@@ -1202,15 +1207,13 @@ local function prepare_redis_call(script)
return options
end
-local function is_all_servers_ready(script)
+local function is_any_server_ready(script)
for _, s in ipairs(script.servers_ready) do
- if s == "unsent" or s == "tempfail" then
- return false
+ if s == "done" then
+ return true
end
end
-
- -- We assume that permanent errors are not recoverable, so we will just skip those servers
- return true
+ return false
end
local function is_all_servers_failed(script)
@@ -1264,7 +1267,7 @@ local function load_script_task(script, task, is_write)
script.sha = data -- We assume that sha is the same on all servers
script.servers_ready[idx] = "done"
end
- if is_all_servers_ready(script) then
+ if is_any_server_ready(script) then
script_set_loaded(script)
elseif is_all_servers_failed(script) then
script.pending_upload = false
@@ -1282,7 +1285,7 @@ local function load_script_task(script, task, is_write)
end
end
- if is_all_servers_ready(script) then
+ if is_any_server_ready(script) then
script_set_loaded(script)
elseif is_all_servers_failed(script) then
script.pending_upload = false
@@ -1309,7 +1312,6 @@ local function load_script_taskless(script, cfg, ev_base, is_write)
err, script.caller.short_src, script.caller.currentline)
opt.upstream:fail()
script.servers_ready[idx] = "failed"
- return
else
-- Assume temporary error
logger.infox(cfg, 'temporary error uploading script %s to %s: %s; registered from: %s:%s',
@@ -1317,7 +1319,6 @@ local function load_script_taskless(script, cfg, ev_base, is_write)
opt.upstream:get_addr():to_string(true),
err, script.caller.short_src, script.caller.currentline)
script.servers_ready[idx] = "tempfail"
- return
end
else
opt.upstream:ok()
@@ -1330,7 +1331,7 @@ local function load_script_taskless(script, cfg, ev_base, is_write)
script.servers_ready[idx] = "done"
end
- if is_all_servers_ready(script) then
+ if is_any_server_ready(script) then
script_set_loaded(script)
elseif is_all_servers_failed(script) then
script.pending_upload = false
@@ -1348,7 +1349,7 @@ local function load_script_taskless(script, cfg, ev_base, is_write)
end
end
- if is_all_servers_ready(script) then
+ if is_any_server_ready(script) then
script_set_loaded(script)
elseif is_all_servers_failed(script) then
script.pending_upload = false
@@ -1477,6 +1478,10 @@ local function exec_redis_script(id, params, callback, keys, args)
script.sha = nil
script.loaded = nil
script.pending_upload = true
+ -- We must initialize all servers as we don't know here which one failed
+ for i, _ in ipairs(script.servers_ready) do
+ script.servers_ready[i] = "unsent"
+ end
-- Reload scripts if this has not been initiated yet
if params.task then
load_script_task(script, params.task)
@@ -1510,15 +1515,20 @@ local function exec_redis_script(id, params, callback, keys, args)
end
end
+ local redis_command = 'EVALSHA'
+ if not params.is_write and script.redis_params.redis_version and
+ script.redis_params.redis_version >= 7 then
+ redis_command = 'EVALSHA_RO'
+ end
if params.task then
if not rspamd_redis_make_request(params.task, script.redis_params,
- params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then
+ params.key, params.is_write, redis_cb, redis_command, redis_args) then
callback('Cannot make redis request', nil)
end
else
if not redis_make_request_taskless(params.ev_base, rspamd_config,
script.redis_params,
- params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then
+ params.key, params.is_write, redis_cb, redis_command, redis_args) then
callback('Cannot make redis request', nil)
end
end