diff options
-rw-r--r-- | CMakeLists.txt | 1 | ||||
-rw-r--r-- | config.h.in | 2 | ||||
-rw-r--r-- | src/cfg_file.h | 3 | ||||
-rw-r--r-- | src/cfg_file.y | 49 | ||||
-rw-r--r-- | src/cfg_utils.c | 18 | ||||
-rw-r--r-- | src/controller.c | 3 | ||||
-rw-r--r-- | src/fuzzy_storage.c | 18 | ||||
-rw-r--r-- | src/lmtp.c | 3 | ||||
-rw-r--r-- | src/main.c | 137 | ||||
-rw-r--r-- | src/main.h | 3 | ||||
-rw-r--r-- | src/util.c | 115 | ||||
-rw-r--r-- | src/util.h | 5 | ||||
-rw-r--r-- | src/worker.c | 3 |
13 files changed, 218 insertions, 142 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index b4789c28b..528b99c62 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -241,6 +241,7 @@ CHECK_FUNCTION_EXISTS(nanosleep HAVE_NANOSLEEP) CHECK_FUNCTION_EXISTS(vfork HAVE_VFORK) CHECK_FUNCTION_EXISTS(wait4 HAVE_WAIT4) CHECK_FUNCTION_EXISTS(waitpid HAVE_WAITPID) +CHECK_FUNCTION_EXISTS(flock HAVE_FLOCK) CHECK_SYMBOL_EXISTS(PATH_MAX limits.h HAVE_PATH_MAX) CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN) diff --git a/config.h.in b/config.h.in index 2c03d5486..b80f399e8 100644 --- a/config.h.in +++ b/config.h.in @@ -92,6 +92,8 @@ #cmakedefine HAVE_WAITPID 1 +#cmakedefine HAVE_FLOCK 1 + #cmakedefine DEBUG_MODE 1 #cmakedefine GMIME24 1 diff --git a/src/cfg_file.h b/src/cfg_file.h index 6a0835f51..b165e82f2 100644 --- a/src/cfg_file.h +++ b/src/cfg_file.h @@ -177,6 +177,8 @@ struct worker_conf { int count; /**< number of workers */ GHashTable *params; /**< params for worker */ int listen_sock; /**< listening socket desctiptor */ + GQueue *active_workers; /**< linked list of spawned workers */ + gboolean has_socket; /**< whether we should make listening socket in main process */ }; /** @@ -328,6 +330,7 @@ void unescape_quotes (char *line); GList* parse_comma_list (memory_pool_t *pool, char *line); struct classifier_config* check_classifier_cfg (struct config_file *cfg, struct classifier_config *c); +struct worker_conf* check_worker_conf (struct config_file *cfg, struct worker_conf *c); int yylex (void); int yyparse (void); diff --git a/src/cfg_file.y b/src/cfg_file.y index 24a94bbc3..8ac9cb6a5 100644 --- a/src/cfg_file.y +++ b/src/cfg_file.y @@ -234,16 +234,7 @@ workercmd: bindsock: BINDSOCK EQSIGN bind_cred { - if (cur_worker == NULL) { - cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf)); - cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal); - memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params); -#ifdef HAVE_SC_NPROCESSORS_ONLN - cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN); -#else - cur_worker->count = DEFAULT_WORKERS_NUM; -#endif - } + cur_worker = check_worker_conf (cfg, cur_worker); if (!parse_bind_line (cfg, cur_worker, $3)) { yyerror ("yyparse: parse_bind_line"); @@ -273,28 +264,22 @@ bind_cred: workertype: TYPE EQSIGN QUOTEDSTRING { - if (cur_worker == NULL) { - cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf)); - cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal); - memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params); -#ifdef HAVE_SC_NPROCESSORS_ONLN - cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN); -#else - cur_worker->count = DEFAULT_WORKERS_NUM; -#endif - } - + cur_worker = check_worker_conf (cfg, cur_worker); if (g_ascii_strcasecmp ($3, "normal") == 0) { cur_worker->type = TYPE_WORKER; + cur_worker->has_socket = TRUE; } else if (g_ascii_strcasecmp ($3, "controller") == 0) { cur_worker->type = TYPE_CONTROLLER; + cur_worker->has_socket = TRUE; } else if (g_ascii_strcasecmp ($3, "lmtp") == 0) { cur_worker->type = TYPE_LMTP; + cur_worker->has_socket = TRUE; } else if (g_ascii_strcasecmp ($3, "fuzzy") == 0) { cur_worker->type = TYPE_FUZZY; + cur_worker->has_socket = FALSE; } else { yyerror ("yyparse: unknown worker type: %s", $3); @@ -305,16 +290,7 @@ workertype: workercount: COUNT EQSIGN NUMBER { - if (cur_worker == NULL) { - cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf)); - cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal); - memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params); -#ifdef HAVE_SC_NPROCESSORS_ONLN - cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN); -#else - cur_worker->count = DEFAULT_WORKERS_NUM; -#endif - } + cur_worker = check_worker_conf (cfg, cur_worker); if ($3 > 0) { cur_worker->count = $3; @@ -328,16 +304,7 @@ workercount: workerparam: STRING EQSIGN QUOTEDSTRING { - if (cur_worker == NULL) { - cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf)); - cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal); - memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params); -#ifdef HAVE_SC_NPROCESSORS_ONLN - cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN); -#else - cur_worker->count = DEFAULT_WORKERS_NUM; -#endif - } + cur_worker = check_worker_conf (cfg, cur_worker); g_hash_table_insert (cur_worker->params, $1, $3); } diff --git a/src/cfg_utils.c b/src/cfg_utils.c index 7b6470028..13a5e091a 100644 --- a/src/cfg_utils.c +++ b/src/cfg_utils.c @@ -623,6 +623,24 @@ check_classifier_cfg (struct config_file *cfg, struct classifier_config *c) return c; } +struct worker_conf * +check_worker_conf (struct config_file *cfg, struct worker_conf *c) +{ + if (c == NULL) { + c = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf)); + c->params = g_hash_table_new (g_str_hash, g_str_equal); + c->active_workers = g_queue_new (); + memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, c->params); + memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_queue_free, c->active_workers); +#ifdef HAVE_SC_NPROCESSORS_ONLN + c->count = sysconf (_SC_NPROCESSORS_ONLN); +#else + c->count = DEFAULT_WORKERS_NUM; +#endif + } + + return c; +} /* * vi:ts=4 */ diff --git a/src/controller.c b/src/controller.c index 21df8d511..64ef75d59 100644 --- a/src/controller.c +++ b/src/controller.c @@ -611,9 +611,6 @@ start_controller (struct rspamd_worker *worker) event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); event_add (&worker->bind_ev, NULL); - /* Send SIGUSR2 to parent */ - kill (getppid (), SIGUSR2); - gperf_profiler_init (worker->srv->cfg, "controller"); event_loop (0); diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 702c5b7c5..68da96b44 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -109,12 +109,21 @@ sync_cache (struct rspamd_worker *wrk) else { expire = DEFAULT_EXPIRE; } + + /* Sync section */ + if ((fd = open (filename, O_WRONLY)) != -1) { + /* Aqquire a lock */ + (void)lock_file (fd, FALSE); + (void)unlock_file (fd, FALSE); + } if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) { msg_err ("sync_cache: cannot create hash file %s: %s", filename, strerror (errno)); return; } + (void)lock_file (fd, FALSE); + now = (uint64_t) time (NULL); for (i = 0; i < BUCKETS; i++) { cur = hashes[i]->head; @@ -136,6 +145,7 @@ sync_cache (struct rspamd_worker *wrk) } } + (void)unlock_file (fd, FALSE); close (fd); } @@ -196,6 +206,8 @@ read_hashes_file (struct rspamd_worker *wrk) return FALSE; } + (void)lock_file (fd, FALSE); + fstat (fd, &st); for (;;) { @@ -208,6 +220,9 @@ read_hashes_file (struct rspamd_worker *wrk) bloom_add (bf, node->h.hash_pipe); } + (void)unlock_file (fd, FALSE); + close (fd); + if (r > 0) { msg_warn ("read_hashes_file: ignore garbadge at the end of file, length of garbadge: %d", r); } @@ -415,9 +430,6 @@ start_fuzzy_storage (struct rspamd_worker *worker) signal_set (&sev, SIGTERM, sigterm_handler, (void *)worker); signal_add (&sev, NULL); - /* Send SIGUSR2 to parent */ - kill (getppid (), SIGUSR2); - /* Init bloom filter */ bf = bloom_create (20000000L, DEFAULT_BLOOM_HASHES); /* Try to read hashes from file */ diff --git a/src/lmtp.c b/src/lmtp.c index 087be2e1a..f9bd6f65e 100644 --- a/src/lmtp.c +++ b/src/lmtp.c @@ -292,9 +292,6 @@ start_lmtp_worker (struct rspamd_worker *worker) hostbuf[hostmax - 1] = '\0'; snprintf (greetingbuf, sizeof (greetingbuf), "%d rspamd version %s LMTP on %s Ready\r\n", LMTP_OK, RVERSION, hostbuf); - /* Send SIGUSR2 to parent */ - kill (getppid (), SIGUSR2); - io_tv.tv_sec = WORKER_IO_TIMEOUT; io_tv.tv_usec = 0; diff --git a/src/main.c b/src/main.c index 702944467..4b811bb36 100644 --- a/src/main.c +++ b/src/main.c @@ -57,7 +57,6 @@ static struct rspamd_worker *fork_worker (struct rspamd_main *, struct worker sig_atomic_t do_restart; sig_atomic_t do_terminate; sig_atomic_t child_dead; -sig_atomic_t child_ready; sig_atomic_t got_alarm; extern int yynerrs; @@ -71,8 +70,6 @@ extern void xs_init (pTHX); extern PerlInterpreter *perl_interpreter; #endif -/* Active workers */ -static GList *active_workers = NULL; /* List of workers that are pending to start */ static GList *workers_pending = NULL; @@ -93,7 +90,7 @@ sig_handler (int signo) child_dead = 1; break; case SIGUSR2: - child_ready = 1; + /* Do nothing */ break; case SIGALRM: got_alarm = 1; @@ -285,7 +282,7 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker)); if (cur) { bzero (cur, sizeof (struct rspamd_worker)); - active_workers = g_list_prepend (active_workers, cur); + g_queue_push_head (cf->active_workers, cur); cur->srv = rspamd; cur->type = cf->type; cur->pid = fork (); @@ -328,6 +325,10 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf) pidfile_remove (rspamd->pfh); exit (-errno); break; + default: + /* Insert worker into worker's table, pid is index */ + g_hash_table_insert (rspamd->workers, GSIZE_TO_POINTER (cur->pid), cur); + break; } } @@ -416,7 +417,7 @@ fork_delayed (struct rspamd_main *rspamd) } static void -spawn_workers (struct rspamd_main *rspamd) +spawn_workers (struct rspamd_main *rspamd, gboolean make_sockets) { GList *cur; struct worker_conf *cf; @@ -427,8 +428,8 @@ spawn_workers (struct rspamd_main *rspamd) while (cur) { cf = cur->data; - /* Create listen socket */ - if (cf->type != TYPE_FUZZY) { + if (make_sockets && cf->has_socket) { + /* Create listen socket */ listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port, cf->bind_family, cf->bind_host); if (listen_sock == -1) { exit (-errno); @@ -463,6 +464,29 @@ get_process_type (enum process_type type) return NULL; } +static void +kill_old_workers (gpointer key, gpointer value, gpointer unused) +{ + struct rspamd_worker *w = value; + + kill (w->pid, SIGUSR2); + msg_info ("rspamd_restart: send signal to worker %ld", (long int)w->pid); +} + +static gboolean +wait_for_workers (gpointer key, gpointer value, gpointer unused) +{ + struct rspamd_worker *w = value; + int res = 0; + + waitpid (w->pid, &res, 0); + + msg_debug ("main(cleaning): %s process %d terminated", get_process_type (w->type), w->pid); + g_free (w); + + return TRUE; +} + int main (int argc, char **argv, char **env) { @@ -470,7 +494,7 @@ main (int argc, char **argv, char **env) struct module_ctx *cur_module = NULL; int res = 0, i; struct sigaction signals; - struct rspamd_worker *cur, *active_worker; + struct rspamd_worker *cur; struct rlimit rlim; struct metric *metric; struct cache_item *item; @@ -495,9 +519,7 @@ main (int argc, char **argv, char **env) do_terminate = 0; do_restart = 0; child_dead = 0; - child_ready = 0; do_reopen_log = 0; - active_worker = NULL; rspamd->stat = memory_pool_alloc_shared (rspamd->server_pool, sizeof (struct rspamd_stat)); bzero (rspamd->stat, sizeof (struct rspamd_stat)); @@ -704,8 +726,8 @@ main (int argc, char **argv, char **env) l = g_list_next (l); } - - spawn_workers (rspamd); + rspamd->workers = g_hash_table_new (g_direct_hash, g_direct_equal); + spawn_workers (rspamd, TRUE); /* Signal processing cycle */ for (;;) { @@ -714,7 +736,7 @@ main (int argc, char **argv, char **env) sigsuspend (&signals.sa_mask); if (do_terminate) { msg_debug ("main: catch termination signal, waiting for childs"); - pass_signal_worker (active_workers, SIGTERM); + pass_signal_worker (rspamd->workers, SIGTERM); break; } if (child_dead) { @@ -722,35 +744,31 @@ main (int argc, char **argv, char **env) msg_debug ("main: catch SIGCHLD signal, finding terminated worker"); /* Remove dead child form childs list */ wrk = waitpid (0, &res, 0); - l = g_list_first (active_workers); - while (l) { - cur = l->data; - if (wrk == cur->pid) { - /* Catch situations if active worker is abnormally terminated */ - if (cur == active_worker) { - active_worker = NULL; - } - active_workers = g_list_remove_link (active_workers, l); + if ((cur = g_hash_table_lookup (rspamd->workers, GSIZE_TO_POINTER (wrk))) != NULL) { + /* Unlink dead process from queue and hash table */ + + g_hash_table_remove (rspamd->workers, GSIZE_TO_POINTER (wrk)); + g_queue_remove (cur->cf->active_workers, cur); - if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { - /* Normal worker termination, do not fork one more */ - msg_info ("main: %s process %d terminated normally", get_process_type (cur->type), cur->pid); + if (WIFEXITED (res) && WEXITSTATUS (res) == 0) { + /* Normal worker termination, do not fork one more */ + msg_info ("main: %s process %d terminated normally", get_process_type (cur->type), cur->pid); + } + else { + if (WIFSIGNALED (res)) { + msg_warn ("main: %s process %d terminated abnormally by signal: %d", get_process_type (cur->type), cur->pid, WTERMSIG (res)); } else { - if (WIFSIGNALED (res)) { - msg_warn ("main: %s process %d terminated abnormally by signal: %d", get_process_type (cur->type), cur->pid, WTERMSIG (res)); - } - else { - msg_warn ("main: %s process %d terminated abnormally", get_process_type (cur->type), cur->pid); - } - /* Fork another worker in replace of dead one */ - delay_fork (cur->cf); + msg_warn ("main: %s process %d terminated abnormally", get_process_type (cur->type), cur->pid); } - g_list_free_1 (l); - g_free (cur); - break; + /* Fork another worker in replace of dead one */ + delay_fork (cur->cf); } - l = g_list_next (l); + + g_free (cur); + } + else { + msg_err ("main: got SIGCHLD, but pid %ld is not found in workers hash table, something goes wrong", (long int)wrk); } } if (do_restart) { @@ -758,35 +776,9 @@ main (int argc, char **argv, char **env) do_reopen_log = 1; msg_info ("main: rspamd " RVERSION " is restarting"); - l = g_list_first (active_workers); - while (l) { - cur = l->data; - /* Start new workers that would reread configuration */ - cur->pending = FALSE; - active_worker = fork_worker (rspamd, cur->cf); - active_worker->pending = TRUE; - l = g_list_next (l); - } - } - if (child_ready) { - child_ready = 0; - - if (active_worker != NULL) { - l = g_list_first (active_workers); - while (l) { - cur = l->data; - if (!cur->pending && !cur->is_dying) { - /* Send to old workers SIGUSR2 */ - kill (cur->pid, SIGUSR2); - cur->is_dying = 1; - } - else if (!cur->is_dying) { - msg_info ("main: %s process %d has been successfully started", get_process_type (cur->type), cur->pid); - } - l = g_list_next (l); - } - } - active_worker = NULL; + g_hash_table_foreach (rspamd->workers, kill_old_workers, NULL); + spawn_workers (rspamd, FALSE); + } if (got_alarm) { got_alarm = 0; @@ -795,16 +787,7 @@ main (int argc, char **argv, char **env) } /* Wait for workers termination */ - l = g_list_first (active_workers); - while (l) { - cur = l->data; - waitpid (cur->pid, &res, 0); - msg_debug ("main(cleaning): %s process %d terminated", get_process_type (cur->type), cur->pid); - g_free (cur); - l = g_list_next (l); - } - - g_list_free (active_workers); + g_hash_table_foreach_remove (rspamd->workers, wait_for_workers, NULL); msg_info ("main: terminating..."); diff --git a/src/main.h b/src/main.h index c633d4f7a..0ecc38aa9 100644 --- a/src/main.h +++ b/src/main.h @@ -22,7 +22,7 @@ /* Default values */ #define FIXED_CONFIG_FILE ETC_PREFIX "/rspamd.conf" /* Time in seconds to exit for old worker */ -#define SOFT_SHUTDOWN_TIME 60 +#define SOFT_SHUTDOWN_TIME 10 /* Default metric name */ #define DEFAULT_METRIC "default" /* 60 seconds for worker's IO */ @@ -101,6 +101,7 @@ struct rspamd_main { memory_pool_t *server_pool; /**< server's memory pool */ statfile_pool_t *statfile_pool; /**< shared statfiles pool */ + GHashTable *workers; /**< workers pool indexed by pid */ }; struct counter_data { diff --git a/src/util.c b/src/util.c index 59de138aa..fbb5cb547 100644 --- a/src/util.c +++ b/src/util.c @@ -340,18 +340,19 @@ init_signals (struct sigaction *signals, sig_t sig_handler) sigaction (SIGPIPE, &sigpipe_act, NULL); } -void -pass_signal_worker (GList * workers, int signo) +static void +pass_signal_cb (gpointer key, gpointer value, gpointer ud) { - struct rspamd_worker *cur; - GList *l; + struct rspamd_worker *cur = value; + int signo = GPOINTER_TO_INT (ud); - l = workers; - while (l) { - cur = l->data; - kill (cur->pid, signo); - l = g_list_next (l); - } + kill (cur->pid, signo); +} + +void +pass_signal_worker (GHashTable * workers, int signo) +{ + g_hash_table_foreach (workers, pass_signal_cb, GINT_TO_POINTER (signo)); } void @@ -1036,6 +1037,100 @@ gperf_profiler_init (struct config_file *cfg, const char *descr) #endif } +#ifdef HAVE_FLOCK +/* Flock version */ +gboolean +lock_file (int fd, gboolean async) +{ + int flags; + + if (async) { + flags = LOCK_EX | LOCK_NB; + } + else { + flags = LOCK_EX; + } + + if (flock (fd, flags) == -1) { + if (async && errno == EAGAIN) { + return FALSE; + } + msg_warn ("lock_file: lock on file failed: %s", strerror (errno)); + return FALSE; + } + + return TRUE; +} + +gboolean +unlock_file (int fd, gboolean async) +{ + int flags; + + if (async) { + flags = LOCK_UN | LOCK_NB; + } + else { + flags = LOCK_UN; + } + + if (flock (fd, flags) == -1) { + if (async && errno == EAGAIN) { + return FALSE; + } + msg_warn ("lock_file: lock on file failed: %s", strerror (errno)); + return FALSE; + } + + return TRUE; + +} +#else +/* Fctnl version */ +gboolean +lock_file (int fd, gboolean async) +{ + struct flock fl = { + .l_type = F_WRLCK, + .l_whence = SEEK_SET, + .l_start = 0, + .l_len = 0 + }; + + if (fcntl (fd, async ? F_SETLK : F_SETLKW, &fl) == -1) { + if (async && (errno == EAGAIN || errno == EACCES)) { + return FALSE; + } + msg_warn ("lock_file: lock on file failed: %s", strerror (errno)); + return FALSE; + } + + return TRUE; +} + +gboolean +unlock_file (int fd, gboolean async) +{ + struct flock fl = { + .l_type = F_UNLCK, + .l_whence = SEEK_SET, + .l_start = 0, + .l_len = 0 + }; + + if (fcntl (fd, async ? F_SETLK : F_SETLKW, &fl) == -1) { + if (async && (errno == EAGAIN || errno == EACCES)) { + return FALSE; + } + msg_warn ("lock_file: lock on file failed: %s", strerror (errno)); + return FALSE; + } + + return TRUE; + +} +#endif + /* * vi:ts=4 */ diff --git a/src/util.h b/src/util.h index 2cd220ec7..dfc08aa7d 100644 --- a/src/util.h +++ b/src/util.h @@ -27,7 +27,7 @@ int poll_sync_socket (int fd, int timeout, short events); /* Init signals */ void init_signals (struct sigaction *, sig_t); /* Send specified signal to each worker */ -void pass_signal_worker (GList *, int ); +void pass_signal_worker (GHashTable *, int ); /* Convert string to lowercase */ void convert_to_lowercase (char *str, unsigned int size); @@ -69,6 +69,9 @@ const char* calculate_check_time (struct timespec *begin, int resolution); double set_counter (const char *name, long int value); +gboolean lock_file (int fd, gboolean async); +gboolean unlock_file (int fd, gboolean async); + guint rspamd_strcase_hash (gconstpointer key); gboolean rspamd_strcase_equal (gconstpointer v, gconstpointer v2); diff --git a/src/worker.c b/src/worker.c index ded5e57aa..222b36971 100644 --- a/src/worker.c +++ b/src/worker.c @@ -391,9 +391,6 @@ start_worker (struct rspamd_worker *worker) is_mime = TRUE; } - /* Send SIGUSR2 to parent */ - kill (getppid (), SIGUSR2); - event_loop (0); exit (EXIT_SUCCESS); } |