@@ -751,6 +751,17 @@ rspamd_worker_is_scanner (struct rspamd_worker *w) | |||
return FALSE; | |||
} | |||
gboolean | |||
rspamd_worker_is_primary_controller (struct rspamd_worker *w) | |||
{ | |||
if (w) { | |||
return !!(w->flags & RSPAMD_WORKER_CONTROLLER) && w->index == 0; | |||
} | |||
return FALSE; | |||
} | |||
struct rspamd_worker_session_elt { | |||
void *ptr; | |||
guint *pref; |
@@ -143,12 +143,19 @@ void rspamd_worker_unblock_signals (void); | |||
void rspamd_hard_terminate (struct rspamd_main *rspamd_main) G_GNUC_NORETURN; | |||
/** | |||
* Returns TRUE if a specific worker is normal worker | |||
* Returns TRUE if a specific worker is a scanner worker | |||
* @param w | |||
* @return | |||
*/ | |||
gboolean rspamd_worker_is_scanner (struct rspamd_worker *w); | |||
/** | |||
* Returns TRUE if a specific worker is a primary controller | |||
* @param w | |||
* @return | |||
*/ | |||
gboolean rspamd_worker_is_primary_controller (struct rspamd_worker *w); | |||
/** | |||
* Creates new session cache | |||
* @param w |
@@ -38,6 +38,7 @@ LUA_FUNCTION_DEF (worker, get_stat); | |||
LUA_FUNCTION_DEF (worker, get_index); | |||
LUA_FUNCTION_DEF (worker, get_pid); | |||
LUA_FUNCTION_DEF (worker, is_scanner); | |||
LUA_FUNCTION_DEF (worker, is_primary_controller); | |||
LUA_FUNCTION_DEF (worker, spawn_process); | |||
const luaL_reg worker_reg[] = { | |||
@@ -47,6 +48,7 @@ const luaL_reg worker_reg[] = { | |||
LUA_INTERFACE_DEF (worker, get_pid), | |||
LUA_INTERFACE_DEF (worker, spawn_process), | |||
LUA_INTERFACE_DEF (worker, is_scanner), | |||
LUA_INTERFACE_DEF (worker, is_primary_controller), | |||
{"__tostring", rspamd_lua_class_tostring}, | |||
{NULL, NULL} | |||
}; | |||
@@ -1449,6 +1451,21 @@ lua_worker_is_scanner (lua_State *L) | |||
return 1; | |||
} | |||
static gint | |||
lua_worker_is_primary_controller (lua_State *L) | |||
{ | |||
struct rspamd_worker *w = lua_check_worker (L, 1); | |||
if (w) { | |||
lua_pushboolean (L, rspamd_worker_is_primary_controller (w)); | |||
} | |||
else { | |||
return luaL_error (L, "invalid arguments"); | |||
} | |||
return 1; | |||
} | |||
struct rspamd_lua_process_cbdata { | |||
gint sp[2]; | |||
gint func_cbref; |
@@ -213,7 +213,7 @@ end | |||
rspamd_config:add_on_load(function (_, ev_base, worker) | |||
-- Exit unless we're the first 'controller' worker | |||
if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end | |||
if not worker:is_primary_controller() then return end | |||
local unique_redis_params = {} | |||
-- Push redis script to all unique redis servers |
@@ -686,37 +686,39 @@ if opts then | |||
end) | |||
-- Create tables on load | |||
rspamd_config:add_on_load(function(cfg, ev_base, worker) | |||
-- XXX: need to call this script for all upstreams | |||
local upstream = settings.upstream:get_upstream_round_robin() | |||
local ip_addr = upstream:get_addr():to_string(true) | |||
local function http_cb(err_message, code, _, _) | |||
if code ~= 200 or err_message then | |||
rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s", | |||
ip_addr, err_message) | |||
upstream:fail() | |||
else | |||
upstream:ok() | |||
if worker:is_primary_controller() then | |||
-- XXX: need to call this script for all upstreams | |||
local upstream = settings.upstream:get_upstream_round_robin() | |||
local ip_addr = upstream:get_addr():to_string(true) | |||
local function http_cb(err_message, code, _, _) | |||
if code ~= 200 or err_message then | |||
rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s", | |||
ip_addr, err_message) | |||
upstream:fail() | |||
else | |||
upstream:ok() | |||
end | |||
end | |||
end | |||
local function send_req(elt, sql) | |||
if not rspamd_http.request({ | |||
ev_base = ev_base, | |||
config = cfg, | |||
url = connect_prefix .. ip_addr, | |||
body = sql, | |||
callback = http_cb, | |||
mime_type = 'text/plain', | |||
timeout = settings['timeout'], | |||
}) then | |||
rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request", | |||
elt, settings['server']) | |||
local function send_req(elt, sql) | |||
if not rspamd_http.request({ | |||
ev_base = ev_base, | |||
config = cfg, | |||
url = connect_prefix .. ip_addr, | |||
body = sql, | |||
callback = http_cb, | |||
mime_type = 'text/plain', | |||
timeout = settings['timeout'], | |||
}) then | |||
rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request", | |||
elt, settings['server']) | |||
end | |||
end | |||
end | |||
for tab,sql in pairs(clickhouse_schema) do | |||
send_req(tab, sql) | |||
for tab,sql in pairs(clickhouse_schema) do | |||
send_req(tab, sql) | |||
end | |||
end | |||
end) | |||
end |
@@ -306,7 +306,7 @@ local function dmarc_callback(task) | |||
for _,r in ipairs(results) do | |||
if failed_policy then break end | |||
(function() | |||
local function try() | |||
local elts = dmarc_grammar:match(r) | |||
if not elts then | |||
return | |||
@@ -381,7 +381,8 @@ local function dmarc_callback(task) | |||
rua = elts['rua'] | |||
end | |||
end | |||
end)() | |||
end | |||
try() | |||
end | |||
if not found_policy then | |||
@@ -642,7 +643,7 @@ if opts['reporting'] == true then | |||
end | |||
rspamd_config:add_on_load(function(cfg, ev_base, worker) | |||
load_scripts(cfg, ev_base) | |||
if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end | |||
if not worker:is_primary_controller() then return end | |||
local rresolver = rspamd_resolver.init(ev_base, rspamd_config) | |||
rspamd_config:register_finish_script(function () | |||
local stamp = pool:get_variable(VAR_NAME, 'double') |
@@ -242,12 +242,14 @@ if section then | |||
settings[k] = v | |||
end | |||
rspamd_config:add_on_load(function(_, ev_base) | |||
rspamd_config:add_periodic(ev_base, 0.0, | |||
function(cfg, _ev_base) | |||
check_dynamic_conf(cfg, _ev_base) | |||
return settings.redis_watch_interval | |||
end, true) | |||
rspamd_config:add_on_load(function(_, ev_base, worker) | |||
if worker:is_scanner() then | |||
rspamd_config:add_periodic(ev_base, 0.0, | |||
function(cfg, _ev_base) | |||
check_dynamic_conf(cfg, _ev_base) | |||
return settings.redis_watch_interval | |||
end, true) | |||
end | |||
end) | |||
end | |||
@@ -373,8 +373,10 @@ if redis_params and opts then | |||
}) | |||
rspamd_config:add_on_load(function(cfg, ev_base,worker) | |||
check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements | |||
initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations | |||
if worker:is_scanner() then | |||
check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements | |||
initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations | |||
end | |||
end) | |||
end | |||
@@ -1036,7 +1036,7 @@ else | |||
rspamd_config:add_on_load(function(cfg, ev_base, worker) | |||
check_fanns(rule, cfg, ev_base) | |||
if worker:get_name() == 'controller' and worker:get_index() == 0 then | |||
if worker:is_primary_controller() then | |||
-- We also want to train neural nets when they have enough data | |||
rspamd_config:add_periodic(ev_base, 0.0, | |||
function(_, _) |
@@ -178,7 +178,7 @@ if opts and type(opts) == 'table' then | |||
if sane_config then | |||
rspamd_config:add_on_load(function(_, ev_base, worker) | |||
if worker:get_name() == 'controller' and worker:get_index() == 0 then | |||
if worker:is_primary_controller() then | |||
rspamd_config:add_periodic(ev_base, 0.0, | |||
function(_cfg, _ev_base) | |||
return collect_fuzzy_hashes(_cfg, _ev_base) |
@@ -174,7 +174,7 @@ end | |||
rspamd_config:add_on_load(function (_, ev_base, worker) | |||
-- Exit unless we're the first 'controller' worker | |||
if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end | |||
if not worker:is_primary_controller() then return end | |||
-- Persist mempool variable to statefile on shutdown | |||
rspamd_config:register_finish_script(function () | |||
local stamp = pool:get_variable(VAR_NAME, 'double') |