diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-11-11 13:57:56 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2019-11-11 15:30:30 +0000 |
commit | 82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22 (patch) | |
tree | fe9e36215a195ae9cd40be7d7eea0fe3f10e0408 /src/worker.c | |
parent | 7ff0c15b188eacd7051c174633c1779809f28352 (diff) | |
download | rspamd-82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22.tar.gz rspamd-82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22.zip |
[Rework] Replace controller functions by any scanner worker if needed
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 145 |
1 files changed, 39 insertions, 106 deletions
diff --git a/src/worker.c b/src/worker.c index 81ec0904a..193c74319 100644 --- a/src/worker.c +++ b/src/worker.c @@ -27,16 +27,13 @@ #include "libserver/dns.h" #include "libmime/message.h" #include "rspamd.h" -#include "keypairs_cache.h" #include "libstat/stat_api.h" #include "libserver/worker_util.h" #include "libserver/rspamd_control.h" #include "worker_private.h" -#include "utlist.h" #include "libutil/http_private.h" -#include "libmime/lang_detection.h" +#include "libserver/cfg_file_private.h" #include <math.h> -#include <src/libserver/cfg_file_private.h> #include "unix-std.h" #include "lua/lua_common.h" @@ -57,15 +54,15 @@ worker_t normal_worker = { }; #define msg_err_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ - "controller", ctx->cfg->cfg_pool->tag.uid, \ + "worker", ctx->cfg->cfg_pool->tag.uid, \ G_STRFUNC, \ __VA_ARGS__) #define msg_warn_ctx(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ - "controller", ctx->cfg->cfg_pool->tag.uid, \ + "worker", ctx->cfg->cfg_pool->tag.uid, \ G_STRFUNC, \ __VA_ARGS__) #define msg_info_ctx(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ - "controller", ctx->cfg->cfg_pool->tag.uid, \ + "worker", ctx->cfg->cfg_pool->tag.uid, \ G_STRFUNC, \ __VA_ARGS__) @@ -405,80 +402,6 @@ accept_socket (EV_P_ ev_io *w, int revents) ctx->timeout); } -static gboolean -rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main, - struct rspamd_worker *worker, gint fd, - gint attached_fd, - struct rspamd_control_command *cmd, - gpointer ud) -{ - struct rspamd_config *cfg = ud; - struct rspamd_worker_log_pipe *lp; - struct rspamd_control_reply rep; - - memset (&rep, 0, sizeof (rep)); - rep.type = RSPAMD_CONTROL_LOG_PIPE; - - if (attached_fd != -1) { - lp = g_malloc0 (sizeof (*lp)); - lp->fd = attached_fd; - lp->type = cmd->cmd.log_pipe.type; - - DL_APPEND (cfg->log_pipes, lp); - msg_info ("added new log pipe"); - } - else { - rep.reply.log_pipe.status = ENOENT; - msg_err ("cannot attach log pipe: invalid fd"); - } - - if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { - msg_err ("cannot write reply to the control socket: %s", - strerror (errno)); - } - - return TRUE; -} - -static gboolean -rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main, - struct rspamd_worker *worker, gint fd, - gint attached_fd, - struct rspamd_control_command *cmd, - gpointer ud) -{ - struct rspamd_control_reply rep; - struct rspamd_monitored *m; - struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx; - struct rspamd_config *cfg = ud; - - memset (&rep, 0, sizeof (rep)); - rep.type = RSPAMD_CONTROL_MONITORED_CHANGE; - - if (cmd->cmd.monitored_change.sender != getpid ()) { - m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag); - - if (m != NULL) { - rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive); - rep.reply.monitored_change.status = 1; - msg_info_config ("updated monitored status for %s: %s", - cmd->cmd.monitored_change.tag, - cmd->cmd.monitored_change.alive ? "alive" : "dead"); - } else { - msg_err ("cannot find monitored by tag: %*s", 32, - cmd->cmd.monitored_change.tag); - rep.reply.monitored_change.status = 0; - } - } - - if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { - msg_err ("cannot write reply to the control socket: %s", - strerror (errno)); - } - - return TRUE; -} - gpointer init_worker (struct rspamd_config *cfg) { @@ -557,31 +480,6 @@ init_worker (struct rspamd_config *cfg) return ctx; } -void -rspamd_worker_init_scanner (struct rspamd_worker *worker, - struct ev_loop *ev_base, - struct rspamd_dns_resolver *resolver, - struct rspamd_lang_detector **plang_det) -{ - rspamd_stat_init (worker->srv->cfg, ev_base); -#ifdef WITH_HYPERSCAN - rspamd_control_worker_add_cmd_handler (worker, - RSPAMD_CONTROL_HYPERSCAN_LOADED, - rspamd_worker_hyperscan_ready, - NULL); -#endif - rspamd_control_worker_add_cmd_handler (worker, - RSPAMD_CONTROL_LOG_PIPE, - rspamd_worker_log_pipe_handler, - worker->srv->cfg); - rspamd_control_worker_add_cmd_handler (worker, - RSPAMD_CONTROL_MONITORED_CHANGE, - rspamd_worker_monitored_handler, - worker->srv->cfg); - - *plang_det = worker->srv->cfg->lang_det; -} - /* * Start worker process */ @@ -589,6 +487,7 @@ void start_worker (struct rspamd_worker *worker) { struct rspamd_worker_ctx *ctx = worker->ctx; + gboolean is_controller = FALSE; g_assert (rspamd_worker_check_context (worker->ctx, rspamd_worker_magic)); ctx->cfg = worker->srv->cfg; @@ -619,12 +518,46 @@ start_worker (struct rspamd_worker *worker) ctx->http_ctx); rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver, &ctx->lang_det); + + if (worker->index == 0) { + /* If there are no controllers, then pretend that we are a controller */ + gboolean controller_seen = FALSE; + GList *cur; + + cur = worker->srv->cfg->workers; + + while (cur) { + struct rspamd_worker_conf *cf; + + cf = (struct rspamd_worker_conf *)cur->data; + if (cf->type == g_quark_from_static_string ("controller")) { + if (cf->enabled && cf->count >= 0) { + controller_seen = TRUE; + break; + } + } + + cur = g_list_next (cur); + } + + if (!controller_seen) { + msg_info_ctx ("no controller workers defined, execute " + "controller periodics in this worker"); + worker->flags |= RSPAMD_WORKER_CONTROLLER; + is_controller = TRUE; + } + } + rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker); ev_loop (ctx->event_loop, 0); rspamd_worker_block_signals (); + if (is_controller) { + rspamd_controller_on_terminate ( + } + rspamd_stat_close (); REF_RELEASE (ctx->cfg); rspamd_log_close (worker->srv->logger, TRUE); |