summaryrefslogtreecommitdiffstats
path: root/src/main.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-06-17 19:31:48 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-06-17 19:31:48 +0400
commitbca226772e9747a4587866a50122d4a8f7973b26 (patch)
treeaae459617c9b3a7a82dd0b9e2a8b03be11e3ff52 /src/main.c
parent453ecf68e3b51941944dbc3b1dece11342be3810 (diff)
downloadrspamd-bca226772e9747a4587866a50122d4a8f7973b26.tar.gz
rspamd-bca226772e9747a4587866a50122d4a8f7973b26.zip
* Introduce new system of workers spawning and configuring, now rspamd can be easily extended by new types of wrokers
* Rework config system and avoid from using queue (3) lists * Upgrade version to 0.2.0 as config format is now incompatible with older one
Diffstat (limited to 'src/main.c')
-rw-r--r--src/main.c134
1 files changed, 69 insertions, 65 deletions
diff --git a/src/main.c b/src/main.c
index 9b8992d8b..dc0226680 100644
--- a/src/main.c
+++ b/src/main.c
@@ -51,7 +51,7 @@ struct config_file *cfg;
rspamd_hash_t *counters;
static void sig_handler (int );
-static struct rspamd_worker * fork_worker (struct rspamd_main *, int, enum process_type);
+static struct rspamd_worker * fork_worker (struct rspamd_main *, struct worker_conf *);
sig_atomic_t do_restart;
sig_atomic_t do_terminate;
@@ -269,7 +269,7 @@ reread_config (struct rspamd_main *rspamd)
}
static struct rspamd_worker *
-fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type)
+fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
{
struct rspamd_worker *cur;
/* Starting worker process */
@@ -278,12 +278,14 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type
bzero (cur, sizeof (struct rspamd_worker));
TAILQ_INSERT_HEAD (&rspamd->workers, cur, next);
cur->srv = rspamd;
- cur->type = type;
+ cur->type = cf->type;
cur->pid = fork();
+ cur->cf = cf;
switch (cur->pid) {
case 0:
- /* TODO: add worker code */
- switch (type) {
+ /* Drop privilleges */
+ drop_priv (cfg);
+ switch (cf->type) {
case TYPE_CONTROLLER:
setproctitle ("controller process");
pidfile_close (rspamd->pfh);
@@ -294,13 +296,13 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type
setproctitle ("lmtp process");
pidfile_close (rspamd->pfh);
msg_info ("fork_worker: starting lmtp process %d", getpid ());
- start_lmtp_worker (cur, listen_sock);
+ start_lmtp_worker (cur);
case TYPE_WORKER:
default:
setproctitle ("worker process");
pidfile_close (rspamd->pfh);
msg_info ("fork_worker: starting worker process %d", getpid ());
- start_worker (cur, listen_sock);
+ start_worker (cur);
break;
}
break;
@@ -316,35 +318,27 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type
}
static void
-delay_fork (enum process_type type)
+delay_fork (struct worker_conf *cf)
{
- workers_pending = g_list_prepend (workers_pending, GINT_TO_POINTER (type));
+ workers_pending = g_list_prepend (workers_pending, cf);
(void)alarm (SOFT_FORK_TIME);
}
-static void
-fork_delayed (struct rspamd_main *rspamd, int listen_sock)
-{
- GList *cur;
-
- while (workers_pending != NULL) {
- cur = workers_pending;
- workers_pending = g_list_remove_link (workers_pending, cur);
- fork_worker (rspamd, listen_sock, GPOINTER_TO_INT (cur->data));
- g_list_free_1 (cur);
- }
-}
static void
dump_module_variables (gpointer key, gpointer value, gpointer data)
-{
- LIST_HEAD (moduleoptq, module_opt) *cur_module_opt = (struct moduleoptq *)value;
- struct module_opt *cur, *tmp;
+{
+ GList *cur_opt;
+ struct module_opt *cur;
+
+ cur_opt = (GList *)value;
- LIST_FOREACH_SAFE (cur, cur_module_opt, next, tmp) {
+ while (cur_opt) {
+ cur = cur_opt->data;
if (cur->value) {
printf ("$%s = \"%s\"\n", cur->param, cur->value);
}
+ cur_opt = g_list_next (cur_opt);
}
}
@@ -388,12 +382,56 @@ create_listen_socket (struct in_addr *addr, int port, int family, char *path)
return listen_sock;
}
+static void
+fork_delayed (struct rspamd_main *rspamd)
+{
+ GList *cur;
+ struct worker_conf *cf;
+
+ while (workers_pending != NULL) {
+ cur = workers_pending;
+ cf = cur->data;
+
+ workers_pending = g_list_remove_link (workers_pending, cur);
+ fork_worker (rspamd, cf);
+ g_list_free_1 (cur);
+ }
+}
+
+static void
+spawn_workers (struct rspamd_main *rspamd)
+{
+ GList *cur;
+ struct worker_conf *cf;
+ int i, listen_sock;
+
+ cur = cfg->workers;
+
+ while (cur) {
+ cf = cur->data;
+
+ /* Create listen socket */
+ listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port,
+ cf->bind_family, cf->bind_host);
+ if (listen_sock == -1) {
+ exit(-errno);
+ }
+ cf->listen_sock = listen_sock;
+
+ for (i = 0; i < cf->count; i++) {
+ fork_worker (rspamd, cf);
+ }
+
+ cur = g_list_next (cur);
+ }
+}
+
int
main (int argc, char **argv, char **env)
{
struct rspamd_main *rspamd;
struct module_ctx *cur_module = NULL;
- int res = 0, i, listen_sock, lmtp_listen_sock;
+ int res = 0, i;
struct sigaction signals;
struct rspamd_worker *cur, *cur_tmp, *active_worker;
struct rlimit rlim;
@@ -498,23 +536,6 @@ main (int argc, char **argv, char **env)
}
- /* Create listen socket */
- listen_sock = create_listen_socket (&rspamd->cfg->bind_addr, rspamd->cfg->bind_port,
- rspamd->cfg->bind_family, rspamd->cfg->bind_host);
- if (listen_sock == -1) {
- exit(-errno);
- }
-
- if (cfg->lmtp_enable) {
- lmtp_listen_sock = create_listen_socket (&rspamd->cfg->lmtp_addr, rspamd->cfg->lmtp_port,
- rspamd->cfg->lmtp_family, rspamd->cfg->lmtp_host);
- if (listen_sock == -1) {
- exit(-errno);
- }
- }
-
- /* Drop privilleges */
- drop_priv (cfg);
/* Set stack size for pcre */
getrlimit(RLIMIT_STACK, &rlim);
@@ -589,20 +610,7 @@ main (int argc, char **argv, char **env)
modules[i].module_config_func (cfg);
}
- for (i = 0; i < cfg->workers_number; i++) {
- fork_worker (rspamd, listen_sock, TYPE_WORKER);
- }
- /* Start controller if enabled */
- if (cfg->controller_enabled) {
- fork_worker (rspamd, listen_sock, TYPE_CONTROLLER);
- }
-
- /* Start lmtp if enabled */
- if (cfg->lmtp_enable) {
- for (i = 0; i < cfg->lmtp_workers_number; i++) {
- fork_worker (rspamd, lmtp_listen_sock, TYPE_LMTP);
- }
- }
+ spawn_workers (rspamd);
/* Signal processing cycle */
for (;;) {
@@ -632,7 +640,7 @@ main (int argc, char **argv, char **env)
(cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
/* But respawn controller */
if (cur->type == TYPE_CONTROLLER) {
- fork_worker (rspamd, listen_sock, TYPE_CONTROLLER);
+ fork_worker (rspamd, cur->cf);
}
}
else {
@@ -646,7 +654,7 @@ main (int argc, char **argv, char **env)
(cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
}
/* Fork another worker in replace of dead one */
- delay_fork (cur->type);
+ delay_fork (cur->cf);
}
g_free (cur);
}
@@ -662,7 +670,7 @@ main (int argc, char **argv, char **env)
TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP) {
/* Start new workers that would reread configuration */
- active_worker = fork_worker (rspamd, listen_sock, cur->type);
+ active_worker = fork_worker (rspamd, cur->cf);
}
/* Immideately send termination request to conroller and wait for SIGCHLD */
if (cur->type == TYPE_CONTROLLER) {
@@ -690,7 +698,7 @@ main (int argc, char **argv, char **env)
}
if (got_alarm) {
got_alarm = 0;
- fork_delayed (rspamd, listen_sock);
+ fork_delayed (rspamd);
}
}
@@ -705,10 +713,6 @@ main (int argc, char **argv, char **env)
msg_info ("main: terminating...");
- if (rspamd->cfg->bind_family == AF_UNIX) {
- unlink (rspamd->cfg->bind_host);
- }
-
free_config (rspamd->cfg);
g_free (rspamd->cfg);
g_free (rspamd);