summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-09-21 15:18:54 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-09-21 15:18:54 +0400
commit16b6241644a83aeef2711c2d7286b9f878b1c6f0 (patch)
treef3391413c630546c07fc65ab6d9883f23ec445c8
parent427315a198e3542ce3fabc343b8f9f7a396cb895 (diff)
downloadrspamd-16b6241644a83aeef2711c2d7286b9f878b1c6f0.tar.gz
rspamd-16b6241644a83aeef2711c2d7286b9f878b1c6f0.zip
* Cleanify logic of processes dispatcher
-rw-r--r--src/main.c100
-rw-r--r--src/main.h3
-rw-r--r--src/util.c9
-rw-r--r--src/util.h2
4 files changed, 69 insertions, 45 deletions
diff --git a/src/main.c b/src/main.c
index 3cb10e221..1c89efd5a 100644
--- a/src/main.c
+++ b/src/main.c
@@ -71,6 +71,8 @@ extern void xs_init(pTHX);
extern PerlInterpreter *perl_interpreter;
#endif
+/* Active workers */
+static GList *active_workers = NULL;
/* List of workers that are pending to start */
static GList *workers_pending = NULL;
@@ -284,7 +286,7 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
if (cur) {
bzero (cur, sizeof (struct rspamd_worker));
- TAILQ_INSERT_HEAD (&rspamd->workers, cur, next);
+ active_workers = g_list_prepend (active_workers, cur);
cur->srv = rspamd;
cur->type = cf->type;
cur->pid = fork();
@@ -444,6 +446,25 @@ spawn_workers (struct rspamd_main *rspamd)
}
}
+static const char *
+get_process_type (enum process_type type)
+{
+ switch (type) {
+ case TYPE_MAIN:
+ return "main";
+ case TYPE_WORKER:
+ return "worker";
+ case TYPE_FUZZY:
+ return "fuzzy";
+ case TYPE_CONTROLLER:
+ return "controller";
+ case TYPE_LMTP:
+ return "lmtp";
+ }
+
+ return NULL;
+}
+
int
main (int argc, char **argv, char **env)
{
@@ -451,7 +472,7 @@ main (int argc, char **argv, char **env)
struct module_ctx *cur_module = NULL;
int res = 0, i;
struct sigaction signals;
- struct rspamd_worker *cur, *cur_tmp, *active_worker;
+ struct rspamd_worker *cur, *active_worker;
struct rlimit rlim;
struct metric *metric;
struct cache_item *item;
@@ -656,8 +677,6 @@ main (int argc, char **argv, char **env)
/* Block signals to use sigsuspend in future */
sigprocmask(SIG_BLOCK, &signals.sa_mask, NULL);
- TAILQ_INIT (&rspamd->workers);
-
setproctitle ("main process");
/* Init statfile pool */
@@ -699,7 +718,7 @@ main (int argc, char **argv, char **env)
sigsuspend (&signals.sa_mask);
if (do_terminate) {
msg_debug ("main: catch termination signal, waiting for childs");
- pass_signal_worker (&rspamd->workers, SIGTERM);
+ pass_signal_worker (active_workers, SIGTERM);
break;
}
if (child_dead) {
@@ -707,37 +726,38 @@ main (int argc, char **argv, char **env)
msg_debug ("main: catch SIGCHLD signal, finding terminated worker");
/* Remove dead child form childs list */
wrk = waitpid (0, &res, 0);
- TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
+ l = g_list_first (active_workers);
+ while (l) {
+ cur = l->data;
if (wrk == cur->pid) {
/* Catch situations if active worker is abnormally terminated */
if (cur == active_worker) {
active_worker = NULL;
}
- TAILQ_REMOVE(&rspamd->workers, cur, next);
+ active_workers = g_list_remove_link (active_workers, l);
+
if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
/* Normal worker termination, do not fork one more */
msg_info ("main: %s process %d terminated normally",
- (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
- /* But respawn controller */
- if (cur->type == TYPE_CONTROLLER) {
- fork_worker (rspamd, cur->cf);
- }
+ get_process_type (cur->type), cur->pid);
}
else {
if (WIFSIGNALED (res)) {
msg_warn ("main: %s process %d terminated abnormally by signal: %d",
- (cur->type == TYPE_CONTROLLER) ? "controller" : "worker",
+ get_process_type (cur->type),
cur->pid, WTERMSIG(res));
}
else {
msg_warn ("main: %s process %d terminated abnormally",
- (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
+ get_process_type (cur->type), cur->pid);
}
/* Fork another worker in replace of dead one */
delay_fork (cur->cf);
}
+ g_list_free_1 (l);
g_free (cur);
}
+ l = g_list_next (l);
}
}
if (do_restart) {
@@ -745,37 +765,36 @@ main (int argc, char **argv, char **env)
do_reopen_log = 1;
msg_info ("main: rspamd " RVERSION " is restarting");
- if (active_worker == NULL) {
- /* reread_config (rspamd); */
- TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
- if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP || cur->type == TYPE_FUZZY) {
- /* Start new workers that would reread configuration */
- cur->pending = FALSE;
- active_worker = fork_worker (rspamd, cur->cf);
- active_worker->pending = TRUE;
- }
- /* Immideately send termination request to conroller and wait for SIGCHLD */
- if (cur->type == TYPE_CONTROLLER) {
- kill (cur->pid, SIGUSR2);
- cur->is_dying = 1;
- }
- }
- }
- /* Do not start new workers until active worker is not ready for accept */
+ l = g_list_first (active_workers);
+ while (l) {
+ cur = l->data;
+ /* Start new workers that would reread configuration */
+ cur->pending = FALSE;
+ active_worker = fork_worker (rspamd, cur->cf);
+ active_worker->pending = TRUE;
+ l = g_list_next (l);
+ }
}
if (child_ready) {
child_ready = 0;
if (active_worker != NULL) {
- msg_info ("main: worker process %d has been successfully started", active_worker->pid);
- TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
- if (!cur->pending && !cur->is_dying && cur->type != TYPE_CONTROLLER) {
+ l = g_list_first (active_workers);
+ while (l) {
+ cur = l->data;
+ if (!cur->pending && !cur->is_dying) {
/* Send to old workers SIGUSR2 */
kill (cur->pid, SIGUSR2);
cur->is_dying = 1;
}
+ else {
+ msg_info ("main: %s process %d has been successfully started",
+ get_process_type (cur->type), cur->pid);
+ }
+ l = g_list_next (l);
}
}
+ active_worker = NULL;
}
if (got_alarm) {
got_alarm = 0;
@@ -784,13 +803,16 @@ main (int argc, char **argv, char **env)
}
/* Wait for workers termination */
- while (!TAILQ_EMPTY(&rspamd->workers)) {
- cur = TAILQ_FIRST(&rspamd->workers);
+ l = g_list_first (active_workers);
+ while (l) {
+ cur = l->data;
waitpid (cur->pid, &res, 0);
- msg_debug ("main(cleaning): worker process %d terminated", cur->pid);
- TAILQ_REMOVE(&rspamd->workers, cur, next);
- g_free(cur);
+ msg_debug ("main(cleaning): %s process %d terminated", get_process_type (cur->type), cur->pid);
+ g_free (cur);
+ l = g_list_next (l);
}
+
+ g_list_free (active_workers);
msg_info ("main: terminating...");
diff --git a/src/main.h b/src/main.h
index 0590c8e5c..d31f09943 100644
--- a/src/main.h
+++ b/src/main.h
@@ -64,7 +64,6 @@ struct rspamd_worker {
struct event sig_ev; /**< signals event */
struct event bind_ev; /**< socket events */
struct worker_conf *cf; /**< worker config data */
- TAILQ_ENTRY (rspamd_worker) next; /**< chain link to next worker */
};
struct pidfh;
@@ -101,8 +100,6 @@ struct rspamd_main {
memory_pool_t *server_pool; /**< server's memory pool */
statfile_pool_t *statfile_pool; /**< shared statfiles pool */
-
- TAILQ_HEAD (workq, rspamd_worker) workers; /**< linked list of workers */
};
struct counter_data {
diff --git a/src/util.c b/src/util.c
index 7d074a174..ba06c570e 100644
--- a/src/util.c
+++ b/src/util.c
@@ -288,11 +288,16 @@ init_signals (struct sigaction *signals, sig_t sig_handler)
}
void
-pass_signal_worker (struct workq *workers, int signo)
+pass_signal_worker (GList *workers, int signo)
{
struct rspamd_worker *cur;
- TAILQ_FOREACH (cur, workers, next) {
+ GList *l;
+
+ l = workers;
+ while (l) {
+ cur = l->data;
kill (cur->pid, signo);
+ l = g_list_next (l);
}
}
diff --git a/src/util.h b/src/util.h
index d871288ca..7f30b5721 100644
--- a/src/util.h
+++ b/src/util.h
@@ -24,7 +24,7 @@ int event_make_socket_nonblocking(int);
/* Init signals */
void init_signals (struct sigaction *, sig_t);
/* Send specified signal to each worker */
-void pass_signal_worker (struct workq *, int );
+void pass_signal_worker (GList *, int );
/* Convert string to lowercase */
void convert_to_lowercase (char *str, unsigned int size);