aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/worker_util.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-11-11 13:57:56 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-11-11 15:30:30 +0000
commit82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22 (patch)
treefe9e36215a195ae9cd40be7d7eea0fe3f10e0408 /src/libserver/worker_util.c
parent7ff0c15b188eacd7051c174633c1779809f28352 (diff)
downloadrspamd-82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22.tar.gz
rspamd-82ca7ec2ac7f02f67304e1ed7b6859cabdfeae22.zip
[Rework] Replace controller functions by any scanner worker if needed
Diffstat (limited to 'src/libserver/worker_util.c')
-rw-r--r--src/libserver/worker_util.c100
1 files changed, 100 insertions, 0 deletions
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 1a09dfce7..05997d8b3 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -62,6 +62,7 @@
#endif
#include "contrib/libev/ev.h"
+#include "libstat/stat_api.h"
/* Forward declaration */
static void rspamd_worker_heartbeat_start (struct rspamd_worker *,
@@ -1616,4 +1617,103 @@ rspamd_worker_check_context (gpointer ctx, guint64 magic)
struct rspamd_abstract_worker_ctx *actx = (struct rspamd_abstract_worker_ctx*)ctx;
return actx->magic == magic;
+}
+
+static gboolean
+rspamd_worker_log_pipe_handler (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, gint fd,
+ gint attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{
+ struct rspamd_config *cfg = ud;
+ struct rspamd_worker_log_pipe *lp;
+ struct rspamd_control_reply rep;
+
+ memset (&rep, 0, sizeof (rep));
+ rep.type = RSPAMD_CONTROL_LOG_PIPE;
+
+ if (attached_fd != -1) {
+ lp = g_malloc0 (sizeof (*lp));
+ lp->fd = attached_fd;
+ lp->type = cmd->cmd.log_pipe.type;
+
+ DL_APPEND (cfg->log_pipes, lp);
+ msg_info ("added new log pipe");
+ }
+ else {
+ rep.reply.log_pipe.status = ENOENT;
+ msg_err ("cannot attach log pipe: invalid fd");
+ }
+
+ if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+ msg_err ("cannot write reply to the control socket: %s",
+ strerror (errno));
+ }
+
+ return TRUE;
+}
+
+static gboolean
+rspamd_worker_monitored_handler (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, gint fd,
+ gint attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{
+ struct rspamd_control_reply rep;
+ struct rspamd_monitored *m;
+ struct rspamd_monitored_ctx *mctx = worker->srv->cfg->monitored_ctx;
+ struct rspamd_config *cfg = ud;
+
+ memset (&rep, 0, sizeof (rep));
+ rep.type = RSPAMD_CONTROL_MONITORED_CHANGE;
+
+ if (cmd->cmd.monitored_change.sender != getpid ()) {
+ m = rspamd_monitored_by_tag (mctx, cmd->cmd.monitored_change.tag);
+
+ if (m != NULL) {
+ rspamd_monitored_set_alive (m, cmd->cmd.monitored_change.alive);
+ rep.reply.monitored_change.status = 1;
+ msg_info_config ("updated monitored status for %s: %s",
+ cmd->cmd.monitored_change.tag,
+ cmd->cmd.monitored_change.alive ? "alive" : "dead");
+ } else {
+ msg_err ("cannot find monitored by tag: %*s", 32,
+ cmd->cmd.monitored_change.tag);
+ rep.reply.monitored_change.status = 0;
+ }
+ }
+
+ if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+ msg_err ("cannot write reply to the control socket: %s",
+ strerror (errno));
+ }
+
+ return TRUE;
+}
+
+void
+rspamd_worker_init_scanner (struct rspamd_worker *worker,
+ struct ev_loop *ev_base,
+ struct rspamd_dns_resolver *resolver,
+ struct rspamd_lang_detector **plang_det)
+{
+ rspamd_stat_init (worker->srv->cfg, ev_base);
+#ifdef WITH_HYPERSCAN
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_HYPERSCAN_LOADED,
+ rspamd_worker_hyperscan_ready,
+ NULL);
+#endif
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_LOG_PIPE,
+ rspamd_worker_log_pipe_handler,
+ worker->srv->cfg);
+ rspamd_control_worker_add_cmd_handler (worker,
+ RSPAMD_CONTROL_MONITORED_CHANGE,
+ rspamd_worker_monitored_handler,
+ worker->srv->cfg);
+
+ *plang_det = worker->srv->cfg->lang_det;
} \ No newline at end of file