rspamd_control_default_worker_handler (gint fd, short what, gpointer ud)
{
struct rspamd_worker_control_data *cd = ud;
- struct rspamd_control_command cmd;
- struct msghdr msg;
- struct iovec iov;
- guchar fdspace[CMSG_SPACE(sizeof (int))];
+ static struct rspamd_control_command cmd;
+ static struct msghdr msg;
+ static struct iovec iov;
+ static guchar fdspace[CMSG_SPACE(sizeof (int))];
gint rfd = -1;
gssize r;
struct rspamd_srv_reply_data {
struct rspamd_worker *worker;
+ struct rspamd_main *srv;
gint fd;
struct rspamd_srv_reply rep;
};
g_slice_free1 (sizeof (*elt), elt);
}
+static void
+rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd,
+ struct rspamd_main *srv)
+{
+ struct rspamd_worker *parent, *child;
+
+ parent = g_hash_table_lookup (srv->workers,
+ GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));
+
+ if (parent == NULL) {
+ msg_err ("cannot find parent for a forked process %P (%P child)",
+ cmd->cmd.on_fork.ppid, cmd->cmd.on_fork.cpid);
+
+ return;
+ }
+
+ if (cmd->cmd.on_fork.state == child_dead) {
+ /* We need to remove stale worker */
+ child = g_hash_table_lookup (srv->workers,
+ GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));
+
+ if (child == NULL) {
+ msg_err ("cannot find child for a forked process %P (%P parent)",
+ cmd->cmd.on_fork.cpid, cmd->cmd.on_fork.ppid);
+
+ return;
+ }
+
+ g_hash_table_remove (srv->workers,
+ GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid));
+ g_free (child);
+ }
+ else {
+ child = g_malloc0 (sizeof (struct rspamd_worker));
+ child->srv = srv;
+ child->type = parent->type;
+ child->pid = cmd->cmd.on_fork.cpid;
+ g_hash_table_insert (srv->workers,
+ GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child);
+ }
+}
+
+struct rspamd_srv_cbdata {
+ struct rspamd_worker *worker;
+ struct rspamd_main *srv;
+};
+
static void
rspamd_srv_handler (gint fd, short what, gpointer ud)
{
+ struct rspamd_srv_cbdata *cbd;
struct rspamd_worker *worker;
- struct rspamd_srv_command cmd;
+ static struct rspamd_srv_command cmd;
struct rspamd_main *srv;
struct rspamd_srv_reply_data *rdata;
struct msghdr msg;
struct cmsghdr *cmsg;
- struct iovec iov;
- guchar fdspace[CMSG_SPACE(sizeof (int))];
+ static struct iovec iov;
+ static guchar fdspace[CMSG_SPACE(sizeof (int))];
gint *spair, rfd = -1;
gchar *nid;
struct rspamd_control_command wcmd;
gssize r;
if (what == EV_READ) {
- worker = ud;
- srv = worker->srv;
+ cbd = ud;
+ worker = cbd->worker;
+ srv = cbd->srv;
iov.iov_base = &cmd;
iov.iov_len = sizeof (cmd);
memset (&msg, 0, sizeof (msg));
else {
rdata = g_slice_alloc0 (sizeof (*rdata));
rdata->worker = worker;
+ rdata->srv = srv;
rdata->rep.id = cmd.id;
rdata->rep.type = cmd.type;
rdata->fd = -1;
wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced;
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_hs_io_handler, NULL);
+ return;
break;
case RSPAMD_SRV_MONITORED_CHANGE:
/* Broadcast command to all workers */
wcmd.cmd.monitored_change.sender = cmd.cmd.monitored_change.sender;
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_hs_io_handler, NULL);
+ return;
break;
case RSPAMD_SRV_LOG_PIPE:
memset (&wcmd, 0, sizeof (wcmd));
wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type;
rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
rspamd_control_log_pipe_io_handler, NULL);
+ return;
+ break;
+ case RSPAMD_SRV_ON_FORK:
+ rdata->rep.reply.on_fork.status = 0;
+ rspamd_control_handle_on_fork (&cmd, srv);
break;
default:
msg_err ("unknown command type: %d", cmd.type);
else if (what == EV_WRITE) {
rdata = ud;
worker = rdata->worker;
- srv = worker->srv;
+ srv = rdata->srv;
memset (&msg, 0, sizeof (msg));
}
void
-rspamd_srv_start_watching (struct rspamd_worker *worker,
+rspamd_srv_start_watching (struct rspamd_main *srv,
+ struct rspamd_worker *worker,
struct event_base *ev_base)
{
+ struct rspamd_srv_cbdata *cbd;
g_assert (worker != NULL);
+ cbd = rspamd_mempool_alloc (srv->server_pool, sizeof (*cbd));
+ cbd->worker = worker;
+ cbd->srv = srv;
event_set (&worker->srv_ev, worker->srv_pipe[0], EV_READ | EV_PERSIST,
- rspamd_srv_handler, worker);
+ rspamd_srv_handler, cbd);
event_base_set (ev_base, &worker->srv_ev);
event_add (&worker->srv_ev, NULL);
}