summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2009-03-13 18:03:29 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2009-03-13 18:03:29 +0300
commit1085ddb9c09caba4bcb4408135044dc24f7798f3 (patch)
tree88a7073ef0b503557fb28e080a61224b60c18d5a /src
parent89f264624c1c846c995c22a8390b7e79f09ef960 (diff)
downloadrspamd-1085ddb9c09caba4bcb4408135044dc24f7798f3.tar.gz
rspamd-1085ddb9c09caba4bcb4408135044dc24f7798f3.zip
* Fix reload logic
* Create listen sock for lmtp in main processes dispatcher to allow multiply lmtp workers * Fix logic of logging
Diffstat (limited to 'src')
-rw-r--r--src/cfg_file.l46
-rw-r--r--src/controller.c7
-rw-r--r--src/lmtp.c27
-rw-r--r--src/lmtp.h2
-rw-r--r--src/main.c162
-rw-r--r--src/util.c15
-rw-r--r--src/util.h1
-rw-r--r--src/worker.c1
8 files changed, 152 insertions, 109 deletions
diff --git a/src/cfg_file.l b/src/cfg_file.l
index ac2624155..4e8e4bb8d 100644
--- a/src/cfg_file.l
+++ b/src/cfg_file.l
@@ -10,6 +10,7 @@
#define MAX_INCLUDE_DEPTH 10
YY_BUFFER_STATE include_stack[MAX_INCLUDE_DEPTH];
+int line_stack[MAX_INCLUDE_DEPTH];
int include_stack_ptr = 0;
extern struct config_file *cfg;
@@ -101,36 +102,41 @@ yes|YES|no|NO|[yY]|[nN] yylval.flag=parse_flag(yytext); return FLAG;
[a-zA-Z0-9].[a-zA-Z0-9\/.-]+ yylval.string=strdup(yytext); return DOMAINNAME;
<incl>[ \t]* /* eat the whitespace */
<incl>[^ \t\n]+ { /* got the include file name */
- if (include_stack_ptr >= MAX_INCLUDE_DEPTH) {
- yyerror ("yylex: includes nested too deeply");
- return -1;
- }
+ /* got the include file name */
+ if ( include_stack_ptr >= MAX_INCLUDE_DEPTH ) {
+ yyerror ("yylex: includes nested too deeply" );
+ return -1;
+ }
- include_stack[include_stack_ptr++] =
- YY_CURRENT_BUFFER;
+ line_stack[include_stack_ptr] = yylineno;
+ include_stack[include_stack_ptr++] = YY_CURRENT_BUFFER;
- yyin = fopen (yytext, "r");
+ yylineno = 1;
+ yyin = fopen (yytext, "r");
- if (! yyin) {
- yyerror ("yylex: cannot open include file");
+ if (!yyin) {
+ yyerror ("yylex: cannot open include file");
return -1;
}
- yy_switch_to_buffer (yy_create_buffer (yyin, YY_BUF_SIZE));
+ yy_switch_to_buffer (yy_create_buffer (yyin, YY_BUF_SIZE));
- BEGIN(INITIAL);
- }
+ BEGIN(INITIAL);
+}
<<EOF>> {
- if ( --include_stack_ptr < 0 ) {
+ if ( --include_stack_ptr < 0 ) {
+ include_stack_ptr = 0;
+ yylineno = 1;
post_load_config (cfg);
- yyterminate ();
- }
- else {
- yy_delete_buffer (YY_CURRENT_BUFFER);
- yy_switch_to_buffer (include_stack[include_stack_ptr]);
- }
- }
+ yyterminate ();
+ }
+ else {
+ yy_delete_buffer (YY_CURRENT_BUFFER);
+ yy_switch_to_buffer (include_stack[include_stack_ptr] );
+ yylineno = line_stack[include_stack_ptr];
+ }
+}
<module>\n /* ignore EOL */;
<module>[ \t]+ /* ignore whitespace */;
diff --git a/src/controller.c b/src/controller.c
index 26c773cff..76caab0df 100644
--- a/src/controller.c
+++ b/src/controller.c
@@ -90,11 +90,11 @@ sigusr_handler (int fd, short what, void *arg)
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
/* Do not accept new connections, preparing to end worker's process */
struct timeval tv;
- tv.tv_sec = SOFT_SHUTDOWN_TIME;
+ tv.tv_sec = 2;
tv.tv_usec = 0;
event_del (&worker->sig_ev);
event_del (&worker->bind_ev);
- msg_info ("controller's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ msg_info ("controller's shutdown is pending in %d sec", 2);
event_loopexit (&tv);
return;
}
@@ -568,6 +568,9 @@ start_controller (struct rspamd_worker *worker)
io_tv.tv_usec = 0;
event_loop (0);
+ close (listen_sock);
+
+ exit (EXIT_SUCCESS);
}
diff --git a/src/lmtp.c b/src/lmtp.c
index 276aad0cb..a8f10ee2c 100644
--- a/src/lmtp.c
+++ b/src/lmtp.c
@@ -62,7 +62,7 @@ sigusr_handler (int fd, short what, void *arg)
event_del (&worker->sig_ev);
event_del (&worker->bind_ev);
do_reopen_log = 1;
- msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ msg_info ("lmtp worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
event_loopexit (&tv);
return;
}
@@ -244,11 +244,10 @@ accept_socket (int fd, short what, void *arg)
* Start lmtp worker process
*/
void
-start_lmtp_worker (struct rspamd_worker *worker)
+start_lmtp_worker (struct rspamd_worker *worker, int listen_sock)
{
struct sigaction signals;
- int listen_sock, i;
- struct sockaddr_un *un_addr;
+ int i;
char *hostbuf;
long int hostmax;
@@ -264,25 +263,6 @@ start_lmtp_worker (struct rspamd_worker *worker)
signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
signal_add (&worker->sig_ev, NULL);
- /* Create listen socket */
- if (worker->srv->cfg->lmtp_family == AF_INET) {
- if ((listen_sock = make_tcp_socket (&worker->srv->cfg->lmtp_addr, worker->srv->cfg->lmtp_port, TRUE)) == -1) {
- msg_err ("start_lmtp: cannot create tcp listen socket. %s", strerror (errno));
- exit(-errno);
- }
- }
- else {
- un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
- if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->lmtp_host, un_addr, TRUE)) == -1) {
- msg_err ("start_lmtp: cannot create unix listen socket. %s", strerror (errno));
- exit(-errno);
- }
- }
-
- if (listen (listen_sock, -1) == -1) {
- msg_err ("start_lmtp: cannot listen on socket. %s", strerror (errno));
- exit(-errno);
- }
/* Accept event */
event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
event_add(&worker->bind_ev, NULL);
@@ -306,6 +286,7 @@ start_lmtp_worker (struct rspamd_worker *worker)
io_tv.tv_usec = 0;
event_loop (0);
+ exit (EXIT_SUCCESS);
}
/*
diff --git a/src/lmtp.h b/src/lmtp.h
index d7c13c497..b784eed3b 100644
--- a/src/lmtp.h
+++ b/src/lmtp.h
@@ -15,6 +15,6 @@
#define LMTP_NO_RCPT 554
#define LMTP_TEMP_FAIL 421
-void start_lmtp_worker (struct rspamd_worker *worker);
+void start_lmtp_worker (struct rspamd_worker *worker, int listen_sock);
#endif
diff --git a/src/main.c b/src/main.c
index b3a36da98..16af301dc 100644
--- a/src/main.c
+++ b/src/main.c
@@ -35,7 +35,7 @@
struct config_file *cfg;
static void sig_handler (int );
-static struct rspamd_worker * fork_worker (struct rspamd_main *, int, int, enum process_type);
+static struct rspamd_worker * fork_worker (struct rspamd_main *, int, enum process_type);
sig_atomic_t do_restart;
sig_atomic_t do_terminate;
@@ -203,45 +203,52 @@ config_logger (struct rspamd_main *rspamd, gboolean is_fatal)
}
}
-static struct rspamd_worker *
-fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum process_type type)
+static void
+reread_config (struct rspamd_main *rspamd)
{
- struct rspamd_worker *cur;
+ struct config_file *tmp_cfg;
char *cfg_file;
FILE *f;
- struct config_file *tmp_cfg;
- /* Starting worker process */
- cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
- if (cur) {
- /* Reconfig needed */
- if (reconfig) {
- tmp_cfg = (struct config_file *) g_malloc (sizeof (struct config_file));
- if (tmp_cfg) {
- bzero (tmp_cfg, sizeof (struct config_file));
- tmp_cfg->cfg_pool = memory_pool_new (32768);
- cfg_file = memory_pool_strdup (tmp_cfg->cfg_pool, rspamd->cfg->cfg_name);
- f = fopen (rspamd->cfg->cfg_name , "r");
- if (f == NULL) {
- msg_warn ("fork_worker: cannot open file: %s", rspamd->cfg->cfg_name );
- }
- else {
- yyin = f;
- yyrestart (yyin);
- if (yyparse() != 0 || yynerrs > 0) {
- msg_warn ("fork_worker: yyparse: cannot parse config file, %d errors", yynerrs);
- fclose (f);
- }
- else {
- free_config (rspamd->cfg);
- g_free (rspamd->cfg);
- rspamd->cfg = tmp_cfg;
- rspamd->cfg->cfg_name = cfg_file;
- config_logger (rspamd, FALSE);
- }
- }
+ tmp_cfg = (struct config_file *) g_malloc (sizeof (struct config_file));
+ if (tmp_cfg) {
+ bzero (tmp_cfg, sizeof (struct config_file));
+ tmp_cfg->cfg_pool = memory_pool_new (memory_pool_get_size ());
+ init_defaults (tmp_cfg);
+ cfg_file = memory_pool_strdup (tmp_cfg->cfg_pool, rspamd->cfg->cfg_name);
+ f = fopen (rspamd->cfg->cfg_name , "r");
+ if (f == NULL) {
+ msg_warn ("reread_config: cannot open file: %s", rspamd->cfg->cfg_name );
+ }
+ else {
+ yyin = f;
+ yyrestart (yyin);
+
+ if (yyparse() != 0 || yynerrs > 0) {
+ msg_warn ("reread_config: yyparse: cannot parse config file, %d errors", yynerrs);
+ fclose (f);
+ }
+ else {
+ msg_debug ("reread_config: replacing config");
+ free_config (rspamd->cfg);
+ close_log (rspamd->cfg);
+ g_free (rspamd->cfg);
+ rspamd->cfg = tmp_cfg;
+ rspamd->cfg->cfg_name = cfg_file;
+ config_logger (rspamd, FALSE);
+ msg_info ("reread_config: config rereaded successfully");
}
}
+ }
+}
+
+static struct rspamd_worker *
+fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type)
+{
+ struct rspamd_worker *cur;
+ /* Starting worker process */
+ cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
+ if (cur) {
bzero (cur, sizeof (struct rspamd_worker));
TAILQ_INSERT_HEAD (&rspamd->workers, cur, next);
cur->srv = rspamd;
@@ -261,7 +268,7 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro
setproctitle ("lmtp process");
pidfile_close (rspamd->pfh);
msg_info ("fork_worker: starting lmtp process %d", getpid ());
- start_lmtp_worker (cur);
+ start_lmtp_worker (cur, listen_sock);
case TYPE_WORKER:
default:
setproctitle ("worker process");
@@ -297,20 +304,46 @@ fork_delayed (struct rspamd_main *rspamd, int listen_sock)
while (workers_pending != NULL) {
cur = workers_pending;
workers_pending = g_list_remove_link (workers_pending, cur);
- fork_worker (rspamd, listen_sock, 0, GPOINTER_TO_INT (cur->data));
+ fork_worker (rspamd, listen_sock, GPOINTER_TO_INT (cur->data));
g_list_free_1 (cur);
}
}
+static int
+create_listen_socket (struct in_addr *addr, int port, int family, char *path)
+{
+ int listen_sock = -1;
+ struct sockaddr_un *un_addr;
+ /* Create listen socket */
+ if (family == AF_INET) {
+ if ((listen_sock = make_tcp_socket (addr, port, TRUE)) == -1) {
+ msg_err ("create_listen_socket: cannot create tcp listen socket. %s", strerror (errno));
+ }
+ }
+ else {
+ un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
+ if (!un_addr || (listen_sock = make_unix_socket (path, un_addr, TRUE)) == -1) {
+ msg_err ("create_listen_socket: cannot create unix listen socket. %s", strerror (errno));
+ }
+ }
+
+ if (listen_sock != -1) {
+ if (listen (listen_sock, -1) == -1) {
+ msg_err ("start_lmtp: cannot listen on socket. %s", strerror (errno));
+ }
+ }
+
+ return listen_sock;
+}
+
int
main (int argc, char **argv, char **env)
{
struct rspamd_main *rspamd;
struct module_ctx *cur_module = NULL;
- int res = 0, i, listen_sock;
+ int res = 0, i, listen_sock, lmtp_listen_sock;
struct sigaction signals;
struct rspamd_worker *cur, *cur_tmp, *active_worker;
- struct sockaddr_un *un_addr;
FILE *f;
pid_t wrk;
char *args[] = { "", "-e", "0", NULL };
@@ -379,31 +412,26 @@ main (int argc, char **argv, char **env)
}
/* Create listen socket */
- if (rspamd->cfg->bind_family == AF_INET) {
- if ((listen_sock = make_tcp_socket (&rspamd->cfg->bind_addr, rspamd->cfg->bind_port, TRUE)) == -1) {
- msg_err ("main: cannot create tcp listen socket. %s", strerror (errno));
- exit(-errno);
- }
+ listen_sock = create_listen_socket (&rspamd->cfg->bind_addr, rspamd->cfg->bind_port,
+ rspamd->cfg->bind_family, rspamd->cfg->bind_host);
+ if (listen_sock == -1) {
+ exit(-errno);
}
- else {
- un_addr = (struct sockaddr_un *) g_malloc (sizeof (struct sockaddr_un));
- if (!un_addr || (listen_sock = make_unix_socket (rspamd->cfg->bind_host, un_addr, TRUE)) == -1) {
- msg_err ("main: cannot create unix listen socket. %s", strerror (errno));
+
+ if (cfg->lmtp_enable) {
+ lmtp_listen_sock = create_listen_socket (&rspamd->cfg->lmtp_addr, rspamd->cfg->lmtp_port,
+ rspamd->cfg->lmtp_family, rspamd->cfg->lmtp_host);
+ if (listen_sock == -1) {
exit(-errno);
}
}
- if (listen (listen_sock, -1) == -1) {
- msg_err ("main: cannot listen on socket. %s", strerror (errno));
- exit(-errno);
- }
-
/* Drop privilleges */
drop_priv (cfg);
config_logger (rspamd, TRUE);
- msg_info ("main: starting...");
+ msg_info ("main: rspamd "RVERSION " is starting");
rspamd->cfg->cfg_name = memory_pool_strdup (rspamd->cfg->cfg_pool, rspamd->cfg->cfg_name );
/* Strictly set temp dir */
@@ -464,17 +492,17 @@ main (int argc, char **argv, char **env)
rspamd->statfile_pool = statfile_pool_new (cfg->max_statfile_size);
for (i = 0; i < cfg->workers_number; i++) {
- fork_worker (rspamd, listen_sock, 0, TYPE_WORKER);
+ fork_worker (rspamd, listen_sock, TYPE_WORKER);
}
/* Start controller if enabled */
if (cfg->controller_enabled) {
- fork_worker (rspamd, listen_sock, 0, TYPE_CONTROLLER);
+ fork_worker (rspamd, listen_sock, TYPE_CONTROLLER);
}
/* Start lmtp if enabled */
if (cfg->lmtp_enable) {
for (i = 0; i < cfg->lmtp_workers_number; i++) {
- fork_worker (rspamd, listen_sock, 0, TYPE_LMTP);
+ fork_worker (rspamd, lmtp_listen_sock, TYPE_LMTP);
}
}
@@ -504,6 +532,10 @@ main (int argc, char **argv, char **env)
/* Normal worker termination, do not fork one more */
msg_info ("main: %s process %d terminated normally",
(cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
+ /* But respawn controller */
+ if (cur->type == TYPE_CONTROLLER) {
+ fork_worker (rspamd, listen_sock, TYPE_CONTROLLER);
+ }
}
else {
if (WIFSIGNALED (res)) {
@@ -524,10 +556,22 @@ main (int argc, char **argv, char **env)
}
if (do_restart) {
do_restart = 0;
+ do_reopen_log = 1;
+ msg_info ("main: rspamd "RVERSION " is restarting");
if (active_worker == NULL) {
- /* Start new worker that would reread configuration*/
- active_worker = fork_worker (rspamd, listen_sock, 1, TYPE_WORKER);
+ /* reread_config (rspamd); */
+ TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
+ if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP) {
+ /* Start new workers that would reread configuration */
+ active_worker = fork_worker (rspamd, listen_sock, cur->type);
+ }
+ /* Immideately send termination request to conroller and wait for SIGCHLD */
+ if (cur->type == TYPE_CONTROLLER) {
+ kill (cur->pid, SIGUSR2);
+ cur->is_dying = 1;
+ }
+ }
}
/* Do not start new workers until active worker is not ready for accept */
}
@@ -537,7 +581,7 @@ main (int argc, char **argv, char **env)
if (active_worker != NULL) {
msg_info ("main: worker process %d has been successfully started", active_worker->pid);
TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
- if (cur != active_worker && !cur->is_dying && cur->type == TYPE_WORKER) {
+ if (cur != active_worker && !cur->is_dying && cur->type != TYPE_CONTROLLER) {
/* Send to old workers SIGUSR2 */
kill (cur->pid, SIGUSR2);
cur->is_dying = 1;
diff --git a/src/util.c b/src/util.c
index a60bafea6..62d656140 100644
--- a/src/util.c
+++ b/src/util.c
@@ -825,14 +825,13 @@ open_log (struct config_file *cfg)
}
}
-int
-reopen_log (struct config_file *cfg)
+void
+close_log (struct config_file *cfg)
{
- do_reopen_log = 0;
switch (cfg->log_type) {
case RSPAMD_LOG_CONSOLE:
/* Do nothing with console */
- return 0;
+ break;
case RSPAMD_LOG_SYSLOG:
closelog ();
break;
@@ -840,6 +839,14 @@ reopen_log (struct config_file *cfg)
close (cfg->log_fd);
break;
}
+
+}
+
+int
+reopen_log (struct config_file *cfg)
+{
+ do_reopen_log = 0;
+ close_log (cfg);
return open_log (cfg);
}
diff --git a/src/util.h b/src/util.h
index 7267a719b..fa47f21e1 100644
--- a/src/util.h
+++ b/src/util.h
@@ -48,6 +48,7 @@ int pidfile_remove(struct pidfh *pfh);
#endif
int open_log (struct config_file *cfg);
+void close_log (struct config_file *cfg);
int reopen_log (struct config_file *cfg);
void syslog_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg);
void file_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg);
diff --git a/src/worker.c b/src/worker.c
index ee10d9a84..457395a97 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -297,6 +297,7 @@ start_worker (struct rspamd_worker *worker, int listen_sock)
kill (getppid (), SIGUSR2);
event_loop (0);
+ exit (EXIT_SUCCESS);
}
/*