aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2017-07-17 08:40:00 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2017-07-17 08:40:00 +0100
commit660d3e9967cee74a642daec7525a67e2f74bcdc1 (patch)
tree22bf15e735f52be3e9969384b803b68ee3a7ab47 /src/libserver
parent57ad67a4b4560a936f9bc7efa6a1a3778a1372fa (diff)
downloadrspamd-660d3e9967cee74a642daec7525a67e2f74bcdc1.tar.gz
rspamd-660d3e9967cee74a642daec7525a67e2f74bcdc1.zip
[Feature] Implement forked workers children monitoring
Diffstat (limited to 'src/libserver')
-rw-r--r--src/libserver/rspamd_control.c87
-rw-r--r--src/libserver/rspamd_control.h15
-rw-r--r--src/libserver/worker_util.c2
3 files changed, 90 insertions, 14 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 90dabc4e1..ab2f736b6 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -599,10 +599,10 @@ static void
rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
{
struct rspamd_worker_control_data *cd = ud;
- struct rspamd_control_command cmd;
- struct msghdr msg;
- struct iovec iov;
- guchar fdspace[CMSG_SPACE(sizeof (int))];
+ static struct rspamd_control_command cmd;
+ static struct msghdr msg;
+ static struct iovec iov;
+ static guchar fdspace[CMSG_SPACE(sizeof (int))];
gint rfd = -1;
gssize r;
@@ -697,6 +697,7 @@ rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,
struct rspamd_srv_reply_data {
struct rspamd_worker *worker;
+ struct rspamd_main *srv;
gint fd;
struct rspamd_srv_reply rep;
};
@@ -726,24 +727,73 @@ rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud)
}
static void
+rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
+ struct rspamd_main *srv)
+{
+ struct rspamd_worker *parent, *child;
+
+ parent = g_hash_table_lookup (srv->workers,
+ GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));
+
+ if (parent == NULL) {
+ msg_err ("cannot find parent for a forked process %P (%P child)",
+ cmd->cmd.on_fork.ppid, cmd->cmd.on_fork.cpid);
+
+ return;
+ }
+
+ if (cmd->cmd.on_fork.state == child_dead) {
+ /* We need to remove stale worker */
+ child = g_hash_table_lookup (srv->workers,
+ GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));
+
+ if (child == NULL) {
+ msg_err ("cannot find child for a forked process %P (%P parent)",
+ cmd->cmd.on_fork.cpid, cmd->cmd.on_fork.ppid);
+
+ return;
+ }
+
+ g_hash_table_remove (srv->workers,
+ GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));
+ g_free (child);
+ }
+ else {
+ child = g_malloc0 (sizeof (struct rspamd_worker));
+ child->srv = srv;
+ child->type = parent->type;
+ child->pid = cmd->cmd.on_fork.cpid;
+ g_hash_table_insert (srv->workers,
+ GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child);
+ }
+}
+
+struct rspamd_srv_cbdata {
+ struct rspamd_worker *worker;
+ struct rspamd_main *srv;
+};
+
+static void
rspamd_srv_handler (gint fd, short what, gpointer ud)
{
+ struct rspamd_srv_cbdata *cbd;
struct rspamd_worker *worker;
- struct rspamd_srv_command cmd;
+ static struct rspamd_srv_command cmd;
struct rspamd_main *srv;
struct rspamd_srv_reply_data *rdata;
struct msghdr msg;
struct cmsghdr *cmsg;
- struct iovec iov;
- guchar fdspace[CMSG_SPACE(sizeof (int))];
+ static struct iovec iov;
+ static guchar fdspace[CMSG_SPACE(sizeof (int))];
gint *spair, rfd = -1;
gchar *nid;
struct rspamd_control_command wcmd;
gssize r;
if (what == EV_READ) {
- worker = ud;
- srv = worker->srv;
+ cbd = ud;
+ worker = cbd->worker;
+ srv = cbd->srv;
iov.iov_base = &cmd;
iov.iov_len = sizeof (cmd);
memset (&msg, 0, sizeof (msg));
@@ -772,6 +822,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
else {
rdata = g_slice_alloc0 (sizeof (*rdata));
rdata->worker = worker;
+ rdata->srv = srv;
rdata->rep.id = cmd.id;
rdata->rep.type = cmd.type;
rdata->fd = -1;
@@ -814,6 +865,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced;
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_hs_io_handler, NULL);
+ return;
break;
case RSPAMD_SRV_MONITORED_CHANGE:
/* Broadcast command to all workers */
@@ -826,6 +878,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
wcmd.cmd.monitored_change.sender = cmd.cmd.monitored_change.sender;
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_hs_io_handler, NULL);
+ return;
break;
case RSPAMD_SRV_LOG_PIPE:
memset (&wcmd, 0, sizeof (wcmd));
@@ -833,6 +886,11 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type;
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_log_pipe_io_handler, NULL);
+ return;
+ break;
+ case RSPAMD_SRV_ON_FORK:
+ rdata->rep.reply.on_fork.status = 0;
+ rspamd_control_handle_on_fork (&cmd, srv);
break;
default:
msg_err ("unknown command type: %d", cmd.type);
@@ -857,7 +915,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
else if (what == EV_WRITE) {
rdata = ud;
worker = rdata->worker;
- srv = worker->srv;
+ srv = rdata->srv;
memset (&msg, 0, sizeof (msg));
@@ -897,13 +955,18 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
}
void
-rspamd_srv_start_watching (struct rspamd_worker *worker,
+rspamd_srv_start_watching (struct rspamd_main *srv,
+ struct rspamd_worker *worker,
struct event_base *ev_base)
{
+ struct rspamd_srv_cbdata *cbd;
g_assert (worker != NULL);
+ cbd = rspamd_mempool_alloc (srv->server_pool, sizeof (*cbd));
+ cbd->worker = worker;
+ cbd->srv = srv;
event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
- rspamd_srv_handler, worker);
+ rspamd_srv_handler, cbd);
event_base_set (ev_base, &worker->srv_ev);
event_add (&worker->srv_ev, NULL);
}
diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h
index ba26cd187..bc42b662e 100644
--- a/src/libserver/rspamd_control.h
+++ b/src/libserver/rspamd_control.h
@@ -41,6 +41,7 @@ enum rspamd_srv_type {
RSPAMD_SRV_HYPERSCAN_LOADED,
RSPAMD_SRV_MONITORED_CHANGE,
RSPAMD_SRV_LOG_PIPE,
+ RSPAMD_SRV_ON_FORK,
};
enum rspamd_log_pipe_type {
@@ -144,6 +145,14 @@ struct rspamd_srv_command {
struct {
enum rspamd_log_pipe_type type;
} log_pipe;
+ struct {
+ pid_t ppid;
+ pid_t cpid;
+ enum {
+ child_create = 0,
+ child_dead,
+ } state;
+ } on_fork;
} cmd;
};
@@ -163,6 +172,9 @@ struct rspamd_srv_reply {
struct {
enum rspamd_log_pipe_type type;
} log_pipe;
+ struct {
+ gint status;
+ } on_fork;
} reply;
};
@@ -200,7 +212,8 @@ void rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,
/**
* Start watching on srv pipe
*/
-void rspamd_srv_start_watching (struct rspamd_worker *worker,
+void rspamd_srv_start_watching (struct rspamd_main *srv,
+ struct rspamd_worker *worker,
struct event_base *ev_base);
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 1a564786f..88e775b54 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -617,7 +617,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
close (wrk->srv_pipe[1]);
rspamd_socket_nonblocking (wrk->control_pipe[0]);
rspamd_socket_nonblocking (wrk->srv_pipe[0]);
- rspamd_srv_start_watching (wrk, ev_base);
+ rspamd_srv_start_watching (rspamd_main, wrk, ev_base);
/* Insert worker into worker's table, pid is index */
g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
wrk->pid), wrk);