Sfoglia il codice sorgente

Rework signals processing in workers.

tags/0.7.3
Vsevolod Stakhov 9 anni fa
parent
commit
f230a889c1
3 ha cambiato i file con 110 aggiunte e 60 eliminazioni
  1. 1
    3
      src/fuzzy_storage.c
  2. 99
    55
      src/libserver/worker_util.c
  3. 10
    2
      src/main.h

+ 1
- 3
src/fuzzy_storage.c Vedi File

@@ -1009,7 +1009,7 @@ start_fuzzy (struct rspamd_worker *worker)
gint i;

ctx->ev_base = rspamd_prepare_worker (worker,
"controller",
"fuzzy",
accept_fuzzy_socket);
server_stat = worker->srv->stat;

@@ -1023,8 +1023,6 @@ start_fuzzy (struct rspamd_worker *worker)
signal_set (&sev, SIGTERM, sigterm_handler, (void *)worker);
event_base_set (ctx->ev_base, &sev);
signal_add (&sev, NULL);
/* We set wanna_die to 1 as we setup our own SIGTERM handler */
wanna_die = 1;

if (ctx->strict_hash) {
static_hash = g_hash_table_new_full (rspamd_str_hash, rspamd_str_equal,

+ 99
- 55
src/libserver/worker_util.c Vedi File

@@ -26,8 +26,6 @@
#include "message.h"
#include "lua/lua_common.h"

extern struct rspamd_main *rspamd_main;

/**
* Return worker's control structure by its type
* @param type
@@ -48,68 +46,112 @@ rspamd_get_worker_by_type (GQuark type)

return NULL;
}
sig_atomic_t wanna_die = 0;

#ifndef HAVE_SA_SIGINFO
static void
worker_sig_handler (gint signo)
#else
static void
worker_sig_handler (gint signo, siginfo_t * info, void *unused)
#endif
{
struct timeval tv;

switch (signo) {
case SIGINT:
case SIGTERM:
if (!wanna_die) {
wanna_die = 1;
tv.tv_sec = 0;
tv.tv_usec = 0;
event_loopexit (&tv);

#ifdef WITH_GPERF_TOOLS
ProfilerStop ();
#endif
}
break;
}
}
sig_atomic_t wanna_die = 0;

/*
* Config reload is designed by sending sigusr2 to active workers and pending shutdown of them
*/
static void
worker_sigusr2_handler (gint fd, short what, void *arg)
rspamd_worker_usr2_handler (gint fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *) arg;
struct rspamd_worker_signal_handler *sigh =
(struct rspamd_worker_signal_handler *)arg;
/* Do not accept new connections, preparing to end worker's process */
struct timeval tv;

if (!wanna_die) {
tv.tv_sec = SOFT_SHUTDOWN_TIME;
tv.tv_usec = 0;
event_del (&worker->sig_ev_usr1);
event_del (&worker->sig_ev_usr2);
rspamd_worker_stop_accept (worker);
rspamd_worker_stop_accept (sigh->worker);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
event_loopexit (&tv);
event_base_loopexit (sigh->base, &tv);
}
return;
}

/*
* Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
*/
static void
worker_sigusr1_handler (gint fd, short what, void *arg)
rspamd_worker_usr1_handler (gint fd, short what, void *arg)
{
struct rspamd_worker_signal_handler *sigh =
(struct rspamd_worker_signal_handler *)arg;

reopen_log (sigh->worker->srv->logger);
}

static void
rspamd_worker_term_handler (gint fd, short what, void *arg)
{
struct rspamd_worker_signal_handler *sigh =
(struct rspamd_worker_signal_handler *)arg;
struct timeval tv;

if (!wanna_die) {
msg_info ("terminating after receiving %s signal", strsignal (sigh->signo));
wanna_die = 1;
tv.tv_sec = 0;
tv.tv_usec = 0;
event_base_loopexit (sigh->base, &tv);
#ifdef WITH_GPERF_TOOLS
ProfilerStop ();
#endif
}
}

static void
rspamd_worker_ignore_signal (int signo)
{
struct rspamd_worker *worker = (struct rspamd_worker *) arg;
struct sigaction sig;

sigemptyset (&sig.sa_mask);
sigaddset (&sig.sa_mask, signo);
sig.sa_handler = SIG_IGN;
sig.sa_flags = 0;
sigaction (signo, &sig, NULL);
}

reopen_log (worker->srv->logger);
static void
rspamd_worker_set_signal_handler (int signo, struct rspamd_worker *worker,
struct event_base *base, void (*handler)(int, short, void *))
{
struct rspamd_worker_signal_handler *sigh;

return;
sigh = g_malloc (sizeof (*sigh));
sigh->signo = signo;
sigh->worker = worker;
sigh->base = base;
sigh->enabled = TRUE;

signal_set (&sigh->ev, signo, handler, sigh);
event_base_set (base, &sigh->ev);
signal_add (&sigh->ev, NULL);

g_hash_table_insert (worker->signal_events, GINT_TO_POINTER (signo), sigh);
}

static void
rspamd_worker_init_signals (struct rspamd_worker *worker, struct event_base *base)
{
/* We ignore these signals in the worker */
rspamd_worker_ignore_signal (SIGPIPE);
rspamd_worker_ignore_signal (SIGALRM);
rspamd_worker_ignore_signal (SIGCHLD);

/* A set of terminating signals */
rspamd_worker_set_signal_handler (SIGTERM, worker, base,
rspamd_worker_term_handler);
rspamd_worker_set_signal_handler (SIGINT, worker, base,
rspamd_worker_term_handler);
rspamd_worker_set_signal_handler (SIGHUP, worker, base,
rspamd_worker_term_handler);

/* Special purpose signals */
rspamd_worker_set_signal_handler (SIGUSR1, worker, base,
rspamd_worker_usr1_handler);
rspamd_worker_set_signal_handler (SIGUSR1, worker, base,
rspamd_worker_usr2_handler);
}

struct event_base *
@@ -118,7 +160,6 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
{
struct event_base *ev_base;
struct event *accept_event;
struct sigaction signals;
GList *cur;
gint listen_socket;

@@ -130,11 +171,12 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
gperf_profiler_init (worker->srv->cfg, name);

worker->srv->pid = getpid ();
worker->signal_events = g_hash_table_new_full (g_int_hash, g_int_equal,
NULL, g_free);

ev_base = event_init ();

init_signals (&signals, worker_sig_handler);
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
rspamd_worker_init_signals (worker, ev_base);

/* Accept all sockets */
cur = worker->cf->listen_socks;
@@ -152,18 +194,6 @@ rspamd_prepare_worker (struct rspamd_worker *worker, const char *name,
cur = g_list_next (cur);
}

/* SIGUSR2 handler */
signal_set (&worker->sig_ev_usr2, SIGUSR2, worker_sigusr2_handler,
(void *) worker);
event_base_set (ev_base, &worker->sig_ev_usr2);
signal_add (&worker->sig_ev_usr2, NULL);

/* SIGUSR1 handler */
signal_set (&worker->sig_ev_usr1, SIGUSR1, worker_sigusr1_handler,
(void *) worker);
event_base_set (ev_base, &worker->sig_ev_usr1);
signal_add (&worker->sig_ev_usr1, NULL);

return ev_base;
}

@@ -172,6 +202,9 @@ rspamd_worker_stop_accept (struct rspamd_worker *worker)
{
GList *cur;
struct event *event;
GHashTableIter it;
struct rspamd_worker_signal_handler *sigh;
gpointer k, v;

/* Remove all events */
cur = worker->accept_events;
@@ -185,6 +218,17 @@ rspamd_worker_stop_accept (struct rspamd_worker *worker)
if (worker->accept_events != NULL) {
g_list_free (worker->accept_events);
}

g_hash_table_iter_init (&it, worker->signal_events);
while (g_hash_table_iter_next (&it, &k, &v)) {
sigh = (struct rspamd_worker_signal_handler *)v;
g_hash_table_iter_steal (&it);
if (sigh->enabled) {
event_del (&sigh->ev);
}
g_free (sigh);
}
g_hash_table_unref (worker->signal_events);
}

void

+ 10
- 2
src/main.h Vedi File

@@ -54,13 +54,21 @@ struct rspamd_worker {
gboolean pending; /**< if worker is pending to run */
struct rspamd_main *srv; /**< pointer to server structure */
GQuark type; /**< process type */
struct event sig_ev_usr1; /**< signals event */
struct event sig_ev_usr2; /**< signals event */
GHashTable *signal_events; /**< signal events */
GList *accept_events; /**< socket events */
struct rspamd_worker_conf *cf; /**< worker config data */
gpointer ctx; /**< worker's specific data */
};

struct rspamd_worker_signal_handler {
gint signo;
gboolean enabled;
struct event ev;
struct event_base *base;
struct rspamd_worker *worker;
};


/**
* Module
*/

Loading…
Annulla
Salva