From: Mikhail Galanin Date: Wed, 22 Aug 2018 14:58:22 +0000 (+0100) Subject: [Minor] Move resume/yield methods into appropriate place X-Git-Tag: 1.8.0~216^2~4 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=434446f72add375d0f7a94a94e4797d9bd6d1d6a;p=rspamd.git [Minor] Move resume/yield methods into appropriate place --- diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h index 0a9527bb0..b84701bc1 100644 --- a/src/lua/lua_common.h +++ b/src/lua/lua_common.h @@ -434,7 +434,7 @@ lua_yield_thread (struct thread_entry *thread_entry, gint nresults); * @param narg */ void -lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg); +lua_resume_thread (struct thread_entry *thread_entry, gint narg); /* Paths defs */ #define RSPAMD_CONFDIR_INDEX "CONFDIR" diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index 6957ded7c..629653e85 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -1145,7 +1145,7 @@ lua_watcher_callback (gpointer session_data, gpointer ud) rspamd_session_get_watcher (task->s)); } else { - res = lua_tonumber (L, level + 1); + res = (gint)lua_tonumber (L, level + 1); } if (res) { @@ -1192,18 +1192,9 @@ lua_watcher_callback (gpointer session_data, gpointer ud) lua_settop (L, err_idx - 1); } -gint -lua_do_resume (lua_State *L, gint narg) -{ -#if LUA_VERSION_NUM < 503 - return lua_resume (L, narg); -#else - return lua_resume (L, NULL, narg); -#endif -} +static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret); -static void -lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret); +static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg); static void lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) @@ -1211,7 +1202,6 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) struct lua_callback_data *cd = ud; struct rspamd_task **ptask; struct thread_entry *thread_entry; - gint ret; thread_entry = lua_thread_pool_get (task->cfg->lua_thread_pool); @@ -1232,16 +1222,11 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) rspamd_lua_setclass (thread, "rspamd{task}", -1); *ptask = task; - ret = lua_do_resume (thread, 1); + thread_entry->finish_callback = lua_metric_symbol_callback_return; + thread_entry->error_callback = lua_metric_symbol_callback_error; + thread_entry->task = task; - if (ret != LUA_YIELD) { - /* - LUA_YIELD state should not be handled here. - It should only happen when the thread initiated a asynchronous event and it will be restored as soon - the event is finished - */ - lua_metric_symbol_callback_return (task, thread_entry, ud, ret); - } + lua_thread_call (task->cfg->lua_thread_pool, thread_entry, 1); } gint @@ -1252,132 +1237,105 @@ lua_yield_thread (struct thread_entry *thread_entry, gint nresults) return lua_yield (thread_entry->lua_state, nresults); } -void -lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg) +static void +lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg) { - g_assert (thread_entry->cd != NULL); - - /* - * The only state where we can resume from is LUA_YIELD - * Another acceptable status is OK (0) but in that case we should push function on stack - * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function. - */ - g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD); - - gint ret; - - lua_thread_pool_set_running_entry (task->cfg->lua_thread_pool, thread_entry); - ret = lua_do_resume (thread_entry->lua_state, narg); - - if (ret != LUA_YIELD) { - lua_metric_symbol_callback_return (task, thread_entry, thread_entry->cd, ret); - } + struct lua_callback_data *cd = thread_entry->cd; + struct rspamd_task *task = thread_entry->task; + msg_err_task ("call to (%s) failed (%d): %s", cd->symbol, ret, msg); } static void -lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret) +lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret) { - GString *tb; - struct lua_callback_data *cd = ud; + struct lua_callback_data *cd = thread_entry->cd; + struct rspamd_task *task = thread_entry->task; int nresults; struct rspamd_symbol_result *s; - lua_State *thread = thread_entry->lua_state; - if (ret != 0) { + (void)ret; - tb = rspamd_lua_get_traceback_string (thread); - msg_err_task ("call to (%s) failed (%d): %v", cd->symbol, ret, tb); + lua_State *L = thread_entry->lua_state; - if (tb) { - g_string_free (tb, TRUE); - } - g_assert (lua_gettop (thread) >= cd->stack_level); - /* maybe there is a way to recover here. For now, just remove faulty thread */ - lua_thread_pool_terminate_entry (task->cfg->lua_thread_pool, thread_entry); - } - else { - nresults = lua_gettop (thread) - cd->stack_level; + nresults = lua_gettop (L) - cd->stack_level; - if (nresults >= 1) { - /* Function returned boolean, so maybe we need to insert result? */ - gint res = 0; - gint i; - gdouble flag = 1.0; - gint type; - struct lua_watcher_data *wd; + if (nresults >= 1) { + /* Function returned boolean, so maybe we need to insert result? */ + gint res = 0; + gint i; + gdouble flag = 1.0; + gint type; + struct lua_watcher_data *wd; - type = lua_type (thread, cd->stack_level + 1); + type = lua_type (L, cd->stack_level + 1); - if (type == LUA_TBOOLEAN) { - res = lua_toboolean (thread, cd->stack_level + 1); - } - else if (type == LUA_TFUNCTION) { - /* Function returned a closure that should be watched for */ - wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd)); - lua_pushvalue (thread /*cd->L*/, cd->stack_level + 1); - wd->cb_ref = luaL_ref (thread, LUA_REGISTRYINDEX); - wd->cbd = cd; - rspamd_session_watcher_push_callback (task->s, - rspamd_session_get_watcher (task->s), - lua_watcher_callback, wd); - /* - * We immediately pop watcher since we have not registered - * any async events from here - */ - rspamd_session_watcher_pop (task->s, - rspamd_session_get_watcher (task->s)); + if (type == LUA_TBOOLEAN) { + res = lua_toboolean (L, cd->stack_level + 1); + } + else if (type == LUA_TFUNCTION) { + /* Function returned a closure that should be watched for */ + wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd)); + lua_pushvalue (L /*cd->L*/, cd->stack_level + 1); + wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX); + wd->cbd = cd; + rspamd_session_watcher_push_callback (task->s, + rspamd_session_get_watcher (task->s), + lua_watcher_callback, wd); + /* + * We immediately pop watcher since we have not registered + * any async events from here + */ + rspamd_session_watcher_pop (task->s, + rspamd_session_get_watcher (task->s)); + } + else { + res = lua_tonumber (L, cd->stack_level + 1); + } + + if (res) { + gint first_opt = 2; + + if (lua_type (L, cd->stack_level + 2) == LUA_TNUMBER) { + flag = lua_tonumber (L, cd->stack_level + 2); + /* Shift opt index */ + first_opt = 3; } else { - res = lua_tonumber (thread, cd->stack_level + 1); + flag = res; } - if (res) { - gint first_opt = 2; + s = rspamd_task_insert_result (task, cd->symbol, flag, NULL); - if (lua_type (thread, cd->stack_level + 2) == LUA_TNUMBER) { - flag = lua_tonumber (thread, cd->stack_level + 2); - /* Shift opt index */ - first_opt = 3; - } - else { - flag = res; - } + if (s) { + guint last_pos = lua_gettop (L); - s = rspamd_task_insert_result (task, cd->symbol, flag, NULL); + for (i = cd->stack_level + first_opt; i <= last_pos; i++) { + if (lua_type (L, i) == LUA_TSTRING) { + const char *opt = lua_tostring (L, i); - if (s) { - guint last_pos = lua_gettop (thread); + rspamd_task_add_result_option (task, s, opt); + } + else if (lua_type (L, i) == LUA_TTABLE) { + lua_pushvalue (L, i); - for (i = cd->stack_level + first_opt; i <= last_pos; i++) { - if (lua_type (thread, i) == LUA_TSTRING) { - const char *opt = lua_tostring (thread, i); + for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) { + const char *opt = lua_tostring (L, -1); rspamd_task_add_result_option (task, s, opt); } - else if (lua_type (thread, i) == LUA_TTABLE) { - lua_pushvalue (thread, i); - for (lua_pushnil (thread); lua_next (thread, -2); lua_pop (thread, 1)) { - const char *opt = lua_tostring (thread, -1); - - rspamd_task_add_result_option (task, s, opt); - } - - lua_pop (thread, 1); - } + lua_pop (L, 1); } } - } - lua_pop (thread, nresults); } - g_assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */ - - lua_thread_pool_return (task->cfg->lua_thread_pool, thread_entry); + lua_pop (L, nresults); } + g_assert (lua_gettop (L) == cd->stack_level); /* we properly cleaned up the stack */ + cd->stack_level = 0; } diff --git a/src/lua/lua_dns.c b/src/lua/lua_dns.c index 0d12a137c..805921a03 100644 --- a/src/lua/lua_dns.c +++ b/src/lua/lua_dns.c @@ -147,7 +147,7 @@ lua_dns_callback (struct rdns_reply *reply, void *arg) lua_pushvalue (L, -3); } - lua_resume_thread (cbdata->task, cbdata->thread, 2); + lua_resume_thread (cbdata->thread, 2); if (cbdata->s) { rspamd_session_watcher_pop (cbdata->s, cbdata->w); diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c index 979b31f6b..3525879dd 100644 --- a/src/lua/lua_thread_pool.c +++ b/src/lua/lua_thread_pool.c @@ -91,6 +91,10 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa if (g_queue_get_length (pool->available_items) <= pool->max_items) { thread_entry->cd = NULL; + thread_entry->finish_callback = NULL; + thread_entry->task = NULL; + thread_entry->cfg = NULL; + g_queue_push_head (pool->available_items, thread_entry); } else { @@ -146,3 +150,88 @@ lua_thread_pool_restore_callback (struct lua_callback_state *cbs) lua_thread_pool_return (cbs->thread_pool, cbs->my_thread); lua_thread_pool_set_running_entry (cbs->thread_pool, cbs->previous_thread); } + +static gint +lua_do_resume (lua_State *L, gint narg) +{ +#if LUA_VERSION_NUM < 503 + return lua_resume (L, narg); +#else + return lua_resume (L, NULL, narg); +#endif +} + +static void lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg); + +void +lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg) +{ + g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */ + g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call running/yielded thread */ + + lua_resume_thread_internal (thread_entry, narg); +} + +void +lua_resume_thread (struct thread_entry *thread_entry, gint narg) +{ + /* + * The only state where we can resume from is LUA_YIELD + * Another acceptable status is OK (0) but in that case we should push function on stack + * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function. + */ + g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD); + + lua_resume_thread_internal (thread_entry, narg); +} + +static void +lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg) +{ + gint ret; + struct lua_thread_pool *pool; + GString *tb; + struct rspamd_task *task; + + ret = lua_do_resume (thread_entry->lua_state, narg); + + if (ret != LUA_YIELD) { + /* + LUA_YIELD state should not be handled here. + It should only happen when the thread initiated a asynchronous event and it will be restored as soon + the event is finished + */ + + if (thread_entry->task) { + pool = thread_entry->task->cfg->lua_thread_pool; + } + else { + pool = thread_entry->cfg->lua_thread_pool; + } + if (ret == 0) { + if (thread_entry->finish_callback) { + thread_entry->finish_callback (thread_entry, ret); + } + lua_thread_pool_return (pool, thread_entry); + } + else { + tb = rspamd_lua_get_traceback_string (thread_entry->lua_state); + if (thread_entry->error_callback) { + thread_entry->error_callback (thread_entry, ret, tb->str); + } + else if (thread_entry->task) { + task = thread_entry->task; + msg_err_task ("lua call failed (%d): %v", ret, tb); + } + else { + msg_err ("lua call failed (%d): %v", ret, tb); + } + + if (tb) { + g_string_free (tb, TRUE); + } + /* maybe there is a way to recover here. For now, just remove faulty thread */ + lua_thread_pool_terminate_entry (pool, thread_entry); + } + } +} \ No newline at end of file diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h index b72b72e8d..e5b2f2873 100644 --- a/src/lua/lua_thread_pool.h +++ b/src/lua/lua_thread_pool.h @@ -3,13 +3,25 @@ #include +struct thread_entry; +struct lua_thread_pool; + +typedef void (*lua_thread_finish_t) (struct thread_entry *thread, int ret); +typedef void (*lua_thread_error_t) (struct thread_entry *thread, int ret, const char *msg); + struct thread_entry { lua_State *lua_state; gint thread_index; gpointer cd; -}; -struct thread_pool; + /* function to handle result of called method, can be NULL */ + lua_thread_finish_t finish_callback; + + /* function to log result, i.e. if you want to modify error logging message or somehow process this state, can be NUL */ + lua_thread_error_t error_callback; + struct rspamd_task *task; + struct rspamd_config *cfg; +}; struct lua_callback_state { lua_State *L; @@ -102,5 +114,9 @@ lua_thread_pool_prepare_callback (struct lua_thread_pool *pool, struct lua_callb void lua_thread_pool_restore_callback (struct lua_callback_state *cbs); + +void +lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg); + #endif /* LUA_THREAD_POOL_H_ */