]> source.dussan.org Git - rspamd.git/commitdiff
* Fix reload logic
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 13 Mar 2009 15:03:29 +0000 (18:03 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 13 Mar 2009 15:03:29 +0000 (18:03 +0300)
* Create listen sock for lmtp in main processes dispatcher to allow multiply lmtp workers
* Fix logic of logging

src/cfg_file.l
src/controller.c
src/lmtp.c
src/lmtp.h
src/main.c
src/util.c
src/util.h
src/worker.c

index ac26241558d12079926a84780bbcc2abd0c1454d..4e8e4bb8dbce567838d9cf49b261121d97f59e9a 100644 (file)
@@ -10,6 +10,7 @@
 
 #define MAX_INCLUDE_DEPTH 10
 YY_BUFFER_STATE include_stack[MAX_INCLUDE_DEPTH];
+int line_stack[MAX_INCLUDE_DEPTH];
 int include_stack_ptr = 0;
 extern struct config_file *cfg;
 
@@ -101,36 +102,41 @@ yes|YES|no|NO|[yY]|[nN]                   yylval.flag=parse_flag(yytext); return FLAG;
 [a-zA-Z0-9].[a-zA-Z0-9\/.-]+   yylval.string=strdup(yytext); return DOMAINNAME;
 <incl>[ \t]*      /* eat the whitespace */
 <incl>[^ \t\n]+   { /* got the include file name */
-        if (include_stack_ptr >= MAX_INCLUDE_DEPTH) {
-            yyerror ("yylex: includes nested too deeply");
-            return -1;
-        }
+               /* got the include file name */
+               if ( include_stack_ptr >= MAX_INCLUDE_DEPTH ) {
+                       yyerror ("yylex: includes nested too deeply" );
+                       return -1;
+               }
 
-        include_stack[include_stack_ptr++] =
-            YY_CURRENT_BUFFER;
+               line_stack[include_stack_ptr] = yylineno;
+               include_stack[include_stack_ptr++] = YY_CURRENT_BUFFER;
 
-        yyin = fopen (yytext, "r");
+               yylineno = 1;
+               yyin = fopen (yytext, "r");
 
-        if (! yyin) {
-            yyerror ("yylex: cannot open include file");
+               if (!yyin) {
+                       yyerror ("yylex: cannot open include file");
                        return -1;
                }
 
-        yy_switch_to_buffer (yy_create_buffer (yyin, YY_BUF_SIZE));
+               yy_switch_to_buffer (yy_create_buffer (yyin, YY_BUF_SIZE));
 
-        BEGIN(INITIAL);
-        }
+               BEGIN(INITIAL);
+}
 
 <<EOF>> {
-        if ( --include_stack_ptr < 0 ) {
+               if ( --include_stack_ptr < 0 ) {
+                       include_stack_ptr = 0;
+                       yylineno = 1;
                        post_load_config (cfg);
-            yyterminate ();
-        }
-        else {
-            yy_delete_buffer (YY_CURRENT_BUFFER);
-            yy_switch_to_buffer (include_stack[include_stack_ptr]);
-            }
-        }
+                       yyterminate ();
+               }
+               else {
+                       yy_delete_buffer (YY_CURRENT_BUFFER);
+                       yy_switch_to_buffer (include_stack[include_stack_ptr] );
+                       yylineno = line_stack[include_stack_ptr];
+               }
+}
 
 <module>\n                                                             /* ignore EOL */;
 <module>[ \t]+                                                 /* ignore whitespace */;
index 26c773cffb96df21b3dff8830517997f7a212d36..76caab0dfc1e0c1e0b268051f9d39d6beaaa49ad 100644 (file)
@@ -90,11 +90,11 @@ sigusr_handler (int fd, short what, void *arg)
        struct rspamd_worker *worker = (struct rspamd_worker *)arg;
        /* Do not accept new connections, preparing to end worker's process */
        struct timeval tv;
-       tv.tv_sec = SOFT_SHUTDOWN_TIME;
+       tv.tv_sec = 2;
        tv.tv_usec = 0;
        event_del (&worker->sig_ev);
        event_del (&worker->bind_ev);
-       msg_info ("controller's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+       msg_info ("controller's shutdown is pending in %d sec", 2);
        event_loopexit (&tv);
        return;
 }
