]> source.dussan.org Git - rspamd.git/commitdiff
Adopt rspamd for the next glib release.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 31 Jan 2012 16:59:10 +0000 (20:59 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 31 Jan 2012 16:59:10 +0000 (20:59 +0400)
Fix several issues in threads handling inside keystorage.
Fix sigsuspend usage in keystorage.

contrib/hiredis/hiredis.c
src/kvstorage.c
src/kvstorage.h
src/kvstorage_server.c
src/kvstorage_server.h
src/message.c
src/plugins/regexp.c
src/plugins/surbl.c

index 9ea998ace2737875418c648ae17314a37c5a3cf0..1426ac723d7af62b186853157c16174e8580e621 100644 (file)
@@ -796,7 +796,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
                     }
 
                     /* Consume and discard vararg */
-                    va_arg(ap,void);
+                    (void)va_arg(ap,int);
                 }
             }
 
index 76def504e9f1a77011d09d6cc4f51b117662f0ca..f02f6a568b4bfb643e26ea32a903521a2bed5f4c 100644 (file)
@@ -61,8 +61,12 @@ rspamd_kv_storage_new (gint id, const gchar *name, struct rspamd_kv_cache *cache
                new->name = g_malloc (sizeof ("18446744073709551616"));
                rspamd_snprintf (new->name, sizeof ("18446744073709551616"), "%d", id);
        }
-
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
+       g_rw_lock_init (&new->rwlock);
+#else
        g_static_rw_lock_init (&new->rwlock);
+#endif
+
 
        /* Init structures */
        if (new->cache->init_func) {
@@ -86,13 +90,13 @@ rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key,
        gint                                                            steps = 0;
        struct rspamd_kv_element                        *elt;
 
-       g_static_rw_lock_writer_lock (&storage->rwlock);
+       RW_W_LOCK (&storage->rwlock);
        /* Hard limit */
        if (storage->max_memory > 0) {
                if (len > storage->max_memory) {
                        msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
                                        len, storage->max_memory);
-                       g_static_rw_lock_writer_unlock (&storage->rwlock);
+                       RW_W_UNLOCK (&storage->rwlock);
                        return FALSE;
                }
 
@@ -105,7 +109,7 @@ rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key,
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
                        }
                        if (++steps > MAX_EXPIRE_STEPS) {
-                               g_static_rw_lock_writer_unlock (&storage->rwlock);
+                               RW_W_UNLOCK (&storage->rwlock);
                                msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
                                return FALSE;
                        }
@@ -132,7 +136,7 @@ rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key,
 
        storage->elts ++;
        storage->memory += ELT_SIZE (elt);
-       g_static_rw_lock_writer_unlock (&storage->rwlock);
+       RW_W_UNLOCK (&storage->rwlock);
 
        return TRUE;
 }
@@ -148,12 +152,12 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
        glong                                                           longval;
 
        /* Hard limit */
-       g_static_rw_lock_writer_lock (&storage->rwlock);
+       RW_W_LOCK (&storage->rwlock);
        if (storage->max_memory > 0) {
                if (len + sizeof (struct rspamd_kv_element) + keylen >= storage->max_memory) {
                        msg_warn ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
                                        len, storage->max_memory);
-                       g_static_rw_lock_writer_unlock (&storage->rwlock);
+                       RW_W_UNLOCK (&storage->rwlock);
                        return FALSE;
                }
 
@@ -166,7 +170,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
                        }
                        if (++steps > MAX_EXPIRE_STEPS) {
-                               g_static_rw_lock_writer_unlock (&storage->rwlock);
+                               RW_W_UNLOCK (&storage->rwlock);
                                msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
                                return FALSE;
                        }
@@ -183,7 +187,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
                        }
                        if (++steps > MAX_EXPIRE_STEPS) {
-                               g_static_rw_lock_writer_unlock (&storage->rwlock);
+                               RW_W_UNLOCK (&storage->rwlock);
                                msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
                                return FALSE;
                        }
@@ -213,11 +217,11 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
                        /* Just do incref and nothing more */
                        if (storage->backend && storage->backend->incref_func) {
                                if (storage->backend->incref_func (storage->backend, key, keylen)) {
-                                       g_static_rw_lock_writer_unlock (&storage->rwlock);
+                                       RW_W_UNLOCK (&storage->rwlock);
                                        return TRUE;
                                }
                                else {
-                                       g_static_rw_lock_writer_unlock (&storage->rwlock);
+                                       RW_W_UNLOCK (&storage->rwlock);
                                        return FALSE;
                                }
                        }
