]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Replace controller functions by any scanner worker if needed
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 11 Nov 2019 13:57:56 +0000 (13:57 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 11 Nov 2019 15:30:30 +0000 (15:30 +0000)
conf/options.inc
src/controller.c
src/libserver/cfg_file.h
src/libserver/cfg_rcl.c
src/libserver/worker_util.c
src/libserver/worker_util.h
src/lua/lua_expression.c
src/rspamd_proxy.c
src/worker.c

index f600fdb9ceca9f852c4f569397b9c2db34f7713e..e635262e3d181bb6afd082d4b643bfe679ef4370 100644 (file)
@@ -57,6 +57,9 @@ words_decay = 600;
 # Write statistics about rspamd usage to the round-robin database
 rrd = "${DBDIR}/rspamd.rrd";
 
+# Write statistics for `rspamc` here
+stats_file = "${DBDIR}/stats.ucl";
+
 # Local networks
 local_addrs = [192.168.0.0/16, 10.0.0.0/8, 172.16.0.0/12, fd00::/8, 169.254.0.0/16, fe80::/10];
 hs_cache_dir = "${DBDIR}/";
index 7b6ecff4ac0b603f4e1fb34bedf6db4703ce8212..28e12a1c7ba8d5fabea81bb3ed9c289c543cabc2 100644 (file)
@@ -38,8 +38,6 @@
 /* 60 seconds for worker's IO */
 #define DEFAULT_WORKER_IO_TIMEOUT 60000
 
-#define DEFAULT_STATS_PATH RSPAMD_DBDIR "/stats.ucl"
-
 /* HTTP paths */
 #define PATH_AUTH "/auth"
 #define PATH_SYMBOLS "/symbols"
@@ -162,9 +160,6 @@ struct rspamd_controller_worker_ctx {
        /* Static files dir */
        gchar *static_files_dir;
 
-       /* Saved statistics path */
-       gchar *saved_stats_path;
-
        /* Custom commands registered by plugins */
        GHashTable *custom_commands;
 
@@ -3141,7 +3136,8 @@ rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
 }
 
 static void
-rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
+rspamd_controller_load_saved_stats (struct rspamd_main *rspamd_main,
+               struct rspamd_config *cfg)
 {
        struct ucl_parser *parser;
        ucl_object_t *obj;
@@ -3149,19 +3145,21 @@ rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
        struct rspamd_stat *stat, stat_copy;
        gint i;
 
-       g_assert (ctx->saved_stats_path != NULL);
+       if (cfg->stats_file == NULL) {
+               return;
+       }
 
-       if (access (ctx->saved_stats_path, R_OK) == -1) {
-               msg_err_ctx ("cannot load controller stats from %s: %s",
-                               ctx->saved_stats_path, strerror (errno));
+       if (access (cfg->stats_file, R_OK) == -1) {
+               msg_err_config ("cannot load controller stats from %s: %s",
+                               cfg->stats_file, strerror (errno));
                return;
        }
 
        parser = ucl_parser_new (0);
 
-       if (!ucl_parser_add_file (parser, ctx->saved_stats_path)) {
-               msg_err_ctx ("cannot parse controller stats from %s: %s",
-                               ctx->saved_stats_path, ucl_parser_get_error (parser));
+       if (!ucl_parser_add_file (parser, cfg->stats_file)) {
+               msg_err_config ("cannot parse controller stats from %s: %s",
+                               cfg->stats_file, ucl_parser_get_error (parser));
                ucl_parser_free (parser);
 
                return;
@@ -3170,7 +3168,7 @@ rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
        obj = ucl_parser_get_object (parser);
        ucl_parser_free (parser);
 
-       stat = ctx->srv->stat;
+       stat = rspamd_main->stat;
        memcpy (&stat_copy, stat, sizeof (stat_copy));
 
        elt = ucl_object_lookup (obj, "scanned");
@@ -3214,32 +3212,29 @@ rspamd_controller_load_saved_stats (struct rspamd_controller_worker_ctx *ctx)
 }
 
 static void
-rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx)
+rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main,
+               struct rspamd_config *cfg)
 {
        struct rspamd_stat *stat;
        ucl_object_t *top, *sub;
        struct ucl_emitter_functions *efuncs;
        gint i, fd;
+       gchar fpath[PATH_MAX], *tmpfile;
 
-       g_assert (ctx->saved_stats_path != NULL);
-
-       fd = open (ctx->saved_stats_path, O_WRONLY|O_CREAT|O_TRUNC, 00644);
-
-       if (fd == -1) {
-               msg_err_ctx ("cannot open for writing controller stats from %s: %s",
-                               ctx->saved_stats_path, strerror (errno));
+       if (cfg->stats_file == NULL) {
                return;
        }
 
-       if (rspamd_file_lock (fd, FALSE) == -1) {
-               msg_err_ctx ("cannot lock controller stats in %s: %s",
-                               ctx->saved_stats_path, strerror (errno));
-               close (fd);
+       rspamd_snprintf (fpath, sizeof (fpath), "%s.XXXXXXXX", cfg->stats_file);
+       fd = g_mkstemp_full (fpath, O_WRONLY|O_TRUNC, 00644);
 
+       if (fd == -1) {
+               msg_err_config ("cannot open for writing controller stats from %s: %s",
+                               fpath, strerror (errno));
                return;
        }
 
-       stat = ctx->srv->stat;
+       stat = rspamd_main->stat;
 
        top = ucl_object_typed_new (UCL_OBJECT);
        ucl_object_insert_key (top, ucl_object_fromint (
@@ -3258,18 +3253,28 @@ rspamd_controller_store_saved_stats (struct rspamd_controller_worker_ctx *ctx)
        }
 
        ucl_object_insert_key (top,
-                       ucl_object_fromint (stat->connections_count), "connections", 0, false);
+                       ucl_object_fromint (stat->connections_count),
+                       "connections", 0, false);
        ucl_object_insert_key (top,
                        ucl_object_fromint (stat->control_connections_count),
                        "control_connections", 0, false);
 
-
        efuncs = ucl_object_emit_fd_funcs (fd);
-       ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT,
-                       efuncs, NULL);
+       if (!ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT,
+                       efuncs, NULL)) {
+               msg_err_config ("cannot write stats to %s: %s",
+                               fpath, strerror (errno));
+
+               unlink (fpath);
+       }
+       else {
+               if (rename (fpath, cfg->stats_file) == -1) {
+                       msg_err_config ("cannot rename stats from %s to %s: %s",
+                                       fpath, cfg->stats_file, strerror (errno));
+               }
+       }
 
        ucl_object_unref (top);
-       rspamd_file_unlock (fd, FALSE);
        close (fd);
        ucl_object_emit_funcs_free (efuncs);
 }
@@ -3280,7 +3285,7 @@ rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
        struct rspamd_controller_worker_ctx *ctx =
                        (struct rspamd_controller_worker_ctx *)w->data;
 
-       rspamd_controller_store_saved_stats (ctx);
+       rspamd_controller_store_saved_stats (ctx->srv, ctx->cfg);
        ev_timer_again (EV_A_ w);
 }
 
@@ -3415,16 +3420,6 @@ init_controller_worker (struct rspamd_config *cfg)
                        0,
                        "Encryption keypair");
 
