Browse Source

* More things to be thread-safe:

 - pool allocator is now thread-safe
 - lua subsystem now holds lock to avoid lua stack corruption
 - events subsystem now using conditional variables to wait for async_threads
 - insert_result is thread-safe now
tags/0.4.7
Vsevolod Stakhov 12 years ago
parent
commit
a5b48a05a9

+ 20
- 0
src/classifiers/bayes.c View File

@@ -165,6 +165,16 @@ bayes_classify_callback (gpointer key, gpointer value, gpointer data)
return FALSE;
}

#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
static void
bayes_mutex_free (gpointer data)
{
GMutex *mtx = data;

g_mutex_free (mtx);
}
#endif

struct classifier_ctx*
bayes_init (memory_pool_t *pool, struct classifier_config *cfg)
{
@@ -173,6 +183,13 @@ bayes_init (memory_pool_t *pool, struct classifier_config *cfg)
ctx->pool = pool;
ctx->cfg = cfg;
ctx->debug = FALSE;
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
ctx->mtx = g_mutex_new ();
memory_pool_add_destructor (pool, (pool_destruct_func) bayes_mutex_free, ctx->mtx);
#else
ctx->mtx = memory_pool_alloc (pool, sizeof (GMutex));
g_mutex_init (ctx->mtx);
#endif

return ctx;
}
@@ -205,6 +222,8 @@ bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input,
}
}

/* Critical section as here can be lua callbacks calling */
g_mutex_lock (ctx->mtx);
cur = call_classifier_pre_callbacks (ctx->cfg, task, FALSE, FALSE);
if (cur) {
memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_list_free, cur);
@@ -212,6 +231,7 @@ bayes_classify (struct classifier_ctx* ctx, statfile_pool_t *pool, GTree *input,
else {
cur = ctx->cfg->statfiles;
}
g_mutex_unlock (ctx->mtx);

data.statfiles_num = g_list_length (cur);
data.statfiles = g_new0 (struct bayes_statfile_data, data.statfiles_num);

+ 1
- 0
src/classifiers/classifiers.h View File

@@ -17,6 +17,7 @@ struct classifier_ctx {
GHashTable *results;
gboolean debug;
struct classifier_config *cfg;
GMutex *mtx;
};

struct classify_weight {

+ 4
- 2
src/controller.c View File

@@ -846,9 +846,9 @@ process_normal_command (const gchar *line)
}

/*
* Called if all filters are processed
* Called if all filters are processed, non-threaded and simple version
*/
static void
static gboolean
fin_learn_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
@@ -870,6 +870,8 @@ fin_learn_task (void *arg)
rspamd_dispatcher_restore (task->dispatcher);
}
}

return TRUE;
}

/*

+ 52
- 22
src/events.c View File

@@ -56,10 +56,18 @@ event_mutex_free (gpointer data)

g_mutex_free (mtx);
}

static void
event_cond_free (gpointer data)
{
GCond *cond = data;

g_cond_free (cond);
}
#endif

struct rspamd_async_session *
new_async_session (memory_pool_t * pool, event_finalizer_t fin,
new_async_session (memory_pool_t * pool, session_finalizer_t fin,
event_finalizer_t restore, event_finalizer_t cleanup, void *user_data)
{
struct rspamd_async_session *new;
@@ -74,10 +82,14 @@ new_async_session (memory_pool_t * pool, event_finalizer_t fin,
new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
new->mtx = g_mutex_new ();
new->cond = g_cond_new ();
memory_pool_add_destructor (pool, (pool_destruct_func) event_mutex_free, new->mtx);
memory_pool_add_destructor (pool, (pool_destruct_func) event_cond_free, new->cond);
#else
new->mtx = memory_pool_alloc (pool, sizeof (GMutex));
g_mutex_init (new->mtx);
new->cond = memory_pool_alloc (pool, sizeof (GCond));
g_cond_init (new->cond);
#endif
new->threads = 0;

@@ -96,6 +108,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
return;
}

g_mutex_lock (session->mtx);
new = memory_pool_alloc (session->pool, sizeof (struct rspamd_async_event));
new->fin = fin;
new->user_data = user_data;
@@ -106,6 +119,7 @@ register_async_event (struct rspamd_async_session *session, event_finalizer_t fi
msg_info ("added event: %p, pending %d events, subsystem: %s", user_data, g_hash_table_size (session->events),
g_quark_to_string (subsystem));
#endif
g_mutex_unlock (session->mtx);
}

void
@@ -118,6 +132,7 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin
return;
}

g_mutex_lock (session->mtx);
/* Search for event */
search_ev.fin = fin;
search_ev.user_data = ud;
@@ -130,13 +145,12 @@ remove_normal_event (struct rspamd_async_session *session, event_finalizer_t fin
/* Remove event */
fin (ud);
}