@@ -239,7 +243,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
        else {
                elt = storage->cache->insert_func (storage->cache, key, keylen, data, len);
                if (elt == NULL) {
-                       g_static_rw_lock_writer_unlock (&storage->rwlock);
+                       RW_W_UNLOCK (&storage->rwlock);
                        return FALSE;
                }
        }
@@ -262,7 +266,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
 
        storage->elts ++;
        storage->memory += ELT_SIZE (elt);
-       g_static_rw_lock_writer_unlock (&storage->rwlock);
+       RW_W_UNLOCK (&storage->rwlock);
 
        return res;
 }
@@ -285,9 +289,9 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin
                /* Now check limits */
                while (storage->memory + ELT_SIZE (elt) > storage->max_memory) {
                        if (storage->expire) {
-                               g_static_rw_lock_writer_lock (&storage->rwlock);
+                               RW_W_LOCK (&storage->rwlock);
                                storage->expire->step_func (storage->expire, storage, time (NULL), steps);
-                               g_static_rw_lock_writer_unlock (&storage->rwlock);
+                               RW_W_UNLOCK (&storage->rwlock);
                        }
                        else {
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -299,7 +303,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin
                }
        }
 
-       g_static_rw_lock_writer_lock (&storage->rwlock);
+       RW_W_LOCK (&storage->rwlock);
        /* Insert elt to the cache */
        res = storage->cache->replace_func (storage->cache, key, keylen, elt);
 
@@ -307,7 +311,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin
        if (res && storage->backend) {
                res = storage->backend->replace_func (storage->backend, key, keylen, elt);
        }
-       g_static_rw_lock_writer_unlock (&storage->rwlock);
+       RW_W_UNLOCK (&storage->rwlock);
 
        return res;
 }
@@ -320,7 +324,7 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu
        glong                                                           *lp;
 
        /* First try to look at cache */
-       g_static_rw_lock_writer_lock (&storage->rwlock);
+       RW_W_LOCK (&storage->rwlock);
        elt = storage->cache->lookup_func (storage->cache, key, keylen);
 
        if (elt == NULL && storage->backend) {
@@ -328,11 +332,11 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu
                if (belt) {
                        /* Put this element into cache */
                        if ((belt->flags & KV_ELT_INTEGER) != 0) {
-                               g_static_rw_lock_writer_unlock (&storage->rwlock);
+                               RW_W_UNLOCK (&storage->rwlock);
                                rspamd_kv_storage_insert_cache (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
                                        belt->size, belt->flags,
                                        belt->expire, &elt);
-                               g_static_rw_lock_writer_lock (&storage->rwlock);
+                               RW_W_LOCK (&storage->rwlock);
                        }
                        if ((belt->flags & KV_ELT_DIRTY) == 0) {
                                g_free (belt);
@@ -352,21 +356,21 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu
                elt->age = time (NULL);
                if (storage->backend) {
                        if (storage->backend->replace_func (storage->backend, key, keylen, elt)) {
-                               g_static_rw_lock_writer_unlock (&storage->rwlock);
+                               RW_W_UNLOCK (&storage->rwlock);
                                return TRUE;
                        }
                        else {
-                               g_static_rw_lock_writer_unlock (&storage->rwlock);
+                               RW_W_UNLOCK (&storage->rwlock);
                                return FALSE;
                        }
                }
                else {
-                       g_static_rw_lock_writer_unlock (&storage->rwlock);
+                       RW_W_UNLOCK (&storage->rwlock);
                        return TRUE;
                }
        }
 
-       g_static_rw_lock_writer_unlock (&storage->rwlock);
+       RW_W_UNLOCK (&storage->rwlock);
 
        return FALSE;
 }
@@ -378,7 +382,7 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, guint
        struct rspamd_kv_element                        *elt = NULL, *belt;
 
        /* First try to look at cache */
-       g_static_rw_lock_reader_lock (&storage->rwlock);
+       RW_R_LOCK (&storage->rwlock);
        elt = storage->cache->lookup_func (storage->cache, key, keylen);
 
        /* Next look at the backend */
@@ -417,7 +421,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint
        struct rspamd_kv_element           *elt;
 
        /* First delete key from cache */
-       g_static_rw_lock_writer_lock (&storage->rwlock);
+       RW_W_LOCK (&storage->rwlock);
        elt = storage->cache->delete_func (storage->cache, key, keylen);
 
        /* Now delete from backend */
@@ -439,7 +443,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint
                }
        }
 
