@@ -1197,7 +1197,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) | |||
struct rspamd_task **ptask; | |||
gint ret; | |||
struct thread_entry *thread_entry = lua_thread_pool_get(task->cfg->lua_thread_pool); | |||
struct thread_entry *thread_entry = lua_thread_pool_get (task->cfg->lua_thread_pool); | |||
cd->thread_entry = thread_entry; | |||
lua_State *thread = thread_entry->lua_state; | |||
@@ -1216,9 +1216,12 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) | |||
ret = lua_resume (thread, 1); | |||
if (ret == LUA_YIELD) { | |||
msg_err_task ("LUA_YIELD"); | |||
} else { | |||
if (ret != LUA_YIELD) { | |||
/* | |||
LUA_YIELD state should not be handled here. | |||
It should only happen when the thread initiated a asynchronous event and it will be restored as soon | |||
the event is finished | |||
*/ | |||
lua_metric_symbol_callback_return (task, ud, ret); | |||
} | |||
} | |||
@@ -1235,7 +1238,7 @@ lua_metric_symbol_callback_return (struct rspamd_task *task, gpointer ud, gint r | |||
if (ret != 0) { | |||
lua_pushcfunction (thread, rspamd_lua_traceback); | |||
lua_call (thread, 0, 1); | |||
lua_call (thread, 0, LUA_MULTRET); | |||
tb = lua_touserdata (thread, -1); | |||
msg_err_task ("call to (%s) failed (%d): %v", cd->symbol, ret, tb); | |||
@@ -1244,8 +1247,8 @@ lua_metric_symbol_callback_return (struct rspamd_task *task, gpointer ud, gint r | |||
g_string_free (tb, TRUE); | |||
lua_pop (thread, 1); | |||
} | |||
assert (lua_gettop (thread) >= cd->stack_level); | |||
// maybe there is a way to recover here. For now, just remove foulty thread | |||
g_assert (lua_gettop (thread) >= cd->stack_level); | |||
// maybe there is a way to recover here. For now, just remove faulty thread | |||
lua_thread_pool_terminate_entry (task->cfg->lua_thread_pool, cd->thread_entry); | |||
} | |||
else { | |||
@@ -1326,7 +1329,7 @@ lua_metric_symbol_callback_return (struct rspamd_task *task, gpointer ud, gint r | |||
lua_pop (thread, nresults); | |||
} | |||
assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */ | |||
g_assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */ | |||
lua_thread_pool_return(task->cfg->lua_thread_pool, cd->thread_entry); | |||
} |
@@ -1,7 +1,5 @@ | |||
#include "config.h" | |||
#include <assert.h> | |||
#include "lua_common.h" | |||
#include "lua_thread_pool.h" | |||
@@ -19,6 +17,7 @@ thread_entry_new (lua_State * L) | |||
ent = g_malloc (sizeof *ent); | |||
ent->lua_state = lua_newthread (L); | |||
ent->thread_index = luaL_ref (L, LUA_REGISTRYINDEX); | |||
return ent; | |||
} | |||
@@ -41,7 +40,7 @@ lua_thread_pool_new (lua_State * L) | |||
int i; | |||
struct thread_entry *ent; | |||
for (i = 0; i < pool->max_items; i ++) { | |||
for (i = 0; i < MAX(2, pool->max_items / 10); i ++) { | |||
ent = thread_entry_new (pool->L); | |||
g_queue_push_head (pool->available_items, ent); | |||
} | |||
@@ -75,16 +74,19 @@ lua_thread_pool_get(struct lua_thread_pool *pool) | |||
else { | |||
ent = thread_entry_new (pool->L); | |||
} | |||
return ent; | |||
} | |||
void | |||
lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry) | |||
{ | |||
assert (lua_status (thread_entry->lua_state) == 0); // we can't return a running/yielded stack into the pool | |||
g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't return a running/yielded thread into the pool */ | |||
if (pool->running_entry == thread_entry) { | |||
pool->running_entry = NULL; | |||
} | |||
if (g_queue_get_length (pool->available_items) <= pool->max_items) { | |||
g_queue_push_head (pool->available_items, thread_entry); | |||
} | |||
@@ -98,12 +100,13 @@ lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entr | |||
{ | |||
struct thread_entry *ent = NULL; | |||
/* we should only terminate failed threads */ | |||
g_assert (lua_status (thread_entry->lua_state) != 0 && lua_status (thread_entry->lua_state) != LUA_YIELD); | |||
if (pool->running_entry == thread_entry) { | |||
pool->running_entry = NULL; | |||
} | |||
// we should only terminate failed threads | |||
assert (lua_status (thread_entry->lua_state) != 0 && lua_status (thread_entry->lua_state) != LUA_YIELD); | |||
thread_entry_free (pool->L, thread_entry); | |||
if (g_queue_get_length (pool->available_items) <= pool->max_items) { |
@@ -10,24 +10,70 @@ struct thread_entry { | |||
struct thread_pool; | |||
/** | |||
* Allocates new thread pool on state L. Pre-creates number of lua-threads to use later on | |||
* | |||
* @param L | |||
* @return | |||
*/ | |||
struct lua_thread_pool * | |||
lua_thread_pool_new (lua_State * L); | |||
/** | |||
* Destroys the pool | |||
* @param pool | |||
*/ | |||
void | |||
lua_thread_pool_free (struct lua_thread_pool *pool); | |||
/** | |||
* Extracts a thread from the list of available ones. | |||
* It immediately becames running one and should be used to run a Lua script/function straight away. | |||
* as soon as the code is finished, it should be either returned into list of available threads by | |||
* calling lua_thread_pool_return() or terminated by calling lua_thread_pool_terminate_entry() | |||
* if the code finished with error. | |||
* | |||
* If the code performed YIELD, the thread is still running and it's live should be controlled by the callee | |||
* | |||
* @param pool | |||
* @return | |||
*/ | |||
struct thread_entry * | |||
lua_thread_pool_get(struct lua_thread_pool *pool); | |||
/** | |||
* Return thread into the list of available ones. It can't be done with yielded or dead threads. | |||
* | |||
* @param pool | |||
* @param thread_entry | |||
*/ | |||
void | |||
lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry); | |||
/** | |||
* Removes thread from Lua state. It should be done to dead (which ended with an error) threads only | |||
* | |||
* @param pool | |||
* @param thread_entry | |||
*/ | |||
void | |||
lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry); | |||
/** | |||
* Currently running thread. Typically needed in yielding point - to fill-up continuation. | |||
* | |||
* @param pool | |||
* @return | |||
*/ | |||
struct thread_entry * | |||
lua_thread_pool_get_running_entry(struct lua_thread_pool *pool); | |||
/** | |||
* Updates currently running thread | |||
* | |||
* @param pool | |||
* @param thread_entry | |||
*/ | |||
void | |||
lua_thread_pool_set_running_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry); | |||