diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-10-30 18:30:51 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-10-30 18:30:51 +0300 |
commit | bb1b1f7889abe5c98a7628ba82e07b70a2142658 (patch) | |
tree | ad51257652a78e63bc030d517ba6a55241a69e3f /src/main.c | |
parent | 1290cafef2180e5435cbbfd5a72527d0be2f4970 (diff) | |
download | rspamd-bb1b1f7889abe5c98a7628ba82e07b70a2142658.tar.gz rspamd-bb1b1f7889abe5c98a7628ba82e07b70a2142658.zip |
* Implement new system of managing rspamd processes
Diffstat (limited to 'src/main.c')
-rw-r--r-- | src/main.c | 137 |
1 files changed, 60 insertions, 77 deletions
diff --git a/src/main.c b/src/main.c index 702944467..4b811bb36 100644 --- a/src/main.c +++ b/src/main.c @@ -57,7 +57,6 @@ static struct rspamd_worker *fork_worker (struct rspamd_main *, struct worker sig_atomic_t do_restart; sig_atomic_t do_terminate; sig_atomic_t child_dead; -sig_atomic_t child_ready; sig_atomic_t got_alarm; extern int yynerrs; @@ -71,8 +70,6 @@ extern void xs_init (pTHX); extern PerlInterpreter *perl_interpreter; #endif -/* Active workers */ -static GList *active_workers = NULL; /* List of workers that are pending to start */ static GList *workers_pending = NULL; @@ -93,7 +90,7 @@ sig_handler (int signo) child_dead = 1; break; case SIGUSR2: - child_ready = 1; + /* Do nothing */ break; case SIGALRM: got_alarm = 1; @@ -285,7 +282,7 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker)); if (cur) { bzero (cur, sizeof (struct rspamd_worker)); - active_workers = g_list_prepend (active_workers, cur); + g_queue_push_head (cf->active_workers, cur); cur->srv = rspamd; cur->type = cf->type; cur->pid = fork (); @@ -328,6 +325,10 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) pidfile_remove (rspamd->pfh); exit (-errno); break; + default: + /* Insert worker into worker's table, pid is index */ + g_hash_table_insert (rspamd->workers, GSIZE_TO_POINTER (cur->pid), cur); + break; } } @@ -416,7 +417,7 @@ fork_delayed (struct rspamd_main *rspamd) } static void -spawn_workers (struct rspamd_main *rspamd) +spawn_workers (struct rspamd_main *rspamd, gboolean make_sockets) { GList *cur; struct worker_conf *cf; @@ -427,8 +428,8 @@ spawn_workers (struct rspamd_main *rspamd) while (cur) { cf = cur->data; - /* Create listen socket */ - if (cf->type != TYPE_FUZZY) { + if (make_sockets && cf->has_socket) { + /* Create listen socket */ listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port, cf->bind_family, cf->bind_host); if (listen_sock == -1) { exit (-errno); @@ -463,6 +464,29 @@ get_process_type (enum process_type type) return NULL; } +static void +kill_old_workers (gpointer key, gpointer value, gpointer unused) +{ + struct rspamd_worker *w = value; + + kill (w->pid, SIGUSR2); + msg_info ("rspamd_restart: send signal to worker %ld", (long int)w->pid); +} + +static gboolean +wait_for_workers (gpointer key, gpointer value, gpointer unused) +{ + struct rspamd_worker *w = value; + int res = 0; + + waitpid (w->pid, &res, 0); + + msg_debug ("main(cleaning): %s process %d terminated", get_process_type (w->type), w->pid); + g_free (w); + + return TRUE; +} + int main (int argc, char **argv, char **env) { @@ -470,7 +494,7 @@ main (int argc, char **argv, char **env) struct module_ctx *cur_module = NULL; int res = 0, i; struct sigaction signals; - struct rspamd_worker *cur, *active_worker; + struct rspamd_worker *cur; struct rlimit rlim; struct metric *metric; struct cache_item *item; @@ -495,9 +519,7 @@ main (int argc, char **argv, char **env) do_terminate = 0; do_restart = 0; child_dead = 0; - child_ready = 0; do_reopen_log = 0; - active_worker = NULL; rspamd->stat = memory_pool_alloc_shared (rspamd->server_pool, sizeof (struct rspamd_stat)); bzero (rspamd->stat, sizeof (struct rspamd_stat)); @@ -704,8 +726,8 @@ main (int argc, char **argv, char **env) l = g_list_next (l); } - - spawn_workers (rspamd); + rspamd->workers = g_hash_table_new (g_direct_hash, g_direct_equal); + spawn_workers (rspamd, TRUE); /* Signal processing cycle */ for (;;) { @@ -714,7 +736,7 @@ main (int argc, char **argv, char **env) sigsuspend (&signals.sa_mask); if (do_terminate) { msg_debug ("main: catch termination signal, waiting for childs"); - pass_signal_worker (active_workers, SIGTERM); + pass_signal_worker (rspamd->workers, SIGTERM); break; } if (child_dead) { @@ -722,35 +744,31 @@ main (int argc, char **argv, char **env) msg_debug ("main: catch SIGCHLD signal, finding terminated worker"); /* Remove dead child form childs list */ wrk = waitpid (0, &res, 0); - l = g_list_first (active_workers); - while (l) { - cur = l->data; - if (wrk == cur->pid) { - /* Catch situations if active worker is abnormally terminated */ - if (cur == active_worker) { - active_worker = NULL; - } - active_workers = g_list_remove_link (active_workers, l); + if ((cur = g_hash_table_lookup (rspamd->workers, GSIZE_TO_POINTER (wrk))) != NULL) { + /* Unlink dead process from queue and hash table */ + + g_hash_table_remove (rspamd->workers, GSIZE_TO_POINTER (wrk)); + g_queue_remove (cur->cf->active_workers, cur); - if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { - /* Normal worker termination, do not fork one more */ - msg_info ("main: %s process %d terminated normally", get_process_type (cur->type), cur->pid); + if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { + /* Normal worker termination, do not fork one more */ + msg_info ("main: %s process %d terminated normally", get_process_type (cur->type), cur->pid); + } + else { + if (WIFSIGNALED (res)) { + msg_warn ("main: %s process %d terminated abnormally by signal: %d", get_process_type (cur->type), cur->pid, WTERMSIG (res)); } else { - if (WIFSIGNALED (res)) { - msg_warn ("main: %s process %d terminated abnormally by signal: %d", get_process_type (cur->type), cur->pid, WTERMSIG (res)); - } - else { - msg_warn ("main: %s process %d terminated abnormally", get_process_type (cur->type), cur->pid); - } - /* Fork another worker in replace of dead one */ - delay_fork (cur->cf); + msg_warn ("main: %s process %d terminated abnormally", get_process_type (cur->type), cur->pid); } - g_list_free_1 (l); - g_free (cur); - break; + /* Fork another worker in replace of dead one */ + delay_fork (cur->cf); } - l = g_list_next (l); + + g_free (cur); + } + else { + msg_err ("main: got SIGCHLD, but pid %ld is not found in workers hash table, something goes wrong", (long int)wrk); } } if (do_restart) { @@ -758,35 +776,9 @@ main (int argc, char **argv, char **env) do_reopen_log = 1; msg_info ("main: rspamd " RVERSION " is restarting"); - l = g_list_first (active_workers); - while (l) { - cur = l->data; - /* Start new workers that would reread configuration */ - cur->pending = FALSE; - active_worker = fork_worker (rspamd, cur->cf); - active_worker->pending = TRUE; - l = g_list_next (l); - } - } - if (child_ready) { - child_ready = 0; - - if (active_worker != NULL) { - l = g_list_first (active_workers); - while (l) { - cur = l->data; - if (!cur->pending && !cur->is_dying) { - /* Send to old workers SIGUSR2 */ - kill (cur->pid, SIGUSR2); - cur->is_dying = 1; - } - else if (!cur->is_dying) { - msg_info ("main: %s process %d has been successfully started", get_process_type (cur->type), cur->pid); - } - l = g_list_next (l); - } - } - active_worker = NULL; + g_hash_table_foreach (rspamd->workers, kill_old_workers, NULL); + spawn_workers (rspamd, FALSE); + } if (got_alarm) { got_alarm = 0; @@ -795,16 +787,7 @@ main (int argc, char **argv, char **env) } /* Wait for workers termination */ - l = g_list_first (active_workers); - while (l) { - cur = l->data; - waitpid (cur->pid, &res, 0); - msg_debug ("main(cleaning): %s process %d terminated", get_process_type (cur->type), cur->pid); - g_free (cur); - l = g_list_next (l); - } - - g_list_free (active_workers); + g_hash_table_foreach_remove (rspamd->workers, wait_for_workers, NULL); msg_info ("main: terminating..."); |