@@ -417,25 +417,6 @@ void rspamd_lua_add_ref_dtor (lua_State *L, rspamd_mempool_t *pool, | |||
gboolean rspamd_lua_require_function (lua_State *L, const gchar *modname, | |||
const gchar *funcname); | |||
struct thread_entry; | |||
/** | |||
* Yields thread. should be only called in return statement | |||
* @param thread_entry | |||
* @param nresults | |||
* @return | |||
*/ | |||
gint | |||
lua_yield_thread (struct thread_entry *thread_entry, gint nresults); | |||
/** | |||
* Resumes suspended by lua_yield_thread () thread | |||
* @param task | |||
* @param thread_entry | |||
* @param narg | |||
*/ | |||
void | |||
lua_resume_thread (struct thread_entry *thread_entry, gint narg); | |||
/* Paths defs */ | |||
#define RSPAMD_CONFDIR_INDEX "CONFDIR" | |||
#define RSPAMD_RUNDIR_INDEX "RUNDIR" |
@@ -1203,7 +1203,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) | |||
struct rspamd_task **ptask; | |||
struct thread_entry *thread_entry; | |||
thread_entry = lua_thread_pool_get (task->cfg->lua_thread_pool); | |||
thread_entry = lua_thread_pool_get_for_task (task); | |||
g_assert(thread_entry->cd == NULL); | |||
thread_entry->cd = cd; | |||
@@ -1224,19 +1224,10 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) | |||
thread_entry->finish_callback = lua_metric_symbol_callback_return; | |||
thread_entry->error_callback = lua_metric_symbol_callback_error; | |||
thread_entry->task = task; | |||
lua_thread_call (thread_entry, 1); | |||
} | |||
gint | |||
lua_yield_thread (struct thread_entry *thread_entry, gint nresults) | |||
{ | |||
g_assert (thread_entry->cd != NULL); | |||
return lua_yield (thread_entry->lua_state, nresults); | |||
} | |||
static void | |||
lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg) | |||
{ |
@@ -117,7 +117,7 @@ lua_dns_request (lua_State *L) | |||
cbdata->s = session; | |||
cbdata->w = rspamd_session_get_watcher (session); | |||
rspamd_session_watcher_push (session); | |||
return lua_yield_thread (cbdata->thread, 0); | |||
return lua_thread_yield (cbdata->thread, 0); | |||
} | |||
else { | |||
lua_pushnil (L); | |||
@@ -147,7 +147,7 @@ lua_dns_callback (struct rdns_reply *reply, void *arg) | |||
lua_pushvalue (L, -3); | |||
} | |||
lua_resume_thread (cbdata->thread, 2); | |||
lua_thread_resume (cbdata->thread, 2); | |||
if (cbdata->s) { | |||
rspamd_session_watcher_pop (cbdata->s, cbdata->w); |
@@ -274,7 +274,6 @@ lua_http_resume_handler (struct rspamd_http_connection *conn, | |||
gsize body_len; | |||
struct rspamd_http_header *h, *htmp; | |||
msg_info ("T=%p, L=%p, status=%d, err=%s", cbd->thread, cbd->thread->lua_state, lua_status (cbd->thread->lua_state), err); | |||
if (err) { | |||
lua_pushstring (L, err); | |||
lua_pushnil (L); | |||
@@ -336,7 +335,7 @@ lua_http_resume_handler (struct rspamd_http_connection *conn, | |||
lua_settable (L, -3); | |||
} | |||
lua_resume_thread (cbd->thread, 2); | |||
lua_thread_resume (cbd->thread, 2); | |||
} | |||
static gboolean | |||
@@ -896,7 +895,7 @@ lua_http_request (lua_State *L) | |||
if (cbd->cbref == -1) { | |||
cbd->thread = lua_thread_pool_get_running_entry (cfg->lua_thread_pool); | |||
return lua_yield_thread (cbd->thread, 0); | |||
return lua_thread_yield (cbd->thread, 0); | |||
} else { | |||
lua_pushboolean (L, TRUE); | |||
return 1; |
@@ -60,12 +60,35 @@ lua_thread_pool_free (struct lua_thread_pool *pool) | |||
g_free (pool); | |||
} | |||
struct thread_entry *lua_thread_pool_get_for_config (struct rspamd_config *cfg); | |||
static struct thread_entry *lua_thread_pool_get (struct lua_thread_pool *pool); | |||
struct thread_entry * | |||
lua_thread_pool_get_for_task (struct rspamd_task *task) | |||
{ | |||
struct thread_entry *ent = lua_thread_pool_get (task->cfg->lua_thread_pool); | |||
ent->task = task; | |||
return ent; | |||
} | |||
struct thread_entry * | |||
lua_thread_pool_get_for_config (struct rspamd_config *cfg) | |||
{ | |||
struct thread_entry *ent = lua_thread_pool_get (cfg->lua_thread_pool); | |||
ent->cfg = cfg; | |||
return ent; | |||
} | |||
static struct thread_entry * | |||
lua_thread_pool_get (struct lua_thread_pool *pool) | |||
{ | |||
gpointer cur; | |||
struct thread_entry *ent = NULL; | |||
cur = g_queue_pop_head (pool->available_items); | |||
if (cur) { | |||
@@ -174,7 +197,7 @@ lua_thread_call (struct thread_entry *thread_entry, int narg) | |||
} | |||
void | |||
lua_resume_thread (struct thread_entry *thread_entry, gint narg) | |||
lua_thread_resume (struct thread_entry *thread_entry, gint narg) | |||
{ | |||
/* | |||
* The only state where we can resume from is LUA_YIELD | |||
@@ -235,4 +258,10 @@ lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg) | |||
lua_thread_pool_terminate_entry (pool, thread_entry); | |||
} | |||
} | |||
} | |||
} | |||
gint | |||
lua_thread_yield (struct thread_entry *thread_entry, gint nresults) | |||
{ | |||
return lua_yield (thread_entry->lua_state, nresults); | |||
} |
@@ -55,11 +55,20 @@ lua_thread_pool_free (struct lua_thread_pool *pool); | |||
* | |||
* If the code performed YIELD, the thread is still running and it's live should be controlled by the callee | |||
* | |||
* @param pool | |||
* @param task | |||
* @return | |||
*/ | |||
struct thread_entry * | |||
lua_thread_pool_get(struct lua_thread_pool *pool); | |||
lua_thread_pool_get_for_task (struct rspamd_task *task); | |||
/** | |||
* The same, but used when task is not available | |||
* | |||
* @param cfg | |||
* @return | |||
*/ | |||
struct thread_entry * | |||
lua_thread_pool_get_for_config (struct rspamd_config *cfg); | |||
/** | |||
* Return thread into the list of available ones. It can't be done with yielded or dead threads. | |||
@@ -116,5 +125,23 @@ lua_thread_pool_restore_callback (struct lua_callback_state *cbs); | |||
void | |||
lua_thread_call (struct thread_entry *thread_entry, int narg); | |||
/** | |||
* Yields thread. should be only called in return statement | |||
* @param thread_entry | |||
* @param nresults | |||
* @return | |||
*/ | |||
int | |||
lua_thread_yield (struct thread_entry *thread_entry, int nresults); | |||
/** | |||
* Resumes suspended by lua_yield_thread () thread | |||
* @param task | |||
* @param thread_entry | |||
* @param narg | |||
*/ | |||
void | |||
lua_thread_resume (struct thread_entry *thread_entry, int narg); | |||
#endif /* LUA_THREAD_POOL_H_ */ | |||
@@ -72,7 +72,7 @@ test_resume_get_thread(gint function_call) | |||
t1 = rspamd_get_virtual_ticks (); | |||
for (i = 0; i < N; i ++) { | |||
ent = lua_thread_pool_get (rspamd_main->cfg->lua_thread_pool); | |||
ent = lua_thread_pool_get_for_config (rspamd_main->cfg); | |||
lua_rawgeti (ent->lua_state, LUA_REGISTRYINDEX, function_call); | |||
lua_resume (ent->lua_state, 0); | |||
@@ -96,7 +96,7 @@ test_resume_get_new_thread(gint function_call) | |||
t1 = rspamd_get_virtual_ticks (); | |||
for (i = 0; i < N; i ++) { | |||
ent = lua_thread_pool_get (rspamd_main->cfg->lua_thread_pool); | |||
ent = lua_thread_pool_get_for_task (rspamd_main->cfg->lua_thread_pool); | |||
lua_rawgeti (ent->lua_state, LUA_REGISTRYINDEX, function_call); | |||
lua_resume (ent->lua_state, 0); |