@@ -397,6 +397,10 @@ rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main, | |||
while (g_hash_table_iter_next (&it, &k, &v)) { | |||
wrk = v; | |||
if (wrk->control_pipe[0] == -1) { | |||
continue; | |||
} | |||
memset (&msg, 0, sizeof (msg)); | |||
/* Attach fd to the message */ | |||
@@ -433,8 +437,10 @@ rspamd_control_broadcast_cmd (struct rspamd_main *rspamd_main, | |||
DL_APPEND (res, rep_elt); | |||
} | |||
else { | |||
msg_err ("cannot write request to the worker %P (%s): %s", | |||
wrk->pid, g_quark_to_string (wrk->type), strerror (errno)); | |||
msg_err ("cannot write request to the worker %P(%s), fd: %d: %s", | |||
wrk->pid, g_quark_to_string (wrk->type), | |||
wrk->control_pipe[0], | |||
strerror (errno)); | |||
} | |||
} | |||
@@ -754,6 +760,7 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd, | |||
return; | |||
} | |||
REF_RELEASE (child->cf); | |||
g_hash_table_remove (srv->workers, | |||
GSIZE_TO_POINTER (cmd->cmd.on_fork.ppid)); | |||
g_free (child); | |||
@@ -763,6 +770,13 @@ rspamd_control_handle_on_fork (struct rspamd_srv_command *cmd, | |||
child->srv = srv; | |||
child->type = parent->type; | |||
child->pid = cmd->cmd.on_fork.cpid; | |||
child->srv_pipe[0] = -1; | |||
child->srv_pipe[1] = -1; | |||
child->control_pipe[0] = -1; | |||
child->control_pipe[1] = -1; | |||
child->cf = parent->cf; | |||
child->ppid = parent->pid; | |||
REF_RETAIN (child->cf); | |||
g_hash_table_insert (srv->workers, | |||
GSIZE_TO_POINTER (cmd->cmd.on_fork.cpid), child); | |||
} |
@@ -546,13 +546,14 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, | |||
wrk->index = index; | |||
wrk->ctx = cf->ctx; | |||
wrk->finish_actions = g_ptr_array_new (); | |||
wrk->ppid = getpid (); | |||
wrk->pid = fork (); | |||
switch (wrk->pid) { | |||
case 0: | |||
/* Update pid for logging */ | |||
rspamd_log_update_pid (cf->type, rspamd_main->logger); | |||
wrk->pid = getpid (); | |||
/* Init PRNG after fork */ | |||
rc = ottery_init (rspamd_main->cfg->libs_ctx->ottery_cfg); |
@@ -687,6 +687,10 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused) | |||
rspamd_main = w->srv; | |||
if (w->ppid != getpid ()) { | |||
return TRUE; | |||
} | |||
if (waitpid (w->pid, &res, WNOHANG) <= 0) { | |||
if (term_attempts < 0) { | |||
if (w->cf->worker->flags & RSPAMD_WORKER_KILLABLE) { | |||
@@ -1023,12 +1027,22 @@ rspamd_cld_handler (gint signo, short what, gpointer arg) | |||
} | |||
} | |||
event_del (&cur->srv_ev); | |||
/* We also need to clean descriptors left */ | |||
close (cur->control_pipe[0]); | |||
close (cur->srv_pipe[0]); | |||
if (cur->srv_pipe[0] != -1) { | |||
event_del (&cur->srv_ev); | |||
} | |||
if (cur->control_pipe[0] != -1) { | |||
/* We also need to clean descriptors left */ | |||
close (cur->control_pipe[0]); | |||
close (cur->srv_pipe[0]); | |||
} | |||
REF_RELEASE (cur->cf); | |||
g_ptr_array_free (cur->finish_actions, TRUE); | |||
if (cur->finish_actions) { | |||
g_ptr_array_free (cur->finish_actions, TRUE); | |||
} | |||
g_free (cur); | |||
} | |||
else { |
@@ -57,6 +57,7 @@ | |||
*/ | |||
struct rspamd_worker { | |||
pid_t pid; /**< pid of worker */ | |||
pid_t ppid; /**< pid of parent */ | |||
guint index; /**< index number */ | |||
guint nconns; /**< current connections count */ | |||
gboolean wanna_die; /**< worker is terminating */ |