Browse Source

[Minor] Added coroutine support to HTTP module

tags/1.8.0
Mikhail Galanin 5 years ago
parent
commit
8c98b1f31b
5 changed files with 123 additions and 27 deletions
  1. 1
    1
      src/lua/lua_common.c
  2. 2
    2
      src/lua/lua_config.c
  3. 107
    10
      src/lua/lua_http.c
  4. 5
    4
      src/lua/lua_thread_pool.c
  5. 8
    10
      src/lua/lua_thread_pool.h

+ 1
- 1
src/lua/lua_common.c View File

@@ -1507,7 +1507,7 @@ GString *
rspamd_lua_get_traceback_string (lua_State *L)
{
GString *tb;
const gchar *msg = lua_tostring (L, 1);
const gchar *msg = lua_tostring (L, -1);

tb = g_string_sized_new (100);
g_string_append_printf (tb, "%s; trace:", msg);

+ 2
- 2
src/lua/lua_config.c View File

@@ -1209,7 +1209,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
thread_entry->cd = cd;

lua_State *thread = thread_entry->lua_state;
cd->stack_level = lua_gettop (cd->L);
cd->stack_level = lua_gettop (thread);

if (cd->cb_is_ref) {
lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref);
@@ -1226,7 +1226,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
thread_entry->error_callback = lua_metric_symbol_callback_error;
thread_entry->task = task;

lua_thread_call (task->cfg->lua_thread_pool, thread_entry, 1);
lua_thread_call (thread_entry, 1);
}

gint

+ 107
- 10
src/lua/lua_http.c View File

@@ -75,6 +75,7 @@ struct lua_http_cbdata {
gint flags;
gint fd;
gint cbref;
struct thread_entry *thread;
};

static const int default_http_timeout = 5000;
@@ -96,7 +97,10 @@ lua_http_fin (gpointer arg)
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg;

luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
if (cbd->cbref != -1) {
luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
}

if (cbd->conn) {
/* Here we already have a connection, so we need to unref it */
rspamd_http_connection_unref (cbd->conn);
@@ -170,12 +174,19 @@ lua_http_push_error (struct lua_http_cbdata *cbd, const char *err)
lua_thread_pool_restore_callback (&lcbd);
}

static void lua_http_resume_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg, const char *err);

static void
lua_http_error_handler (struct rspamd_http_connection *conn, GError *err)
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;

lua_http_push_error (cbd, err->message);
if (cbd->cbref == -1) {
lua_http_resume_handler (conn, NULL, err->message);
}
else {
lua_http_push_error (cbd, err->message);
}
lua_http_maybe_free (cbd);
}

@@ -191,6 +202,11 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
struct lua_callback_state lcbd;
lua_State *L;

if (cbd->cbref == -1) {
lua_http_resume_handler (conn, msg, NULL);
lua_http_maybe_free (cbd);
return 0;
}
lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);

L = lcbd.L;
@@ -245,6 +261,84 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
return 0;
}

/*
* resumes yielded thread
*/
static void
lua_http_resume_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg, const char *err)
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;
lua_State *L = cbd->thread->lua_state;
const gchar *body;
gsize body_len;
struct rspamd_http_header *h, *htmp;

msg_info ("T=%p, L=%p, status=%d, err=%s", cbd->thread, cbd->thread->lua_state, lua_status (cbd->thread->lua_state), err);
if (err) {
lua_pushstring (L, err);
lua_pushnil (L);
}
else {
/*
* 1 - nil (error)
* 2 - table:
* code (int)
* content (string)
* headers (table: header -> value)
*/
lua_pushnil (L); // error code

lua_createtable (L, 0, 3);

/* code */
lua_pushliteral (L, "code");
lua_pushinteger (L, msg->code);
lua_settable (L, -3);

/* content */
lua_pushliteral (L, "content");

body = rspamd_http_message_get_body (msg, &body_len);
if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) {
struct rspamd_lua_text *t;

t = lua_newuserdata (L, sizeof (*t));
rspamd_lua_setclass (L, "rspamd{text}", -1);
t->start = body;
t->len = body_len;
t->flags = 0;
}
else {
if (body_len > 0) {
lua_pushlstring (L, body, body_len);
}
else {
lua_pushnil (L);
}
}
lua_settable (L, -3);

