From 16b6241644a83aeef2711c2d7286b9f878b1c6f0 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 21 Sep 2009 15:18:54 +0400 Subject: [PATCH] * Cleanify logic of processes dispatcher --- src/main.c | 100 ++++++++++++++++++++++++++++++++--------------------- src/main.h | 3 -- src/util.c | 9 +++-- src/util.h | 2 +- 4 files changed, 69 insertions(+), 45 deletions(-) diff --git a/src/main.c b/src/main.c index 3cb10e221..1c89efd5a 100644 --- a/src/main.c +++ b/src/main.c @@ -71,6 +71,8 @@ extern void xs_init(pTHX); extern PerlInterpreter *perl_interpreter; #endif +/* Active workers */ +static GList *active_workers = NULL; /* List of workers that are pending to start */ static GList *workers_pending = NULL; @@ -284,7 +286,7 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker)); if (cur) { bzero (cur, sizeof (struct rspamd_worker)); - TAILQ_INSERT_HEAD (&rspamd->workers, cur, next); + active_workers = g_list_prepend (active_workers, cur); cur->srv = rspamd; cur->type = cf->type; cur->pid = fork(); @@ -444,6 +446,25 @@ spawn_workers (struct rspamd_main *rspamd) } } +static const char * +get_process_type (enum process_type type) +{ + switch (type) { + case TYPE_MAIN: + return "main"; + case TYPE_WORKER: + return "worker"; + case TYPE_FUZZY: + return "fuzzy"; + case TYPE_CONTROLLER: + return "controller"; + case TYPE_LMTP: + return "lmtp"; + } + + return NULL; +} + int main (int argc, char **argv, char **env) { @@ -451,7 +472,7 @@ main (int argc, char **argv, char **env) struct module_ctx *cur_module = NULL; int res = 0, i; struct sigaction signals; - struct rspamd_worker *cur, *cur_tmp, *active_worker; + struct rspamd_worker *cur, *active_worker; struct rlimit rlim; struct metric *metric; struct cache_item *item; @@ -656,8 +677,6 @@ main (int argc, char **argv, char **env) /* Block signals to use sigsuspend in future */ sigprocmask(SIG_BLOCK, &signals.sa_mask, NULL); - TAILQ_INIT (&rspamd->workers); - setproctitle ("main process"); /* Init statfile pool */ @@ -699,7 +718,7 @@ main (int argc, char **argv, char **env) sigsuspend (&signals.sa_mask); if (do_terminate) { msg_debug ("main: catch termination signal, waiting for childs"); - pass_signal_worker (&rspamd->workers, SIGTERM); + pass_signal_worker (active_workers, SIGTERM); break; } if (child_dead) { @@ -707,37 +726,38 @@ main (int argc, char **argv, char **env) msg_debug ("main: catch SIGCHLD signal, finding terminated worker"); /* Remove dead child form childs list */ wrk = waitpid (0, &res, 0); - TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { + l = g_list_first (active_workers); + while (l) { + cur = l->data; if (wrk == cur->pid) { /* Catch situations if active worker is abnormally terminated */ if (cur == active_worker) { active_worker = NULL; } - TAILQ_REMOVE(&rspamd->workers, cur, next); + active_workers = g_list_remove_link (active_workers, l); + if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { /* Normal worker termination, do not fork one more */ msg_info ("main: %s process %d terminated normally", - (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid); - /* But respawn controller */ - if (cur->type == TYPE_CONTROLLER) { - fork_worker (rspamd, cur->cf); - } + get_process_type (cur->type), cur->pid); } else { if (WIFSIGNALED (res)) { msg_warn ("main: %s process %d terminated abnormally by signal: %d", - (cur->type == TYPE_CONTROLLER) ? "controller" : "worker", + get_process_type (cur->type), cur->pid, WTERMSIG(res)); } else { msg_warn ("main: %s process %d terminated abnormally", - (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid); + get_process_type (cur->type), cur->pid); } /* Fork another worker in replace of dead one */ delay_fork (cur->cf); } + g_list_free_1 (l); g_free (cur); } + l = g_list_next (l); } } if (do_restart) { @@ -745,37 +765,36 @@ main (int argc, char **argv, char **env) do_reopen_log = 1; msg_info ("main: rspamd " RVERSION " is restarting"); - if (active_worker == NULL) { - /* reread_config (rspamd); */ - TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { - if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP || cur->type == TYPE_FUZZY) { - /* Start new workers that would reread configuration */ - cur->pending = FALSE; - active_worker = fork_worker (rspamd, cur->cf); - active_worker->pending = TRUE; - } - /* Immideately send termination request to conroller and wait for SIGCHLD */ - if (cur->type == TYPE_CONTROLLER) { - kill (cur->pid, SIGUSR2); - cur->is_dying = 1; - } - } - } - /* Do not start new workers until active worker is not ready for accept */ + l = g_list_first (active_workers); + while (l) { + cur = l->data; + /* Start new workers that would reread configuration */ + cur->pending = FALSE; + active_worker = fork_worker (rspamd, cur->cf); + active_worker->pending = TRUE; + l = g_list_next (l); + } } if (child_ready) { child_ready = 0; if (active_worker != NULL) { - msg_info ("main: worker process %d has been successfully started", active_worker->pid); - TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { - if (!cur->pending && !cur->is_dying && cur->type != TYPE_CONTROLLER) { + l = g_list_first (active_workers); + while (l) { + cur = l->data; + if (!cur->pending && !cur->is_dying) { /* Send to old workers SIGUSR2 */ kill (cur->pid, SIGUSR2); cur->is_dying = 1; } + else { + msg_info ("main: %s process %d has been successfully started", + get_process_type (cur->type), cur->pid); + } + l = g_list_next (l); } } + active_worker = NULL; } if (got_alarm) { got_alarm = 0; @@ -784,13 +803,16 @@ main (int argc, char **argv, char **env) } /* Wait for workers termination */ - while (!TAILQ_EMPTY(&rspamd->workers)) { - cur = TAILQ_FIRST(&rspamd->workers); + l = g_list_first (active_workers); + while (l) { + cur = l->data; waitpid (cur->pid, &res, 0); - msg_debug ("main(cleaning): worker process %d terminated", cur->pid); - TAILQ_REMOVE(&rspamd->workers, cur, next); - g_free(cur); + msg_debug ("main(cleaning): %s process %d terminated", get_process_type (cur->type), cur->pid); + g_free (cur); + l = g_list_next (l); } + + g_list_free (active_workers); msg_info ("main: terminating..."); diff --git a/src/main.h b/src/main.h index 0590c8e5c..d31f09943 100644 --- a/src/main.h +++ b/src/main.h @@ -64,7 +64,6 @@ struct rspamd_worker { struct event sig_ev; /**< signals event */ struct event bind_ev; /**< socket events */ struct worker_conf *cf; /**< worker config data */ - TAILQ_ENTRY (rspamd_worker) next; /**< chain link to next worker */ }; struct pidfh; @@ -101,8 +100,6 @@ struct rspamd_main { memory_pool_t *server_pool; /**< server's memory pool */ statfile_pool_t *statfile_pool; /**< shared statfiles pool */ - - TAILQ_HEAD (workq, rspamd_worker) workers; /**< linked list of workers */ }; struct counter_data { diff --git a/src/util.c b/src/util.c index 7d074a174..ba06c570e 100644 --- a/src/util.c +++ b/src/util.c @@ -288,11 +288,16 @@ init_signals (struct sigaction *signals, sig_t sig_handler) } void -pass_signal_worker (struct workq *workers, int signo) +pass_signal_worker (GList *workers, int signo) { struct rspamd_worker *cur; - TAILQ_FOREACH (cur, workers, next) { + GList *l; + + l = workers; + while (l) { + cur = l->data; kill (cur->pid, signo); + l = g_list_next (l); } } diff --git a/src/util.h b/src/util.h index d871288ca..7f30b5721 100644 --- a/src/util.h +++ b/src/util.h @@ -24,7 +24,7 @@ int event_make_socket_nonblocking(int); /* Init signals */ void init_signals (struct sigaction *, sig_t); /* Send specified signal to each worker */ -void pass_signal_worker (struct workq *, int ); +void pass_signal_worker (GList *, int ); /* Convert string to lowercase */ void convert_to_lowercase (char *str, unsigned int size); -- 2.39.5