@@ -568,6 +568,9 @@ start_controller (struct rspamd_worker *worker)
        io_tv.tv_usec = 0;
 
        event_loop (0);
+       close (listen_sock);
+
+       exit (EXIT_SUCCESS);
 }
 
 
index 276aad0cbebdf8d0a6a546c42b84925ab37950cf..a8f10ee2c7255a1cccf7bac715658ec64675b056 100644 (file)
@@ -62,7 +62,7 @@ sigusr_handler (int fd, short what, void *arg)
        event_del (&worker->sig_ev);
        event_del (&worker->bind_ev);
        do_reopen_log = 1;
-       msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+       msg_info ("lmtp worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
        event_loopexit (&tv);
        return;
 }
@@ -244,11 +244,10 @@ accept_socket (int fd, short what, void *arg)
  * Start lmtp worker process
  */
 void
-start_lmtp_worker (struct rspamd_worker *worker)
+start_lmtp_worker (struct rspamd_worker *worker, int listen_sock)
 {
        struct sigaction signals;
-       int listen_sock, i;
-       struct sockaddr_un *un_addr;
+       int i;
        char *hostbuf;
        long int hostmax;
 
@@ -264,25 +263,6 @@ start_lmtp_worker (struct rspamd_worker *worker)
        signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *) worker);
        signal_add (&worker->sig_ev, NULL);
        