g_mutex_unlock (session->mtx);

check_session_pending (session);
}

static void
static gboolean
rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
{
struct rspamd_async_event *ev = v;
@@ -145,6 +159,8 @@ rspamd_session_destroy (gpointer k, gpointer v, gpointer unused)
if (ev->fin != NULL) {
ev->fin (ev->user_data);
}

return TRUE;
}

gboolean
@@ -155,36 +171,47 @@ destroy_session (struct rspamd_async_session *session)
return FALSE;
}

g_mutex_lock (session->mtx);
if (session->threads > 0) {
/* Wait for conditional variable to finish processing */
g_mutex_unlock (session->mtx);
g_cond_wait (session->cond, session->mtx);
}

session->wanna_die = TRUE;

g_hash_table_foreach (session->events, rspamd_session_destroy, session);
g_hash_table_foreach_remove (session->events, rspamd_session_destroy, session);

if (session->threads == 0) {
if (session->cleanup != NULL) {
session->cleanup (session->user_data);
}
return TRUE;
}
/* Mutex can be destroyed here */
g_mutex_unlock (session->mtx);

return FALSE;
if (session->cleanup != NULL) {
session->cleanup (session->user_data);
}
return TRUE;
}

gboolean
check_session_pending (struct rspamd_async_session *session)
{
g_mutex_lock (session->mtx);
if (session->wanna_die && session->threads == 0 && g_hash_table_size (session->events) == 0) {
if (session->wanna_die && g_hash_table_size (session->events) == 0) {
session->wanna_die = FALSE;
if (session->fin != NULL) {
session->fin (session->user_data);
if (session->threads > 0) {
/* Wait for conditional variable to finish processing */
g_cond_wait (session->cond, session->mtx);
}
/* Check events count again */
if (g_hash_table_size (session->events) != 0) {
if (session->restore != NULL) {
session->restore (session->user_data);
if (session->fin != NULL) {
if (! session->fin (session->user_data)) {
g_mutex_unlock (session->mtx);
/* Session finished incompletely, perform restoration */
if (session->restore != NULL) {
session->restore (session->user_data);
/* Call pending once more */
return check_session_pending (session);
}
return TRUE;
}
g_mutex_unlock (session->mtx);
return TRUE;
}
g_mutex_unlock (session->mtx);
return FALSE;
@@ -215,7 +242,10 @@ void
remove_async_thread (struct rspamd_async_session *session)
{
if (g_atomic_int_dec_and_test (&session->threads)) {
(void) check_session_pending (session);
/* Signal if there are any sessions waiting */
g_mutex_lock (session->mtx);
g_cond_signal (session->cond);
g_mutex_unlock (session->mtx);
}
#ifdef RSPAMD_EVENTS_DEBUG
msg_info ("removed thread: pending %d thread", session->threads);

+ 4
- 2
src/events.h View File

@@ -6,6 +6,7 @@
struct rspamd_async_event;

typedef void (*event_finalizer_t)(void *user_data);
typedef gboolean (*session_finalizer_t)(void *user_data);

struct rspamd_async_event {
GQuark subsystem;
@@ -15,7 +16,7 @@ struct rspamd_async_event {
};

struct rspamd_async_session {
event_finalizer_t fin;
session_finalizer_t fin;
event_finalizer_t restore;
event_finalizer_t cleanup;
GHashTable *events;
@@ -24,6 +25,7 @@ struct rspamd_async_session {
gboolean wanna_die;
guint threads;
GMutex *mtx;
GCond *cond;
};

/**
@@ -36,7 +38,7 @@ struct rspamd_async_session {
* @return
*/
struct rspamd_async_session *new_async_session (memory_pool_t *pool,
event_finalizer_t fin, event_finalizer_t restore,
session_finalizer_t fin, event_finalizer_t restore,
event_finalizer_t cleanup, void *user_data);

/**

+ 17
- 0
src/filter.c View File

@@ -140,6 +140,12 @@ insert_metric_result (struct worker_task *task, struct metric *metric, const gch
}

#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
static GStaticMutex result_mtx = G_STATIC_MUTEX_INIT;
#else
G_LOCK_DEFINE (result_mtx);
#endif

static void
insert_result_common (struct worker_task *task, const gchar *symbol, double flag, GList * opts, gboolean single)
{
@@ -147,6 +153,12 @@ insert_result_common (struct worker_task *task, const gchar *symbol, double flag
struct cache_item *item;
GList *cur, *metric_list;

/* Avoid concurrenting inserting of results */
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
g_static_mutex_lock (&result_mtx);
#else
G_LOCK (result_mtx);
#endif
metric_list = g_hash_table_lookup (task->cfg->metrics_symbols, symbol);
if (metric_list) {
cur = metric_list;
@@ -174,6 +186,11 @@ insert_result_common (struct worker_task *task, const gchar *symbol, double flag
/* XXX: it is not wise to destroy them here */
g_list_free (opts);
}
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
g_static_mutex_unlock (&result_mtx);
#else
G_UNLOCK (result_mtx);
#endif
}

/* Insert result that may be increased on next insertions */

+ 6
- 0
src/lua/lua_classifier.c View File

@@ -123,6 +123,7 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task
struct classifier_callback_data *cd;
lua_State *L;


/* Go throught all callbacks and call them, appending results to list */
cur = g_list_first (ccf->pre_callbacks);
while (cur) {
@@ -134,6 +135,7 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task
cur = g_list_next (cur);
}
g_mutex_lock (lua_mtx);
if (res == NULL) {
L = task->cfg->lua_state;
/* Check function from global table 'classifiers' */
@@ -149,6 +151,8 @@ call_classifier_pre_callbacks (struct classifier_config *ccf, struct worker_task
}
lua_pop (L, 1);
}
g_mutex_unlock (lua_mtx);

return res;
}

@@ -162,6 +166,7 @@ call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_tas
double out = in;
GList *cur;

g_mutex_lock (lua_mtx);
/* Go throught all callbacks and call them, appending results to list */
cur = g_list_first (ccf->pre_callbacks);
while (cur) {
@@ -190,6 +195,7 @@ call_classifier_post_callbacks (struct classifier_config *ccf, struct worker_tas

cur = g_list_next (cur);
}
g_mutex_unlock (lua_mtx);

return out;


+ 24
- 0
src/lua/lua_common.c View File

@@ -28,6 +28,9 @@
/* Lua module init function */
#define MODULE_INIT_FUNC "module_init"

/* Global lua mutex */
GMutex *lua_mtx = NULL;

const luaL_reg null_reg[] = {
{"__tostring", lua_class_tostring},
{NULL, NULL}
@@ -224,6 +227,13 @@ init_lua (struct config_file *cfg)
L = lua_open ();
luaL_openlibs (L);

#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
lua_mtx = g_mutex_new ();
#else
lua_mtx = g_malloc (sizeof (GMutex));
g_mutex_init (lua_mtx);
#endif

(void)luaopen_rspamd (L);
(void)luaopen_logger (L);
(void)luaopen_config (L);
@@ -320,6 +330,7 @@ lua_call_filter (const gchar *function, struct worker_task *task)
struct worker_task **ptask;
lua_State *L = task->cfg->lua_state;

g_mutex_lock (lua_mtx);
lua_getglobal (L, function);
ptask = lua_newuserdata (L, sizeof (struct worker_task *));
lua_setclass (L, "rspamd{task}", -1);
@@ -335,6 +346,8 @@ lua_call_filter (const gchar *function, struct worker_task *task)
}
result = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
g_mutex_unlock (lua_mtx);

return result;
}

@@ -345,6 +358,7 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma
guint i;
lua_State *L = task->cfg->lua_state;

g_mutex_lock (lua_mtx);
lua_getglobal (L, function);

for (i = 0; i < number; i++) {
@@ -360,6 +374,8 @@ lua_call_chain_filter (const gchar *function, struct worker_task *task, gint *ma
}
result = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
g_mutex_unlock (lua_mtx);

return result;
}

@@ -374,6 +390,7 @@ lua_call_expression_func (const gchar *module, const gchar *function,
struct expression_argument *arg;
int nargs = 1, pop = 0;

g_mutex_lock (lua_mtx);
/* Call specified function and expect result of given expected_type */
/* First check function in config table */
lua_getglobal (L, "config");
@@ -435,6 +452,7 @@ lua_call_expression_func (const gchar *module, const gchar *function,

if (lua_pcall (L, nargs, 1, 0) != 0) {
msg_info ("call to %s failed: %s", function, lua_tostring (L, -1));
g_mutex_unlock (lua_mtx);
return FALSE;
}
pop ++;
@@ -442,11 +460,13 @@ lua_call_expression_func (const gchar *module, const gchar *function,
if (!lua_isboolean (L, -1)) {
lua_pop (L, pop);
msg_info ("function %s must return a boolean", function);
g_mutex_unlock (lua_mtx);
return FALSE;
}
*res = lua_toboolean (L, -1);
lua_pop (L, pop);

g_mutex_unlock (lua_mtx);
return TRUE;
}

@@ -468,6 +488,7 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg)
struct consolidation_callback_data *data = (struct consolidation_callback_data *)arg;
lua_State *L = data->task->cfg->lua_state;

g_mutex_lock (lua_mtx);
lua_getglobal (L, data->func);

lua_pushstring (L, (const gchar *)key);
@@ -483,6 +504,7 @@ lua_consolidation_callback (gpointer key, gpointer value, gpointer arg)
res = lua_tonumber (L, -1);
lua_pop (L, 1); /* pop returned value */
data->score += res;
g_mutex_unlock (lua_mtx);
}

double
@@ -519,6 +541,7 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params)
return score;
}

g_mutex_lock (lua_mtx);
lua_getglobal (L, p->data);
lua_pushnumber (L, score);

@@ -533,6 +556,7 @@ lua_normalizer_func (struct config_file *cfg, long double score, void *params)
res = lua_tonumber (L, -1);
lua_pop (L, 1);

g_mutex_unlock (lua_mtx);
return res;
}


+ 1
- 0
src/lua/lua_common.h View File

@@ -15,6 +15,7 @@
#define LUA_INTERFACE_DEF(class, name) { #name, lua_##class##_##name }

extern const luaL_reg null_reg[];
extern GMutex *lua_mtx;

#define RSPAMD_LUA_API_VERSION 9


+ 6
- 0
src/lua/lua_config.c View File

@@ -318,6 +318,7 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_
GList *cur;
gboolean res = FALSE;

g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
@@ -347,6 +348,7 @@ lua_config_function_callback (struct worker_task *task, GList *args, void *user_
}
lua_pop (cd->L, 1);
}
g_mutex_unlock (lua_mtx);

return res;
}
@@ -439,6 +441,7 @@ lua_call_post_filters (struct worker_task *task)
struct worker_task **ptask;
GList *cur;

g_mutex_lock (lua_mtx);
cur = task->cfg->post_filters;
while (cur) {
cd = cur->data;
@@ -458,6 +461,7 @@ lua_call_post_filters (struct worker_task *task)
}
cur = g_list_next (cur);
}
g_mutex_unlock (lua_mtx);
}

static gint
@@ -583,6 +587,7 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud)
struct lua_callback_data *cd = ud;
struct worker_task **ptask;

