}
rspamd_init_libs ();
- event_loop = ev_default_loop (EVFLAG_SIGNALFD);
+ event_loop = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL);
struct rspamd_http_context_cfg http_config;
#include <ucontext.h>
#elif defined(HAVE_SYS_UCONTEXT_H)
#include <sys/ucontext.h>
+#include <ev.h>
+
#endif
static void rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *);
static void
rspamd_worker_ignore_signal (struct rspamd_worker_signal_handler *sigh)
{
+ sigset_t set;
+
ev_signal_stop (sigh->event_loop, &sigh->ev_sig);
+ sigemptyset (&set);
+ sigaddset (&set, sigh->signo);
+ sigprocmask (SIG_BLOCK, &set, NULL);
}
static void
}
void
-rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base)
+rspamd_worker_init_signals (struct rspamd_worker *worker,
+ struct ev_loop *event_loop)
{
- struct sigaction signals;
-
/* A set of terminating signals */
- rspamd_worker_set_signal_handler (SIGTERM, worker, base,
+ rspamd_worker_set_signal_handler (SIGTERM, worker, event_loop,
rspamd_worker_term_handler, NULL);
- rspamd_worker_set_signal_handler (SIGINT, worker, base,
+ rspamd_worker_set_signal_handler (SIGINT, worker, event_loop,
rspamd_worker_term_handler, NULL);
- rspamd_worker_set_signal_handler (SIGHUP, worker, base,
+ rspamd_worker_set_signal_handler (SIGHUP, worker, event_loop,
rspamd_worker_term_handler, NULL);
/* Special purpose signals */
- rspamd_worker_set_signal_handler (SIGUSR1, worker, base,
+ rspamd_worker_set_signal_handler (SIGUSR1, worker, event_loop,
rspamd_worker_usr1_handler, NULL);
- rspamd_worker_set_signal_handler (SIGUSR2, worker, base,
+ rspamd_worker_set_signal_handler (SIGUSR2, worker, event_loop,
rspamd_worker_usr2_handler, NULL);
-
- /* Unblock all signals processed */
- sigemptyset (&signals.sa_mask);
- sigaddset (&signals.sa_mask, SIGTERM);
- sigaddset (&signals.sa_mask, SIGINT);
- sigaddset (&signals.sa_mask, SIGHUP);
- sigaddset (&signals.sa_mask, SIGUSR1);
- sigaddset (&signals.sa_mask, SIGUSR2);
-
- sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
}
struct ev_loop *
worker->signal_events = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, rspamd_sigh_free);
- event_loop = ev_default_loop (EVFLAG_SIGNALFD);
+ event_loop = ev_loop_new (EVFLAG_SIGNALFD);
worker->srv->event_loop = event_loop;
{
struct rspamd_worker *wrk = (struct rspamd_worker *)w->data;
- if (wrk->term_handler) {
- wrk->term_handler (EV_A_ w, wrk->srv, wrk);
+ if (wrk->ppid == getpid ()) {
+ if (wrk->term_handler) {
+ wrk->term_handler (EV_A_ w, wrk->srv, wrk);
+ }
+ else {
+ rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
+ }
}
else {
- rspamd_check_termination_clause (wrk->srv, wrk, w->rstatus);
+ /* Ignore SIGCHLD for not our children... */
}
}
evutil_secure_rng_init ();
#endif
+ /*
+ * Libev stores all signals in a global table, so
+ * previous handlers must be explicitly detached and forgotten
+ * before starting a new loop
+ */
+ ev_signal_stop (rspamd_main->event_loop, &rspamd_main->int_ev);
+ ev_signal_stop (rspamd_main->event_loop, &rspamd_main->term_ev);
+ ev_signal_stop (rspamd_main->event_loop, &rspamd_main->hup_ev);
+ ev_signal_stop (rspamd_main->event_loop, &rspamd_main->usr1_ev);
/* Remove the inherited event base */
- ev_loop_destroy (EV_DEFAULT);
+ ev_loop_destroy (rspamd_main->event_loop);
rspamd_main->event_loop = NULL;
/* Drop privileges */
rspamd_worker_drop_priv (rspamd_main);
{
gboolean need_refork = TRUE;
- if (wrk->wanna_die) {
+ if (wrk->wanna_die || rspamd_main->wanna_die) {
/* Do not refork workers that are intended to be terminated */
need_refork = FALSE;
}
msg_info_main ("%s process %P terminated normally",
g_quark_to_string (wrk->type),
wrk->pid);
+ need_refork = FALSE;
}
else {
if (WIFSIGNALED (res)) {
/**
* Init basic signals for a worker
* @param worker
- * @param base
+ * @param event_loop
*/
-void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *base);
+void rspamd_worker_init_signals (struct rspamd_worker *worker, struct ev_loop *event_loop);
typedef void (*rspamd_accept_handler)(struct ev_loop *loop, ev_io *w, int revents);
}
PTR_ARRAY_FOREACH (map->backends, i, bk) {
+ bk->event_loop = event_loop;
+
if (bk->protocol == MAP_PROTO_FILE) {
struct file_map_data *data;
switch (bk->protocol) {
case MAP_PROTO_FILE:
if (bk->data.fd) {
- ev_stat_stop (ev_default_loop (0), &bk->data.fd->st_ev);
+ ev_stat_stop (bk->event_loop, &bk->data.fd->st_ev);
g_free (bk->data.fd->filename);
g_free (bk->data.fd);
}
gboolean is_signed;
gboolean is_compressed;
gboolean is_fallback;
+ struct ev_loop *event_loop;
guint32 id;
struct rspamd_cryptobox_pubkey *trusted_pubkey;
union rspamd_map_backend_data data;
struct cdb *cdb, **pcdb;
const gchar *filename;
gint fd;
+ struct ev_loop *ev_base = lua_check_ev_base (L, 2);
filename = luaL_checkstring (L, 1);
/* If file begins with cdb://, just skip it */
lua_pushnil (L);
}
else {
- cdb_add_timer (cdb, ev_default_loop (0), CDB_REFRESH_TIME);
+ cdb_add_timer (cdb, ev_base, CDB_REFRESH_TIME);
pcdb = lua_newuserdata (L, sizeof (struct cdb *));
rspamd_lua_setclass (L, "rspamd{cdb}", -1);
*pcdb = cdb;
event_loop = *(struct ev_loop **)lua_touserdata (L, -1);
}
else {
- event_loop = ev_default_loop (0);
+ return luaL_error (L, "event loop is required");
}
lua_pop (L, 1);
pev_base = lua_newuserdata (L, sizeof (struct ev_loop *));
rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
- *pev_base = ev_default_loop (EVFLAG_SIGNALFD);
+ *pev_base = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL);
return 1;
}
message = luaL_checklstring (L, 2, &mlen);
if (cfg != NULL && message != NULL) {
- base = ev_loop_new (EVFLAG_SIGNALFD);
+ base = ev_loop_new (EVFLAG_SIGNALFD|EVBACKEND_ALL);
rspamd_init_filters (cfg, FALSE);
task = rspamd_task_new (NULL, cfg, NULL, NULL, base);
task->msg.begin = rspamd_mempool_alloc (task->task_pool, mlen);
close (cbdata->sp[0]);
/* Here we assume that we can block on writing results */
rspamd_socket_blocking (cbdata->sp[1]);
- ev_loop_destroy (EV_DEFAULT);
- cbdata->event_loop = ev_default_loop (EVFLAG_SIGNALFD);
+ ev_loop_destroy (cbdata->event_loop);
+ cbdata->event_loop = ev_loop_new (EVFLAG_SIGNALFD);
g_hash_table_remove_all (w->signal_events);
rspamd_worker_unblock_signals ();
rspamd_lua_execute_lua_subprocess (L, cbdata);
local ret = false
if r['cdb'] then
+ if type(r.cdb) == 'string' then
+ local cdb = rspamd_cdb.create(r.cdb, task:get_ev_base())
+
+ if not cdb then
+ rspamd_logger.infox(task, 'cannot open cdb file %s', r.cdb)
+
+ return false
+ else
+ r.cdb = cdb
+ end
+ end
local srch = value
if type(value) == 'userdata' then
if value.class == 'rspamd{ip}' then
end
-- Check cdb flag
if type(newrule['map']) == 'string' and string.find(newrule['map'], '^cdb://.*$') then
- newrule['cdb'] = cdb.create(newrule['map'])
- if newrule['cdb'] then
- ret = true
- else
- rspamd_logger.warnx(rspamd_config, 'Cannot add rule: map doesn\'t exists: %1',
- newrule['map'])
- end
+ newrule['cdb'] = newrule['map']
+ ret = true
elseif type(newrule['map']) == 'string' and string.find(newrule['map'], '^redis://.*$') then
if not redis_params then
rspamd_logger.infox(rspamd_config, 'no redis servers are specified, ' ..
}
}
-static gboolean
+static void
rspamd_worker_wait (struct rspamd_worker *w)
{
struct rspamd_main *rspamd_main;
- gint res = 0;
- gboolean nowait = FALSE;
-
rspamd_main = w->srv;
- if (w->ppid != getpid ()) {
- nowait = TRUE;
- }
-
- if (nowait || waitpid (w->pid, &res, WNOHANG) <= 0) {
- if (term_attempts < 0) {
- if (w->cf->worker->flags & RSPAMD_WORKER_KILLABLE) {
- msg_warn_main ("terminate worker %s(%P) with SIGKILL",
- g_quark_to_string (w->type), w->pid);
- if (kill (w->pid, SIGKILL) == -1) {
- if (nowait && errno == ESRCH) {
- /* We have actually killed the process */
- goto finished;
- }
+ if (term_attempts < 0) {
+ if (w->cf->worker->flags & RSPAMD_WORKER_KILLABLE) {
+ msg_warn_main ("terminate worker %s(%P) with SIGKILL",
+ g_quark_to_string (w->type), w->pid);
+ if (kill (w->pid, SIGKILL) == -1) {
+ if (errno == ESRCH) {
+ /* We have actually killed the process */
+ return;
}
}
- else {
- if (term_attempts > -(TERMINATION_ATTEMPTS * 2)) {
- if (term_attempts % 10 == 0) {
- msg_info_main ("waiting for worker %s(%P) to sync, "
- "%d seconds remain",
- g_quark_to_string (w->type), w->pid,
- (TERMINATION_ATTEMPTS * 2 + term_attempts) / 5);
- kill (w->pid, SIGTERM);
- if (nowait && errno == ESRCH) {
- /* We have actually killed the process */
- goto finished;
- }
- }
- }
- else {
- msg_err_main ("data corruption warning: terminating "
- "special worker %s(%P) with SIGKILL",
- g_quark_to_string (w->type), w->pid);
- kill (w->pid, SIGKILL);
- if (nowait && errno == ESRCH) {
+ }
+ else {
+ if (term_attempts > -(TERMINATION_ATTEMPTS * 2)) {
+ if (term_attempts % 10 == 0) {
+ msg_info_main ("waiting for worker %s(%P) to sync, "
+ "%d seconds remain",
+ g_quark_to_string (w->type), w->pid,
+ (TERMINATION_ATTEMPTS * 2 + term_attempts) / 5);
+ kill (w->pid, SIGTERM);
+ if (errno == ESRCH) {
/* We have actually killed the process */
- goto finished;
+ return;
}
}
}
- }
- else if (nowait) {
- kill (w->pid, 0);
-
- if (errno != ESRCH) {
- return FALSE;
- }
else {
- goto finished;
+ msg_err_main ("data corruption warning: terminating "
+ "special worker %s(%P) with SIGKILL",
+ g_quark_to_string (w->type), w->pid);
+ kill (w->pid, SIGKILL);
+ if (errno == ESRCH) {
+ /* We have actually killed the process */
+ return;
+ }
}
}
-
- return FALSE;
- }
-
-
-
- finished:
- msg_info_main ("%s process %P terminated %s",
- g_quark_to_string (w->type), w->pid,
- nowait ? "with no result available" :
- (WTERMSIG (res) == SIGKILL ? "hardly" : "softly"));
- if (w->srv_pipe[0] != -1) {
- /* Ugly workaround */
- if (w->tmp_data) {
- g_free (w->tmp_data);
- }
- ev_io_stop (rspamd_main->event_loop, &w->srv_ev);
}
-
- if (w->finish_actions) {
- g_ptr_array_free (w->finish_actions, TRUE);
- }
-
- REF_RELEASE (w->cf);
- g_free (w);
-
- return TRUE;
}
-static gboolean
+static void
hash_worker_wait_callback (gpointer key, gpointer value, gpointer unused)
{
- return rspamd_worker_wait ((struct rspamd_worker *)value);
+ rspamd_worker_wait ((struct rspamd_worker *)value);
}
struct core_check_cbdata {
term_attempts--;
- g_hash_table_foreach_remove (rspamd_main->workers, hash_worker_wait_callback, NULL);
+ g_hash_table_foreach (rspamd_main->workers, hash_worker_wait_callback, NULL);
if (g_hash_table_size (rspamd_main->workers) == 0) {
ev_break (rspamd_main->event_loop, EVBREAK_ALL);
gboolean need_refork;
/* Turn off locking for logger */
+ ev_child_stop (EV_A_ w);
rspamd_log_nolock (rspamd_main->logger);
- msg_info_main ("got SIGCHLD signal, finding terminated workers");
/* Remove dead child form children list */
g_hash_table_remove (rspamd_main->workers, GSIZE_TO_POINTER (wrk->pid));
if (wrk->srv_pipe[0] != -1) {
if (need_refork) {
/* Fork another worker in replace of dead one */
+ msg_info_main ("respawn process %s in lieu of terminated process with pid %P",
+ g_quark_to_string (wrk->type),
+ wrk->pid);
rspamd_check_core_limits (rspamd_main);
rspamd_fork_delayed (wrk->cf, wrk->index, rspamd_main);
}
+ else {
+ msg_info_main ("do not respawn process %s after found terminated process with pid %P",
+ g_quark_to_string (wrk->type),
+ wrk->pid);
+ }
g_free (wrk);
rspamd_log_lock (rspamd_main->logger);
GQuark type;
rspamd_inet_addr_t *control_addr = NULL;
struct ev_loop *event_loop;
- static ev_signal term_ev, int_ev, hup_ev, usr1_ev;
struct rspamd_main *rspamd_main;
gboolean skip_pid = FALSE;
rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal);
/* Init event base */
- event_loop = ev_default_loop (EVFLAG_SIGNALFD);
+ event_loop = ev_default_loop (EVFLAG_SIGNALFD|EVBACKEND_ALL);
rspamd_main->event_loop = event_loop;
/* Unblock signals */
sigemptyset (&signals.sa_mask);
sigprocmask (SIG_SETMASK, &signals.sa_mask, NULL);
/* Set events for signals */
- ev_signal_init (&term_ev, rspamd_term_handler, SIGTERM);
- term_ev.data = rspamd_main;
- ev_signal_start (event_loop, &term_ev);
+ ev_signal_init (&rspamd_main->term_ev, rspamd_term_handler, SIGTERM);
+ rspamd_main->term_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &rspamd_main->term_ev);
- ev_signal_init (&int_ev, rspamd_term_handler, SIGINT);
- int_ev.data = rspamd_main;
- ev_signal_start (event_loop, &int_ev);
+ ev_signal_init (&rspamd_main->int_ev, rspamd_term_handler, SIGINT);
+ rspamd_main->int_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &rspamd_main->int_ev);
- ev_signal_init (&hup_ev, rspamd_hup_handler, SIGHUP);
- hup_ev.data = rspamd_main;
- ev_signal_start (event_loop, &hup_ev);
+ ev_signal_init (&rspamd_main->hup_ev, rspamd_hup_handler, SIGHUP);
+ rspamd_main->hup_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &rspamd_main->hup_ev);
- ev_signal_init (&usr1_ev, rspamd_usr1_handler, SIGUSR1);
- usr1_ev.data = rspamd_main;
- ev_signal_start (event_loop, &usr1_ev);
+ ev_signal_init (&rspamd_main->usr1_ev, rspamd_usr1_handler, SIGUSR1);
+ rspamd_main->usr1_ev.data = rspamd_main;
+ ev_signal_start (event_loop, &rspamd_main->usr1_ev);
rspamd_check_core_limits (rspamd_main);
rspamd_mempool_lock_mutex (rspamd_main->start_mtx);
gboolean cores_throttling; /**< turn off cores when limits are exceeded */
struct roll_history *history; /**< rolling history */
struct ev_loop *event_loop;
+ ev_signal term_ev, int_ev, hup_ev, usr1_ev; /**< signals */
struct rspamd_http_context *http_ctx;
};