aboutsummaryrefslogtreecommitdiffstats
path: root/src/lua
diff options
context:
space:
mode:
authorMikhail Galanin <mgalanin@mimecast.com>2018-08-08 09:01:49 +0100
committerMikhail Galanin <mgalanin@mimecast.com>2018-08-08 09:06:27 +0100
commit2c501018c04ca0a972c600b1324be21545e9f133 (patch)
tree7b7b81462669ae5a7c52d1956ba525e1022c5400 /src/lua
parent77d8a4588723e97c6f6a3cfef584a6ef5516b951 (diff)
downloadrspamd-2c501018c04ca0a972c600b1324be21545e9f133.tar.gz
rspamd-2c501018c04ca0a972c600b1324be21545e9f133.zip
[Project] coroutine threaded model for API calls: thread pool
Diffstat (limited to 'src/lua')
-rw-r--r--src/lua/CMakeLists.txt3
-rw-r--r--src/lua/lua_config.c100
-rw-r--r--src/lua/lua_thread_pool.c125
-rw-r--r--src/lua/lua_thread_pool.h35
4 files changed, 229 insertions, 34 deletions
diff --git a/src/lua/CMakeLists.txt b/src/lua/CMakeLists.txt
index 9c561c0e4..ffc4b27ca 100644
--- a/src/lua/CMakeLists.txt
+++ b/src/lua/CMakeLists.txt
@@ -25,6 +25,7 @@ SET(LUASRC ${CMAKE_CURRENT_SOURCE_DIR}/lua_common.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_fann.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_sqlite3.c
${CMAKE_CURRENT_SOURCE_DIR}/lua_cryptobox.c
- ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c)
+ ${CMAKE_CURRENT_SOURCE_DIR}/lua_map.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/lua_thread_pool.c)
SET(RSPAMD_LUA ${LUASRC} PARENT_SCOPE) \ No newline at end of file
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c
index 2093cbe01..2ee7d072d 100644
--- a/src/lua/lua_config.c
+++ b/src/lua/lua_config.c
@@ -19,6 +19,7 @@
#include "libserver/composites.h"
#include "libmime/lang_detection.h"
#include "lua/lua_map.h"
+#include "lua/lua_thread_pool.h"
#include "utlist.h"
#include <math.h>
@@ -1036,6 +1037,8 @@ struct lua_callback_data {
gint ref;
} callback;
gboolean cb_is_ref;
+ gpointer thread_entry;
+ gint stack_level;
gint order;
};
@@ -1185,42 +1188,68 @@ lua_watcher_callback (gpointer session_data, gpointer ud)
}
static void
+lua_metric_symbol_callback_return (struct rspamd_task *task, gpointer ud, gint ret);
+
+static void
lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
{
struct lua_callback_data *cd = ud;
struct rspamd_task **ptask;
- gint level = lua_gettop (cd->L), nresults, err_idx, ret;
- lua_State *L = cd->L;
- GString *tb;
- struct rspamd_symbol_result *s;
+ gint ret;
- lua_pushcfunction (L, &rspamd_lua_traceback);
- err_idx = lua_gettop (L);
+ struct thread_entry *thread_entry = lua_thread_pool_get(task->cfg->lua_thread_pool);
+ cd->thread_entry = thread_entry;
- level ++;
+ lua_State *thread = thread_entry->lua_state;
+ cd->stack_level = lua_gettop (cd->L);
if (cd->cb_is_ref) {
- lua_rawgeti (L, LUA_REGISTRYINDEX, cd->callback.ref);
+ lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref);
}
else {
- lua_getglobal (L, cd->callback.name);
+ lua_getglobal (thread, cd->callback.name);
}
- ptask = lua_newuserdata (L, sizeof (struct rspamd_task *));
- rspamd_lua_setclass (L, "rspamd{task}", -1);
+ ptask = lua_newuserdata (thread, sizeof (struct rspamd_task *));
+ rspamd_lua_setclass (thread, "rspamd{task}", -1);
*ptask = task;
- if ((ret = lua_pcall (L, 1, LUA_MULTRET, err_idx)) != 0) {
- tb = lua_touserdata (L, -1);
+ ret = lua_resume (thread, 1);
+
+ if (ret == LUA_YIELD) {
+ msg_err_task ("LUA_YIELD");
+ } else {
+ lua_metric_symbol_callback_return (task, ud, ret);
+ }
+}
+
+static void
+lua_metric_symbol_callback_return (struct rspamd_task *task, gpointer ud, gint ret)
+{
+ GString *tb;
+ struct lua_callback_data *cd = ud;
+ int nresults;
+ struct rspamd_symbol_result *s;
+ struct thread_entry *thread_entry = cd->thread_entry;
+ lua_State *thread = thread_entry->lua_state;
+
+ if (ret != 0) {
+ lua_pushcfunction (thread, rspamd_lua_traceback);
+ lua_call (thread, 0, 1);
+
+ tb = lua_touserdata (thread, -1);
msg_err_task ("call to (%s) failed (%d): %v", cd->symbol, ret, tb);
if (tb) {
g_string_free (tb, TRUE);
- lua_pop (L, 1);
+ lua_pop (thread, 1);
}
+ assert (lua_gettop (thread) >= cd->stack_level);
+ // maybe there is a way to recover here. For now, just remove foulty thread
+ lua_thread_pool_terminate_entry (task->cfg->lua_thread_pool, cd->thread_entry);
}
else {
- nresults = lua_gettop (L) - level;
+ nresults = lua_gettop (thread) - cd->stack_level;
if (nresults >= 1) {
/* Function returned boolean, so maybe we need to insert result? */
@@ -1230,16 +1259,16 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
gint type;
struct lua_watcher_data *wd;
- type = lua_type (cd->L, level + 1);
+ type = lua_type (thread, cd->stack_level + 1);
if (type == LUA_TBOOLEAN) {
- res = lua_toboolean (L, level + 1);
+ res = lua_toboolean (thread, cd->stack_level + 1);
}
else if (type == LUA_TFUNCTION) {
/* Function returned a closure that should be watched for */
wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
- lua_pushvalue (cd->L, level + 1);
- wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+ lua_pushvalue (thread /*cd->L*/, cd->stack_level + 1);
+ wd->cb_ref = luaL_ref (thread, LUA_REGISTRYINDEX);
wd->cbd = cd;
rspamd_session_watcher_push_callback (task->s,
rspamd_session_get_watcher (task->s),
@@ -1252,14 +1281,14 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
rspamd_session_get_watcher (task->s));
}
else {
- res = lua_tonumber (L, level + 1);
+ res = lua_tonumber (thread, cd->stack_level + 1);
}
if (res) {
gint first_opt = 2;
- if (lua_type (L, level + 2) == LUA_TNUMBER) {
- flag = lua_tonumber (L, level + 2);
+ if (lua_type (thread, cd->stack_level + 2) == LUA_TNUMBER) {
+ flag = lua_tonumber (thread, cd->stack_level + 2);
/* Shift opt index */
first_opt = 3;
}
@@ -1270,35 +1299,40 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
s = rspamd_task_insert_result (task, cd->symbol, flag, NULL);
if (s) {
- guint last_pos = lua_gettop (L);
+ guint last_pos = lua_gettop (thread);
- for (i = level + first_opt; i <= last_pos; i++) {
- if (lua_type (L, i) == LUA_TSTRING) {
- const char *opt = lua_tostring (L, i);
+ for (i = cd->stack_level + first_opt; i <= last_pos; i++) {
+ if (lua_type (thread, i) == LUA_TSTRING) {
+ const char *opt = lua_tostring (thread, i);
rspamd_task_add_result_option (task, s, opt);
}
- else if (lua_type (L, i) == LUA_TTABLE) {
- lua_pushvalue (L, i);
+ else if (lua_type (thread, i) == LUA_TTABLE) {
+ lua_pushvalue (thread, i);
- for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
- const char *opt = lua_tostring (L, -1);
+ for (lua_pushnil (thread); lua_next (thread, -2); lua_pop (thread, 1)) {
+ const char *opt = lua_tostring (thread, -1);
rspamd_task_add_result_option (task, s, opt);
}
- lua_pop (L, 1);
+ lua_pop (thread, 1);
}
}
}
}
- lua_pop (L, nresults);
+ lua_pop (thread, nresults);
}
+
+ assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */
+
+ lua_thread_pool_return(task->cfg->lua_thread_pool, cd->thread_entry);
}
- lua_pop (L, 1); /* Error function */
+ cd->thread_entry = NULL;
+ cd->stack_level = 0;
}
static gint
diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c
new file mode 100644
index 000000000..6effbe769
--- /dev/null
+++ b/src/lua/lua_thread_pool.c
@@ -0,0 +1,125 @@
+#include "config.h"
+
+#include <assert.h>
+
+#include "lua_common.h"
+#include "lua_thread_pool.h"
+
+struct lua_thread_pool {
+ GQueue *available_items;
+ lua_State *L;
+ gint max_items;
+ struct thread_entry *running_entry;
+};
+
+static struct thread_entry *
+thread_entry_new (lua_State * L)
+{
+ struct thread_entry *ent;
+ ent = g_malloc (sizeof *ent);
+ ent->lua_state = lua_newthread (L);
+ ent->thread_index = luaL_ref (L, LUA_REGISTRYINDEX);
+ return ent;
+}
+
+static void
+thread_entry_free (lua_State * L, struct thread_entry *ent)
+{
+ luaL_unref (L, LUA_REGISTRYINDEX, ent->thread_index);
+ g_free (ent);
+}
+
+struct lua_thread_pool *
+lua_thread_pool_new (lua_State * L)
+{
+ struct lua_thread_pool * pool = g_new0 (struct lua_thread_pool, 1);
+
+ pool->L = L;
+ pool->max_items = 100;
+
+ pool->available_items = g_queue_new ();
+ int i;
+
+ struct thread_entry *ent;
+ for (i = 0; i < pool->max_items; i ++) {
+ ent = thread_entry_new (pool->L);
+ g_queue_push_head (pool->available_items, ent);
+ }
+
+ return pool;
+}
+
+void
+lua_thread_pool_free (struct lua_thread_pool *pool)
+{
+ struct thread_entry *ent = NULL;
+ while (!g_queue_is_empty (pool->available_items)) {
+ ent = g_queue_pop_head (pool->available_items);
+ thread_entry_free (pool->L, ent);
+ }
+ g_queue_free (pool->available_items);
+ g_free (pool);
+}
+
+struct thread_entry *
+lua_thread_pool_get(struct lua_thread_pool *pool)
+{
+ gpointer cur;
+ struct thread_entry *ent = NULL;
+
+ cur = g_queue_pop_head (pool->available_items);
+
+ if (cur) {
+ ent = cur;
+ }
+ else {
+ ent = thread_entry_new (pool->L);
+ }
+ return ent;
+}
+
+void
+lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry)
+{
+ assert (lua_status (thread_entry->lua_state) == 0); // we can't return a running/yielded stack into the pool
+ if (pool->running_entry == thread_entry) {
+ pool->running_entry = NULL;
+ }
+ if (g_queue_get_length (pool->available_items) <= pool->max_items) {
+ g_queue_push_head (pool->available_items, thread_entry);
+ }
+ else {
+ thread_entry_free (pool->L, thread_entry);
+ }
+}
+
+void
+lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry)
+{
+ struct thread_entry *ent = NULL;
+
+ if (pool->running_entry == thread_entry) {
+ pool->running_entry = NULL;
+ }
+
+ // we should only terminate failed threads
+ assert (lua_status (thread_entry->lua_state) != 0 && lua_status (thread_entry->lua_state) != LUA_YIELD);
+ thread_entry_free (pool->L, thread_entry);
+
+ if (g_queue_get_length (pool->available_items) <= pool->max_items) {
+ ent = thread_entry_new (pool->L);
+ g_queue_push_head (pool->available_items, ent);
+ }
+}
+
+struct thread_entry *
+lua_thread_pool_get_running_entry(struct lua_thread_pool *pool)
+{
+ return pool->running_entry;
+}
+
+void
+lua_thread_pool_set_running_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry)
+{
+ pool->running_entry = thread_entry;
+}
diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h
new file mode 100644
index 000000000..01643df9e
--- /dev/null
+++ b/src/lua/lua_thread_pool.h
@@ -0,0 +1,35 @@
+#ifndef LUA_THREAD_POOL_H_
+#define LUA_THREAD_POOL_H_
+
+#include <lua.h>
+
+struct thread_entry {
+ lua_State *lua_state;
+ gint thread_index;
+};
+
+struct thread_pool;
+
+struct lua_thread_pool *
+lua_thread_pool_new (lua_State * L);
+
+void
+lua_thread_pool_free (struct lua_thread_pool *pool);
+
+struct thread_entry *
+lua_thread_pool_get(struct lua_thread_pool *pool);
+
+void
+lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
+
+void
+lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
+
+struct thread_entry *
+lua_thread_pool_get_running_entry(struct lua_thread_pool *pool);
+
+void
+lua_thread_pool_set_running_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
+
+#endif /* LUA_THREAD_POOL_H_ */
+