-       rspamd_rcl_register_worker_option (cfg,
-                       type,
-                       "stats_path",
-                       rspamd_rcl_parse_struct_string,
-                       ctx,
-                       G_STRUCT_OFFSET (struct rspamd_controller_worker_ctx,
-                                       saved_stats_path),
-                       0,
-                       "Directory where controller saves server's statistics between restarts");
-
        rspamd_rcl_register_worker_option (cfg,
                        type,
                        "task_timeout",
@@ -3563,18 +3558,10 @@ lua_csession_send_string (lua_State *L)
        return 0;
 }
 
-static void
+void
 rspamd_controller_on_terminate (struct rspamd_worker *worker)
 {
-       struct rspamd_controller_worker_ctx *ctx = worker->ctx;
-
-       rspamd_controller_store_saved_stats (ctx);
-
-       if (ctx->rrd) {
-               msg_info ("closing rrd file: %s", ctx->rrd->filename);
-               ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
-               rspamd_rrd_close (ctx->rrd);
-       }
+       rspamd_controller_store_saved_stats (worker->srv, worker->srv->cfg);
 }
 
 static void
@@ -3724,13 +3711,7 @@ start_controller_worker (struct rspamd_worker *worker)
                                &ctx->secure_map, NULL);
        }
 
-       if (ctx->saved_stats_path == NULL) {
-               /* Assume default path */
-               ctx->saved_stats_path = rspamd_mempool_strdup (worker->srv->cfg->cfg_pool,
-                               DEFAULT_STATS_PATH);
-       }
-
-       rspamd_controller_load_saved_stats (ctx);
+       rspamd_controller_load_saved_stats (ctx->srv, ctx->cfg);
        ctx->lang_det = ctx->cfg->lang_det;
 
        /* RRD collector */