g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
@@ -597,6 +602,7 @@ lua_metric_symbol_callback (struct worker_task *task, gpointer ud)
msg_info ("call to %s failed: %s", cd->cb_is_ref ? "local function" :
cd->callback.name, lua_tostring (cd->L, -1));
}
g_mutex_unlock (lua_mtx);
}

static gint

+ 2
- 0
src/lua/lua_task.c View File

@@ -468,6 +468,7 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
union rspamd_reply_element *elt;
GList *cur;

g_mutex_lock (lua_mtx);
if (cd->cb_is_ref) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
@@ -553,6 +554,7 @@ lua_dns_callback (struct rspamd_dns_reply *reply, gpointer arg)
if (cd->cb_is_ref) {
luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->callback.ref);
}
g_mutex_unlock (lua_mtx);
}

static gint

+ 26
- 0
src/mem_pool.c View File

@@ -41,6 +41,14 @@ pthread_mutex_t stat_mtx = PTHREAD_MUTEX_INITIALIZER;
# define STAT_UNLOCK() do {} while (0)
#endif

#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
# define POOL_MTX_LOCK() do { g_static_mutex_lock (&pool->mtx); } while (0)
# define POOL_MTX_UNLOCK() do { g_static_mutex_unlock (&pool->mtx); } while (0)
#else
# define POOL_MTX_LOCK() do { g_mutex_lock (&pool->mtx); } while (0)
# define POOL_MTX_UNLOCK() do { g_mutex_unlock (&pool->mtx); } while (0)
#endif

