aboutsummaryrefslogtreecommitdiffstats
path: root/src/main.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-10-30 18:30:51 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-10-30 18:30:51 +0300
commitbb1b1f7889abe5c98a7628ba82e07b70a2142658 (patch)
treead51257652a78e63bc030d517ba6a55241a69e3f /src/main.c
parent1290cafef2180e5435cbbfd5a72527d0be2f4970 (diff)
downloadrspamd-bb1b1f7889abe5c98a7628ba82e07b70a2142658.tar.gz
rspamd-bb1b1f7889abe5c98a7628ba82e07b70a2142658.zip
* Implement new system of managing rspamd processes
Diffstat (limited to 'src/main.c')
-rw-r--r--src/main.c137
1 files changed, 60 insertions, 77 deletions
diff --git a/src/main.c b/src/main.c
index 702944467..4b811bb36 100644
--- a/src/main.c
+++ b/src/main.c
@@ -57,7 +57,6 @@ static struct rspamd_worker *fork_worker (struct rspamd_main *, struct worker
sig_atomic_t do_restart;
sig_atomic_t do_terminate;
sig_atomic_t child_dead;
-sig_atomic_t child_ready;
sig_atomic_t got_alarm;
extern int yynerrs;
@@ -71,8 +70,6 @@ extern void xs_init (pTHX);
extern PerlInterpreter *perl_interpreter;
#endif
-/* Active workers */
-static GList *active_workers = NULL;
/* List of workers that are pending to start */
static GList *workers_pending = NULL;
@@ -93,7 +90,7 @@ sig_handler (int signo)
child_dead = 1;
break;
case SIGUSR2:
- child_ready = 1;
+ /* Do nothing */
break;
case SIGALRM:
got_alarm = 1;
@@ -285,7 +282,7 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
if (cur) {
bzero (cur, sizeof (struct rspamd_worker));
- active_workers = g_list_prepend (active_workers, cur);
+ g_queue_push_head (cf->active_workers, cur);
cur->srv = rspamd;
cur->type = cf->type;
cur->pid = fork ();
@@ -328,6 +325,10 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
pidfile_remove (rspamd->pfh);
exit (-errno);
break;
+ default:
+ /* Insert worker into worker's table, pid is index */
+ g_hash_table_insert (rspamd->workers, GSIZE_TO_POINTER (cur->pid), cur);
+ break;
}
}
@@ -416,7 +417,7 @@ fork_delayed (struct rspamd_main *rspamd)
}
static void
-spawn_workers (struct rspamd_main *rspamd)
+spawn_workers (struct rspamd_main *rspamd, gboolean make_sockets)
{
GList *cur;
struct worker_conf *cf;
@@ -427,8 +428,8 @@ spawn_workers (struct rspamd_main *rspamd)
while (cur) {
cf = cur->data;
- /* Create listen socket */
- if (cf->type != TYPE_FUZZY) {
+ if (make_sockets && cf->has_socket) {
+ /* Create listen socket */
listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port, cf->bind_family, cf->bind_host);
if (listen_sock == -1) {
exit (-errno);
@@ -463,6 +464,29 @@ get_process_type (enum process_type type)
return NULL;
}
+static void
+kill_old_workers (gpointer key, gpointer value, gpointer unused)
+{
+ struct rspamd_worker *w = value;
+
+ kill (w->pid, SIGUSR2);
+ msg_info ("rspamd_restart: send signal to worker %ld", (long int)w->pid);
+}
+
+static gboolean
+wait_for_workers (gpointer key, gpointer value, gpointer unused)
+{
+ struct rspamd_worker *w = value;
+ int res = 0;
+
+ waitpid (w->pid, &res, 0);
+
+ msg_debug ("main(cleaning): %s process %d terminated", get_process_type (w->type), w->pid);
+ g_free (w);
+
+ return TRUE;
+}
+
int
main (int argc, char **argv, char **env)
{
@@ -470,7 +494,7 @@ main (int argc, char **argv, char **env)
struct module_ctx *cur_module = NULL;
int res = 0, i;
struct sigaction signals;
- struct rspamd_worker *cur, *active_worker;
+ struct rspamd_worker *cur;
struct rlimit rlim;
struct metric *metric;
struct cache_item *item;
@@ -495,9 +519,7 @@ main (int argc, char **argv, char **env)
do_terminate = 0;
do_restart = 0;
child_dead = 0;
- child_ready = 0;
do_reopen_log = 0;
- active_worker = NULL;
rspamd->stat = memory_pool_alloc_shared (rspamd->server_pool, sizeof (struct rspamd_stat));
bzero (rspamd->stat, sizeof (struct rspamd_stat));
@@ -704,8 +726,8 @@ main (int argc, char **argv, char **env)
l = g_list_next (l);
}
-
- spawn_workers (rspamd);
+ rspamd->workers = g_hash_table_new (g_direct_hash, g_direct_equal);
+ spawn_workers (rspamd, TRUE);
/* Signal processing cycle */
for (;;) {
@@ -714,7 +736,7 @@ main (int argc, char **argv, char **env)
sigsuspend (&signals.sa_mask);
if (do_terminate) {
msg_debug ("main: catch termination signal, waiting for childs");
- pass_signal_worker (active_workers, SIGTERM);
+ pass_signal_worker (rspamd->workers, SIGTERM);
break;
}
if (child_dead) {
@@ -722,35 +744,31 @@ main (int argc, char **argv, char **env)
msg_debug ("main: catch SIGCHLD signal, finding terminated worker");
/* Remove dead child form childs list */
wrk = waitpid (0, &res, 0);
- l = g_list_first (active_workers);
- while (l) {
- cur = l->data;
- if (wrk == cur->pid) {
- /* Catch situations if active worker is abnormally terminated */
- if (cur == active_worker) {
- active_worker = NULL;
- }
- active_workers = g_list_remove_link (active_workers, l);
+ if ((cur = g_hash_table_lookup (rspamd->workers, GSIZE_TO_POINTER (wrk))) != NULL) {
+ /* Unlink dead process from queue and hash table */
+
+ g_hash_table_remove (rspamd->workers, GSIZE_TO_POINTER (wrk));
+ g_queue_remove (cur->cf->active_workers, cur);
- if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
- /* Normal worker termination, do not fork one more */
- msg_info ("main: %s process %d terminated normally", get_process_type (cur->type), cur->pid);
+ if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
+ /* Normal worker termination, do not fork one more */
+ msg_info ("main: %s process %d terminated normally", get_process_type (cur->type), cur->pid);
+ }
+ else {
+ if (WIFSIGNALED (res)) {
+ msg_warn ("main: %s process %d terminated abnormally by signal: %d", get_process_type (cur->type), cur->pid, WTERMSIG (res));
}
else {
- if (WIFSIGNALED (res)) {
- msg_warn ("main: %s process %d terminated abnormally by signal: %d", get_process_type (cur->type), cur->pid, WTERMSIG (res));
- }
- else {
- msg_warn ("main: %s process %d terminated abnormally", get_process_type (cur->type), cur->pid);
- }
- /* Fork another worker in replace of dead one */
- delay_fork (cur->cf);
+ msg_warn ("main: %s process %d terminated abnormally", get_process_type (cur->type), cur->pid);
}
- g_list_free_1 (l);
- g_free (cur);
- break;
+ /* Fork another worker in replace of dead one */
+ delay_fork (cur->cf);
}
- l = g_list_next (l);
+
+ g_free (cur);
+ }
+ else {
+ msg_err ("main: got SIGCHLD, but pid %ld is not found in workers hash table, something goes wrong", (long int)wrk);
}
}
if (do_restart) {
@@ -758,35 +776,9 @@ main (int argc, char **argv, char **env)
do_reopen_log = 1;
msg_info ("main: rspamd " RVERSION " is restarting");
- l = g_list_first (active_workers);
- while (l) {
- cur = l->data;
- /* Start new workers that would reread configuration */
- cur->pending = FALSE;
- active_worker = fork_worker (rspamd, cur->cf);
- active_worker->pending = TRUE;
- l = g_list_next (l);
- }
- }
- if (child_ready) {
- child_ready = 0;
-
- if (active_worker != NULL) {
- l = g_list_first (active_workers);
- while (l) {
- cur = l->data;
- if (!cur->pending && !cur->is_dying) {
- /* Send to old workers SIGUSR2 */
- kill (cur->pid, SIGUSR2);
- cur->is_dying = 1;
- }
- else if (!cur->is_dying) {
- msg_info ("main: %s process %d has been successfully started", get_process_type (cur->type), cur->pid);
- }
- l = g_list_next (l);
- }
- }
- active_worker = NULL;
+ g_hash_table_foreach (rspamd->workers, kill_old_workers, NULL);
+ spawn_workers (rspamd, FALSE);
+
}
if (got_alarm) {
got_alarm = 0;
@@ -795,16 +787,7 @@ main (int argc, char **argv, char **env)
}
/* Wait for workers termination */
- l = g_list_first (active_workers);
- while (l) {
- cur = l->data;
- waitpid (cur->pid, &res, 0);
- msg_debug ("main(cleaning): %s process %d terminated", get_process_type (cur->type), cur->pid);
- g_free (cur);
- l = g_list_next (l);
- }
-
- g_list_free (active_workers);
+ g_hash_table_foreach_remove (rspamd->workers, wait_for_workers, NULL);
msg_info ("main: terminating...");