aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-05-26 09:38:47 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-05-26 09:38:47 +0100
commit1abb0939d17211cadd1be7b7249cf024f8cfebdf (patch)
tree6f069c5762c3b1227e70f067afc59e72c75b5054 /src
parent1c7f982f9d7a3d49c16307c17507ed7aa964754f (diff)
downloadrspamd-1abb0939d17211cadd1be7b7249cf024f8cfebdf.tar.gz
rspamd-1abb0939d17211cadd1be7b7249cf024f8cfebdf.zip
Remove async threads for now.
They are anyway broken in the asynchronous world.
Diffstat (limited to 'src')
-rw-r--r--src/libserver/events.c137
-rw-r--r--src/libserver/events.h44
2 files changed, 48 insertions, 133 deletions
diff --git a/src/libserver/events.c b/src/libserver/events.c
index 1b3e845ce..9af0a84a8 100644
--- a/src/libserver/events.c
+++ b/src/libserver/events.c
@@ -26,6 +26,30 @@
#include "main.h"
#include "events.h"
+struct rspamd_async_watcher {
+ event_watcher_t cb;
+ guint remain;
+ gpointer ud;
+};
+
+struct rspamd_async_event {
+ GQuark subsystem;
+ event_finalizer_t fin;
+ void *user_data;
+ struct rspamd_async_watcher *w;
+};
+
+struct rspamd_async_session {
+ session_finalizer_t fin;
+ event_finalizer_t restore;
+ event_finalizer_t cleanup;
+ GHashTable *events;
+ void *user_data;
+ rspamd_mempool_t *pool;
+ struct rspamd_async_watcher *cur_watcher;
+ guint flags;
+};
+
static gboolean
rspamd_event_equal (gconstpointer a, gconstpointer b)
{
@@ -42,27 +66,15 @@ static guint
rspamd_event_hash (gconstpointer a)
{
const struct rspamd_async_event *ev = a;
+ XXH64_state_t st;
- return GPOINTER_TO_UINT (ev->user_data);
-}
+ XXH64_reset (&st, rspamd_hash_seed ());
+ XXH64_update (&st, ev->user_data, sizeof (gpointer));
+ XXH64_update (&st, ev->fin, sizeof (*ev->fin));
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
-static void
-event_mutex_free (gpointer data)
-{
- GMutex *mtx = data;
-
- g_mutex_free (mtx);
+ return XXH64_digest (&st);
}
-static void
-event_cond_free (gpointer data)
-{
- GCond *cond = data;
-
- g_cond_free (cond);
-}
-#endif
struct rspamd_async_session *
new_async_session (rspamd_mempool_t * pool, session_finalizer_t fin,
@@ -70,36 +82,13 @@ new_async_session (rspamd_mempool_t * pool, session_finalizer_t fin,
{
struct rspamd_async_session *new;
- new = rspamd_mempool_alloc (pool, sizeof (struct rspamd_async_session));
+ new = rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_async_session));
new->pool = pool;
new->fin = fin;
new->restore = restore;
new->cleanup = cleanup;
new->user_data = user_data;
- new->wanna_die = FALSE;
new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
- new->mtx = g_mutex_new ();
- new->cond = g_cond_new ();
- rspamd_mempool_add_destructor (pool,
- (rspamd_mempool_destruct_t) event_mutex_free,
- new->mtx);
- rspamd_mempool_add_destructor (pool,
- (rspamd_mempool_destruct_t) event_cond_free,
- new->cond);
-#else
- new->mtx = rspamd_mempool_alloc (pool, sizeof (GMutex));
- g_mutex_init (new->mtx);
- new->cond = rspamd_mempool_alloc (pool, sizeof (GCond));
- g_cond_init (new->cond);
- rspamd_mempool_add_destructor (pool,
- (rspamd_mempool_destruct_t) g_mutex_clear,
- new->mtx);
- rspamd_mempool_add_destructor (pool,
- (rspamd_mempool_destruct_t) g_cond_clear,
- new->cond);
-#endif
- new->threads = 0;
rspamd_mempool_add_destructor (pool,
(rspamd_mempool_destruct_t) g_hash_table_destroy,
@@ -121,21 +110,26 @@ register_async_event (struct rspamd_async_session *session,
return;
}
- g_mutex_lock (session->mtx);
new = rspamd_mempool_alloc (session->pool,
sizeof (struct rspamd_async_event));
new->fin = fin;
new->user_data = user_data;
new->subsystem = subsystem;
+ if (session->cur_watcher) {
+ new->w = session->cur_watcher;
+ new->w->remain ++;
+ }
+ else {
+ new->w = NULL;
+ }
+
g_hash_table_insert (session->events, new, new);
msg_debug ("added event: %p, pending %d events, subsystem: %s",
user_data,
g_hash_table_size (session->events),
g_quark_to_string (subsystem));
-
- g_mutex_unlock (session->mtx);
}
void
@@ -150,7 +144,6 @@ remove_normal_event (struct rspamd_async_session *session,
return;
}
- g_mutex_lock (session->mtx);
/* Search for event */
search_ev.fin = fin;
search_ev.user_data = ud;
@@ -163,7 +156,6 @@ remove_normal_event (struct rspamd_async_session *session,
/* Remove event */
fin (ud);
}
- g_mutex_unlock (session->mtx);
check_session_pending (session);
}
@@ -192,22 +184,10 @@ destroy_session (struct rspamd_async_session *session)
return FALSE;
}
- g_mutex_lock (session->mtx);
- if (session->threads > 0) {
- /* Wait for conditional variable to finish processing */
- g_mutex_unlock (session->mtx);
- g_cond_wait (session->cond, session->mtx);
- }
-
- session->wanna_die = TRUE;
-
g_hash_table_foreach_remove (session->events,
rspamd_session_destroy,
session);
- /* Mutex can be destroyed here */
- g_mutex_unlock (session->mtx);
-
if (session->cleanup != NULL) {
session->cleanup (session->user_data);
}
@@ -217,15 +197,8 @@ destroy_session (struct rspamd_async_session *session)
gboolean
check_session_pending (struct rspamd_async_session *session)
{
- g_mutex_lock (session->mtx);
- if (session->wanna_die && g_hash_table_size (session->events) == 0) {
- session->wanna_die = FALSE;
- if (session->threads > 0) {
- /* Wait for conditional variable to finish processing */
- g_cond_wait (session->cond, session->mtx);
- }
+ if (g_hash_table_size (session->events) == 0) {
if (session->fin != NULL) {
- g_mutex_unlock (session->mtx);
if (!session->fin (session->user_data)) {
/* Session finished incompletely, perform restoration */
if (session->restore != NULL) {
@@ -239,37 +212,9 @@ check_session_pending (struct rspamd_async_session *session)
return FALSE;
}
}
- g_mutex_unlock (session->mtx);
+
return FALSE;
}
- g_mutex_unlock (session->mtx);
- return TRUE;
-}
-
-/**
- * Add new async thread to session
- * @param session session object
- */
-void
-register_async_thread (struct rspamd_async_session *session)
-{
- g_atomic_int_inc (&session->threads);
- msg_debug ("added thread: pending %d thread", session->threads);
-}
-
-/**
- * Remove async thread from session and check whether session can be terminated
- * @param session session object
- */
-void
-remove_async_thread (struct rspamd_async_session *session)
-{
- if (g_atomic_int_dec_and_test (&session->threads)) {
- /* Signal if there are any sessions waiting */
- g_mutex_lock (session->mtx);
- g_cond_signal (session->cond);
- g_mutex_unlock (session->mtx);
- }
- msg_debug ("removed thread: pending %d thread", session->threads);
+ return TRUE;
}
diff --git a/src/libserver/events.h b/src/libserver/events.h
index e751c1054..442834f98 100644
--- a/src/libserver/events.h
+++ b/src/libserver/events.h
@@ -29,29 +29,11 @@
#include "mem_pool.h"
struct rspamd_async_event;
+struct rspamd_async_session;
-typedef void (*event_finalizer_t)(void *user_data);
-typedef gboolean (*session_finalizer_t)(void *user_data);
-
-struct rspamd_async_event {
- GQuark subsystem;
- event_finalizer_t fin;
- void *user_data;
- guint ref;
-};
-
-struct rspamd_async_session {
- session_finalizer_t fin;
- event_finalizer_t restore;
- event_finalizer_t cleanup;
- GHashTable *events;
- void *user_data;
- rspamd_mempool_t *pool;
- gboolean wanna_die;
- guint threads;
- GMutex *mtx;
- GCond *cond;
-};
+typedef void (*event_finalizer_t)(gpointer ud);
+typedef void (*event_watcher_t)(guint remain, gboolean terminated, gpointer ud);
+typedef gboolean (*session_finalizer_t)(gpointer user_data);
/**
* Make new async session
@@ -64,7 +46,7 @@ struct rspamd_async_session {
*/
struct rspamd_async_session * new_async_session (rspamd_mempool_t *pool,
session_finalizer_t fin, event_finalizer_t restore,
- event_finalizer_t cleanup, void *user_data);
+ event_finalizer_t cleanup, gpointer user_data);
/**
* Insert new event to the session
@@ -74,7 +56,7 @@ struct rspamd_async_session * new_async_session (rspamd_mempool_t *pool,
* @param forced unused
*/
void register_async_event (struct rspamd_async_session *session,
- event_finalizer_t fin, void *user_data, GQuark subsystem);
+ event_finalizer_t fin, gpointer user_data, GQuark subsystem);
/**
* Remove normal event
@@ -84,7 +66,7 @@ void register_async_event (struct rspamd_async_session *session,
*/
void remove_normal_event (struct rspamd_async_session *session,
event_finalizer_t fin,
- void *ud);
+ gpointer ud);
/**
* Must be called at the end of session, it calls fin functions for all non-forced callbacks
@@ -99,16 +81,4 @@ gboolean destroy_session (struct rspamd_async_session *session);
*/
gboolean check_session_pending (struct rspamd_async_session *session);
-/**
- * Add new async thread to session
- * @param session session object
- */
-void register_async_thread (struct rspamd_async_session *session);
-
-/**
- * Remove async thread from session and check whether session can be terminated
- * @param session session object
- */
-void remove_async_thread (struct rspamd_async_session *session);
-
#endif /* RSPAMD_EVENTS_H */