You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

lua_redis.lua 6.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. local logger = require "rspamd_logger"
  2. local exports = {}
  --- Parses the redis server definition for a module.
  -- Sources are tried in this order:
  --   1. the module's own options: a nested `redis` table first, then the
  --      options table itself;
  --   2. unless `no_fallback` is set, the global `redis` section: its
  --      `[module_name]` subsection when present, otherwise its top-level keys
  --      (skipped when the module is named in `disabled_modules`).
  -- Scalar settings (timeout, prefix, db, password) found in an earlier source
  -- are kept even when that source defines no servers, so they take precedence
  -- over the same settings in a later source.
  -- @param module_name name of the module whose configuration is looked up
  -- @param module_opts optional options table; when nil the options are read
  --   with rspamd_config:get_all_opt(module_name)
  -- @param no_fallback when true the global `redis` section is not consulted
  -- @return table with `read_servers`, `write_servers`, `timeout` and, when
  --   configured, `prefix`, `db` (as a string) and `password`; nil when no
  --   servers could be resolved
  local function rspamd_parse_redis_server(module_name, module_opts, no_fallback)
    local result = {}
    local default_port = 6379
    local default_timeout = 1.0
    local upstream_list = require "rspamd_upstream_list"

    -- Merges one options table into `result`; returns true once both the read
    -- and the write upstream lists are available.
    local function try_load_redis_servers(options)
      local upstreams_read, upstreams_write

      -- Read side: the first of read_servers / servers / server that is set
      local read_def = options['read_servers'] or options['servers']
          or options['server']
      if read_def then
        upstreams_read = upstream_list.create(rspamd_config, read_def,
          default_port)
      end

      -- Write side: explicit write_servers, otherwise the read list is reused
      if upstreams_read then
        if options['write_servers'] then
          upstreams_write = upstream_list.create(rspamd_config,
            options['write_servers'], default_port)
        else
          upstreams_write = upstreams_read
        end
      end

      -- A timeout still at its default value may be overridden by this source
      if not result['timeout'] or result['timeout'] == default_timeout then
        -- tonumber() yields nil for unparsable values; never store that
        local timeout = options['timeout'] and tonumber(options['timeout'])
        result['timeout'] = timeout or default_timeout
      end

      if options['prefix'] and not result['prefix'] then
        result['prefix'] = options['prefix']
      end

      if not result['db'] then
        if options['db'] then
          result['db'] = tostring(options['db'])
        elseif options['dbname'] then
          result['db'] = tostring(options['dbname'])
        end
      end

      if options['password'] and not result['password'] then
        result['password'] = options['password']
      end

      if upstreams_write and upstreams_read then
        result.read_servers = upstreams_read
        result.write_servers = upstreams_write
        return true
      end

      return false
    end

    -- True when this module is named in the `disabled_modules` option.
    -- A single scalar name is accepted as well as a list, because ipairs()
    -- raises on a non-table argument in Lua 5.1/LuaJIT.
    local function is_disabled(disabled)
      if type(disabled) == 'table' then
        for _, v in ipairs(disabled) do
          if v == module_name then
            return true
          end
        end
        return false
      end
      return type(disabled) == 'string' and disabled == module_name
    end

    -- Try local options
    local opts = module_opts
    if not opts then
      opts = rspamd_config:get_all_opt(module_name)
    end

    if opts then
      if opts.redis and try_load_redis_servers(opts.redis) then
        return result
      end
      if try_load_redis_servers(opts) then
        return result
      end
    end

    if no_fallback then return nil end

    -- Try global options
    opts = rspamd_config:get_all_opt('redis')
    if opts then
      if opts[module_name] then
        if try_load_redis_servers(opts[module_name]) then
          return result
        end
      else
        -- Check the exclusion list before any upstream list is built
        if is_disabled(opts['disabled_modules']) then
          logger.infox(rspamd_config, "NOT using default redis server for module %s: it is disabled",
            module_name)
          return nil
        end

        if try_load_redis_servers(opts) then
          logger.infox(rspamd_config, "using default redis server for module %s",
            module_name)
        end
      end
    end

    if result.read_servers then
      return result
    end

    return nil
  end
  -- Expose the parser under both the `rspamd_`-prefixed name and the short name;
  -- both keys refer to the same function.
  113. exports.rspamd_parse_redis_server = rspamd_parse_redis_server
  114. exports.parse_redis_server = rspamd_parse_redis_server
  --- Performs an async call to redis, hiding upstream selection and the
  -- bookkeeping of upstream health inside the function.
  -- @param task rspamd_task the request is bound to
  -- @param redis_params valid params returned by rspamd_parse_redis_server
  -- @param key key to select upstream by hash, or nil to select
  --   master-slave (writes) / round-robin (reads)
  -- @param is_write true if the request needs a write server
  -- @param callback function called as callback(err, data, addr) once the
  --   request is completed
  -- @param command redis command
  -- @param args table of arguments
  -- @return ret, conn as returned by rspamd_redis.make_request, followed by
  --   the selected upstream; false, nil, nil when a mandatory argument is
  --   missing or no upstream could be selected
  local function rspamd_redis_make_request(task, redis_params, key, is_write, callback, command, args)
    if not task or not redis_params or not callback or not command then
      return false,nil,nil
    end

    local rspamd_redis = require "rspamd_redis"
    local addr

    if key then
      if is_write then
        addr = redis_params['write_servers']:get_upstream_by_hash(key)
      else
        addr = redis_params['read_servers']:get_upstream_by_hash(key)
      end
    else
      if is_write then
        addr = redis_params['write_servers']:get_upstream_master_slave(key)
      else
        addr = redis_params['read_servers']:get_upstream_round_robin(key)
      end
    end

    if not addr then
      logger.errx(task, 'cannot select server to make redis request')
      -- Without an upstream there is nothing to connect to; bail out instead
      -- of indexing nil below.
      return false,nil,nil
    end

    -- Reports the outcome to the upstream before handing over to the caller
    local function rspamd_redis_make_request_cb(err, data)
      if err then
        addr:fail()
      else
        addr:ok()
      end
      callback(err, data, addr)
    end

    local options = {
      task = task,
      callback = rspamd_redis_make_request_cb,
      host = addr:get_addr(),
      timeout = redis_params['timeout'],
      cmd = command,
      args = args
    }

    if redis_params['password'] then
      options['password'] = redis_params['password']
    end

    if redis_params['db'] then
      options['dbname'] = redis_params['db']
    end

    local ret,conn = rspamd_redis.make_request(options)
    return ret,conn,addr
  end
  -- Expose the task-bound request helper under both the `rspamd_`-prefixed name
  -- and the short name; both keys refer to the same function.
  170. exports.rspamd_redis_make_request = rspamd_redis_make_request
  171. exports.redis_make_request = rspamd_redis_make_request
  --- Performs an async call to redis without a task, using an event base and
  -- a config object instead.
  -- The callback is handed to rspamd_redis.make_request unchanged; this
  -- function does not call ok()/fail() on the selected upstream.
  -- @param ev_base event base passed to rspamd_redis.make_request
  -- @param cfg config object passed to rspamd_redis.make_request as `config`
  -- @param redis_params valid params returned by rspamd_parse_redis_server
  -- @param key key to select upstream by hash, or nil to select
  --   master-slave (writes) / round-robin (reads)
  -- @param is_write true if the request needs a write server
  -- @param callback function to be called upon request completion
  -- @param command redis command
  -- @param args table of arguments
  -- @return ret, conn as returned by rspamd_redis.make_request, followed by
  --   the selected upstream; false, nil, nil when a mandatory argument is
  --   missing or no upstream could be selected
  local function redis_make_request_taskless(ev_base, cfg, redis_params, key, is_write, callback, command, args)
    if not ev_base or not redis_params or not callback or not command then
      return false,nil,nil
    end

    local addr
    local rspamd_redis = require "rspamd_redis"

    if key then
      if is_write then
        addr = redis_params['write_servers']:get_upstream_by_hash(key)
      else
        addr = redis_params['read_servers']:get_upstream_by_hash(key)
      end
    else
      if is_write then
        addr = redis_params['write_servers']:get_upstream_master_slave(key)
      else
        addr = redis_params['read_servers']:get_upstream_round_robin(key)
      end
    end

    if not addr then
      logger.errx(cfg, 'cannot select server to make redis request')
      -- Without an upstream there is nothing to connect to; bail out instead
      -- of indexing nil below.
      return false,nil,nil
    end

    local options = {
      ev_base = ev_base,
      config = cfg,
      callback = callback,
      host = addr:get_addr(),
      timeout = redis_params['timeout'],
      cmd = command,
      args = args
    }

    if redis_params['password'] then
      options['password'] = redis_params['password']
    end

    if redis_params['db'] then
      options['dbname'] = redis_params['db']
    end

    local ret,conn = rspamd_redis.make_request(options)
    if not ret then
      logger.errx('cannot execute redis request')
    end

    return ret,conn,addr
  end
  -- Expose the taskless request helper under both the `rspamd_`-prefixed name
  -- and the short name; both keys refer to the same function.
  215. exports.rspamd_redis_make_request_taskless = redis_make_request_taskless
  216. exports.redis_make_request_taskless = redis_make_request_taskless
  -- The module's value is the table of exported functions.
  217. return exports