]> source.dussan.org Git - rspamd.git/commitdiff
* Fix threading in kvstorage.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 31 Oct 2011 17:02:55 +0000 (20:02 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 31 Oct 2011 17:02:55 +0000 (20:02 +0300)
Rspamd now can detect and work with libevent-2.

CMakeLists.txt
config.h.in
src/buffer.c
src/kvstorage_server.c
src/kvstorage_server.h
src/util.c

index 0d9fc18b6945a0a526767e1137b3fac102c6747f..c620bfeaadd11334d46cb51cbf85396e2deb155f 100644 (file)
@@ -672,6 +672,13 @@ ELSE(HAVE_SIGINFO_H)
        CHECK_SYMBOL_EXISTS(SA_SIGINFO "signal.h" HAVE_SA_SIGINFO)
 ENDIF(HAVE_SIGINFO_H)
 
+# Some hack for libevent 2.0
+CHECK_C_SOURCE_COMPILES ("#include <event.h>
+                                       #if _EVENT_NUMERIC_VERSION < 0x02000000
+                                       #error Unsupported
+                                       #endif
+                                       int main() { return 0;}" HAVE_LIBEVENT2)
+
 IF(NOT CMAKE_SYSTEM_NAME STREQUAL "SunOS")
 IF(HAVE_CLOCK_GETTIME)
        CHECK_SYMBOL_EXISTS(CLOCK_PROCESS_CPUTIME_ID time.h HAVE_CLOCK_PROCESS_CPUTIME_ID)
@@ -810,6 +817,9 @@ IF(LIBDB_LIBRARY)
        TARGET_LINK_LIBRARIES(rspamd db-4)
 ENDIF(LIBDB_LIBRARY)
 TARGET_LINK_LIBRARIES(rspamd event)
+IF(HAVE_LIBEVENT2)
+       TARGET_LINK_LIBRARIES(rspamd event_pthreads)
+ENDIF(HAVE_LIBEVENT2)
 TARGET_LINK_LIBRARIES(rspamd pcre)
 
 TARGET_LINK_LIBRARIES(rspamd ${CMAKE_REQUIRED_LIBRARIES})
index 950bf2ef4f67ebe69958e60337a939cc228caee0..baa4beea8fbefbe3757ca2096d07c1a4dacc1214 100644 (file)
 # include <siginfo.h>
 #endif
 #include <event.h>
+#if _EVENT_NUMERIC_VERSION > 0x02000000
+# include <event2/thread.h>
+#endif
 #include <glib.h>
 
 
index c94d5f91c305cdc45da9475173742ea1c251e1d7..95e42f756ca570e3949a4e0ab4c751da6a31a82d 100644 (file)
@@ -178,7 +178,7 @@ static                          gboolean
 write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed)
 {
        GList                          *cur;
-       GError                         *err;
+       GError                         *err = NULL;
        rspamd_buffer_t                *buf;
        ssize_t                         r;
 
@@ -264,7 +264,7 @@ static void
 read_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean skip_read)
 {
        ssize_t                         r;
-       GError                         *err;
+       GError                         *err = NULL;
        f_str_t                         res;
        gchar                           *c, *b;
        gchar                           *end;
@@ -453,7 +453,7 @@ static void
 dispatcher_cb (gint fd, short what, void *arg)
 {
        rspamd_io_dispatcher_t         *d = (rspamd_io_dispatcher_t *) arg;
-       GError                         *err;
+       GError                         *err = NULL;
 
        debug_ip("in dispatcher callback, what: %d, fd: %d", (gint)what, fd);
 
index d124e9f4a6f5c2ed76b2d54ecdfe45ce77a4a27e..48068cdf7efc0ad50a383ff426d619a5eed9b0b8 100644 (file)
 #define ERROR_NOT_FOUND "NOT_FOUND" CRLF
 #define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF
 
-/* This is required for normal signals processing */
-static GList *global_evbases = NULL;
-static struct event_base *main_base = NULL;
+
 static sig_atomic_t wanna_die = 0;
+static sig_atomic_t do_reopen_log = 0;
+static sig_atomic_t soft_wanna_die = 0;
 
 /* Logging functions */
 #define thr_err(...)   do {                                                                                                                                                    \
@@ -69,64 +69,20 @@ static void
 sig_handler (gint signo, siginfo_t *info, void *unused)
 #endif
 {
-       struct timeval                  tv;
-       GList                          *cur;
-
        switch (signo) {
        case SIGUSR1:
-               reopen_log (rspamd_main->logger);
+               do_reopen_log = 1;
                break;
        case SIGINT:
        case SIGTERM:
-               if (!wanna_die) {
-                       wanna_die = 1;
-                       tv.tv_sec = 0;
-                       tv.tv_usec = 0;
-                       cur = global_evbases;
-                       while (cur) {
-                               event_base_loopexit (cur->data, &tv);
-                       }
-                       event_base_loopexit (main_base, &tv);
-#ifdef WITH_GPERF_TOOLS
-                       ProfilerStop ();
-#endif
-               }
+               wanna_die = 1;
+               break;
+       case SIGUSR2:
+               soft_wanna_die = 1;
                break;
        }
 }
 
-/*
- * Config reload is designed by sending sigusr to active workers and pending shutdown of them
- */
-static void
-sigusr_handler (gint fd, short what, void *arg)
-{
-       struct rspamd_worker           *worker = (struct rspamd_worker *)arg;
-       /* Do not accept new connections, preparing to end worker's process */
-       struct timeval                  tv;
-       GList                          *cur;
-       struct kvstorage_worker_ctx    *ctx;
-       struct kvstorage_worker_thread *thr;
-
-       ctx = worker->ctx;
-       if (! wanna_die) {
-               tv.tv_sec = SOFT_SHUTDOWN_TIME;
-               tv.tv_usec = 0;
-               event_del (&worker->sig_ev);
-               msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
-               cur = ctx->threads;
-               while (cur) {
-                       thr = cur->data;
-                       if (thr->ev_base != NULL) {
-                               event_del (&thr->bind_ev);
-                               event_base_loopexit (thr->ev_base, &tv);
-                       }
-               }
-               event_base_loopexit (ctx->ev_base, &tv);
-       }
-       return;
-}
-
 gpointer
 init_kvstorage_worker (void)
 {
@@ -164,6 +120,7 @@ free_kvstorage_session (struct kvstorage_session *session)
        rspamd_remove_dispatcher (session->dispather);
        memory_pool_delete (session->pool);
        close (session->sock);
+       g_slice_free1 (sizeof (struct kvstorage_session), session);
 }
 
 /**
@@ -460,8 +417,10 @@ kvstorage_err_socket (GError * err, void *arg)
        struct kvstorage_worker_thread          *thr;
 
        thr = session->thr;
-       thr_info ("%ud: abnormally closing connection from: %s, error: %s",
+       if (err->code != -1) {
+               thr_info ("%ud: abnormally closing connection from: %s, error: %s",
                        thr->id, inet_ntoa (session->client_addr), err->message);
+       }
        g_error_free (err);
        free_kvstorage_session (session);
 }
@@ -478,35 +437,35 @@ thr_accept_socket (gint fd, short what, void *arg)
        gint                                     nfd;
        struct kvstorage_session                        *session;
 
+       g_static_mutex_lock (thr->accept_mtx);
        if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
                thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno));
+               g_static_mutex_unlock (thr->accept_mtx);
                return;
        }
        /* Check for EAGAIN */
        if (nfd == 0) {
+               g_static_mutex_unlock (thr->accept_mtx);
                return;
        }
 
-       session = g_malloc (sizeof (struct kvstorage_session));
+       session = g_slice_alloc (sizeof (struct kvstorage_session));
        session->pool = memory_pool_new (memory_pool_get_size ());
        session->state = KVSTORAGE_STATE_READ_CMD;
        session->thr = thr;
        session->sock = nfd;
-       session->dispather = rspamd_create_dispatcher (thr->ctx->ev_base, nfd, BUFFER_LINE,
+       session->dispather = rspamd_create_dispatcher (thr->ev_base, nfd, BUFFER_LINE,
                        kvstorage_read_socket, NULL,
                        kvstorage_err_socket, thr->tv, session);
-       session->dispather->strip_eol = TRUE;
 
        if (su.ss.ss_family == AF_UNIX) {
-               thr_info ("%ud: accepted connection from unix socket", thr->id);
                session->client_addr.s_addr = INADDR_NONE;
        }
        else if (su.ss.ss_family == AF_INET) {
-               thr_info ("%ud: accepted connection from %s port %d", thr->id,
-                               inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
                memcpy (&session->client_addr, &su.s4.sin_addr,
                                                sizeof (struct in_addr));
        }
+       g_static_mutex_unlock (thr->accept_mtx);
 }
 
 /**
@@ -542,6 +501,7 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c
        new->worker = worker;
        new->tv = &ctx->io_timeout;
        new->log_mtx = &ctx->log_mtx;
+       new->accept_mtx = &ctx->accept_mtx;
        new->id = id;
        new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err);
        new->ev_base = NULL;
@@ -563,6 +523,8 @@ start_kvstorage_worker (struct rspamd_worker *worker)
        struct kvstorage_worker_ctx         *ctx = worker->ctx;
        guint                                                            i;
        struct kvstorage_worker_thread          *thr;
+       struct timeval                                           tv;
+       GList                                                           *cur;
 
        gperf_profiler_init (worker->srv->cfg, "kvstorage");
 
@@ -576,7 +538,6 @@ start_kvstorage_worker (struct rspamd_worker *worker)
                exit (EXIT_SUCCESS);
        }
        worker->srv->pid = getpid ();
-       ctx->ev_base = event_init ();
        ctx->threads = NULL;
 
        g_thread_init (NULL);
@@ -586,7 +547,6 @@ start_kvstorage_worker (struct rspamd_worker *worker)
                exit (EXIT_SUCCESS);
        }
 #endif
-       main_base = ctx->ev_base;
 
        /* Set kvstorage options */
        if ( !config_kvstorage_worker (worker)) {
@@ -597,22 +557,60 @@ start_kvstorage_worker (struct rspamd_worker *worker)
        init_signals (&signals, sig_handler);
        sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
 
-       /* SIGUSR2 handler */
-       signal_set (&worker->sig_ev, SIGUSR2, sigusr_handler, (void *)worker);
-       event_base_set (ctx->ev_base, &worker->sig_ev);
-       signal_add (&worker->sig_ev, NULL);
+       /* Set umask */
+       umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
 
        /* Start workers threads */
        g_static_mutex_init (&ctx->log_mtx);
+       g_static_mutex_init (&ctx->accept_mtx);
        for (i = 0; i < worker->cf->count; i ++) {
                thr = create_kvstorage_thread (worker, ctx, i);
                ctx->threads = g_list_prepend (ctx->threads, thr);
        }
 
-       /* Set umask */
-       umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
-
-       event_base_loop (ctx->ev_base, 0);
+       sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL);
+       /* Signal processing cycle */
+       for (;;) {
+               msg_debug ("calling sigsuspend");
+               sigemptyset (&signals.sa_mask);
+               sigsuspend (&signals.sa_mask);
+               if (wanna_die == 1) {
+                       wanna_die = 0;
+                       tv.tv_sec = 0;
+                       tv.tv_usec = 0;
+                       msg_info ("worker's immediately shutdown is requested");
+                       cur = ctx->threads;
+                       while (cur) {
+                               thr = cur->data;
+                               if (thr->ev_base != NULL) {
+                                       event_del (&thr->bind_ev);
+                                       event_base_loopexit (thr->ev_base, &tv);
+                               }
+                               cur = g_list_next (cur);
+                       }
+                       break;
+               }
+               else if (soft_wanna_die == 1) {
+                       soft_wanna_die = 0;
+                       tv.tv_sec = SOFT_SHUTDOWN_TIME;
+                       tv.tv_usec = 0;
+                       msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
+                       cur = ctx->threads;
+                       while (cur) {
+                               thr = cur->data;
+                               if (thr->ev_base != NULL) {
+                                       event_del (&thr->bind_ev);
+                                       event_base_loopexit (thr->ev_base, &tv);
+                               }
+                               cur = g_list_next (cur);
+                       }
+                       break;
+               }
+               else if (do_reopen_log == 1) {
+                       do_reopen_log = 0;
+                       reopen_log (rspamd_main->logger);
+               }
+       }
 
        close_log (rspamd_main->logger);
        exit (EXIT_SUCCESS);
index 6f22b08c2be6bf247f767123b65e987cf408e4c7..827b34c9ca6824f02b40a451f35438b86e402e78 100644 (file)
@@ -38,6 +38,7 @@ struct kvstorage_worker_ctx {
        memory_pool_t *pool;
        struct event_base *ev_base;
        GStaticMutex log_mtx;
+       GStaticMutex accept_mtx;
 };
 
 struct kvstorage_worker_thread {
@@ -48,6 +49,7 @@ struct kvstorage_worker_thread {
        GThread *thr;
        struct event_base *ev_base;
        GStaticMutex *log_mtx;
+       GStaticMutex *accept_mtx;
        guint id;
 };
 
index 423ceb96917ae2d853df907cd15418c237d5a69a..570427ef7466e056fa1c4076cd78df169cd857a8 100644 (file)
@@ -1239,6 +1239,8 @@ process_to_str (enum process_type type)
                return "lmtp";
        case TYPE_SMTP:
                return "smtp";
+       case TYPE_KVSTORAGE:
+               return "keystorage";
        default:
                return "unknown";
        }