From 714ebee786d239c68aff8356b199eca9d14b8794 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 26 Feb 2018 16:04:58 +0000 Subject: [PATCH] [Minor] Add common methods to find a primary controller --- src/libserver/worker_util.c | 11 ++++++ src/libserver/worker_util.h | 9 ++++- src/lua/lua_common.c | 17 +++++++++ src/plugins/lua/bayes_expiry.lua | 2 +- src/plugins/lua/clickhouse.lua | 56 +++++++++++++++-------------- src/plugins/lua/dmarc.lua | 7 ++-- src/plugins/lua/dynamic_conf.lua | 14 ++++---- src/plugins/lua/elastic.lua | 6 ++-- src/plugins/lua/fann_redis.lua | 2 +- src/plugins/lua/fuzzy_collect.lua | 2 +- src/plugins/lua/metric_exporter.lua | 2 +- 11 files changed, 85 insertions(+), 43 deletions(-) diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index f6bc78aaa..8b4ca4027 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -751,6 +751,17 @@ rspamd_worker_is_scanner (struct rspamd_worker *w) return FALSE; } +gboolean +rspamd_worker_is_primary_controller (struct rspamd_worker *w) +{ + + if (w) { + return !!(w->flags & RSPAMD_WORKER_CONTROLLER) && w->index == 0; + } + + return FALSE; +} + struct rspamd_worker_session_elt { void *ptr; guint *pref; diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h index 179907ee4..2e3fd4458 100644 --- a/src/libserver/worker_util.h +++ b/src/libserver/worker_util.h @@ -143,12 +143,19 @@ void rspamd_worker_unblock_signals (void); void rspamd_hard_terminate (struct rspamd_main *rspamd_main) G_GNUC_NORETURN; /** - * Returns TRUE if a specific worker is normal worker + * Returns TRUE if a specific worker is a scanner worker * @param w * @return */ gboolean rspamd_worker_is_scanner (struct rspamd_worker *w); +/** + * Returns TRUE if a specific worker is a primary controller + * @param w + * @return + */ +gboolean rspamd_worker_is_primary_controller (struct rspamd_worker *w); + /** * Creates new session cache * @param w diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c index f61fa6d0b..942f5a9b6 100644 --- a/src/lua/lua_common.c +++ b/src/lua/lua_common.c @@ -38,6 +38,7 @@ LUA_FUNCTION_DEF (worker, get_stat); LUA_FUNCTION_DEF (worker, get_index); LUA_FUNCTION_DEF (worker, get_pid); LUA_FUNCTION_DEF (worker, is_scanner); +LUA_FUNCTION_DEF (worker, is_primary_controller); LUA_FUNCTION_DEF (worker, spawn_process); const luaL_reg worker_reg[] = { @@ -47,6 +48,7 @@ const luaL_reg worker_reg[] = { LUA_INTERFACE_DEF (worker, get_pid), LUA_INTERFACE_DEF (worker, spawn_process), LUA_INTERFACE_DEF (worker, is_scanner), + LUA_INTERFACE_DEF (worker, is_primary_controller), {"__tostring", rspamd_lua_class_tostring}, {NULL, NULL} }; @@ -1449,6 +1451,21 @@ lua_worker_is_scanner (lua_State *L) return 1; } +static gint +lua_worker_is_primary_controller (lua_State *L) +{ + struct rspamd_worker *w = lua_check_worker (L, 1); + + if (w) { + lua_pushboolean (L, rspamd_worker_is_primary_controller (w)); + } + else { + return luaL_error (L, "invalid arguments"); + } + + return 1; +} + struct rspamd_lua_process_cbdata { gint sp[2]; gint func_cbref; diff --git a/src/plugins/lua/bayes_expiry.lua b/src/plugins/lua/bayes_expiry.lua index af955465d..97b7ca180 100644 --- a/src/plugins/lua/bayes_expiry.lua +++ b/src/plugins/lua/bayes_expiry.lua @@ -213,7 +213,7 @@ end rspamd_config:add_on_load(function (_, ev_base, worker) -- Exit unless we're the first 'controller' worker - if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end + if not worker:is_primary_controller() then return end local unique_redis_params = {} -- Push redis script to all unique redis servers diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index aae6f20b7..2336ae2fc 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -686,37 +686,39 @@ if opts then end) -- Create tables on load rspamd_config:add_on_load(function(cfg, ev_base, worker) - -- XXX: need to call this script for all upstreams - local upstream = settings.upstream:get_upstream_round_robin() - local ip_addr = upstream:get_addr():to_string(true) - - local function http_cb(err_message, code, _, _) - if code ~= 200 or err_message then - rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s", - ip_addr, err_message) - upstream:fail() - else - upstream:ok() + if worker:is_primary_controller() then + -- XXX: need to call this script for all upstreams + local upstream = settings.upstream:get_upstream_round_robin() + local ip_addr = upstream:get_addr():to_string(true) + + local function http_cb(err_message, code, _, _) + if code ~= 200 or err_message then + rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s", + ip_addr, err_message) + upstream:fail() + else + upstream:ok() + end end - end - local function send_req(elt, sql) - if not rspamd_http.request({ - ev_base = ev_base, - config = cfg, - url = connect_prefix .. ip_addr, - body = sql, - callback = http_cb, - mime_type = 'text/plain', - timeout = settings['timeout'], - }) then - rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request", - elt, settings['server']) + local function send_req(elt, sql) + if not rspamd_http.request({ + ev_base = ev_base, + config = cfg, + url = connect_prefix .. ip_addr, + body = sql, + callback = http_cb, + mime_type = 'text/plain', + timeout = settings['timeout'], + }) then + rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request", + elt, settings['server']) + end end - end - for tab,sql in pairs(clickhouse_schema) do - send_req(tab, sql) + for tab,sql in pairs(clickhouse_schema) do + send_req(tab, sql) + end end end) end diff --git a/src/plugins/lua/dmarc.lua b/src/plugins/lua/dmarc.lua index 4a5a5be19..8ab390257 100644 --- a/src/plugins/lua/dmarc.lua +++ b/src/plugins/lua/dmarc.lua @@ -306,7 +306,7 @@ local function dmarc_callback(task) for _,r in ipairs(results) do if failed_policy then break end - (function() + local function try() local elts = dmarc_grammar:match(r) if not elts then return @@ -381,7 +381,8 @@ local function dmarc_callback(task) rua = elts['rua'] end end - end)() + end + try() end if not found_policy then @@ -642,7 +643,7 @@ if opts['reporting'] == true then end rspamd_config:add_on_load(function(cfg, ev_base, worker) load_scripts(cfg, ev_base) - if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end + if not worker:is_primary_controller() then return end local rresolver = rspamd_resolver.init(ev_base, rspamd_config) rspamd_config:register_finish_script(function () local stamp = pool:get_variable(VAR_NAME, 'double') diff --git a/src/plugins/lua/dynamic_conf.lua b/src/plugins/lua/dynamic_conf.lua index a4b7527a5..744b8b6f8 100644 --- a/src/plugins/lua/dynamic_conf.lua +++ b/src/plugins/lua/dynamic_conf.lua @@ -242,12 +242,14 @@ if section then settings[k] = v end - rspamd_config:add_on_load(function(_, ev_base) - rspamd_config:add_periodic(ev_base, 0.0, - function(cfg, _ev_base) - check_dynamic_conf(cfg, _ev_base) - return settings.redis_watch_interval - end, true) + rspamd_config:add_on_load(function(_, ev_base, worker) + if worker:is_scanner() then + rspamd_config:add_periodic(ev_base, 0.0, + function(cfg, _ev_base) + check_dynamic_conf(cfg, _ev_base) + return settings.redis_watch_interval + end, true) + end end) end diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index aed3a87c7..c5919bf7f 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -373,8 +373,10 @@ if redis_params and opts then }) rspamd_config:add_on_load(function(cfg, ev_base,worker) - check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements - initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations + if worker:is_scanner() then + check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements + initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations + end end) end diff --git a/src/plugins/lua/fann_redis.lua b/src/plugins/lua/fann_redis.lua index 828060240..117881b31 100644 --- a/src/plugins/lua/fann_redis.lua +++ b/src/plugins/lua/fann_redis.lua @@ -1036,7 +1036,7 @@ else rspamd_config:add_on_load(function(cfg, ev_base, worker) check_fanns(rule, cfg, ev_base) - if worker:get_name() == 'controller' and worker:get_index() == 0 then + if worker:is_primary_controller() then -- We also want to train neural nets when they have enough data rspamd_config:add_periodic(ev_base, 0.0, function(_, _) diff --git a/src/plugins/lua/fuzzy_collect.lua b/src/plugins/lua/fuzzy_collect.lua index b5dae4e20..57a4a2fa6 100644 --- a/src/plugins/lua/fuzzy_collect.lua +++ b/src/plugins/lua/fuzzy_collect.lua @@ -178,7 +178,7 @@ if opts and type(opts) == 'table' then if sane_config then rspamd_config:add_on_load(function(_, ev_base, worker) - if worker:get_name() == 'controller' and worker:get_index() == 0 then + if worker:is_primary_controller() then rspamd_config:add_periodic(ev_base, 0.0, function(_cfg, _ev_base) return collect_fuzzy_hashes(_cfg, _ev_base) diff --git a/src/plugins/lua/metric_exporter.lua b/src/plugins/lua/metric_exporter.lua index efb50d586..ee435b23b 100644 --- a/src/plugins/lua/metric_exporter.lua +++ b/src/plugins/lua/metric_exporter.lua @@ -174,7 +174,7 @@ end rspamd_config:add_on_load(function (_, ev_base, worker) -- Exit unless we're the first 'controller' worker - if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end + if not worker:is_primary_controller() then return end -- Persist mempool variable to statefile on shutdown rspamd_config:register_finish_script(function () local stamp = pool:get_variable(VAR_NAME, 'double') -- 2.39.5