summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-20 15:07:58 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-22 10:57:29 +0100
commiteafdd221037b41ddcba0add79fd8efccecaf0775 (patch)
treee66add9878fad4934f952e422308d8a2f836fdb7
parentf5a7eddda980f8d6f2e0aaeca46ed4bd6f0f34b4 (diff)
downloadrspamd-eafdd221037b41ddcba0add79fd8efccecaf0775.tar.gz
rspamd-eafdd221037b41ddcba0add79fd8efccecaf0775.zip
[Minor] Try to fix more issues
-rw-r--r--src/libserver/rspamd_control.c4
-rw-r--r--src/libserver/worker_util.c2
-rw-r--r--src/libutil/map.c12
-rw-r--r--src/rspamd.c91
4 files changed, 89 insertions, 20 deletions
diff --git a/src/libserver/rspamd_control.c b/src/libserver/rspamd_control.c
index 62ca24643..1d161f6bc 100644
--- a/src/libserver/rspamd_control.c
+++ b/src/libserver/rspamd_control.c
@@ -727,7 +727,7 @@ rspamd_control_hs_io_handler (int fd, short what, void *ud)
/* At this point we just ignore replies from the workers */
(void)read (fd, &rep, sizeof (rep));
- rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
+ rspamd_ev_watcher_stop (elt->wrk->srv->event_loop, &elt->ev);
g_free (elt);
}
@@ -740,7 +740,7 @@ rspamd_control_log_pipe_io_handler (int fd, short what, void *ud)
/* At this point we just ignore replies from the workers */
(void) read (fd, &rep, sizeof (rep));
- rspamd_ev_watcher_stop (ev_default_loop (0), &elt->ev);
+ rspamd_ev_watcher_stop (elt->wrk->srv->event_loop, &elt->ev);
g_free (elt);
}
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index 70d349c2c..f7b4ee9ab 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -321,6 +321,8 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
event_loop = ev_default_loop (EVFLAG_SIGNALFD);
+ worker->srv->event_loop = event_loop;
+
rspamd_worker_init_signals (worker, event_loop);
rspamd_control_worker_add_default_handler (worker, event_loop);
#ifdef WITH_HIREDIS
diff --git a/src/libutil/map.c b/src/libutil/map.c
index 3ca94806f..eadf0279c 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -274,7 +274,12 @@ free_http_cbdata_common (struct http_callback_data *cbd, gboolean plan_new)
MAP_RELEASE (cbd->bk, "rspamd_map_backend");
- MAP_RELEASE (periodic, "periodic");
+
+ if (periodic) {
+ /* Detached in case of HTTP error */
+ MAP_RELEASE (periodic, "periodic");
+ }
+
g_free (cbd);
}
@@ -325,7 +330,11 @@ http_map_error (struct rspamd_http_connection *conn,
cbd->bk->uri,
cbd->addr ? rspamd_inet_address_to_string_pretty (cbd->addr) : "",
err);
+ MAP_RETAIN (cbd->periodic, "periodic");
rspamd_map_process_periodic (cbd->periodic);
+ MAP_RELEASE (cbd->periodic, "periodic");
+ /* Detach periodic as rspamd_map_process_periodic will destroy it */
+ cbd->periodic = NULL;
MAP_RELEASE (cbd, "http_callback_data");
}
@@ -2236,6 +2245,7 @@ rspamd_map_backend_dtor (struct rspamd_map_backend *bk)
switch (bk->protocol) {
case MAP_PROTO_FILE:
if (bk->data.fd) {
+ ev_stat_stop (ev_default_loop (0), &bk->data.fd->st_ev);
g_free (bk->data.fd->filename);
g_free (bk->data.fd);
}
diff --git a/src/rspamd.c b/src/rspamd.c
index 765b4bd2b..d07961757 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -719,6 +719,7 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused)
if (!w->wanna_die) {
w->wanna_die = TRUE;
kill (w->pid, SIGUSR2);
+ ev_io_stop (rspamd_main->event_loop, &w->srv_ev);
msg_info_main ("send signal to worker %P", w->pid);
}
else {
@@ -727,9 +728,8 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused)
}
static gboolean
-wait_for_workers (gpointer key, gpointer value, gpointer unused)
+rspamd_worker_wait (struct rspamd_worker *w)
{
- struct rspamd_worker *w = value;
struct rspamd_main *rspamd_main;
gint res = 0;
gboolean nowait = FALSE;
@@ -756,7 +756,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
if (term_attempts > -(TERMINATION_ATTEMPTS * 2)) {
if (term_attempts % 10 == 0) {
msg_info_main ("waiting for worker %s(%P) to sync, "
- "%d seconds remain",
+ "%d seconds remain",
g_quark_to_string (w->type), w->pid,
(TERMINATION_ATTEMPTS * 2 + term_attempts) / 5);
kill (w->pid, SIGTERM);
@@ -768,7 +768,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
}
else {
msg_err_main ("data corruption warning: terminating "
- "special worker %s(%P) with SIGKILL",
+ "special worker %s(%P) with SIGKILL",
g_quark_to_string (w->type), w->pid);
kill (w->pid, SIGKILL);
if (nowait && errno == ESRCH) {
@@ -798,7 +798,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
msg_info_main ("%s process %P terminated %s",
g_quark_to_string (w->type), w->pid,
nowait ? "with no result available" :
- (WTERMSIG (res) == SIGKILL ? "hardly" : "softly"));
+ (WTERMSIG (res) == SIGKILL ? "hardly" : "softly"));
if (w->srv_pipe[0] != -1) {
/* Ugly workaround */
if (w->tmp_data) {
@@ -817,6 +817,25 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
return TRUE;
}
+static gboolean
+hash_worker_wait_callback (gpointer key, gpointer value, gpointer unused)
+{
+ return rspamd_worker_wait ((struct rspamd_worker *)value);
+}
+
+static void
+rspamd_final_cld_handler (EV_P_ ev_signal *w, int revents)
+{
+ struct rspamd_main *rspamd_main = (struct rspamd_main *)w->data;
+ g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback,
+ NULL);
+
+ if (g_hash_table_size (rspamd_main->workers) == 0) {
+ ev_break (rspamd_main->event_loop, EVBREAK_ALL);
+ }
+}
+
+
struct core_check_cbdata {
struct rspamd_config *cfg;
gsize total_count;
@@ -983,6 +1002,15 @@ do_encrypt_password (void)
rspamd_fprintf (stderr, "use rspamadm pw for this operation\n");
}
+static void
+stop_srv_ev (gpointer key, gpointer value, gpointer ud)
+{
+ struct rspamd_worker *cur = (struct rspamd_worker *)value;
+ struct rspamd_main *rspamd_main = (struct rspamd_main *)ud;
+
+ ev_io_stop (rspamd_main->event_loop, &cur->srv_ev);
+}
+
/* Signal handlers */
static void
rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents)
@@ -991,6 +1019,8 @@ rspamd_term_handler (struct ev_loop *loop, ev_signal *w, int revents)
msg_info_main ("catch termination signal, waiting for children");
rspamd_log_nolock (rspamd_main->logger);
+ /* Stop srv events to avoid false notifications */
+ g_hash_table_foreach (rspamd_main->workers, stop_srv_ev, rspamd_main);
rspamd_pass_signal (rspamd_main->workers, w->signum);
ev_break (rspamd_main->event_loop, EVBREAK_ALL);
@@ -1033,22 +1063,41 @@ rspamd_cld_handler (EV_P_ ev_signal *w, int revents)
guint i;
gint res = 0;
struct rspamd_worker *cur;
- pid_t wrk;
- gboolean need_refork = TRUE;
+ pid_t pid;
+ gboolean need_refork = TRUE, found_proc = FALSE;
/* Turn off locking for logger */
rspamd_log_nolock (rspamd_main->logger);
- msg_info_main ("catch SIGCHLD signal, finding terminated workers");
+ msg_info_main ("got SIGCHLD signal, finding terminated workers");
/* Remove dead child form children list */
- while ((wrk = waitpid (0, &res, WNOHANG)) > 0) {
+ for (;;) {
+ pid = waitpid (0, &res, WNOHANG);
+
+ if (pid == -1) {
+ if (errno != EINTR) {
+ msg_warn_main ("got unexpected system error when waiting: %s",
+ strerror (errno));
+ break;
+ }
+ else {
+ continue;
+ }
+ }
+ else if (pid == 0) {
+ /* No more processes to wait */
+ break;
+ }
+
+ found_proc = TRUE;
+
if ((cur =
g_hash_table_lookup (rspamd_main->workers,
- GSIZE_TO_POINTER (wrk))) != NULL) {
+ GSIZE_TO_POINTER (pid))) != NULL) {
/* Unlink dead process from queue and hash table */
g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (
- wrk));
+ pid));
if (cur->wanna_die) {
/* Do not refork workers that are intended to be terminated */
@@ -1155,14 +1204,19 @@ rspamd_cld_handler (EV_P_ ev_signal *w, int revents)
}
else {
for (i = 0; i < other_workers->len; i++) {
- if (g_array_index (other_workers, pid_t, i) == wrk) {
+ if (g_array_index (other_workers, pid_t, i) == pid) {
g_array_remove_index_fast (other_workers, i);
- msg_info_main ("related process %P terminated", wrk);
+ msg_info_main ("related process %P terminated", pid);
}
}
}
}
+ if (!found_proc) {
+ msg_err_main ("got SIGCHLD but no workers were able to be waited: %s",
+ strerror (errno));
+ }
+
rspamd_log_lock (rspamd_main->logger);
}
@@ -1173,7 +1227,7 @@ rspamd_final_term_handler (EV_P_ ev_timer *w, int revents)
term_attempts--;
- g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
+ g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
if (g_hash_table_size (rspamd_main->workers) == 0) {
ev_break (rspamd_main->event_loop, EVBREAK_ALL);
@@ -1245,8 +1299,8 @@ main (gint argc, gchar **argv, gchar **env)
GQuark type;
rspamd_inet_addr_t *control_addr = NULL;
struct ev_loop *event_loop;
- ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
- ev_io control_ev;
+ static ev_signal term_ev, int_ev, cld_ev, hup_ev, usr1_ev;
+ static ev_io control_ev;
struct rspamd_main *rspamd_main;
gboolean skip_pid = FALSE, valgrind_mode = FALSE;
@@ -1568,7 +1622,10 @@ main (gint argc, gchar **argv, gchar **env)
/* Wait for workers termination */
- g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
+ ev_signal_init (&cld_ev, rspamd_final_cld_handler, SIGCHLD);
+ ev_signal_start (event_loop, &cld_ev);
+
+ g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
static ev_timer ev_finale;