]> source.dussan.org Git - rspamd.git/commitdiff
[Minor] Add common methods to find a primary controller
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 26 Feb 2018 16:04:58 +0000 (16:04 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 26 Feb 2018 16:04:58 +0000 (16:04 +0000)
src/libserver/worker_util.c
src/libserver/worker_util.h
src/lua/lua_common.c
src/plugins/lua/bayes_expiry.lua
src/plugins/lua/clickhouse.lua
src/plugins/lua/dmarc.lua
src/plugins/lua/dynamic_conf.lua
src/plugins/lua/elastic.lua
src/plugins/lua/fann_redis.lua
src/plugins/lua/fuzzy_collect.lua
src/plugins/lua/metric_exporter.lua

index f6bc78aaa767dd542f88ccc0567a1a96f50c15c8..8b4ca402717b805baa6994fb3f301d1be10bc77f 100644 (file)
@@ -751,6 +751,17 @@ rspamd_worker_is_scanner (struct rspamd_worker *w)
        return FALSE;
 }
 
+gboolean
+rspamd_worker_is_primary_controller (struct rspamd_worker *w)
+{
+
+       if (w) {
+               return !!(w->flags & RSPAMD_WORKER_CONTROLLER) && w->index == 0;
+       }
+
+       return FALSE;
+}
+
 struct rspamd_worker_session_elt {
        void *ptr;
        guint *pref;
index 179907ee4012151f5fc68c6f89e85f80e9e86c79..2e3fd44580f0b9d3fbf70ec1d4795536a40eaa1b 100644 (file)
@@ -143,12 +143,19 @@ void rspamd_worker_unblock_signals (void);
 void rspamd_hard_terminate (struct rspamd_main *rspamd_main) G_GNUC_NORETURN;
 
 /**
- * Returns TRUE if a specific worker is normal worker
+ * Returns TRUE if a specific worker is a scanner worker
  * @param w
  * @return
  */
 gboolean rspamd_worker_is_scanner (struct rspamd_worker *w);
 
+/**
+ * Returns TRUE if a specific worker is a primary controller
+ * @param w
+ * @return
+ */
+gboolean rspamd_worker_is_primary_controller (struct rspamd_worker *w);
+
 /**
  * Creates new session cache
  * @param w
index f61fa6d0bdabb8955a64a6feac594435cf053d8e..942f5a9b6690ed94f887cb0893a2a5b15f29ba80 100644 (file)
@@ -38,6 +38,7 @@ LUA_FUNCTION_DEF (worker, get_stat);
 LUA_FUNCTION_DEF (worker, get_index);
 LUA_FUNCTION_DEF (worker, get_pid);
 LUA_FUNCTION_DEF (worker, is_scanner);
+LUA_FUNCTION_DEF (worker, is_primary_controller);
 LUA_FUNCTION_DEF (worker, spawn_process);
 
 const luaL_reg worker_reg[] = {
@@ -47,6 +48,7 @@ const luaL_reg worker_reg[] = {
        LUA_INTERFACE_DEF (worker, get_pid),
        LUA_INTERFACE_DEF (worker, spawn_process),
        LUA_INTERFACE_DEF (worker, is_scanner),
+       LUA_INTERFACE_DEF (worker, is_primary_controller),
        {"__tostring", rspamd_lua_class_tostring},
        {NULL, NULL}
 };
@@ -1449,6 +1451,21 @@ lua_worker_is_scanner (lua_State *L)
        return 1;
 }
 
+static gint
+lua_worker_is_primary_controller (lua_State *L)
+{
+       struct rspamd_worker *w = lua_check_worker (L, 1);
+
+       if (w) {
+               lua_pushboolean (L, rspamd_worker_is_primary_controller (w));
+       }
+       else {
+               return luaL_error (L, "invalid arguments");
+       }
+
+       return 1;
+}
+
 struct rspamd_lua_process_cbdata {
        gint sp[2];
        gint func_cbref;
index af955465dfed92a1869284b49f9d98073e0ecc37..97b7ca18040029e112851cc49f6b7b715cf7352e 100644 (file)
@@ -213,7 +213,7 @@ end
 
 rspamd_config:add_on_load(function (_, ev_base, worker)
   -- Exit unless we're the first 'controller' worker
-  if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+  if not worker:is_primary_controller() then return end
 
   local unique_redis_params = {}
   -- Push redis script to all unique redis servers
index aae6f20b7e016842a12a54fee46f5917a0db5e97..2336ae2fc0496bdbcda8f98516577d6daa7e1190 100644 (file)
@@ -686,37 +686,39 @@ if opts then
       end)
       -- Create tables on load
       rspamd_config:add_on_load(function(cfg, ev_base, worker)
-        -- XXX: need to call this script for all upstreams
-        local upstream = settings.upstream:get_upstream_round_robin()
-        local ip_addr = upstream:get_addr():to_string(true)
-
-        local function http_cb(err_message, code, _, _)
-          if code ~= 200 or err_message then
-            rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s",
-              ip_addr, err_message)
-            upstream:fail()
-          else
-            upstream:ok()
+        if worker:is_primary_controller() then
+          -- XXX: need to call this script for all upstreams
+          local upstream = settings.upstream:get_upstream_round_robin()
+          local ip_addr = upstream:get_addr():to_string(true)
+
+          local function http_cb(err_message, code, _, _)
+            if code ~= 200 or err_message then
+              rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s",
+                  ip_addr, err_message)
+              upstream:fail()
+            else
+              upstream:ok()
+            end
           end
-        end
 
-        local function send_req(elt, sql)
-          if not rspamd_http.request({
-            ev_base = ev_base,
-            config = cfg,
-            url = connect_prefix .. ip_addr,
-            body = sql,
-            callback = http_cb,
-            mime_type = 'text/plain',
-            timeout = settings['timeout'],
-          }) then
-            rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request",
-              elt, settings['server'])
+          local function send_req(elt, sql)
+            if not rspamd_http.request({
+              ev_base = ev_base,
+              config = cfg,
+              url = connect_prefix .. ip_addr,
+              body = sql,
+              callback = http_cb,
+              mime_type = 'text/plain',
+              timeout = settings['timeout'],
+            }) then
+              rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request",
+                  elt, settings['server'])
+            end
           end
-        end
 
-        for tab,sql in pairs(clickhouse_schema) do
-          send_req(tab, sql)
+          for tab,sql in pairs(clickhouse_schema) do
+            send_req(tab, sql)
+          end
         end
       end)
     end
index 4a5a5be19334f2e50446b72832595f56f8fa98f2..8ab390257f34294a5eb57e69b88c8d8677d72501 100644 (file)
@@ -306,7 +306,7 @@ local function dmarc_callback(task)
 
     for _,r in ipairs(results) do
       if failed_policy then break end
-      (function()
+      local function try()
         local elts = dmarc_grammar:match(r)
         if not elts then
           return
@@ -381,7 +381,8 @@ local function dmarc_callback(task)
             rua = elts['rua']
           end
         end
-      end)()
+      end
+      try()
     end
 
     if not found_policy then
@@ -642,7 +643,7 @@ if opts['reporting'] == true then
     end
     rspamd_config:add_on_load(function(cfg, ev_base, worker)
       load_scripts(cfg, ev_base)
-      if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+      if not worker:is_primary_controller() then return end
       local rresolver = rspamd_resolver.init(ev_base, rspamd_config)
       rspamd_config:register_finish_script(function ()
         local stamp = pool:get_variable(VAR_NAME, 'double')
index a4b7527a52a61573ee1e04e4dba5398a323c9ad7..744b8b6f82988db55794e9ea3ae173d4fcc141e0 100644 (file)
@@ -242,12 +242,14 @@ if section then
     settings[k] = v
   end
 
-  rspamd_config:add_on_load(function(_, ev_base)
-    rspamd_config:add_periodic(ev_base, 0.0,
-    function(cfg, _ev_base)
-      check_dynamic_conf(cfg, _ev_base)
-      return settings.redis_watch_interval
-    end, true)
+  rspamd_config:add_on_load(function(_, ev_base, worker)
+    if worker:is_scanner() then
+      rspamd_config:add_periodic(ev_base, 0.0,
+          function(cfg, _ev_base)
+            check_dynamic_conf(cfg, _ev_base)
+            return settings.redis_watch_interval
+          end, true)
+    end
   end)
 end
 
index aed3a87c7b1acde91f5dc596224e43f35a6da037..c5919bf7fbc87535b1a39a806b64655621559965 100644 (file)
@@ -373,8 +373,10 @@ if redis_params and opts then
     })
 
     rspamd_config:add_on_load(function(cfg, ev_base,worker)
-      check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
-      initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
+      if worker:is_scanner() then
+        check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
+        initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
+      end
     end)
   end
 
index 828060240605c161676f9437df4647d148093e39..117881b31219be1331b91767dc783dd8fea0107d 100644 (file)
@@ -1036,7 +1036,7 @@ else
     rspamd_config:add_on_load(function(cfg, ev_base, worker)
       check_fanns(rule, cfg, ev_base)
 
-      if worker:get_name() == 'controller' and worker:get_index() == 0 then
+      if worker:is_primary_controller() then
         -- We also want to train neural nets when they have enough data
         rspamd_config:add_periodic(ev_base, 0.0,
           function(_, _)
index b5dae4e20e1f917f1b2e2b6dbf33af2f8da19414..57a4a2fa6c509bd66ba404d26516ba4310b20820 100644 (file)
@@ -178,7 +178,7 @@ if opts and type(opts) == 'table' then
 
   if sane_config then
     rspamd_config:add_on_load(function(_, ev_base, worker)
-      if worker:get_name() == 'controller' and worker:get_index() == 0 then
+      if worker:is_primary_controller() then
         rspamd_config:add_periodic(ev_base, 0.0,
           function(_cfg, _ev_base)
             return collect_fuzzy_hashes(_cfg, _ev_base)
index efb50d586fea23f58955b43164dfe56f5613baa9..ee435b23b687f531fffff165dd47d75b4c618372 100644 (file)
@@ -174,7 +174,7 @@ end
 
 rspamd_config:add_on_load(function (_, ev_base, worker)
   -- Exit unless we're the first 'controller' worker
-  if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+  if not worker:is_primary_controller() then return end
   -- Persist mempool variable to statefile on shutdown
   rspamd_config:register_finish_script(function ()
     local stamp = pool:get_variable(VAR_NAME, 'double')