diff options
author | Vsevolod Stakhov <vsevolod@rspamd.com> | 2023-08-01 16:07:13 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rspamd.com> | 2023-08-01 16:07:13 +0100 |
commit | ac5e940daa667b7ec2d1d29d2718806d83ee3a2c (patch) | |
tree | c53978e8ac35cdaa4e2e38cc6de39237bbd79db4 | |
parent | 1338a8b9272dbea20ce0e4fee2c986bd5eeaa82b (diff) | |
download | rspamd-ac5e940daa667b7ec2d1d29d2718806d83ee3a2c.tar.gz rspamd-ac5e940daa667b7ec2d1d29d2718806d83ee3a2c.zip |
[Feature] Add `sentinel_password` option
-rw-r--r-- | lualib/lua_redis.lua | 236 |
1 files changed, 126 insertions, 110 deletions
diff --git a/lualib/lua_redis.lua b/lualib/lua_redis.lua index 62511451e..2a43a915c 100644 --- a/lualib/lua_redis.lua +++ b/lualib/lua_redis.lua @@ -36,34 +36,33 @@ local common_schema = ts.shape { sentinel_watch_time = (ts.number + ts.string / lutil.parse_time_interval):is_optional(), sentinel_masters_pattern = ts.string:is_optional(), sentinel_master_maxerrors = (ts.number + ts.string / tonumber):is_optional(), + sentinel_password = ts.string:is_optional(), } -local config_schema = - -- Allow separate read/write servers to allow usage in the `extra_fields` - ts.shape({ - read_servers = ts.string + ts.array_of(ts.string), - }, {extra_fields = common_schema}) + - ts.shape({ - write_servers = ts.string + ts.array_of(ts.string), - }, {extra_fields = common_schema}) + - ts.shape({ - read_servers = ts.string + ts.array_of(ts.string), - write_servers = ts.string + ts.array_of(ts.string), - }, {extra_fields = common_schema}) + - ts.shape({ - servers = ts.string + ts.array_of(ts.string), - }, {extra_fields = common_schema}) + - ts.shape({ - server = ts.string + ts.array_of(ts.string), - }, {extra_fields = common_schema}) +local config_schema = -- Allow separate read/write servers to allow usage in the `extra_fields` +ts.shape({ + read_servers = ts.string + ts.array_of(ts.string), +}, { extra_fields = common_schema }) + + ts.shape({ + write_servers = ts.string + ts.array_of(ts.string), + }, { extra_fields = common_schema }) + + ts.shape({ + read_servers = ts.string + ts.array_of(ts.string), + write_servers = ts.string + ts.array_of(ts.string), + }, { extra_fields = common_schema }) + + ts.shape({ + servers = ts.string + ts.array_of(ts.string), + }, { extra_fields = common_schema }) + + ts.shape({ + server = ts.string + ts.array_of(ts.string), + }, { extra_fields = common_schema }) exports.config_schema = config_schema - local function redis_query_sentinel(ev_base, params, initialised) local function flatten_redis_table(tbl) local res = {} - for i=1,#tbl,2 do + for i = 1, #tbl, 2 do res[tbl[i]] = tbl[i + 1] end @@ -83,12 +82,12 @@ local function redis_query_sentinel(ev_base, params, initialised) local pending_subrequests = 0 - for _,m in ipairs(result) do + for _, m in ipairs(result) do local master = flatten_redis_table(m) -- Wrap IPv6-addresses in brackets if (master.ip:match(":")) then - master.ip = "["..master.ip.."]" + master.ip = "[" .. master.ip .. "]" end if params.sentinel_masters_pattern then @@ -108,18 +107,18 @@ local function redis_query_sentinel(ev_base, params, initialised) end -- For each master we need to get a list of slaves - for k,v in pairs(masters) do + for k, v in pairs(masters) do v.slaves = {} local function slaves_cb(slave_err, slave_result) if not slave_err and type(slave_result) == 'table' then - for _,s in ipairs(slave_result) do + for _, s in ipairs(slave_result) do local slave = flatten_redis_table(s) lutil.debugm(N, rspamd_config, 'found slave for master %s with ip %s and port %s', v.name, slave.ip, slave.port) -- Wrap IPv6-addresses in brackets if (slave.ip:match(":")) then - slave.ip = "["..slave.ip.."]" + slave.ip = "[" .. slave.ip .. "]" end v.slaves[#v.slaves + 1] = slave end @@ -137,16 +136,17 @@ local function redis_query_sentinel(ev_base, params, initialised) end end - local ret = rspamd_redis.make_request({ + local ret = rspamd_redis.make_request { host = addr:get_addr(), timeout = params.timeout, + password = params.sentinel_password, config = rspamd_config, ev_base = ev_base, cmd = 'SENTINEL', - args = {'slaves', k}, + args = { 'slaves', k }, no_pool = true, callback = slaves_cb - }) + } if not ret then logger.errx(rspamd_config, 'cannot connect sentinel when query slaves at address: %s', @@ -165,16 +165,17 @@ local function redis_query_sentinel(ev_base, params, initialised) end end - local ret = rspamd_redis.make_request({ + local ret = rspamd_redis.make_request { host = addr:get_addr(), timeout = params.timeout, config = rspamd_config, ev_base = ev_base, + password = params.sentinel_password, cmd = 'SENTINEL', - args = {'masters'}, + args = { 'masters' }, no_pool = true, callback = masters_cb, - }) + } if not ret then logger.errx(rspamd_config, 'cannot connect sentinel at address: %s', @@ -186,7 +187,7 @@ local function redis_query_sentinel(ev_base, params, initialised) -- We now form new strings for masters and slaves local read_servers_tbl, write_servers_tbl = {}, {} - for _,master in pairs(masters) do + for _, master in pairs(masters) do write_servers_tbl[#write_servers_tbl + 1] = string.format( '%s:%s', master.ip, master.port ) @@ -194,7 +195,7 @@ local function redis_query_sentinel(ev_base, params, initialised) '%s:%s', master.ip, master.port ) - for _,slave in ipairs(master.slaves) do + for _, slave in ipairs(master.slaves) do if slave['master-link-status'] == 'ok' then read_servers_tbl[#read_servers_tbl + 1] = string.format( '%s:%s', slave.ip, slave.port @@ -309,7 +310,7 @@ local function calculate_redis_hash(params) h:update(k) h:update(tostring(v)) elseif type(v) == 'table' then - for kk,vv in pairs(v) do + for kk, vv in pairs(v) do rec_hash(kk, vv) end end @@ -415,17 +416,17 @@ local function process_redis_options(options, rspamd_config, result) if options['read_servers'] then if rspamd_config then upstreams_read = upstream_list.create(rspamd_config, - options['read_servers'], default_port) + options['read_servers'], default_port) else upstreams_read = upstream_list.create(options['read_servers'], - default_port) + default_port) end result.read_servers_str = options['read_servers'] elseif options['servers'] then if rspamd_config then upstreams_read = upstream_list.create(rspamd_config, - options['servers'], default_port) + options['servers'], default_port) else upstreams_read = upstream_list.create(options['servers'], default_port) end @@ -435,7 +436,7 @@ local function process_redis_options(options, rspamd_config, result) elseif options['server'] then if rspamd_config then upstreams_read = upstream_list.create(rspamd_config, - options['server'], default_port) + options['server'], default_port) else upstreams_read = upstream_list.create(options['server'], default_port) end @@ -448,10 +449,10 @@ local function process_redis_options(options, rspamd_config, result) if options['write_servers'] then if rspamd_config then upstreams_write = upstream_list.create(rspamd_config, - options['write_servers'], default_port) + options['write_servers'], default_port) else upstreams_write = upstream_list.create(options['write_servers'], - default_port) + default_port) end result.write_servers_str = options['write_servers'] read_only = false @@ -567,12 +568,12 @@ local function rspamd_parse_redis_server(module_name, module_opts, no_fallback) -- Exclude disabled if opts['disabled_modules'] then - for _,v in ipairs(opts['disabled_modules']) do + for _, v in ipairs(opts['disabled_modules']) do if v == module_name then logger.infox(rspamd_config, "NOT using default redis server for module %s: it is disabled", - module_name) + module_name) - return nil + return nil end end end @@ -586,7 +587,7 @@ local function rspamd_parse_redis_server(module_name, module_opts, no_fallback) end if result.read_servers then - return maybe_return_cached(result) + return maybe_return_cached(result) end return nil @@ -619,7 +620,7 @@ local process_cmd = { end, blpop = function(args) local idx_l = {} - for i = 1, #args -1 do + for i = 1, #args - 1 do table.insert(idx_l, i) end return idx_l @@ -635,7 +636,7 @@ local process_cmd = { return idx_l end, set = function(args) - return {1} + return { 1 } end, mget = function(args) local idx_l = {} @@ -659,9 +660,10 @@ local process_cmd = { return idx_l end, smove = function(args) - return {1, 2} + return { 1, 2 } end, - script = function() end + script = function() + end } process_cmd.append = process_cmd.set process_cmd.auth = process_cmd.script @@ -835,12 +837,16 @@ local gen_meta = { end, principal_recipient_domain = function(task) local p = task:get_principal_recipient() - if not p then return end + if not p then + return + end return string.match(p, '.*@(.*)') end, ip = function(task) local i = task:get_ip() - if i and i:is_valid() then return i:to_string() end + if i and i:is_valid() then + return i:to_string() + end end, from = function(task) return ((task:get_from('smtp') or E)[1] or E)['addr'] @@ -850,7 +856,9 @@ local gen_meta = { end, from_domain_or_helo_domain = function(task) local d = ((task:get_from('smtp') or E)[1] or E)['domain'] - if d and #d > 0 then return d end + if d and #d > 0 then + return d + end return task:get_helo() end, mime_from = function(task) @@ -864,7 +872,9 @@ local gen_meta = { local function gen_get_esld(f) return function(task) local d = f(task) - if not d then return end + if not d then + return + end return rspamd_util.get_tld(d) end end @@ -912,7 +922,7 @@ end -- args - table of arguments -- extra_opts - table of optional request arguments local function rspamd_redis_make_request(task, redis_params, key, is_write, - callback, command, args, extra_opts) + callback, command, args, extra_opts) local addr local function rspamd_redis_make_request_cb(err, data) if err then @@ -925,7 +935,7 @@ local function rspamd_redis_make_request(task, redis_params, key, is_write, end end if not task or not redis_params or not callback or not command then - return false,nil,nil + return false, nil, nil end local rspamd_redis = require "rspamd_redis" @@ -967,7 +977,7 @@ local function rspamd_redis_make_request(task, redis_params, key, is_write, } if extra_opts then - for k,v in pairs(extra_opts) do + for k, v in pairs(extra_opts) do options[k] = v end end @@ -984,14 +994,14 @@ local function rspamd_redis_make_request(task, redis_params, key, is_write, ' (host=%s, timeout=%s): cmd: %s', ip_addr, options.timeout, options.cmd) - local ret,conn = rspamd_redis.make_request(options) + local ret, conn = rspamd_redis.make_request(options) if not ret then addr:fail() logger.warnx(task, "cannot make redis request to: %s", tostring(ip_addr)) end - return ret,conn,addr + return ret, conn, addr end --[[[ @@ -1010,9 +1020,9 @@ exports.rspamd_redis_make_request = rspamd_redis_make_request exports.redis_make_request = rspamd_redis_make_request local function redis_make_request_taskless(ev_base, cfg, redis_params, key, - is_write, callback, command, args, extra_opts) + is_write, callback, command, args, extra_opts) if not ev_base or not redis_params or not callback or not command then - return false,nil,nil + return false, nil, nil end local addr @@ -1057,12 +1067,11 @@ local function redis_make_request_taskless(ev_base, cfg, redis_params, key, args = args } if extra_opts then - for k,v in pairs(extra_opts) do + for k, v in pairs(extra_opts) do options[k] = v end end - if redis_params['password'] then options['password'] = redis_params['password'] end @@ -1074,13 +1083,13 @@ local function redis_make_request_taskless(ev_base, cfg, redis_params, key, lutil.debugm(N, cfg, 'perform taskless request to redis server' .. ' (host=%s, timeout=%s): cmd: %s', options.host:tostring(true), options.timeout, options.cmd) - local ret,conn = rspamd_redis.make_request(options) + local ret, conn = rspamd_redis.make_request(options) if not ret then logger.errx('cannot execute redis request') addr:fail() end - return ret,conn,addr + return ret, conn, addr end --[[[ @@ -1101,20 +1110,22 @@ local function script_set_loaded(script) end local wait_table = {} - for _,s in ipairs(script.waitq) do + for _, s in ipairs(script.waitq) do table.insert(wait_table, s) end script.waitq = {} - for _,s in ipairs(wait_table) do + for _, s in ipairs(wait_table) do s(script.loaded) end end local function prepare_redis_call(script) local function merge_tables(t1, t2) - for k,v in pairs(t2) do t1[k] = v end + for k, v in pairs(t2) do + t1[k] = v + end end local servers = {} @@ -1129,12 +1140,12 @@ local function prepare_redis_call(script) -- Call load script on each server, set loaded flag script.in_flight = #servers - for _,s in ipairs(servers) do + for _, s in ipairs(servers) do local cur_opts = { host = s:get_addr(), timeout = script.redis_params['timeout'], cmd = 'SCRIPT', - args = {'LOAD', script.script }, + args = { 'LOAD', script.script }, upstream = s } @@ -1156,7 +1167,7 @@ local function load_script_task(script, task, is_write) local rspamd_redis = require "rspamd_redis" local opts = prepare_redis_call(script) - for _,opt in ipairs(opts) do + for _, opt in ipairs(opts) do opt.task = task opt.is_write = is_write opt.callback = function(err, data) @@ -1169,7 +1180,7 @@ local function load_script_task(script, task, is_write) else opt.upstream:ok() logger.infox(task, - "uploaded redis script to %s with id %s, sha: %s", + "uploaded redis script to %s with id %s, sha: %s", opt.upstream:get_addr():to_string(true), script.id, data) script.sha = data -- We assume that sha is the same on all servers @@ -1185,7 +1196,7 @@ local function load_script_task(script, task, is_write) if not ret then logger.errx('cannot execute redis request to load script on %s', - opt.upstream:get_addr()) + opt.upstream:get_addr()) script.in_flight = script.in_flight - 1 opt.upstream:fail() end @@ -1200,7 +1211,7 @@ local function load_script_taskless(script, cfg, ev_base, is_write) local rspamd_redis = require "rspamd_redis" local opts = prepare_redis_call(script) - for _,opt in ipairs(opts) do + for _, opt in ipairs(opts) do opt.config = cfg opt.ev_base = ev_base opt.is_write = is_write @@ -1214,7 +1225,7 @@ local function load_script_taskless(script, cfg, ev_base, is_write) else opt.upstream:ok() logger.infox(cfg, - "uploaded redis script to %s with id %s, sha: %s", + "uploaded redis script to %s with id %s, sha: %s", opt.upstream:get_addr():to_string(true), script.id, data) script.sha = data -- We assume that sha is the same on all servers script.fatal_error = nil @@ -1229,7 +1240,7 @@ local function load_script_taskless(script, cfg, ev_base, is_write) if not ret then logger.errx('cannot execute redis request to load script on %s', - opt.upstream:get_addr()) + opt.upstream:get_addr()) script.in_flight = script.in_flight - 1 opt.upstream:fail() end @@ -1247,7 +1258,9 @@ local function load_redis_script(script, cfg, ev_base, _) end local function add_redis_script(script, redis_params, caller_level) - if not caller_level then caller_level = 2 end + if not caller_level then + caller_level = 2 + end local caller = debug.getinfo(caller_level) local new_script = { @@ -1290,8 +1303,10 @@ local function load_redis_script_from_file(filename, redis_params, dir) local lua_util = require "lua_util" local rspamd_logger = require "rspamd_logger" - if not dir then dir = rspamd_paths.LUALIBDIR end - if filename:sub(1, 1) ~= package.config:sub(1,1) then + if not dir then + dir = rspamd_paths.LUALIBDIR + end + if filename:sub(1, 1) ~= package.config:sub(1, 1) then -- Relative path filename = lua_util.join_path(dir, "redis_scripts", filename) end @@ -1318,11 +1333,10 @@ local function exec_redis_script(id, params, callback, keys, args) local redis_args = {} if not redis_scripts[id] then - logger.errx("cannot find registered script with id %s", id) + logger.errx("cannot find registered script with id %s", id) return false end - local script = redis_scripts[id] if script.fatal_error then @@ -1363,7 +1377,7 @@ local function exec_redis_script(id, params, callback, keys, args) if #redis_args == 0 then table.insert(redis_args, script.sha) table.insert(redis_args, tostring(#keys)) - for _,k in ipairs(keys) do + for _, k in ipairs(keys) do table.insert(redis_args, k) end @@ -1376,13 +1390,13 @@ local function exec_redis_script(id, params, callback, keys, args) if params.task then if not rspamd_redis_make_request(params.task, script.redis_params, - params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then + params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then callback('Cannot make redis request', nil) end else if not redis_make_request_taskless(params.ev_base, rspamd_config, - script.redis_params, - params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then + script.redis_params, + params.key, params.is_write, redis_cb, 'EVALSHA', redis_args) then callback('Cannot make redis request', nil) end end @@ -1414,7 +1428,7 @@ exports.exec_redis_script = exec_redis_script local function redis_connect_sync(redis_params, is_write, key, cfg, ev_base) if not redis_params then - return false,nil + return false, nil end local rspamd_redis = require "rspamd_redis" @@ -1446,45 +1460,45 @@ local function redis_connect_sync(redis_params, is_write, key, cfg, ev_base) session = redis_params.session or rspamadm_session } - for k,v in pairs(redis_params) do + for k, v in pairs(redis_params) do options[k] = v end if not options.config then logger.errx('config is not set') - return false,nil,addr + return false, nil, addr end if not options.ev_base then logger.errx('ev_base is not set') - return false,nil,addr + return false, nil, addr end if not options.session then logger.errx('session is not set') - return false,nil,addr + return false, nil, addr end - local ret,conn = rspamd_redis.connect_sync(options) + local ret, conn = rspamd_redis.connect_sync(options) if not ret then logger.errx('cannot create redis connection: %s', conn) addr:fail() - return false,nil,addr + return false, nil, addr end if conn then local need_exec = false if redis_params['password'] then - conn:add_cmd('AUTH', {redis_params['password']}) + conn:add_cmd('AUTH', { redis_params['password'] }) need_exec = true end if redis_params['db'] then - conn:add_cmd('SELECT', {tostring(redis_params['db'])}) + conn:add_cmd('SELECT', { tostring(redis_params['db']) }) need_exec = true elseif redis_params['dbname'] then - conn:add_cmd('SELECT', {tostring(redis_params['dbname'])}) + conn:add_cmd('SELECT', { tostring(redis_params['dbname']) }) need_exec = true end @@ -1495,12 +1509,12 @@ local function redis_connect_sync(redis_params, is_write, key, cfg, ev_base) logger.errx('cannot prepare redis connection (authentication or db selection failure): %s', res) addr:fail() - return false,nil,addr + return false, nil, addr end end end - return ret,conn,addr + return ret, conn, addr end exports.redis_connect_sync = redis_connect_sync @@ -1520,12 +1534,12 @@ exports.request = function(redis_params, attrs, req) if not attrs or not redis_params or not req then logger.errx('invalid arguments for redis request') - return false,nil,nil + return false, nil, nil end if not (attrs.task or (attrs.config and attrs.ev_base)) then logger.errx('invalid attributes for redis request') - return false,nil,nil + return false, nil, nil end local opts = lua_util.shallowcopy(attrs) @@ -1593,16 +1607,16 @@ exports.request = function(redis_params, attrs, req) opts.timeout, opts.cmd, opts.args) if opts.callback then - local ret,conn = rspamd_redis.make_request(opts) + local ret, conn = rspamd_redis.make_request(opts) if not ret then logger.errx(log_obj, 'cannot execute redis request') addr:fail() end - return ret,conn,addr + return ret, conn, addr else -- Coroutines version - local ret,conn = rspamd_redis.connect_sync(opts) + local ret, conn = rspamd_redis.connect_sync(opts) if not ret then logger.errx(log_obj, 'cannot execute redis request') addr:fail() @@ -1610,7 +1624,7 @@ exports.request = function(redis_params, attrs, req) conn:add_cmd(opts.cmd, opts.args) return conn:exec() end - return false,nil,addr + return false, nil, addr end end @@ -1627,12 +1641,12 @@ exports.connect = function(redis_params, attrs) if not attrs or not redis_params then logger.errx('invalid arguments for redis connect') - return false,nil,nil + return false, nil, nil end if not (attrs.task or (attrs.config and attrs.ev_base)) then logger.errx('invalid attributes for redis connect') - return false,nil,nil + return false, nil, nil end local opts = lua_util.shallowcopy(attrs) @@ -1688,24 +1702,24 @@ exports.connect = function(redis_params, attrs) end if opts.callback then - local ret,conn = rspamd_redis.connect(opts) + local ret, conn = rspamd_redis.connect(opts) if not ret then logger.errx(log_obj, 'cannot execute redis connect') addr:fail() end - return ret,conn,addr + return ret, conn, addr else -- Coroutines version - local ret,conn = rspamd_redis.connect_sync(opts) + local ret, conn = rspamd_redis.connect_sync(opts) if not ret then logger.errx(log_obj, 'cannot execute redis connect') addr:fail() else - return true,conn,addr + return true, conn, addr end - return false,nil,addr + return false, nil, addr end end @@ -1726,7 +1740,7 @@ local function register_prefix(prefix, module, description, optional) } if optional and type(optional) == 'table' then - for k,v in pairs(optional) do + for k, v in pairs(optional) do pr[k] = v end end @@ -1746,7 +1760,9 @@ exports.prefixes = function(mname) else local fun = require "fun" - return fun.totable(fun.filter(function(_, data) return data.module == mname end, + return fun.totable(fun.filter(function(_, data) + return data.module == mname + end, redis_prefixes)) end end |