@@ -3925,6 +3906,12 @@ start_controller_worker (struct rspamd_worker *worker)
        rspamd_worker_block_signals ();
        rspamd_controller_on_terminate (worker);
 
+       if (ctx->rrd) {
+               msg_info ("closing rrd file: %s", ctx->rrd->filename);
+               ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
+               rspamd_rrd_close (ctx->rrd);
+       }
+
        rspamd_stat_close ();
        rspamd_http_router_free (ctx->http);
 
index 750e4832507da58d7bf1cec58cced003b2ff8416..e604fe43a46c02a8cbc847b5bee0d358cf72ec0e 100644 (file)
@@ -439,6 +439,7 @@ struct rspamd_config {
 
        gchar *rrd_file;                               /**< rrd file to store statistics                                                */
        gchar *history_file;                           /**< file to save rolling history                                                */
+       gchar *stats_file;                           /**< file to save stats                                            */
        gchar *tld_file;                               /**< file to load effective tld list from                                */
        gchar *hs_cache_dir;                           /**< directory to save hyperscan databases                               */
        gchar *events_backend;                         /**< string representation of the events backend used    */
index c7d414df4d82de60bddd7a00f3be074cc0eb636f..01b0c43eaacb87f27e0710e80349167308407a25 100644 (file)
@@ -1963,6 +1963,12 @@ rspamd_rcl_config_init (struct rspamd_config *cfg, GHashTable *skip_sections)
                                G_STRUCT_OFFSET (struct rspamd_config, rrd_file),
                                RSPAMD_CL_FLAG_STRING_PATH,
                                "Path to RRD file");
