aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-20 21:22:12 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-06-22 10:57:29 +0100
commit885b63d8457dba1094f465471432d5e2cbdb7dea (patch)
tree19a8da32489e5bdfc8eda42843bf0167a77bb813 /src
parent390620fc357bfdb9e7f20835e3c61e857e3a5da2 (diff)
downloadrspamd-885b63d8457dba1094f465471432d5e2cbdb7dea.tar.gz
rspamd-885b63d8457dba1094f465471432d5e2cbdb7dea.zip
[Project] Another workaround for signals...
Diffstat (limited to 'src')
-rw-r--r--src/client/rspamc.c2
-rw-r--r--src/libserver/worker_util.c59
-rw-r--r--src/libserver/worker_util.h4
-rw-r--r--src/libutil/map.c4
-rw-r--r--src/libutil/map_private.h1
-rw-r--r--src/lua/lua_cdb.c3
-rw-r--r--src/lua/lua_tcp.c2
-rw-r--r--src/lua/lua_util.c4
-rw-r--r--src/lua/lua_worker.c4
-rw-r--r--src/plugins/lua/multimap.lua20
-rw-r--r--src/rspamd.c144
-rw-r--r--src/rspamd.h1
12 files changed, 116 insertions, 132 deletions
diff --git a/src/client/rspamc.c b/src/client/rspamc.c
index cc339ef7a..d08a7f620 100644
--- a/src/client/rspamc.c
+++ b/src/client/rspamc.c
@@ -1903,7 +1903,7 @@ main (gint argc, gchar **argv, gchar **env)
}
rspamd_init_libs ();
- event_loop = ev_default_loop (EVFLAG_SIGNALFD);
+ event_loop = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL);
struct rspamd_http_context_cfg http_config;
diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c
index d849f542e..078de3c8f 100644
--- a/src/libserver/worker_util.c
+++ b/src/libserver/worker_util.c
@@ -55,6 +55,8 @@
#include <ucontext.h>
#elif defined(HAVE_SYS_UCONTEXT_H)
#include <sys/ucontext.h>
+#include <ev.h>
+
#endif
static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
@@ -206,7 +208,12 @@ rspamd_worker_signal_handle (EV_P_ ev_signal *w, int revents)
static void
rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh)
{
+ sigset_t set;
+
ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
+ sigemptyset (&set);
+ sigaddset (&set, sigh->signo);
+ sigprocmask (SIG_BLOCK, &set, NULL);
}
static void
@@ -271,33 +278,22 @@ rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker,
}
void
-rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
+rspamd_worker_init_signals (struct rspamd_worker *worker,
+ struct ev_loop *event_loop)
{
- struct sigaction signals;
-
/* A set of terminating signals */
- rspamd_worker_set_signal_handler (SIGTERM, worker, base,
+ rspamd_worker_set_signal_handler (SIGTERM, worker, event_loop,
rspamd_worker_term_handler, NULL);
- rspamd_worker_set_signal_handler (SIGINT, worker, base,
+ rspamd_worker_set_signal_handler (SIGINT, worker, event_loop,
rspamd_worker_term_handler, NULL);
- rspamd_worker_set_signal_handler (SIGHUP, worker, base,
+ rspamd_worker_set_signal_handler (SIGHUP, worker, event_loop,
rspamd_worker_term_handler, NULL);
/* Special purpose signals */
- rspamd_worker_set_signal_handler (SIGUSR1, worker, base,
+ rspamd_worker_set_signal_handler (SIGUSR1, worker, event_loop,
rspamd_worker_usr1_handler, NULL);
- rspamd_worker_set_signal_handler (SIGUSR2, worker, base,
+ rspamd_worker_set_signal_handler (SIGUSR2, worker, event_loop,
rspamd_worker_usr2_handler, NULL);
-
- /* Unblock all signals processed */
- sigemptyset (&signals.sa_mask);
- sigaddset (&signals.sa_mask, SIGTERM);
- sigaddset (&signals.sa_mask, SIGINT);
- sigaddset (&signals.sa_mask, SIGHUP);
- sigaddset (&signals.sa_mask, SIGUSR1);
- sigaddset (&signals.sa_mask, SIGUSR2);
-
- sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
}
struct ev_loop *
@@ -319,7 +315,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
worker->signal_events = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, rspamd_sigh_free);
- event_loop = ev_default_loop (EVFLAG_SIGNALFD);
+ event_loop = ev_loop_new (EVFLAG_SIGNALFD);
worker->srv->event_loop = event_loop;
@@ -633,11 +629,16 @@ rspamd_worker_on_term (EV_P_ ev_child *w, int revents)
{
struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
- if (wrk->term_handler) {
- wrk->term_handler (EV_A_ w, wrk->srv, wrk);
+ if (wrk->ppid == getpid ()) {
+ if (wrk->term_handler) {
+ wrk->term_handler (EV_A_ w, wrk->srv, wrk);
+ }
+ else {
+ rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
+ }
}
else {
- rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
+ /* Ignore SIGCHLD for not our children... */
}
}
@@ -696,8 +697,17 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
evutil_secure_rng_init ();
#endif
+ /*
+ * Libev stores all signals in a global table, so
+ * previous handlers must be explicitly detached and forgotten
+ * before starting a new loop
+ */
+ ev_signal_stop (rspamd_main->event_loop, &rspamd_main->int_ev);
+ ev_signal_stop (rspamd_main->event_loop, &rspamd_main->term_ev);
+ ev_signal_stop (rspamd_main->event_loop, &rspamd_main->hup_ev);
+ ev_signal_stop (rspamd_main->event_loop, &rspamd_main->usr1_ev);
/* Remove the inherited event base */
- ev_loop_destroy (EV_DEFAULT);
+ ev_loop_destroy (rspamd_main->event_loop);
rspamd_main->event_loop = NULL;
/* Drop privileges */
rspamd_worker_drop_priv (rspamd_main);
@@ -1173,7 +1183,7 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
{
gboolean need_refork = TRUE;
- if (wrk->wanna_die) {
+ if (wrk->wanna_die || rspamd_main->wanna_die) {
/* Do not refork workers that are intended to be terminated */
need_refork = FALSE;
}
@@ -1183,6 +1193,7 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
msg_info_main ("%s process %P terminated normally",
g_quark_to_string (wrk->type),
wrk->pid);
+ need_refork = FALSE;
}
else {
if (WIFSIGNALED (res)) {
diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h
index 4946badcf..cafe1608f 100644
--- a/src/libserver/worker_util.h
+++ b/src/libserver/worker_util.h
@@ -33,9 +33,9 @@ struct rspamd_worker_signal_handler;
/**
* Init basic signals for a worker
* @param worker
- * @param base
+ * @param event_loop
*/
-void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base);
+void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *event_loop);
typedef void (*rspamd_accept_handler)(struct ev_loop *loop, ev_io *w, int revents);
diff --git a/src/libutil/map.c b/src/libutil/map.c
index eadf0279c..838c7c428 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -1940,6 +1940,8 @@ rspamd_map_watch (struct rspamd_config *cfg,
}
PTR_ARRAY_FOREACH (map->backends, i, bk) {
+ bk->event_loop = event_loop;
+
if (bk->protocol == MAP_PROTO_FILE) {
struct file_map_data *data;
@@ -2245,7 +2247,7 @@ rspamd_map_backend_dtor (struct rspamd_map_backend *bk)
switch (bk->protocol) {
case MAP_PROTO_FILE:
if (bk->data.fd) {
- ev_stat_stop (ev_default_loop (0), &bk->data.fd->st_ev);
+ ev_stat_stop (bk->event_loop, &bk->data.fd->st_ev);
g_free (bk->data.fd->filename);
g_free (bk->data.fd);
}
diff --git a/src/libutil/map_private.h b/src/libutil/map_private.h
index 455919d15..946fc476d 100644
--- a/src/libutil/map_private.h
+++ b/src/libutil/map_private.h
@@ -116,6 +116,7 @@ struct rspamd_map_backend {
gboolean is_signed;
gboolean is_compressed;
gboolean is_fallback;
+ struct ev_loop *event_loop;
guint32 id;
struct rspamd_cryptobox_pubkey *trusted_pubkey;
union rspamd_map_backend_data data;
diff --git a/src/lua/lua_cdb.c b/src/lua/lua_cdb.c
index 0b8c27b2a..5d4c499a7 100644
--- a/src/lua/lua_cdb.c
+++ b/src/lua/lua_cdb.c
@@ -50,6 +50,7 @@ lua_cdb_create (lua_State *L)
struct cdb *cdb, **pcdb;
const gchar *filename;
gint fd;
+ struct ev_loop *ev_base = lua_check_ev_base (L, 2);
filename = luaL_checkstring (L, 1);
/* If file begins with cdb://, just skip it */
@@ -69,7 +70,7 @@ lua_cdb_create (lua_State *L)
lua_pushnil (L);
}
else {
- cdb_add_timer (cdb, ev_default_loop (0), CDB_REFRESH_TIME);
+ cdb_add_timer (cdb, ev_base, CDB_REFRESH_TIME);
pcdb = lua_newuserdata (L, sizeof (struct cdb *));
rspamd_lua_setclass (L, "rspamd{cdb}", -1);
*pcdb = cdb;
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c
index 4d1c205cf..ab00548a0 100644
--- a/src/lua/lua_tcp.c
+++ b/src/lua/lua_tcp.c
@@ -1462,7 +1462,7 @@ lua_tcp_request (lua_State *L)
event_loop = *(struct ev_loop **)lua_touserdata (L, -1);
}
else {
- event_loop = ev_default_loop (0);
+ return luaL_error (L, "event loop is required");
}
lua_pop (L, 1);
diff --git a/src/lua/lua_util.c b/src/lua/lua_util.c
index 8e6403972..4eaf7b672 100644
--- a/src/lua/lua_util.c
+++ b/src/lua/lua_util.c
@@ -699,7 +699,7 @@ lua_util_create_event_base (lua_State *L)
pev_base = lua_newuserdata (L, sizeof (struct ev_loop *));
rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
- *pev_base = ev_default_loop (EVFLAG_SIGNALFD);
+ *pev_base = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL);
return 1;
}
@@ -848,7 +848,7 @@ lua_util_process_message (lua_State *L)
message = luaL_checklstring (L, 2, &mlen);
if (cfg != NULL && message != NULL) {
- base = ev_loop_new (EVFLAG_SIGNALFD);
+ base = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL);
rspamd_init_filters (cfg, FALSE);
task = rspamd_task_new (NULL, cfg, NULL, NULL, base);
task->msg.begin = rspamd_mempool_alloc (task->task_pool, mlen);
diff --git a/src/lua/lua_worker.c b/src/lua/lua_worker.c
index 7dbefc6be..73f8baea1 100644
--- a/src/lua/lua_worker.c
+++ b/src/lua/lua_worker.c
@@ -613,8 +613,8 @@ lua_worker_spawn_process (lua_State *L)
close (cbdata->sp[0]);
/* Here we assume that we can block on writing results */
rspamd_socket_blocking (cbdata->sp[1]);
- ev_loop_destroy (EV_DEFAULT);
- cbdata->event_loop = ev_default_loop (EVFLAG_SIGNALFD);
+ ev_loop_destroy (cbdata->event_loop);
+ cbdata->event_loop = ev_loop_new (EVFLAG_SIGNALFD);
g_hash_table_remove_all (w->signal_events);
rspamd_worker_unblock_signals ();
rspamd_lua_execute_lua_subprocess (L, cbdata);
diff --git a/src/plugins/lua/multimap.lua b/src/plugins/lua/multimap.lua
index 68f254c1c..d9fc0b449 100644
--- a/src/plugins/lua/multimap.lua
+++ b/src/plugins/lua/multimap.lua
@@ -400,6 +400,17 @@ local function multimap_callback(task, rule)
local ret = false
if r['cdb'] then
+ if type(r.cdb) == 'string' then
+ local cdb = rspamd_cdb.create(r.cdb, task:get_ev_base())
+
+ if not cdb then
+ rspamd_logger.infox(task, 'cannot open cdb file %s', r.cdb)
+
+ return false
+ else
+ r.cdb = cdb
+ end
+ end
local srch = value
if type(value) == 'userdata' then
if value.class == 'rspamd{ip}' then
@@ -997,13 +1008,8 @@ local function add_multimap_rule(key, newrule)
end
-- Check cdb flag
if type(newrule['map']) == 'string' and string.find(newrule['map'], '^cdb://.*$') then
- newrule['cdb'] = cdb.create(newrule['map'])
- if newrule['cdb'] then
- ret = true
- else
- rspamd_logger.warnx(rspamd_config, 'Cannot add rule: map doesn\'t exists: %1',
- newrule['map'])
- end
+ newrule['cdb'] = newrule['map']
+ ret = true
elseif type(newrule['map']) == 'string' and string.find(newrule['map'], '^redis://.*$') then
if not redis_params then
rspamd_logger.infox(rspamd_config, 'no redis servers are specified, ' ..
diff --git a/src/rspamd.c b/src/rspamd.c
index a1badd635..00995c470 100644
--- a/src/rspamd.c
+++ b/src/rspamd.c
@@ -714,100 +714,55 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused)
}
}
-static gboolean
+static void
rspamd_worker_wait (struct rspamd_worker *w)
{
struct rspamd_main *rspamd_main;
- gint res = 0;
- gboolean nowait = FALSE;
-
rspamd_main = w->srv;
- if (w->ppid != getpid ()) {
- nowait = TRUE;
- }
-
- if (nowait || waitpid (w->pid, &res, WNOHANG) <= 0) {
- if (term_attempts < 0) {
- if (w->cf->worker->flags & RSPAMD_WORKER_KILLABLE) {
- msg_warn_main ("terminate worker %s(%P) with SIGKILL",
- g_quark_to_string (w->type), w->pid);
- if (kill (w->pid, SIGKILL) == -1) {
- if (nowait && errno == ESRCH) {
- /* We have actually killed the process */
- goto finished;
- }
+ if (term_attempts < 0) {
+ if (w->cf->worker->flags & RSPAMD_WORKER_KILLABLE) {
+ msg_warn_main ("terminate worker %s(%P) with SIGKILL",
+ g_quark_to_string (w->type), w->pid);
+ if (kill (w->pid, SIGKILL) == -1) {
+ if (errno == ESRCH) {
+ /* We have actually killed the process */
+ return;
}
}
- else {
- if (term_attempts > -(TERMINATION_ATTEMPTS * 2)) {
- if (term_attempts % 10 == 0) {
- msg_info_main ("waiting for worker %s(%P) to sync, "
- "%d seconds remain",
- g_quark_to_string (w->type), w->pid,
- (TERMINATION_ATTEMPTS * 2 + term_attempts) / 5);
- kill (w->pid, SIGTERM);
- if (nowait && errno == ESRCH) {
- /* We have actually killed the process */
- goto finished;
- }
- }
- }
- else {
- msg_err_main ("data corruption warning: terminating "
- "special worker %s(%P) with SIGKILL",
- g_quark_to_string (w->type), w->pid);
- kill (w->pid, SIGKILL);
- if (nowait && errno == ESRCH) {
+ }
+ else {
+ if (term_attempts > -(TERMINATION_ATTEMPTS * 2)) {
+ if (term_attempts % 10 == 0) {
+ msg_info_main ("waiting for worker %s(%P) to sync, "
+ "%d seconds remain",
+ g_quark_to_string (w->type), w->pid,
+ (TERMINATION_ATTEMPTS * 2 + term_attempts) / 5);
+ kill (w->pid, SIGTERM);
+ if (errno == ESRCH) {
/* We have actually killed the process */
- goto finished;
+ return;
}
}
}
- }
- else if (nowait) {
- kill (w->pid, 0);
-
- if (errno != ESRCH) {
- return FALSE;
- }
else {
- goto finished;
+ msg_err_main ("data corruption warning: terminating "
+ "special worker %s(%P) with SIGKILL",
+ g_quark_to_string (w->type), w->pid);
+ kill (w->pid, SIGKILL);
+ if (errno == ESRCH) {
+ /* We have actually killed the process */
+ return;
+ }
}
}
-
- return FALSE;
- }
-
-
-
- finished:
- msg_info_main ("%s process %P terminated %s",
- g_quark_to_string (w->type), w->pid,
- nowait ? "with no result available" :
- (WTERMSIG (res) == SIGKILL ? "hardly" : "softly"));
- if (w->srv_pipe[0] != -1) {
- /* Ugly workaround */
- if (w->tmp_data) {
- g_free (w->tmp_data);
- }
- ev_io_stop (rspamd_main->event_loop, &w->srv_ev);
}
-
- if (w->finish_actions) {
- g_ptr_array_free (w->finish_actions, TRUE);
- }
-
- REF_RELEASE (w->cf);
- g_free (w);
-
- return TRUE;
}
-static gboolean
+static void
hash_worker_wait_callback (gpointer key, gpointer value, gpointer unused)
{
- return rspamd_worker_wait ((struct rspamd_worker *)value);
+ rspamd_worker_wait ((struct rspamd_worker *)value);
}
struct core_check_cbdata {
@@ -992,7 +947,7 @@ rspamd_final_timer_handler (EV_P_ ev_timer *w, int revents)
term_attempts--;
- g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
+ g_hash_table_foreach (rspamd_main->workers, hash_worker_wait_callback, NULL);
if (g_hash_table_size (rspamd_main->workers) == 0) {
ev_break (rspamd_main->event_loop, EVBREAK_ALL);
@@ -1076,9 +1031,9 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
gboolean need_refork;
/* Turn off locking for logger */
+ ev_child_stop (EV_A_ w);
rspamd_log_nolock (rspamd_main->logger);
- msg_info_main ("got SIGCHLD signal, finding terminated workers");
/* Remove dead child form children list */
g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (wrk->pid));
if (wrk->srv_pipe[0] != -1) {
@@ -1105,9 +1060,17 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
if (need_refork) {
/* Fork another worker in replace of dead one */
+ msg_info_main ("respawn process %s in lieu of terminated process with pid %P",
+ g_quark_to_string (wrk->type),
+ wrk->pid);
rspamd_check_core_limits (rspamd_main);
rspamd_fork_delayed (wrk->cf, wrk->index, rspamd_main);
}
+ else {
+ msg_info_main ("do not respawn process %s after found terminated process with pid %P",
+ g_quark_to_string (wrk->type),
+ wrk->pid);
+ }
g_free (wrk);
rspamd_log_lock (rspamd_main->logger);
@@ -1178,7 +1141,6 @@ main (gint argc, gchar **argv, gchar **env)
GQuark type;
rspamd_inet_addr_t *control_addr = NULL;
struct ev_loop *event_loop;
- static ev_signal term_ev, int_ev, hup_ev, usr1_ev;
struct rspamd_main *rspamd_main;
gboolean skip_pid = FALSE;
@@ -1429,28 +1391,28 @@ main (gint argc, gchar **argv, gchar **env)
rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal);
/* Init event base */
- event_loop = ev_default_loop (EVFLAG_SIGNALFD);
+ event_loop = ev_default_loop (EVFLAG_SIGNALFD|EVBACKEND_ALL);
rspamd_main->event_loop = event_loop;
/* Unblock signals */
sigemptyset (&signals.sa_mask);
sigprocmask (SIG_SETMASK, &signals.sa_mask, NULL);
/* Set events for signals */
- ev_signal_init (&term_ev, rspamd_term_handler, SIGTERM);
- term_ev.data = rspamd_main;
- ev_signal_start (event_loop, &term_ev);
+ ev_signal_init (&rspamd_main->term_ev, rspamd_term_handler, SIGTERM);
+ rspamd_main->term_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &rspamd_main->term_ev);
- ev_signal_init (&int_ev, rspamd_term_handler, SIGINT);
- int_ev.data = rspamd_main;
- ev_signal_start (event_loop, &int_ev);
+ ev_signal_init (&rspamd_main->int_ev, rspamd_term_handler, SIGINT);
+ rspamd_main->int_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &rspamd_main->int_ev);
- ev_signal_init (&hup_ev, rspamd_hup_handler, SIGHUP);
- hup_ev.data = rspamd_main;
- ev_signal_start (event_loop, &hup_ev);
+ ev_signal_init (&rspamd_main->hup_ev, rspamd_hup_handler, SIGHUP);
+ rspamd_main->hup_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &rspamd_main->hup_ev);
- ev_signal_init (&usr1_ev, rspamd_usr1_handler, SIGUSR1);
- usr1_ev.data = rspamd_main;
- ev_signal_start (event_loop, &usr1_ev);
+ ev_signal_init (&rspamd_main->usr1_ev, rspamd_usr1_handler, SIGUSR1);
+ rspamd_main->usr1_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &rspamd_main->usr1_ev);
rspamd_check_core_limits (rspamd_main);
rspamd_mempool_lock_mutex (rspamd_main->start_mtx);
diff --git a/src/rspamd.h b/src/rspamd.h
index 97e4c4c6a..fff373397 100644
--- a/src/rspamd.h
+++ b/src/rspamd.h
@@ -289,6 +289,7 @@ struct rspamd_main {
gboolean cores_throttling; /**< turn off cores when limits are exceeded */
struct roll_history *history; /**< rolling history */
struct ev_loop *event_loop;
+ ev_signal term_ev, int_ev, hup_ev, usr1_ev; /**< signals */
struct rspamd_http_context *http_ctx;
};