From 8e4777724dd6119782bee59e558f0c92493be6a0 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 18 May 2017 16:40:24 +0100 Subject: [Rework] Continue modularisation for lua library --- lualib/global_functions.lua | 56 ++++++++++++++++++++++++ src/plugins/lua/dmarc.lua | 64 +++------------------------ src/plugins/lua/fann_redis.lua | 98 +++++++++++------------------------------- 3 files changed, 87 insertions(+), 131 deletions(-) diff --git a/lualib/global_functions.lua b/lualib/global_functions.lua index a557fa557..29b8f7328 100644 --- a/lualib/global_functions.lua +++ b/lualib/global_functions.lua @@ -118,6 +118,7 @@ local function rspamd_parse_redis_server(module_name) end exports.rspamd_parse_redis_server = rspamd_parse_redis_server +exports.parse_redis_server = rspamd_parse_redis_server -- Performs async call to redis hiding all complexity inside function -- task - rspamd_task @@ -183,6 +184,61 @@ local function rspamd_redis_make_request(task, redis_params, key, is_write, call end exports.rspamd_redis_make_request = rspamd_redis_make_request +exports.redis_make_request = rspamd_redis_make_request + +local function redis_make_request_taskless(ev_base, cfg, key, is_write, callback, command, args) + if not ev_base or not redis_params or not callback or not command then + return false,nil,nil + end + + local addr + local rspamd_redis = require "rspamd_redis" + + if key then + if is_write then + addr = redis_params['write_servers']:get_upstream_by_hash(key) + else + addr = redis_params['read_servers']:get_upstream_by_hash(key) + end + else + if is_write then + addr = redis_params['write_servers']:get_upstream_master_slave(key) + else + addr = redis_params['read_servers']:get_upstream_round_robin(key) + end + end + + if not addr then + rspamd_logger.errx(cfg, 'cannot select server to make redis request') + end + + local options = { + ev_base = ev_base, + config = cfg, + callback = callback, + host = addr:get_addr(), + timeout = redis_params['timeout'], + cmd = command, + args = args + } + + if redis_params['password'] then + options['password'] = redis_params['password'] + end + + if redis_params['db'] then + options['dbname'] = redis_params['db'] + end + + local ret,conn = rspamd_redis.make_request(options) + if not ret then + rspamd_logger.errx('cannot execute redis request') + end + return ret,conn,addr +end + +exports.rspamd_redis_make_request_taskless = redis_make_request_taskless +exports.redis_make_request_taskless = redis_make_request_taskless local split_grammar = {} function rspamd_str_split(s, sep) diff --git a/src/plugins/lua/dmarc.lua b/src/plugins/lua/dmarc.lua index 377ecc30f..0aba39462 100644 --- a/src/plugins/lua/dmarc.lua +++ b/src/plugins/lua/dmarc.lua @@ -22,6 +22,7 @@ local mempool = require "rspamd_mempool" local rspamd_tcp = require "rspamd_tcp" local rspamd_url = require "rspamd_url" local rspamd_util = require "rspamd_util" +local globals = require "global_functions" local check_local = false local check_authed = false @@ -152,57 +153,6 @@ end local tz_offset = get_timezone_offset(os.time()) -local function redis_make_request(ev_base, cfg, key, is_write, callback, command, args) - if not ev_base or not redis_params or not callback or not command then - return false,nil,nil - end - - local addr - local rspamd_redis = require "rspamd_redis" - - if key then - if is_write then - addr = redis_params['write_servers']:get_upstream_by_hash(key) - else - addr = redis_params['read_servers']:get_upstream_by_hash(key) - end - else - if is_write then - addr = redis_params['write_servers']:get_upstream_master_slave(key) - else - addr = redis_params['read_servers']:get_upstream_round_robin(key) - end - end - - if not addr then - rspamd_logger.errx(cfg, 'cannot select server to make redis request') - end - - local options = { - ev_base = ev_base, - config = cfg, - callback = callback, - host = addr:get_addr(), - timeout = redis_params['timeout'], - cmd = command, - args = args - } - - if redis_params['password'] then - options['password'] = redis_params['password'] - end - - if redis_params['db'] then - options['dbname'] = redis_params['db'] - end - - local ret,conn = rspamd_redis.make_request(options) - if not ret then - rspamd_logger.errx(cfg, 'cannot execute redis request') - end - return ret,conn,addr -end - local function load_scripts(cfg, ev_base) local function redis_report_script_cb(err, data) if err then @@ -212,7 +162,7 @@ local function load_scripts(cfg, ev_base) rspamd_logger.infox(cfg, 'Loaded DMARC report script with SHA %s', take_report_sha) end end - local ret = redis_make_request(ev_base, + local ret = globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -581,7 +531,7 @@ local function dmarc_callback(task) local idx_key = table.concat({redis_keys.index_prefix, period}, redis_keys.join_char) if report_data then - local ret = rspamd_redis_make_request(task, + local ret = globals.redis_make_request(task, redis_params, -- connect params hfromdom, -- hash key true, -- is write @@ -923,7 +873,7 @@ if opts['reporting'] == true then dmarc_xml('push', {data[2][i], data[2][i+1]}) end if cursor ~= 0 then - local ret = redis_make_request(ev_base, + local ret = globals.redis_make_request_taskless(ev_base, rspamd_config, nil, false, -- is write @@ -940,7 +890,7 @@ if opts['reporting'] == true then end end end - local ret = redis_make_request(ev_base, + local ret = globals.redis_make_request_taskless(ev_base, rspamd_config, nil, false, -- is write @@ -962,7 +912,7 @@ if opts['reporting'] == true then rspamd_logger.infox(rspamd_config, 'Deleted reports for %s', reporting_domain) get_reporting_domain() end - local ret = redis_make_request(ev_base, + local ret = globals.redis_make_request_taskless(ev_base, rspamd_config, nil, false, -- is write @@ -1128,7 +1078,7 @@ if opts['reporting'] == true then end end local idx_key = table.concat({redis_keys.index_prefix, want_period}, redis_keys.join_char) - local ret = redis_make_request(ev_base, + local ret = globals.redis_make_request_taskless(ev_base, rspamd_config, nil, false, -- is write diff --git a/src/plugins/lua/fann_redis.lua b/src/plugins/lua/fann_redis.lua index 528d6535e..76da1175b 100644 --- a/src/plugins/lua/fann_redis.lua +++ b/src/plugins/lua/fann_redis.lua @@ -26,6 +26,7 @@ local rspamd_fann = require "rspamd_fann" local rspamd_util = require "rspamd_util" local fann_symbol_spam = 'FANNR_SPAM' local fann_symbol_ham = 'FANNR_HAM' +local globals = require "global_functions" local fun = require "fun" local module_log_id = 0x200 @@ -174,57 +175,6 @@ local lock_expire = 600 local learning_spawned = false local ann_expire = 60 * 60 * 24 * 2 -- 2 days -local function redis_make_request(ev_base, cfg, key, is_write, callback, command, args) - if not ev_base or not redis_params or not callback or not command then - return false,nil,nil - end - - local addr - local rspamd_redis = require "rspamd_redis" - - if key then - if is_write then - addr = redis_params['write_servers']:get_upstream_by_hash(key) - else - addr = redis_params['read_servers']:get_upstream_by_hash(key) - end - else - if is_write then - addr = redis_params['write_servers']:get_upstream_master_slave(key) - else - addr = redis_params['read_servers']:get_upstream_round_robin(key) - end - end - - if not addr then - rspamd_logger.errx(cfg, 'cannot select server to make redis request') - end - - local options = { - ev_base = ev_base, - config = cfg, - callback = callback, - host = addr:get_addr(), - timeout = redis_params['timeout'], - cmd = command, - args = args - } - - if redis_params['password'] then - options['password'] = redis_params['password'] - end - - if redis_params['db'] then - options['dbname'] = redis_params['db'] - end - - local ret,conn = rspamd_redis.make_request(options) - if not ret then - rspamd_logger.errx('cannot execute redis request') - end - return ret,conn,addr -end - local function load_scripts(cfg, ev_base, on_load_cb) local function can_train_sha_cb(err, data) if err or not data or type(data) ~= 'string' then @@ -233,7 +183,7 @@ local function load_scripts(cfg, ev_base, on_load_cb) redis_can_train_sha = tostring(data) end end - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -256,7 +206,7 @@ local function load_scripts(cfg, ev_base, on_load_cb) end end end - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -272,7 +222,7 @@ local function load_scripts(cfg, ev_base, on_load_cb) redis_maybe_invalidate_sha = tostring(data) end end - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -288,7 +238,7 @@ local function load_scripts(cfg, ev_base, on_load_cb) redis_locked_invalidate_sha = tostring(data) end end - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -304,7 +254,7 @@ local function load_scripts(cfg, ev_base, on_load_cb) redis_maybe_lock_sha = tostring(data) end end - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -320,7 +270,7 @@ local function load_scripts(cfg, ev_base, on_load_cb) redis_save_unlock_sha = tostring(data) end end - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -477,7 +427,7 @@ local function load_or_invalidate_fann(data, id, ev_base) end -- Invalidate ANN rspamd_logger.infox(rspamd_config, 'invalidate ANN %s', prefix) - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -524,7 +474,7 @@ local function fann_train_callback(score, required_score, results, _, id, opts, fun.each(function(e) table.insert(learn_data, e) end, extra) local str = rspamd_util.zstd_compress(table.concat(learn_data, ';')) - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -542,7 +492,7 @@ local function fann_train_callback(score, required_score, results, _, id, opts, end end - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -570,7 +520,7 @@ local function train_fann(_, ev_base, elt) if err then rspamd_logger.errx(rspamd_config, 'cannot save ANN %s to redis: %s', prefix, err) - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, false, -- is write @@ -589,7 +539,7 @@ local function train_fann(_, ev_base, elt) if errcode ~= 0 then rspamd_logger.errx(rspamd_config, 'cannot train ANN %s: %s', prefix, errmsg) - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -604,7 +554,7 @@ local function train_fann(_, ev_base, elt) fanns[elt].version = fanns[elt].version + 1 fanns[elt].fann = fanns[elt].fann_train fanns[elt].fann_train = nil - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -619,7 +569,7 @@ local function train_fann(_, ev_base, elt) if err or type(data) ~= 'table' then rspamd_logger.errx(rspamd_config, 'cannot get ham tokens for ANN %s from redis: %s', prefix, err) - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -669,7 +619,7 @@ local function train_fann(_, ev_base, elt) end -- Invalidate ANN rspamd_logger.infox(rspamd_config, 'invalidate ANN %s: training data is invalid', prefix) - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -690,7 +640,7 @@ local function train_fann(_, ev_base, elt) if err or type(data) ~= 'table' then rspamd_logger.errx(rspamd_config, 'cannot get spam tokens for ANN %s from redis: %s', prefix, err) - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -704,7 +654,7 @@ local function train_fann(_, ev_base, elt) local _,str = rspamd_util.zstd_decompress(tok) return fun.totable(fun.map(tonumber, rspamd_str_split(tostring(str), ';'))) end, data)) - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, false, -- is write @@ -724,7 +674,7 @@ local function train_fann(_, ev_base, elt) end elseif type(data) == 'number' then -- Can train ANN - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, false, -- is write @@ -745,7 +695,7 @@ local function train_fann(_, ev_base, elt) end end if learning_spawned then - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -769,7 +719,7 @@ local function train_fann(_, ev_base, elt) rspamd_logger.infox(rspamd_config, 'do not learn ANN %s, already learning another ANN', prefix) return end - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, true, -- is write @@ -800,7 +750,7 @@ local function maybe_train_fanns(cfg, ev_base) end end - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, false, -- is write @@ -818,7 +768,7 @@ local function maybe_train_fanns(cfg, ev_base) return 1.0 end -- First we need to get all fanns stored in our Redis - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, false, -- is write @@ -854,7 +804,7 @@ local function check_fanns(_, ev_base) local_ver = fanns[elt].version end end - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, false, -- is write @@ -872,7 +822,7 @@ local function check_fanns(_, ev_base) return 1.0 end -- First we need to get all fanns stored in our Redis - redis_make_request(ev_base, + globals.redis_make_request_taskless(ev_base, rspamd_config, nil, false, -- is write -- cgit v1.2.3