/* headers */
lua_pushliteral (L, "headers");
lua_newtable (L);

HASH_ITER (hh, msg->headers, h, htmp) {
/*
* Lowercase header name, as Lua cannot search in caseless matter
*/
rspamd_str_lc (h->combined->str, h->name.len);
lua_pushlstring (L, h->name.begin, h->name.len);
lua_pushlstring (L, h->value.begin, h->value.len);
lua_settable (L, -3);
}

lua_settable (L, -3);
}

lua_resume_thread (cbd->thread, 2);
}

static gboolean
lua_http_make_connection (struct lua_http_cbdata *cbd)
{
@@ -404,7 +498,7 @@ lua_http_request (lua_State *L)
const gchar *url, *lua_body;
rspamd_fstring_t *body = NULL;
gchar *to_resolve;
gint cbref;
gint cbref = -1;
gsize bodylen;
gdouble timeout = default_http_timeout;
gint flags = 0;
@@ -464,11 +558,9 @@ lua_http_request (lua_State *L)
lua_gettable (L, 1);
if (url == NULL || lua_type (L, -1) != LUA_TFUNCTION) {
lua_pop (L, 1);
msg_err ("http request has bad params");
lua_pushboolean (L, FALSE);
return 1;
} else {
cbref = luaL_ref (L, LUA_REGISTRYINDEX);
}
cbref = luaL_ref (L, LUA_REGISTRYINDEX);

lua_pushstring (L, "task");
lua_gettable (L, 1);
@@ -802,8 +894,13 @@ lua_http_request (lua_State *L)
}
}

lua_pushboolean (L, TRUE);
return 1;
if (cbd->cbref == -1) {
cbd->thread = lua_thread_pool_get_running_entry (cfg->lua_thread_pool);
return lua_yield_thread (cbd->thread, 0);
} else {
lua_pushboolean (L, TRUE);
return 1;
}
}

static gint

+ 5
- 4
src/lua/lua_thread_pool.c View File

@@ -92,6 +92,7 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa
if (g_queue_get_length (pool->available_items) <= pool->max_items) {
thread_entry->cd = NULL;
thread_entry->finish_callback = NULL;
thread_entry->error_callback = NULL;
thread_entry->task = NULL;
thread_entry->cfg = NULL;

@@ -102,7 +103,7 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa
}
}

void
static void
lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry)
{
struct thread_entry *ent = NULL;
@@ -164,10 +165,10 @@ lua_do_resume (lua_State *L, gint narg)
static void lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg);

void
lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg)
lua_thread_call (struct thread_entry *thread_entry, int narg)
{
g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */
g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call running/yielded thread */
g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */

lua_resume_thread_internal (thread_entry, narg);
}
@@ -178,7 +179,7 @@ lua_resume_thread (struct thread_entry *thread_entry, gint narg)
/*
* The only state where we can resume from is LUA_YIELD
* Another acceptable status is OK (0) but in that case we should push function on stack
* to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function.
* to start the thread from, which is happening in lua_thread_call(), not in this function.
*/
g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);


+ 8
- 10
src/lua/lua_thread_pool.h View File

@@ -70,15 +70,6 @@ lua_thread_pool_get(struct lua_thread_pool *pool);
void
lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry);

/**
* Removes thread from Lua state. It should be done to dead (which ended with an error) threads only
*
* @param pool
* @param thread_entry
*/
void
lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry);

/**
* Currently running thread. Typically needed in yielding point - to fill-up continuation.
*
@@ -115,8 +106,15 @@ void
lua_thread_pool_restore_callback (struct lua_callback_state *cbs);


/**
* Acts like lua_call but the tread is able to suspend execution.
* As soon as the call is over, call either thread_entry::finish_callback or thread_entry::error_callback.
*
* @param thread_entry
* @param narg
*/
void
lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg);
lua_thread_call (struct thread_entry *thread_entry, int narg);

#endif /* LUA_THREAD_POOL_H_ */


Loading…
Cancel
Save