aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMikhail Galanin <mgalanin@mimecast.com>2018-08-23 13:45:32 +0100
committerMikhail Galanin <mgalanin@mimecast.com>2018-08-23 13:45:32 +0100
commit8c98b1f31b80702c5634f634a59a9f950d57a9a0 (patch)
tree7da9fe321b3a1ddb70e794c1b951420f737b0ee3
parent170a4c4a04208aad1249494321be64e1f7b6c71b (diff)
downloadrspamd-8c98b1f31b80702c5634f634a59a9f950d57a9a0.tar.gz
rspamd-8c98b1f31b80702c5634f634a59a9f950d57a9a0.zip
[Minor] Added coroutine support to HTTP module
-rw-r--r--src/lua/lua_common.c2
-rw-r--r--src/lua/lua_config.c4
-rw-r--r--src/lua/lua_http.c117
-rw-r--r--src/lua/lua_thread_pool.c9
-rw-r--r--src/lua/lua_thread_pool.h18
5 files changed, 123 insertions, 27 deletions
diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c
index f446ec9e8..4f3c8046d 100644
--- a/src/lua/lua_common.c
+++ b/src/lua/lua_common.c
@@ -1507,7 +1507,7 @@ GString *
rspamd_lua_get_traceback_string (lua_State *L)
{
GString *tb;
- const gchar *msg = lua_tostring (L, 1);
+ const gchar *msg = lua_tostring (L, -1);
tb = g_string_sized_new (100);
g_string_append_printf (tb, "%s; trace:", msg);
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c
index 629653e85..bf7c68ac5 100644
--- a/src/lua/lua_config.c
+++ b/src/lua/lua_config.c
@@ -1209,7 +1209,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
thread_entry->cd = cd;
lua_State *thread = thread_entry->lua_state;
- cd->stack_level = lua_gettop (cd->L);
+ cd->stack_level = lua_gettop (thread);
if (cd->cb_is_ref) {
lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref);
@@ -1226,7 +1226,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
thread_entry->error_callback = lua_metric_symbol_callback_error;
thread_entry->task = task;
- lua_thread_call (task->cfg->lua_thread_pool, thread_entry, 1);
+ lua_thread_call (thread_entry, 1);
}
gint
diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c
index 64617be9b..3b0030506 100644
--- a/src/lua/lua_http.c
+++ b/src/lua/lua_http.c
@@ -75,6 +75,7 @@ struct lua_http_cbdata {
gint flags;
gint fd;
gint cbref;
+ struct thread_entry *thread;
};
static const int default_http_timeout = 5000;
@@ -96,7 +97,10 @@ lua_http_fin (gpointer arg)
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg;
- luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
+ if (cbd->cbref != -1) {
+ luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
+ }
+
if (cbd->conn) {
/* Here we already have a connection, so we need to unref it */
rspamd_http_connection_unref (cbd->conn);
@@ -170,12 +174,19 @@ lua_http_push_error (struct lua_http_cbdata *cbd, const char *err)
lua_thread_pool_restore_callback (&lcbd);
}
+static void lua_http_resume_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg, const char *err);
+
static void
lua_http_error_handler (struct rspamd_http_connection *conn, GError *err)
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;
-
- lua_http_push_error (cbd, err->message);
+ if (cbd->cbref == -1) {
+ lua_http_resume_handler (conn, NULL, err->message);
+ }
+ else {
+ lua_http_push_error (cbd, err->message);
+ }
lua_http_maybe_free (cbd);
}
@@ -191,6 +202,11 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
struct lua_callback_state lcbd;
lua_State *L;
+ if (cbd->cbref == -1) {
+ lua_http_resume_handler (conn, msg, NULL);
+ lua_http_maybe_free (cbd);
+ return 0;
+ }
lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);
L = lcbd.L;
@@ -245,6 +261,84 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
return 0;
}
+/*
+ * resumes yielded thread
+ */
+static void
+lua_http_resume_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg, const char *err)
+{
+ struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;
+ lua_State *L = cbd->thread->lua_state;
+ const gchar *body;
+ gsize body_len;
+ struct rspamd_http_header *h, *htmp;
+
+ msg_info ("T=%p, L=%p, status=%d, err=%s", cbd->thread, cbd->thread->lua_state, lua_status (cbd->thread->lua_state), err);
+ if (err) {
+ lua_pushstring (L, err);
+ lua_pushnil (L);
+ }
+ else {
+ /*
+ * 1 - nil (error)
+ * 2 - table:
+ * code (int)
+ * content (string)
+ * headers (table: header -> value)
+ */
+ lua_pushnil (L); // error code
+
+ lua_createtable (L, 0, 3);
+
+ /* code */
+ lua_pushliteral (L, "code");
+ lua_pushinteger (L, msg->code);
+ lua_settable (L, -3);
+
+ /* content */
+ lua_pushliteral (L, "content");
+
+ body = rspamd_http_message_get_body (msg, &body_len);
+ if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) {
+ struct rspamd_lua_text *t;
+
+ t = lua_newuserdata (L, sizeof (*t));
+ rspamd_lua_setclass (L, "rspamd{text}", -1);
+ t->start = body;
+ t->len = body_len;
+ t->flags = 0;
+ }
+ else {
+ if (body_len > 0) {
+ lua_pushlstring (L, body, body_len);
+ }
+ else {
+ lua_pushnil (L);
+ }
+ }
+ lua_settable (L, -3);
+
+ /* headers */
+ lua_pushliteral (L, "headers");
+ lua_newtable (L);
+
+ HASH_ITER (hh, msg->headers, h, htmp) {
+ /*
+ * Lowercase header name, as Lua cannot search in caseless matter
+ */
+ rspamd_str_lc (h->combined->str, h->name.len);
+ lua_pushlstring (L, h->name.begin, h->name.len);
+ lua_pushlstring (L, h->value.begin, h->value.len);
+ lua_settable (L, -3);
+ }
+
+ lua_settable (L, -3);
+ }
+
+ lua_resume_thread (cbd->thread, 2);
+}
+
static gboolean
lua_http_make_connection (struct lua_http_cbdata *cbd)
{
@@ -404,7 +498,7 @@ lua_http_request (lua_State *L)
const gchar *url, *lua_body;
rspamd_fstring_t *body = NULL;
gchar *to_resolve;
- gint cbref;
+ gint cbref = -1;
gsize bodylen;
gdouble timeout = default_http_timeout;
gint flags = 0;
@@ -464,11 +558,9 @@ lua_http_request (lua_State *L)
lua_gettable (L, 1);
if (url == NULL || lua_type (L, -1) != LUA_TFUNCTION) {
lua_pop (L, 1);
- msg_err ("http request has bad params");
- lua_pushboolean (L, FALSE);
- return 1;
+ } else {
+ cbref = luaL_ref (L, LUA_REGISTRYINDEX);
}
- cbref = luaL_ref (L, LUA_REGISTRYINDEX);
lua_pushstring (L, "task");
lua_gettable (L, 1);
@@ -802,8 +894,13 @@ lua_http_request (lua_State *L)
}
}
- lua_pushboolean (L, TRUE);
- return 1;
+ if (cbd->cbref == -1) {
+ cbd->thread = lua_thread_pool_get_running_entry (cfg->lua_thread_pool);
+ return lua_yield_thread (cbd->thread, 0);
+ } else {
+ lua_pushboolean (L, TRUE);
+ return 1;
+ }
}
static gint
diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c
index 3525879dd..4c1681f96 100644
--- a/src/lua/lua_thread_pool.c
+++ b/src/lua/lua_thread_pool.c
@@ -92,6 +92,7 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa
if (g_queue_get_length (pool->available_items) <= pool->max_items) {
thread_entry->cd = NULL;
thread_entry->finish_callback = NULL;
+ thread_entry->error_callback = NULL;
thread_entry->task = NULL;
thread_entry->cfg = NULL;
@@ -102,7 +103,7 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa
}
}
-void
+static void
lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry)
{
struct thread_entry *ent = NULL;
@@ -164,10 +165,10 @@ lua_do_resume (lua_State *L, gint narg)
static void lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg);
void
-lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg)
+lua_thread_call (struct thread_entry *thread_entry, int narg)
{
g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */
- g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call running/yielded thread */
+ g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */
lua_resume_thread_internal (thread_entry, narg);
}
@@ -178,7 +179,7 @@ lua_resume_thread (struct thread_entry *thread_entry, gint narg)
/*
* The only state where we can resume from is LUA_YIELD
* Another acceptable status is OK (0) but in that case we should push function on stack
- * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function.
+ * to start the thread from, which is happening in lua_thread_call(), not in this function.
*/
g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);
diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h
index e5b2f2873..c77f77455 100644
--- a/src/lua/lua_thread_pool.h
+++ b/src/lua/lua_thread_pool.h
@@ -71,15 +71,6 @@ void
lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
/**
- * Removes thread from Lua state. It should be done to dead (which ended with an error) threads only
- *
- * @param pool
- * @param thread_entry
- */
-void
-lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
-
-/**
* Currently running thread. Typically needed in yielding point - to fill-up continuation.
*
* @param pool
@@ -115,8 +106,15 @@ void
lua_thread_pool_restore_callback (struct lua_callback_state *cbs);
+/**
+ * Acts like lua_call but the tread is able to suspend execution.
+ * As soon as the call is over, call either thread_entry::finish_callback or thread_entry::error_callback.
+ *
+ * @param thread_entry
+ * @param narg
+ */
void
-lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg);
+lua_thread_call (struct thread_entry *thread_entry, int narg);
#endif /* LUA_THREAD_POOL_H_ */