-       /* Create listen socket */
-       if (worker->srv->cfg->lmtp_family == AF_INET) {
-               if ((listen_sock = make_tcp_socket (&worker->srv->cfg->lmtp_addr, worker->srv->cfg->lmtp_port, TRUE)) == -1) {
-                       msg_err ("start_lmtp: cannot create tcp listen socket. %s", strerror (errno));
-                       exit(-errno);
-               }
-       }
-       else {
-               un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
-               if (!un_addr || (listen_sock = make_unix_socket (worker->srv->cfg->lmtp_host, un_addr, TRUE)) == -1) {
-                       msg_err ("start_lmtp: cannot create unix listen socket. %s", strerror (errno));
-                       exit(-errno);
-               }
-       }
-       
-       if (listen (listen_sock, -1) == -1) {
-               msg_err ("start_lmtp: cannot listen on socket. %s", strerror (errno));
-               exit(-errno);
-       }
        /* Accept event */
        event_set(&worker->bind_ev, listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
        event_add(&worker->bind_ev, NULL);
@@ -306,6 +286,7 @@ start_lmtp_worker (struct rspamd_worker *worker)
        io_tv.tv_usec = 0;
 
        event_loop (0);
+       exit (EXIT_SUCCESS);
 }
 
 /* 
index d7c13c497296104311ae67c24bb0332877c1578f..b784eed3b014329ee19b0be8dced3587578abaa9 100644 (file)
@@ -15,6 +15,6 @@
 #define LMTP_NO_RCPT        554
 #define LMTP_TEMP_FAIL      421
 
-void start_lmtp_worker (struct rspamd_worker *worker);
+void start_lmtp_worker (struct rspamd_worker *worker, int listen_sock);
 
 #endif
index b3a36da9848fee44fba40c92bfc89cdf76886097..16af301dcc066b29d5541e665186517c556e0c6a 100644 (file)
@@ -35,7 +35,7 @@
 struct config_file *cfg;
 
 static void sig_handler (int );
-static struct rspamd_worker * fork_worker (struct rspamd_main *, int, int, enum process_type);
+static struct rspamd_worker * fork_worker (struct rspamd_main *, int, enum process_type);
        
 sig_atomic_t do_restart;
 sig_atomic_t do_terminate;
@@ -203,45 +203,52 @@ config_logger (struct rspamd_main *rspamd, gboolean is_fatal)
        }
 }
 
-static struct rspamd_worker *
-fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum process_type type) 
+static void
+reread_config (struct rspamd_main *rspamd)
 {
-       struct rspamd_worker *cur;
+       struct config_file *tmp_cfg;
        char *cfg_file;
        FILE *f;
-       struct config_file *tmp_cfg;
-       /* Starting worker process */
-       cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
-       if (cur) {
-               /* Reconfig needed */
-               if (reconfig) {
-                       tmp_cfg = (struct config_file *) g_malloc (sizeof (struct config_file));
-                       if (tmp_cfg) {
-                               bzero (tmp_cfg, sizeof (struct config_file));
-                               tmp_cfg->cfg_pool = memory_pool_new (32768);
-                               cfg_file = memory_pool_strdup (tmp_cfg->cfg_pool, rspamd->cfg->cfg_name);
-                               f = fopen (rspamd->cfg->cfg_name , "r");
-                               if (f == NULL) {
-                                       msg_warn ("fork_worker: cannot open file: %s", rspamd->cfg->cfg_name );
-                               }
-                               else {
-                                       yyin = f;
-                                       yyrestart (yyin);
 
-                                       if (yyparse() != 0 || yynerrs > 0) {
-                                               msg_warn ("fork_worker: yyparse: cannot parse config file, %d errors", yynerrs);
-                                               fclose (f);
-                                       }
-                                       else {
-                                               free_config (rspamd->cfg);
-                                               g_free (rspamd->cfg);
-                                               rspamd->cfg = tmp_cfg;
-                                               rspamd->cfg->cfg_name = cfg_file;
-                                               config_logger (rspamd, FALSE);
-                                       }
-                               }
+       tmp_cfg = (struct config_file *) g_malloc (sizeof (struct config_file));
+       if (tmp_cfg) {
+               bzero (tmp_cfg, sizeof (struct config_file));
+               tmp_cfg->cfg_pool = memory_pool_new (memory_pool_get_size ());
+               init_defaults (tmp_cfg);
+               cfg_file = memory_pool_strdup (tmp_cfg->cfg_pool, rspamd->cfg->cfg_name);
+               f = fopen (rspamd->cfg->cfg_name , "r");
+               if (f == NULL) {
+                       msg_warn ("reread_config: cannot open file: %s", rspamd->cfg->cfg_name );
+               }
+               else {
+                       yyin = f;
+                       yyrestart (yyin);
+
+                       if (yyparse() != 0 || yynerrs > 0) {
+                               msg_warn ("reread_config: yyparse: cannot parse config file, %d errors", yynerrs);
+                               fclose (f);
+                       }
+                       else {
+                               msg_debug ("reread_config: replacing config");
+                               free_config (rspamd->cfg);
+                               close_log (rspamd->cfg);
+                               g_free (rspamd->cfg);
+                               rspamd->cfg = tmp_cfg;
+                               rspamd->cfg->cfg_name = cfg_file;
+                               config_logger (rspamd, FALSE);
+                               msg_info ("reread_config: config rereaded successfully");
                        }
                }
+       }
+}
+
+static struct rspamd_worker *
+fork_worker (struct rspamd_main *rspamd, int listen_sock, enum process_type type) 
+{
+       struct rspamd_worker *cur;
+       /* Starting worker process */
+       cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
+       if (cur) {
                bzero (cur, sizeof (struct rspamd_worker));
                TAILQ_INSERT_HEAD (&rspamd->workers, cur, next);
                cur->srv = rspamd;
@@ -261,7 +268,7 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro
                                                setproctitle ("lmtp process");
                                                pidfile_close (rspamd->pfh);
                                                msg_info ("fork_worker: starting lmtp process %d", getpid ());
-                                               start_lmtp_worker (cur);
+                                               start_lmtp_worker (cur, listen_sock);
                                        case TYPE_WORKER:
                                        default:
                                                setproctitle ("worker process");
@@ -297,20 +304,46 @@ fork_delayed (struct rspamd_main *rspamd, int listen_sock)
        while (workers_pending != NULL) {
                cur = workers_pending;
                workers_pending = g_list_remove_link (workers_pending, cur);
-               fork_worker (rspamd, listen_sock, 0, GPOINTER_TO_INT (cur->data));
+               fork_worker (rspamd, listen_sock, GPOINTER_TO_INT (cur->data));
                g_list_free_1 (cur);
        }
 }
 
+static int
+create_listen_socket (struct in_addr *addr, int port, int family, char *path)
+{
+       int listen_sock = -1;
+       struct sockaddr_un *un_addr;
+       /* Create listen socket */
+       if (family == AF_INET) {
+               if ((listen_sock = make_tcp_socket (addr, port, TRUE)) == -1) {
+                       msg_err ("create_listen_socket: cannot create tcp listen socket. %s", strerror (errno));
+               }
+       }
+       else {
+               un_addr = (struct sockaddr_un *) alloca (sizeof (struct sockaddr_un));
+               if (!un_addr || (listen_sock = make_unix_socket (path, un_addr, TRUE)) == -1) {
+                       msg_err ("create_listen_socket: cannot create unix listen socket. %s", strerror (errno));
+               }
+       }
+       
+       if (listen_sock != -1) {
+               if (listen (listen_sock, -1) == -1) {
+                       msg_err ("start_lmtp: cannot listen on socket. %s", strerror (errno));
+               }
+       }
+
+       return listen_sock;
+}
+
 int 
 main (int argc, char **argv, char **env)
 {
        struct rspamd_main *rspamd;
        struct module_ctx *cur_module = NULL;
-       int res = 0, i, listen_sock;
+       int res = 0, i, listen_sock, lmtp_listen_sock;
        struct sigaction signals;
        struct rspamd_worker *cur, *cur_tmp, *active_worker;
-       struct sockaddr_un *un_addr;
        FILE *f;
        pid_t wrk;
        char *args[] = { "", "-e", "0", NULL };
@@ -379,31 +412,26 @@ main (int argc, char **argv, char **env)
        }
 
        /* Create listen socket */
-       if (rspamd->cfg->bind_family == AF_INET) {
-               if ((listen_sock = make_tcp_socket (&rspamd->cfg->bind_addr, rspamd->cfg->bind_port, TRUE)) == -1) {
-                       msg_err ("main: cannot create tcp listen socket. %s", strerror (errno));
-                       exit(-errno);
-               }
+       listen_sock = create_listen_socket (&rspamd->cfg->bind_addr, rspamd->cfg->bind_port, 
+                                                                               rspamd->cfg->bind_family, rspamd->cfg->bind_host);
+       if (listen_sock == -1) {
+               exit(-errno);
        }
-       else {
-               un_addr = (struct sockaddr_un *) g_malloc (sizeof (struct sockaddr_un));
-               if (!un_addr || (listen_sock = make_unix_socket (rspamd->cfg->bind_host, un_addr, TRUE)) == -1) {
-                       msg_err ("main: cannot create unix listen socket. %s", strerror (errno));
+
+       if (cfg->lmtp_enable) {
+               lmtp_listen_sock = create_listen_socket (&rspamd->cfg->lmtp_addr, rspamd->cfg->lmtp_port, 
+                                                                               rspamd->cfg->lmtp_family, rspamd->cfg->lmtp_host);
+               if (listen_sock == -1) {
                        exit(-errno);
                }
        }
 
-       if (listen (listen_sock, -1) == -1) {
-               msg_err ("main: cannot listen on socket. %s", strerror (errno));
-               exit(-errno);
-       }
-
        /* Drop privilleges */
        drop_priv (cfg);
        
        config_logger (rspamd, TRUE);
 
-       msg_info ("main: starting...");
+       msg_info ("main: rspamd "RVERSION " is starting");
        rspamd->cfg->cfg_name = memory_pool_strdup (rspamd->cfg->cfg_pool, rspamd->cfg->cfg_name );
 
        /* Strictly set temp dir */
@@ -464,17 +492,17 @@ main (int argc, char **argv, char **env)
        rspamd->statfile_pool = statfile_pool_new (cfg->max_statfile_size);
        
        for (i = 0; i < cfg->workers_number; i++) {
-               fork_worker (rspamd, listen_sock, 0, TYPE_WORKER);
+               fork_worker (rspamd, listen_sock, TYPE_WORKER);
        }
        /* Start controller if enabled */
        if (cfg->controller_enabled) {
-               fork_worker (rspamd, listen_sock, 0, TYPE_CONTROLLER);
+               fork_worker (rspamd, listen_sock, TYPE_CONTROLLER);
        }
        
        /* Start lmtp if enabled */
        if (cfg->lmtp_enable) {
                for (i = 0; i < cfg->lmtp_workers_number; i++) {
-                       fork_worker (rspamd, listen_sock, 0, TYPE_LMTP);
+                       fork_worker (rspamd, lmtp_listen_sock, TYPE_LMTP);
                }
        }
 
@@ -504,6 +532,10 @@ main (int argc, char **argv, char **env)
                                                /* Normal worker termination, do not fork one more */
                                                msg_info ("main: %s process %d terminated normally", 
                                                                        (cur->type != TYPE_WORKER) ? "controller" : "worker", cur->pid);
+                                               /* But respawn controller */
+                                               if (cur->type == TYPE_CONTROLLER) {
+                                                       fork_worker (rspamd, listen_sock, TYPE_CONTROLLER);
+                                               }
                                        }
                                        else {
                                                if (WIFSIGNALED (res)) {
@@ -524,10 +556,22 @@ main (int argc, char **argv, char **env)
                }
                if (do_restart) {       
                        do_restart = 0;
+                       do_reopen_log = 1;
 
+                       msg_info ("main: rspamd "RVERSION " is restarting");
                        if (active_worker == NULL) {
-                               /* Start new worker that would reread configuration*/
-                               active_worker = fork_worker (rspamd, listen_sock, 1, TYPE_WORKER);
+                               /* reread_config (rspamd); */
+                               TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
+                                       if (cur->type == TYPE_WORKER || cur->type == TYPE_LMTP) {
+                                               /* Start new workers that would reread configuration */
+                                               active_worker = fork_worker (rspamd, listen_sock, cur->type);
+                                       }
+                                       /* Immideately send termination request to conroller and wait for SIGCHLD */
+                                       if (cur->type == TYPE_CONTROLLER) {
+                                               kill (cur->pid, SIGUSR2);
+                                               cur->is_dying = 1;
+                                       }
+                               }
                        }
                        /* Do not start new workers until active worker is not ready for accept */
                }
@@ -537,7 +581,7 @@ main (int argc, char **argv, char **env)
                        if (active_worker != NULL) {
                                msg_info ("main: worker process %d has been successfully started", active_worker->pid);
                                TAILQ_FOREACH_SAFE (cur, &rspamd->workers, next, cur_tmp) {
-                                       if (cur != active_worker && !cur->is_dying && cur->type == TYPE_WORKER) {
+                                       if (cur != active_worker && !cur->is_dying && cur->type != TYPE_CONTROLLER) {
                                                /* Send to old workers SIGUSR2 */
                                                kill (cur->pid, SIGUSR2);
                                                cur->is_dying = 1;
index a60bafea66927e47c300a3171d9489779bda3798..62d656140cbdd07f7a9d336497b28361e11a09ee 100644 (file)
@@ -825,14 +825,13 @@ open_log (struct config_file *cfg)
        }
 }
 
-int
-reopen_log (struct config_file *cfg)
+void 
+close_log (struct config_file *cfg)
 {
-       do_reopen_log = 0;
        switch (cfg->log_type) {
                case RSPAMD_LOG_CONSOLE:
                        /* Do nothing with console */
-                       return 0;
+                       break;
                case RSPAMD_LOG_SYSLOG:
                        closelog ();
                        break;
@@ -840,6 +839,14 @@ reopen_log (struct config_file *cfg)
                        close (cfg->log_fd);
                        break;
        }
+
+}
+
+int
+reopen_log (struct config_file *cfg)
+{
+       do_reopen_log = 0;
+       close_log (cfg);
        return open_log (cfg);
 }
 
index 7267a719b527c4c5bed653f4cbddb2cae8695192..fa47f21e19bcfb6533d3963416f1693ac9e0c3ab 100644 (file)
@@ -48,6 +48,7 @@ int pidfile_remove(struct pidfh *pfh);
 #endif
 
 int open_log (struct config_file *cfg);
+void close_log (struct config_file *cfg);
 int reopen_log (struct config_file *cfg);
 void syslog_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg);
 void file_log_function (const gchar *log_domain, GLogLevelFlags log_level, const gchar *message, gpointer arg);
index ee10d9a848f84340ea9315c0e74f7228ba073533..457395a9749468dbc941fa238489efb6656e8186 100644 (file)
@@ -297,6 +297,7 @@ start_worker (struct rspamd_worker *worker, int listen_sock)
        kill (getppid (), SIGUSR2);
 
        event_loop (0);
+       exit (EXIT_SUCCESS);
 }
 
 /*