-       g_static_rw_lock_writer_unlock (&storage->rwlock);
+       RW_W_UNLOCK (&storage->rwlock);
 
        return elt;
 }
@@ -448,7 +452,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint
 void
 rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage)
 {
-       g_static_rw_lock_writer_lock (&storage->rwlock);
+       RW_W_LOCK (&storage->rwlock);
        if (storage->backend && storage->backend->destroy_func) {
                storage->backend->destroy_func (storage->backend);
        }
@@ -461,7 +465,7 @@ rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage)
 
        g_free (storage->name);
 
-       g_static_rw_lock_writer_unlock (&storage->rwlock);
+       RW_W_UNLOCK (&storage->rwlock);
        g_slice_free1 (sizeof (struct rspamd_kv_storage), storage);
 }
 
index d32db2ee4231e6a943833c6c63c52e750bfe66b2..d1e2c16359d0f8ac11761f51aa4a238c9c4c8f63 100644 (file)
@@ -32,6 +32,19 @@ struct rspamd_kv_storage;
 struct rspamd_kv_expire;
 struct rspamd_kv_element;
 
+/* Locking definitions */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
+#define RW_R_LOCK g_rw_lock_reader_lock
+#define RW_R_UNLOCK g_rw_lock_reader_unlock
+#define RW_W_LOCK g_rw_lock_writer_lock
+#define RW_W_UNLOCK g_rw_lock_writer_unlock
+#else
+#define RW_R_LOCK g_static_rw_lock_reader_lock
+#define RW_R_UNLOCK g_static_rw_lock_reader_unlock
+#define RW_W_LOCK g_static_rw_lock_writer_lock
+#define RW_W_UNLOCK g_static_rw_lock_writer_unlock
+#endif
+
 /* Callbacks for cache */
 typedef void (*cache_init)(struct rspamd_kv_cache *cache);
 typedef struct rspamd_kv_element* (*cache_insert)(struct rspamd_kv_cache *cache,
@@ -140,7 +153,11 @@ struct rspamd_kv_storage {
        gchar *name;                                                            /* numeric ID */
 
        gboolean no_overwrite;                                          /* do not overwrite data with the same keys */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
+       GRWLock rwlock;                                                         /* rwlock in new glib */
+#else
        GStaticRWLock rwlock;                                           /* rwlock for threaded access */
+#endif
 };
 
 /** Create new kv storage */
index 059a0e6e87d58c8eb7e3c6cb6cdc967f540e2a55..96b1d6f15166c5511cbe4bd2348de1de15a285f1 100644 (file)
@@ -46,21 +46,21 @@ static sig_atomic_t soft_wanna_die = 0;
 
 /* Logging functions */
 #define thr_err(...)   do {                                                                                                                                                    \
-       g_static_mutex_lock (thr->log_mtx);                                                                                                                             \
+       g_mutex_lock (thr->log_mtx);                                                                                                                            \
        rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_CRITICAL, __FUNCTION__, __VA_ARGS__);       \
-       g_static_mutex_unlock (thr->log_mtx);                                                                                                                           \
+       g_mutex_unlock (thr->log_mtx);                                                                                                                          \
 } while (0)
 
 #define thr_warn(...)  do {                                                                                                                                                    \
-       g_static_mutex_lock (thr->log_mtx);                                                                                                                             \
+       g_mutex_lock (thr->log_mtx);                                                                                                                            \
        rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_WARNING, __FUNCTION__, __VA_ARGS__);        \
-       g_static_mutex_unlock (thr->log_mtx);                                                                                                                           \
+       g_mutex_unlock (thr->log_mtx);                                                                                                                          \
 } while (0)
 
 #define thr_info(...)  do {                                                                                                                                                    \
-       g_static_mutex_lock (thr->log_mtx);                                                                                                                             \
+       g_mutex_lock (thr->log_mtx);                                                                                                                            \
        rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_INFO, __FUNCTION__, __VA_ARGS__);   \
