aboutsummaryrefslogtreecommitdiffstats
path: root/src/kvstorage_server.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-31 20:02:55 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-31 20:02:55 +0300
commitc750ae5859456d3bc7593e5998deac48cfad7c69 (patch)
tree79b66e2b490b5c2f19316882da681a1771049104 /src/kvstorage_server.c
parentcbb830d3182f7967ec477d1b050bc0dedbf71dd8 (diff)
downloadrspamd-c750ae5859456d3bc7593e5998deac48cfad7c69.tar.gz
rspamd-c750ae5859456d3bc7593e5998deac48cfad7c69.zip
* Fix threading in kvstorage.
Rspamd now can detect and work with libevent-2.
Diffstat (limited to 'src/kvstorage_server.c')
-rw-r--r--src/kvstorage_server.c136
1 files changed, 67 insertions, 69 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index d124e9f4a..48068cdf7 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -37,10 +37,10 @@
#define ERROR_NOT_FOUND "NOT_FOUND" CRLF
#define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF
-/* This is required for normal signals processing */
-static GList *global_evbases = NULL;
-static struct event_base *main_base = NULL;
+
static sig_atomic_t wanna_die = 0;
+static sig_atomic_t do_reopen_log = 0;
+static sig_atomic_t soft_wanna_die = 0;
/* Logging functions */
#define thr_err(...) do { \
@@ -69,64 +69,20 @@ static void
sig_handler (gint signo, siginfo_t *info, void *unused)
#endif
{
- struct timeval tv;
- GList *cur;
-
switch (signo) {
case SIGUSR1:
- reopen_log (rspamd_main->logger);
+ do_reopen_log = 1;
break;
case SIGINT:
case SIGTERM:
- if (!wanna_die) {
- wanna_die = 1;
- tv.tv_sec = 0;
- tv.tv_usec = 0;
- cur = global_evbases;
- while (cur) {
- event_base_loopexit (cur->data, &tv);
- }
- event_base_loopexit (main_base, &tv);
-#ifdef WITH_GPERF_TOOLS
- ProfilerStop ();
-#endif
- }
+ wanna_die = 1;
+ break;
+ case SIGUSR2:
+ soft_wanna_die = 1;
break;
}
}
-/*
- * Config reload is designed by sending sigusr to active workers and pending shutdown of them
- */
-static void
-sigusr_handler (gint fd, short what, void *arg)
-{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- /* Do not accept new connections, preparing to end worker's process */
- struct timeval tv;
- GList *cur;
- struct kvstorage_worker_ctx *ctx;
- struct kvstorage_worker_thread *thr;
-
- ctx = worker->ctx;
- if (! wanna_die) {
- tv.tv_sec = SOFT_SHUTDOWN_TIME;
- tv.tv_usec = 0;
- event_del (&worker->sig_ev);
- msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
- cur = ctx->threads;
- while (cur) {
- thr = cur->data;
- if (thr->ev_base != NULL) {
- event_del (&thr->bind_ev);
- event_base_loopexit (thr->ev_base, &tv);
- }
- }
- event_base_loopexit (ctx->ev_base, &tv);
- }
- return;
-}
-
gpointer
init_kvstorage_worker (void)
{
@@ -164,6 +120,7 @@ free_kvstorage_session (struct kvstorage_session *session)
rspamd_remove_dispatcher (session->dispather);
memory_pool_delete (session->pool);
close (session->sock);
+ g_slice_free1 (sizeof (struct kvstorage_session), session);
}
/**
@@ -460,8 +417,10 @@ kvstorage_err_socket (GError * err, void *arg)
struct kvstorage_worker_thread *thr;
thr = session->thr;
- thr_info ("%ud: abnormally closing connection from: %s, error: %s",
+ if (err->code != -1) {
+ thr_info ("%ud: abnormally closing connection from: %s, error: %s",
thr->id, inet_ntoa (session->client_addr), err->message);
+ }
g_error_free (err);
free_kvstorage_session (session);
}
@@ -478,35 +437,35 @@ thr_accept_socket (gint fd, short what, void *arg)
gint nfd;
struct kvstorage_session *session;
+ g_static_mutex_lock (thr->accept_mtx);
if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno));
+ g_static_mutex_unlock (thr->accept_mtx);
return;
}
/* Check for EAGAIN */
if (nfd == 0) {
+ g_static_mutex_unlock (thr->accept_mtx);
return;
}
- session = g_malloc (sizeof (struct kvstorage_session));
+ session = g_slice_alloc (sizeof (struct kvstorage_session));
session->pool = memory_pool_new (memory_pool_get_size ());
session->state = KVSTORAGE_STATE_READ_CMD;
session->thr = thr;
session->sock = nfd;
- session->dispather = rspamd_create_dispatcher (thr->ctx->ev_base, nfd, BUFFER_LINE,
+ session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE,
kvstorage_read_socket, NULL,
kvstorage_err_socket, thr->tv, session);
- session->dispather->strip_eol = TRUE;
if (su.ss.ss_family == AF_UNIX) {
- thr_info ("%ud: accepted connection from unix socket", thr->id);
session->client_addr.s_addr = INADDR_NONE;
}
else if (su.ss.ss_family == AF_INET) {
- thr_info ("%ud: accepted connection from %s port %d", thr->id,
- inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
memcpy (&session->client_addr, &su.s4.sin_addr,
sizeof (struct in_addr));
}
+ g_static_mutex_unlock (thr->accept_mtx);
}
/**
@@ -542,6 +501,7 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c
new->worker = worker;
new->tv = &ctx->io_timeout;
new->log_mtx = &ctx->log_mtx;
+ new->accept_mtx = &ctx->accept_mtx;
new->id = id;
new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err);
new->ev_base = NULL;
@@ -563,6 +523,8 @@ start_kvstorage_worker (struct rspamd_worker *worker)
struct kvstorage_worker_ctx *ctx = worker->ctx;
guint i;
struct kvstorage_worker_thread *thr;
+ struct timeval tv;
+ GList *cur;
gperf_profiler_init (worker->srv->cfg, "kvstorage");
@@ -576,7 +538,6 @@ start_kvstorage_worker (struct rspamd_worker *worker)
exit (EXIT_SUCCESS);
}
worker->srv->pid = getpid ();
- ctx->ev_base = event_init ();
ctx->threads = NULL;
g_thread_init (NULL);
@@ -586,7 +547,6 @@ start_kvstorage_worker (struct rspamd_worker *worker)
exit (EXIT_SUCCESS);
}
#endif
- main_base = ctx->ev_base;
/* Set kvstorage options */
if ( !config_kvstorage_worker (worker)) {
@@ -597,22 +557,60 @@ start_kvstorage_worker (struct rspamd_worker *worker)
init_signals (&signals, sig_handler);
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
- /* SIGUSR2 handler */
- signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
- event_base_set (ctx->ev_base, &worker->sig_ev);
- signal_add (&worker->sig_ev, NULL);
+ /* Set umask */
+ umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
/* Start workers threads */
g_static_mutex_init (&ctx->log_mtx);
+ g_static_mutex_init (&ctx->accept_mtx);
for (i = 0; i < worker->cf->count; i ++) {
thr = create_kvstorage_thread (worker, ctx, i);
ctx->threads = g_list_prepend (ctx->threads, thr);
}
- /* Set umask */
- umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
-
- event_base_loop (ctx->ev_base, 0);
+ sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL);
+ /* Signal processing cycle */
+ for (;;) {
+ msg_debug ("calling sigsuspend");
+ sigemptyset (&signals.sa_mask);
+ sigsuspend (&signals.sa_mask);
+ if (wanna_die == 1) {
+ wanna_die = 0;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ msg_info ("worker's immediately shutdown is requested");
+ cur = ctx->threads;
+ while (cur) {
+ thr = cur->data;
+ if (thr->ev_base != NULL) {
+ event_del (&thr->bind_ev);
+ event_base_loopexit (thr->ev_base, &tv);
+ }
+ cur = g_list_next (cur);
+ }
+ break;
+ }
+ else if (soft_wanna_die == 1) {
+ soft_wanna_die = 0;
+ tv.tv_sec = SOFT_SHUTDOWN_TIME;
+ tv.tv_usec = 0;
+ msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ cur = ctx->threads;
+ while (cur) {
+ thr = cur->data;
+ if (thr->ev_base != NULL) {
+ event_del (&thr->bind_ev);
+ event_base_loopexit (thr->ev_base, &tv);
+ }
+ cur = g_list_next (cur);
+ }
+ break;
+ }
+ else if (do_reopen_log == 1) {
+ do_reopen_log = 0;
+ reopen_log (rspamd_main->logger);
+ }
+ }
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);