From: Vsevolod Stakhov Date: Mon, 17 Jul 2017 07:40:00 +0000 (+0100) Subject: [Feature] Implement forked workers children monitoring X-Git-Tag: 1.7.0~840 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=660d3e9967cee74a642daec7525a67e2f74bcdc1;p=rspamd.git [Feature] Implement forked workers children monitoring --- diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c index 90dabc4e1..ab2f736b6 100644 --- a/src/libserver/rspamd_control.c +++ b/src/libserver/rspamd_control.c @@ -599,10 +599,10 @@ static void rspamd_control_default_worker_handler (gint fd, short what, gpointer ud) { struct rspamd_worker_control_data *cd = ud; - struct rspamd_control_command cmd; - struct msghdr msg; - struct iovec iov; - guchar fdspace[CMSG_SPACE(sizeof (int))]; + static struct rspamd_control_command cmd; + static struct msghdr msg; + static struct iovec iov; + static guchar fdspace[CMSG_SPACE(sizeof (int))]; gint rfd = -1; gssize r; @@ -697,6 +697,7 @@ rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker, struct rspamd_srv_reply_data { struct rspamd_worker *worker; + struct rspamd_main *srv; gint fd; struct rspamd_srv_reply rep; }; @@ -725,25 +726,74 @@ rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud) g_slice_free1 (sizeof (*elt), elt); } +static void +rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd, + struct rspamd_main *srv) +{ + struct rspamd_worker *parent, *child; + + parent = g_hash_table_lookup (srv->workers, + GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid)); + + if (parent == NULL) { + msg_err ("cannot find parent for a forked process %P (%P child)", + cmd->cmd.on_fork.ppid, cmd->cmd.on_fork.cpid); + + return; + } + + if (cmd->cmd.on_fork.state == child_dead) { + /* We need to remove stale worker */ + child = g_hash_table_lookup (srv->workers, + GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid)); + + if (child == NULL) { + msg_err ("cannot find child for a forked process %P (%P parent)", + cmd->cmd.on_fork.cpid, cmd->cmd.on_fork.ppid); + + return; + } + + g_hash_table_remove (srv->workers, + GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid)); + g_free (child); + } + else { + child = g_malloc0 (sizeof (struct rspamd_worker)); + child->srv = srv; + child->type = parent->type; + child->pid = cmd->cmd.on_fork.cpid; + g_hash_table_insert (srv->workers, + GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child); + } +} + +struct rspamd_srv_cbdata { + struct rspamd_worker *worker; + struct rspamd_main *srv; +}; + static void rspamd_srv_handler (gint fd, short what, gpointer ud) { + struct rspamd_srv_cbdata *cbd; struct rspamd_worker *worker; - struct rspamd_srv_command cmd; + static struct rspamd_srv_command cmd; struct rspamd_main *srv; struct rspamd_srv_reply_data *rdata; struct msghdr msg; struct cmsghdr *cmsg; - struct iovec iov; - guchar fdspace[CMSG_SPACE(sizeof (int))]; + static struct iovec iov; + static guchar fdspace[CMSG_SPACE(sizeof (int))]; gint *spair, rfd = -1; gchar *nid; struct rspamd_control_command wcmd; gssize r; if (what == EV_READ) { - worker = ud; - srv = worker->srv; + cbd = ud; + worker = cbd->worker; + srv = cbd->srv; iov.iov_base = &cmd; iov.iov_len = sizeof (cmd); memset (&msg, 0, sizeof (msg)); @@ -772,6 +822,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) else { rdata = g_slice_alloc0 (sizeof (*rdata)); rdata->worker = worker; + rdata->srv = srv; rdata->rep.id = cmd.id; rdata->rep.type = cmd.type; rdata->fd = -1; @@ -814,6 +865,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced; rspamd_control_broadcast_cmd (srv, &wcmd, rfd, rspamd_control_hs_io_handler, NULL); + return; break; case RSPAMD_SRV_MONITORED_CHANGE: /* Broadcast command to all workers */ @@ -826,6 +878,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) wcmd.cmd.monitored_change.sender = cmd.cmd.monitored_change.sender; rspamd_control_broadcast_cmd (srv, &wcmd, rfd, rspamd_control_hs_io_handler, NULL); + return; break; case RSPAMD_SRV_LOG_PIPE: memset (&wcmd, 0, sizeof (wcmd)); @@ -833,6 +886,11 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type; rspamd_control_broadcast_cmd (srv, &wcmd, rfd, rspamd_control_log_pipe_io_handler, NULL); + return; + break; + case RSPAMD_SRV_ON_FORK: + rdata->rep.reply.on_fork.status = 0; + rspamd_control_handle_on_fork (&cmd, srv); break; default: msg_err ("unknown command type: %d", cmd.type); @@ -857,7 +915,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) else if (what == EV_WRITE) { rdata = ud; worker = rdata->worker; - srv = worker->srv; + srv = rdata->srv; memset (&msg, 0, sizeof (msg)); @@ -897,13 +955,18 @@ rspamd_srv_handler (gint fd, short what, gpointer ud) } void -rspamd_srv_start_watching (struct rspamd_worker *worker, +rspamd_srv_start_watching (struct rspamd_main *srv, + struct rspamd_worker *worker, struct event_base *ev_base) { + struct rspamd_srv_cbdata *cbd; g_assert (worker != NULL); + cbd = rspamd_mempool_alloc (srv->server_pool, sizeof (*cbd)); + cbd->worker = worker; + cbd->srv = srv; event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST, - rspamd_srv_handler, worker); + rspamd_srv_handler, cbd); event_base_set (ev_base, &worker->srv_ev); event_add (&worker->srv_ev, NULL); } diff --git a/src/libserver/rspamd_control.h b/src/libserver/rspamd_control.h index ba26cd187..bc42b662e 100644 --- a/src/libserver/rspamd_control.h +++ b/src/libserver/rspamd_control.h @@ -41,6 +41,7 @@ enum rspamd_srv_type { RSPAMD_SRV_HYPERSCAN_LOADED, RSPAMD_SRV_MONITORED_CHANGE, RSPAMD_SRV_LOG_PIPE, + RSPAMD_SRV_ON_FORK, }; enum rspamd_log_pipe_type { @@ -144,6 +145,14 @@ struct rspamd_srv_command { struct { enum rspamd_log_pipe_type type; } log_pipe; + struct { + pid_t ppid; + pid_t cpid; + enum { + child_create = 0, + child_dead, + } state; + } on_fork; } cmd; }; @@ -163,6 +172,9 @@ struct rspamd_srv_reply { struct { enum rspamd_log_pipe_type type; } log_pipe; + struct { + gint status; + } on_fork; } reply; }; @@ -200,7 +212,8 @@ void rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker, /** * Start watching on srv pipe */ -void rspamd_srv_start_watching (struct rspamd_worker *worker, +void rspamd_srv_start_watching (struct rspamd_main *srv, + struct rspamd_worker *worker, struct event_base *ev_base); diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index 1a564786f..88e775b54 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -617,7 +617,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, close (wrk->srv_pipe[1]); rspamd_socket_nonblocking (wrk->control_pipe[0]); rspamd_socket_nonblocking (wrk->srv_pipe[0]); - rspamd_srv_start_watching (wrk, ev_base); + rspamd_srv_start_watching (rspamd_main, wrk, ev_base); /* Insert worker into worker's table, pid is index */ g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER ( wrk->pid), wrk);