From 885b63d8457dba1094f465471432d5e2cbdb7dea Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 20 Jun 2019 21:22:12 +0100 Subject: [Project] Another workaround for signals... --- src/client/rspamc.c | 2 +- src/libserver/worker_util.c | 59 ++++++++++-------- src/libserver/worker_util.h | 4 +- src/libutil/map.c | 4 +- src/libutil/map_private.h | 1 + src/lua/lua_cdb.c | 3 +- src/lua/lua_tcp.c | 2 +- src/lua/lua_util.c | 4 +- src/lua/lua_worker.c | 4 +- src/plugins/lua/multimap.lua | 20 +++--- src/rspamd.c | 144 ++++++++++++++++--------------------------- src/rspamd.h | 1 + 12 files changed, 116 insertions(+), 132 deletions(-) diff --git a/src/client/rspamc.c b/src/client/rspamc.c index cc339ef7a..d08a7f620 100644 --- a/src/client/rspamc.c +++ b/src/client/rspamc.c @@ -1903,7 +1903,7 @@ main (gint argc, gchar **argv, gchar **env) } rspamd_init_libs (); - event_loop = ev_default_loop (EVFLAG_SIGNALFD); + event_loop = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL); struct rspamd_http_context_cfg http_config; diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c index d849f542e..078de3c8f 100644 --- a/src/libserver/worker_util.c +++ b/src/libserver/worker_util.c @@ -55,6 +55,8 @@ #include #elif defined(HAVE_SYS_UCONTEXT_H) #include +#include + #endif static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *); @@ -206,7 +208,12 @@ rspamd_worker_signal_handle (EV_P_ ev_signal *w, int revents) static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh) { + sigset_t set; + ev_signal_stop (sigh->event_loop, &sigh->ev_sig); + sigemptyset (&set); + sigaddset (&set, sigh->signo); + sigprocmask (SIG_BLOCK, &set, NULL); } static void @@ -271,33 +278,22 @@ rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker, } void -rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base) +rspamd_worker_init_signals (struct rspamd_worker *worker, + struct ev_loop *event_loop) { - struct sigaction signals; - /* A set of terminating signals */ - rspamd_worker_set_signal_handler (SIGTERM, worker, base, + rspamd_worker_set_signal_handler (SIGTERM, worker, event_loop, rspamd_worker_term_handler, NULL); - rspamd_worker_set_signal_handler (SIGINT, worker, base, + rspamd_worker_set_signal_handler (SIGINT, worker, event_loop, rspamd_worker_term_handler, NULL); - rspamd_worker_set_signal_handler (SIGHUP, worker, base, + rspamd_worker_set_signal_handler (SIGHUP, worker, event_loop, rspamd_worker_term_handler, NULL); /* Special purpose signals */ - rspamd_worker_set_signal_handler (SIGUSR1, worker, base, + rspamd_worker_set_signal_handler (SIGUSR1, worker, event_loop, rspamd_worker_usr1_handler, NULL); - rspamd_worker_set_signal_handler (SIGUSR2, worker, base, + rspamd_worker_set_signal_handler (SIGUSR2, worker, event_loop, rspamd_worker_usr2_handler, NULL); - - /* Unblock all signals processed */ - sigemptyset (&signals.sa_mask); - sigaddset (&signals.sa_mask, SIGTERM); - sigaddset (&signals.sa_mask, SIGINT); - sigaddset (&signals.sa_mask, SIGHUP); - sigaddset (&signals.sa_mask, SIGUSR1); - sigaddset (&signals.sa_mask, SIGUSR2); - - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); } struct ev_loop * @@ -319,7 +315,7 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name, worker->signal_events = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, rspamd_sigh_free); - event_loop = ev_default_loop (EVFLAG_SIGNALFD); + event_loop = ev_loop_new (EVFLAG_SIGNALFD); worker->srv->event_loop = event_loop; @@ -633,11 +629,16 @@ rspamd_worker_on_term (EV_P_ ev_child *w, int revents) { struct rspamd_worker *wrk = (struct rspamd_worker *)w->data; - if (wrk->term_handler) { - wrk->term_handler (EV_A_ w, wrk->srv, wrk); + if (wrk->ppid == getpid ()) { + if (wrk->term_handler) { + wrk->term_handler (EV_A_ w, wrk->srv, wrk); + } + else { + rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus); + } } else { - rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus); + /* Ignore SIGCHLD for not our children... */ } } @@ -696,8 +697,17 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main, evutil_secure_rng_init (); #endif + /* + * Libev stores all signals in a global table, so + * previous handlers must be explicitly detached and forgotten + * before starting a new loop + */ + ev_signal_stop (rspamd_main->event_loop, &rspamd_main->int_ev); + ev_signal_stop (rspamd_main->event_loop, &rspamd_main->term_ev); + ev_signal_stop (rspamd_main->event_loop, &rspamd_main->hup_ev); + ev_signal_stop (rspamd_main->event_loop, &rspamd_main->usr1_ev); /* Remove the inherited event base */ - ev_loop_destroy (EV_DEFAULT); + ev_loop_destroy (rspamd_main->event_loop); rspamd_main->event_loop = NULL; /* Drop privileges */ rspamd_worker_drop_priv (rspamd_main); @@ -1173,7 +1183,7 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main, { gboolean need_refork = TRUE; - if (wrk->wanna_die) { + if (wrk->wanna_die || rspamd_main->wanna_die) { /* Do not refork workers that are intended to be terminated */ need_refork = FALSE; } @@ -1183,6 +1193,7 @@ rspamd_check_termination_clause (struct rspamd_main *rspamd_main, msg_info_main ("%s process %P terminated normally", g_quark_to_string (wrk->type), wrk->pid); + need_refork = FALSE; } else { if (WIFSIGNALED (res)) { diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h index 4946badcf..cafe1608f 100644 --- a/src/libserver/worker_util.h +++ b/src/libserver/worker_util.h @@ -33,9 +33,9 @@ struct rspamd_worker_signal_handler; /** * Init basic signals for a worker * @param worker - * @param base + * @param event_loop */ -void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base); +void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *event_loop); typedef void (*rspamd_accept_handler)(struct ev_loop *loop, ev_io *w, int revents); diff --git a/src/libutil/map.c b/src/libutil/map.c index eadf0279c..838c7c428 100644 --- a/src/libutil/map.c +++ b/src/libutil/map.c @@ -1940,6 +1940,8 @@ rspamd_map_watch (struct rspamd_config *cfg, } PTR_ARRAY_FOREACH (map->backends, i, bk) { + bk->event_loop = event_loop; + if (bk->protocol == MAP_PROTO_FILE) { struct file_map_data *data; @@ -2245,7 +2247,7 @@ rspamd_map_backend_dtor (struct rspamd_map_backend *bk) switch (bk->protocol) { case MAP_PROTO_FILE: if (bk->data.fd) { - ev_stat_stop (ev_default_loop (0), &bk->data.fd->st_ev); + ev_stat_stop (bk->event_loop, &bk->data.fd->st_ev); g_free (bk->data.fd->filename); g_free (bk->data.fd); } diff --git a/src/libutil/map_private.h b/src/libutil/map_private.h index 455919d15..946fc476d 100644 --- a/src/libutil/map_private.h +++ b/src/libutil/map_private.h @@ -116,6 +116,7 @@ struct rspamd_map_backend { gboolean is_signed; gboolean is_compressed; gboolean is_fallback; + struct ev_loop *event_loop; guint32 id; struct rspamd_cryptobox_pubkey *trusted_pubkey; union rspamd_map_backend_data data; diff --git a/src/lua/lua_cdb.c b/src/lua/lua_cdb.c index 0b8c27b2a..5d4c499a7 100644 --- a/src/lua/lua_cdb.c +++ b/src/lua/lua_cdb.c @@ -50,6 +50,7 @@ lua_cdb_create (lua_State *L) struct cdb *cdb, **pcdb; const gchar *filename; gint fd; + struct ev_loop *ev_base = lua_check_ev_base (L, 2); filename = luaL_checkstring (L, 1); /* If file begins with cdb://, just skip it */ @@ -69,7 +70,7 @@ lua_cdb_create (lua_State *L) lua_pushnil (L); } else { - cdb_add_timer (cdb, ev_default_loop (0), CDB_REFRESH_TIME); + cdb_add_timer (cdb, ev_base, CDB_REFRESH_TIME); pcdb = lua_newuserdata (L, sizeof (struct cdb *)); rspamd_lua_setclass (L, "rspamd{cdb}", -1); *pcdb = cdb; diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index 4d1c205cf..ab00548a0 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -1462,7 +1462,7 @@ lua_tcp_request (lua_State *L) event_loop = *(struct ev_loop **)lua_touserdata (L, -1); } else { - event_loop = ev_default_loop (0); + return luaL_error (L, "event loop is required"); } lua_pop (L, 1); diff --git a/src/lua/lua_util.c b/src/lua/lua_util.c index 8e6403972..4eaf7b672 100644 --- a/src/lua/lua_util.c +++ b/src/lua/lua_util.c @@ -699,7 +699,7 @@ lua_util_create_event_base (lua_State *L) pev_base = lua_newuserdata (L, sizeof (struct ev_loop *)); rspamd_lua_setclass (L, "rspamd{ev_base}", -1); - *pev_base = ev_default_loop (EVFLAG_SIGNALFD); + *pev_base = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL); return 1; } @@ -848,7 +848,7 @@ lua_util_process_message (lua_State *L) message = luaL_checklstring (L, 2, &mlen); if (cfg != NULL && message != NULL) { - base = ev_loop_new (EVFLAG_SIGNALFD); + base = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL); rspamd_init_filters (cfg, FALSE); task = rspamd_task_new (NULL, cfg, NULL, NULL, base); task->msg.begin = rspamd_mempool_alloc (task->task_pool, mlen); diff --git a/src/lua/lua_worker.c b/src/lua/lua_worker.c index 7dbefc6be..73f8baea1 100644 --- a/src/lua/lua_worker.c +++ b/src/lua/lua_worker.c @@ -613,8 +613,8 @@ lua_worker_spawn_process (lua_State *L) close (cbdata->sp[0]); /* Here we assume that we can block on writing results */ rspamd_socket_blocking (cbdata->sp[1]); - ev_loop_destroy (EV_DEFAULT); - cbdata->event_loop = ev_default_loop (EVFLAG_SIGNALFD); + ev_loop_destroy (cbdata->event_loop); + cbdata->event_loop = ev_loop_new (EVFLAG_SIGNALFD); g_hash_table_remove_all (w->signal_events); rspamd_worker_unblock_signals (); rspamd_lua_execute_lua_subprocess (L, cbdata); diff --git a/src/plugins/lua/multimap.lua b/src/plugins/lua/multimap.lua index 68f254c1c..d9fc0b449 100644 --- a/src/plugins/lua/multimap.lua +++ b/src/plugins/lua/multimap.lua @@ -400,6 +400,17 @@ local function multimap_callback(task, rule) local ret = false if r['cdb'] then + if type(r.cdb) == 'string' then + local cdb = rspamd_cdb.create(r.cdb, task:get_ev_base()) + + if not cdb then + rspamd_logger.infox(task, 'cannot open cdb file %s', r.cdb) + + return false + else + r.cdb = cdb + end + end local srch = value if type(value) == 'userdata' then if value.class == 'rspamd{ip}' then @@ -997,13 +1008,8 @@ local function add_multimap_rule(key, newrule) end -- Check cdb flag if type(newrule['map']) == 'string' and string.find(newrule['map'], '^cdb://.*$') then - newrule['cdb'] = cdb.create(newrule['map']) - if newrule['cdb'] then - ret = true - else - rspamd_logger.warnx(rspamd_config, 'Cannot add rule: map doesn\'t exists: %1', - newrule['map']) - end + newrule['cdb'] = newrule['map'] + ret = true elseif type(newrule['map']) == 'string' and string.find(newrule['map'], '^redis://.*$') then if not redis_params then rspamd_logger.infox(rspamd_config, 'no redis servers are specified, ' .. diff --git a/src/rspamd.c b/src/rspamd.c index a1badd635..00995c470 100644 --- a/src/rspamd.c +++ b/src/rspamd.c @@ -714,100 +714,55 @@ kill_old_workers (gpointer key, gpointer value, gpointer unused) } } -static gboolean +static void rspamd_worker_wait (struct rspamd_worker *w) { struct rspamd_main *rspamd_main; - gint res = 0; - gboolean nowait = FALSE; - rspamd_main = w->srv; - if (w->ppid != getpid ()) { - nowait = TRUE; - } - - if (nowait || waitpid (w->pid, &res, WNOHANG) <= 0) { - if (term_attempts < 0) { - if (w->cf->worker->flags & RSPAMD_WORKER_KILLABLE) { - msg_warn_main ("terminate worker %s(%P) with SIGKILL", - g_quark_to_string (w->type), w->pid); - if (kill (w->pid, SIGKILL) == -1) { - if (nowait && errno == ESRCH) { - /* We have actually killed the process */ - goto finished; - } + if (term_attempts < 0) { + if (w->cf->worker->flags & RSPAMD_WORKER_KILLABLE) { + msg_warn_main ("terminate worker %s(%P) with SIGKILL", + g_quark_to_string (w->type), w->pid); + if (kill (w->pid, SIGKILL) == -1) { + if (errno == ESRCH) { + /* We have actually killed the process */ + return; } } - else { - if (term_attempts > -(TERMINATION_ATTEMPTS * 2)) { - if (term_attempts % 10 == 0) { - msg_info_main ("waiting for worker %s(%P) to sync, " - "%d seconds remain", - g_quark_to_string (w->type), w->pid, - (TERMINATION_ATTEMPTS * 2 + term_attempts) / 5); - kill (w->pid, SIGTERM); - if (nowait && errno == ESRCH) { - /* We have actually killed the process */ - goto finished; - } - } - } - else { - msg_err_main ("data corruption warning: terminating " - "special worker %s(%P) with SIGKILL", - g_quark_to_string (w->type), w->pid); - kill (w->pid, SIGKILL); - if (nowait && errno == ESRCH) { + } + else { + if (term_attempts > -(TERMINATION_ATTEMPTS * 2)) { + if (term_attempts % 10 == 0) { + msg_info_main ("waiting for worker %s(%P) to sync, " + "%d seconds remain", + g_quark_to_string (w->type), w->pid, + (TERMINATION_ATTEMPTS * 2 + term_attempts) / 5); + kill (w->pid, SIGTERM); + if (errno == ESRCH) { /* We have actually killed the process */ - goto finished; + return; } } } - } - else if (nowait) { - kill (w->pid, 0); - - if (errno != ESRCH) { - return FALSE; - } else { - goto finished; + msg_err_main ("data corruption warning: terminating " + "special worker %s(%P) with SIGKILL", + g_quark_to_string (w->type), w->pid); + kill (w->pid, SIGKILL); + if (errno == ESRCH) { + /* We have actually killed the process */ + return; + } } } - - return FALSE; - } - - - - finished: - msg_info_main ("%s process %P terminated %s", - g_quark_to_string (w->type), w->pid, - nowait ? "with no result available" : - (WTERMSIG (res) == SIGKILL ? "hardly" : "softly")); - if (w->srv_pipe[0] != -1) { - /* Ugly workaround */ - if (w->tmp_data) { - g_free (w->tmp_data); - } - ev_io_stop (rspamd_main->event_loop, &w->srv_ev); } - - if (w->finish_actions) { - g_ptr_array_free (w->finish_actions, TRUE); - } - - REF_RELEASE (w->cf); - g_free (w); - - return TRUE; } -static gboolean +static void hash_worker_wait_callback (gpointer key, gpointer value, gpointer unused) { - return rspamd_worker_wait ((struct rspamd_worker *)value); + rspamd_worker_wait ((struct rspamd_worker *)value); } struct core_check_cbdata { @@ -992,7 +947,7 @@ rspamd_final_timer_handler (EV_P_ ev_timer *w, int revents) term_attempts--; - g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL); + g_hash_table_foreach (rspamd_main->workers, hash_worker_wait_callback, NULL); if (g_hash_table_size (rspamd_main->workers) == 0) { ev_break (rspamd_main->event_loop, EVBREAK_ALL); @@ -1076,9 +1031,9 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main, gboolean need_refork; /* Turn off locking for logger */ + ev_child_stop (EV_A_ w); rspamd_log_nolock (rspamd_main->logger); - msg_info_main ("got SIGCHLD signal, finding terminated workers"); /* Remove dead child form children list */ g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (wrk->pid)); if (wrk->srv_pipe[0] != -1) { @@ -1105,9 +1060,17 @@ rspamd_cld_handler (EV_P_ ev_child *w, struct rspamd_main *rspamd_main, if (need_refork) { /* Fork another worker in replace of dead one */ + msg_info_main ("respawn process %s in lieu of terminated process with pid %P", + g_quark_to_string (wrk->type), + wrk->pid); rspamd_check_core_limits (rspamd_main); rspamd_fork_delayed (wrk->cf, wrk->index, rspamd_main); } + else { + msg_info_main ("do not respawn process %s after found terminated process with pid %P", + g_quark_to_string (wrk->type), + wrk->pid); + } g_free (wrk); rspamd_log_lock (rspamd_main->logger); @@ -1178,7 +1141,6 @@ main (gint argc, gchar **argv, gchar **env) GQuark type; rspamd_inet_addr_t *control_addr = NULL; struct ev_loop *event_loop; - static ev_signal term_ev, int_ev, hup_ev, usr1_ev; struct rspamd_main *rspamd_main; gboolean skip_pid = FALSE; @@ -1429,28 +1391,28 @@ main (gint argc, gchar **argv, gchar **env) rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal); /* Init event base */ - event_loop = ev_default_loop (EVFLAG_SIGNALFD); + event_loop = ev_default_loop (EVFLAG_SIGNALFD|EVBACKEND_ALL); rspamd_main->event_loop = event_loop; /* Unblock signals */ sigemptyset (&signals.sa_mask); sigprocmask (SIG_SETMASK, &signals.sa_mask, NULL); /* Set events for signals */ - ev_signal_init (&term_ev, rspamd_term_handler, SIGTERM); - term_ev.data = rspamd_main; - ev_signal_start (event_loop, &term_ev); + ev_signal_init (&rspamd_main->term_ev, rspamd_term_handler, SIGTERM); + rspamd_main->term_ev.data = rspamd_main; + ev_signal_start (event_loop, &rspamd_main->term_ev); - ev_signal_init (&int_ev, rspamd_term_handler, SIGINT); - int_ev.data = rspamd_main; - ev_signal_start (event_loop, &int_ev); + ev_signal_init (&rspamd_main->int_ev, rspamd_term_handler, SIGINT); + rspamd_main->int_ev.data = rspamd_main; + ev_signal_start (event_loop, &rspamd_main->int_ev); - ev_signal_init (&hup_ev, rspamd_hup_handler, SIGHUP); - hup_ev.data = rspamd_main; - ev_signal_start (event_loop, &hup_ev); + ev_signal_init (&rspamd_main->hup_ev, rspamd_hup_handler, SIGHUP); + rspamd_main->hup_ev.data = rspamd_main; + ev_signal_start (event_loop, &rspamd_main->hup_ev); - ev_signal_init (&usr1_ev, rspamd_usr1_handler, SIGUSR1); - usr1_ev.data = rspamd_main; - ev_signal_start (event_loop, &usr1_ev); + ev_signal_init (&rspamd_main->usr1_ev, rspamd_usr1_handler, SIGUSR1); + rspamd_main->usr1_ev.data = rspamd_main; + ev_signal_start (event_loop, &rspamd_main->usr1_ev); rspamd_check_core_limits (rspamd_main); rspamd_mempool_lock_mutex (rspamd_main->start_mtx); diff --git a/src/rspamd.h b/src/rspamd.h index 97e4c4c6a..fff373397 100644 --- a/src/rspamd.h +++ b/src/rspamd.h @@ -289,6 +289,7 @@ struct rspamd_main { gboolean cores_throttling; /**< turn off cores when limits are exceeded */ struct roll_history *history; /**< rolling history */ struct ev_loop *event_loop; + ev_signal term_ev, int_ev, hup_ev, usr1_ev; /**< signals */ struct rspamd_http_context *http_ctx; }; -- cgit v1.2.3