]> source.dussan.org Git - rspamd.git/commitdiff
* Implement new system of managing rspamd processes
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 30 Oct 2009 15:30:51 +0000 (18:30 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 30 Oct 2009 15:30:51 +0000 (18:30 +0300)
13 files changed:
CMakeLists.txt
config.h.in
src/cfg_file.h
src/cfg_file.y
src/cfg_utils.c
src/controller.c
src/fuzzy_storage.c
src/lmtp.c
src/main.c
src/main.h
src/util.c
src/util.h
src/worker.c

index b4789c28b491272faa231bad703d842b7767938b..528b99c62df5017ecabb34e2c7fbd5990eb2d2ff 100644 (file)
@@ -241,6 +241,7 @@ CHECK_FUNCTION_EXISTS(nanosleep HAVE_NANOSLEEP)
 CHECK_FUNCTION_EXISTS(vfork HAVE_VFORK)
 CHECK_FUNCTION_EXISTS(wait4 HAVE_WAIT4)
 CHECK_FUNCTION_EXISTS(waitpid HAVE_WAITPID)
+CHECK_FUNCTION_EXISTS(flock HAVE_FLOCK)
 
 CHECK_SYMBOL_EXISTS(PATH_MAX limits.h HAVE_PATH_MAX)
 CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN)
index 2c03d5486c6c76ebb8f5042ade472a99c920d471..b80f399e8baeba6190045edceee399adf8853869 100644 (file)
@@ -92,6 +92,8 @@
 
 #cmakedefine HAVE_WAITPID        1
 
+#cmakedefine HAVE_FLOCK          1
+
 #cmakedefine DEBUG_MODE          1
 
 #cmakedefine GMIME24             1
