From 9023eb3f1e85e456ee8804b7867c9e17ada5aa6e Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Tue, 31 Jan 2012 20:59:10 +0400 Subject: [PATCH] Adopt rspamd for the next glib release. Fix several issues in threads handling inside keystorage. Fix sigsuspend usage in keystorage. --- contrib/hiredis/hiredis.c | 2 +- src/kvstorage.c | 62 ++++++++++--------- src/kvstorage.h | 17 +++++ src/kvstorage_server.c | 127 ++++++++++++++++++++++++++++---------- src/kvstorage_server.h | 10 +-- src/message.c | 10 +-- src/plugins/regexp.c | 23 +++++-- src/plugins/surbl.c | 4 +- 8 files changed, 178 insertions(+), 77 deletions(-) diff --git a/contrib/hiredis/hiredis.c b/contrib/hiredis/hiredis.c index 9ea998ace..1426ac723 100644 --- a/contrib/hiredis/hiredis.c +++ b/contrib/hiredis/hiredis.c @@ -796,7 +796,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { } /* Consume and discard vararg */ - va_arg(ap,void); + (void)va_arg(ap,int); } } diff --git a/src/kvstorage.c b/src/kvstorage.c index 76def504e..f02f6a568 100644 --- a/src/kvstorage.c +++ b/src/kvstorage.c @@ -61,8 +61,12 @@ rspamd_kv_storage_new (gint id, const gchar *name, struct rspamd_kv_cache *cache new->name = g_malloc (sizeof ("18446744073709551616")); rspamd_snprintf (new->name, sizeof ("18446744073709551616"), "%d", id); } - +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) + g_rw_lock_init (&new->rwlock); +#else g_static_rw_lock_init (&new->rwlock); +#endif + /* Init structures */ if (new->cache->init_func) { @@ -86,13 +90,13 @@ rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key, gint steps = 0; struct rspamd_kv_element *elt; - g_static_rw_lock_writer_lock (&storage->rwlock); + RW_W_LOCK (&storage->rwlock); /* Hard limit */ if (storage->max_memory > 0) { if (len > storage->max_memory) { msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name, len, storage->max_memory); - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return FALSE; } @@ -105,7 +109,7 @@ rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key, msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); } if (++steps > MAX_EXPIRE_STEPS) { - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); return FALSE; } @@ -132,7 +136,7 @@ rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key, storage->elts ++; storage->memory += ELT_SIZE (elt); - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return TRUE; } @@ -148,12 +152,12 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint glong longval; /* Hard limit */ - g_static_rw_lock_writer_lock (&storage->rwlock); + RW_W_LOCK (&storage->rwlock); if (storage->max_memory > 0) { if (len + sizeof (struct rspamd_kv_element) + keylen >= storage->max_memory) { msg_warn ("<%s>: trying to insert value of length %z while limit is %z", storage->name, len, storage->max_memory); - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return FALSE; } @@ -166,7 +170,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); } if (++steps > MAX_EXPIRE_STEPS) { - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); return FALSE; } @@ -183,7 +187,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); } if (++steps > MAX_EXPIRE_STEPS) { - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); return FALSE; } @@ -213,11 +217,11 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint /* Just do incref and nothing more */ if (storage->backend && storage->backend->incref_func) { if (storage->backend->incref_func (storage->backend, key, keylen)) { - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return TRUE; } else { - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return FALSE; } } @@ -239,7 +243,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint else { elt = storage->cache->insert_func (storage->cache, key, keylen, data, len); if (elt == NULL) { - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return FALSE; } } @@ -262,7 +266,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint storage->elts ++; storage->memory += ELT_SIZE (elt); - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return res; } @@ -285,9 +289,9 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin /* Now check limits */ while (storage->memory + ELT_SIZE (elt) > storage->max_memory) { if (storage->expire) { - g_static_rw_lock_writer_lock (&storage->rwlock); + RW_W_LOCK (&storage->rwlock); storage->expire->step_func (storage->expire, storage, time (NULL), steps); - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); } else { msg_warn ("<%s>: storage is full and no expire function is defined", storage->name); @@ -299,7 +303,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin } } - g_static_rw_lock_writer_lock (&storage->rwlock); + RW_W_LOCK (&storage->rwlock); /* Insert elt to the cache */ res = storage->cache->replace_func (storage->cache, key, keylen, elt); @@ -307,7 +311,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guin if (res && storage->backend) { res = storage->backend->replace_func (storage->backend, key, keylen, elt); } - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return res; } @@ -320,7 +324,7 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu glong *lp; /* First try to look at cache */ - g_static_rw_lock_writer_lock (&storage->rwlock); + RW_W_LOCK (&storage->rwlock); elt = storage->cache->lookup_func (storage->cache, key, keylen); if (elt == NULL && storage->backend) { @@ -328,11 +332,11 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu if (belt) { /* Put this element into cache */ if ((belt->flags & KV_ELT_INTEGER) != 0) { - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); rspamd_kv_storage_insert_cache (storage, ELT_KEY (belt), keylen, ELT_DATA (belt), belt->size, belt->flags, belt->expire, &elt); - g_static_rw_lock_writer_lock (&storage->rwlock); + RW_W_LOCK (&storage->rwlock); } if ((belt->flags & KV_ELT_DIRTY) == 0) { g_free (belt); @@ -352,21 +356,21 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu elt->age = time (NULL); if (storage->backend) { if (storage->backend->replace_func (storage->backend, key, keylen, elt)) { - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return TRUE; } else { - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return FALSE; } } else { - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return TRUE; } } - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return FALSE; } @@ -378,7 +382,7 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, guint struct rspamd_kv_element *elt = NULL, *belt; /* First try to look at cache */ - g_static_rw_lock_reader_lock (&storage->rwlock); + RW_R_LOCK (&storage->rwlock); elt = storage->cache->lookup_func (storage->cache, key, keylen); /* Next look at the backend */ @@ -417,7 +421,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint struct rspamd_kv_element *elt; /* First delete key from cache */ - g_static_rw_lock_writer_lock (&storage->rwlock); + RW_W_LOCK (&storage->rwlock); elt = storage->cache->delete_func (storage->cache, key, keylen); /* Now delete from backend */ @@ -439,7 +443,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint } } - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); return elt; } @@ -448,7 +452,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint void rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage) { - g_static_rw_lock_writer_lock (&storage->rwlock); + RW_W_LOCK (&storage->rwlock); if (storage->backend && storage->backend->destroy_func) { storage->backend->destroy_func (storage->backend); } @@ -461,7 +465,7 @@ rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage) g_free (storage->name); - g_static_rw_lock_writer_unlock (&storage->rwlock); + RW_W_UNLOCK (&storage->rwlock); g_slice_free1 (sizeof (struct rspamd_kv_storage), storage); } diff --git a/src/kvstorage.h b/src/kvstorage.h index d32db2ee4..d1e2c1635 100644 --- a/src/kvstorage.h +++ b/src/kvstorage.h @@ -32,6 +32,19 @@ struct rspamd_kv_storage; struct rspamd_kv_expire; struct rspamd_kv_element; +/* Locking definitions */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) +#define RW_R_LOCK g_rw_lock_reader_lock +#define RW_R_UNLOCK g_rw_lock_reader_unlock +#define RW_W_LOCK g_rw_lock_writer_lock +#define RW_W_UNLOCK g_rw_lock_writer_unlock +#else +#define RW_R_LOCK g_static_rw_lock_reader_lock +#define RW_R_UNLOCK g_static_rw_lock_reader_unlock +#define RW_W_LOCK g_static_rw_lock_writer_lock +#define RW_W_UNLOCK g_static_rw_lock_writer_unlock +#endif + /* Callbacks for cache */ typedef void (*cache_init)(struct rspamd_kv_cache *cache); typedef struct rspamd_kv_element* (*cache_insert)(struct rspamd_kv_cache *cache, @@ -140,7 +153,11 @@ struct rspamd_kv_storage { gchar *name; /* numeric ID */ gboolean no_overwrite; /* do not overwrite data with the same keys */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) + GRWLock rwlock; /* rwlock in new glib */ +#else GStaticRWLock rwlock; /* rwlock for threaded access */ +#endif }; /** Create new kv storage */ diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 059a0e6e8..96b1d6f15 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -46,21 +46,21 @@ static sig_atomic_t soft_wanna_die = 0; /* Logging functions */ #define thr_err(...) do { \ - g_static_mutex_lock (thr->log_mtx); \ + g_mutex_lock (thr->log_mtx); \ rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_CRITICAL, __FUNCTION__, __VA_ARGS__); \ - g_static_mutex_unlock (thr->log_mtx); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) #define thr_warn(...) do { \ - g_static_mutex_lock (thr->log_mtx); \ + g_mutex_lock (thr->log_mtx); \ rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_WARNING, __FUNCTION__, __VA_ARGS__); \ - g_static_mutex_unlock (thr->log_mtx); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) #define thr_info(...) do { \ - g_static_mutex_lock (thr->log_mtx); \ + g_mutex_lock (thr->log_mtx); \ rspamd_common_log_function(rspamd_main->logger, G_LOG_LEVEL_INFO, __FUNCTION__, __VA_ARGS__); \ - g_static_mutex_unlock (thr->log_mtx); \ + g_mutex_unlock (thr->log_mtx); \ } while (0) /* Init functions */ @@ -491,7 +491,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) else if (session->command == KVSTORAGE_CMD_GET) { elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->keylen, session->now); if (elt == NULL) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); @@ -520,18 +520,18 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } if (!rspamd_dispatcher_write (session->dispather, outbuf, r, TRUE, FALSE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } if (elt->flags & KV_ELT_INTEGER) { if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } } else { if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); return FALSE; } } @@ -545,7 +545,7 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) sizeof (CRLF) - 1, FALSE, TRUE); } if (!res) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); } return res; @@ -942,7 +942,7 @@ kvstorage_write_socket (void *arg) if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) { /* Insert to cache and free element */ session->elt->flags &= ~KV_ELT_NEED_INSERT; - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt), session->elt->keylen, ELT_DATA (session->elt), session->elt->size, session->elt->flags, @@ -951,7 +951,7 @@ kvstorage_write_socket (void *arg) session->elt = NULL; return TRUE; } - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); session->elt = NULL; } @@ -975,7 +975,7 @@ kvstorage_err_socket (GError * err, void *arg) } if (session->elt) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + RW_R_UNLOCK (&session->cf->storage->rwlock); session->elt = NULL; } @@ -995,16 +995,16 @@ thr_accept_socket (gint fd, short what, void *arg) gint nfd; struct kvstorage_session *session; - g_static_mutex_lock (thr->accept_mtx); + g_mutex_lock (thr->accept_mtx); if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) { thr_warn ("%ud: accept failed: %s", thr->id, strerror (errno)); - g_static_mutex_unlock (thr->accept_mtx); + g_mutex_unlock (thr->accept_mtx); return; } /* Check for EAGAIN */ if (nfd == 0) { - g_static_mutex_unlock (thr->accept_mtx); + g_mutex_unlock (thr->accept_mtx); return; } @@ -1017,7 +1017,7 @@ thr_accept_socket (gint fd, short what, void *arg) kvstorage_read_socket, kvstorage_write_socket, kvstorage_err_socket, thr->tv, session); - g_static_mutex_unlock (thr->accept_mtx); + g_mutex_unlock (thr->accept_mtx); session->elt = NULL; if (su.ss.ss_family == AF_UNIX) { @@ -1029,6 +1029,25 @@ thr_accept_socket (gint fd, short what, void *arg) } } +/** + * Handle termination + */ +static void +thr_term_socket (gint fd, short what, void *arg) +{ + struct kvstorage_worker_thread *thr = (struct kvstorage_worker_thread *)arg; + struct timeval tv; + + if (read (fd, &tv, sizeof (struct timeval)) != sizeof (struct timeval)) { + thr_err ("cannot read data from socket: %s", strerror (errno)); + tv.tv_sec = 0; + tv.tv_usec = 0; + } + + event_base_loopexit (thr->ev_base, &tv); + event_del (&thr->bind_ev); +} + /** * Thread main worker function */ @@ -1046,6 +1065,10 @@ kvstorage_thread (gpointer ud) event_base_set (thr->ev_base, &thr->bind_ev); event_add (&thr->bind_ev, NULL); + event_set (&thr->term_ev, thr->term_sock[0], EV_READ | EV_PERSIST, thr_term_socket, (void *)thr); + event_base_set (thr->ev_base, &thr->term_ev); + event_add (&thr->term_ev, NULL); + event_base_loop (thr->ev_base, 0); return NULL; @@ -1064,10 +1087,27 @@ create_kvstorage_thread (struct rspamd_worker *worker, struct kvstorage_worker_c new->ctx = ctx; new->worker = worker; new->tv = &ctx->io_timeout; - new->log_mtx = &ctx->log_mtx; - new->accept_mtx = &ctx->accept_mtx; + new->log_mtx = ctx->log_mtx; + new->accept_mtx = ctx->accept_mtx; new->id = id; + + /* Create and setup terminating socket */ + if (make_socketpair (new->term_sock) == -1) { + msg_err ("socket failed: %s", strerror (errno)); + return NULL; + } + make_socket_nonblocking (new->term_sock[0]); + +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) new->thr = g_thread_create (kvstorage_thread, new, FALSE, &err); +#else + gchar *name; + + name = memory_pool_alloc (ctx->pool, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1); + rspamd_snprintf (name, sizeof ("kvstorage_thread") + sizeof ("4294967296") - 1, "kvstorage_thread%d", id); + + new->thr = g_thread_new (name, kvstorage_thread, new); +#endif new->ev_base = NULL; new->signals = signals; @@ -1104,8 +1144,10 @@ start_keystorage (struct rspamd_worker *worker) } worker->srv->pid = getpid (); ctx->threads = NULL; - +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) g_thread_init (NULL); +#endif + #if _EVENT_NUMERIC_VERSION > 0x02000000 if (evthread_use_pthreads () == -1) { msg_err ("threads support is not supported in your libevent so kvstorage is not functionable"); @@ -1120,17 +1162,27 @@ start_keystorage (struct rspamd_worker *worker) } init_signals (&signals, sig_handler); - sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL); /* Set umask */ umask (S_IWGRP | S_IWOTH | S_IROTH | S_IRGRP); + /* Init mutexes */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) + ctx->log_mtx = g_mutex_new (); + ctx->accept_mtx = g_mutex_new (); +#else + ctx->log_mtx = memory_pool_alloc (ctx->pool, sizeof (GMutex)); + ctx->accept_mtx = memory_pool_alloc (ctx->pool, sizeof (GMutex)); + g_mutex_init (ctx->log_mtx); + g_mutex_init (ctx->accept_mtx); +#endif + /* Start workers threads */ - g_static_mutex_init (&ctx->log_mtx); - g_static_mutex_init (&ctx->accept_mtx); for (i = 0; i < worker->cf->count; i ++) { thr = create_kvstorage_thread (worker, ctx, i, &signals.sa_mask); - ctx->threads = g_list_prepend (ctx->threads, thr); + if (thr != NULL) { + ctx->threads = g_list_prepend (ctx->threads, thr); + } } sigprocmask (SIG_BLOCK, &signals.sa_mask, NULL); @@ -1147,9 +1199,11 @@ start_keystorage (struct rspamd_worker *worker) cur = ctx->threads; while (cur) { thr = cur->data; - if (thr->ev_base != NULL) { - event_del (&thr->bind_ev); - event_base_loopexit (thr->ev_base, &tv); + while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) { + if (errno != EAGAIN) { + msg_err ("write to term socket failed: %s", strerror (errno)); + abort (); + } } cur = g_list_next (cur); } @@ -1163,9 +1217,11 @@ start_keystorage (struct rspamd_worker *worker) cur = ctx->threads; while (cur) { thr = cur->data; - if (thr->ev_base != NULL) { - event_del (&thr->bind_ev); - event_base_loopexit (thr->ev_base, &tv); + while (write (thr->term_sock[1], &tv, sizeof (struct timeval)) == -1) { + if (errno != EAGAIN) { + msg_err ("write to term socket failed: %s", strerror (errno)); + abort (); + } } cur = g_list_next (cur); } @@ -1178,6 +1234,15 @@ start_keystorage (struct rspamd_worker *worker) } msg_info ("syncing storages"); + /* Wait for threads in the recent glib */ +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30)) + cur = ctx->threads; + while (cur) { + thr = cur->data; + (void)g_thread_join (thr->thr); + cur = g_list_next (cur); + } +#endif destroy_kvstorage_config (); close_log (rspamd_main->logger); exit (EXIT_SUCCESS); diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h index aeb11e249..6e8e218ea 100644 --- a/src/kvstorage_server.h +++ b/src/kvstorage_server.h @@ -38,21 +38,23 @@ struct kvstorage_worker_ctx { gboolean is_redis; memory_pool_t *pool; struct event_base *ev_base; - GStaticMutex log_mtx; - GStaticMutex accept_mtx; + GMutex *log_mtx; + GMutex *accept_mtx; }; struct kvstorage_worker_thread { struct event bind_ev; + struct event term_ev; struct timeval *tv; struct kvstorage_worker_ctx *ctx; struct rspamd_worker *worker; GThread *thr; struct event_base *ev_base; - GStaticMutex *log_mtx; - GStaticMutex *accept_mtx; + GMutex *log_mtx; + GMutex *accept_mtx; guint id; sigset_t *signals; + gint term_sock[2]; }; struct kvstorage_session { diff --git a/src/message.c b/src/message.c index bf955ae94..7620eb25d 100644 --- a/src/message.c +++ b/src/message.c @@ -1182,7 +1182,7 @@ header_iterate (memory_pool_t * pool, struct gmime_raw_header *h, GList ** ret, { while (h) { if (G_LIKELY (!strong)) { - if (h->value && !g_strncasecmp (field, h->name, strlen (field))) { + if (h->value && !g_ascii_strncasecmp (field, h->name, strlen (field))) { if (pool != NULL) { *ret = g_list_prepend (*ret, memory_pool_strdup (pool, h->value)); } @@ -1223,7 +1223,7 @@ header_iterate (memory_pool_t * pool, GMimeHeaderList * ls, GList ** ret, const while (g_mime_header_iter_is_valid (iter)) { name = g_mime_header_iter_get_name (iter); if (G_LIKELY (!strong)) { - if (!g_strncasecmp (field, name, strlen (name))) { + if (!g_ascii_strncasecmp (field, name, strlen (name))) { if (pool != NULL) { *ret = g_list_prepend (*ret, memory_pool_strdup (pool, g_mime_header_iter_get_value (iter))); } @@ -1556,11 +1556,11 @@ message_set_header (GMimeMessage * message, const gchar *field, const gchar *val { gint i; - if (!g_strcasecmp (field, "MIME-Version:") || !g_strncasecmp (field, "Content-", 8)) { + if (!g_ascii_strcasecmp (field, "MIME-Version:") || !g_ascii_strncasecmp (field, "Content-", 8)) { return; } for (i = 0; i <= HEADER_UNKNOWN; ++i) { - if (!fieldfunc[i].name || !g_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) { + if (!fieldfunc[i].name || !g_ascii_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) { switch (fieldfunc[i].functype) { case FUNC_CHARPTR: (*(fieldfunc[i].setfunc)) (message, value); @@ -1593,7 +1593,7 @@ message_get_header (memory_pool_t * pool, GMimeMessage * message, const gchar *f InternetAddressList *ia_list = NULL, *ia; for (i = 0; i <= HEADER_UNKNOWN; ++i) { - if (!fieldfunc[i].name || !g_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) { + if (!fieldfunc[i].name || !g_ascii_strncasecmp (field, fieldfunc[i].name, strlen (fieldfunc[i].name))) { switch (fieldfunc[i].functype) { case FUNC_CHARFREEPTR: ret = (gchar *)(*(fieldfunc[i].func)) (message); diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c index 863136d26..902078d29 100644 --- a/src/plugins/regexp.c +++ b/src/plugins/regexp.c @@ -84,6 +84,7 @@ static const struct luaL_reg regexplib_m[] = { }; static struct regexp_ctx *regexp_module_ctx = NULL; +static GMutex *workers_mtx = NULL; static gint regexp_common_filter (struct worker_task *task); static void process_regexp_item_threaded (gpointer data, gpointer user_data); @@ -525,7 +526,13 @@ regexp_module_config (struct config_file *cfg) if (g_thread_supported ()) { thr = parse_limit (value, -1); if (thr > 1) { +#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30)) g_thread_init (NULL); + workers_mtx = g_mutex_new (); +#else + workers_mtx = memory_pool_alloc (regexp_module_ctx->regexp_pool, sizeof (GMutex)); + g_mutex_init (workers_mtx); +#endif regexp_module_ctx->workers = g_thread_pool_new (process_regexp_item_threaded, regexp_module_ctx, thr, TRUE, &err); if (err != NULL) { msg_err ("thread pool creation failed: %s", err->message); @@ -552,11 +559,8 @@ regexp_module_config (struct config_file *cfg) cur_opt = g_hash_table_lookup (cfg->modules_opts, "regexp"); while (cur_opt) { cur = cur_opt->data; - if (strcmp (cur->param, "metric") == 0 || strcmp (cur->param, "statfile_prefix") == 0) { - cur_opt = g_list_next (cur_opt); - continue; - } - else if (g_ascii_strncasecmp (cur->param, "autolearn", sizeof ("autolearn") - 1) == 0) { + /* Skip several options that are not regexp */ + if (g_ascii_strncasecmp (cur->param, "autolearn", sizeof ("autolearn") - 1) == 0) { parse_autolearn_param (cur->param, cur->value, cfg); cur_opt = g_list_next (cur_opt); continue; @@ -569,6 +573,11 @@ regexp_module_config (struct config_file *cfg) cur_opt = g_list_next (cur_opt); continue; } + else if (g_ascii_strncasecmp (cur->param, "max_threads", sizeof ("max_threads") - 1) == 0) { + cur_opt = g_list_next (cur_opt); + continue; + } + /* Handle regexps */ cur_item = memory_pool_alloc0 (regexp_module_ctx->regexp_pool, sizeof (struct regexp_module_item)); cur_item->symbol = cur->param; if (cur->is_lua && cur->lua_type == LUA_VAR_STRING) { @@ -1205,13 +1214,17 @@ process_regexp_item_threaded (gpointer data, gpointer user_data) if (ud->item->lua_function) { /* Just call function */ if (lua_call_expression_func ("regexp", ud->item->lua_function, ud->task, NULL, &res) && res) { + g_mutex_lock (workers_mtx); insert_result (ud->task, ud->item->symbol, 1, NULL); + g_mutex_unlock (workers_mtx); } } else { /* Process expression */ if (process_regexp_expression (ud->item->expr, ud->item->symbol, ud->task, NULL)) { + g_mutex_lock (workers_mtx); insert_result (ud->task, ud->item->symbol, 1, NULL); + g_mutex_unlock (workers_mtx); } } } diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 2482dc589..6e4cad8ce 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -389,7 +389,7 @@ surbl_module_config (struct config_file *cfg) cur_opt = g_hash_table_lookup (cfg->modules_opts, "surbl"); while (cur_opt) { cur = cur_opt->data; - if (!g_strncasecmp (cur->param, "suffix", sizeof ("suffix") - 1)) { + if (!g_ascii_strncasecmp (cur->param, "suffix", sizeof ("suffix") - 1)) { if ((str = strchr (cur->param, '_')) != NULL) { new_suffix = memory_pool_alloc (surbl_module_ctx->surbl_pool, sizeof (struct suffix_item)); *str = '\0'; @@ -414,7 +414,7 @@ surbl_module_config (struct config_file *cfg) } } /* Search for bits */ - else if (!g_strncasecmp (cur->param, "bit", sizeof ("bit") - 1)) { + else if (!g_ascii_strncasecmp (cur->param, "bit", sizeof ("bit") - 1)) { if ((str = strchr (cur->param, '_')) != NULL) { bit = strtoul (str + 1, NULL, 10); if (bit != 0) { -- 2.39.5