/*
 * Copyright (c) 2009-2012, Vsevolod Stakhov
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *	 * Redistributions of source code must retain the above copyright
 *	   notice, this list of conditions and the following disclaimer.
 *	 * Redistributions in binary form must reproduce the above copyright
 *	   notice, this list of conditions and the following disclaimer in the
 *	   documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"
#include "main.h"
#include "events.h"

#undef RSPAMD_EVENTS_DEBUG

static gboolean
rspamd_event_equal (gconstpointer a, gconstpointer b)
{
	const struct rspamd_async_event  *ev1 = a, *ev2 = b;

	if (ev1->fin == ev2->fin) {
		return ev1->user_data == ev2->user_data;
	}

	return FALSE;
}

static guint
rspamd_event_hash (gconstpointer a)
{
	const struct rspamd_async_event  *ev = a;

	return GPOINTER_TO_UINT (ev->user_data);
}

#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
static void
event_mutex_free (gpointer data)
{
	GMutex						   *mtx = data;

	g_mutex_free (mtx);
}

static void
event_cond_free (gpointer data)
{
	GCond						   *cond = data;

	g_cond_free (cond);
}
#endif

struct rspamd_async_session    *
new_async_session (memory_pool_t * pool, session_finalizer_t fin,
		event_finalizer_t restore, event_finalizer_t cleanup, void *user_data)
{
	struct rspamd_async_session    *new;

	new = memory_pool_alloc (pool, sizeof (struct rspamd_async_session));
	new->pool = pool;
	new->fin = fin;
	new->restore = restore;
	new->cleanup = cleanup;
	new->user_data = user_data;
	new->wanna_die = FALSE;
	new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
	new->mtx = g_mutex_new ();
	new->cond = g_cond_new ();
	memory_pool_add_destructor (pool, (pool_destruct_func) event_mutex_free, new->mtx);
	memory_pool_add_destructor (pool, (pool_destruct_func) event_cond_free, new->cond);
#else
	new->mtx = memory_pool_alloc (pool, sizeof (GMutex));
	g_mutex_init (new->mtx);
	new->cond = memory_pool_alloc (pool, sizeof (GCond));
	g_cond_init (new->cond);
	memory_pool_add_destructor (pool, (pool_destruct_func) g_mutex_clear, new->mtx);
	memory_pool_add_destructor (pool, (pool_destruct_func) g_cond_clear, new->cond);
#endif
	new->threads = 0;

	memory_pool_add_destructor (pool, (pool_destruct_func) g_hash_table_destroy, new->events);

	return new;
}

void
register_async_event (struct rspamd_async_session *session, event_finalizer_t fin, void *user_data, GQuark subsystem)
{
	struct rspamd_async_event      *new;

	if (session == NULL) {
		msg_info ("session is NULL");
		return;
	}

	g_mutex_lock (session->mtx);
	new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
	new->fin = fin;
	new->user_data = user_data;
	new->subsystem = subsystem;

	g_hash_table_insert (session->events, new, new);
#ifdef RSPAMD_EVENTS_DEBUG
	msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events),
			g_quark_to_string (subsystem));
#endif
	g_mutex_unlock (session->mtx);
}

void
remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin, void *ud)
{
	struct rspamd_async_event       search_ev, *found_ev;

	if (session == NULL) {
		msg_info ("session is NULL");
		return;
	}

	g_mutex_lock (session->mtx);
	/* Search for event */
	search_ev.fin = fin;
	search_ev.user_data = ud;
	if ((found_ev = g_hash_table_lookup (session->events, &search_ev)) != NULL) {
		g_hash_table_remove (session->events, found_ev);
#ifdef RSPAMD_EVENTS_DEBUG
		msg_info ("removed event: %p, subsystem: %s, pending %d events", ud,
			g_quark_to_string (found_ev->subsystem), g_hash_table_size (session->events));
#endif
		/* Remove event */
		fin (ud);
	}
	g_mutex_unlock (session->mtx);

	check_session_pending (session);
}

static gboolean
rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
{
	struct rspamd_async_event      *ev = v;

	/* Call event's finalizer */
	if (ev->fin != NULL) {
		ev->fin (ev->user_data);
	}

	return TRUE;
}

gboolean
destroy_session (struct rspamd_async_session *session)
{
	if (session == NULL) {
		msg_info ("session is NULL");
		return FALSE;
	}

	g_mutex_lock (session->mtx);
	if (session->threads > 0) {
	/* Wait for conditional variable to finish processing */
		g_mutex_unlock (session->mtx);
		g_cond_wait (session->cond, session->mtx);
	}

	session->wanna_die = TRUE;

	g_hash_table_foreach_remove (session->events, rspamd_session_destroy, session);

	/* Mutex can be destroyed here */
	g_mutex_unlock (session->mtx);

	if (session->cleanup != NULL) {
		session->cleanup (session->user_data);
	}
	return TRUE;
}

gboolean
check_session_pending (struct rspamd_async_session *session)
{
	g_mutex_lock (session->mtx);
	if (session->wanna_die && g_hash_table_size (session->events) == 0) {
		session->wanna_die = FALSE;
		if (session->threads > 0) {
			/* Wait for conditional variable to finish processing */
			g_cond_wait (session->cond, session->mtx);
		}
		if (session->fin != NULL) {
			if (! session->fin (session->user_data)) {
				g_mutex_unlock (session->mtx);
				/* Session finished incompletely, perform restoration */
				if (session->restore != NULL) {
					session->restore (session->user_data);
					/* Call pending once more */
					return check_session_pending (session);
				}
				return TRUE;
			}
		}
		g_mutex_unlock (session->mtx);
		return FALSE;
	}
	g_mutex_unlock (session->mtx);
	return TRUE;
}


/**
 * Add new async thread to session
 * @param session session object
 */
void
register_async_thread (struct rspamd_async_session *session)
{
	g_atomic_int_inc (&session->threads);
#ifdef RSPAMD_EVENTS_DEBUG
	msg_info ("added thread: pending %d thread", session->threads);
#endif
}

/**
 * Remove async thread from session and check whether session can be terminated
 * @param session session object
 */
void
remove_async_thread (struct rspamd_async_session *session)
{
	if (g_atomic_int_dec_and_test (&session->threads)) {
		/* Signal if there are any sessions waiting */
		g_mutex_lock (session->mtx);
		g_cond_signal (session->cond);
		g_mutex_unlock (session->mtx);
	}
#ifdef RSPAMD_EVENTS_DEBUG
	msg_info ("removed thread: pending %d thread", session->threads);
#endif
}