+               rspamd_rcl_add_default_handler (sub,
+                               "stats_file",
+                               rspamd_rcl_parse_struct_string,
+                               G_STRUCT_OFFSET (struct rspamd_config, stats_file),
+                               RSPAMD_CL_FLAG_STRING_PATH,
+                               "Path to stats file");
                rspamd_rcl_add_default_handler (sub,
                                "history_file",
                                rspamd_rcl_parse_struct_string,
index 1a09dfce767ab606d4cb37760cfe6e33e01578a1..05997d8b34f7cf661d3b59948b989863edaca94b 100644 (file)
@@ -62,6 +62,7 @@
 #endif
 
 #include "contrib/libev/ev.h"
+#include "libstat/stat_api.h"
 
 /* Forward declaration */
 static void rspamd_worker_heartbeat_start (struct rspamd_worker *,
@@ -1616,4 +1617,103 @@ rspamd_worker_check_context (gpointer ctx, guint64 magic)
        struct rspamd_abstract_worker_ctx *actx = (struct rspamd_abstract_worker_ctx*)ctx;
 
        return actx->magic == magic;
+}
+
+static gboolean
+rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
+                                                               struct rspamd_worker *worker, gint fd,
+                                                               gint attached_fd,
+                                                               struct rspamd_control_command *cmd,
+                                                               gpointer ud)
+{
+       struct rspamd_config *cfg = ud;
+       struct rspamd_worker_log_pipe *lp;
+       struct rspamd_control_reply rep;
+
+       memset (&rep, 0, sizeof (rep));
+       rep.type = RSPAMD_CONTROL_LOG_PIPE;
+
+       if (attached_fd != -1) {
+               lp = g_malloc0 (sizeof (*lp));
+               lp->fd = attached_fd;
+               lp->type = cmd->cmd.log_pipe.type;
+
+               DL_APPEND (cfg->log_pipes, lp);
+               msg_info ("added new log pipe");
+       }
+       else {
+               rep.reply.log_pipe.status = ENOENT;
+               msg_err ("cannot attach log pipe: invalid fd");
+       }
+
+       if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+               msg_err ("cannot write reply to the control socket: %s",
+                               strerror (errno));
+       }
+
+       return TRUE;
+}
+
+static gboolean
+rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
+                                                                struct rspamd_worker *worker, gint fd,
+                                                                gint attached_fd,
+                                                                struct rspamd_control_command *cmd,
+                                                                gpointer ud)
+{
+       struct rspamd_control_reply rep;
+       struct rspamd_monitored *m;
+       struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
+       struct rspamd_config *cfg = ud;
+
+       memset (&rep, 0, sizeof (rep));
+       rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
+
+       if (cmd->cmd.monitored_change.sender != getpid ()) {
+               m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
+
+               if (m != NULL) {
+                       rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
+                       rep.reply.monitored_change.status = 1;
+                       msg_info_config ("updated monitored status for %s: %s",
+                                       cmd->cmd.monitored_change.tag,
+                                       cmd->cmd.monitored_change.alive ? "alive" : "dead");
+               } else {
+                       msg_err ("cannot find monitored by tag: %*s", 32,
+                                       cmd->cmd.monitored_change.tag);
+                       rep.reply.monitored_change.status = 0;
+               }
+       }
+
+       if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+               msg_err ("cannot write reply to the control socket: %s",
+                               strerror (errno));
+       }
+
+       return TRUE;
+}
+
+void
+rspamd_worker_init_scanner (struct rspamd_worker *worker,
+                                                       struct ev_loop *ev_base,
+                                                       struct rspamd_dns_resolver *resolver,
+                                                       struct rspamd_lang_detector **plang_det)
+{
+       rspamd_stat_init (worker->srv->cfg, ev_base);
+#ifdef WITH_HYPERSCAN
+       rspamd_control_worker_add_cmd_handler (worker,
+                       RSPAMD_CONTROL_HYPERSCAN_LOADED,
+                       rspamd_worker_hyperscan_ready,
+                       NULL);
+#endif
+       rspamd_control_worker_add_cmd_handler (worker,
+                       RSPAMD_CONTROL_LOG_PIPE,
+                       rspamd_worker_log_pipe_handler,
+                       worker->srv->cfg);
+       rspamd_control_worker_add_cmd_handler (worker,
+                       RSPAMD_CONTROL_MONITORED_CHANGE,
+                       rspamd_worker_monitored_handler,
+                       worker->srv->cfg);
+
+       *plang_det = worker->srv->cfg->lang_det;
 }
\ No newline at end of file
index 15d79df2fa96e91e5421cb947f3857c7c83a751e..b94d8bd9b87ea295f0c4720769a1305c5670c988 100644 (file)
@@ -244,6 +244,12 @@ gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
  */
 gboolean rspamd_worker_call_finish_handlers (struct rspamd_worker *worker);
 
+/**
+ * Defined in controller.c
+ * @param worker
+ */
+extern void rspamd_controller_on_terminate (struct rspamd_worker *worker);
+
 #ifdef WITH_HYPERSCAN
 struct rspamd_control_command;
 
index 60ee8fdf7c57273ff85fa8c6bb0ee5bb416c9e19..b2addd30c571f7e81944afe80c7e9a32ab1f03a3 100644 (file)
@@ -36,12 +36,12 @@ local rspamd_mempool = require "rspamd_mempool"
 
 local function parse_func(str)
        -- extract token till the first space character
-       local token = table.join('', take_while(function(s) return s ~= ' ' end, str))
+       local token = table.concat(totable(take_while(function(s) return s ~= ' ' end, iter(str))))
        -- Return token name
        return token
 end
 
-local function process_func(token, task)
+local function process_func(token)
        -- Do something using token and task
 end
 
index 227e20c99793d806473f0c416a607e94d4d5df78..0c9687c7e0c73c3804b508dc1734cfc03a5e35da 100644 (file)
@@ -2225,7 +2225,6 @@ start_rspamd_proxy (struct rspamd_worker *worker)
        ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
                        ctx->event_loop,
                        worker->srv->cfg);
-       rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
 
        rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
                        ctx->event_loop, ctx->resolver->r);
@@ -2236,11 +2235,49 @@ start_rspamd_proxy (struct rspamd_worker *worker)
                        (rspamd_mempool_destruct_t)rspamd_http_context_free,
                        ctx->http_ctx);
 