/*
* This define specify whether we should check all pools for free space for new object
* or just begin scan from current (recently attached) pool
@@ -101,6 +109,7 @@ pool_chain_new_shared (gsize size)
struct _pool_chain_shared *chain;
gpointer map;


#if defined(HAVE_MMAP_ANON)
map = mmap (NULL, size + sizeof (struct _pool_chain_shared), PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
if (map == MAP_FAILED) {
@@ -189,6 +198,11 @@ memory_pool_new (gsize size)
new->destructors = NULL;
/* Set it upon first call of set variable */
new->variables = NULL;
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
new->mtx = G_STATIC_MUTEX_INIT;
#else
g_mutex_init (&new->mtx);
#endif

mem_pool_stat->pools_allocated++;

@@ -203,6 +217,7 @@ memory_pool_alloc (memory_pool_t * pool, gsize size)
gint free;

if (pool) {
POOL_MTX_LOCK ();
#ifdef MEMORY_GREEDY
cur = pool->first_pool;
#else
@@ -214,6 +229,7 @@ memory_pool_alloc (memory_pool_t * pool, gsize size)
}
if (free < (gint)size && cur->next == NULL) {
/* Allocate new pool */

if (cur->len >= size + MEM_ALIGNMENT) {
new = pool_chain_new (cur->len);
}
@@ -227,10 +243,12 @@ memory_pool_alloc (memory_pool_t * pool, gsize size)
/* No need to align again */
tmp = new->pos;
new->pos = tmp + size;
POOL_MTX_UNLOCK ();
return tmp;
}
tmp = align_ptr (cur->pos, MEM_ALIGNMENT);
cur->pos = tmp + size;
POOL_MTX_UNLOCK ();
return tmp;
}
return NULL;
@@ -317,6 +335,7 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size)
if (pool) {
g_return_val_if_fail (size > 0, NULL);

POOL_MTX_LOCK ();
cur = pool->shared_pool;
if (!cur) {
cur = pool_chain_new_shared (pool->first_pool->len);
@@ -329,6 +348,7 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size)
}
if (free < (gint)size && cur->next == NULL) {
/* Allocate new pool */

if (cur->len >= size + MEM_ALIGNMENT) {
new = pool_chain_new_shared (cur->len);
}
@@ -342,10 +362,12 @@ memory_pool_alloc_shared (memory_pool_t * pool, gsize size)
STAT_LOCK ();
mem_pool_stat->bytes_allocated += size;
STAT_UNLOCK ();
POOL_MTX_UNLOCK ();
return new->begin;
}
tmp = align_ptr (cur->pos, MEM_ALIGNMENT);
cur->pos = tmp + size;
POOL_MTX_UNLOCK ();
return tmp;
}
return NULL;
@@ -455,12 +477,14 @@ memory_pool_add_destructor_full (memory_pool_t * pool, pool_destruct_func func,

cur = memory_pool_alloc (pool, sizeof (struct _pool_destructors));
if (cur) {
POOL_MTX_LOCK ();
cur->func = func;
cur->data = data;
cur->function = function;
cur->loc = line;
cur->prev = pool->destructors;
pool->destructors = cur;
POOL_MTX_UNLOCK ();
}
}