index 6a0835f51b3b975e0020b051c8a8fc4800f8c371..b165e82f263b011d49c4dcc098c5424c9bb3c3df 100644 (file)
@@ -177,6 +177,8 @@ struct worker_conf {
        int count;                                                                              /**< number of workers                                                                  */
        GHashTable *params;                                                             /**< params for worker                                                                  */
        int listen_sock;                                                                /**< listening socket desctiptor                                                */
+       GQueue *active_workers;                                                 /**< linked list of spawned workers                                             */
+       gboolean has_socket;                                                    /**< whether we should make listening socket in main process */
 };
 
 /**
@@ -328,6 +330,7 @@ void unescape_quotes (char *line);
 
 GList* parse_comma_list (memory_pool_t *pool, char *line);
 struct classifier_config* check_classifier_cfg (struct config_file *cfg, struct classifier_config *c);
+struct worker_conf* check_worker_conf (struct config_file *cfg, struct worker_conf *c);
 
 int yylex (void);
 int yyparse (void);
index 24a94bbc38cbf60242c058495ffe683020cccef7..8ac9cb6a526177e79e86b684a6d82001b29a8d38 100644 (file)
@@ -234,16 +234,7 @@ workercmd:
 
 bindsock:
        BINDSOCK EQSIGN bind_cred {
-               if (cur_worker == NULL) {
-                       cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
-                       cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
-                       memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
-#ifdef HAVE_SC_NPROCESSORS_ONLN
-                       cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
-#else
-                       cur_worker->count = DEFAULT_WORKERS_NUM;
-#endif
-               }
+               cur_worker = check_worker_conf (cfg, cur_worker);
 
                if (!parse_bind_line (cfg, cur_worker, $3)) {
                        yyerror ("yyparse: parse_bind_line");
@@ -273,28 +264,22 @@ bind_cred:
 
 workertype:
        TYPE EQSIGN QUOTEDSTRING {
-               if (cur_worker == NULL) {
-                       cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
-                       cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
-                       memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
-#ifdef HAVE_SC_NPROCESSORS_ONLN
-                       cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
-#else
-                       cur_worker->count = DEFAULT_WORKERS_NUM;
-#endif
-               }
-               
+               cur_worker = check_worker_conf (cfg, cur_worker);
                if (g_ascii_strcasecmp ($3, "normal") == 0) {
                        cur_worker->type = TYPE_WORKER;
+                       cur_worker->has_socket = TRUE;
                }
                else if (g_ascii_strcasecmp ($3, "controller") == 0) {
                        cur_worker->type = TYPE_CONTROLLER;
+                       cur_worker->has_socket = TRUE;
                }
                else if (g_ascii_strcasecmp ($3, "lmtp") == 0) {
                        cur_worker->type = TYPE_LMTP;
+                       cur_worker->has_socket = TRUE;
                }
                else if (g_ascii_strcasecmp ($3, "fuzzy") == 0) {
                        cur_worker->type = TYPE_FUZZY;
+                       cur_worker->has_socket = FALSE;
                }
                else {
                        yyerror ("yyparse: unknown worker type: %s", $3);
@@ -305,16 +290,7 @@ workertype:
 
 workercount:
        COUNT EQSIGN NUMBER {
-               if (cur_worker == NULL) {
-                       cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
-                       cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
-                       memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
-#ifdef HAVE_SC_NPROCESSORS_ONLN
-                       cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
-#else
-                       cur_worker->count = DEFAULT_WORKERS_NUM;
-#endif
-               }
+               cur_worker = check_worker_conf (cfg, cur_worker);
 
                if ($3 > 0) {
                        cur_worker->count = $3;
@@ -328,16 +304,7 @@ workercount:
 
 workerparam:
        STRING EQSIGN QUOTEDSTRING {
-               if (cur_worker == NULL) {
-                       cur_worker = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
-                       cur_worker->params = g_hash_table_new (g_str_hash, g_str_equal);
-                       memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, cur_worker->params);
-#ifdef HAVE_SC_NPROCESSORS_ONLN
-                       cur_worker->count = sysconf (_SC_NPROCESSORS_ONLN);
-#else
-                       cur_worker->count = DEFAULT_WORKERS_NUM;
-#endif
-               }
+               cur_worker = check_worker_conf (cfg, cur_worker);
                
                g_hash_table_insert (cur_worker->params, $1, $3);
        }
index 7b64700281cc6629cb816f7521edaffacad3cfc3..13a5e091af4c4e859cde9af9b1a5102b9886773a 100644 (file)
@@ -623,6 +623,24 @@ check_classifier_cfg (struct config_file *cfg, struct classifier_config *c)
        return c;
 }
 
+struct worker_conf *
+check_worker_conf (struct config_file *cfg, struct worker_conf *c)
+{
+       if (c == NULL) {
+               c = memory_pool_alloc0 (cfg->cfg_pool, sizeof (struct worker_conf));
+               c->params = g_hash_table_new (g_str_hash, g_str_equal);
+               c->active_workers = g_queue_new ();
+               memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_hash_table_destroy, c->params);
+               memory_pool_add_destructor (cfg->cfg_pool, (pool_destruct_func)g_queue_free, c->active_workers);
+#ifdef HAVE_SC_NPROCESSORS_ONLN
+               c->count = sysconf (_SC_NPROCESSORS_ONLN);
+#else
+               c->count = DEFAULT_WORKERS_NUM;
+#endif
+       }
+       
+       return c;
+}
 /*
  * vi:ts=4
  */
index 21df8d511c82d9da37c27bde2f09343d74122f57..64ef75d59a19bf946afc40f1cc9d514934bde339 100644 (file)
@@ -611,9 +611,6 @@ start_controller (struct rspamd_worker *worker)
        event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
        event_add (&worker->bind_ev, NULL);
 
-       /* Send SIGUSR2 to parent */
-       kill (getppid (), SIGUSR2);
-
        gperf_profiler_init (worker->srv->cfg, "controller");
 
        event_loop (0);
index 702c5b7c533ff2325fd09e8fc8c33c2073057815..68da96b44fe717b7bfd747f35df9c14d4098b4f0 100644 (file)
@@ -109,12 +109,21 @@ sync_cache (struct rspamd_worker *wrk)
        else {
                expire = DEFAULT_EXPIRE;
        }
+       
+       /* Sync section */
+       if ((fd = open (filename, O_WRONLY)) != -1) {
+               /* Aqquire a lock */
+               (void)lock_file (fd, FALSE);
+               (void)unlock_file (fd, FALSE);
+       }
 
        if ((fd = open (filename, O_WRONLY | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH)) == -1) {
                msg_err ("sync_cache: cannot create hash file %s: %s", filename, strerror (errno));
                return;
        }
 
+       (void)lock_file (fd, FALSE);
+
        now = (uint64_t) time (NULL);
        for (i = 0; i < BUCKETS; i++) {
                cur = hashes[i]->head;
@@ -136,6 +145,7 @@ sync_cache (struct rspamd_worker *wrk)
                }
        }
 
+       (void)unlock_file (fd, FALSE);
        close (fd);
 }
 
