summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-31 20:02:55 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-31 20:02:55 +0300
commitc750ae5859456d3bc7593e5998deac48cfad7c69 (patch)
tree79b66e2b490b5c2f19316882da681a1771049104
parentcbb830d3182f7967ec477d1b050bc0dedbf71dd8 (diff)
downloadrspamd-c750ae5859456d3bc7593e5998deac48cfad7c69.tar.gz
rspamd-c750ae5859456d3bc7593e5998deac48cfad7c69.zip
* Fix threading in kvstorage.
Rspamd now can detect and work with libevent-2.
-rw-r--r--CMakeLists.txt10
-rw-r--r--config.h.in3
-rw-r--r--src/buffer.c6
-rw-r--r--src/kvstorage_server.c136
-rw-r--r--src/kvstorage_server.h2
-rw-r--r--src/util.c2
6 files changed, 87 insertions, 72 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0d9fc18b6..c620bfeaa 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -672,6 +672,13 @@ ELSE(HAVE_SIGINFO_H)
CHECK_SYMBOL_EXISTS(SA_SIGINFO "signal.h" HAVE_SA_SIGINFO)
ENDIF(HAVE_SIGINFO_H)
+# Some hack for libevent 2.0
+CHECK_C_SOURCE_COMPILES ("#include <event.h>
+ #if _EVENT_NUMERIC_VERSION < 0x02000000
+ #error Unsupported
+ #endif
+ int main() { return 0;}" HAVE_LIBEVENT2)
+
IF(NOT CMAKE_SYSTEM_NAME STREQUAL "SunOS")
IF(HAVE_CLOCK_GETTIME)
CHECK_SYMBOL_EXISTS(CLOCK_PROCESS_CPUTIME_ID time.h HAVE_CLOCK_PROCESS_CPUTIME_ID)
@@ -810,6 +817,9 @@ IF(LIBDB_LIBRARY)
TARGET_LINK_LIBRARIES(rspamd db-4)
ENDIF(LIBDB_LIBRARY)
TARGET_LINK_LIBRARIES(rspamd event)
+IF(HAVE_LIBEVENT2)
+ TARGET_LINK_LIBRARIES(rspamd event_pthreads)
+ENDIF(HAVE_LIBEVENT2)
TARGET_LINK_LIBRARIES(rspamd pcre)
TARGET_LINK_LIBRARIES(rspamd ${CMAKE_REQUIRED_LIBRARIES})
diff --git a/config.h.in b/config.h.in
index 950bf2ef4..baa4beea8 100644
--- a/config.h.in
+++ b/config.h.in
@@ -375,6 +375,9 @@
# include <siginfo.h>
#endif
#include <event.h>
+#if _EVENT_NUMERIC_VERSION > 0x02000000
+# include <event2/thread.h>
+#endif
#include <glib.h>
diff --git a/src/buffer.c b/src/buffer.c
index c94d5f91c..95e42f756 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -178,7 +178,7 @@ static gboolean
write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
{
GList *cur;
- GError *err;
+ GError *err = NULL;
rspamd_buffer_t *buf;
ssize_t r;
@@ -264,7 +264,7 @@ static void
read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
{
ssize_t r;
- GError *err;
+ GError *err = NULL;
f_str_t res;
gchar *c, *b;
gchar *end;
@@ -453,7 +453,7 @@ static void
dispatcher_cb (gint fd, short what, void *arg)
{
rspamd_io_dispatcher_t *d = (rspamd_io_dispatcher_t *) arg;
- GError *err;
+ GError *err = NULL;
debug_ip("in dispatcher callback, what: %d, fd: %d", (gint)what, fd);
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index d124e9f4a..48068cdf7 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -37,10 +37,10 @@
#define ERROR_NOT_FOUND "NOT_FOUND" CRLF
#define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF
-/* This is required for normal signals processing */
-static GList *global_evbases = NULL;
-static struct event_base *main_base = NULL;
+
static sig_atomic_t wanna_die = 0;
+static sig_atomic_t do_reopen_log = 0;
+static sig_atomic_t soft_wanna_die = 0;
/* Logging functions */
#define thr_err(...) do { \
@@ -69,64 +69,20 @@ static void
sig_handler (gint signo, siginfo_t *info, void *unused)
#endif
{
- struct timeval tv;
- GList *cur;
-
switch (signo) {
case SIGUSR1:
- reopen_log (rspamd_main->logger);
+ do_reopen_log = 1;
break;
case SIGINT:
case SIGTERM:
- if (!wanna_die) {
- wanna_die = 1;
- tv.tv_sec = 0;
- tv.tv_usec = 0;
- cur = global_evbases;
- while (cur) {
- event_base_loopexit (cur->data, &tv);
- }
- event_base_loopexit (main_base, &tv);
-#ifdef WITH_GPERF_TOOLS
- ProfilerStop ();
-#endif
- }
+ wanna_die = 1;
+ break;
+ case SIGUSR2:
+ soft_wanna_die = 1;
break;
}
}
-/*
- * Config reload is designed by sending sigusr to active workers and pending shutdown of them
- */
-static void
-sigusr_handler (gint fd, short what, void *arg)
-{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
- /* Do not accept new connections, preparing to end worker's process */
- struct timeval tv;
- GList *cur;
- struct kvstorage_worker_ctx *ctx;
- struct kvstorage_worker_thread *thr;
-
- ctx = worker->ctx;
- if (! wanna_die) {
- tv.tv_sec = SOFT_SHUTDOWN_TIME;
- tv.tv_usec = 0;
- event_del (&worker->sig_ev);
- msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
- cur = ctx->threads;
- while (cur) {
- thr = cur->data;
- if (thr->ev_base != NULL) {
- event_del (&thr->bind_ev);
- event_base_loopexit (thr->ev_base, &tv);
- }
- }
- event_base_loopexit (ctx->ev_base, &tv);
- }
- return;
-}
-
gpointer
init_kvstorage_worker (void)
{
@@ -164,6 +120,7 @@ free_kvstorage_session (struct kvstorage_session *session)
rspamd_remove_dispatcher (session->dispather);
memory_pool_delete (session->pool);
close (session->sock);
+ g_slice_free1 (sizeof (struct kvstorage_session), session);
}
/**
@@ -460,8 +417,10 @@ kvstorage_err_socket (GError * err, void *arg)
struct kvstorage_worker_thread *thr;
thr = session->thr;
- thr_info ("%ud: abnormally closing connection from: %s, error: %s",
+ if (err->code != -1) {
+ thr_info ("%ud: abnormally closing connection from: %s, error: %s",
thr->id, inet_ntoa (session->client_addr), err->message);
+ }
g_error_free (err);
free_kvstorage_session (session);
}
@@ -478,35 +437,35 @@ thr_accept_socket (gint fd, short what, void *arg)
gint nfd;
struct kvstorage_session *session;
+ g_static_mutex_lock (thr->accept_mtx);
if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno));
+ g_static_mutex_unlock (thr->accept_mtx);
return;
}
/* Check for EAGAIN */
if (nfd == 0) {
+ g_static_mutex_unlock (thr->accept_mtx);
return;
}
- session = g_malloc (sizeof (struct kvstorage_session));
+ session = g_slice_alloc (sizeof (struct kvstorage_session));
session->pool = memory_pool_new (memory_pool_get_size ());
session->state = KVSTORAGE_STATE_READ_CMD;
session->thr = thr;
session->sock = nfd;
- session->dispather = rspamd_create_dispatcher (thr->ctx->ev_base, nfd, BUFFER_LINE,
+ session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE,
kvstorage_read_socket, NULL,
kvstorage_err_socket, thr->tv, session);
- session->dispather->strip_eol = TRUE;
if (su.ss.ss_family == AF_UNIX) {
- thr_info ("%ud: accepted connection from unix socket", thr->id);
session->client_addr.s_addr = INADDR_NONE;
}
else if (su.ss.ss_family == AF_INET) {
- thr_info ("%ud: accepted connection from %s port %d", thr->id,
- inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
memcpy (&session->client_addr, &su.s4.sin_addr,
sizeof (struct in_addr));
}
+ g_static_mutex_unlock (thr->accept_mtx);
}
/**
@@ -542,6 +501,7 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c
new->worker = worker;
new->tv = &ctx->io_timeout;
new->log_mtx = &ctx->log_mtx;
+ new->accept_mtx = &ctx->accept_mtx;
new->id = id;
new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err);
new->ev_base = NULL;
@@ -563,6 +523,8 @@ start_kvstorage_worker (struct rspamd_worker *worker)
struct kvstorage_worker_ctx *ctx = worker->ctx;
guint i;
struct kvstorage_worker_thread *thr;
+ struct timeval tv;
+ GList *cur;
gperf_profiler_init (worker->srv->cfg, "kvstorage");
@@ -576,7 +538,6 @@ start_kvstorage_worker (struct rspamd_worker *worker)
exit (EXIT_SUCCESS);
}
worker->srv->pid = getpid ();
- ctx->ev_base = event_init ();
ctx->threads = NULL;
g_thread_init (NULL);
@@ -586,7 +547,6 @@ start_kvstorage_worker (struct rspamd_worker *worker)
exit (EXIT_SUCCESS);
}
#endif
- main_base = ctx->ev_base;
/* Set kvstorage options */
if ( !config_kvstorage_worker (worker)) {
@@ -597,22 +557,60 @@ start_kvstorage_worker (struct rspamd_worker *worker)
init_signals (&signals, sig_handler);
sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
- /* SIGUSR2 handler */
- signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
- event_base_set (ctx->ev_base, &worker->sig_ev);
- signal_add (&worker->sig_ev, NULL);
+ /* Set umask */
+ umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
/* Start workers threads */
g_static_mutex_init (&ctx->log_mtx);
+ g_static_mutex_init (&ctx->accept_mtx);
for (i = 0; i < worker->cf->count; i ++) {
thr = create_kvstorage_thread (worker, ctx, i);
ctx->threads = g_list_prepend (ctx->threads, thr);
}
- /* Set umask */
- umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
-
- event_base_loop (ctx->ev_base, 0);
+ sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL);
+ /* Signal processing cycle */
+ for (;;) {
+ msg_debug ("calling sigsuspend");
+ sigemptyset (&signals.sa_mask);
+ sigsuspend (&signals.sa_mask);
+ if (wanna_die == 1) {
+ wanna_die = 0;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ msg_info ("worker's immediately shutdown is requested");
+ cur = ctx->threads;
+ while (cur) {
+ thr = cur->data;
+ if (thr->ev_base != NULL) {
+ event_del (&thr->bind_ev);
+ event_base_loopexit (thr->ev_base, &tv);
+ }
+ cur = g_list_next (cur);
+ }
+ break;
+ }
+ else if (soft_wanna_die == 1) {
+ soft_wanna_die = 0;
+ tv.tv_sec = SOFT_SHUTDOWN_TIME;
+ tv.tv_usec = 0;
+ msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+ cur = ctx->threads;
+ while (cur) {
+ thr = cur->data;
+ if (thr->ev_base != NULL) {
+ event_del (&thr->bind_ev);
+ event_base_loopexit (thr->ev_base, &tv);
+ }
+ cur = g_list_next (cur);
+ }
+ break;
+ }
+ else if (do_reopen_log == 1) {
+ do_reopen_log = 0;
+ reopen_log (rspamd_main->logger);
+ }
+ }
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h
index 6f22b08c2..827b34c9c 100644
--- a/src/kvstorage_server.h
+++ b/src/kvstorage_server.h
@@ -38,6 +38,7 @@ struct kvstorage_worker_ctx {
memory_pool_t *pool;
struct event_base *ev_base;
GStaticMutex log_mtx;
+ GStaticMutex accept_mtx;
};
struct kvstorage_worker_thread {
@@ -48,6 +49,7 @@ struct kvstorage_worker_thread {
GThread *thr;
struct event_base *ev_base;
GStaticMutex *log_mtx;
+ GStaticMutex *accept_mtx;
guint id;
};
diff --git a/src/util.c b/src/util.c
index 423ceb969..570427ef7 100644
--- a/src/util.c
+++ b/src/util.c
@@ -1239,6 +1239,8 @@ process_to_str (enum process_type type)
return "lmtp";
case TYPE_SMTP:
return "smtp";
+ case TYPE_KVSTORAGE:
+ return "keystorage";
default:
return "unknown";
}