]> source.dussan.org Git - rspamd.git/commitdiff
* Cleanify logic of processes dispatcher
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 21 Sep 2009 11:18:54 +0000 (15:18 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 21 Sep 2009 11:18:54 +0000 (15:18 +0400)
src/main.c
src/main.h
src/util.c
src/util.h

index 3cb10e22196bb625f22fc02ca36417d1fdfb1aa5..1c89efd5a6418d889274090690d98458722891a8 100644 (file)
@@ -71,6 +71,8 @@ extern void xs_init(pTHX);
 extern PerlInterpreter *perl_interpreter;
 #endif
 
+/* Active workers */
+static GList *active_workers = NULL;
 /* List of workers that are pending to start */
 static GList *workers_pending = NULL;
 
@@ -284,7 +286,7 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
        cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
        if (cur) {
                bzero (cur, sizeof (struct rspamd_worker));
-               TAILQ_INSERT_HEAD (&rspamd->workers, cur, next);
+        active_workers = g_list_prepend (active_workers, cur);
                cur->srv = rspamd;
                cur->type = cf->type;
                cur->pid = fork();
@@ -444,6 +446,25 @@ spawn_workers (struct rspamd_main *rspamd)
        }
 }
 
+static const char *
+get_process_type (enum process_type type)
+{
+       switch (type) {
+               case TYPE_MAIN:
+                       return "main";
+               case TYPE_WORKER:
+                       return "worker";
+               case TYPE_FUZZY:
+                       return "fuzzy";
+               case TYPE_CONTROLLER:
+                       return "controller";
+               case TYPE_LMTP:
+                       return "lmtp";
+       }
+
+       return NULL;
+}
+
 int 
 main (int argc, char **argv, char **env)
 {
@@ -451,7 +472,7 @@ main (int argc, char **argv, char **env)
        struct module_ctx *cur_module = NULL;
        int res = 0, i;
        struct sigaction signals;
-       struct rspamd_worker *cur, *cur_tmp, *active_worker;
+       struct rspamd_worker *cur, *active_worker;
        struct rlimit rlim;
        struct metric *metric;
        struct cache_item *item;
@@ -656,8 +677,6 @@ main (int argc, char **argv, char **env)
        /* Block signals to use sigsuspend in future */
        sigprocmask(SIG_BLOCK, &signals.sa_mask, NULL);
 
-       TAILQ_INIT (&rspamd->workers);
-
        setproctitle ("main process");
 
        /* Init statfile pool */
@@ -699,7 +718,7 @@ main (int argc, char **argv, char **env)
                sigsuspend (&signals.sa_mask);
                if (do_terminate) {
                        msg_debug ("main: catch termination signal, waiting for childs");
-                       pass_signal_worker (&rspamd->workers, SIGTERM);
+                       pass_signal_worker (active_workers, SIGTERM);
                        break;
                }
                if (child_dead) {
@@ -707,37 +726,38 @@ main (int argc, char **argv, char **env)
                        msg_debug ("main: catch SIGCHLD signal, finding terminated worker");
                        /* Remove dead child form childs list */
                        wrk = waitpid (0, &res, 0);
-                       TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
+            l = g_list_first (active_workers);
+            while (l) {
+                cur = l->data;
                                if (wrk == cur->pid) {
                                        /* Catch situations if active worker is abnormally terminated */
                                        if (cur == active_worker) {
                                                active_worker = NULL;
                                        }
-                                       TAILQ_REMOVE(&rspamd->workers, cur, next);
+                    active_workers = g_list_remove_link (active_workers, l);
+
                                        if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
                                                /* Normal worker termination, do not fork one more */
                                                msg_info ("main: %s process %d terminated normally", 
-                                                                       (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
-                                               /* But respawn controller */
-                                               if (cur->type == TYPE_CONTROLLER) {
-                                                       fork_worker (rspamd, cur->cf);
-                                               }
+                                                                       get_process_type (cur->type), cur->pid);
                                        }
                                        else {
                                                if (WIFSIGNALED (res)) {
                                                        msg_warn ("main: %s process %d terminated abnormally by signal: %d", 
-                                                                               (cur->type == TYPE_CONTROLLER) ? "controller" : "worker",
+                                                                               get_process_type (cur->type),
                                                                                cur->pid, WTERMSIG(res));
                                                }
                                                else {
                                                        msg_warn ("main: %s process %d terminated abnormally", 
-                                                                               (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
+                                                                               get_process_type (cur->type), cur->pid);
                                                }
                                                /* Fork another worker in replace of dead one */
                                                delay_fork (cur->cf);
                                        }
+                           g_list_free_1 (l);
                                        g_free (cur);
                                }
+                               l = g_list_next (l);
                        }
                }
                if (do_restart) {       
@@ -745,37 +765,36 @@ main (int argc, char **argv, char **env)
                        do_reopen_log = 1;
 
                        msg_info ("main: rspamd " RVERSION " is restarting");
-                       if (active_worker == NULL) {
-                               /* reread_config (rspamd); */
-                               TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
-                                       if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP || cur->type == TYPE_FUZZY) {
-                                               /* Start new workers that would reread configuration */
-                                               cur->pending = FALSE;
-                                               active_worker = fork_worker (rspamd, cur->cf);
-                                               active_worker->pending = TRUE;
-                                       }
-                                       /* Immideately send termination request to conroller and wait for SIGCHLD */
-                                       if (cur->type == TYPE_CONTROLLER) {
-                                               kill (cur->pid, SIGUSR2);
-                                               cur->is_dying = 1;
-                                       }
-                               }
-                       }
-                       /* Do not start new workers until active worker is not ready for accept */
+            l = g_list_first (active_workers);
+            while (l) {
+                cur = l->data;
+                /* Start new workers that would reread configuration */
+                               cur->pending = FALSE;
+                               active_worker = fork_worker (rspamd, cur->cf);
+                active_worker->pending = TRUE;
+                               l = g_list_next (l);
+            }
                }
                if (child_ready) {
                        child_ready = 0;
 
                        if (active_worker != NULL) {
-                               msg_info ("main: worker process %d has been successfully started", active_worker->pid);
-                               TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
-                                       if (!cur->pending && !cur->is_dying && cur->type != TYPE_CONTROLLER) {
+                               l = g_list_first (active_workers);
+                               while (l) {
+                                       cur = l->data;
+                                       if (!cur->pending && !cur->is_dying) {
                                                /* Send to old workers SIGUSR2 */
                                                kill (cur->pid, SIGUSR2);
                                                cur->is_dying = 1;
                                        }
+                                       else {
+                                               msg_info ("main: %s process %d has been successfully started", 
+                                                               get_process_type (cur->type), cur->pid);
+                                       }
+                                       l = g_list_next (l);
                                }
                        }
+                       active_worker = NULL;
                }
                if (got_alarm) {
                        got_alarm = 0;
@@ -784,13 +803,16 @@ main (int argc, char **argv, char **env)
        }
 
        /* Wait for workers termination */
-       while (!TAILQ_EMPTY(&rspamd->workers)) {
-               cur = TAILQ_FIRST(&rspamd->workers);
+       l = g_list_first (active_workers);
+       while (l) {
+               cur = l->data;
                waitpid (cur->pid, &res, 0);
-               msg_debug ("main(cleaning): worker process %d terminated", cur->pid);
-               TAILQ_REMOVE(&rspamd->workers, cur, next);
-               g_free(cur);
+               msg_debug ("main(cleaning): %s process %d terminated", get_process_type (cur->type), cur->pid);
+               g_free (cur);
+               l = g_list_next (l);
        }
+
+       g_list_free (active_workers);
        
        msg_info ("main: terminating...");
        
index 0590c8e5ca6986fa1be80a208e3b38ec31e5d7b2..d31f09943b365d6bfa159f1173ab4e61337d58de 100644 (file)
@@ -64,7 +64,6 @@ struct rspamd_worker {
        struct event sig_ev;                                                                            /**< signals event                                                                      */
        struct event bind_ev;                                                                           /**< socket events                                                                      */
        struct worker_conf *cf;                                                                         /**< worker config data                                                         */
-       TAILQ_ENTRY (rspamd_worker) next;                                                       /**< chain link to next worker                                          */
 };
 
 struct pidfh;
@@ -101,8 +100,6 @@ struct rspamd_main {
 
        memory_pool_t *server_pool;                                                                     /**< server's memory pool                                                       */
        statfile_pool_t *statfile_pool;                                                         /**< shared statfiles pool                                                      */
-
-       TAILQ_HEAD (workq, rspamd_worker) workers;                                      /**< linked list of workers                                                     */
 };
 
 struct counter_data {
index 7d074a1744beacbfa835a0a43e40c00d0b7d58cd..ba06c570ea61cdba53244f9031b235214a8e9324 100644 (file)
@@ -288,11 +288,16 @@ init_signals (struct sigaction *signals, sig_t sig_handler)
 }
 
 void
-pass_signal_worker (struct workq *workers, int signo)
+pass_signal_worker (GList *workers, int signo)
 {
        struct rspamd_worker *cur;
-       TAILQ_FOREACH (cur, workers, next) {
+       GList *l;
+
+       l = workers;
+       while (l) {
+               cur = l->data;
                kill (cur->pid, signo);
+               l = g_list_next (l);
        }
 }
 
index d871288cab712cd32ae056776dcc941f30b653b7..7f30b5721bf3a912400be12d6a615617ff68dfa0 100644 (file)
@@ -24,7 +24,7 @@ int event_make_socket_nonblocking(int);
 /* Init signals */
 void init_signals (struct sigaction *, sig_t);
 /* Send specified signal to each worker */
-void pass_signal_worker (struct workq *, int );
+void pass_signal_worker (GList *, int );
 /* Convert string to lowercase */
 void convert_to_lowercase (char *str, unsigned int size);