return FALSE;
}
+gboolean
+rspamd_worker_is_primary_controller (struct rspamd_worker *w)
+{
+
+ if (w) {
+ return !!(w->flags & RSPAMD_WORKER_CONTROLLER) && w->index == 0;
+ }
+
+ return FALSE;
+}
+
struct rspamd_worker_session_elt {
void *ptr;
guint *pref;
void rspamd_hard_terminate (struct rspamd_main *rspamd_main) G_GNUC_NORETURN;
/**
- * Returns TRUE if a specific worker is normal worker
+ * Returns TRUE if a specific worker is a scanner worker
* @param w
* @return
*/
gboolean rspamd_worker_is_scanner (struct rspamd_worker *w);
+/**
+ * Returns TRUE if a specific worker is a primary controller
+ * @param w
+ * @return
+ */
+gboolean rspamd_worker_is_primary_controller (struct rspamd_worker *w);
+
/**
* Creates new session cache
* @param w
LUA_FUNCTION_DEF (worker, get_index);
LUA_FUNCTION_DEF (worker, get_pid);
LUA_FUNCTION_DEF (worker, is_scanner);
+LUA_FUNCTION_DEF (worker, is_primary_controller);
LUA_FUNCTION_DEF (worker, spawn_process);
const luaL_reg worker_reg[] = {
LUA_INTERFACE_DEF (worker, get_pid),
LUA_INTERFACE_DEF (worker, spawn_process),
LUA_INTERFACE_DEF (worker, is_scanner),
+ LUA_INTERFACE_DEF (worker, is_primary_controller),
{"__tostring", rspamd_lua_class_tostring},
{NULL, NULL}
};
return 1;
}
+static gint
+lua_worker_is_primary_controller (lua_State *L)
+{
+ struct rspamd_worker *w = lua_check_worker (L, 1);
+
+ if (w) {
+ lua_pushboolean (L, rspamd_worker_is_primary_controller (w));
+ }
+ else {
+ return luaL_error (L, "invalid arguments");
+ }
+
+ return 1;
+}
+
struct rspamd_lua_process_cbdata {
gint sp[2];
gint func_cbref;
rspamd_config:add_on_load(function (_, ev_base, worker)
-- Exit unless we're the first 'controller' worker
- if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+ if not worker:is_primary_controller() then return end
local unique_redis_params = {}
-- Push redis script to all unique redis servers
end)
-- Create tables on load
rspamd_config:add_on_load(function(cfg, ev_base, worker)
- -- XXX: need to call this script for all upstreams
- local upstream = settings.upstream:get_upstream_round_robin()
- local ip_addr = upstream:get_addr():to_string(true)
-
- local function http_cb(err_message, code, _, _)
- if code ~= 200 or err_message then
- rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s",
- ip_addr, err_message)
- upstream:fail()
- else
- upstream:ok()
+ if worker:is_primary_controller() then
+ -- XXX: need to call this script for all upstreams
+ local upstream = settings.upstream:get_upstream_round_robin()
+ local ip_addr = upstream:get_addr():to_string(true)
+
+ local function http_cb(err_message, code, _, _)
+ if code ~= 200 or err_message then
+ rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s",
+ ip_addr, err_message)
+ upstream:fail()
+ else
+ upstream:ok()
+ end
end
- end
- local function send_req(elt, sql)
- if not rspamd_http.request({
- ev_base = ev_base,
- config = cfg,
- url = connect_prefix .. ip_addr,
- body = sql,
- callback = http_cb,
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- }) then
- rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request",
- elt, settings['server'])
+ local function send_req(elt, sql)
+ if not rspamd_http.request({
+ ev_base = ev_base,
+ config = cfg,
+ url = connect_prefix .. ip_addr,
+ body = sql,
+ callback = http_cb,
+ mime_type = 'text/plain',
+ timeout = settings['timeout'],
+ }) then
+ rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request",
+ elt, settings['server'])
+ end
end
- end
- for tab,sql in pairs(clickhouse_schema) do
- send_req(tab, sql)
+ for tab,sql in pairs(clickhouse_schema) do
+ send_req(tab, sql)
+ end
end
end)
end
for _,r in ipairs(results) do
if failed_policy then break end
- (function()
+ local function try()
local elts = dmarc_grammar:match(r)
if not elts then
return
rua = elts['rua']
end
end
- end)()
+ end
+ try()
end
if not found_policy then
end
rspamd_config:add_on_load(function(cfg, ev_base, worker)
load_scripts(cfg, ev_base)
- if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+ if not worker:is_primary_controller() then return end
local rresolver = rspamd_resolver.init(ev_base, rspamd_config)
rspamd_config:register_finish_script(function ()
local stamp = pool:get_variable(VAR_NAME, 'double')
settings[k] = v
end
- rspamd_config:add_on_load(function(_, ev_base)
- rspamd_config:add_periodic(ev_base, 0.0,
- function(cfg, _ev_base)
- check_dynamic_conf(cfg, _ev_base)
- return settings.redis_watch_interval
- end, true)
+ rspamd_config:add_on_load(function(_, ev_base, worker)
+ if worker:is_scanner() then
+ rspamd_config:add_periodic(ev_base, 0.0,
+ function(cfg, _ev_base)
+ check_dynamic_conf(cfg, _ev_base)
+ return settings.redis_watch_interval
+ end, true)
+ end
end)
end
})
rspamd_config:add_on_load(function(cfg, ev_base,worker)
- check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
- initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
+ if worker:is_scanner() then
+ check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
+ initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
+ end
end)
end
rspamd_config:add_on_load(function(cfg, ev_base, worker)
check_fanns(rule, cfg, ev_base)
- if worker:get_name() == 'controller' and worker:get_index() == 0 then
+ if worker:is_primary_controller() then
-- We also want to train neural nets when they have enough data
rspamd_config:add_periodic(ev_base, 0.0,
function(_, _)
if sane_config then
rspamd_config:add_on_load(function(_, ev_base, worker)
- if worker:get_name() == 'controller' and worker:get_index() == 0 then
+ if worker:is_primary_controller() then
rspamd_config:add_periodic(ev_base, 0.0,
function(_cfg, _ev_base)
return collect_fuzzy_hashes(_cfg, _ev_base)
rspamd_config:add_on_load(function (_, ev_base, worker)
-- Exit unless we're the first 'controller' worker
- if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+ if not worker:is_primary_controller() then return end
-- Persist mempool variable to statefile on shutdown
rspamd_config:register_finish_script(function ()
local stamp = pool:get_variable(VAR_NAME, 'double')