aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-11-11 13:57:56 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-11-11 15:30:30 +0000
commit82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22 (patch)
treefe9e36215a195ae9cd40be7d7eea0fe3f10e0408 /src/worker.c
parent7ff0c15b188eacd7051c174633c1779809f28352 (diff)
downloadrspamd-82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22.tar.gz
rspamd-82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22.zip
[Rework] Replace controller functions by any scanner worker if needed
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c145
1 files changed, 39 insertions, 106 deletions
diff --git a/src/worker.c b/src/worker.c
index 81ec0904a..193c74319 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -27,16 +27,13 @@
#include "libserver/dns.h"
#include "libmime/message.h"
#include "rspamd.h"
-#include "keypairs_cache.h"
#include "libstat/stat_api.h"
#include "libserver/worker_util.h"
#include "libserver/rspamd_control.h"
#include "worker_private.h"
-#include "utlist.h"
#include "libutil/http_private.h"
-#include "libmime/lang_detection.h"
+#include "libserver/cfg_file_private.h"
#include <math.h>
-#include <src/libserver/cfg_file_private.h>
#include "unix-std.h"
#include "lua/lua_common.h"
@@ -57,15 +54,15 @@ worker_t normal_worker = {
};
#define msg_err_ctx(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
- "controller", ctx->cfg->cfg_pool->tag.uid, \
+ "worker", ctx->cfg->cfg_pool->tag.uid, \
G_STRFUNC, \
__VA_ARGS__)
#define msg_warn_ctx(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
- "controller", ctx->cfg->cfg_pool->tag.uid, \
+ "worker", ctx->cfg->cfg_pool->tag.uid, \
G_STRFUNC, \
__VA_ARGS__)
#define msg_info_ctx(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
- "controller", ctx->cfg->cfg_pool->tag.uid, \
+ "worker", ctx->cfg->cfg_pool->tag.uid, \
G_STRFUNC, \
__VA_ARGS__)
@@ -405,80 +402,6 @@ accept_socket (EV_P_ ev_io *w, int revents)
ctx->timeout);
}
-static gboolean
-rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
- struct rspamd_worker *worker, gint fd,
- gint attached_fd,
- struct rspamd_control_command *cmd,
- gpointer ud)
-{
- struct rspamd_config *cfg = ud;
- struct rspamd_worker_log_pipe *lp;
- struct rspamd_control_reply rep;
-
- memset (&rep, 0, sizeof (rep));
- rep.type = RSPAMD_CONTROL_LOG_PIPE;
-
- if (attached_fd != -1) {
- lp = g_malloc0 (sizeof (*lp));
- lp->fd = attached_fd;
- lp->type = cmd->cmd.log_pipe.type;
-
- DL_APPEND (cfg->log_pipes, lp);
- msg_info ("added new log pipe");
- }
- else {
- rep.reply.log_pipe.status = ENOENT;
- msg_err ("cannot attach log pipe: invalid fd");
- }
-
- if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
- msg_err ("cannot write reply to the control socket: %s",
- strerror (errno));
- }
-
- return TRUE;
-}
-
-static gboolean
-rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
- struct rspamd_worker *worker, gint fd,
- gint attached_fd,
- struct rspamd_control_command *cmd,
- gpointer ud)
-{
- struct rspamd_control_reply rep;
- struct rspamd_monitored *m;
- struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
- struct rspamd_config *cfg = ud;
-
- memset (&rep, 0, sizeof (rep));
- rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
-
- if (cmd->cmd.monitored_change.sender != getpid ()) {
- m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
-
- if (m != NULL) {
- rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
- rep.reply.monitored_change.status = 1;
- msg_info_config ("updated monitored status for %s: %s",
- cmd->cmd.monitored_change.tag,
- cmd->cmd.monitored_change.alive ? "alive" : "dead");
- } else {
- msg_err ("cannot find monitored by tag: %*s", 32,
- cmd->cmd.monitored_change.tag);
- rep.reply.monitored_change.status = 0;
- }
- }
-
- if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
- msg_err ("cannot write reply to the control socket: %s",
- strerror (errno));
- }
-
- return TRUE;
-}
-
gpointer
init_worker (struct rspamd_config *cfg)
{
@@ -557,31 +480,6 @@ init_worker (struct rspamd_config *cfg)
return ctx;
}
-void
-rspamd_worker_init_scanner (struct rspamd_worker *worker,
- struct ev_loop *ev_base,
- struct rspamd_dns_resolver *resolver,
- struct rspamd_lang_detector **plang_det)
-{
- rspamd_stat_init (worker->srv->cfg, ev_base);
-#ifdef WITH_HYPERSCAN
- rspamd_control_worker_add_cmd_handler (worker,
- RSPAMD_CONTROL_HYPERSCAN_LOADED,
- rspamd_worker_hyperscan_ready,
- NULL);
-#endif
- rspamd_control_worker_add_cmd_handler (worker,
- RSPAMD_CONTROL_LOG_PIPE,
- rspamd_worker_log_pipe_handler,
- worker->srv->cfg);
- rspamd_control_worker_add_cmd_handler (worker,
- RSPAMD_CONTROL_MONITORED_CHANGE,
- rspamd_worker_monitored_handler,
- worker->srv->cfg);
-
- *plang_det = worker->srv->cfg->lang_det;
-}
-
/*
* Start worker process
*/
@@ -589,6 +487,7 @@ void
start_worker (struct rspamd_worker *worker)
{
struct rspamd_worker_ctx *ctx = worker->ctx;
+ gboolean is_controller = FALSE;
g_assert (rspamd_worker_check_context (worker->ctx, rspamd_worker_magic));
ctx->cfg = worker->srv->cfg;
@@ -619,12 +518,46 @@ start_worker (struct rspamd_worker *worker)
ctx->http_ctx);
rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
&ctx->lang_det);
+
+ if (worker->index == 0) {
+ /* If there are no controllers, then pretend that we are a controller */
+ gboolean controller_seen = FALSE;
+ GList *cur;
+
+ cur = worker->srv->cfg->workers;
+
+ while (cur) {
+ struct rspamd_worker_conf *cf;
+
+ cf = (struct rspamd_worker_conf *)cur->data;
+ if (cf->type == g_quark_from_static_string ("controller")) {
+ if (cf->enabled && cf->count >= 0) {
+ controller_seen = TRUE;
+ break;
+ }
+ }
+
+ cur = g_list_next (cur);
+ }
+
+ if (!controller_seen) {
+ msg_info_ctx ("no controller workers defined, execute "
+ "controller periodics in this worker");
+ worker->flags |= RSPAMD_WORKER_CONTROLLER;
+ is_controller = TRUE;
+ }
+ }
+
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
worker);
ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();
+ if (is_controller) {
+ rspamd_controller_on_terminate (
+ }
+
rspamd_stat_close ();
REF_RELEASE (ctx->cfg);
rspamd_log_close (worker->srv->logger, TRUE);