]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Implement forked workers children monitoring
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 17 Jul 2017 07:40:00 +0000 (08:40 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 17 Jul 2017 07:40:00 +0000 (08:40 +0100)
src/libserver/rspamd_control.c
src/libserver/rspamd_control.h
src/libserver/worker_util.c

index 90dabc4e131d62229a66e2b4ed0596fc0c15fe90..ab2f736b6bbe31f90c4765b8a891c8b4272f5592 100644 (file)
@@ -599,10 +599,10 @@ static void
 rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
 {
        struct rspamd_worker_control_data *cd = ud;
-       struct rspamd_control_command cmd;
-       struct msghdr msg;
-       struct iovec iov;
-       guchar fdspace[CMSG_SPACE(sizeof (int))];
+       static struct rspamd_control_command cmd;
+       static struct msghdr msg;
+       static struct iovec iov;
+       static guchar fdspace[CMSG_SPACE(sizeof (int))];
        gint rfd = -1;
        gssize r;
 
@@ -697,6 +697,7 @@ rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,
 
 struct rspamd_srv_reply_data {
        struct rspamd_worker *worker;
+       struct rspamd_main *srv;
        gint fd;
        struct rspamd_srv_reply rep;
 };
@@ -725,25 +726,74 @@ rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud)
        g_slice_free1 (sizeof (*elt), elt);
 }
 
+static void
+rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
+               struct rspamd_main *srv)
+{
+       struct rspamd_worker *parent, *child;
+
+       parent = g_hash_table_lookup (srv->workers,
+                       GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));
+
+       if (parent == NULL) {
+               msg_err ("cannot find parent for a forked process %P (%P child)",
+                               cmd->cmd.on_fork.ppid, cmd->cmd.on_fork.cpid);
+
+               return;
+       }
+
+       if (cmd->cmd.on_fork.state == child_dead) {
+               /* We need to remove stale worker */
+               child = g_hash_table_lookup (srv->workers,
+                               GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));
+
+               if (child == NULL) {
+                       msg_err ("cannot find child for a forked process %P (%P parent)",
+                                       cmd->cmd.on_fork.cpid, cmd->cmd.on_fork.ppid);
+
+                       return;
+               }
+
+               g_hash_table_remove (srv->workers,
+                               GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));
+               g_free (child);
+       }
+       else {
+               child = g_malloc0 (sizeof (struct rspamd_worker));
+               child->srv = srv;
+               child->type = parent->type;
+               child->pid = cmd->cmd.on_fork.cpid;
+               g_hash_table_insert (srv->workers,
+                               GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child);
+       }
+}
+
+struct rspamd_srv_cbdata {
+       struct rspamd_worker *worker;
+       struct rspamd_main *srv;
+};
+
 static void
 rspamd_srv_handler (gint fd, short what, gpointer ud)
 {
+       struct rspamd_srv_cbdata *cbd;
        struct rspamd_worker *worker;
-       struct rspamd_srv_command cmd;
+       static struct rspamd_srv_command cmd;
        struct rspamd_main *srv;
        struct rspamd_srv_reply_data *rdata;
        struct msghdr msg;
        struct cmsghdr *cmsg;
-       struct iovec iov;
-       guchar fdspace[CMSG_SPACE(sizeof (int))];
+       static struct iovec iov;
+       static guchar fdspace[CMSG_SPACE(sizeof (int))];
        gint *spair, rfd = -1;
        gchar *nid;
        struct rspamd_control_command wcmd;
        gssize r;
 
        if (what == EV_READ) {
-               worker = ud;
-               srv = worker->srv;
+               cbd = ud;
+               worker = cbd->worker;
+               srv = cbd->srv;
                iov.iov_base = &cmd;
                iov.iov_len = sizeof (cmd);
                memset (&msg, 0, sizeof (msg));
@@ -772,6 +822,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                else {
                        rdata = g_slice_alloc0 (sizeof (*rdata));
                        rdata->worker = worker;
+                       rdata->srv = srv;
                        rdata->rep.id = cmd.id;
                        rdata->rep.type = cmd.type;
                        rdata->fd = -1;
@@ -814,6 +865,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                                wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced;
                                rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
                                                rspamd_control_hs_io_handler, NULL);
+                               return;
                                break;
                        case RSPAMD_SRV_MONITORED_CHANGE:
                                /* Broadcast command to all workers */
@@ -826,6 +878,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                                wcmd.cmd.monitored_change.sender = cmd.cmd.monitored_change.sender;
                                rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
                                                rspamd_control_hs_io_handler, NULL);
+                               return;
                                break;
                        case RSPAMD_SRV_LOG_PIPE:
                                memset (&wcmd, 0, sizeof (wcmd));
@@ -833,6 +886,11 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
                                wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type;
                                rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
                                                rspamd_control_log_pipe_io_handler, NULL);