+       rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver,
+                       worker, 0);
+
        if (ctx->has_self_scan) {
                /* Additional initialisation needed */
                rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
                                &ctx->lang_det);
 
+               if (worker->index == 0) {
+                       /*
+                        * If there are no controllers and no normal workers,
+                        * then pretend that we are a controller
+                        */
+                       gboolean controller_seen = FALSE;
+                       GList *cur;
+
+                       cur = worker->srv->cfg->workers;
+
+                       while (cur) {
+                               struct rspamd_worker_conf *cf;
+
+                               cf = (struct rspamd_worker_conf *)cur->data;
+                               if ((cf->type == g_quark_from_static_string ("controller")) ||
+                                               (cf->type == g_quark_from_static_string ("normal"))) {
+
+                                       if (cf->enabled && cf->count >= 0) {
+                                               controller_seen = TRUE;
+                                               break;
+                                       }
+                               }
+
+                               cur = g_list_next (cur);
+                       }
+
+                       if (!controller_seen) {
+                               msg_info ("no controller or normal workers defined, execute "
+                                                         "controller periodics in this worker");
+                               worker->flags |= RSPAMD_WORKER_CONTROLLER;
+                       }
+               }
+       }
+       else {
+               worker->flags &= ~RSPAMD_WORKER_SCANNER;
        }
 
        if (worker->srv->cfg->enable_sessions_cache) {
index 81ec0904ae8743cee80e2c73f2f4fb0b0979f590..193c743195fccb22fd077f328fe743785d8ff114 100644 (file)
 #include "libserver/dns.h"
 #include "libmime/message.h"
 #include "rspamd.h"
-#include "keypairs_cache.h"
 #include "libstat/stat_api.h"
 #include "libserver/worker_util.h"
 #include "libserver/rspamd_control.h"
 #include "worker_private.h"
-#include "utlist.h"
 #include "libutil/http_private.h"
-#include "libmime/lang_detection.h"
+#include "libserver/cfg_file_private.h"
 #include <math.h>
-#include <src/libserver/cfg_file_private.h>
 #include "unix-std.h"
 
 #include "lua/lua_common.h"
@@ -57,15 +54,15 @@ worker_t normal_worker = {
 };
 
 #define msg_err_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
-        "controller", ctx->cfg->cfg_pool->tag.uid, \
+        "worker", ctx->cfg->cfg_pool->tag.uid, \
         G_STRFUNC, \
         __VA_ARGS__)
 #define msg_warn_ctx(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
-        "controller", ctx->cfg->cfg_pool->tag.uid, \
+        "worker", ctx->cfg->cfg_pool->tag.uid, \
         G_STRFUNC, \
         __VA_ARGS__)
 #define msg_info_ctx(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
-        "controller", ctx->cfg->cfg_pool->tag.uid, \
+        "worker", ctx->cfg->cfg_pool->tag.uid, \
         G_STRFUNC, \
         __VA_ARGS__)
 
@@ -405,80 +402,6 @@ accept_socket (EV_P_ ev_io *w, int revents)
                        ctx->timeout);
 }
 
