summaryrefslogtreecommitdiffstats
path: root/src/kvstorage_server.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-01-31 20:59:10 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-01-31 20:59:10 +0400
commit9023eb3f1e85e456ee8804b7867c9e17ada5aa6e (patch)
treea3861a28d1cfd8cd47c6168ff42bf3630c31411f /src/kvstorage_server.c
parenta874d5eb9fe0a2e1ddf1a0f48e6df41845be087f (diff)
downloadrspamd-9023eb3f1e85e456ee8804b7867c9e17ada5aa6e.tar.gz
rspamd-9023eb3f1e85e456ee8804b7867c9e17ada5aa6e.zip
Adopt rspamd for the next glib release.
Fix several issues in threads handling inside keystorage. Fix sigsuspend usage in keystorage.
Diffstat (limited to 'src/kvstorage_server.c')
-rw-r--r--src/kvstorage_server.c127
1 files changed, 96 insertions, 31 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index 059a0e6e8..96b1d6f15 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -46,21 +46,21 @@ static sig_atomic_t soft_wanna_die = 0;
/* Logging functions */
#define thr_err(...) do { \
- g_static_mutex_lock (thr->log_mtx); \
+ g_mutex_lock (thr->log_mtx); \
rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_CRITICAL, __FUNCTION__, __VA_ARGS__); \
- g_static_mutex_unlock (thr->log_mtx); \
+ g_mutex_unlock (thr->log_mtx); \
} while (0)
#define thr_warn(...) do { \
- g_static_mutex_lock (thr->log_mtx); \
+ g_mutex_lock (thr->log_mtx); \
rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_WARNING, __FUNCTION__, __VA_ARGS__); \
- g_static_mutex_unlock (thr->log_mtx); \
+ g_mutex_unlock (thr->log_mtx); \
} while (0)
#define thr_info(...) do { \
- g_static_mutex_lock (thr->log_mtx); \
+ g_mutex_lock (thr->log_mtx); \
rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_INFO, __FUNCTION__, __VA_ARGS__); \
- g_static_mutex_unlock (thr->log_mtx); \
+ g_mutex_unlock (thr->log_mtx); \
} while (0)
/* Init functions */
@@ -491,7 +491,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
else if (session->command == KVSTORAGE_CMD_GET) {
elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now);
if (elt == NULL) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ RW_R_UNLOCK (&session->cf->storage->rwlock);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
@@ -520,18 +520,18 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
if (!rspamd_dispatcher_write (session->dispather, outbuf,
r, TRUE, FALSE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ RW_R_UNLOCK (&session->cf->storage->rwlock);
return FALSE;
}
if (elt->flags & KV_ELT_INTEGER) {
if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ RW_R_UNLOCK (&session->cf->storage->rwlock);
return FALSE;
}
}
else {
if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ RW_R_UNLOCK (&session->cf->storage->rwlock);
return FALSE;
}
}
@@ -545,7 +545,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
sizeof (CRLF) - 1, FALSE, TRUE);
}
if (!res) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ RW_R_UNLOCK (&session->cf->storage->rwlock);
}
return res;
@@ -942,7 +942,7 @@ kvstorage_write_socket (void *arg)
if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) {
/* Insert to cache and free element */
session->elt->flags &= ~KV_ELT_NEED_INSERT;
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ RW_R_UNLOCK (&session->cf->storage->rwlock);
rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt),
session->elt->keylen, ELT_DATA (session->elt),
session->elt->size, session->elt->flags,
@@ -951,7 +951,7 @@ kvstorage_write_socket (void *arg)
session->elt = NULL;
return TRUE;
}
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ RW_R_UNLOCK (&session->cf->storage->rwlock);
session->elt = NULL;
}
@@ -975,7 +975,7 @@ kvstorage_err_socket (GError * err, void *arg)
}
if (session->elt) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ RW_R_UNLOCK (&session->cf->storage->rwlock);
session->elt = NULL;
}
@@ -995,16 +995,16 @@ thr_accept_socket (gint fd, short what, void *arg)
gint nfd;
struct kvstorage_session *session;
- g_static_mutex_lock (thr->accept_mtx);
+ g_mutex_lock (thr->accept_mtx);
if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno));
- g_static_mutex_unlock (thr->accept_mtx);
+ g_mutex_unlock (thr->accept_mtx);
return;
}
/* Check for EAGAIN */
if (nfd == 0) {
- g_static_mutex_unlock (thr->accept_mtx);
+ g_mutex_unlock (thr->accept_mtx);
return;
}
@@ -1017,7 +1017,7 @@ thr_accept_socket (gint fd, short what, void *arg)
kvstorage_read_socket, kvstorage_write_socket,
kvstorage_err_socket, thr->tv, session);
- g_static_mutex_unlock (thr->accept_mtx);
+ g_mutex_unlock (thr->accept_mtx);
session->elt = NULL;
if (su.ss.ss_family == AF_UNIX) {
@@ -1030,6 +1030,25 @@ thr_accept_socket (gint fd, short what, void *arg)
}
/**
+ * Handle termination
+ */
+static void
+thr_term_socket (gint fd, short what, void *arg)
+{
+ struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg;
+ struct timeval tv;
+
+ if (read (fd, &tv, sizeof (struct timeval)) != sizeof (struct timeval)) {
+ thr_err ("cannot read data from socket: %s", strerror (errno));
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ }
+
+ event_base_loopexit (thr->ev_base, &tv);
+ event_del (&thr->bind_ev);
+}
+
+/**
* Thread main worker function
*/
static gpointer
@@ -1046,6 +1065,10 @@ kvstorage_thread (gpointer ud)
event_base_set (thr->ev_base, &thr->bind_ev);
event_add (&thr->bind_ev, NULL);
+ event_set (&thr->term_ev, thr->term_sock[0], EV_READ | EV_PERSIST, thr_term_socket, (void *)thr);
+ event_base_set (thr->ev_base, &thr->term_ev);
+ event_add (&thr->term_ev, NULL);
+
event_base_loop (thr->ev_base, 0);
return NULL;
@@ -1064,10 +1087,27 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c
new->ctx = ctx;
new->worker = worker;
new->tv = &ctx->io_timeout;
- new->log_mtx = &ctx->log_mtx;
- new->accept_mtx = &ctx->accept_mtx;
+ new->log_mtx = ctx->log_mtx;
+ new->accept_mtx = ctx->accept_mtx;
new->id = id;
+
+ /* Create and setup terminating socket */
+ if (make_socketpair (new->term_sock) == -1) {
+ msg_err ("socket failed: %s", strerror (errno));
+ return NULL;
+ }
+ make_socket_nonblocking (new->term_sock[0]);
+
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err);
+#else
+ gchar *name;
+
+ name = memory_pool_alloc (ctx->pool, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1);
+ rspamd_snprintf (name, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1, "kvstorage_thread%d", id);
+
+ new->thr = g_thread_new (name, kvstorage_thread, new);
+#endif
new->ev_base = NULL;
new->signals = signals;
@@ -1104,8 +1144,10 @@ start_keystorage (struct rspamd_worker *worker)
}
worker->srv->pid = getpid ();
ctx->threads = NULL;
-
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
g_thread_init (NULL);
+#endif
+
#if _EVENT_NUMERIC_VERSION > 0x02000000
if (evthread_use_pthreads () == -1) {
msg_err ("threads support is not supported in your libevent so kvstorage is not functionable");
@@ -1120,17 +1162,27 @@ start_keystorage (struct rspamd_worker *worker)
}
init_signals (&signals, sig_handler);
- sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
/* Set umask */
umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
+ /* Init mutexes */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ ctx->log_mtx = g_mutex_new ();
+ ctx->accept_mtx = g_mutex_new ();
+#else
+ ctx->log_mtx = memory_pool_alloc (ctx->pool, sizeof (GMutex));
+ ctx->accept_mtx = memory_pool_alloc (ctx->pool, sizeof (GMutex));
+ g_mutex_init (ctx->log_mtx);
+ g_mutex_init (ctx->accept_mtx);
+#endif
+
/* Start workers threads */
- g_static_mutex_init (&ctx->log_mtx);
- g_static_mutex_init (&ctx->accept_mtx);
for (i = 0; i < worker->cf->count; i ++) {
thr = create_kvstorage_thread (worker, ctx, i, &signals.sa_mask);
- ctx->threads = g_list_prepend (ctx->threads, thr);
+ if (thr != NULL) {
+ ctx->threads = g_list_prepend (ctx->threads, thr);
+ }
}
sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL);
@@ -1147,9 +1199,11 @@ start_keystorage (struct rspamd_worker *worker)
cur = ctx->threads;
while (cur) {
thr = cur->data;
- if (thr->ev_base != NULL) {
- event_del (&thr->bind_ev);
- event_base_loopexit (thr->ev_base, &tv);
+ while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) {
+ if (errno != EAGAIN) {
+ msg_err ("write to term socket failed: %s", strerror (errno));
+ abort ();
+ }
}
cur = g_list_next (cur);
}
@@ -1163,9 +1217,11 @@ start_keystorage (struct rspamd_worker *worker)
cur = ctx->threads;
while (cur) {
thr = cur->data;
- if (thr->ev_base != NULL) {
- event_del (&thr->bind_ev);
- event_base_loopexit (thr->ev_base, &tv);
+ while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) {
+ if (errno != EAGAIN) {
+ msg_err ("write to term socket failed: %s", strerror (errno));
+ abort ();
+ }
}
cur = g_list_next (cur);
}
@@ -1178,6 +1234,15 @@ start_keystorage (struct rspamd_worker *worker)
}
msg_info ("syncing storages");
+ /* Wait for threads in the recent glib */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
+ cur = ctx->threads;
+ while (cur) {
+ thr = cur->data;
+ (void)g_thread_join (thr->thr);
+ cur = g_list_next (cur);
+ }
+#endif
destroy_kvstorage_config ();
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);