diff options
Diffstat (limited to 'src/libserver')
-rw-r--r-- | src/libserver/CMakeLists.txt | 4 | ||||
-rw-r--r-- | src/libserver/worker_util.c | 217 | ||||
-rw-r--r-- | src/libserver/worker_util.h | 67 |
3 files changed, 287 insertions, 1 deletions
diff --git a/src/libserver/CMakeLists.txt b/src/libserver/CMakeLists.txt index bd5df18b9..99a4debb2 100644 --- a/src/libserver/CMakeLists.txt +++ b/src/libserver/CMakeLists.txt @@ -17,7 +17,9 @@ SET(LIBRSPAMDSERVERSRC statfile_sync.c symbols_cache.c task.c - url.c) + url.c + worker_util.c) + SET(TOKENIZERSSRC ../tokenizers/tokenizers.c ../tokenizers/osb.c) diff --git a/src/libserver/worker_util.c b/src/libserver/worker_util.c new file mode 100644 index 000000000..5507cdb7c --- /dev/null +++ b/src/libserver/worker_util.c @@ -0,0 +1,217 @@ +/* Copyright (c) 2010-2011, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "main.h" +#include "message.h" +#include "lua/lua_common.h" + +extern struct rspamd_main *rspamd_main; + +/** + * Return worker's control structure by its type + * @param type + * @return worker's control structure or NULL + */ +worker_t* +get_worker_by_type (GQuark type) +{ + worker_t **cur; + + cur = &workers[0]; + while (*cur) { + if (g_quark_from_string ((*cur)->name) == type) { + return *cur; + } + cur ++; + } + + return NULL; +} + +double +set_counter (const gchar *name, guint32 value) +{ + struct counter_data *cd; + double alpha; + gchar *key; + + cd = rspamd_hash_lookup (rspamd_main->counters, (gpointer) name); + + if (cd == NULL) { + cd = rspamd_mempool_alloc_shared (rspamd_main->counters->pool, sizeof (struct counter_data)); + cd->value = value; + cd->number = 0; + key = rspamd_mempool_strdup_shared (rspamd_main->counters->pool, name); + rspamd_hash_insert (rspamd_main->counters, (gpointer) key, (gpointer) cd); + } + else { + /* Calculate new value */ + rspamd_mempool_wlock_rwlock (rspamd_main->counters->lock); + + alpha = 2. / (++cd->number + 1); + cd->value = cd->value * (1. - alpha) + value * alpha; + + rspamd_mempool_wunlock_rwlock (rspamd_main->counters->lock); + } + + return cd->value; +} + +sig_atomic_t wanna_die = 0; + +#ifndef HAVE_SA_SIGINFO +static void +worker_sig_handler (gint signo) +#else +static void +worker_sig_handler (gint signo, siginfo_t * info, void *unused) +#endif +{ + struct timeval tv; + + switch (signo) { + case SIGINT: + case SIGTERM: + if (!wanna_die) { + wanna_die = 1; + tv.tv_sec = 0; + tv.tv_usec = 0; + event_loopexit (&tv); + +#ifdef WITH_GPERF_TOOLS + ProfilerStop (); +#endif + } + break; + } +} + +/* + * Config reload is designed by sending sigusr2 to active workers and pending shutdown of them + */ +static void +worker_sigusr2_handler (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *) arg; + /* Do not accept new connections, preparing to end worker's process */ + struct timeval tv; + + if (!wanna_die) { + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + event_del (&worker->sig_ev_usr1); + event_del (&worker->sig_ev_usr2); + worker_stop_accept (worker); + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + event_loopexit (&tv); + } + return; +} + +/* + * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them + */ +static void +worker_sigusr1_handler (gint fd, short what, void *arg) +{ + struct rspamd_worker *worker = (struct rspamd_worker *) arg; + + reopen_log (worker->srv->logger); + + return; +} + +struct event_base * +prepare_worker (struct rspamd_worker *worker, const char *name, + void (*accept_handler)(int, short, void *)) +{ + struct event_base *ev_base; + struct event *accept_event; + struct sigaction signals; + GList *cur; + gint listen_socket; + +#ifdef WITH_PROFILER + extern void _start (void), etext (void); + monstartup ((u_long) & _start, (u_long) & etext); +#endif + + gperf_profiler_init (worker->srv->cfg, name); + + worker->srv->pid = getpid (); + + ev_base = event_init (); + + init_signals (&signals, worker_sig_handler); + sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); + + /* Accept all sockets */ + cur = worker->cf->listen_socks; + while (cur) { + listen_socket = GPOINTER_TO_INT (cur->data); + if (listen_socket != -1) { + accept_event = g_slice_alloc0 (sizeof (struct event)); + event_set (accept_event, listen_socket, EV_READ | EV_PERSIST, + accept_handler, worker); + event_base_set (ev_base, accept_event); + event_add (accept_event, NULL); + worker->accept_events = g_list_prepend (worker->accept_events, accept_event); + } + cur = g_list_next (cur); + } + + /* SIGUSR2 handler */ + signal_set (&worker->sig_ev_usr2, SIGUSR2, worker_sigusr2_handler, + (void *) worker); + event_base_set (ev_base, &worker->sig_ev_usr2); + signal_add (&worker->sig_ev_usr2, NULL); + + /* SIGUSR1 handler */ + signal_set (&worker->sig_ev_usr1, SIGUSR1, worker_sigusr1_handler, + (void *) worker); + event_base_set (ev_base, &worker->sig_ev_usr1); + signal_add (&worker->sig_ev_usr1, NULL); + + return ev_base; +} + +void +worker_stop_accept (struct rspamd_worker *worker) +{ + GList *cur; + struct event *event; + + /* Remove all events */ + cur = worker->accept_events; + while (cur) { + event = cur->data; + event_del (event); + cur = g_list_next (cur); + g_slice_free1 (sizeof (struct event), event); + } + + if (worker->accept_events != NULL) { + g_list_free (worker->accept_events); + } +} diff --git a/src/libserver/worker_util.h b/src/libserver/worker_util.h new file mode 100644 index 000000000..d88b93a8a --- /dev/null +++ b/src/libserver/worker_util.h @@ -0,0 +1,67 @@ +/* Copyright (c) 2014, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef WORKER_UTIL_H_ +#define WORKER_UTIL_H_ + +#include "config.h" +#include "util.h" + +/** + * Return worker's control structure by its type + * @param type + * @return worker's control structure or NULL + */ +worker_t* get_worker_by_type (GQuark type); + +/** + * Set counter for a symbol + */ +double set_counter (const gchar *name, guint32 value); + +#ifndef HAVE_SA_SIGINFO +typedef void (*rspamd_sig_handler_t) (gint); +#else +typedef void (*rspamd_sig_handler_t) (gint, siginfo_t *, void *); +#endif + +struct rspamd_worker; + +/** + * Prepare worker's startup + * @param worker worker structure + * @param name name of the worker + * @param sig_handler handler of main signals + * @param accept_handler handler of accept event for listen sockets + * @return event base suitable for a worker + */ +struct event_base * +prepare_worker (struct rspamd_worker *worker, const char *name, + void (*accept_handler)(int, short, void *)); + +/** + * Stop accepting new connections for a worker + * @param worker + */ +void worker_stop_accept (struct rspamd_worker *worker); + +#endif /* WORKER_UTIL_H_ */ |