aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/libserver/worker_util.c11
-rw-r--r--src/libserver/worker_util.h9
-rw-r--r--src/lua/lua_common.c17
-rw-r--r--src/plugins/lua/bayes_expiry.lua2
-rw-r--r--src/plugins/lua/clickhouse.lua56
-rw-r--r--src/plugins/lua/dmarc.lua7
-rw-r--r--src/plugins/lua/dynamic_conf.lua14
-rw-r--r--src/plugins/lua/elastic.lua6
-rw-r--r--src/plugins/lua/fann_redis.lua2
-rw-r--r--src/plugins/lua/fuzzy_collect.lua2
-rw-r--r--src/plugins/lua/metric_exporter.lua2
11 files changed, 85 insertions, 43 deletions
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index f6bc78aaa..8b4ca4027 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -751,6 +751,17 @@ rspamd_worker_is_scanner (struct rspamd_worker *w)
return FALSE;
}
+gboolean
+rspamd_worker_is_primary_controller (struct rspamd_worker *w)
+{
+
+ if (w) {
+ return !!(w->flags & RSPAMD_WORKER_CONTROLLER) && w->index == 0;
+ }
+
+ return FALSE;
+}
+
struct rspamd_worker_session_elt {
void *ptr;
guint *pref;
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 179907ee4..2e3fd4458 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -143,13 +143,20 @@ void rspamd_worker_unblock_signals (void);
void rspamd_hard_terminate (struct rspamd_main *rspamd_main) G_GNUC_NORETURN;
/**
- * Returns TRUE if a specific worker is normal worker
+ * Returns TRUE if a specific worker is a scanner worker
* @param w
* @return
*/
gboolean rspamd_worker_is_scanner (struct rspamd_worker *w);
/**
+ * Returns TRUE if a specific worker is a primary controller
+ * @param w
+ * @return
+ */
+gboolean rspamd_worker_is_primary_controller (struct rspamd_worker *w);
+
+/**
* Creates new session cache
* @param w
* @return
diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c
index f61fa6d0b..942f5a9b6 100644
--- a/src/lua/lua_common.c
+++ b/src/lua/lua_common.c
@@ -38,6 +38,7 @@ LUA_FUNCTION_DEF (worker, get_stat);
LUA_FUNCTION_DEF (worker, get_index);
LUA_FUNCTION_DEF (worker, get_pid);
LUA_FUNCTION_DEF (worker, is_scanner);
+LUA_FUNCTION_DEF (worker, is_primary_controller);
LUA_FUNCTION_DEF (worker, spawn_process);
const luaL_reg worker_reg[] = {
@@ -47,6 +48,7 @@ const luaL_reg worker_reg[] = {
LUA_INTERFACE_DEF (worker, get_pid),
LUA_INTERFACE_DEF (worker, spawn_process),
LUA_INTERFACE_DEF (worker, is_scanner),
+ LUA_INTERFACE_DEF (worker, is_primary_controller),
{"__tostring", rspamd_lua_class_tostring},
{NULL, NULL}
};
@@ -1449,6 +1451,21 @@ lua_worker_is_scanner (lua_State *L)
return 1;
}
+static gint
+lua_worker_is_primary_controller (lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker (L, 1);
+
+ if (w) {
+ lua_pushboolean (L, rspamd_worker_is_primary_controller (w));
+ }
+ else {
+ return luaL_error (L, "invalid arguments");
+ }
+
+ return 1;
+}
+
struct rspamd_lua_process_cbdata {
gint sp[2];
gint func_cbref;
diff --git a/src/plugins/lua/bayes_expiry.lua b/src/plugins/lua/bayes_expiry.lua
index af955465d..97b7ca180 100644
--- a/src/plugins/lua/bayes_expiry.lua
+++ b/src/plugins/lua/bayes_expiry.lua
@@ -213,7 +213,7 @@ end
rspamd_config:add_on_load(function (_, ev_base, worker)
-- Exit unless we're the first 'controller' worker
- if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+ if not worker:is_primary_controller() then return end
local unique_redis_params = {}
-- Push redis script to all unique redis servers
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index aae6f20b7..2336ae2fc 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -686,37 +686,39 @@ if opts then
end)
-- Create tables on load
rspamd_config:add_on_load(function(cfg, ev_base, worker)
- -- XXX: need to call this script for all upstreams
- local upstream = settings.upstream:get_upstream_round_robin()
- local ip_addr = upstream:get_addr():to_string(true)
-
- local function http_cb(err_message, code, _, _)
- if code ~= 200 or err_message then
- rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s",
- ip_addr, err_message)
- upstream:fail()
- else
- upstream:ok()
+ if worker:is_primary_controller() then
+ -- XXX: need to call this script for all upstreams
+ local upstream = settings.upstream:get_upstream_round_robin()
+ local ip_addr = upstream:get_addr():to_string(true)
+
+ local function http_cb(err_message, code, _, _)
+ if code ~= 200 or err_message then
+ rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s",
+ ip_addr, err_message)
+ upstream:fail()
+ else
+ upstream:ok()
+ end
end
- end
- local function send_req(elt, sql)
- if not rspamd_http.request({
- ev_base = ev_base,
- config = cfg,
- url = connect_prefix .. ip_addr,
- body = sql,
- callback = http_cb,
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- }) then
- rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request",
- elt, settings['server'])
+ local function send_req(elt, sql)
+ if not rspamd_http.request({
+ ev_base = ev_base,
+ config = cfg,
+ url = connect_prefix .. ip_addr,
+ body = sql,
+ callback = http_cb,
+ mime_type = 'text/plain',
+ timeout = settings['timeout'],
+ }) then
+ rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request",
+ elt, settings['server'])
+ end
end
- end
- for tab,sql in pairs(clickhouse_schema) do
- send_req(tab, sql)
+ for tab,sql in pairs(clickhouse_schema) do
+ send_req(tab, sql)
+ end
end
end)
end
diff --git a/src/plugins/lua/dmarc.lua b/src/plugins/lua/dmarc.lua
index 4a5a5be19..8ab390257 100644
--- a/src/plugins/lua/dmarc.lua
+++ b/src/plugins/lua/dmarc.lua
@@ -306,7 +306,7 @@ local function dmarc_callback(task)
for _,r in ipairs(results) do
if failed_policy then break end
- (function()
+ local function try()
local elts = dmarc_grammar:match(r)
if not elts then
return
@@ -381,7 +381,8 @@ local function dmarc_callback(task)
rua = elts['rua']
end
end
- end)()
+ end
+ try()
end
if not found_policy then
@@ -642,7 +643,7 @@ if opts['reporting'] == true then
end
rspamd_config:add_on_load(function(cfg, ev_base, worker)
load_scripts(cfg, ev_base)
- if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+ if not worker:is_primary_controller() then return end
local rresolver = rspamd_resolver.init(ev_base, rspamd_config)
rspamd_config:register_finish_script(function ()
local stamp = pool:get_variable(VAR_NAME, 'double')
diff --git a/src/plugins/lua/dynamic_conf.lua b/src/plugins/lua/dynamic_conf.lua
index a4b7527a5..744b8b6f8 100644
--- a/src/plugins/lua/dynamic_conf.lua
+++ b/src/plugins/lua/dynamic_conf.lua
@@ -242,12 +242,14 @@ if section then
settings[k] = v
end
- rspamd_config:add_on_load(function(_, ev_base)
- rspamd_config:add_periodic(ev_base, 0.0,
- function(cfg, _ev_base)
- check_dynamic_conf(cfg, _ev_base)
- return settings.redis_watch_interval
- end, true)
+ rspamd_config:add_on_load(function(_, ev_base, worker)
+ if worker:is_scanner() then
+ rspamd_config:add_periodic(ev_base, 0.0,
+ function(cfg, _ev_base)
+ check_dynamic_conf(cfg, _ev_base)
+ return settings.redis_watch_interval
+ end, true)
+ end
end)
end
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
index aed3a87c7..c5919bf7f 100644
--- a/src/plugins/lua/elastic.lua
+++ b/src/plugins/lua/elastic.lua
@@ -373,8 +373,10 @@ if redis_params and opts then
})
rspamd_config:add_on_load(function(cfg, ev_base,worker)
- check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
- initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
+ if worker:is_scanner() then
+ check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
+ initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
+ end
end)
end
diff --git a/src/plugins/lua/fann_redis.lua b/src/plugins/lua/fann_redis.lua
index 828060240..117881b31 100644
--- a/src/plugins/lua/fann_redis.lua
+++ b/src/plugins/lua/fann_redis.lua
@@ -1036,7 +1036,7 @@ else
rspamd_config:add_on_load(function(cfg, ev_base, worker)
check_fanns(rule, cfg, ev_base)
- if worker:get_name() == 'controller' and worker:get_index() == 0 then
+ if worker:is_primary_controller() then
-- We also want to train neural nets when they have enough data
rspamd_config:add_periodic(ev_base, 0.0,
function(_, _)
diff --git a/src/plugins/lua/fuzzy_collect.lua b/src/plugins/lua/fuzzy_collect.lua
index b5dae4e20..57a4a2fa6 100644
--- a/src/plugins/lua/fuzzy_collect.lua
+++ b/src/plugins/lua/fuzzy_collect.lua
@@ -178,7 +178,7 @@ if opts and type(opts) == 'table' then
if sane_config then
rspamd_config:add_on_load(function(_, ev_base, worker)
- if worker:get_name() == 'controller' and worker:get_index() == 0 then
+ if worker:is_primary_controller() then
rspamd_config:add_periodic(ev_base, 0.0,
function(_cfg, _ev_base)
return collect_fuzzy_hashes(_cfg, _ev_base)
diff --git a/src/plugins/lua/metric_exporter.lua b/src/plugins/lua/metric_exporter.lua
index efb50d586..ee435b23b 100644
--- a/src/plugins/lua/metric_exporter.lua
+++ b/src/plugins/lua/metric_exporter.lua
@@ -174,7 +174,7 @@ end
rspamd_config:add_on_load(function (_, ev_base, worker)
-- Exit unless we're the first 'controller' worker
- if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+ if not worker:is_primary_controller() then return end
-- Persist mempool variable to statefile on shutdown
rspamd_config:register_finish_script(function ()
local stamp = pool:get_variable(VAR_NAME, 'double')