aboutsummaryrefslogtreecommitdiffstats
path: root/src/lua
diff options
context:
space:
mode:
Diffstat (limited to 'src/lua')
-rw-r--r--src/lua/lua_common.h2
-rw-r--r--src/lua/lua_config.c188
-rw-r--r--src/lua/lua_dns.c2
-rw-r--r--src/lua/lua_thread_pool.c89
-rw-r--r--src/lua/lua_thread_pool.h20
5 files changed, 182 insertions, 119 deletions
diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h
index 0a9527bb0..b84701bc1 100644
--- a/src/lua/lua_common.h
+++ b/src/lua/lua_common.h
@@ -434,7 +434,7 @@ lua_yield_thread (struct thread_entry *thread_entry, gint nresults);
* @param narg
*/
void
-lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg);
+lua_resume_thread (struct thread_entry *thread_entry, gint narg);
/* Paths defs */
#define RSPAMD_CONFDIR_INDEX "CONFDIR"
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c
index 6957ded7c..629653e85 100644
--- a/src/lua/lua_config.c
+++ b/src/lua/lua_config.c
@@ -1145,7 +1145,7 @@ lua_watcher_callback (gpointer session_data, gpointer ud)
rspamd_session_get_watcher (task->s));
}
else {
- res = lua_tonumber (L, level + 1);
+ res = (gint)lua_tonumber (L, level + 1);
}
if (res) {
@@ -1192,18 +1192,9 @@ lua_watcher_callback (gpointer session_data, gpointer ud)
lua_settop (L, err_idx - 1);
}
-gint
-lua_do_resume (lua_State *L, gint narg)
-{
-#if LUA_VERSION_NUM < 503
- return lua_resume (L, narg);
-#else
- return lua_resume (L, NULL, narg);
-#endif
-}
+static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret);
-static void
-lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret);
+static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg);
static void
lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
@@ -1211,7 +1202,6 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
struct lua_callback_data *cd = ud;
struct rspamd_task **ptask;
struct thread_entry *thread_entry;
- gint ret;
thread_entry = lua_thread_pool_get (task->cfg->lua_thread_pool);
@@ -1232,16 +1222,11 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
rspamd_lua_setclass (thread, "rspamd{task}", -1);
*ptask = task;
- ret = lua_do_resume (thread, 1);
+ thread_entry->finish_callback = lua_metric_symbol_callback_return;
+ thread_entry->error_callback = lua_metric_symbol_callback_error;
+ thread_entry->task = task;
- if (ret != LUA_YIELD) {
- /*
- LUA_YIELD state should not be handled here.
- It should only happen when the thread initiated a asynchronous event and it will be restored as soon
- the event is finished
- */
- lua_metric_symbol_callback_return (task, thread_entry, ud, ret);
- }
+ lua_thread_call (task->cfg->lua_thread_pool, thread_entry, 1);
}
gint
@@ -1252,132 +1237,105 @@ lua_yield_thread (struct thread_entry *thread_entry, gint nresults)
return lua_yield (thread_entry->lua_state, nresults);
}
-void
-lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg)
+static void
+lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg)
{
- g_assert (thread_entry->cd != NULL);
-
- /*
- * The only state where we can resume from is LUA_YIELD
- * Another acceptable status is OK (0) but in that case we should push function on stack
- * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function.
- */
- g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);
-
- gint ret;
-
- lua_thread_pool_set_running_entry (task->cfg->lua_thread_pool, thread_entry);
- ret = lua_do_resume (thread_entry->lua_state, narg);
-
- if (ret != LUA_YIELD) {
- lua_metric_symbol_callback_return (task, thread_entry, thread_entry->cd, ret);
- }
+ struct lua_callback_data *cd = thread_entry->cd;
+ struct rspamd_task *task = thread_entry->task;
+ msg_err_task ("call to (%s) failed (%d): %s", cd->symbol, ret, msg);
}
static void
-lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret)
+lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
{
- GString *tb;
- struct lua_callback_data *cd = ud;
+ struct lua_callback_data *cd = thread_entry->cd;
+ struct rspamd_task *task = thread_entry->task;
int nresults;
struct rspamd_symbol_result *s;
- lua_State *thread = thread_entry->lua_state;
- if (ret != 0) {
+ (void)ret;
- tb = rspamd_lua_get_traceback_string (thread);
- msg_err_task ("call to (%s) failed (%d): %v", cd->symbol, ret, tb);
+ lua_State *L = thread_entry->lua_state;
- if (tb) {
- g_string_free (tb, TRUE);
- }
- g_assert (lua_gettop (thread) >= cd->stack_level);
- /* maybe there is a way to recover here. For now, just remove faulty thread */
- lua_thread_pool_terminate_entry (task->cfg->lua_thread_pool, thread_entry);
- }
- else {
- nresults = lua_gettop (thread) - cd->stack_level;
+ nresults = lua_gettop (L) - cd->stack_level;
- if (nresults >= 1) {
- /* Function returned boolean, so maybe we need to insert result? */
- gint res = 0;
- gint i;
- gdouble flag = 1.0;
- gint type;
- struct lua_watcher_data *wd;
+ if (nresults >= 1) {
+ /* Function returned boolean, so maybe we need to insert result? */
+ gint res = 0;
+ gint i;
+ gdouble flag = 1.0;
+ gint type;
+ struct lua_watcher_data *wd;
- type = lua_type (thread, cd->stack_level + 1);
+ type = lua_type (L, cd->stack_level + 1);
- if (type == LUA_TBOOLEAN) {
- res = lua_toboolean (thread, cd->stack_level + 1);
- }
- else if (type == LUA_TFUNCTION) {
- /* Function returned a closure that should be watched for */
- wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
- lua_pushvalue (thread /*cd->L*/, cd->stack_level + 1);
- wd->cb_ref = luaL_ref (thread, LUA_REGISTRYINDEX);
- wd->cbd = cd;
- rspamd_session_watcher_push_callback (task->s,
- rspamd_session_get_watcher (task->s),
- lua_watcher_callback, wd);
- /*
- * We immediately pop watcher since we have not registered
- * any async events from here
- */
- rspamd_session_watcher_pop (task->s,
- rspamd_session_get_watcher (task->s));
+ if (type == LUA_TBOOLEAN) {
+ res = lua_toboolean (L, cd->stack_level + 1);
+ }
+ else if (type == LUA_TFUNCTION) {
+ /* Function returned a closure that should be watched for */
+ wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
+ lua_pushvalue (L /*cd->L*/, cd->stack_level + 1);
+ wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+ wd->cbd = cd;
+ rspamd_session_watcher_push_callback (task->s,
+ rspamd_session_get_watcher (task->s),
+ lua_watcher_callback, wd);
+ /*
+ * We immediately pop watcher since we have not registered
+ * any async events from here
+ */
+ rspamd_session_watcher_pop (task->s,
+ rspamd_session_get_watcher (task->s));
+ }
+ else {
+ res = lua_tonumber (L, cd->stack_level + 1);
+ }
+
+ if (res) {
+ gint first_opt = 2;
+
+ if (lua_type (L, cd->stack_level + 2) == LUA_TNUMBER) {
+ flag = lua_tonumber (L, cd->stack_level + 2);
+ /* Shift opt index */
+ first_opt = 3;
}
else {
- res = lua_tonumber (thread, cd->stack_level + 1);
+ flag = res;
}
- if (res) {
- gint first_opt = 2;
+ s = rspamd_task_insert_result (task, cd->symbol, flag, NULL);
- if (lua_type (thread, cd->stack_level + 2) == LUA_TNUMBER) {
- flag = lua_tonumber (thread, cd->stack_level + 2);
- /* Shift opt index */
- first_opt = 3;
- }
- else {
- flag = res;
- }
+ if (s) {
+ guint last_pos = lua_gettop (L);
- s = rspamd_task_insert_result (task, cd->symbol, flag, NULL);
+ for (i = cd->stack_level + first_opt; i <= last_pos; i++) {
+ if (lua_type (L, i) == LUA_TSTRING) {
+ const char *opt = lua_tostring (L, i);
- if (s) {
- guint last_pos = lua_gettop (thread);
+ rspamd_task_add_result_option (task, s, opt);
+ }
+ else if (lua_type (L, i) == LUA_TTABLE) {
+ lua_pushvalue (L, i);
- for (i = cd->stack_level + first_opt; i <= last_pos; i++) {
- if (lua_type (thread, i) == LUA_TSTRING) {
- const char *opt = lua_tostring (thread, i);
+ for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
+ const char *opt = lua_tostring (L, -1);
rspamd_task_add_result_option (task, s, opt);
}
- else if (lua_type (thread, i) == LUA_TTABLE) {
- lua_pushvalue (thread, i);
- for (lua_pushnil (thread); lua_next (thread, -2); lua_pop (thread, 1)) {
- const char *opt = lua_tostring (thread, -1);
-
- rspamd_task_add_result_option (task, s, opt);
- }
-
- lua_pop (thread, 1);
- }
+ lua_pop (L, 1);
}
}
-
}
- lua_pop (thread, nresults);
}
- g_assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */
-
- lua_thread_pool_return (task->cfg->lua_thread_pool, thread_entry);
+ lua_pop (L, nresults);
}
+ g_assert (lua_gettop (L) == cd->stack_level); /* we properly cleaned up the stack */
+
cd->stack_level = 0;
}
diff --git a/src/lua/lua_dns.c b/src/lua/lua_dns.c
index 0d12a137c..805921a03 100644
--- a/src/lua/lua_dns.c
+++ b/src/lua/lua_dns.c
@@ -147,7 +147,7 @@ lua_dns_callback (struct rdns_reply *reply, void *arg)
lua_pushvalue (L, -3);
}
- lua_resume_thread (cbdata->task, cbdata->thread, 2);
+ lua_resume_thread (cbdata->thread, 2);
if (cbdata->s) {
rspamd_session_watcher_pop (cbdata->s, cbdata->w);
diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c
index 979b31f6b..3525879dd 100644
--- a/src/lua/lua_thread_pool.c
+++ b/src/lua/lua_thread_pool.c
@@ -91,6 +91,10 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa
if (g_queue_get_length (pool->available_items) <= pool->max_items) {
thread_entry->cd = NULL;
+ thread_entry->finish_callback = NULL;
+ thread_entry->task = NULL;
+ thread_entry->cfg = NULL;
+
g_queue_push_head (pool->available_items, thread_entry);
}
else {
@@ -146,3 +150,88 @@ lua_thread_pool_restore_callback (struct lua_callback_state *cbs)
lua_thread_pool_return (cbs->thread_pool, cbs->my_thread);
lua_thread_pool_set_running_entry (cbs->thread_pool, cbs->previous_thread);
}
+
+static gint
+lua_do_resume (lua_State *L, gint narg)
+{
+#if LUA_VERSION_NUM < 503
+ return lua_resume (L, narg);
+#else
+ return lua_resume (L, NULL, narg);
+#endif
+}
+
+static void lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg);
+
+void
+lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg)
+{
+ g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */
+ g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call running/yielded thread */
+
+ lua_resume_thread_internal (thread_entry, narg);
+}
+
+void
+lua_resume_thread (struct thread_entry *thread_entry, gint narg)
+{
+ /*
+ * The only state where we can resume from is LUA_YIELD
+ * Another acceptable status is OK (0) but in that case we should push function on stack
+ * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function.
+ */
+ g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);
+
+ lua_resume_thread_internal (thread_entry, narg);
+}
+
+static void
+lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg)
+{
+ gint ret;
+ struct lua_thread_pool *pool;
+ GString *tb;
+ struct rspamd_task *task;
+
+ ret = lua_do_resume (thread_entry->lua_state, narg);
+
+ if (ret != LUA_YIELD) {
+ /*
+ LUA_YIELD state should not be handled here.
+ It should only happen when the thread initiated a asynchronous event and it will be restored as soon
+ the event is finished
+ */
+
+ if (thread_entry->task) {
+ pool = thread_entry->task->cfg->lua_thread_pool;
+ }
+ else {
+ pool = thread_entry->cfg->lua_thread_pool;
+ }
+ if (ret == 0) {
+ if (thread_entry->finish_callback) {
+ thread_entry->finish_callback (thread_entry, ret);
+ }
+ lua_thread_pool_return (pool, thread_entry);
+ }
+ else {
+ tb = rspamd_lua_get_traceback_string (thread_entry->lua_state);
+ if (thread_entry->error_callback) {
+ thread_entry->error_callback (thread_entry, ret, tb->str);
+ }
+ else if (thread_entry->task) {
+ task = thread_entry->task;
+ msg_err_task ("lua call failed (%d): %v", ret, tb);
+ }
+ else {
+ msg_err ("lua call failed (%d): %v", ret, tb);
+ }
+
+ if (tb) {
+ g_string_free (tb, TRUE);
+ }
+ /* maybe there is a way to recover here. For now, just remove faulty thread */
+ lua_thread_pool_terminate_entry (pool, thread_entry);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h
index b72b72e8d..e5b2f2873 100644
--- a/src/lua/lua_thread_pool.h
+++ b/src/lua/lua_thread_pool.h
@@ -3,13 +3,25 @@
#include <lua.h>
+struct thread_entry;
+struct lua_thread_pool;
+
+typedef void (*lua_thread_finish_t) (struct thread_entry *thread, int ret);
+typedef void (*lua_thread_error_t) (struct thread_entry *thread, int ret, const char *msg);
+
struct thread_entry {
lua_State *lua_state;
gint thread_index;
gpointer cd;
-};
-struct thread_pool;
+ /* function to handle result of called method, can be NULL */
+ lua_thread_finish_t finish_callback;
+
+ /* function to log result, i.e. if you want to modify error logging message or somehow process this state, can be NUL */
+ lua_thread_error_t error_callback;
+ struct rspamd_task *task;
+ struct rspamd_config *cfg;
+};
struct lua_callback_state {
lua_State *L;
@@ -102,5 +114,9 @@ lua_thread_pool_prepare_callback (struct lua_thread_pool *pool, struct lua_callb
void
lua_thread_pool_restore_callback (struct lua_callback_state *cbs);
+
+void
+lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg);
+
#endif /* LUA_THREAD_POOL_H_ */