]> source.dussan.org Git - rspamd.git/commitdiff
Rework events library slightly:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 31 Jan 2012 17:24:32 +0000 (21:24 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 31 Jan 2012 17:24:32 +0000 (21:24 +0400)
 - forced events are no longer checked or created
 - add async threads to prevent session to be destroyed till all threads are finished

src/events.c
src/events.h
src/plugins/regexp.c
src/smtp.c

index eb1e260941e14d491d5cdac58f12fdc5520bd2ec..bf47bb1f89a534ca3a1241fd420791b58329fd87 100644 (file)
@@ -62,10 +62,9 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
        new->user_data = user_data;
        new->wanna_die = FALSE;
        new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
-       new->forced_events = g_queue_new ();
+       new->threads = 0;
 
        memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events);
-       memory_pool_add_destructor (pool, (pool_destruct_func) g_queue_free, new->forced_events);
 
        return new;
 }
@@ -73,70 +72,23 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
 void
 register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, gboolean forced)
 {
-       struct rspamd_async_event      *new, *ev;
-       GList                          *cur;
+       struct rspamd_async_event      *new;
 
        if (session == NULL) {
                msg_info ("session is NULL");
                return;
        }
 
-       if (forced) {
-               /* For forced events try first to increase its reference */
-               cur = session->forced_events->head;
-               while (cur) {
-                       ev = cur->data;
-                       if (ev->forced && ev->fin == fin) {
-                               ev->ref++;
-                               return;
-                       }
-                       cur = g_list_next (cur);
-               }
-       }
-
        new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
        new->fin = fin;
        new->user_data = user_data;
-       new->forced = forced;
-       new->ref = 1;
+
        g_hash_table_insert (session->events, new, new);
 #ifdef RSPAMD_EVENTS_DEBUG
        msg_info ("added event: %p, pending %d events", user_data, g_hash_table_size (session->events));
 #endif
 }
 
-void
-remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin)
-{
-       struct rspamd_async_event      *ev;
-       GList                          *cur;
-
-       if (session == NULL) {
-               msg_info ("session is NULL");
-               return;
-       }
-
-       cur = session->forced_events->head;
-       while (cur) {
-               ev = cur->data;
-               if (ev->forced && ev->fin == fin) {
-                       ev->ref--;
-                       if (ev->ref == 0) {
-                               g_queue_delete_link (session->forced_events, cur);
-                       }
-                       break;
-               }
-               cur = g_list_next (cur);
-       }
-
-       check_session_pending (session);
-
-       if (session->wanna_die && session->fin != NULL && g_queue_get_length (session->forced_events) == 0) {
-               /* Call session destroy after all forced events are ready */
-               session->cleanup (session->user_data);
-       }
-}
-
 void
 remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
 {
@@ -183,7 +135,7 @@ destroy_session (struct rspamd_async_session *session)
 
        g_hash_table_foreach (session->events, rspamd_session_destroy, session);
 
-       if (g_queue_get_length (session->forced_events) == 0) {
+       if (session->threads == 0) {
                if (session->cleanup != NULL) {
                        session->cleanup (session->user_data);
                }
@@ -196,7 +148,7 @@ destroy_session (struct rspamd_async_session *session)
 gboolean
 check_session_pending (struct rspamd_async_session *session)
 {
-       if (g_queue_get_length (session->forced_events) == 0 && g_hash_table_size (session->events) == 0) {
+       if (session->threads == 0 && g_hash_table_size (session->events) == 0) {
                if (session->fin != NULL) {
                        session->fin (session->user_data);
                }
@@ -212,3 +164,26 @@ check_session_pending (struct rspamd_async_session *session)
 
        return TRUE;
 }
+
+
+/**
+ * Add new async thread to session
+ * @param session session object
+ */
+void
+register_async_thread (struct rspamd_async_session *session)
+{
+       g_atomic_int_inc (&session->threads);
+}
+
+/**
+ * Remove async thread from session and check whether session can be terminated
+ * @param session session object
+ */
+void
+remove_async_thread (struct rspamd_async_session *session)
+{
+       if (g_atomic_int_dec_and_test (&session->threads)) {
+               (void) check_session_pending (session);
+       }
+}
index 2aba3eb2ea494702d27883c6d2e6df6276c59998..138920060bd76bd38aef09de36d79d5987036265 100644 (file)
@@ -10,7 +10,6 @@ typedef void (*event_finalizer_t)(void *user_data);
 struct rspamd_async_event {
        event_finalizer_t fin;
        void *user_data;
-       gboolean forced;
        guint ref;
 };
 
@@ -19,10 +18,10 @@ struct rspamd_async_session {
        event_finalizer_t restore;
        event_finalizer_t cleanup;
        GHashTable *events;
-       GQueue *forced_events;
        void *user_data;
        memory_pool_t *pool;
        gboolean wanna_die;
+       guint threads;
 };
 
 /**
@@ -43,19 +42,11 @@ struct rspamd_async_session *new_async_session (memory_pool_t *pool,
  * @param session session object
  * @param fin finalizer callback
  * @param user_data abstract user_data
- * @param forced session cannot be destroyed until forced event are still in it
+ * @param forced unused
  */
 void register_async_event (struct rspamd_async_session *session,
                event_finalizer_t fin, void *user_data, gboolean forced);
 
-
-/**
- * Remove forced event
- * @param session session object
- * @param fin destructor function
- */
-void remove_forced_event (struct rspamd_async_session *session, event_finalizer_t fin);
-
 /**
  * Remove normal event
  * @param session session object
@@ -77,4 +68,16 @@ gboolean destroy_session (struct rspamd_async_session *session);
  */
 gboolean check_session_pending (struct rspamd_async_session *session);
 
+/**
+ * Add new async thread to session
+ * @param session session object
+ */
+void register_async_thread (struct rspamd_async_session *session);
+
+/**
+ * Remove async thread from session and check whether session can be terminated
+ * @param session session object
+ */
+void remove_async_thread (struct rspamd_async_session *session);
+
 #endif /* RSPAMD_EVENTS_H */
index 902078d29719f9a141156528b23db53495ea3eff..9d2210c5643b83548349d2ed4e4b41f09302845d 100644 (file)
@@ -1227,6 +1227,7 @@ process_regexp_item_threaded (gpointer data, gpointer user_data)
                        g_mutex_unlock (workers_mtx);
                }
        }
+       remove_async_thread (ud->task->s);
 }
 
 static void
@@ -1240,6 +1241,7 @@ process_regexp_item (struct worker_task *task, void *user_data)
                thr_ud = memory_pool_alloc (task->task_pool, sizeof (struct regexp_threaded_ud));
                thr_ud->item = item;
                thr_ud->task = task;
+               register_async_thread (task->s);
                g_thread_pool_push (regexp_module_ctx->workers, thr_ud, NULL);
        }
        else {
index 2b5236cc6e18e37e6b674d92a3284cb670afc668..46df67b3646fb7cba0d2c2312f54a705fc384f54 100644 (file)
@@ -589,8 +589,7 @@ smtp_dns_cb (struct rspamd_dns_reply *reply, void *arg)
        gint                            res = 0;
        union rspamd_reply_element     *elt;
        GList                          *cur;
-       
-       remove_forced_event (session->s, (event_finalizer_t)smtp_dns_cb);
+
        switch (session->state) {
                case SMTP_STATE_RESOLVE_REVERSE:
                        /* Parse reverse reply and start resolve of this ip */