-static gboolean
-rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
-               struct rspamd_worker *worker, gint fd,
-               gint attached_fd,
-               struct rspamd_control_command *cmd,
-               gpointer ud)
-{
-       struct rspamd_config *cfg = ud;
-       struct rspamd_worker_log_pipe *lp;
-       struct rspamd_control_reply rep;
-
-       memset (&rep, 0, sizeof (rep));
-       rep.type = RSPAMD_CONTROL_LOG_PIPE;
-
-       if (attached_fd != -1) {
-               lp = g_malloc0 (sizeof (*lp));
-               lp->fd = attached_fd;
-               lp->type = cmd->cmd.log_pipe.type;
-
-               DL_APPEND (cfg->log_pipes, lp);
-               msg_info ("added new log pipe");
-       }
-       else {
-               rep.reply.log_pipe.status = ENOENT;
-               msg_err ("cannot attach log pipe: invalid fd");
-       }
-
-       if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
-               msg_err ("cannot write reply to the control socket: %s",
-                               strerror (errno));
-       }
-
-       return TRUE;
-}
-
-static gboolean
-rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
-               struct rspamd_worker *worker, gint fd,
-               gint attached_fd,
-               struct rspamd_control_command *cmd,
-               gpointer ud)
-{
-       struct rspamd_control_reply rep;
-       struct rspamd_monitored *m;
-       struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
-       struct rspamd_config *cfg = ud;
-
-       memset (&rep, 0, sizeof (rep));
-       rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
-
-       if (cmd->cmd.monitored_change.sender != getpid ()) {
-               m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
-
-               if (m != NULL) {
-                       rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
-                       rep.reply.monitored_change.status = 1;
-                       msg_info_config ("updated monitored status for %s: %s",
-                                       cmd->cmd.monitored_change.tag,
-                                       cmd->cmd.monitored_change.alive ? "alive" : "dead");
-               } else {
-                       msg_err ("cannot find monitored by tag: %*s", 32,
-                                       cmd->cmd.monitored_change.tag);
-                       rep.reply.monitored_change.status = 0;
-               }
-       }
-
-       if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
-               msg_err ("cannot write reply to the control socket: %s",
-                               strerror (errno));
-       }
-
-       return TRUE;
-}
-
 gpointer
 init_worker (struct rspamd_config *cfg)
 {
@@ -557,31 +480,6 @@ init_worker (struct rspamd_config *cfg)
        return ctx;
 }
 
-void
-rspamd_worker_init_scanner (struct rspamd_worker *worker,
-               struct ev_loop *ev_base,
-               struct rspamd_dns_resolver *resolver,
-               struct rspamd_lang_detector **plang_det)
-{
-       rspamd_stat_init (worker->srv->cfg, ev_base);
-#ifdef WITH_HYPERSCAN
-       rspamd_control_worker_add_cmd_handler (worker,
-                       RSPAMD_CONTROL_HYPERSCAN_LOADED,
-                       rspamd_worker_hyperscan_ready,
-                       NULL);
-#endif
-       rspamd_control_worker_add_cmd_handler (worker,
-                       RSPAMD_CONTROL_LOG_PIPE,
-                       rspamd_worker_log_pipe_handler,
-                       worker->srv->cfg);
-       rspamd_control_worker_add_cmd_handler (worker,
-                       RSPAMD_CONTROL_MONITORED_CHANGE,
-                       rspamd_worker_monitored_handler,
-                       worker->srv->cfg);
-
-       *plang_det = worker->srv->cfg->lang_det;
-}
-
 /*
  * Start worker process
  */
@@ -589,6 +487,7 @@ void
 start_worker (struct rspamd_worker *worker)
 {
        struct rspamd_worker_ctx *ctx = worker->ctx;
+       gboolean is_controller = FALSE;
 
        g_assert (rspamd_worker_check_context (worker->ctx, rspamd_worker_magic));
        ctx->cfg = worker->srv->cfg;
@@ -619,12 +518,46 @@ start_worker (struct rspamd_worker *worker)
                        ctx->http_ctx);
        rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
                        &ctx->lang_det);
+
+       if (worker->index == 0) {
+               /* If there are no controllers, then pretend that we are a controller */
+               gboolean controller_seen = FALSE;
+               GList *cur;
+
+               cur = worker->srv->cfg->workers;
+
+               while (cur) {
+                       struct rspamd_worker_conf *cf;
+
+                       cf = (struct rspamd_worker_conf *)cur->data;
+                       if (cf->type == g_quark_from_static_string ("controller")) {
+                               if (cf->enabled && cf->count >= 0) {
+                                       controller_seen = TRUE;
+                                       break;
+                               }
+                       }
+
+                       cur = g_list_next (cur);
+               }
+
+               if (!controller_seen) {
+                       msg_info_ctx ("no controller workers defined, execute "
+                                "controller periodics in this worker");
+                       worker->flags |= RSPAMD_WORKER_CONTROLLER;
+                       is_controller = TRUE;
+               }
+       }
+
        rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
                        worker);
 
        ev_loop (ctx->event_loop, 0);
        rspamd_worker_block_signals ();
 
+       if (is_controller) {
+               rspamd_controller_on_terminate (
+       }
+
        rspamd_stat_close ();
        REF_RELEASE (ctx->cfg);
        rspamd_log_close (worker->srv->logger, TRUE);