CHECK_FUNCTION_EXISTS(vfork HAVE_VFORK)
CHECK_FUNCTION_EXISTS(wait4 HAVE_WAIT4)
CHECK_FUNCTION_EXISTS(waitpid HAVE_WAITPID)
+CHECK_FUNCTION_EXISTS(flock HAVE_FLOCK)
CHECK_SYMBOL_EXISTS(PATH_MAX limits.h HAVE_PATH_MAX)
CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN)
#cmakedefine HAVE_WAITPID 1
+#cmakedefine HAVE_FLOCK 1
+
#cmakedefine DEBUG_MODE 1
#cmakedefine GMIME24 1
int count; /**< number of workers */
GHashTable *params; /**< params for worker */
int listen_sock; /**< listening socket desctiptor */
+ GQueue *active_workers; /**< linked list of spawned workers */
+ gboolean has_socket; /**< whether we should make listening socket in main process */
};
/**
GList* parse_comma_list (memory_pool_t *pool, char *line);
struct classifier_config* check_classifier_cfg (struct config_file *cfg, struct classifier_config *c);
+struct worker_conf* check_worker_conf (struct config_file *cfg, struct worker_conf *c);
int yylex (void);
int yyparse (void);
bindsock:
BINDSOCK EQSIGN bind_cred {
- if (cur_worker == NULL) {
- cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
- cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
- memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
-#ifdef HAVE_SC_NPROCESSORS_ONLN
- cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
-#else
- cur_worker->count = DEFAULT_WORKERS_NUM;
-#endif
- }
+ cur_worker = check_worker_conf (cfg, cur_worker);
if (!parse_bind_line (cfg, cur_worker, $3)) {
yyerror ("yyparse: parse_bind_line");
workertype:
TYPE EQSIGN QUOTEDSTRING {
- if (cur_worker == NULL) {
- cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
- cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
- memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
-#ifdef HAVE_SC_NPROCESSORS_ONLN
- cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
-#else
- cur_worker->count = DEFAULT_WORKERS_NUM;
-#endif
- }
-
+ cur_worker = check_worker_conf (cfg, cur_worker);
if (g_ascii_strcasecmp ($3, "normal") == 0) {
cur_worker->type = TYPE_WORKER;
+ cur_worker->has_socket = TRUE;
}
else if (g_ascii_strcasecmp ($3, "controller") == 0) {
cur_worker->type = TYPE_CONTROLLER;
+ cur_worker->has_socket = TRUE;
}
else if (g_ascii_strcasecmp ($3, "lmtp") == 0) {
cur_worker->type = TYPE_LMTP;
+ cur_worker->has_socket = TRUE;
}
else if (g_ascii_strcasecmp ($3, "fuzzy") == 0) {
cur_worker->type = TYPE_FUZZY;
+ cur_worker->has_socket = FALSE;
}
else {
yyerror ("yyparse: unknown worker type: %s", $3);
workercount:
COUNT EQSIGN NUMBER {
- if (cur_worker == NULL) {
- cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
- cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
- memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
-#ifdef HAVE_SC_NPROCESSORS_ONLN
- cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
-#else
- cur_worker->count = DEFAULT_WORKERS_NUM;
-#endif
- }
+ cur_worker = check_worker_conf (cfg, cur_worker);
if ($3 > 0) {
cur_worker->count = $3;
workerparam:
STRING EQSIGN QUOTEDSTRING {
- if (cur_worker == NULL) {
- cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
- cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
- memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
-#ifdef HAVE_SC_NPROCESSORS_ONLN
- cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
-#else
- cur_worker->count = DEFAULT_WORKERS_NUM;
-#endif
- }
+ cur_worker = check_worker_conf (cfg, cur_worker);
g_hash_table_insert (cur_worker->params, $1, $3);
}
return c;
}
+struct worker_conf *
+check_worker_conf (struct config_file *cfg, struct worker_conf *c)
+{
+ if (c == NULL) {
+ c = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
+ c->params = g_hash_table_new (g_str_hash, g_str_equal);
+ c->active_workers = g_queue_new ();
+ memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, c->params);
+ memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_queue_free, c->active_workers);
+#ifdef HAVE_SC_NPROCESSORS_ONLN
+ c->count = sysconf (_SC_NPROCESSORS_ONLN);
+#else
+ c->count = DEFAULT_WORKERS_NUM;
+#endif
+ }
+
+ return c;
+}
/*
* vi:ts=4
*/
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
event_add (&worker->bind_ev, NULL);
- /* Send SIGUSR2 to parent */
- kill (getppid (), SIGUSR2);
-
gperf_profiler_init (worker->srv->cfg, "controller");
event_loop (0);
else {
expire = DEFAULT_EXPIRE;
}
+
+ /* Sync section */
+ if ((fd = open (filename, O_WRONLY)) != -1) {
+ /* Aqquire a lock */
+ (void)lock_file (fd, FALSE);
+ (void)unlock_file (fd, FALSE);
+ }
if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
msg_err ("sync_cache: cannot create hash file %s: %s", filename, strerror (errno));
return;
}
+ (void)lock_file (fd, FALSE);
+
now = (uint64_t) time (NULL);
for (i = 0; i < BUCKETS; i++) {
cur = hashes[i]->head;
}
}
+ (void)unlock_file (fd, FALSE);
close (fd);
}
return FALSE;
}
+ (void)lock_file (fd, FALSE);
+
fstat (fd, &st);
for (;;) {
bloom_add (bf, node->h.hash_pipe);
}
+ (void)unlock_file (fd, FALSE);
+ close (fd);
+
if (r > 0) {
msg_warn ("read_hashes_file: ignore garbadge at the end of file, length of garbadge: %d", r);
}
signal_set (&sev, SIGTERM, sigterm_handler, (void *)worker);
signal_add (&sev, NULL);
- /* Send SIGUSR2 to parent */
- kill (getppid (), SIGUSR2);
-
/* Init bloom filter */
bf = bloom_create (20000000L, DEFAULT_BLOOM_HASHES);
/* Try to read hashes from file */
hostbuf[hostmax - 1] = '\0';
snprintf (greetingbuf, sizeof (greetingbuf), "%d rspamd version %s LMTP on %s Ready\r\n", LMTP_OK, RVERSION, hostbuf);
- /* Send SIGUSR2 to parent */
- kill (getppid (), SIGUSR2);
-
io_tv.tv_sec = WORKER_IO_TIMEOUT;
io_tv.tv_usec = 0;
sig_atomic_t do_restart;
sig_atomic_t do_terminate;
sig_atomic_t child_dead;
-sig_atomic_t child_ready;
sig_atomic_t got_alarm;
extern int yynerrs;
extern PerlInterpreter *perl_interpreter;
#endif
-/* Active workers */
-static GList *active_workers = NULL;
/* List of workers that are pending to start */
static GList *workers_pending = NULL;
child_dead = 1;
break;
case SIGUSR2:
- child_ready = 1;
+ /* Do nothing */
break;
case SIGALRM:
got_alarm = 1;
cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
if (cur) {
bzero (cur, sizeof (struct rspamd_worker));
- active_workers = g_list_prepend (active_workers, cur);
+ g_queue_push_head (cf->active_workers, cur);
cur->srv = rspamd;
cur->type = cf->type;
cur->pid = fork ();
pidfile_remove (rspamd->pfh);
exit (-errno);
break;
+ default:
+ /* Insert worker into worker's table, pid is index */
+ g_hash_table_insert (rspamd->workers, GSIZE_TO_POINTER (cur->pid), cur);
+ break;
}
}
}
static void
-spawn_workers (struct rspamd_main *rspamd)
+spawn_workers (struct rspamd_main *rspamd, gboolean make_sockets)
{
GList *cur;
struct worker_conf *cf;
while (cur) {
cf = cur->data;
- /* Create listen socket */
- if (cf->type != TYPE_FUZZY) {
+ if (make_sockets && cf->has_socket) {
+ /* Create listen socket */
listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port, cf->bind_family, cf->bind_host);
if (listen_sock == -1) {
exit (-errno);
return NULL;
}
+static void
+kill_old_workers (gpointer key, gpointer value, gpointer unused)
+{
+ struct rspamd_worker *w = value;
+
+ kill (w->pid, SIGUSR2);
+ msg_info ("rspamd_restart: send signal to worker %ld", (long int)w->pid);
+}
+
+static gboolean
+wait_for_workers (gpointer key, gpointer value, gpointer unused)
+{
+ struct rspamd_worker *w = value;
+ int res = 0;
+
+ waitpid (w->pid, &res, 0);
+
+ msg_debug ("main(cleaning): %s process %d terminated", get_process_type (w->type), w->pid);
+ g_free (w);
+
+ return TRUE;
+}
+
int
main (int argc, char **argv, char **env)
{
struct module_ctx *cur_module = NULL;
int res = 0, i;
struct sigaction signals;
- struct rspamd_worker *cur, *active_worker;
+ struct rspamd_worker *cur;
struct rlimit rlim;
struct metric *metric;
struct cache_item *item;
do_terminate = 0;
do_restart = 0;
child_dead = 0;
- child_ready = 0;
do_reopen_log = 0;
- active_worker = NULL;
rspamd->stat = memory_pool_alloc_shared (rspamd->server_pool, sizeof (struct rspamd_stat));
bzero (rspamd->stat, sizeof (struct rspamd_stat));
l = g_list_next (l);
}
-
- spawn_workers (rspamd);
+ rspamd->workers = g_hash_table_new (g_direct_hash, g_direct_equal);
+ spawn_workers (rspamd, TRUE);
/* Signal processing cycle */
for (;;) {
sigsuspend (&signals.sa_mask);
if (do_terminate) {
msg_debug ("main: catch termination signal, waiting for childs");
- pass_signal_worker (active_workers, SIGTERM);
+ pass_signal_worker (rspamd->workers, SIGTERM);
break;
}
if (child_dead) {
msg_debug ("main: catch SIGCHLD signal, finding terminated worker");
/* Remove dead child form childs list */
wrk = waitpid (0, &res, 0);
- l = g_list_first (active_workers);
- while (l) {
- cur = l->data;
- if (wrk == cur->pid) {
- /* Catch situations if active worker is abnormally terminated */
- if (cur == active_worker) {
- active_worker = NULL;
- }
- active_workers = g_list_remove_link (active_workers, l);
+ if ((cur = g_hash_table_lookup (rspamd->workers, GSIZE_TO_POINTER (wrk))) != NULL) {
+ /* Unlink dead process from queue and hash table */
+
+ g_hash_table_remove (rspamd->workers, GSIZE_TO_POINTER (wrk));
+ g_queue_remove (cur->cf->active_workers, cur);
- if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
- /* Normal worker termination, do not fork one more */
- msg_info ("main: %s process %d terminated normally", get_process_type (cur->type), cur->pid);
+ if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
+ /* Normal worker termination, do not fork one more */
+ msg_info ("main: %s process %d terminated normally", get_process_type (cur->type), cur->pid);
+ }
+ else {
+ if (WIFSIGNALED (res)) {
+ msg_warn ("main: %s process %d terminated abnormally by signal: %d", get_process_type (cur->type), cur->pid, WTERMSIG (res));
}
else {
- if (WIFSIGNALED (res)) {
- msg_warn ("main: %s process %d terminated abnormally by signal: %d", get_process_type (cur->type), cur->pid, WTERMSIG (res));
- }
- else {
- msg_warn ("main: %s process %d terminated abnormally", get_process_type (cur->type), cur->pid);
- }
- /* Fork another worker in replace of dead one */
- delay_fork (cur->cf);
+ msg_warn ("main: %s process %d terminated abnormally", get_process_type (cur->type), cur->pid);
}
- g_list_free_1 (l);
- g_free (cur);
- break;
+ /* Fork another worker in replace of dead one */
+ delay_fork (cur->cf);
}
- l = g_list_next (l);
+
+ g_free (cur);
+ }
+ else {
+ msg_err ("main: got SIGCHLD, but pid %ld is not found in workers hash table, something goes wrong", (long int)wrk);
}
}
if (do_restart) {
do_reopen_log = 1;
msg_info ("main: rspamd " RVERSION " is restarting");
- l = g_list_first (active_workers);
- while (l) {
- cur = l->data;
- /* Start new workers that would reread configuration */
- cur->pending = FALSE;
- active_worker = fork_worker (rspamd, cur->cf);
- active_worker->pending = TRUE;
- l = g_list_next (l);
- }
- }
- if (child_ready) {
- child_ready = 0;
-
- if (active_worker != NULL) {
- l = g_list_first (active_workers);
- while (l) {
- cur = l->data;
- if (!cur->pending && !cur->is_dying) {
- /* Send to old workers SIGUSR2 */
- kill (cur->pid, SIGUSR2);
- cur->is_dying = 1;
- }
- else if (!cur->is_dying) {
- msg_info ("main: %s process %d has been successfully started", get_process_type (cur->type), cur->pid);
- }
- l = g_list_next (l);
- }
- }
- active_worker = NULL;
+ g_hash_table_foreach (rspamd->workers, kill_old_workers, NULL);
+ spawn_workers (rspamd, FALSE);
+
}
if (got_alarm) {
got_alarm = 0;
}
/* Wait for workers termination */
- l = g_list_first (active_workers);
- while (l) {
- cur = l->data;
- waitpid (cur->pid, &res, 0);
- msg_debug ("main(cleaning): %s process %d terminated", get_process_type (cur->type), cur->pid);
- g_free (cur);
- l = g_list_next (l);
- }
-
- g_list_free (active_workers);
+ g_hash_table_foreach_remove (rspamd->workers, wait_for_workers, NULL);
msg_info ("main: terminating...");
/* Default values */
#define FIXED_CONFIG_FILE ETC_PREFIX "/rspamd.conf"
/* Time in seconds to exit for old worker */
-#define SOFT_SHUTDOWN_TIME 60
+#define SOFT_SHUTDOWN_TIME 10
/* Default metric name */
#define DEFAULT_METRIC "default"
/* 60 seconds for worker's IO */
memory_pool_t *server_pool; /**< server's memory pool */
statfile_pool_t *statfile_pool; /**< shared statfiles pool */
+ GHashTable *workers; /**< workers pool indexed by pid */
};
struct counter_data {
sigaction (SIGPIPE, &sigpipe_act, NULL);
}
-void
-pass_signal_worker (GList * workers, int signo)
+static void
+pass_signal_cb (gpointer key, gpointer value, gpointer ud)
{
- struct rspamd_worker *cur;
- GList *l;
+ struct rspamd_worker *cur = value;
+ int signo = GPOINTER_TO_INT (ud);
- l = workers;
- while (l) {
- cur = l->data;
- kill (cur->pid, signo);
- l = g_list_next (l);
- }
+ kill (cur->pid, signo);
+}
+
+void
+pass_signal_worker (GHashTable * workers, int signo)
+{
+ g_hash_table_foreach (workers, pass_signal_cb, GINT_TO_POINTER (signo));
}
void
#endif
}
+#ifdef HAVE_FLOCK
+/* Flock version */
+gboolean
+lock_file (int fd, gboolean async)
+{
+ int flags;
+
+ if (async) {
+ flags = LOCK_EX | LOCK_NB;
+ }
+ else {
+ flags = LOCK_EX;
+ }
+
+ if (flock (fd, flags) == -1) {
+ if (async && errno == EAGAIN) {
+ return FALSE;
+ }
+ msg_warn ("lock_file: lock on file failed: %s", strerror (errno));
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+gboolean
+unlock_file (int fd, gboolean async)
+{
+ int flags;
+
+ if (async) {
+ flags = LOCK_UN | LOCK_NB;
+ }
+ else {
+ flags = LOCK_UN;
+ }
+
+ if (flock (fd, flags) == -1) {
+ if (async && errno == EAGAIN) {
+ return FALSE;
+ }
+ msg_warn ("lock_file: lock on file failed: %s", strerror (errno));
+ return FALSE;
+ }
+
+ return TRUE;
+
+}
+#else
+/* Fctnl version */
+gboolean
+lock_file (int fd, gboolean async)
+{
+ struct flock fl = {
+ .l_type = F_WRLCK,
+ .l_whence = SEEK_SET,
+ .l_start = 0,
+ .l_len = 0
+ };
+
+ if (fcntl (fd, async ? F_SETLK : F_SETLKW, &fl) == -1) {
+ if (async && (errno == EAGAIN || errno == EACCES)) {
+ return FALSE;
+ }
+ msg_warn ("lock_file: lock on file failed: %s", strerror (errno));
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+gboolean
+unlock_file (int fd, gboolean async)
+{
+ struct flock fl = {
+ .l_type = F_UNLCK,
+ .l_whence = SEEK_SET,
+ .l_start = 0,
+ .l_len = 0
+ };
+
+ if (fcntl (fd, async ? F_SETLK : F_SETLKW, &fl) == -1) {
+ if (async && (errno == EAGAIN || errno == EACCES)) {
+ return FALSE;
+ }
+ msg_warn ("lock_file: lock on file failed: %s", strerror (errno));
+ return FALSE;
+ }
+
+ return TRUE;
+
+}
+#endif
+
/*
* vi:ts=4
*/
/* Init signals */
void init_signals (struct sigaction *, sig_t);
/* Send specified signal to each worker */
-void pass_signal_worker (GList *, int );
+void pass_signal_worker (GHashTable *, int );
/* Convert string to lowercase */
void convert_to_lowercase (char *str, unsigned int size);
double set_counter (const char *name, long int value);
+gboolean lock_file (int fd, gboolean async);
+gboolean unlock_file (int fd, gboolean async);
+
guint rspamd_strcase_hash (gconstpointer key);
gboolean rspamd_strcase_equal (gconstpointer v, gconstpointer v2);
is_mime = TRUE;
}
- /* Send SIGUSR2 to parent */
- kill (getppid (), SIGUSR2);
-
event_loop (0);
exit (EXIT_SUCCESS);
}