-       g_static_mutex_unlock (thr->log_mtx);                                                                                                                           \
+       g_mutex_unlock (thr->log_mtx);                                                                                                                          \
 } while (0)
 
 /* Init functions */
@@ -491,7 +491,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
        else if (session->command == KVSTORAGE_CMD_GET) {
                elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now);
                if (elt == NULL) {
-                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                       RW_R_UNLOCK (&session->cf->storage->rwlock);
                        if (!is_redis) {
                                return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
                                                sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
@@ -520,18 +520,18 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                        }
                        if (!rspamd_dispatcher_write (session->dispather, outbuf,
                                        r, TRUE, FALSE)) {
-                               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                               RW_R_UNLOCK (&session->cf->storage->rwlock);
                                return FALSE;
                        }
                        if (elt->flags & KV_ELT_INTEGER) {
                                if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
-                                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                                       RW_R_UNLOCK (&session->cf->storage->rwlock);
                                        return FALSE;
                                }
                        }
                        else {
                                if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
-                                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                                       RW_R_UNLOCK (&session->cf->storage->rwlock);
                                        return FALSE;
                                }
                        }
@@ -545,7 +545,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                                                sizeof (CRLF) - 1, FALSE, TRUE);
                        }
                        if (!res) {
-                               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                               RW_R_UNLOCK (&session->cf->storage->rwlock);
                        }
 
                        return res;
@@ -942,7 +942,7 @@ kvstorage_write_socket (void *arg)
                if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) {
                        /* Insert to cache and free element */
                        session->elt->flags &= ~KV_ELT_NEED_INSERT;
-                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                       RW_R_UNLOCK (&session->cf->storage->rwlock);
                        rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt),
                                                        session->elt->keylen, ELT_DATA (session->elt),
                                                        session->elt->size, session->elt->flags,
@@ -951,7 +951,7 @@ kvstorage_write_socket (void *arg)
                        session->elt = NULL;
                        return TRUE;
                }
-               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+               RW_R_UNLOCK (&session->cf->storage->rwlock);
                session->elt = NULL;
 
        }
@@ -975,7 +975,7 @@ kvstorage_err_socket (GError * err, void *arg)
        }
 
        if (session->elt) {
-               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+               RW_R_UNLOCK (&session->cf->storage->rwlock);
                session->elt = NULL;
        }
 
@@ -995,16 +995,16 @@ thr_accept_socket (gint fd, short what, void *arg)
        gint                                     nfd;
        struct kvstorage_session                        *session;
 
-       g_static_mutex_lock (thr->accept_mtx);
+       g_mutex_lock (thr->accept_mtx);
        if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
                thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno));
-               g_static_mutex_unlock (thr->accept_mtx);
+               g_mutex_unlock (thr->accept_mtx);
                return;
        }
 
        /* Check for EAGAIN */
        if (nfd == 0) {
-               g_static_mutex_unlock (thr->accept_mtx);
+               g_mutex_unlock (thr->accept_mtx);
                return;
        }
 
@@ -1017,7 +1017,7 @@ thr_accept_socket (gint fd, short what, void *arg)
                        kvstorage_read_socket, kvstorage_write_socket,
                        kvstorage_err_socket, thr->tv, session);
 
-       g_static_mutex_unlock (thr->accept_mtx);
+       g_mutex_unlock (thr->accept_mtx);
        session->elt = NULL;
 
        if (su.ss.ss_family == AF_UNIX) {
@@ -1029,6 +1029,25 @@ thr_accept_socket (gint fd, short what, void *arg)
        }
 }
 
+/**
+ * Handle termination
+ */
+static void
+thr_term_socket (gint fd, short what, void *arg)
+{
+       struct kvstorage_worker_thread          *thr = (struct kvstorage_worker_thread *)arg;
+       struct timeval                                           tv;
+
+       if (read (fd, &tv, sizeof (struct timeval)) != sizeof (struct timeval)) {
+               thr_err ("cannot read data from socket: %s", strerror (errno));
+               tv.tv_sec = 0;
+               tv.tv_usec = 0;
+       }
+
+       event_base_loopexit (thr->ev_base, &tv);
+       event_del (&thr->bind_ev);
+}
+
 /**
  * Thread main worker function
  */
