return FALSE;
}
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+static void
+bayes_mutex_free (gpointer data)
+{
+ GMutex *mtx = data;
+
+ g_mutex_free (mtx);
+}
+#endif
+
struct classifier_ctx*
bayes_init (memory_pool_t *pool, struct classifier_config *cfg)
{
ctx->pool = pool;
ctx->cfg = cfg;
ctx->debug = FALSE;
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ ctx->mtx = g_mutex_new ();
+ memory_pool_add_destructor (pool, (pool_destruct_func) bayes_mutex_free, ctx->mtx);
+#else
+ ctx->mtx = memory_pool_alloc (pool, sizeof (GMutex));
+ g_mutex_init (ctx->mtx);
+#endif
return ctx;
}
}
}
+ /* Critical section as here can be lua callbacks calling */
+ g_mutex_lock (ctx->mtx);
cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE);
if (cur) {
memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_list_free, cur);
else {
cur = ctx->cfg->statfiles;
}
+ g_mutex_unlock (ctx->mtx);
data.statfiles_num = g_list_length (cur);
data.statfiles = g_new0 (struct bayes_statfile_data, data.statfiles_num);
GHashTable *results;
gboolean debug;
struct classifier_config *cfg;
+ GMutex *mtx;
};
struct classify_weight {
}
/*
- * Called if all filters are processed
+ * Called if all filters are processed, non-threaded and simple version
*/
-static void
+static gboolean
fin_learn_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
rspamd_dispatcher_restore (task->dispatcher);
}
}
+
+ return TRUE;
}
/*
g_mutex_free (mtx);
}
+
+static void
+event_cond_free (gpointer data)
+{
+ GCond *cond = data;
+
+ g_cond_free (cond);
+}
#endif
struct rspamd_async_session *
-new_async_session (memory_pool_t * pool, event_finalizer_t fin,
+new_async_session (memory_pool_t * pool, session_finalizer_t fin,
event_finalizer_t restore, event_finalizer_t cleanup, void *user_data)
{
struct rspamd_async_session *new;
new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
new->mtx = g_mutex_new ();
+ new->cond = g_cond_new ();
memory_pool_add_destructor (pool, (pool_destruct_func) event_mutex_free, new->mtx);
+ memory_pool_add_destructor (pool, (pool_destruct_func) event_cond_free, new->cond);
#else
new->mtx = memory_pool_alloc (pool, sizeof (GMutex));
g_mutex_init (new->mtx);
+ new->cond = memory_pool_alloc (pool, sizeof (GCond));
+ g_cond_init (new->cond);
#endif
new->threads = 0;
return;
}
+ g_mutex_lock (session->mtx);
new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
new->fin = fin;
new->user_data = user_data;
msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events),
g_quark_to_string (subsystem));
#endif
+ g_mutex_unlock (session->mtx);
}
void
return;
}
+ g_mutex_lock (session->mtx);
/* Search for event */
search_ev.fin = fin;
search_ev.user_data = ud;
/* Remove event */
fin (ud);
}
-
-
+ g_mutex_unlock (session->mtx);
check_session_pending (session);
}
-static void
+static gboolean
rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
{
struct rspamd_async_event *ev = v;
if (ev->fin != NULL) {
ev->fin (ev->user_data);
}
+
+ return TRUE;
}
gboolean
return FALSE;
}
+ g_mutex_lock (session->mtx);
+ if (session->threads > 0) {
+ /* Wait for conditional variable to finish processing */
+ g_mutex_unlock (session->mtx);
+ g_cond_wait (session->cond, session->mtx);
+ }
+
session->wanna_die = TRUE;
- g_hash_table_foreach (session->events, rspamd_session_destroy, session);
+ g_hash_table_foreach_remove (session->events, rspamd_session_destroy, session);
- if (session->threads == 0) {
- if (session->cleanup != NULL) {
- session->cleanup (session->user_data);
- }
- return TRUE;
- }
+ /* Mutex can be destroyed here */
+ g_mutex_unlock (session->mtx);
- return FALSE;
+ if (session->cleanup != NULL) {
+ session->cleanup (session->user_data);
+ }
+ return TRUE;
}
gboolean
check_session_pending (struct rspamd_async_session *session)
{
g_mutex_lock (session->mtx);
- if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) {
+ if (session->wanna_die && g_hash_table_size (session->events) == 0) {
session->wanna_die = FALSE;
- if (session->fin != NULL) {
- session->fin (session->user_data);
+ if (session->threads > 0) {
+ /* Wait for conditional variable to finish processing */
+ g_cond_wait (session->cond, session->mtx);
}
- /* Check events count again */
- if (g_hash_table_size (session->events) != 0) {
- if (session->restore != NULL) {
- session->restore (session->user_data);
+ if (session->fin != NULL) {
+ if (! session->fin (session->user_data)) {
+ g_mutex_unlock (session->mtx);
+ /* Session finished incompletely, perform restoration */
+ if (session->restore != NULL) {
+ session->restore (session->user_data);
+ /* Call pending once more */
+ return check_session_pending (session);
+ }
+ return TRUE;
}
- g_mutex_unlock (session->mtx);
- return TRUE;
}
g_mutex_unlock (session->mtx);
return FALSE;
remove_async_thread (struct rspamd_async_session *session)
{
if (g_atomic_int_dec_and_test (&session->threads)) {
- (void) check_session_pending (session);
+ /* Signal if there are any sessions waiting */
+ g_mutex_lock (session->mtx);
+ g_cond_signal (session->cond);
+ g_mutex_unlock (session->mtx);
}
#ifdef RSPAMD_EVENTS_DEBUG
msg_info ("removed thread: pending %d thread", session->threads);
struct rspamd_async_event;
typedef void (*event_finalizer_t)(void *user_data);
+typedef gboolean (*session_finalizer_t)(void *user_data);
struct rspamd_async_event {
GQuark subsystem;
};
struct rspamd_async_session {
- event_finalizer_t fin;
+ session_finalizer_t fin;
event_finalizer_t restore;
event_finalizer_t cleanup;
GHashTable *events;
gboolean wanna_die;
guint threads;
GMutex *mtx;
+ GCond *cond;
};
/**
* @return
*/
struct rspamd_async_session *new_async_session (memory_pool_t *pool,
- event_finalizer_t fin, event_finalizer_t restore,
+ session_finalizer_t fin, event_finalizer_t restore,
event_finalizer_t cleanup, void *user_data);
/**
}
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+static GStaticMutex result_mtx = G_STATIC_MUTEX_INIT;
+#else
+G_LOCK_DEFINE (result_mtx);
+#endif
+
static void
insert_result_common (struct worker_task *task, const gchar *symbol, double flag, GList * opts, gboolean single)
{
struct cache_item *item;
GList *cur, *metric_list;
+ /* Avoid concurrenting inserting of results */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_lock (&result_mtx);
+#else
+ G_LOCK (result_mtx);
+#endif
metric_list = g_hash_table_lookup (task->cfg->metrics_symbols, symbol);
if (metric_list) {
cur = metric_list;
/* XXX: it is not wise to destroy them here */
g_list_free (opts);
}
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ g_static_mutex_unlock (&result_mtx);
+#else
+ G_UNLOCK (result_mtx);
+#endif
}
/* Insert result that may be increased on next insertions */
struct classifier_callback_data *cd;
lua_State *L;
+
/* Go throught all callbacks and call them, appending results to list */
cur = g_list_first (ccf->pre_callbacks);
while (cur) {
cur = g_list_next (cur);
}
+ g_mutex_lock (lua_mtx);
if (res == NULL) {
L = task->cfg->lua_state;
/* Check function from global table 'classifiers' */
}
lua_pop (L, 1);
}
+ g_mutex_unlock (lua_mtx);
+
return res;
}
double out = in;
GList *cur;
+ g_mutex_lock (lua_mtx);
/* Go throught all callbacks and call them, appending results to list */
cur = g_list_first (ccf->pre_callbacks);
while (cur) {
cur = g_list_next (cur);
}
+ g_mutex_unlock (lua_mtx);
return out;
/* Lua module init function */
#define MODULE_INIT_FUNC "module_init"
+/* Global lua mutex */
+GMutex *lua_mtx = NULL;
+
const luaL_reg null_reg[] = {
{"__tostring", lua_class_tostring},
{NULL, NULL}
L = lua_open ();
luaL_openlibs (L);
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ lua_mtx = g_mutex_new ();
+#else
+ lua_mtx = g_malloc (sizeof (GMutex));
+ g_mutex_init (lua_mtx);
+#endif
+
(void)luaopen_rspamd (L);
(void)luaopen_logger (L);
(void)luaopen_config (L);
struct worker_task **ptask;
lua_State *L = task->cfg->lua_state;
+ g_mutex_lock (lua_mtx);
lua_getglobal (L, function);
ptask = lua_newuserdata (L, sizeof (struct worker_task *));
lua_setclass (L, "rspamd{task}", -1);
}
result = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
+ g_mutex_unlock (lua_mtx);
+
return result;
}
guint i;
lua_State *L = task->cfg->lua_state;
+ g_mutex_lock (lua_mtx);
lua_getglobal (L, function);
for (i = 0; i < number; i++) {
}
result = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
+ g_mutex_unlock (lua_mtx);
+
return result;
}
struct expression_argument *arg;
int nargs = 1, pop = 0;
+ g_mutex_lock (lua_mtx);
/* Call specified function and expect result of given expected_type */
/* First check function in config table */
lua_getglobal (L, "config");
if (lua_pcall (L, nargs, 1, 0) != 0) {
msg_info ("call to %s failed: %s", function, lua_tostring (L, -1));
+ g_mutex_unlock (lua_mtx);
return FALSE;
}
pop ++;
if (!lua_isboolean (L, -1)) {
lua_pop (L, pop);
msg_info ("function %s must return a boolean", function);
+ g_mutex_unlock (lua_mtx);
return FALSE;
}
*res = lua_toboolean (L, -1);
lua_pop (L, pop);
+ g_mutex_unlock (lua_mtx);
return TRUE;
}
struct consolidation_callback_data *data = (struct consolidation_callback_data *)arg;
lua_State *L = data->task->cfg->lua_state;
+ g_mutex_lock (lua_mtx);
lua_getglobal (L, data->func);
lua_pushstring (L, (const gchar *)key);
res = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
data->score += res;
+ g_mutex_unlock (lua_mtx);
}
double
return score;
}
+ g_mutex_lock (lua_mtx);
lua_getglobal (L, p->data);
lua_pushnumber (L, score);
res = lua_tonumber (L, -1);
lua_pop (L, 1);
+ g_mutex_unlock (lua_mtx);
return res;
}
#define LUA_INTERFACE_DEF(class, name) { #name, lua_##class##_##name }
extern const luaL_reg null_reg[];
+extern GMutex *lua_mtx;
#define RSPAMD_LUA_API_VERSION 9
GList *cur;
gboolean res = FALSE;
+ g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
}
lua_pop (cd->L, 1);
}
+ g_mutex_unlock (lua_mtx);
return res;
}
struct worker_task **ptask;
GList *cur;
+ g_mutex_lock (lua_mtx);
cur = task->cfg->post_filters;
while (cur) {
cd = cur->data;
}
cur = g_list_next (cur);
}
+ g_mutex_unlock (lua_mtx);
}
static gint
struct lua_callback_data *cd = ud;
struct worker_task **ptask;
+ g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
msg_info ("call to %s failed: %s", cd->cb_is_ref ? "local function" :
cd->callback.name, lua_tostring (cd->L, -1));
}
+ g_mutex_unlock (lua_mtx);
}
static gint
union rspamd_reply_element *elt;
GList *cur;
+ g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
if (cd->cb_is_ref) {
luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
+ g_mutex_unlock (lua_mtx);
}
static gint
# define STAT_UNLOCK() do {} while (0)
#endif
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+# define POOL_MTX_LOCK() do { g_static_mutex_lock (&pool->mtx); } while (0)
+# define POOL_MTX_UNLOCK() do { g_static_mutex_unlock (&pool->mtx); } while (0)
+#else
+# define POOL_MTX_LOCK() do { g_mutex_lock (&pool->mtx); } while (0)
+# define POOL_MTX_UNLOCK() do { g_mutex_unlock (&pool->mtx); } while (0)
+#endif
+
/*
* This define specify whether we should check all pools for free space for new object
* or just begin scan from current (recently attached) pool
struct _pool_chain_shared *chain;
gpointer map;
+
#if defined(HAVE_MMAP_ANON)
map = mmap (NULL, size + sizeof (struct _pool_chain_shared), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
if (map == MAP_FAILED) {
new->destructors = NULL;
/* Set it upon first call of set variable */
new->variables = NULL;
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ new->mtx = G_STATIC_MUTEX_INIT;
+#else
+ g_mutex_init (&new->mtx);
+#endif
mem_pool_stat->pools_allocated++;
gint free;
if (pool) {
+ POOL_MTX_LOCK ();
#ifdef MEMORY_GREEDY
cur = pool->first_pool;
#else
}
if (free < (gint)size && cur->next == NULL) {
/* Allocate new pool */
+
if (cur->len >= size + MEM_ALIGNMENT) {
new = pool_chain_new (cur->len);
}
/* No need to align again */
tmp = new->pos;
new->pos = tmp + size;
+ POOL_MTX_UNLOCK ();
return tmp;
}
tmp = align_ptr (cur->pos, MEM_ALIGNMENT);
cur->pos = tmp + size;
+ POOL_MTX_UNLOCK ();
return tmp;
}
return NULL;
if (pool) {
g_return_val_if_fail (size > 0, NULL);
+ POOL_MTX_LOCK ();
cur = pool->shared_pool;
if (!cur) {
cur = pool_chain_new_shared (pool->first_pool->len);
}
if (free < (gint)size && cur->next == NULL) {
/* Allocate new pool */
+
if (cur->len >= size + MEM_ALIGNMENT) {
new = pool_chain_new_shared (cur->len);
}
STAT_LOCK ();
mem_pool_stat->bytes_allocated += size;
STAT_UNLOCK ();
+ POOL_MTX_UNLOCK ();
return new->begin;
}
tmp = align_ptr (cur->pos, MEM_ALIGNMENT);
cur->pos = tmp + size;
+ POOL_MTX_UNLOCK ();
return tmp;
}
return NULL;
cur = memory_pool_alloc (pool, sizeof (struct _pool_destructors));
if (cur) {
+ POOL_MTX_LOCK ();
cur->func = func;
cur->data = data;
cur->function = function;
cur->loc = line;
cur->prev = pool->destructors;
pool->destructors = cur;
+ POOL_MTX_UNLOCK ();
}
}
struct _pool_chain_shared *cur_shared = pool->shared_pool, *tmp_shared;
struct _pool_destructors *destructor = pool->destructors;
+ POOL_MTX_LOCK ();
/* Call all pool destructors */
while (destructor) {
/* Avoid calling destructors for NULL pointers */
}
mem_pool_stat->pools_freed++;
+ POOL_MTX_UNLOCK ();
g_slice_free (memory_pool_t, pool);
}
struct _pool_chain_shared *shared_pool; /**< shared chain */
struct _pool_destructors *destructors; /**< destructors chain */
GHashTable *variables; /**< private memory pool variables */
+#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
+ GStaticMutex mtx; /**< threads lock */
+#else
+ GMutex mtx; /**< threads lock */
+#endif
} memory_pool_t;
/**
#include "smtp.h"
#include "smtp_proto.h"
-void
+gboolean
free_smtp_session (gpointer arg)
{
struct smtp_session *session = arg;
memory_pool_delete (session->pool);
g_free (session);
}
+
+ return TRUE;
}
gboolean
/*
* Called if all filters are processed
*/
-static void
+static gboolean
fin_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
/* Just process composites */
make_composites (task);
}
- /* Call post filters */
- lua_call_post_filters (task);
+ if (task->cfg->post_filters) {
+ /* More to process */
+ /* Special state */
+ task->state = WAIT_POST_FILTER;
+ return FALSE;
+ }
+
}
/* Check if we have all events finished */
- if (g_hash_table_size (task->s->events) == 0 && task->s->threads == 0) {
- task->state = WRITE_REPLY;
- if (task->fin_callback) {
- task->fin_callback (task->fin_arg);
- }
- else {
- rspamd_dispatcher_restore (task->dispatcher);
- }
+ task->state = WRITE_REPLY;
+ if (task->fin_callback) {
+ task->fin_callback (task->fin_arg);
+ }
+ else {
+ rspamd_dispatcher_restore (task->dispatcher);
}
+
+ return TRUE;
}
/*
{
struct worker_task *task = (struct worker_task *) arg;
- /* Special state */
- task->state = WAIT_POST_FILTER;
+ /* Call post filters */
+ lua_call_post_filters (task);
task->s->wanna_die = TRUE;
}