+                               return;
+                               break;
+                       case RSPAMD_SRV_ON_FORK:
+                               rdata->rep.reply.on_fork.status = 0;
+                               rspamd_control_handle_on_fork (&cmd, srv);
                                break;
                        default:
                                msg_err ("unknown command type: %d", cmd.type);
@@ -857,7 +915,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
        else if (what == EV_WRITE) {
                rdata = ud;
                worker = rdata->worker;
-               srv = worker->srv;
+               srv = rdata->srv;
 
                memset (&msg, 0, sizeof (msg));
 
@@ -897,13 +955,18 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
 }
 
 void
-rspamd_srv_start_watching (struct rspamd_worker *worker,
+rspamd_srv_start_watching (struct rspamd_main *srv,
+               struct rspamd_worker *worker,
                struct event_base *ev_base)
 {
+       struct rspamd_srv_cbdata *cbd;
        g_assert (worker != NULL);
 
+       cbd = rspamd_mempool_alloc (srv->server_pool, sizeof (*cbd));
+       cbd->worker = worker;
+       cbd->srv = srv;
        event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
-                       rspamd_srv_handler, worker);
+                       rspamd_srv_handler, cbd);
        event_base_set (ev_base, &worker->srv_ev);
        event_add (&worker->srv_ev, NULL);
 }
index ba26cd1879d6c4fcc79ae1a239aacddd7e9e9037..bc42b662e7730a3712606d2baa4393ee070c920e 100644 (file)
@@ -41,6 +41,7 @@ enum rspamd_srv_type {
        RSPAMD_SRV_HYPERSCAN_LOADED,
        RSPAMD_SRV_MONITORED_CHANGE,
        RSPAMD_SRV_LOG_PIPE,
+       RSPAMD_SRV_ON_FORK,
 };
 
 enum rspamd_log_pipe_type {
@@ -144,6 +145,14 @@ struct rspamd_srv_command {
                struct {
                        enum rspamd_log_pipe_type type;
                } log_pipe;
+               struct {
+                       pid_t ppid;
+                       pid_t cpid;
+                       enum {
+                               child_create = 0,
+                               child_dead,
+                       } state;
+               } on_fork;
        } cmd;
 };
 
@@ -163,6 +172,9 @@ struct rspamd_srv_reply {
                struct {
                        enum rspamd_log_pipe_type type;
                } log_pipe;
+               struct {
+                       gint status;
+               } on_fork;
        } reply;
 };
 
@@ -200,7 +212,8 @@ void rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,
 /**
  * Start watching on srv pipe
  */
-void rspamd_srv_start_watching (struct rspamd_worker *worker,
+void rspamd_srv_start_watching (struct rspamd_main *srv,
+               struct rspamd_worker *worker,
                struct event_base *ev_base);
 
 
index 1a564786fd94447c0499fd306f0340dabab279d4..88e775b54e4131695fb2a36f816251f3762eccb2 100644 (file)
@@ -617,7 +617,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
                close (wrk->srv_pipe[1]);
                rspamd_socket_nonblocking (wrk->control_pipe[0]);
                rspamd_socket_nonblocking (wrk->srv_pipe[0]);
-               rspamd_srv_start_watching (wrk, ev_base);
+               rspamd_srv_start_watching (rspamd_main, wrk, ev_base);
                /* Insert worker into worker's table, pid is index */
                g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
                                wrk->pid), wrk);