summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2017-05-18 16:40:24 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2017-05-18 16:40:48 +0100
commit8e4777724dd6119782bee59e558f0c92493be6a0 (patch)
treed48a6f4d7d35b2eb075a9a3e5f56e10b25033ec0
parent37f250e80e9e9aee06e4a333b4f8a0ab6d8a3aba (diff)
downloadrspamd-8e4777724dd6119782bee59e558f0c92493be6a0.tar.gz
rspamd-8e4777724dd6119782bee59e558f0c92493be6a0.zip
[Rework] Continue modularisation for lua library
-rw-r--r--lualib/global_functions.lua56
-rw-r--r--src/plugins/lua/dmarc.lua64
-rw-r--r--src/plugins/lua/fann_redis.lua98
3 files changed, 87 insertions, 131 deletions
diff --git a/lualib/global_functions.lua b/lualib/global_functions.lua
index a557fa557..29b8f7328 100644
--- a/lualib/global_functions.lua
+++ b/lualib/global_functions.lua
@@ -118,6 +118,7 @@ local function rspamd_parse_redis_server(module_name)
end
exports.rspamd_parse_redis_server = rspamd_parse_redis_server
+exports.parse_redis_server = rspamd_parse_redis_server
-- Performs async call to redis hiding all complexity inside function
-- task - rspamd_task
@@ -183,6 +184,61 @@ local function rspamd_redis_make_request(task, redis_params, key, is_write, call
end
exports.rspamd_redis_make_request = rspamd_redis_make_request
+exports.redis_make_request = rspamd_redis_make_request
+
+local function redis_make_request_taskless(ev_base, cfg, key, is_write, callback, command, args)
+ if not ev_base or not redis_params or not callback or not command then
+ return false,nil,nil
+ end
+
+ local addr
+ local rspamd_redis = require "rspamd_redis"
+
+ if key then
+ if is_write then
+ addr = redis_params['write_servers']:get_upstream_by_hash(key)
+ else
+ addr = redis_params['read_servers']:get_upstream_by_hash(key)
+ end
+ else
+ if is_write then
+ addr = redis_params['write_servers']:get_upstream_master_slave(key)
+ else
+ addr = redis_params['read_servers']:get_upstream_round_robin(key)
+ end
+ end
+
+ if not addr then
+ rspamd_logger.errx(cfg, 'cannot select server to make redis request')
+ end
+
+ local options = {
+ ev_base = ev_base,
+ config = cfg,
+ callback = callback,
+ host = addr:get_addr(),
+ timeout = redis_params['timeout'],
+ cmd = command,
+ args = args
+ }
+
+ if redis_params['password'] then
+ options['password'] = redis_params['password']
+ end
+
+ if redis_params['db'] then
+ options['dbname'] = redis_params['db']
+ end
+
+ local ret,conn = rspamd_redis.make_request(options)
+ if not ret then
+ rspamd_logger.errx('cannot execute redis request')
+ end
+ return ret,conn,addr
+end
+
+exports.rspamd_redis_make_request_taskless = redis_make_request_taskless
+exports.redis_make_request_taskless = redis_make_request_taskless
local split_grammar = {}
function rspamd_str_split(s, sep)
diff --git a/src/plugins/lua/dmarc.lua b/src/plugins/lua/dmarc.lua
index 377ecc30f..0aba39462 100644
--- a/src/plugins/lua/dmarc.lua
+++ b/src/plugins/lua/dmarc.lua
@@ -22,6 +22,7 @@ local mempool = require "rspamd_mempool"
local rspamd_tcp = require "rspamd_tcp"
local rspamd_url = require "rspamd_url"
local rspamd_util = require "rspamd_util"
+local globals = require "global_functions"
local check_local = false
local check_authed = false
@@ -152,57 +153,6 @@ end
local tz_offset = get_timezone_offset(os.time())
-local function redis_make_request(ev_base, cfg, key, is_write, callback, command, args)
- if not ev_base or not redis_params or not callback or not command then
- return false,nil,nil
- end
-
- local addr
- local rspamd_redis = require "rspamd_redis"
-
- if key then
- if is_write then
- addr = redis_params['write_servers']:get_upstream_by_hash(key)
- else
- addr = redis_params['read_servers']:get_upstream_by_hash(key)
- end
- else
- if is_write then
- addr = redis_params['write_servers']:get_upstream_master_slave(key)
- else
- addr = redis_params['read_servers']:get_upstream_round_robin(key)
- end
- end
-
- if not addr then
- rspamd_logger.errx(cfg, 'cannot select server to make redis request')
- end
-
- local options = {
- ev_base = ev_base,
- config = cfg,
- callback = callback,
- host = addr:get_addr(),
- timeout = redis_params['timeout'],
- cmd = command,
- args = args
- }
-
- if redis_params['password'] then
- options['password'] = redis_params['password']
- end
-
- if redis_params['db'] then
- options['dbname'] = redis_params['db']
- end
-
- local ret,conn = rspamd_redis.make_request(options)
- if not ret then
- rspamd_logger.errx(cfg, 'cannot execute redis request')
- end
- return ret,conn,addr
-end
-
local function load_scripts(cfg, ev_base)
local function redis_report_script_cb(err, data)
if err then
@@ -212,7 +162,7 @@ local function load_scripts(cfg, ev_base)
rspamd_logger.infox(cfg, 'Loaded DMARC report script with SHA %s', take_report_sha)
end
end
- local ret = redis_make_request(ev_base,
+ local ret = globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -581,7 +531,7 @@ local function dmarc_callback(task)
local idx_key = table.concat({redis_keys.index_prefix, period}, redis_keys.join_char)
if report_data then
- local ret = rspamd_redis_make_request(task,
+ local ret = globals.redis_make_request(task,
redis_params, -- connect params
hfromdom, -- hash key
true, -- is write
@@ -923,7 +873,7 @@ if opts['reporting'] == true then
dmarc_xml('push', {data[2][i], data[2][i+1]})
end
if cursor ~= 0 then
- local ret = redis_make_request(ev_base,
+ local ret = globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
false, -- is write
@@ -940,7 +890,7 @@ if opts['reporting'] == true then
end
end
end
- local ret = redis_make_request(ev_base,
+ local ret = globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
false, -- is write
@@ -962,7 +912,7 @@ if opts['reporting'] == true then
rspamd_logger.infox(rspamd_config, 'Deleted reports for %s', reporting_domain)
get_reporting_domain()
end
- local ret = redis_make_request(ev_base,
+ local ret = globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
false, -- is write
@@ -1128,7 +1078,7 @@ if opts['reporting'] == true then
end
end
local idx_key = table.concat({redis_keys.index_prefix, want_period}, redis_keys.join_char)
- local ret = redis_make_request(ev_base,
+ local ret = globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
false, -- is write
diff --git a/src/plugins/lua/fann_redis.lua b/src/plugins/lua/fann_redis.lua
index 528d6535e..76da1175b 100644
--- a/src/plugins/lua/fann_redis.lua
+++ b/src/plugins/lua/fann_redis.lua
@@ -26,6 +26,7 @@ local rspamd_fann = require "rspamd_fann"
local rspamd_util = require "rspamd_util"
local fann_symbol_spam = 'FANNR_SPAM'
local fann_symbol_ham = 'FANNR_HAM'
+local globals = require "global_functions"
local fun = require "fun"
local module_log_id = 0x200
@@ -174,57 +175,6 @@ local lock_expire = 600
local learning_spawned = false
local ann_expire = 60 * 60 * 24 * 2 -- 2 days
-local function redis_make_request(ev_base, cfg, key, is_write, callback, command, args)
- if not ev_base or not redis_params or not callback or not command then
- return false,nil,nil
- end
-
- local addr
- local rspamd_redis = require "rspamd_redis"
-
- if key then
- if is_write then
- addr = redis_params['write_servers']:get_upstream_by_hash(key)
- else
- addr = redis_params['read_servers']:get_upstream_by_hash(key)
- end
- else
- if is_write then
- addr = redis_params['write_servers']:get_upstream_master_slave(key)
- else
- addr = redis_params['read_servers']:get_upstream_round_robin(key)
- end
- end
-
- if not addr then
- rspamd_logger.errx(cfg, 'cannot select server to make redis request')
- end
-
- local options = {
- ev_base = ev_base,
- config = cfg,
- callback = callback,
- host = addr:get_addr(),
- timeout = redis_params['timeout'],
- cmd = command,
- args = args
- }
-
- if redis_params['password'] then
- options['password'] = redis_params['password']
- end
-
- if redis_params['db'] then
- options['dbname'] = redis_params['db']
- end
-
- local ret,conn = rspamd_redis.make_request(options)
- if not ret then
- rspamd_logger.errx('cannot execute redis request')
- end
- return ret,conn,addr
-end
-
local function load_scripts(cfg, ev_base, on_load_cb)
local function can_train_sha_cb(err, data)
if err or not data or type(data) ~= 'string' then
@@ -233,7 +183,7 @@ local function load_scripts(cfg, ev_base, on_load_cb)
redis_can_train_sha = tostring(data)
end
end
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -256,7 +206,7 @@ local function load_scripts(cfg, ev_base, on_load_cb)
end
end
end
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -272,7 +222,7 @@ local function load_scripts(cfg, ev_base, on_load_cb)
redis_maybe_invalidate_sha = tostring(data)
end
end
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -288,7 +238,7 @@ local function load_scripts(cfg, ev_base, on_load_cb)
redis_locked_invalidate_sha = tostring(data)
end
end
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -304,7 +254,7 @@ local function load_scripts(cfg, ev_base, on_load_cb)
redis_maybe_lock_sha = tostring(data)
end
end
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -320,7 +270,7 @@ local function load_scripts(cfg, ev_base, on_load_cb)
redis_save_unlock_sha = tostring(data)
end
end
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -477,7 +427,7 @@ local function load_or_invalidate_fann(data, id, ev_base)
end
-- Invalidate ANN
rspamd_logger.infox(rspamd_config, 'invalidate ANN %s', prefix)
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -524,7 +474,7 @@ local function fann_train_callback(score, required_score, results, _, id, opts,
fun.each(function(e) table.insert(learn_data, e) end, extra)
local str = rspamd_util.zstd_compress(table.concat(learn_data, ';'))
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -542,7 +492,7 @@ local function fann_train_callback(score, required_score, results, _, id, opts,
end
end
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -570,7 +520,7 @@ local function train_fann(_, ev_base, elt)
if err then
rspamd_logger.errx(rspamd_config, 'cannot save ANN %s to redis: %s',
prefix, err)
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
false, -- is write
@@ -589,7 +539,7 @@ local function train_fann(_, ev_base, elt)
if errcode ~= 0 then
rspamd_logger.errx(rspamd_config, 'cannot train ANN %s: %s',
prefix, errmsg)
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -604,7 +554,7 @@ local function train_fann(_, ev_base, elt)
fanns[elt].version = fanns[elt].version + 1
fanns[elt].fann = fanns[elt].fann_train
fanns[elt].fann_train = nil
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -619,7 +569,7 @@ local function train_fann(_, ev_base, elt)
if err or type(data) ~= 'table' then
rspamd_logger.errx(rspamd_config, 'cannot get ham tokens for ANN %s from redis: %s',
prefix, err)
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -669,7 +619,7 @@ local function train_fann(_, ev_base, elt)
end
-- Invalidate ANN
rspamd_logger.infox(rspamd_config, 'invalidate ANN %s: training data is invalid', prefix)
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -690,7 +640,7 @@ local function train_fann(_, ev_base, elt)
if err or type(data) ~= 'table' then
rspamd_logger.errx(rspamd_config, 'cannot get spam tokens for ANN %s from redis: %s',
prefix, err)
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -704,7 +654,7 @@ local function train_fann(_, ev_base, elt)
local _,str = rspamd_util.zstd_decompress(tok)
return fun.totable(fun.map(tonumber, rspamd_str_split(tostring(str), ';')))
end, data))
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
false, -- is write
@@ -724,7 +674,7 @@ local function train_fann(_, ev_base, elt)
end
elseif type(data) == 'number' then
-- Can train ANN
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
false, -- is write
@@ -745,7 +695,7 @@ local function train_fann(_, ev_base, elt)
end
end
if learning_spawned then
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -769,7 +719,7 @@ local function train_fann(_, ev_base, elt)
rspamd_logger.infox(rspamd_config, 'do not learn ANN %s, already learning another ANN', prefix)
return
end
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
true, -- is write
@@ -800,7 +750,7 @@ local function maybe_train_fanns(cfg, ev_base)
end
end
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
false, -- is write
@@ -818,7 +768,7 @@ local function maybe_train_fanns(cfg, ev_base)
return 1.0
end
-- First we need to get all fanns stored in our Redis
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
false, -- is write
@@ -854,7 +804,7 @@ local function check_fanns(_, ev_base)
local_ver = fanns[elt].version
end
end
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
false, -- is write
@@ -872,7 +822,7 @@ local function check_fanns(_, ev_base)
return 1.0
end
-- First we need to get all fanns stored in our Redis
- redis_make_request(ev_base,
+ globals.redis_make_request_taskless(ev_base,
rspamd_config,
nil,
false, -- is write