diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-06-17 19:31:48 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2009-06-17 19:31:48 +0400 |
commit | bca226772e9747a4587866a50122d4a8f7973b26 (patch) | |
tree | aae459617c9b3a7a82dd0b9e2a8b03be11e3ff52 /src/main.c | |
parent | 453ecf68e3b51941944dbc3b1dece11342be3810 (diff) | |
download | rspamd-bca226772e9747a4587866a50122d4a8f7973b26.tar.gz rspamd-bca226772e9747a4587866a50122d4a8f7973b26.zip |
* Introduce new system of workers spawning and configuring, now rspamd can be easily extended by new types of wrokers
* Rework config system and avoid from using queue (3) lists
* Upgrade version to 0.2.0 as config format is now incompatible with older one
Diffstat (limited to 'src/main.c')
-rw-r--r-- | src/main.c | 134 |
1 files changed, 69 insertions, 65 deletions
diff --git a/src/main.c b/src/main.c index 9b8992d8b..dc0226680 100644 --- a/src/main.c +++ b/src/main.c @@ -51,7 +51,7 @@ struct config_file *cfg; rspamd_hash_t *counters; static void sig_handler (int ); -static struct rspamd_worker * fork_worker (struct rspamd_main *, int, enum process_type); +static struct rspamd_worker * fork_worker (struct rspamd_main *, struct worker_conf *); sig_atomic_t do_restart; sig_atomic_t do_terminate; @@ -269,7 +269,7 @@ reread_config (struct rspamd_main *rspamd) } static struct rspamd_worker * -fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type) +fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) { struct rspamd_worker *cur; /* Starting worker process */ @@ -278,12 +278,14 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type bzero (cur, sizeof (struct rspamd_worker)); TAILQ_INSERT_HEAD (&rspamd->workers, cur, next); cur->srv = rspamd; - cur->type = type; + cur->type = cf->type; cur->pid = fork(); + cur->cf = cf; switch (cur->pid) { case 0: - /* TODO: add worker code */ - switch (type) { + /* Drop privilleges */ + drop_priv (cfg); + switch (cf->type) { case TYPE_CONTROLLER: setproctitle ("controller process"); pidfile_close (rspamd->pfh); @@ -294,13 +296,13 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type setproctitle ("lmtp process"); pidfile_close (rspamd->pfh); msg_info ("fork_worker: starting lmtp process %d", getpid ()); - start_lmtp_worker (cur, listen_sock); + start_lmtp_worker (cur); case TYPE_WORKER: default: setproctitle ("worker process"); pidfile_close (rspamd->pfh); msg_info ("fork_worker: starting worker process %d", getpid ()); - start_worker (cur, listen_sock); + start_worker (cur); break; } break; @@ -316,35 +318,27 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type } static void -delay_fork (enum process_type type) +delay_fork (struct worker_conf *cf) { - workers_pending = g_list_prepend (workers_pending, GINT_TO_POINTER (type)); + workers_pending = g_list_prepend (workers_pending, cf); (void)alarm (SOFT_FORK_TIME); } -static void -fork_delayed (struct rspamd_main *rspamd, int listen_sock) -{ - GList *cur; - - while (workers_pending != NULL) { - cur = workers_pending; - workers_pending = g_list_remove_link (workers_pending, cur); - fork_worker (rspamd, listen_sock, GPOINTER_TO_INT (cur->data)); - g_list_free_1 (cur); - } -} static void dump_module_variables (gpointer key, gpointer value, gpointer data) -{ - LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = (struct moduleoptq *)value; - struct module_opt *cur, *tmp; +{ + GList *cur_opt; + struct module_opt *cur; + + cur_opt = (GList *)value; - LIST_FOREACH_SAFE (cur, cur_module_opt, next, tmp) { + while (cur_opt) { + cur = cur_opt->data; if (cur->value) { printf ("$%s = \"%s\"\n", cur->param, cur->value); } + cur_opt = g_list_next (cur_opt); } } @@ -388,12 +382,56 @@ create_listen_socket (struct in_addr *addr, int port, int family, char *path) return listen_sock; } +static void +fork_delayed (struct rspamd_main *rspamd) +{ + GList *cur; + struct worker_conf *cf; + + while (workers_pending != NULL) { + cur = workers_pending; + cf = cur->data; + + workers_pending = g_list_remove_link (workers_pending, cur); + fork_worker (rspamd, cf); + g_list_free_1 (cur); + } +} + +static void +spawn_workers (struct rspamd_main *rspamd) +{ + GList *cur; + struct worker_conf *cf; + int i, listen_sock; + + cur = cfg->workers; + + while (cur) { + cf = cur->data; + + /* Create listen socket */ + listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port, + cf->bind_family, cf->bind_host); + if (listen_sock == -1) { + exit(-errno); + } + cf->listen_sock = listen_sock; + + for (i = 0; i < cf->count; i++) { + fork_worker (rspamd, cf); + } + + cur = g_list_next (cur); + } +} + int main (int argc, char **argv, char **env) { struct rspamd_main *rspamd; struct module_ctx *cur_module = NULL; - int res = 0, i, listen_sock, lmtp_listen_sock; + int res = 0, i; struct sigaction signals; struct rspamd_worker *cur, *cur_tmp, *active_worker; struct rlimit rlim; @@ -498,23 +536,6 @@ main (int argc, char **argv, char **env) } - /* Create listen socket */ - listen_sock = create_listen_socket (&rspamd->cfg->bind_addr, rspamd->cfg->bind_port, - rspamd->cfg->bind_family, rspamd->cfg->bind_host); - if (listen_sock == -1) { - exit(-errno); - } - - if (cfg->lmtp_enable) { - lmtp_listen_sock = create_listen_socket (&rspamd->cfg->lmtp_addr, rspamd->cfg->lmtp_port, - rspamd->cfg->lmtp_family, rspamd->cfg->lmtp_host); - if (listen_sock == -1) { - exit(-errno); - } - } - - /* Drop privilleges */ - drop_priv (cfg); /* Set stack size for pcre */ getrlimit(RLIMIT_STACK, &rlim); @@ -589,20 +610,7 @@ main (int argc, char **argv, char **env) modules[i].module_config_func (cfg); } - for (i = 0; i < cfg->workers_number; i++) { - fork_worker (rspamd, listen_sock, TYPE_WORKER); - } - /* Start controller if enabled */ - if (cfg->controller_enabled) { - fork_worker (rspamd, listen_sock, TYPE_CONTROLLER); - } - - /* Start lmtp if enabled */ - if (cfg->lmtp_enable) { - for (i = 0; i < cfg->lmtp_workers_number; i++) { - fork_worker (rspamd, lmtp_listen_sock, TYPE_LMTP); - } - } + spawn_workers (rspamd); /* Signal processing cycle */ for (;;) { @@ -632,7 +640,7 @@ main (int argc, char **argv, char **env) (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid); /* But respawn controller */ if (cur->type == TYPE_CONTROLLER) { - fork_worker (rspamd, listen_sock, TYPE_CONTROLLER); + fork_worker (rspamd, cur->cf); } } else { @@ -646,7 +654,7 @@ main (int argc, char **argv, char **env) (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid); } /* Fork another worker in replace of dead one */ - delay_fork (cur->type); + delay_fork (cur->cf); } g_free (cur); } @@ -662,7 +670,7 @@ main (int argc, char **argv, char **env) TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) { if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP) { /* Start new workers that would reread configuration */ - active_worker = fork_worker (rspamd, listen_sock, cur->type); + active_worker = fork_worker (rspamd, cur->cf); } /* Immideately send termination request to conroller and wait for SIGCHLD */ if (cur->type == TYPE_CONTROLLER) { @@ -690,7 +698,7 @@ main (int argc, char **argv, char **env) } if (got_alarm) { got_alarm = 0; - fork_delayed (rspamd, listen_sock); + fork_delayed (rspamd); } } @@ -705,10 +713,6 @@ main (int argc, char **argv, char **env) msg_info ("main: terminating..."); - if (rspamd->cfg->bind_family == AF_UNIX) { - unlink (rspamd->cfg->bind_host); - } - free_config (rspamd->cfg); g_free (rspamd->cfg); g_free (rspamd); |