@@ -196,6 +206,8 @@ read_hashes_file (struct rspamd_worker *wrk)
                return FALSE;
        }
 
+       (void)lock_file (fd, FALSE);
+
        fstat (fd, &st);
 
        for (;;) {
@@ -208,6 +220,9 @@ read_hashes_file (struct rspamd_worker *wrk)
                bloom_add (bf, node->h.hash_pipe);
        }
 
+       (void)unlock_file (fd, FALSE);
+       close (fd);
+
        if (r > 0) {
                msg_warn ("read_hashes_file: ignore garbadge at the end of file, length of garbadge: %d", r);
        }
@@ -415,9 +430,6 @@ start_fuzzy_storage (struct rspamd_worker *worker)
        signal_set (&sev, SIGTERM, sigterm_handler, (void *)worker);
        signal_add (&sev, NULL);
 
-       /* Send SIGUSR2 to parent */
-       kill (getppid (), SIGUSR2);
-
        /* Init bloom filter */
        bf = bloom_create (20000000L, DEFAULT_BLOOM_HASHES);
        /* Try to read hashes from file */
index 087be2e1a29c3cccc6b6fcffc08ecb79b45c53b2..f9bd6f65e9db59ea275cde7ca7bacd4f84ff5bfa 100644 (file)
@@ -292,9 +292,6 @@ start_lmtp_worker (struct rspamd_worker *worker)
        hostbuf[hostmax - 1] = '\0';
        snprintf (greetingbuf, sizeof (greetingbuf), "%d rspamd version %s LMTP on %s Ready\r\n", LMTP_OK, RVERSION, hostbuf);
 
-       /* Send SIGUSR2 to parent */
-       kill (getppid (), SIGUSR2);
-
        io_tv.tv_sec = WORKER_IO_TIMEOUT;
        io_tv.tv_usec = 0;
 
index 7029444679e49c172264ff2a36966a8df1ff3b16..4b811bb36c6095c55d0863a56b09051c890766ae 100644 (file)
@@ -57,7 +57,6 @@ static struct rspamd_worker    *fork_worker (struct rspamd_main *, struct worker
 sig_atomic_t                    do_restart;
 sig_atomic_t                    do_terminate;
 sig_atomic_t                    child_dead;
-sig_atomic_t                    child_ready;
 sig_atomic_t                    got_alarm;
 
 extern int                      yynerrs;
@@ -71,8 +70,6 @@ extern void                     xs_init (pTHX);
 extern PerlInterpreter         *perl_interpreter;
 #endif
 
-/* Active workers */
-static GList                   *active_workers = NULL;
 /* List of workers that are pending to start */
 static GList                   *workers_pending = NULL;
 
@@ -93,7 +90,7 @@ sig_handler (int signo)
                child_dead = 1;
                break;
        case SIGUSR2:
-               child_ready = 1;
+               /* Do nothing */
                break;
        case SIGALRM:
                got_alarm = 1;
@@ -285,7 +282,7 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
        cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
        if (cur) {
                bzero (cur, sizeof (struct rspamd_worker));
-               active_workers = g_list_prepend (active_workers, cur);
+               g_queue_push_head (cf->active_workers, cur);
                cur->srv = rspamd;
                cur->type = cf->type;
                cur->pid = fork ();
@@ -328,6 +325,10 @@ fork_worker (struct rspamd_main *rspamd, struct worker_conf *cf)
                        pidfile_remove (rspamd->pfh);
                        exit (-errno);
                        break;
+               default:
+                       /* Insert worker into worker's table, pid is index */
+                       g_hash_table_insert (rspamd->workers, GSIZE_TO_POINTER (cur->pid), cur);
+                       break;
                }
        }
 
@@ -416,7 +417,7 @@ fork_delayed (struct rspamd_main *rspamd)
 }
 
 static void