@@ -1046,6 +1065,10 @@ kvstorage_thread (gpointer ud)
        event_base_set (thr->ev_base, &thr->bind_ev);
        event_add (&thr->bind_ev, NULL);
 
+       event_set (&thr->term_ev, thr->term_sock[0], EV_READ | EV_PERSIST, thr_term_socket, (void *)thr);
+       event_base_set (thr->ev_base, &thr->term_ev);
+       event_add (&thr->term_ev, NULL);
+
        event_base_loop (thr->ev_base, 0);
 
        return NULL;
@@ -1064,10 +1087,27 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c
        new->ctx = ctx;
        new->worker = worker;
        new->tv = &ctx->io_timeout;
-       new->log_mtx = &ctx->log_mtx;
-       new->accept_mtx = &ctx->accept_mtx;
+       new->log_mtx = ctx->log_mtx;
+       new->accept_mtx = ctx->accept_mtx;
        new->id = id;
+
+       /* Create and setup terminating socket */
+       if (make_socketpair (new->term_sock) == -1) {
+               msg_err ("socket failed: %s", strerror (errno));
+               return NULL;
+       }
+       make_socket_nonblocking (new->term_sock[0]);
+
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
        new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err);
+#else
+       gchar                                                           *name;
+
+       name = memory_pool_alloc (ctx->pool, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1);
+       rspamd_snprintf (name, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1, "kvstorage_thread%d", id);
+
+       new->thr = g_thread_new (name, kvstorage_thread, new);
+#endif
        new->ev_base = NULL;
        new->signals = signals;
 
@@ -1104,8 +1144,10 @@ start_keystorage (struct rspamd_worker *worker)
        }
        worker->srv->pid = getpid ();
        ctx->threads = NULL;
-
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
        g_thread_init (NULL);
+#endif
+
 #if _EVENT_NUMERIC_VERSION > 0x02000000
        if (evthread_use_pthreads () == -1) {
                msg_err ("threads support is not supported in your libevent so kvstorage is not functionable");
@@ -1120,17 +1162,27 @@ start_keystorage (struct rspamd_worker *worker)
        }
 
        init_signals (&signals, sig_handler);
-       sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
 
        /* Set umask */
        umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP);
 
+       /* Init mutexes */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+       ctx->log_mtx = g_mutex_new ();
+       ctx->accept_mtx = g_mutex_new ();
+#else
+       ctx->log_mtx = memory_pool_alloc (ctx->pool, sizeof (GMutex));
+       ctx->accept_mtx = memory_pool_alloc (ctx->pool, sizeof (GMutex));
+       g_mutex_init (ctx->log_mtx);
+       g_mutex_init (ctx->accept_mtx);
+#endif
+
        /* Start workers threads */
-       g_static_mutex_init (&ctx->log_mtx);
-       g_static_mutex_init (&ctx->accept_mtx);
        for (i = 0; i < worker->cf->count; i ++) {
                thr = create_kvstorage_thread (worker, ctx, i, &signals.sa_mask);
-               ctx->threads = g_list_prepend (ctx->threads, thr);
+               if (thr != NULL) {
+                       ctx->threads = g_list_prepend (ctx->threads, thr);
+               }
        }
 
        sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL);
@@ -1147,9 +1199,11 @@ start_keystorage (struct rspamd_worker *worker)
                        cur = ctx->threads;
                        while (cur) {
                                thr = cur->data;
-                               if (thr->ev_base != NULL) {
-                                       event_del (&thr->bind_ev);
-                                       event_base_loopexit (thr->ev_base, &tv);
+                               while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) {
+                                       if (errno != EAGAIN) {
+                                               msg_err ("write to term socket failed: %s", strerror (errno));
+                                               abort ();
+                                       }
                                }
                                cur = g_list_next (cur);
                        }
@@ -1163,9 +1217,11 @@ start_keystorage (struct rspamd_worker *worker)
                        cur = ctx->threads;
                        while (cur) {
                                thr = cur->data;
-                               if (thr->ev_base != NULL) {
-                                       event_del (&thr->bind_ev);
-                                       event_base_loopexit (thr->ev_base, &tv);
+                               while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) {
+                                       if (errno != EAGAIN) {
+                                               msg_err ("write to term socket failed: %s", strerror (errno));
+                                               abort ();
+                                       }
                                }
                                cur = g_list_next (cur);
                        }