@@ -488,6 +512,7 @@ memory_pool_delete (memory_pool_t * pool)
struct _pool_chain_shared *cur_shared = pool->shared_pool, *tmp_shared;
struct _pool_destructors *destructor = pool->destructors;

POOL_MTX_LOCK ();
/* Call all pool destructors */
while (destructor) {
/* Avoid calling destructors for NULL pointers */
@@ -522,6 +547,7 @@ memory_pool_delete (memory_pool_t * pool)
}

mem_pool_stat->pools_freed++;
POOL_MTX_UNLOCK ();
g_slice_free (memory_pool_t, pool);
}


+ 5
- 0
src/mem_pool.h View File

@@ -76,6 +76,11 @@ typedef struct memory_pool_s {
struct _pool_chain_shared *shared_pool; /**< shared chain */
struct _pool_destructors *destructors; /**< destructors chain */
GHashTable *variables; /**< private memory pool variables */
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
GStaticMutex mtx; /**< threads lock */
#else
GMutex mtx; /**< threads lock */
#endif
} memory_pool_t;

/**

+ 3
- 1
src/smtp_utils.c View File

@@ -28,7 +28,7 @@
#include "smtp.h"
#include "smtp_proto.h"

void
gboolean
free_smtp_session (gpointer arg)
{
struct smtp_session *session = arg;
@@ -56,6 +56,8 @@ free_smtp_session (gpointer arg)
memory_pool_delete (session->pool);
g_free (session);
}

return TRUE;
}

gboolean

+ 18
- 13
src/worker.c View File

@@ -613,7 +613,7 @@ err_socket (GError * err, void *arg)
/*
* Called if all filters are processed
*/
static void
static gboolean
fin_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
@@ -630,20 +630,25 @@ fin_task (void *arg)
/* Just process composites */
make_composites (task);
}
/* Call post filters */
lua_call_post_filters (task);
if (task->cfg->post_filters) {
/* More to process */
/* Special state */
task->state = WAIT_POST_FILTER;
return FALSE;
}

}

/* Check if we have all events finished */
if (g_hash_table_size (task->s->events) == 0 && task->s->threads == 0) {
task->state = WRITE_REPLY;
if (task->fin_callback) {
task->fin_callback (task->fin_arg);
}
else {
rspamd_dispatcher_restore (task->dispatcher);
}
task->state = WRITE_REPLY;
if (task->fin_callback) {
task->fin_callback (task->fin_arg);
}
else {
rspamd_dispatcher_restore (task->dispatcher);
}

return TRUE;
}

/*
@@ -654,8 +659,8 @@ restore_task (void *arg)
{
struct worker_task *task = (struct worker_task *) arg;

/* Special state */
task->state = WAIT_POST_FILTER;
/* Call post filters */
lua_call_post_filters (task);
task->s->wanna_die = TRUE;
}


Loading…
Cancel
Save