diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-10-31 20:02:55 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-10-31 20:02:55 +0300 |
commit | c750ae5859456d3bc7593e5998deac48cfad7c69 (patch) | |
tree | 79b66e2b490b5c2f19316882da681a1771049104 | |
parent | cbb830d3182f7967ec477d1b050bc0dedbf71dd8 (diff) | |
download | rspamd-c750ae5859456d3bc7593e5998deac48cfad7c69.tar.gz rspamd-c750ae5859456d3bc7593e5998deac48cfad7c69.zip |
* Fix threading in kvstorage.
Rspamd now can detect and work with libevent-2.
-rw-r--r-- | CMakeLists.txt | 10 | ||||
-rw-r--r-- | config.h.in | 3 | ||||
-rw-r--r-- | src/buffer.c | 6 | ||||
-rw-r--r-- | src/kvstorage_server.c | 136 | ||||
-rw-r--r-- | src/kvstorage_server.h | 2 | ||||
-rw-r--r-- | src/util.c | 2 |
6 files changed, 87 insertions, 72 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 0d9fc18b6..c620bfeaa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -672,6 +672,13 @@ ELSE(HAVE_SIGINFO_H) CHECK_SYMBOL_EXISTS(SA_SIGINFO "signal.h" HAVE_SA_SIGINFO) ENDIF(HAVE_SIGINFO_H) +# Some hack for libevent 2.0 +CHECK_C_SOURCE_COMPILES ("#include <event.h> + #if _EVENT_NUMERIC_VERSION < 0x02000000 + #error Unsupported + #endif + int main() { return 0;}" HAVE_LIBEVENT2) + IF(NOT CMAKE_SYSTEM_NAME STREQUAL "SunOS") IF(HAVE_CLOCK_GETTIME) CHECK_SYMBOL_EXISTS(CLOCK_PROCESS_CPUTIME_ID time.h HAVE_CLOCK_PROCESS_CPUTIME_ID) @@ -810,6 +817,9 @@ IF(LIBDB_LIBRARY) TARGET_LINK_LIBRARIES(rspamd db-4) ENDIF(LIBDB_LIBRARY) TARGET_LINK_LIBRARIES(rspamd event) +IF(HAVE_LIBEVENT2) + TARGET_LINK_LIBRARIES(rspamd event_pthreads) +ENDIF(HAVE_LIBEVENT2) TARGET_LINK_LIBRARIES(rspamd pcre) TARGET_LINK_LIBRARIES(rspamd ${CMAKE_REQUIRED_LIBRARIES}) diff --git a/config.h.in b/config.h.in index 950bf2ef4..baa4beea8 100644 --- a/config.h.in +++ b/config.h.in @@ -375,6 +375,9 @@ # include <siginfo.h> #endif #include <event.h> +#if _EVENT_NUMERIC_VERSION > 0x02000000 +# include <event2/thread.h> +#endif #include <glib.h> diff --git a/src/buffer.c b/src/buffer.c index c94d5f91c..95e42f756 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -178,7 +178,7 @@ static gboolean write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) { GList *cur; - GError *err; + GError *err = NULL; rspamd_buffer_t *buf; ssize_t r; @@ -264,7 +264,7 @@ static void read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read) { ssize_t r; - GError *err; + GError *err = NULL; f_str_t res; gchar *c, *b; gchar *end; @@ -453,7 +453,7 @@ static void dispatcher_cb (gint fd, short what, void *arg) { rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg; - GError *err; + GError *err = NULL; debug_ip("in dispatcher callback, what: %d, fd: %d", (gint)what, fd); diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index d124e9f4a..48068cdf7 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -37,10 +37,10 @@ #define ERROR_NOT_FOUND "NOT_FOUND" CRLF #define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF -/* This is required for normal signals processing */ -static GList *global_evbases = NULL; -static struct event_base *main_base = NULL; + static sig_atomic_t wanna_die = 0; +static sig_atomic_t do_reopen_log = 0; +static sig_atomic_t soft_wanna_die = 0; /* Logging functions */ #define thr_err(...) do { \ @@ -69,64 +69,20 @@ static void sig_handler (gint signo, siginfo_t *info, void *unused) #endif { - struct timeval tv; - GList *cur; - switch (signo) { case SIGUSR1: - reopen_log (rspamd_main->logger); + do_reopen_log = 1; break; case SIGINT: case SIGTERM: - if (!wanna_die) { - wanna_die = 1; - tv.tv_sec = 0; - tv.tv_usec = 0; - cur = global_evbases; - while (cur) { - event_base_loopexit (cur->data, &tv); - } - event_base_loopexit (main_base, &tv); -#ifdef WITH_GPERF_TOOLS - ProfilerStop (); -#endif - } + wanna_die = 1; + break; + case SIGUSR2: + soft_wanna_die = 1; break; } } -/* - * Config reload is designed by sending sigusr to active workers and pending shutdown of them - */ -static void -sigusr_handler (gint fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - /* Do not accept new connections, preparing to end worker's process */ - struct timeval tv; - GList *cur; - struct kvstorage_worker_ctx *ctx; - struct kvstorage_worker_thread *thr; - - ctx = worker->ctx; - if (! wanna_die) { - tv.tv_sec = SOFT_SHUTDOWN_TIME; - tv.tv_usec = 0; - event_del (&worker->sig_ev); - msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); - cur = ctx->threads; - while (cur) { - thr = cur->data; - if (thr->ev_base != NULL) { - event_del (&thr->bind_ev); - event_base_loopexit (thr->ev_base, &tv); - } - } - event_base_loopexit (ctx->ev_base, &tv); - } - return; -} - gpointer init_kvstorage_worker (void) { @@ -164,6 +120,7 @@ free_kvstorage_session (struct kvstorage_session *session) rspamd_remove_dispatcher (session->dispather); memory_pool_delete (session->pool); close (session->sock); + g_slice_free1 (sizeof (struct kvstorage_session), session); } /** @@ -460,8 +417,10 @@ kvstorage_err_socket (GError * err, void *arg) struct kvstorage_worker_thread *thr; thr = session->thr; - thr_info ("%ud: abnormally closing connection from: %s, error: %s", + if (err->code != -1) { + thr_info ("%ud: abnormally closing connection from: %s, error: %s", thr->id, inet_ntoa (session->client_addr), err->message); + } g_error_free (err); free_kvstorage_session (session); } @@ -478,35 +437,35 @@ thr_accept_socket (gint fd, short what, void *arg) gint nfd; struct kvstorage_session *session; + g_static_mutex_lock (thr->accept_mtx); if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno)); + g_static_mutex_unlock (thr->accept_mtx); return; } /* Check for EAGAIN */ if (nfd == 0) { + g_static_mutex_unlock (thr->accept_mtx); return; } - session = g_malloc (sizeof (struct kvstorage_session)); + session = g_slice_alloc (sizeof (struct kvstorage_session)); session->pool = memory_pool_new (memory_pool_get_size ()); session->state = KVSTORAGE_STATE_READ_CMD; session->thr = thr; session->sock = nfd; - session->dispather = rspamd_create_dispatcher (thr->ctx->ev_base, nfd, BUFFER_LINE, + session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE, kvstorage_read_socket, NULL, kvstorage_err_socket, thr->tv, session); - session->dispather->strip_eol = TRUE; if (su.ss.ss_family == AF_UNIX) { - thr_info ("%ud: accepted connection from unix socket", thr->id); session->client_addr.s_addr = INADDR_NONE; } else if (su.ss.ss_family == AF_INET) { - thr_info ("%ud: accepted connection from %s port %d", thr->id, - inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port)); memcpy (&session->client_addr, &su.s4.sin_addr, sizeof (struct in_addr)); } + g_static_mutex_unlock (thr->accept_mtx); } /** @@ -542,6 +501,7 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c new->worker = worker; new->tv = &ctx->io_timeout; new->log_mtx = &ctx->log_mtx; + new->accept_mtx = &ctx->accept_mtx; new->id = id; new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err); new->ev_base = NULL; @@ -563,6 +523,8 @@ start_kvstorage_worker (struct rspamd_worker *worker) struct kvstorage_worker_ctx *ctx = worker->ctx; guint i; struct kvstorage_worker_thread *thr; + struct timeval tv; + GList *cur; gperf_profiler_init (worker->srv->cfg, "kvstorage"); @@ -576,7 +538,6 @@ start_kvstorage_worker (struct rspamd_worker *worker) exit (EXIT_SUCCESS); } worker->srv->pid = getpid (); - ctx->ev_base = event_init (); ctx->threads = NULL; g_thread_init (NULL); @@ -586,7 +547,6 @@ start_kvstorage_worker (struct rspamd_worker *worker) exit (EXIT_SUCCESS); } #endif - main_base = ctx->ev_base; /* Set kvstorage options */ if ( !config_kvstorage_worker (worker)) { @@ -597,22 +557,60 @@ start_kvstorage_worker (struct rspamd_worker *worker) init_signals (&signals, sig_handler); sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); - /* SIGUSR2 handler */ - signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker); - event_base_set (ctx->ev_base, &worker->sig_ev); - signal_add (&worker->sig_ev, NULL); + /* Set umask */ + umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); /* Start workers threads */ g_static_mutex_init (&ctx->log_mtx); + g_static_mutex_init (&ctx->accept_mtx); for (i = 0; i < worker->cf->count; i ++) { thr = create_kvstorage_thread (worker, ctx, i); ctx->threads = g_list_prepend (ctx->threads, thr); } - /* Set umask */ - umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); - - event_base_loop (ctx->ev_base, 0); + sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL); + /* Signal processing cycle */ + for (;;) { + msg_debug ("calling sigsuspend"); + sigemptyset (&signals.sa_mask); + sigsuspend (&signals.sa_mask); + if (wanna_die == 1) { + wanna_die = 0; + tv.tv_sec = 0; + tv.tv_usec = 0; + msg_info ("worker's immediately shutdown is requested"); + cur = ctx->threads; + while (cur) { + thr = cur->data; + if (thr->ev_base != NULL) { + event_del (&thr->bind_ev); + event_base_loopexit (thr->ev_base, &tv); + } + cur = g_list_next (cur); + } + break; + } + else if (soft_wanna_die == 1) { + soft_wanna_die = 0; + tv.tv_sec = SOFT_SHUTDOWN_TIME; + tv.tv_usec = 0; + msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME); + cur = ctx->threads; + while (cur) { + thr = cur->data; + if (thr->ev_base != NULL) { + event_del (&thr->bind_ev); + event_base_loopexit (thr->ev_base, &tv); + } + cur = g_list_next (cur); + } + break; + } + else if (do_reopen_log == 1) { + do_reopen_log = 0; + reopen_log (rspamd_main->logger); + } + } close_log (rspamd_main->logger); exit (EXIT_SUCCESS); diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h index 6f22b08c2..827b34c9c 100644 --- a/src/kvstorage_server.h +++ b/src/kvstorage_server.h @@ -38,6 +38,7 @@ struct kvstorage_worker_ctx { memory_pool_t *pool; struct event_base *ev_base; GStaticMutex log_mtx; + GStaticMutex accept_mtx; }; struct kvstorage_worker_thread { @@ -48,6 +49,7 @@ struct kvstorage_worker_thread { GThread *thr; struct event_base *ev_base; GStaticMutex *log_mtx; + GStaticMutex *accept_mtx; guint id; }; diff --git a/src/util.c b/src/util.c index 423ceb969..570427ef7 100644 --- a/src/util.c +++ b/src/util.c @@ -1239,6 +1239,8 @@ process_to_str (enum process_type type) return "lmtp"; case TYPE_SMTP: return "smtp"; + case TYPE_KVSTORAGE: + return "keystorage"; default: return "unknown"; } |