diff options
author | Mikhail Galanin <mgalanin@mimecast.com> | 2018-08-23 13:45:32 +0100 |
---|---|---|
committer | Mikhail Galanin <mgalanin@mimecast.com> | 2018-08-23 13:45:32 +0100 |
commit | 8c98b1f31b80702c5634f634a59a9f950d57a9a0 (patch) | |
tree | 7da9fe321b3a1ddb70e794c1b951420f737b0ee3 | |
parent | 170a4c4a04208aad1249494321be64e1f7b6c71b (diff) | |
download | rspamd-8c98b1f31b80702c5634f634a59a9f950d57a9a0.tar.gz rspamd-8c98b1f31b80702c5634f634a59a9f950d57a9a0.zip |
[Minor] Added coroutine support to HTTP module
-rw-r--r-- | src/lua/lua_common.c | 2 | ||||
-rw-r--r-- | src/lua/lua_config.c | 4 | ||||
-rw-r--r-- | src/lua/lua_http.c | 117 | ||||
-rw-r--r-- | src/lua/lua_thread_pool.c | 9 | ||||
-rw-r--r-- | src/lua/lua_thread_pool.h | 18 |
5 files changed, 123 insertions, 27 deletions
diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c index f446ec9e8..4f3c8046d 100644 --- a/src/lua/lua_common.c +++ b/src/lua/lua_common.c @@ -1507,7 +1507,7 @@ GString * rspamd_lua_get_traceback_string (lua_State *L) { GString *tb; - const gchar *msg = lua_tostring (L, 1); + const gchar *msg = lua_tostring (L, -1); tb = g_string_sized_new (100); g_string_append_printf (tb, "%s; trace:", msg); diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index 629653e85..bf7c68ac5 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -1209,7 +1209,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) thread_entry->cd = cd; lua_State *thread = thread_entry->lua_state; - cd->stack_level = lua_gettop (cd->L); + cd->stack_level = lua_gettop (thread); if (cd->cb_is_ref) { lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref); @@ -1226,7 +1226,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) thread_entry->error_callback = lua_metric_symbol_callback_error; thread_entry->task = task; - lua_thread_call (task->cfg->lua_thread_pool, thread_entry, 1); + lua_thread_call (thread_entry, 1); } gint diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index 64617be9b..3b0030506 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -75,6 +75,7 @@ struct lua_http_cbdata { gint flags; gint fd; gint cbref; + struct thread_entry *thread; }; static const int default_http_timeout = 5000; @@ -96,7 +97,10 @@ lua_http_fin (gpointer arg) { struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg; - luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref); + if (cbd->cbref != -1) { + luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref); + } + if (cbd->conn) { /* Here we already have a connection, so we need to unref it */ rspamd_http_connection_unref (cbd->conn); @@ -170,12 +174,19 @@ lua_http_push_error (struct lua_http_cbdata *cbd, const char *err) lua_thread_pool_restore_callback (&lcbd); } +static void lua_http_resume_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg, const char *err); + static void lua_http_error_handler (struct rspamd_http_connection *conn, GError *err) { struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud; - - lua_http_push_error (cbd, err->message); + if (cbd->cbref == -1) { + lua_http_resume_handler (conn, NULL, err->message); + } + else { + lua_http_push_error (cbd, err->message); + } lua_http_maybe_free (cbd); } @@ -191,6 +202,11 @@ lua_http_finish_handler (struct rspamd_http_connection *conn, struct lua_callback_state lcbd; lua_State *L; + if (cbd->cbref == -1) { + lua_http_resume_handler (conn, msg, NULL); + lua_http_maybe_free (cbd); + return 0; + } lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd); L = lcbd.L; @@ -245,6 +261,84 @@ lua_http_finish_handler (struct rspamd_http_connection *conn, return 0; } +/* + * resumes yielded thread + */ +static void +lua_http_resume_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg, const char *err) +{ + struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud; + lua_State *L = cbd->thread->lua_state; + const gchar *body; + gsize body_len; + struct rspamd_http_header *h, *htmp; + + msg_info ("T=%p, L=%p, status=%d, err=%s", cbd->thread, cbd->thread->lua_state, lua_status (cbd->thread->lua_state), err); + if (err) { + lua_pushstring (L, err); + lua_pushnil (L); + } + else { + /* + * 1 - nil (error) + * 2 - table: + * code (int) + * content (string) + * headers (table: header -> value) + */ + lua_pushnil (L); // error code + + lua_createtable (L, 0, 3); + + /* code */ + lua_pushliteral (L, "code"); + lua_pushinteger (L, msg->code); + lua_settable (L, -3); + + /* content */ + lua_pushliteral (L, "content"); + + body = rspamd_http_message_get_body (msg, &body_len); + if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) { + struct rspamd_lua_text *t; + + t = lua_newuserdata (L, sizeof (*t)); + rspamd_lua_setclass (L, "rspamd{text}", -1); + t->start = body; + t->len = body_len; + t->flags = 0; + } + else { + if (body_len > 0) { + lua_pushlstring (L, body, body_len); + } + else { + lua_pushnil (L); + } + } + lua_settable (L, -3); + + /* headers */ + lua_pushliteral (L, "headers"); + lua_newtable (L); + + HASH_ITER (hh, msg->headers, h, htmp) { + /* + * Lowercase header name, as Lua cannot search in caseless matter + */ + rspamd_str_lc (h->combined->str, h->name.len); + lua_pushlstring (L, h->name.begin, h->name.len); + lua_pushlstring (L, h->value.begin, h->value.len); + lua_settable (L, -3); + } + + lua_settable (L, -3); + } + + lua_resume_thread (cbd->thread, 2); +} + static gboolean lua_http_make_connection (struct lua_http_cbdata *cbd) { @@ -404,7 +498,7 @@ lua_http_request (lua_State *L) const gchar *url, *lua_body; rspamd_fstring_t *body = NULL; gchar *to_resolve; - gint cbref; + gint cbref = -1; gsize bodylen; gdouble timeout = default_http_timeout; gint flags = 0; @@ -464,11 +558,9 @@ lua_http_request (lua_State *L) lua_gettable (L, 1); if (url == NULL || lua_type (L, -1) != LUA_TFUNCTION) { lua_pop (L, 1); - msg_err ("http request has bad params"); - lua_pushboolean (L, FALSE); - return 1; + } else { + cbref = luaL_ref (L, LUA_REGISTRYINDEX); } - cbref = luaL_ref (L, LUA_REGISTRYINDEX); lua_pushstring (L, "task"); lua_gettable (L, 1); @@ -802,8 +894,13 @@ lua_http_request (lua_State *L) } } - lua_pushboolean (L, TRUE); - return 1; + if (cbd->cbref == -1) { + cbd->thread = lua_thread_pool_get_running_entry (cfg->lua_thread_pool); + return lua_yield_thread (cbd->thread, 0); + } else { + lua_pushboolean (L, TRUE); + return 1; + } } static gint diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c index 3525879dd..4c1681f96 100644 --- a/src/lua/lua_thread_pool.c +++ b/src/lua/lua_thread_pool.c @@ -92,6 +92,7 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa if (g_queue_get_length (pool->available_items) <= pool->max_items) { thread_entry->cd = NULL; thread_entry->finish_callback = NULL; + thread_entry->error_callback = NULL; thread_entry->task = NULL; thread_entry->cfg = NULL; @@ -102,7 +103,7 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa } } -void +static void lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry) { struct thread_entry *ent = NULL; @@ -164,10 +165,10 @@ lua_do_resume (lua_State *L, gint narg) static void lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg); void -lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg) +lua_thread_call (struct thread_entry *thread_entry, int narg) { g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */ - g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call running/yielded thread */ + g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */ lua_resume_thread_internal (thread_entry, narg); } @@ -178,7 +179,7 @@ lua_resume_thread (struct thread_entry *thread_entry, gint narg) /* * The only state where we can resume from is LUA_YIELD * Another acceptable status is OK (0) but in that case we should push function on stack - * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function. + * to start the thread from, which is happening in lua_thread_call(), not in this function. */ g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD); diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h index e5b2f2873..c77f77455 100644 --- a/src/lua/lua_thread_pool.h +++ b/src/lua/lua_thread_pool.h @@ -71,15 +71,6 @@ void lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry); /** - * Removes thread from Lua state. It should be done to dead (which ended with an error) threads only - * - * @param pool - * @param thread_entry - */ -void -lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry); - -/** * Currently running thread. Typically needed in yielding point - to fill-up continuation. * * @param pool @@ -115,8 +106,15 @@ void lua_thread_pool_restore_callback (struct lua_callback_state *cbs); +/** + * Acts like lua_call but the tread is able to suspend execution. + * As soon as the call is over, call either thread_entry::finish_callback or thread_entry::error_callback. + * + * @param thread_entry + * @param narg + */ void -lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg); +lua_thread_call (struct thread_entry *thread_entry, int narg); #endif /* LUA_THREAD_POOL_H_ */ |