diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-01-31 20:59:10 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-01-31 20:59:10 +0400 |
commit | 9023eb3f1e85e456ee8804b7867c9e17ada5aa6e (patch) | |
tree | a3861a28d1cfd8cd47c6168ff42bf3630c31411f /src/kvstorage_server.c | |
parent | a874d5eb9fe0a2e1ddf1a0f48e6df41845be087f (diff) | |
download | rspamd-9023eb3f1e85e456ee8804b7867c9e17ada5aa6e.tar.gz rspamd-9023eb3f1e85e456ee8804b7867c9e17ada5aa6e.zip |
Adopt rspamd for the next glib release.
Fix several issues in threads handling inside keystorage.
Fix sigsuspend usage in keystorage.
Diffstat (limited to 'src/kvstorage_server.c')
-rw-r--r-- | src/kvstorage_server.c | 127 |
1 files changed, 96 insertions, 31 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 059a0e6e8..96b1d6f15 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -46,21 +46,21 @@ static sig_atomic_t soft_wanna_die = 0; /* Logging functions */ #define thr_err(...) do { \ - g_static_mutex_lock (thr->log_mtx); \ + g_mutex_lock (thr->log_mtx); \ rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_CRITICAL, __FUNCTION__, __VA_ARGS__); \ - g_static_mutex_unlock (thr->log_mtx); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) #define thr_warn(...) do { \ - g_static_mutex_lock (thr->log_mtx); \ + g_mutex_lock (thr->log_mtx); \ rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_WARNING, __FUNCTION__, __VA_ARGS__); \ - g_static_mutex_unlock (thr->log_mtx); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) #define thr_info(...) do { \ - g_static_mutex_lock (thr->log_mtx); \ + g_mutex_lock (thr->log_mtx); \ rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_INFO, __FUNCTION__, __VA_ARGS__); \ - g_static_mutex_unlock (thr->log_mtx); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) /* Init functions */ @@ -491,7 +491,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) else if (session->command == KVSTORAGE_CMD_GET) { elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now); if (elt == NULL) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); @@ -520,18 +520,18 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } if (!rspamd_dispatcher_write (session->dispather, outbuf, r, TRUE, FALSE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } if (elt->flags & KV_ELT_INTEGER) { if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } } else { if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } } @@ -545,7 +545,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) sizeof (CRLF) - 1, FALSE, TRUE); } if (!res) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); } return res; @@ -942,7 +942,7 @@ kvstorage_write_socket (void *arg) if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) { /* Insert to cache and free element */ session->elt->flags &= ~KV_ELT_NEED_INSERT; - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt), session->elt->keylen, ELT_DATA (session->elt), session->elt->size, session->elt->flags, @@ -951,7 +951,7 @@ kvstorage_write_socket (void *arg) session->elt = NULL; return TRUE; } - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); session->elt = NULL; } @@ -975,7 +975,7 @@ kvstorage_err_socket (GError * err, void *arg) } if (session->elt) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); session->elt = NULL; } @@ -995,16 +995,16 @@ thr_accept_socket (gint fd, short what, void *arg) gint nfd; struct kvstorage_session *session; - g_static_mutex_lock (thr->accept_mtx); + g_mutex_lock (thr->accept_mtx); if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno)); - g_static_mutex_unlock (thr->accept_mtx); + g_mutex_unlock (thr->accept_mtx); return; } /* Check for EAGAIN */ if (nfd == 0) { - g_static_mutex_unlock (thr->accept_mtx); + g_mutex_unlock (thr->accept_mtx); return; } @@ -1017,7 +1017,7 @@ thr_accept_socket (gint fd, short what, void *arg) kvstorage_read_socket, kvstorage_write_socket, kvstorage_err_socket, thr->tv, session); - g_static_mutex_unlock (thr->accept_mtx); + g_mutex_unlock (thr->accept_mtx); session->elt = NULL; if (su.ss.ss_family == AF_UNIX) { @@ -1030,6 +1030,25 @@ thr_accept_socket (gint fd, short what, void *arg) } /** + * Handle termination + */ +static void +thr_term_socket (gint fd, short what, void *arg) +{ + struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; + struct timeval tv; + + if (read (fd, &tv, sizeof (struct timeval)) != sizeof (struct timeval)) { + thr_err ("cannot read data from socket: %s", strerror (errno)); + tv.tv_sec = 0; + tv.tv_usec = 0; + } + + event_base_loopexit (thr->ev_base, &tv); + event_del (&thr->bind_ev); +} + +/** * Thread main worker function */ static gpointer @@ -1046,6 +1065,10 @@ kvstorage_thread (gpointer ud) event_base_set (thr->ev_base, &thr->bind_ev); event_add (&thr->bind_ev, NULL); + event_set (&thr->term_ev, thr->term_sock[0], EV_READ | EV_PERSIST, thr_term_socket, (void *)thr); + event_base_set (thr->ev_base, &thr->term_ev); + event_add (&thr->term_ev, NULL); + event_base_loop (thr->ev_base, 0); return NULL; @@ -1064,10 +1087,27 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c new->ctx = ctx; new->worker = worker; new->tv = &ctx->io_timeout; - new->log_mtx = &ctx->log_mtx; - new->accept_mtx = &ctx->accept_mtx; + new->log_mtx = ctx->log_mtx; + new->accept_mtx = ctx->accept_mtx; new->id = id; + + /* Create and setup terminating socket */ + if (make_socketpair (new->term_sock) == -1) { + msg_err ("socket failed: %s", strerror (errno)); + return NULL; + } + make_socket_nonblocking (new->term_sock[0]); + +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err); +#else + gchar *name; + + name = memory_pool_alloc (ctx->pool, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1); + rspamd_snprintf (name, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1, "kvstorage_thread%d", id); + + new->thr = g_thread_new (name, kvstorage_thread, new); +#endif new->ev_base = NULL; new->signals = signals; @@ -1104,8 +1144,10 @@ start_keystorage (struct rspamd_worker *worker) } worker->srv->pid = getpid (); ctx->threads = NULL; - +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) g_thread_init (NULL); +#endif + #if _EVENT_NUMERIC_VERSION > 0x02000000 if (evthread_use_pthreads () == -1) { msg_err ("threads support is not supported in your libevent so kvstorage is not functionable"); @@ -1120,17 +1162,27 @@ start_keystorage (struct rspamd_worker *worker) } init_signals (&signals, sig_handler); - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); /* Set umask */ umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); + /* Init mutexes */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + ctx->log_mtx = g_mutex_new (); + ctx->accept_mtx = g_mutex_new (); +#else + ctx->log_mtx = memory_pool_alloc (ctx->pool, sizeof (GMutex)); + ctx->accept_mtx = memory_pool_alloc (ctx->pool, sizeof (GMutex)); + g_mutex_init (ctx->log_mtx); + g_mutex_init (ctx->accept_mtx); +#endif + /* Start workers threads */ - g_static_mutex_init (&ctx->log_mtx); - g_static_mutex_init (&ctx->accept_mtx); for (i = 0; i < worker->cf->count; i ++) { thr = create_kvstorage_thread (worker, ctx, i, &signals.sa_mask); - ctx->threads = g_list_prepend (ctx->threads, thr); + if (thr != NULL) { + ctx->threads = g_list_prepend (ctx->threads, thr); + } } sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL); @@ -1147,9 +1199,11 @@ start_keystorage (struct rspamd_worker *worker) cur = ctx->threads; while (cur) { thr = cur->data; - if (thr->ev_base != NULL) { - event_del (&thr->bind_ev); - event_base_loopexit (thr->ev_base, &tv); + while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) { + if (errno != EAGAIN) { + msg_err ("write to term socket failed: %s", strerror (errno)); + abort (); + } } cur = g_list_next (cur); } @@ -1163,9 +1217,11 @@ start_keystorage (struct rspamd_worker *worker) cur = ctx->threads; while (cur) { thr = cur->data; - if (thr->ev_base != NULL) { - event_del (&thr->bind_ev); - event_base_loopexit (thr->ev_base, &tv); + while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) { + if (errno != EAGAIN) { + msg_err ("write to term socket failed: %s", strerror (errno)); + abort (); + } } cur = g_list_next (cur); } @@ -1178,6 +1234,15 @@ start_keystorage (struct rspamd_worker *worker) } msg_info ("syncing storages"); + /* Wait for threads in the recent glib */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) + cur = ctx->threads; + while (cur) { + thr = cur->data; + (void)g_thread_join (thr->thr); + cur = g_list_next (cur); + } +#endif destroy_kvstorage_config (); close_log (rspamd_main->logger); exit (EXIT_SUCCESS); |