diff options
Diffstat (limited to 'lualib/lua_redis.lua')
-rw-r--r-- | lualib/lua_redis.lua | 44 |
1 files changed, 27 insertions, 17 deletions
diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua index 85f5ebc7a..195b7759f 100644 --- a/lualib/lua_redis.lua +++ b/lualib/lua_redis.lua @@ -26,7 +26,7 @@ local N = "lua_redis" local db_schema = (ts.number / tostring + ts.string):is_optional():describe("Database number") local common_schema = { - timeout = (ts.number + ts.string / lutil.parse_time_interval):is_optional():describe("Connection timeout"), + timeout = (ts.number + ts.string / lutil.parse_time_interval):is_optional():describe("Connection timeout (seconds)"), db = db_schema, database = db_schema, dbname = db_schema, @@ -40,6 +40,7 @@ local common_schema = { sentinel_master_maxerrors = (ts.number + ts.string / tonumber):is_optional():describe("Sentinel master max errors"), sentinel_username = ts.string:is_optional():describe("Sentinel username"), sentinel_password = ts.string:is_optional():describe("Sentinel password"), + redis_version = (ts.number + ts.string / tonumber):is_optional():describe("Redis server version (6 or 7)"), } local read_schema = lutil.table_merge({ @@ -357,6 +358,10 @@ local function process_redis_opts(options, redis_params) redis_params['prefix'] = options['prefix'] end + if options['redis_version'] and not redis_params['redis_version'] then + redis_params['redis_version'] = tonumber(options['redis_version']) + end + if type(options['expand_keys']) == 'boolean' then redis_params['expand_keys'] = options['expand_keys'] else @@ -1124,9 +1129,9 @@ local function redis_make_request_taskless(ev_base, cfg, redis_params, key, end --[[[ --- @function lua_redis.redis_make_request_taskless(ev_base, redis_params, key, is_write, callback, command, args) +-- @function lua_redis.redis_make_request_taskless(ev_base, cfg, redis_params, key, is_write, callback, command, args) -- Sends a request to Redis in context where `task` is not available for some specific use-cases --- Identical to redis_make_request() except in that first parameter is an `event base` object +-- Identical to redis_make_request() except in that first parameter is an `event base` object and the second one is the 'config' object --]] exports.rspamd_redis_make_request_taskless = redis_make_request_taskless @@ -1202,15 +1207,13 @@ local function prepare_redis_call(script) return options end -local function is_all_servers_ready(script) +local function is_any_server_ready(script) for _, s in ipairs(script.servers_ready) do - if s == "unsent" or s == "tempfail" then - return false + if s == "done" then + return true end end - - -- We assume that permanent errors are not recoverable, so we will just skip those servers - return true + return false end local function is_all_servers_failed(script) @@ -1264,7 +1267,7 @@ local function load_script_task(script, task, is_write) script.sha = data -- We assume that sha is the same on all servers script.servers_ready[idx] = "done" end - if is_all_servers_ready(script) then + if is_any_server_ready(script) then script_set_loaded(script) elseif is_all_servers_failed(script) then script.pending_upload = false @@ -1282,7 +1285,7 @@ local function load_script_task(script, task, is_write) end end - if is_all_servers_ready(script) then + if is_any_server_ready(script) then script_set_loaded(script) elseif is_all_servers_failed(script) then script.pending_upload = false @@ -1309,7 +1312,6 @@ local function load_script_taskless(script, cfg, ev_base, is_write) err, script.caller.short_src, script.caller.currentline) opt.upstream:fail() script.servers_ready[idx] = "failed" - return else -- Assume temporary error logger.infox(cfg, 'temporary error uploading script %s to %s: %s; registered from: %s:%s', @@ -1317,7 +1319,6 @@ local function load_script_taskless(script, cfg, ev_base, is_write) opt.upstream:get_addr():to_string(true), err, script.caller.short_src, script.caller.currentline) script.servers_ready[idx] = "tempfail" - return end else opt.upstream:ok() @@ -1330,7 +1331,7 @@ local function load_script_taskless(script, cfg, ev_base, is_write) script.servers_ready[idx] = "done" end - if is_all_servers_ready(script) then + if is_any_server_ready(script) then script_set_loaded(script) elseif is_all_servers_failed(script) then script.pending_upload = false @@ -1348,7 +1349,7 @@ local function load_script_taskless(script, cfg, ev_base, is_write) end end - if is_all_servers_ready(script) then + if is_any_server_ready(script) then script_set_loaded(script) elseif is_all_servers_failed(script) then script.pending_upload = false @@ -1477,6 +1478,10 @@ local function exec_redis_script(id, params, callback, keys, args) script.sha = nil script.loaded = nil script.pending_upload = true + -- We must initialize all servers as we don't know here which one failed + for i, _ in ipairs(script.servers_ready) do + script.servers_ready[i] = "unsent" + end -- Reload scripts if this has not been initiated yet if params.task then load_script_task(script, params.task) @@ -1510,15 +1515,20 @@ local function exec_redis_script(id, params, callback, keys, args) end end + local redis_command = 'EVALSHA' + if not params.is_write and script.redis_params.redis_version and + script.redis_params.redis_version >= 7 then + redis_command = 'EVALSHA_RO' + end if params.task then if not rspamd_redis_make_request(params.task, script.redis_params, - params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then + params.key, params.is_write, redis_cb, redis_command, redis_args) then callback('Cannot make redis request', nil) end else if not redis_make_request_taskless(params.ev_base, rspamd_config, script.redis_params, - params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then + params.key, params.is_write, redis_cb, redis_command, redis_args) then callback('Cannot make redis request', nil) end end |