aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-01-31 21:24:32 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-01-31 21:24:32 +0400
commit2d708971163dc99f9c29cc47e7d4f56a3af882c5 (patch)
tree91d29c6ac7727405e01f065eabe062431302483c /src
parent3958296431df16a30550047103008c1324fa331a (diff)
downloadrspamd-2d708971163dc99f9c29cc47e7d4f56a3af882c5.tar.gz
rspamd-2d708971163dc99f9c29cc47e7d4f56a3af882c5.zip
Rework events library slightly:
- forced events are no longer checked or created - add async threads to prevent session to be destroyed till all threads are finished
Diffstat (limited to 'src')
-rw-r--r--src/events.c81
-rw-r--r--src/events.h25
-rw-r--r--src/plugins/regexp.c2
-rw-r--r--src/smtp.c3
4 files changed, 45 insertions, 66 deletions
diff --git a/src/events.c b/src/events.c
index eb1e26094..bf47bb1f8 100644
--- a/src/events.c
+++ b/src/events.c
@@ -62,10 +62,9 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
new->user_data = user_data;
new->wanna_die = FALSE;
new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
- new->forced_events = g_queue_new ();
+ new->threads = 0;
memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events);
- memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->forced_events);
return new;
}
@@ -73,32 +72,17 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
void
register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced)
{
- struct rspamd_async_event *new, *ev;
- GList *cur;
+ struct rspamd_async_event *new;
if (session == NULL) {
msg_info ("session is NULL");
return;
}
- if (forced) {
- /* For forced events try first to increase its reference */
- cur = session->forced_events->head;
- while (cur) {
- ev = cur->data;
- if (ev->forced && ev->fin == fin) {
- ev->ref++;
- return;
- }
- cur = g_list_next (cur);
- }
- }
-
new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
new->fin = fin;
new->user_data = user_data;
- new->forced = forced;
- new->ref = 1;
+
g_hash_table_insert (session->events, new, new);
#ifdef RSPAMD_EVENTS_DEBUG
msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events));
@@ -106,38 +90,6 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
}
void
-remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin)
-{
- struct rspamd_async_event *ev;
- GList *cur;
-
- if (session == NULL) {
- msg_info ("session is NULL");
- return;
- }
-
- cur = session->forced_events->head;
- while (cur) {
- ev = cur->data;
- if (ev->forced && ev->fin == fin) {
- ev->ref--;
- if (ev->ref == 0) {
- g_queue_delete_link (session->forced_events, cur);
- }
- break;
- }
- cur = g_list_next (cur);
- }
-
- check_session_pending (session);
-
- if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->forced_events) == 0) {
- /* Call session destroy after all forced events are ready */
- session->cleanup (session->user_data);
- }
-}
-
-void
remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
{
struct rspamd_async_event search_ev;
@@ -183,7 +135,7 @@ destroy_session (struct rspamd_async_session *session)
g_hash_table_foreach (session->events, rspamd_session_destroy, session);
- if (g_queue_get_length (session->forced_events) == 0) {
+ if (session->threads == 0) {
if (session->cleanup != NULL) {
session->cleanup (session->user_data);
}
@@ -196,7 +148,7 @@ destroy_session (struct rspamd_async_session *session)
gboolean
check_session_pending (struct rspamd_async_session *session)
{
- if (g_queue_get_length (session->forced_events) == 0 && g_hash_table_size (session->events) == 0) {
+ if (session->threads == 0 && g_hash_table_size (session->events) == 0) {
if (session->fin != NULL) {
session->fin (session->user_data);
}
@@ -212,3 +164,26 @@ check_session_pending (struct rspamd_async_session *session)
return TRUE;
}
+
+
+/**
+ * Add new async thread to session
+ * @param session session object
+ */
+void
+register_async_thread (struct rspamd_async_session *session)
+{
+ g_atomic_int_inc (&session->threads);
+}
+
+/**
+ * Remove async thread from session and check whether session can be terminated
+ * @param session session object
+ */
+void
+remove_async_thread (struct rspamd_async_session *session)
+{
+ if (g_atomic_int_dec_and_test (&session->threads)) {
+ (void) check_session_pending (session);
+ }
+}
diff --git a/src/events.h b/src/events.h
index 2aba3eb2e..138920060 100644
--- a/src/events.h
+++ b/src/events.h
@@ -10,7 +10,6 @@ typedef void (*event_finalizer_t)(void *user_data);
struct rspamd_async_event {
event_finalizer_t fin;
void *user_data;
- gboolean forced;
guint ref;
};
@@ -19,10 +18,10 @@ struct rspamd_async_session {
event_finalizer_t restore;
event_finalizer_t cleanup;
GHashTable *events;
- GQueue *forced_events;
void *user_data;
memory_pool_t *pool;
gboolean wanna_die;
+ guint threads;
};
/**
@@ -43,19 +42,11 @@ struct rspamd_async_session *new_async_session (memory_pool_t *pool,
* @param session session object
* @param fin finalizer callback
* @param user_data abstract user_data
- * @param forced session cannot be destroyed until forced event are still in it
+ * @param forced unused
*/
void register_async_event (struct rspamd_async_session *session,
event_finalizer_t fin, void *user_data, gboolean forced);
-
-/**
- * Remove forced event
- * @param session session object
- * @param fin destructor function
- */
-void remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin);
-
/**
* Remove normal event
* @param session session object
@@ -77,4 +68,16 @@ gboolean destroy_session (struct rspamd_async_session *session);
*/
gboolean check_session_pending (struct rspamd_async_session *session);
+/**
+ * Add new async thread to session
+ * @param session session object
+ */
+void register_async_thread (struct rspamd_async_session *session);
+
+/**
+ * Remove async thread from session and check whether session can be terminated
+ * @param session session object
+ */
+void remove_async_thread (struct rspamd_async_session *session);
+
#endif /* RSPAMD_EVENTS_H */
diff --git a/src/plugins/regexp.c b/src/plugins/regexp.c
index 902078d29..9d2210c56 100644
--- a/src/plugins/regexp.c
+++ b/src/plugins/regexp.c
@@ -1227,6 +1227,7 @@ process_regexp_item_threaded (gpointer data, gpointer user_data)
g_mutex_unlock (workers_mtx);
}
}
+ remove_async_thread (ud->task->s);
}
static void
@@ -1240,6 +1241,7 @@ process_regexp_item (struct worker_task *task, void *user_data)
thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud));
thr_ud->item = item;
thr_ud->task = task;
+ register_async_thread (task->s);
g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL);
}
else {
diff --git a/src/smtp.c b/src/smtp.c
index 2b5236cc6..46df67b36 100644
--- a/src/smtp.c
+++ b/src/smtp.c
@@ -589,8 +589,7 @@ smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg)
gint res = 0;
union rspamd_reply_element *elt;
GList *cur;
-
- remove_forced_event (session->s, (event_finalizer_t)smtp_dns_cb);
+
switch (session->state) {
case SMTP_STATE_RESOLVE_REVERSE:
/* Parse reverse reply and start resolve of this ip */