Browse Source

[Feature] Implement forked workers children monitoring

tags/1.7.0
Vsevolod Stakhov 6 years ago
parent
commit
660d3e9967
3 changed files with 90 additions and 14 deletions
  1. 75
    12
      src/libserver/rspamd_control.c
  2. 14
    1
      src/libserver/rspamd_control.h
  3. 1
    1
      src/libserver/worker_util.c

+ 75
- 12
src/libserver/rspamd_control.c View File

@@ -599,10 +599,10 @@ static void
rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
{
struct rspamd_worker_control_data *cd = ud;
struct rspamd_control_command cmd;
struct msghdr msg;
struct iovec iov;
guchar fdspace[CMSG_SPACE(sizeof (int))];
static struct rspamd_control_command cmd;
static struct msghdr msg;
static struct iovec iov;
static guchar fdspace[CMSG_SPACE(sizeof (int))];
gint rfd = -1;
gssize r;

@@ -697,6 +697,7 @@ rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,

struct rspamd_srv_reply_data {
struct rspamd_worker *worker;
struct rspamd_main *srv;
gint fd;
struct rspamd_srv_reply rep;
};
@@ -725,25 +726,74 @@ rspamd_control_log_pipe_io_handler (gint fd, short what, gpointer ud)
g_slice_free1 (sizeof (*elt), elt);
}

static void
rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
struct rspamd_main *srv)
{
struct rspamd_worker *parent, *child;

parent = g_hash_table_lookup (srv->workers,
GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));

if (parent == NULL) {
msg_err ("cannot find parent for a forked process %P (%P child)",
cmd->cmd.on_fork.ppid, cmd->cmd.on_fork.cpid);

return;
}

if (cmd->cmd.on_fork.state == child_dead) {
/* We need to remove stale worker */
child = g_hash_table_lookup (srv->workers,
GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));

if (child == NULL) {
msg_err ("cannot find child for a forked process %P (%P parent)",
cmd->cmd.on_fork.cpid, cmd->cmd.on_fork.ppid);

return;
}

g_hash_table_remove (srv->workers,
GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));
g_free (child);
}
else {
child = g_malloc0 (sizeof (struct rspamd_worker));
child->srv = srv;
child->type = parent->type;
child->pid = cmd->cmd.on_fork.cpid;
g_hash_table_insert (srv->workers,
GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child);
}
}

struct rspamd_srv_cbdata {
struct rspamd_worker *worker;
struct rspamd_main *srv;
};

static void
rspamd_srv_handler (gint fd, short what, gpointer ud)
{
struct rspamd_srv_cbdata *cbd;
struct rspamd_worker *worker;
struct rspamd_srv_command cmd;
static struct rspamd_srv_command cmd;
struct rspamd_main *srv;
struct rspamd_srv_reply_data *rdata;
struct msghdr msg;
struct cmsghdr *cmsg;
struct iovec iov;
guchar fdspace[CMSG_SPACE(sizeof (int))];
static struct iovec iov;
static guchar fdspace[CMSG_SPACE(sizeof (int))];
gint *spair, rfd = -1;
gchar *nid;
struct rspamd_control_command wcmd;
gssize r;

if (what == EV_READ) {
worker = ud;
srv = worker->srv;
cbd = ud;
worker = cbd->worker;
srv = cbd->srv;
iov.iov_base = &cmd;
iov.iov_len = sizeof (cmd);
memset (&msg, 0, sizeof (msg));
@@ -772,6 +822,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
else {
rdata = g_slice_alloc0 (sizeof (*rdata));
rdata->worker = worker;
rdata->srv = srv;
rdata->rep.id = cmd.id;
rdata->rep.type = cmd.type;
rdata->fd = -1;
@@ -814,6 +865,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced;
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_hs_io_handler, NULL);
return;
break;
case RSPAMD_SRV_MONITORED_CHANGE:
/* Broadcast command to all workers */
@@ -826,6 +878,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
wcmd.cmd.monitored_change.sender = cmd.cmd.monitored_change.sender;
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_hs_io_handler, NULL);
return;
break;
case RSPAMD_SRV_LOG_PIPE:
memset (&wcmd, 0, sizeof (wcmd));
@@ -833,6 +886,11 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type;
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_log_pipe_io_handler, NULL);
return;
break;
case RSPAMD_SRV_ON_FORK:
rdata->rep.reply.on_fork.status = 0;
rspamd_control_handle_on_fork (&cmd, srv);
break;
default:
msg_err ("unknown command type: %d", cmd.type);
@@ -857,7 +915,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
else if (what == EV_WRITE) {
rdata = ud;
worker = rdata->worker;
srv = worker->srv;
srv = rdata->srv;

memset (&msg, 0, sizeof (msg));

@@ -897,13 +955,18 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
}

void
rspamd_srv_start_watching (struct rspamd_worker *worker,
rspamd_srv_start_watching (struct rspamd_main *srv,
struct rspamd_worker *worker,
struct event_base *ev_base)
{
struct rspamd_srv_cbdata *cbd;
g_assert (worker != NULL);

cbd = rspamd_mempool_alloc (srv->server_pool, sizeof (*cbd));
cbd->worker = worker;
cbd->srv = srv;
event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
rspamd_srv_handler, worker);
rspamd_srv_handler, cbd);
event_base_set (ev_base, &worker->srv_ev);
event_add (&worker->srv_ev, NULL);
}

+ 14
- 1
src/libserver/rspamd_control.h View File

@@ -41,6 +41,7 @@ enum rspamd_srv_type {
RSPAMD_SRV_HYPERSCAN_LOADED,
RSPAMD_SRV_MONITORED_CHANGE,
RSPAMD_SRV_LOG_PIPE,
RSPAMD_SRV_ON_FORK,
};

enum rspamd_log_pipe_type {
@@ -144,6 +145,14 @@ struct rspamd_srv_command {
struct {
enum rspamd_log_pipe_type type;
} log_pipe;
struct {
pid_t ppid;
pid_t cpid;
enum {
child_create = 0,
child_dead,
} state;
} on_fork;
} cmd;
};

@@ -163,6 +172,9 @@ struct rspamd_srv_reply {
struct {
enum rspamd_log_pipe_type type;
} log_pipe;
struct {
gint status;
} on_fork;
} reply;
};

@@ -200,7 +212,8 @@ void rspamd_control_worker_add_cmd_handler (struct rspamd_worker *worker,
/**
* Start watching on srv pipe
*/
void rspamd_srv_start_watching (struct rspamd_worker *worker,
void rspamd_srv_start_watching (struct rspamd_main *srv,
struct rspamd_worker *worker,
struct event_base *ev_base);



+ 1
- 1
src/libserver/worker_util.c View File

@@ -617,7 +617,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
close (wrk->srv_pipe[1]);
rspamd_socket_nonblocking (wrk->control_pipe[0]);
rspamd_socket_nonblocking (wrk->srv_pipe[0]);
rspamd_srv_start_watching (wrk, ev_base);
rspamd_srv_start_watching (rspamd_main, wrk, ev_base);
/* Insert worker into worker's table, pid is index */
g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
wrk->pid), wrk);

Loading…
Cancel
Save