@@ -1178,6 +1234,15 @@ start_keystorage (struct rspamd_worker *worker)
        }
 
        msg_info ("syncing storages");
+       /* Wait for threads in the recent glib */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
+       cur = ctx->threads;
+       while (cur) {
+               thr = cur->data;
+               (void)g_thread_join (thr->thr);
+               cur = g_list_next (cur);
+       }
+#endif
        destroy_kvstorage_config ();
        close_log (rspamd_main->logger);
        exit (EXIT_SUCCESS);
index aeb11e2496d60eeccddf964c4fd06f0c69483387..6e8e218ea1c558ca6f81200a5a9b3061d621c66b 100644 (file)
@@ -38,21 +38,23 @@ struct kvstorage_worker_ctx {
        gboolean is_redis;
        memory_pool_t *pool;
        struct event_base *ev_base;
-       GStaticMutex log_mtx;
-       GStaticMutex accept_mtx;
+       GMutex *log_mtx;
+       GMutex *accept_mtx;
 };
 
 struct kvstorage_worker_thread {
        struct event bind_ev;
+       struct event term_ev;
        struct timeval *tv;
        struct kvstorage_worker_ctx *ctx;
        struct rspamd_worker *worker;
        GThread *thr;
        struct event_base *ev_base;
-       GStaticMutex *log_mtx;
-       GStaticMutex *accept_mtx;
+       GMutex *log_mtx;
+       GMutex *accept_mtx;
        guint id;
        sigset_t *signals;
+       gint term_sock[2];
 };
 
 struct kvstorage_session {
index bf955ae942548d59a32270ea201c707f05d52e7e..7620eb25dff0e4386ffcb54574e47a7153e5a258 100644 (file)
@@ -1182,7 +1182,7 @@ header_iterate (memory_pool_t * pool, struct gmime_raw_header *h, GList ** ret,
 {
        while (h) {
                if (G_LIKELY (!strong)) {
-                       if (h->value && !g_strncasecmp (field, h->name, strlen (field))) {
+                       if (h->value && !g_ascii_strncasecmp (field, h->name, strlen (field))) {
                                if (pool != NULL) {
                                        *ret = g_list_prepend (*ret, memory_pool_strdup (pool, h->value));
                                }
@@ -1223,7 +1223,7 @@ header_iterate (memory_pool_t * pool, GMimeHeaderList * ls, GList ** ret, const
                while (g_mime_header_iter_is_valid (iter)) {
                        name = g_mime_header_iter_get_name (iter);
                        if (G_LIKELY (!strong)) {
-                               if (!g_strncasecmp (field, name, strlen (name))) {
+                               if (!g_ascii_strncasecmp (field, name, strlen (name))) {
                                        if (pool != NULL) {
                                                *ret = g_list_prepend (*ret, memory_pool_strdup (pool, g_mime_header_iter_get_value (iter)));
                                        }
@@ -1556,11 +1556,11 @@ message_set_header (GMimeMessage * message, const gchar *field, const gchar *val
 {
        gint                            i;
 
-       if (!g_strcasecmp (field, "MIME-Version:") || !g_strncasecmp (field, "Content-", 8)) {
+       if (!g_ascii_strcasecmp (field, "MIME-Version:") || !g_ascii_strncasecmp (field, "Content-", 8)) {
                return;
        }
        for (i = 0; i <= HEADER_UNKNOWN; ++i) {
-               if (!fieldfunc[i].name || !g_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) {
+               if (!fieldfunc[i].name || !g_ascii_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) {
                        switch (fieldfunc[i].functype) {
                        case FUNC_CHARPTR:
                                (*(fieldfunc[i].setfunc)) (message, value);
@@ -1593,7 +1593,7 @@ message_get_header (memory_pool_t * pool, GMimeMessage * message, const gchar *f
        InternetAddressList            *ia_list = NULL, *ia;
 
        for (i = 0; i <= HEADER_UNKNOWN; ++i) {
-               if (!fieldfunc[i].name || !g_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) {
+               if (!fieldfunc[i].name || !g_ascii_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) {
                        switch (fieldfunc[i].functype) {
                        case FUNC_CHARFREEPTR:
                                ret = (gchar *)(*(fieldfunc[i].func)) (message);
index 863136d2678cf064d04e0773769c711e4564b5ce..902078d29719f9a141156528b23db53495ea3eff 100644 (file)
@@ -84,6 +84,7 @@ static const struct luaL_reg    regexplib_m[] = {
 };
 
 static struct regexp_ctx       *regexp_module_ctx = NULL;
+static GMutex                             *workers_mtx = NULL;
 
 static gint                     regexp_common_filter (struct worker_task *task);
 static void                                            process_regexp_item_threaded (gpointer data, gpointer user_data);
@@ -525,7 +526,13 @@ regexp_module_config (struct config_file *cfg)
                if (g_thread_supported ()) {
                        thr = parse_limit (value, -1);
                        if (thr > 1) {
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
                                g_thread_init (NULL);
+                               workers_mtx = g_mutex_new ();
+#else
+                               workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex));
+                               g_mutex_init (workers_mtx);
+#endif
                                regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err);
                                if (err != NULL) {
                                        msg_err ("thread pool creation failed: %s", err->message);
@@ -552,11 +559,8 @@ regexp_module_config (struct config_file *cfg)
        cur_opt = g_hash_table_lookup (cfg->modules_opts, "regexp");
        while (cur_opt) {
                cur = cur_opt->data;
-               if (strcmp (cur->param, "metric") == 0 || strcmp (cur->param, "statfile_prefix") == 0) {
-                       cur_opt = g_list_next (cur_opt);
-                       continue;
-               }
-               else if (g_ascii_strncasecmp (cur->param, "autolearn", sizeof ("autolearn") - 1) == 0) {
+               /* Skip several options that are not regexp */
+               if (g_ascii_strncasecmp (cur->param, "autolearn", sizeof ("autolearn") - 1) == 0) {
                        parse_autolearn_param (cur->param, cur->value, cfg);
                        cur_opt = g_list_next (cur_opt);
                        continue;
@@ -569,6 +573,11 @@ regexp_module_config (struct config_file *cfg)
                        cur_opt = g_list_next (cur_opt);
                        continue;
                }
+               else if (g_ascii_strncasecmp (cur->param, "max_threads", sizeof ("max_threads") - 1) == 0) {
+                       cur_opt = g_list_next (cur_opt);
+                       continue;
+               }
+               /* Handle regexps */
                cur_item = memory_pool_alloc0 (regexp_module_ctx->regexp_pool, sizeof (struct regexp_module_item));
                cur_item->symbol = cur->param;
                if (cur->is_lua && cur->lua_type == LUA_VAR_STRING) {
@@ -1205,13 +1214,17 @@ process_regexp_item_threaded (gpointer data, gpointer user_data)
        if (ud->item->lua_function) {
                /* Just call function */
                if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) {
+                       g_mutex_lock (workers_mtx);
                        insert_result (ud->task, ud->item->symbol, 1, NULL);
+                       g_mutex_unlock (workers_mtx);
                }
        }
        else {
                /* Process expression */
                if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) {
+                       g_mutex_lock (workers_mtx);
                        insert_result (ud->task, ud->item->symbol, 1, NULL);
+                       g_mutex_unlock (workers_mtx);
                }
        }
 }
index 2482dc58956e3fefe31703f84809c51a3fe2ce14..6e4cad8ce5779d1a3a4f02899123d5813bf9708c 100644 (file)
@@ -389,7 +389,7 @@ surbl_module_config (struct config_file *cfg)
        cur_opt = g_hash_table_lookup (cfg->modules_opts, "surbl");
        while (cur_opt) {
                cur = cur_opt->data;
-               if (!g_strncasecmp (cur->param, "suffix", sizeof ("suffix") - 1)) {
+               if (!g_ascii_strncasecmp (cur->param, "suffix", sizeof ("suffix") - 1)) {
                        if ((str = strchr (cur->param, '_')) != NULL) {
                                new_suffix = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct suffix_item));
                                *str = '\0';
@@ -414,7 +414,7 @@ surbl_module_config (struct config_file *cfg)
                        }
                }
                /* Search for bits */
-               else if (!g_strncasecmp (cur->param, "bit", sizeof ("bit") - 1)) {
+               else if (!g_ascii_strncasecmp (cur->param, "bit", sizeof ("bit") - 1)) {
                        if ((str = strchr (cur->param, '_')) != NULL) {
                                bit = strtoul (str + 1, NULL, 10);
                                if (bit != 0) {