From c750ae5859456d3bc7593e5998deac48cfad7c69 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 31 Oct 2011 20:02:55 +0300 Subject: * Fix threading in kvstorage. Rspamd now can detect and work with libevent-2. --- src/kvstorage_server.c | 136 ++++++++++++++++++++++++------------------------- 1 file changed, 67 insertions(+), 69 deletions(-) (limited to 'src/kvstorage_server.c') diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index d124e9f4a..48068cdf7 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -37,10 +37,10 @@ #define ERROR_NOT_FOUND "NOT_FOUND" CRLF #define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF -/* This is required for normal signals processing */ -static GList *global_evbases = NULL; -static struct event_base *main_base = NULL; + static sig_atomic_t wanna_die = 0; +static sig_atomic_t do_reopen_log = 0; +static sig_atomic_t soft_wanna_die = 0; /* Logging functions */ #define thr_err(...) do { \ @@ -69,64 +69,20 @@ static void sig_handler (gint signo, siginfo_t *info, void *unused) #endif { - struct timeval tv; - GList *cur; - switch (signo) { case SIGUSR1: - reopen_log (rspamd_main->logger); + do_reopen_log = 1; break; case SIGINT: case SIGTERM: - if (!wanna_die) { - wanna_die = 1; - tv.tv_sec = 0; - tv.tv_usec = 0; - cur = global_evbases; - while (cur) { - event_base_loopexit (cur->data, &tv); - } - event_base_loopexit (main_base, &tv); -#ifdef WITH_GPERF_TOOLS - ProfilerStop (); -#endif - } + wanna_die = 1; + break; + case SIGUSR2: + soft_wanna_die = 1; break; } } -/* - * Config reload is designed by sending sigusr to active workers and pending shutdown of them - */ -static void -sigusr_handler (gint fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - /* Do not accept new connections, preparing to end worker's process */ - struct timeval tv; - GList *cur; - struct kvstorage_worker_ctx *ctx; - struct kvstorage_worker_thread *thr; - - ctx = worker->ctx; - if (! wanna_die) { - tv.tv_sec = SOFT_SHUTDOWN_TIME; - tv.tv_usec = 0; - event_del (&worker->sig_ev); - msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); - cur = ctx->threads; - while (cur) { - thr = cur->data; - if (thr->ev_base != NULL) { - event_del (&thr->bind_ev); - event_base_loopexit (thr->ev_base, &tv); - } - } - event_base_loopexit (ctx->ev_base, &tv); - } - return; -} - gpointer init_kvstorage_worker (void) { @@ -164,6 +120,7 @@ free_kvstorage_session (struct kvstorage_session *session) rspamd_remove_dispatcher (session->dispather); memory_pool_delete (session->pool); close (session->sock); + g_slice_free1 (sizeof (struct kvstorage_session), session); } /** @@ -460,8 +417,10 @@ kvstorage_err_socket (GError * err, void *arg) struct kvstorage_worker_thread *thr; thr = session->thr; - thr_info ("%ud: abnormally closing connection from: %s, error: %s", + if (err->code != -1) { + thr_info ("%ud: abnormally closing connection from: %s, error: %s", thr->id, inet_ntoa (session->client_addr), err->message); + } g_error_free (err); free_kvstorage_session (session); } @@ -478,35 +437,35 @@ thr_accept_socket (gint fd, short what, void *arg) gint nfd; struct kvstorage_session *session; + g_static_mutex_lock (thr->accept_mtx); if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno)); + g_static_mutex_unlock (thr->accept_mtx); return; } /* Check for EAGAIN */ if (nfd == 0) { + g_static_mutex_unlock (thr->accept_mtx); return; } - session = g_malloc (sizeof (struct kvstorage_session)); + session = g_slice_alloc (sizeof (struct kvstorage_session)); session->pool = memory_pool_new (memory_pool_get_size ()); session->state = KVSTORAGE_STATE_READ_CMD; session->thr = thr; session->sock = nfd; - session->dispather = rspamd_create_dispatcher (thr->ctx->ev_base, nfd, BUFFER_LINE, + session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE, kvstorage_read_socket, NULL, kvstorage_err_socket, thr->tv, session); - session->dispather->strip_eol = TRUE; if (su.ss.ss_family == AF_UNIX) { - thr_info ("%ud: accepted connection from unix socket", thr->id); session->client_addr.s_addr = INADDR_NONE; } else if (su.ss.ss_family == AF_INET) { - thr_info ("%ud: accepted connection from %s port %d", thr->id, - inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); memcpy (&session->client_addr, &su.s4.sin_addr, sizeof (struct in_addr)); } + g_static_mutex_unlock (thr->accept_mtx); } /** @@ -542,6 +501,7 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c new->worker = worker; new->tv = &ctx->io_timeout; new->log_mtx = &ctx->log_mtx; + new->accept_mtx = &ctx->accept_mtx; new->id = id; new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err); new->ev_base = NULL; @@ -563,6 +523,8 @@ start_kvstorage_worker (struct rspamd_worker *worker) struct kvstorage_worker_ctx *ctx = worker->ctx; guint i; struct kvstorage_worker_thread *thr; + struct timeval tv; + GList *cur; gperf_profiler_init (worker->srv->cfg, "kvstorage"); @@ -576,7 +538,6 @@ start_kvstorage_worker (struct rspamd_worker *worker) exit (EXIT_SUCCESS); } worker->srv->pid = getpid (); - ctx->ev_base = event_init (); ctx->threads = NULL; g_thread_init (NULL); @@ -586,7 +547,6 @@ start_kvstorage_worker (struct rspamd_worker *worker) exit (EXIT_SUCCESS); } #endif - main_base = ctx->ev_base; /* Set kvstorage options */ if ( !config_kvstorage_worker (worker)) { @@ -597,22 +557,60 @@ start_kvstorage_worker (struct rspamd_worker *worker) init_signals (&signals, sig_handler); sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); - /* SIGUSR2 handler */ - signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker); - event_base_set (ctx->ev_base, &worker->sig_ev); - signal_add (&worker->sig_ev, NULL); + /* Set umask */ + umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); /* Start workers threads */ g_static_mutex_init (&ctx->log_mtx); + g_static_mutex_init (&ctx->accept_mtx); for (i = 0; i < worker->cf->count; i ++) { thr = create_kvstorage_thread (worker, ctx, i); ctx->threads = g_list_prepend (ctx->threads, thr); } - /* Set umask */ - umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); - - event_base_loop (ctx->ev_base, 0); + sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL); + /* Signal processing cycle */ + for (;;) { + msg_debug ("calling sigsuspend"); + sigemptyset (&signals.sa_mask); + sigsuspend (&signals.sa_mask); + if (wanna_die == 1) { + wanna_die = 0; + tv.tv_sec = 0; + tv.tv_usec = 0; + msg_info ("worker's immediately shutdown is requested"); + cur = ctx->threads; + while (cur) { + thr = cur->data; + if (thr->ev_base != NULL) { + event_del (&thr->bind_ev); + event_base_loopexit (thr->ev_base, &tv); + } + cur = g_list_next (cur); + } + break; + } + else if (soft_wanna_die == 1) { + soft_wanna_die = 0; + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + cur = ctx->threads; + while (cur) { + thr = cur->data; + if (thr->ev_base != NULL) { + event_del (&thr->bind_ev); + event_base_loopexit (thr->ev_base, &tv); + } + cur = g_list_next (cur); + } + break; + } + else if (do_reopen_log == 1) { + do_reopen_log = 0; + reopen_log (rspamd_main->logger); + } + } close_log (rspamd_main->logger); exit (EXIT_SUCCESS); -- cgit v1.2.3