-spawn_workers (struct rspamd_main *rspamd)
+spawn_workers (struct rspamd_main *rspamd, gboolean make_sockets)
 {
        GList                          *cur;
        struct worker_conf             *cf;
@@ -427,8 +428,8 @@ spawn_workers (struct rspamd_main *rspamd)
        while (cur) {
                cf = cur->data;
 
-               /* Create listen socket */
-               if (cf->type != TYPE_FUZZY) {
+               if (make_sockets && cf->has_socket) {
+                       /* Create listen socket */
                        listen_sock = create_listen_socket (&cf->bind_addr, cf->bind_port, cf->bind_family, cf->bind_host);
                        if (listen_sock == -1) {
                                exit (-errno);
@@ -463,6 +464,29 @@ get_process_type (enum process_type type)
        return NULL;
 }
 
+static void
+kill_old_workers (gpointer key, gpointer value, gpointer unused)
+{
+       struct rspamd_worker         *w = value;
+
+       kill (w->pid, SIGUSR2);
+       msg_info ("rspamd_restart: send signal to worker %ld", (long int)w->pid);
+}
+
+static gboolean
+wait_for_workers (gpointer key, gpointer value, gpointer unused)
+{
+       struct rspamd_worker          *w = value;
+       int                            res = 0;
+
+       waitpid (w->pid, &res, 0);
+
+       msg_debug ("main(cleaning): %s process %d terminated", get_process_type (w->type), w->pid);
+       g_free (w);
+
+       return TRUE;
+}
+
 int
 main (int argc, char **argv, char **env)
 {
@@ -470,7 +494,7 @@ main (int argc, char **argv, char **env)
        struct module_ctx              *cur_module = NULL;
        int                             res = 0, i;
        struct sigaction                signals;
-       struct rspamd_worker           *cur, *active_worker;
+       struct rspamd_worker           *cur;
        struct rlimit                   rlim;
        struct metric                  *metric;
        struct cache_item              *item;
@@ -495,9 +519,7 @@ main (int argc, char **argv, char **env)
        do_terminate = 0;
        do_restart = 0;
        child_dead = 0;
-       child_ready = 0;
        do_reopen_log = 0;
-       active_worker = NULL;
 
        rspamd->stat = memory_pool_alloc_shared (rspamd->server_pool, sizeof (struct rspamd_stat));
        bzero (rspamd->stat, sizeof (struct rspamd_stat));
@@ -704,8 +726,8 @@ main (int argc, char **argv, char **env)
                l = g_list_next (l);
        }
 
-
-       spawn_workers (rspamd);
+       rspamd->workers = g_hash_table_new (g_direct_hash, g_direct_equal);
+       spawn_workers (rspamd, TRUE);
 
        /* Signal processing cycle */
        for (;;) {
@@ -714,7 +736,7 @@ main (int argc, char **argv, char **env)
                sigsuspend (&signals.sa_mask);
                if (do_terminate) {
                        msg_debug ("main: catch termination signal, waiting for childs");
-                       pass_signal_worker (active_workers, SIGTERM);
+                       pass_signal_worker (rspamd->workers, SIGTERM);
                        break;
                }
                if (child_dead) {
@@ -722,35 +744,31 @@ main (int argc, char **argv, char **env)
                        msg_debug ("main: catch SIGCHLD signal, finding terminated worker");
                        /* Remove dead child form childs list */
                        wrk = waitpid (0, &res, 0);
-                       l = g_list_first (active_workers);
-                       while (l) {
-                               cur = l->data;
-                               if (wrk == cur->pid) {
-                                       /* Catch situations if active worker is abnormally terminated */
-                                       if (cur == active_worker) {
-                                               active_worker = NULL;
-                                       }
-                                       active_workers = g_list_remove_link (active_workers, l);
+                       if ((cur = g_hash_table_lookup (rspamd->workers, GSIZE_TO_POINTER (wrk))) != NULL) {
+                               /* Unlink dead process from queue and hash table */
+
+                               g_hash_table_remove (rspamd->workers, GSIZE_TO_POINTER (wrk));
+                               g_queue_remove (cur->cf->active_workers, cur);
 
-                                       if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
-                                               /* Normal worker termination, do not fork one more */
-                                               msg_info ("main: %s process %d terminated normally", get_process_type (cur->type), cur->pid);
+                               if (WIFEXITED (res) && WEXITSTATUS (res) == 0) {
+                                       /* Normal worker termination, do not fork one more */
+                                       msg_info ("main: %s process %d terminated normally", get_process_type (cur->type), cur->pid);
+                               }
+                               else {
+                                       if (WIFSIGNALED (res)) {
+                                               msg_warn ("main: %s process %d terminated abnormally by signal: %d", get_process_type (cur->type), cur->pid, WTERMSIG (res));
                                        }
                                        else {
-                                               if (WIFSIGNALED (res)) {
-                                                       msg_warn ("main: %s process %d terminated abnormally by signal: %d", get_process_type (cur->type), cur->pid, WTERMSIG (res));
-                                               }
-                                               else {
-                                                       msg_warn ("main: %s process %d terminated abnormally", get_process_type (cur->type), cur->pid);
-                                               }
-                                               /* Fork another worker in replace of dead one */
-                                               delay_fork (cur->cf);
+                                               msg_warn ("main: %s process %d terminated abnormally", get_process_type (cur->type), cur->pid);
                                        }
-                                       g_list_free_1 (l);
-                                       g_free (cur);
-                                       break;
+                                       /* Fork another worker in replace of dead one */
+                                       delay_fork (cur->cf);
                                }
-                               l = g_list_next (l);
+
+                               g_free (cur);
+                       }
+                       else {
+                               msg_err ("main: got SIGCHLD, but pid %ld is not found in workers hash table, something goes wrong", (long int)wrk);
                        }
                }
                if (do_restart) {
@@ -758,35 +776,9 @@ main (int argc, char **argv, char **env)
                        do_reopen_log = 1;
 
                        msg_info ("main: rspamd " RVERSION " is restarting");
-                       l = g_list_first (active_workers);
-                       while (l) {
-                               cur = l->data;
-                               /* Start new workers that would reread configuration */
-                               cur->pending = FALSE;
-                               active_worker = fork_worker (rspamd, cur->cf);
-                               active_worker->pending = TRUE;
-                               l = g_list_next (l);
-                       }
-               }
-               if (child_ready) {
-                       child_ready = 0;
-
-                       if (active_worker != NULL) {
-                               l = g_list_first (active_workers);
-                               while (l) {
-                                       cur = l->data;
-                                       if (!cur->pending && !cur->is_dying) {
-                                               /* Send to old workers SIGUSR2 */
-                                               kill (cur->pid, SIGUSR2);
-                                               cur->is_dying = 1;
-                                       }
-                                       else if (!cur->is_dying) {
-                                               msg_info ("main: %s process %d has been successfully started", get_process_type (cur->type), cur->pid);
-                                       }
-                                       l = g_list_next (l);
-                               }
-                       }
-                       active_worker = NULL;
+                       g_hash_table_foreach (rspamd->workers, kill_old_workers, NULL);
+                       spawn_workers (rspamd, FALSE);
+
                }
                if (got_alarm) {
                        got_alarm = 0;
@@ -795,16 +787,7 @@ main (int argc, char **argv, char **env)
        }
 
        /* Wait for workers termination */
-       l = g_list_first (active_workers);
-       while (l) {
-               cur = l->data;
-               waitpid (cur->pid, &res, 0);
-               msg_debug ("main(cleaning): %s process %d terminated", get_process_type (cur->type), cur->pid);
-               g_free (cur);
-               l = g_list_next (l);
-       }
-
-       g_list_free (active_workers);
+       g_hash_table_foreach_remove (rspamd->workers, wait_for_workers, NULL);
 
        msg_info ("main: terminating...");
 
index c633d4f7ada9abe18c90ecc68c51c53764f419f7..0ecc38aa95c0b975aa643ada1b10f42cf75b19d1 100644 (file)
@@ -22,7 +22,7 @@
 /* Default values */
 #define FIXED_CONFIG_FILE ETC_PREFIX "/rspamd.conf"
 /* Time in seconds to exit for old worker */
-#define SOFT_SHUTDOWN_TIME 60
+#define SOFT_SHUTDOWN_TIME 10
 /* Default metric name */
 #define DEFAULT_METRIC "default"
 /* 60 seconds for worker's IO */
@@ -101,6 +101,7 @@ struct rspamd_main {
 
        memory_pool_t *server_pool;                                                                     /**< server's memory pool                                                       */
        statfile_pool_t *statfile_pool;                                                         /**< shared statfiles pool                                                      */
+    GHashTable *workers;                                        /**< workers pool indexed by pid                    */
 };
 
 struct counter_data {
index 59de138aa299abb221a050a66a9ad09aff7310aa..fbb5cb547e5c73af1f44332fc763598bf1da085f 100644 (file)
@@ -340,18 +340,19 @@ init_signals (struct sigaction *signals, sig_t sig_handler)
        sigaction (SIGPIPE, &sigpipe_act, NULL);
 }
 
-void
-pass_signal_worker (GList * workers, int signo)
+static void
+pass_signal_cb (gpointer key, gpointer value, gpointer ud)
 {
-       struct rspamd_worker           *cur;
-       GList                          *l;
+       struct rspamd_worker           *cur = value;
+    int                             signo = GPOINTER_TO_INT (ud);
 
-       l = workers;
-       while (l) {
-               cur = l->data;
-               kill (cur->pid, signo);
-               l = g_list_next (l);
-       }
+       kill (cur->pid, signo);
+}
+
+void
+pass_signal_worker (GHashTable * workers, int signo)
+{
+    g_hash_table_foreach (workers, pass_signal_cb, GINT_TO_POINTER (signo));
 }
 
 void
@@ -1036,6 +1037,100 @@ gperf_profiler_init (struct config_file *cfg, const char *descr)
 #endif
 }
 
+#ifdef HAVE_FLOCK
+/* Flock version */
+gboolean 
+lock_file (int fd, gboolean async)
+{
+    int flags;
+
+    if (async) {
+        flags = LOCK_EX | LOCK_NB;
+    }
+    else {
+        flags = LOCK_EX;
+    }
+
+    if (flock (fd, flags) == -1) {
+        if (async && errno == EAGAIN) {
+            return FALSE;
+        }
+        msg_warn ("lock_file: lock on file failed: %s", strerror (errno));
+        return FALSE;
+    }
+
+    return TRUE;
+}
+
+gboolean 
+unlock_file (int fd, gboolean async)
+{
+    int flags;
+
+    if (async) {
+        flags = LOCK_UN | LOCK_NB;
+    }
+    else {
+        flags = LOCK_UN;
+    }
+
+    if (flock (fd, flags) == -1) {
+        if (async && errno == EAGAIN) {
+            return FALSE;
+        }
+        msg_warn ("lock_file: lock on file failed: %s", strerror (errno));
+        return FALSE;
+    }
+
+    return TRUE;
+
+}
+#else
+/* Fctnl version */
+gboolean 
+lock_file (int fd, gboolean async)
+{
+    struct flock fl = {
+        .l_type = F_WRLCK,
+        .l_whence = SEEK_SET,
+        .l_start = 0,
+        .l_len = 0
+    };
+
+    if (fcntl (fd, async ? F_SETLK : F_SETLKW, &fl) == -1) {
+        if (async && (errno == EAGAIN || errno == EACCES)) {
+            return FALSE;
+        }
+        msg_warn ("lock_file: lock on file failed: %s", strerror (errno));
+        return FALSE;
+    }
+
+    return TRUE;
+}
+
+gboolean 
+unlock_file (int fd, gboolean async)
+{
+    struct flock fl = {
+        .l_type = F_UNLCK,
+        .l_whence = SEEK_SET,
+        .l_start = 0,
+        .l_len = 0
+    };
+
+    if (fcntl (fd, async ? F_SETLK : F_SETLKW, &fl) == -1) {
+        if (async && (errno == EAGAIN || errno == EACCES)) {
+            return FALSE;
+        }
+        msg_warn ("lock_file: lock on file failed: %s", strerror (errno));
+        return FALSE;
+    }
+
+    return TRUE;
+
+}
+#endif
+
 /*
  * vi:ts=4
  */
index 2cd220ec7e0163c58844fdfbffd77b4306c51588..dfc08aa7d2164a0fad7e01f60b1bcd873771184a 100644 (file)
@@ -27,7 +27,7 @@ int poll_sync_socket (int fd, int timeout, short events);
 /* Init signals */
 void init_signals (struct sigaction *, sig_t);
 /* Send specified signal to each worker */
-void pass_signal_worker (GList *, int );
+void pass_signal_worker (GHashTable *, int );
 /* Convert string to lowercase */
 void convert_to_lowercase (char *str, unsigned int size);
 
@@ -69,6 +69,9 @@ const char* calculate_check_time (struct timespec *begin, int resolution);
 
 double set_counter (const char *name, long int value);
 
+gboolean lock_file (int fd, gboolean async);
+gboolean unlock_file (int fd, gboolean async);
+
 guint rspamd_strcase_hash (gconstpointer key);
 gboolean rspamd_strcase_equal (gconstpointer v, gconstpointer v2);
 
index ded5e57aa6bca588b02a77404f7a421330037c7f..222b36971ea39bd673fff3798f1d7c0d7465e2d6 100644 (file)
@@ -391,9 +391,6 @@ start_worker (struct rspamd_worker *worker)
                is_mime = TRUE;
        }
 
-       /* Send SIGUSR2 to parent */
-       kill (getppid (), SIGUSR2);
-
        event_loop (0);
        exit (EXIT_SUCCESS);
 }