summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-23 16:21:02 +0100
committerGitHub <noreply@github.com>2018-08-23 16:21:02 +0100
commite94ceeaf832692de3f3f212701c7a6dd74cd98d6 (patch)
tree6374c9c174798018dad81bdd999ed8cf57a98b07
parentf4d26bd0a17cb6e8c6839f4b8cda65257911a93e (diff)
parenta4b08ac566aa631f4421114e4131b0098139a009 (diff)
downloadrspamd-e94ceeaf832692de3f3f212701c7a6dd74cd98d6.tar.gz
rspamd-e94ceeaf832692de3f3f212701c7a6dd74cd98d6.zip
Merge pull request #2438 from negram/add-coroutine-to-http
Add coroutine to http
-rw-r--r--.circleci/config.yml2
-rw-r--r--src/lua/lua_common.c2
-rw-r--r--src/lua/lua_common.h2
-rw-r--r--src/lua/lua_config.c190
-rw-r--r--src/lua/lua_dns.c2
-rw-r--r--src/lua/lua_http.c117
-rw-r--r--src/lua/lua_thread_pool.c92
-rw-r--r--src/lua/lua_thread_pool.h36
-rw-r--r--test/functional/cases/210_clickhouse/001_migration.robot2
-rw-r--r--test/functional/cases/220_http.robot65
-rw-r--r--test/functional/configs/lua_test.conf4
-rw-r--r--test/functional/configs/plugins.conf5
-rw-r--r--test/functional/lib/rspamd.py36
-rw-r--r--test/functional/lua/http.lua78
-rwxr-xr-xtest/functional/util/dummy_fprot.py9
-rwxr-xr-xtest/functional/util/dummy_http.py100
16 files changed, 585 insertions, 157 deletions
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 1df9ddc7f..3755257ba 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -93,7 +93,7 @@ jobs:
- run: sudo apt-get install -qq libluajit-5.1-dev libpcre3-dev luarocks opendkim-tools python-pip redis-server
- run: sudo apt-get install clickhouse-server
- - run: sudo pip install demjson psutil robotframework requests
+ - run: sudo pip install demjson psutil robotframework requests http
- run: sudo luarocks install luacheck
- run: cd ../build
diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c
index f446ec9e8..4f3c8046d 100644
--- a/src/lua/lua_common.c
+++ b/src/lua/lua_common.c
@@ -1507,7 +1507,7 @@ GString *
rspamd_lua_get_traceback_string (lua_State *L)
{
GString *tb;
- const gchar *msg = lua_tostring (L, 1);
+ const gchar *msg = lua_tostring (L, -1);
tb = g_string_sized_new (100);
g_string_append_printf (tb, "%s; trace:", msg);
diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h
index 0a9527bb0..b84701bc1 100644
--- a/src/lua/lua_common.h
+++ b/src/lua/lua_common.h
@@ -434,7 +434,7 @@ lua_yield_thread (struct thread_entry *thread_entry, gint nresults);
* @param narg
*/
void
-lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg);
+lua_resume_thread (struct thread_entry *thread_entry, gint narg);
/* Paths defs */
#define RSPAMD_CONFDIR_INDEX "CONFDIR"
diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c
index 6957ded7c..bf7c68ac5 100644
--- a/src/lua/lua_config.c
+++ b/src/lua/lua_config.c
@@ -1145,7 +1145,7 @@ lua_watcher_callback (gpointer session_data, gpointer ud)
rspamd_session_get_watcher (task->s));
}
else {
- res = lua_tonumber (L, level + 1);
+ res = (gint)lua_tonumber (L, level + 1);
}
if (res) {
@@ -1192,18 +1192,9 @@ lua_watcher_callback (gpointer session_data, gpointer ud)
lua_settop (L, err_idx - 1);
}
-gint
-lua_do_resume (lua_State *L, gint narg)
-{
-#if LUA_VERSION_NUM < 503
- return lua_resume (L, narg);
-#else
- return lua_resume (L, NULL, narg);
-#endif
-}
+static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret);
-static void
-lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret);
+static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg);
static void
lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
@@ -1211,7 +1202,6 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
struct lua_callback_data *cd = ud;
struct rspamd_task **ptask;
struct thread_entry *thread_entry;
- gint ret;
thread_entry = lua_thread_pool_get (task->cfg->lua_thread_pool);
@@ -1219,7 +1209,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
thread_entry->cd = cd;
lua_State *thread = thread_entry->lua_state;
- cd->stack_level = lua_gettop (cd->L);
+ cd->stack_level = lua_gettop (thread);
if (cd->cb_is_ref) {
lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref);
@@ -1232,16 +1222,11 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
rspamd_lua_setclass (thread, "rspamd{task}", -1);
*ptask = task;
- ret = lua_do_resume (thread, 1);
+ thread_entry->finish_callback = lua_metric_symbol_callback_return;
+ thread_entry->error_callback = lua_metric_symbol_callback_error;
+ thread_entry->task = task;
- if (ret != LUA_YIELD) {
- /*
- LUA_YIELD state should not be handled here.
- It should only happen when the thread initiated a asynchronous event and it will be restored as soon
- the event is finished
- */
- lua_metric_symbol_callback_return (task, thread_entry, ud, ret);
- }
+ lua_thread_call (thread_entry, 1);
}
gint
@@ -1252,132 +1237,105 @@ lua_yield_thread (struct thread_entry *thread_entry, gint nresults)
return lua_yield (thread_entry->lua_state, nresults);
}
-void
-lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg)
+static void
+lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg)
{
- g_assert (thread_entry->cd != NULL);
-
- /*
- * The only state where we can resume from is LUA_YIELD
- * Another acceptable status is OK (0) but in that case we should push function on stack
- * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function.
- */
- g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);
-
- gint ret;
-
- lua_thread_pool_set_running_entry (task->cfg->lua_thread_pool, thread_entry);
- ret = lua_do_resume (thread_entry->lua_state, narg);
-
- if (ret != LUA_YIELD) {
- lua_metric_symbol_callback_return (task, thread_entry, thread_entry->cd, ret);
- }
+ struct lua_callback_data *cd = thread_entry->cd;
+ struct rspamd_task *task = thread_entry->task;
+ msg_err_task ("call to (%s) failed (%d): %s", cd->symbol, ret, msg);
}
static void
-lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret)
+lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret)
{
- GString *tb;
- struct lua_callback_data *cd = ud;
+ struct lua_callback_data *cd = thread_entry->cd;
+ struct rspamd_task *task = thread_entry->task;
int nresults;
struct rspamd_symbol_result *s;
- lua_State *thread = thread_entry->lua_state;
- if (ret != 0) {
+ (void)ret;
- tb = rspamd_lua_get_traceback_string (thread);
- msg_err_task ("call to (%s) failed (%d): %v", cd->symbol, ret, tb);
+ lua_State *L = thread_entry->lua_state;
- if (tb) {
- g_string_free (tb, TRUE);
- }
- g_assert (lua_gettop (thread) >= cd->stack_level);
- /* maybe there is a way to recover here. For now, just remove faulty thread */
- lua_thread_pool_terminate_entry (task->cfg->lua_thread_pool, thread_entry);
- }
- else {
- nresults = lua_gettop (thread) - cd->stack_level;
+ nresults = lua_gettop (L) - cd->stack_level;
- if (nresults >= 1) {
- /* Function returned boolean, so maybe we need to insert result? */
- gint res = 0;
- gint i;
- gdouble flag = 1.0;
- gint type;
- struct lua_watcher_data *wd;
+ if (nresults >= 1) {
+ /* Function returned boolean, so maybe we need to insert result? */
+ gint res = 0;
+ gint i;
+ gdouble flag = 1.0;
+ gint type;
+ struct lua_watcher_data *wd;
- type = lua_type (thread, cd->stack_level + 1);
+ type = lua_type (L, cd->stack_level + 1);
- if (type == LUA_TBOOLEAN) {
- res = lua_toboolean (thread, cd->stack_level + 1);
- }
- else if (type == LUA_TFUNCTION) {
- /* Function returned a closure that should be watched for */
- wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
- lua_pushvalue (thread /*cd->L*/, cd->stack_level + 1);
- wd->cb_ref = luaL_ref (thread, LUA_REGISTRYINDEX);
- wd->cbd = cd;
- rspamd_session_watcher_push_callback (task->s,
- rspamd_session_get_watcher (task->s),
- lua_watcher_callback, wd);
- /*
- * We immediately pop watcher since we have not registered
- * any async events from here
- */
- rspamd_session_watcher_pop (task->s,
- rspamd_session_get_watcher (task->s));
+ if (type == LUA_TBOOLEAN) {
+ res = lua_toboolean (L, cd->stack_level + 1);
+ }
+ else if (type == LUA_TFUNCTION) {
+ /* Function returned a closure that should be watched for */
+ wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd));
+ lua_pushvalue (L /*cd->L*/, cd->stack_level + 1);
+ wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX);
+ wd->cbd = cd;
+ rspamd_session_watcher_push_callback (task->s,
+ rspamd_session_get_watcher (task->s),
+ lua_watcher_callback, wd);
+ /*
+ * We immediately pop watcher since we have not registered
+ * any async events from here
+ */
+ rspamd_session_watcher_pop (task->s,
+ rspamd_session_get_watcher (task->s));
+ }
+ else {
+ res = lua_tonumber (L, cd->stack_level + 1);
+ }
+
+ if (res) {
+ gint first_opt = 2;
+
+ if (lua_type (L, cd->stack_level + 2) == LUA_TNUMBER) {
+ flag = lua_tonumber (L, cd->stack_level + 2);
+ /* Shift opt index */
+ first_opt = 3;
}
else {
- res = lua_tonumber (thread, cd->stack_level + 1);
+ flag = res;
}
- if (res) {
- gint first_opt = 2;
+ s = rspamd_task_insert_result (task, cd->symbol, flag, NULL);
- if (lua_type (thread, cd->stack_level + 2) == LUA_TNUMBER) {
- flag = lua_tonumber (thread, cd->stack_level + 2);
- /* Shift opt index */
- first_opt = 3;
- }
- else {
- flag = res;
- }
+ if (s) {
+ guint last_pos = lua_gettop (L);
- s = rspamd_task_insert_result (task, cd->symbol, flag, NULL);
+ for (i = cd->stack_level + first_opt; i <= last_pos; i++) {
+ if (lua_type (L, i) == LUA_TSTRING) {
+ const char *opt = lua_tostring (L, i);
- if (s) {
- guint last_pos = lua_gettop (thread);
+ rspamd_task_add_result_option (task, s, opt);
+ }
+ else if (lua_type (L, i) == LUA_TTABLE) {
+ lua_pushvalue (L, i);
- for (i = cd->stack_level + first_opt; i <= last_pos; i++) {
- if (lua_type (thread, i) == LUA_TSTRING) {
- const char *opt = lua_tostring (thread, i);
+ for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
+ const char *opt = lua_tostring (L, -1);
rspamd_task_add_result_option (task, s, opt);
}
- else if (lua_type (thread, i) == LUA_TTABLE) {
- lua_pushvalue (thread, i);
- for (lua_pushnil (thread); lua_next (thread, -2); lua_pop (thread, 1)) {
- const char *opt = lua_tostring (thread, -1);
-
- rspamd_task_add_result_option (task, s, opt);
- }
-
- lua_pop (thread, 1);
- }
+ lua_pop (L, 1);
}
}
-
}
- lua_pop (thread, nresults);
}
- g_assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */
-
- lua_thread_pool_return (task->cfg->lua_thread_pool, thread_entry);
+ lua_pop (L, nresults);
}
+ g_assert (lua_gettop (L) == cd->stack_level); /* we properly cleaned up the stack */
+
cd->stack_level = 0;
}
diff --git a/src/lua/lua_dns.c b/src/lua/lua_dns.c
index 0d12a137c..805921a03 100644
--- a/src/lua/lua_dns.c
+++ b/src/lua/lua_dns.c
@@ -147,7 +147,7 @@ lua_dns_callback (struct rdns_reply *reply, void *arg)
lua_pushvalue (L, -3);
}
- lua_resume_thread (cbdata->task, cbdata->thread, 2);
+ lua_resume_thread (cbdata->thread, 2);
if (cbdata->s) {
rspamd_session_watcher_pop (cbdata->s, cbdata->w);
diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c
index 64617be9b..3b0030506 100644
--- a/src/lua/lua_http.c
+++ b/src/lua/lua_http.c
@@ -75,6 +75,7 @@ struct lua_http_cbdata {
gint flags;
gint fd;
gint cbref;
+ struct thread_entry *thread;
};
static const int default_http_timeout = 5000;
@@ -96,7 +97,10 @@ lua_http_fin (gpointer arg)
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg;
- luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
+ if (cbd->cbref != -1) {
+ luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
+ }
+
if (cbd->conn) {
/* Here we already have a connection, so we need to unref it */
rspamd_http_connection_unref (cbd->conn);
@@ -170,12 +174,19 @@ lua_http_push_error (struct lua_http_cbdata *cbd, const char *err)
lua_thread_pool_restore_callback (&lcbd);
}
+static void lua_http_resume_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg, const char *err);
+
static void
lua_http_error_handler (struct rspamd_http_connection *conn, GError *err)
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;
-
- lua_http_push_error (cbd, err->message);
+ if (cbd->cbref == -1) {
+ lua_http_resume_handler (conn, NULL, err->message);
+ }
+ else {
+ lua_http_push_error (cbd, err->message);
+ }
lua_http_maybe_free (cbd);
}
@@ -191,6 +202,11 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
struct lua_callback_state lcbd;
lua_State *L;
+ if (cbd->cbref == -1) {
+ lua_http_resume_handler (conn, msg, NULL);
+ lua_http_maybe_free (cbd);
+ return 0;
+ }
lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);
L = lcbd.L;
@@ -245,6 +261,84 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
return 0;
}
+/*
+ * resumes yielded thread
+ */
+static void
+lua_http_resume_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg, const char *err)
+{
+ struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;
+ lua_State *L = cbd->thread->lua_state;
+ const gchar *body;
+ gsize body_len;
+ struct rspamd_http_header *h, *htmp;
+
+ msg_info ("T=%p, L=%p, status=%d, err=%s", cbd->thread, cbd->thread->lua_state, lua_status (cbd->thread->lua_state), err);
+ if (err) {
+ lua_pushstring (L, err);
+ lua_pushnil (L);
+ }
+ else {
+ /*
+ * 1 - nil (error)
+ * 2 - table:
+ * code (int)
+ * content (string)
+ * headers (table: header -> value)
+ */
+ lua_pushnil (L); // error code
+
+ lua_createtable (L, 0, 3);
+
+ /* code */
+ lua_pushliteral (L, "code");
+ lua_pushinteger (L, msg->code);
+ lua_settable (L, -3);
+
+ /* content */
+ lua_pushliteral (L, "content");
+
+ body = rspamd_http_message_get_body (msg, &body_len);
+ if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) {
+ struct rspamd_lua_text *t;
+
+ t = lua_newuserdata (L, sizeof (*t));
+ rspamd_lua_setclass (L, "rspamd{text}", -1);
+ t->start = body;
+ t->len = body_len;
+ t->flags = 0;
+ }
+ else {
+ if (body_len > 0) {
+ lua_pushlstring (L, body, body_len);
+ }
+ else {
+ lua_pushnil (L);
+ }
+ }
+ lua_settable (L, -3);
+
+ /* headers */
+ lua_pushliteral (L, "headers");
+ lua_newtable (L);
+
+ HASH_ITER (hh, msg->headers, h, htmp) {
+ /*
+ * Lowercase header name, as Lua cannot search in caseless matter
+ */
+ rspamd_str_lc (h->combined->str, h->name.len);
+ lua_pushlstring (L, h->name.begin, h->name.len);
+ lua_pushlstring (L, h->value.begin, h->value.len);
+ lua_settable (L, -3);
+ }
+
+ lua_settable (L, -3);
+ }
+
+ lua_resume_thread (cbd->thread, 2);
+}
+
static gboolean
lua_http_make_connection (struct lua_http_cbdata *cbd)
{
@@ -404,7 +498,7 @@ lua_http_request (lua_State *L)
const gchar *url, *lua_body;
rspamd_fstring_t *body = NULL;
gchar *to_resolve;
- gint cbref;
+ gint cbref = -1;
gsize bodylen;
gdouble timeout = default_http_timeout;
gint flags = 0;
@@ -464,11 +558,9 @@ lua_http_request (lua_State *L)
lua_gettable (L, 1);
if (url == NULL || lua_type (L, -1) != LUA_TFUNCTION) {
lua_pop (L, 1);
- msg_err ("http request has bad params");
- lua_pushboolean (L, FALSE);
- return 1;
+ } else {
+ cbref = luaL_ref (L, LUA_REGISTRYINDEX);
}
- cbref = luaL_ref (L, LUA_REGISTRYINDEX);
lua_pushstring (L, "task");
lua_gettable (L, 1);
@@ -802,8 +894,13 @@ lua_http_request (lua_State *L)
}
}
- lua_pushboolean (L, TRUE);
- return 1;
+ if (cbd->cbref == -1) {
+ cbd->thread = lua_thread_pool_get_running_entry (cfg->lua_thread_pool);
+ return lua_yield_thread (cbd->thread, 0);
+ } else {
+ lua_pushboolean (L, TRUE);
+ return 1;
+ }
}
static gint
diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c
index 979b31f6b..4c1681f96 100644
--- a/src/lua/lua_thread_pool.c
+++ b/src/lua/lua_thread_pool.c
@@ -91,6 +91,11 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa
if (g_queue_get_length (pool->available_items) <= pool->max_items) {
thread_entry->cd = NULL;
+ thread_entry->finish_callback = NULL;
+ thread_entry->error_callback = NULL;
+ thread_entry->task = NULL;
+ thread_entry->cfg = NULL;
+
g_queue_push_head (pool->available_items, thread_entry);
}
else {
@@ -98,7 +103,7 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa
}
}
-void
+static void
lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry)
{
struct thread_entry *ent = NULL;
@@ -146,3 +151,88 @@ lua_thread_pool_restore_callback (struct lua_callback_state *cbs)
lua_thread_pool_return (cbs->thread_pool, cbs->my_thread);
lua_thread_pool_set_running_entry (cbs->thread_pool, cbs->previous_thread);
}
+
+static gint
+lua_do_resume (lua_State *L, gint narg)
+{
+#if LUA_VERSION_NUM < 503
+ return lua_resume (L, narg);
+#else
+ return lua_resume (L, NULL, narg);
+#endif
+}
+
+static void lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg);
+
+void
+lua_thread_call (struct thread_entry *thread_entry, int narg)
+{
+ g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */
+ g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */
+
+ lua_resume_thread_internal (thread_entry, narg);
+}
+
+void
+lua_resume_thread (struct thread_entry *thread_entry, gint narg)
+{
+ /*
+ * The only state where we can resume from is LUA_YIELD
+ * Another acceptable status is OK (0) but in that case we should push function on stack
+ * to start the thread from, which is happening in lua_thread_call(), not in this function.
+ */
+ g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);
+
+ lua_resume_thread_internal (thread_entry, narg);
+}
+
+static void
+lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg)
+{
+ gint ret;
+ struct lua_thread_pool *pool;
+ GString *tb;
+ struct rspamd_task *task;
+
+ ret = lua_do_resume (thread_entry->lua_state, narg);
+
+ if (ret != LUA_YIELD) {
+ /*
+ LUA_YIELD state should not be handled here.
+ It should only happen when the thread initiated a asynchronous event and it will be restored as soon
+ the event is finished
+ */
+
+ if (thread_entry->task) {
+ pool = thread_entry->task->cfg->lua_thread_pool;
+ }
+ else {
+ pool = thread_entry->cfg->lua_thread_pool;
+ }
+ if (ret == 0) {
+ if (thread_entry->finish_callback) {
+ thread_entry->finish_callback (thread_entry, ret);
+ }
+ lua_thread_pool_return (pool, thread_entry);
+ }
+ else {
+ tb = rspamd_lua_get_traceback_string (thread_entry->lua_state);
+ if (thread_entry->error_callback) {
+ thread_entry->error_callback (thread_entry, ret, tb->str);
+ }
+ else if (thread_entry->task) {
+ task = thread_entry->task;
+ msg_err_task ("lua call failed (%d): %v", ret, tb);
+ }
+ else {
+ msg_err ("lua call failed (%d): %v", ret, tb);
+ }
+
+ if (tb) {
+ g_string_free (tb, TRUE);
+ }
+ /* maybe there is a way to recover here. For now, just remove faulty thread */
+ lua_thread_pool_terminate_entry (pool, thread_entry);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h
index b72b72e8d..c77f77455 100644
--- a/src/lua/lua_thread_pool.h
+++ b/src/lua/lua_thread_pool.h
@@ -3,13 +3,25 @@
#include <lua.h>
+struct thread_entry;
+struct lua_thread_pool;
+
+typedef void (*lua_thread_finish_t) (struct thread_entry *thread, int ret);
+typedef void (*lua_thread_error_t) (struct thread_entry *thread, int ret, const char *msg);
+
struct thread_entry {
lua_State *lua_state;
gint thread_index;
gpointer cd;
-};
-struct thread_pool;
+ /* function to handle result of called method, can be NULL */
+ lua_thread_finish_t finish_callback;
+
+ /* function to log result, i.e. if you want to modify error logging message or somehow process this state, can be NUL */
+ lua_thread_error_t error_callback;
+ struct rspamd_task *task;
+ struct rspamd_config *cfg;
+};
struct lua_callback_state {
lua_State *L;
@@ -59,15 +71,6 @@ void
lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
/**
- * Removes thread from Lua state. It should be done to dead (which ended with an error) threads only
- *
- * @param pool
- * @param thread_entry
- */
-void
-lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
-
-/**
* Currently running thread. Typically needed in yielding point - to fill-up continuation.
*
* @param pool
@@ -102,5 +105,16 @@ lua_thread_pool_prepare_callback (struct lua_thread_pool *pool, struct lua_callb
void
lua_thread_pool_restore_callback (struct lua_callback_state *cbs);
+
+/**
+ * Acts like lua_call but the tread is able to suspend execution.
+ * As soon as the call is over, call either thread_entry::finish_callback or thread_entry::error_callback.
+ *
+ * @param thread_entry
+ * @param narg
+ */
+void
+lua_thread_call (struct thread_entry *thread_entry, int narg);
+
#endif /* LUA_THREAD_POOL_H_ */
diff --git a/test/functional/cases/210_clickhouse/001_migration.robot b/test/functional/cases/210_clickhouse/001_migration.robot
index 81cfa9dca..0d8730ba1 100644
--- a/test/functional/cases/210_clickhouse/001_migration.robot
+++ b/test/functional/cases/210_clickhouse/001_migration.robot
@@ -24,7 +24,7 @@ Migration
Prepare rspamd
- Sleep 1 #TODO: replace this check with waiting until migration finishes
+ Sleep 2 #TODO: replace this check with waiting until migration finishes
Column should exist rspamd Symbols.Scores
Column should exist rspamd Attachments.Digest
diff --git a/test/functional/cases/220_http.robot b/test/functional/cases/220_http.robot
new file mode 100644
index 000000000..a8f47faa8
--- /dev/null
+++ b/test/functional/cases/220_http.robot
@@ -0,0 +1,65 @@
+*** Settings ***
+Test Setup Http Setup
+Test Teardown Http Teardown
+Library Process
+Library ${TESTDIR}/lib/rspamd.py
+Resource ${TESTDIR}/lib/rspamd.robot
+Variables ${TESTDIR}/lib/vars.py
+
+*** Variables ***
+# ${CONFIG} ${TESTDIR}/configs/http.conf
+${URL_TLD} ${TESTDIR}/../lua/unit/test_tld.dat
+${CONFIG} ${TESTDIR}/configs/lua_test.conf
+${MESSAGE} ${TESTDIR}/messages/spam_message.eml
+${REDIS_SCOPE} Suite
+${RSPAMD_SCOPE} Suite
+
+*** Test Cases ***
+Simple HTTP request
+ Check url /request get HTTP_DNS_200 HTTP_200 HTTP_CORO_DNS_200 HTTP_CORO_200 method_get hello world HTTP_CORO_200 (0.00)[hello world]
+ Check url /request post HTTP_DNS_200 HTTP_200 HTTP_CORO_DNS_200 HTTP_CORO_200 method_post hello post HTTP_CORO_DNS_200 (0.00)[hello post]
+
+*** Test Cases ***
+HTTP request 403
+ Check url /error_403 get HTTP_DNS_403 HTTP_403 HTTP_CORO_DNS_403 HTTP_CORO_403 method_get
+ Check url /error_403 post HTTP_DNS_403 HTTP_403 HTTP_CORO_DNS_403 HTTP_CORO_403 method_post
+
+
+*** Test Cases ***
+HTTP timeout
+ Check url /timeout get HTTP_ERROR HTTP_ERROR HTTP_CORO_DNS_ERROR HTTP_CORO_ERROR method_get IO timeout
+ Check url /timeout post HTTP_DNS_ERROR HTTP_ERROR HTTP_CORO_DNS_ERROR HTTP_CORO_ERROR method_post IO timeout
+
+
+*** Test Cases ***
+HTTP empty response
+ Check url /empty get HTTP_ERROR HTTP_ERROR HTTP_CORO_DNS_ERROR HTTP_CORO_ERROR method_get IO read error: unexpected EOF
+ Check url /empty post HTTP_DNS_ERROR HTTP_ERROR HTTP_CORO_DNS_ERROR HTTP_CORO_ERROR method_post IO read error: unexpected EOF
+
+
+*** Keywords ***
+Lua Setup
+ [Arguments] ${LUA_SCRIPT}
+ Set Global Variable ${LUA_SCRIPT}
+ Generic Setup
+
+Http Setup
+ Run Dummy Http
+ Lua Setup ${TESTDIR}/lua/http.lua
+
+Http Teardown
+ ${http_pid} = Get File /tmp/dummy_http.pid
+ Shutdown Process With Children ${http_pid}
+ Normal Teardown
+
+Run Dummy Http
+ [Arguments]
+ ${result} = Start Process ${TESTDIR}/util/dummy_http.py
+ Wait Until Created /tmp/dummy_http.pid
+
+
+Check url
+ [Arguments] ${url} ${method} @{expect_results}
+ ${result} = Scan Message With Rspamc --header=url:${url} --header=method:${method} ${MESSAGE}
+ : FOR ${expect} IN @{expect_results}
+ \ Check Rspamc ${result} ${expect} \ No newline at end of file
diff --git a/test/functional/configs/lua_test.conf b/test/functional/configs/lua_test.conf
index f01f07cad..af84a1578 100644
--- a/test/functional/configs/lua_test.conf
+++ b/test/functional/configs/lua_test.conf
@@ -10,6 +10,10 @@ options = {
name = "example.com",
type = "a";
replies = ["93.184.216.34"];
+ }, {
+ name = "site.resolveme",
+ type = "a";
+ replies = ["127.0.0.1"];
}]
}
}
diff --git a/test/functional/configs/plugins.conf b/test/functional/configs/plugins.conf
index eab954333..aaf27b61e 100644
--- a/test/functional/configs/plugins.conf
+++ b/test/functional/configs/plugins.conf
@@ -421,6 +421,11 @@ options = {
name = "rspamd.tk",
type = "txt";
replies = ["bio=a263adeab8acdcdb8b89e127b67d696061fdfbee"];
+ },
+ {
+ name = "fail1.org.org.za",
+ type = "txt";
+ replies = ["v=spf1 redirect=www.dnssec-failed.org"];
}];
}
}
diff --git a/test/functional/lib/rspamd.py b/test/functional/lib/rspamd.py
index 2010f4127..97ab516c1 100644
--- a/test/functional/lib/rspamd.py
+++ b/test/functional/lib/rspamd.py
@@ -8,11 +8,12 @@ import re
import shutil
import signal
import socket
-import string
+import errno
import sys
import tempfile
import time
from robot.libraries.BuiltIn import BuiltIn
+from robot.api import logger
if sys.version_info > (3,):
long = int
@@ -167,32 +168,47 @@ def update_dictionary(a, b):
a.update(b)
return a
-def shutdown_process(pid):
+def shutdown_process(process):
i = 0
while i < 100:
try:
- os.kill(pid, signal.SIGTERM)
+ os.kill(process.pid, signal.SIGTERM)
except OSError as e:
- assert e.errno == 3
+ assert e.errno == errno.ESRCH
return
+
+ if process.status() == psutil.STATUS_ZOMBIE:
+ return
+
i += 1
time.sleep(0.1)
+
while i < 200:
try:
- os.kill(pid, signal.SIGKILL)
+ os.kill(process.pid, signal.SIGKILL)
except OSError as e:
- assert e.errno == 3
+ assert e.errno == errno.ESRCH
return
+
+ if process.status() == psutil.STATUS_ZOMBIE:
+ return
+
i += 1
time.sleep(0.1)
- assert False, "Failed to shutdown process %s" % pid
+ assert False, "Failed to shutdown process %d (%s)" % (process.pid, process.name())
+
def shutdown_process_with_children(pid):
pid = int(pid)
- children = psutil.Process(pid=pid).children(recursive=False)
- shutdown_process(pid)
+ try:
+ process = psutil.Process(pid=pid)
+ except psutil.NoSuchProcess:
+ return
+ children = process.children(recursive=False)
+ shutdown_process(process)
for child in children:
try:
- shutdown_process(child.pid)
+ shutdown_process(child)
except:
pass
+
diff --git a/test/functional/lua/http.lua b/test/functional/lua/http.lua
new file mode 100644
index 000000000..03c4ca6fc
--- /dev/null
+++ b/test/functional/lua/http.lua
@@ -0,0 +1,78 @@
+local rspamd_http = require "rspamd_http"
+local rspamd_logger = require "rspamd_logger"
+
+local function http_symbol(task)
+
+ local url = tostring(task:get_request_header('url'))
+ local method = tostring(task:get_request_header('method'))
+
+ task:insert_result('method_' .. method, 1.0)
+
+ local function http_callback(err, code, body)
+ if err then
+ rspamd_logger.errx('http_callback error: ' .. err)
+ task:insert_result('HTTP_ERROR', 1.0, err)
+ else
+ task:insert_result('HTTP_' .. code, 1.0, body)
+ end
+ end
+
+ local function http_dns_callback(err, code, body)
+ if err then
+ rspamd_logger.errx('http_dns_callback error: ' .. err)
+ task:insert_result('HTTP_DNS_ERROR', 1.0, err)
+ else
+ task:insert_result('HTTP_DNS_' .. code, 1.0, body)
+ end
+ end
+
+ rspamd_http.request({
+ url = 'http://127.0.0.1:18080' .. url,
+ task = task,
+ method = method,
+ callback = http_callback,
+ timeout = 1,
+ })
+
+ --[[ request to this address involved DNS resolver subsystem ]]
+ rspamd_http.request({
+ url = 'http://site.resolveme:18080' .. url,
+ task = task,
+ method = method,
+ callback = http_dns_callback,
+ timeout = 1,
+ })
+
+ local err, response = rspamd_http.request({
+ url = 'http://127.0.0.1:18080' .. url,
+ task = task,
+ method = method,
+ timeout = 1,
+ })
+
+ if not err then
+ task:insert_result('HTTP_CORO_' .. response.code, 1.0, response.content)
+ else
+ task:insert_result('HTTP_CORO_ERROR', 1.0, err)
+ end
+
+ err, response = rspamd_http.request({
+ url = 'http://site.resolveme:18080' .. url,
+ task = task,
+ method = method,
+ timeout = 1,
+ })
+
+ if not err then
+ task:insert_result('HTTP_CORO_DNS_' .. response.code, 1.0, response.content)
+ else
+ task:insert_result('HTTP_CORO_DNS_ERROR', 1.0, err)
+ end
+end
+
+rspamd_config:register_symbol({
+ name = 'SIMPLE_TEST',
+ score = 1.0,
+ callback = http_symbol,
+ no_squeeze = true
+})
diff --git a/test/functional/util/dummy_fprot.py b/test/functional/util/dummy_fprot.py
index c35f0628f..34725280b 100755
--- a/test/functional/util/dummy_fprot.py
+++ b/test/functional/util/dummy_fprot.py
@@ -1,14 +1,15 @@
#!/usr/bin/env python
-
-PID = "/tmp/dummy_fprot.pid"
-
import os
import sys
+import signal
+
+
try:
import SocketServer as socketserver
except:
import socketserver
-import signal
+
+PID = "/tmp/dummy_fprot.pid"
class MyTCPHandler(socketserver.BaseRequestHandler):
diff --git a/test/functional/util/dummy_http.py b/test/functional/util/dummy_http.py
new file mode 100755
index 000000000..4f8e67ffd
--- /dev/null
+++ b/test/functional/util/dummy_http.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+
+import BaseHTTPServer
+import time
+import os
+import sys
+import signal
+
+PORT = 18080
+HOST_NAME = '127.0.0.1'
+
+PID = "/tmp/dummy_http.pid"
+
+
+class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+
+ def do_HEAD(self):
+ self.send_response(200)
+ self.send_header("Content-type", "text/html")
+ self.end_headers()
+
+ def do_GET(self):
+ """Respond to a GET request."""
+ if self.path == "/empty":
+ self.finish()
+ return
+
+ if self.path == "/timeout":
+ time.sleep(2)
+
+ if self.path == "/error_403":
+ self.send_response(403)
+ else:
+ self.send_response(200)
+
+ self.send_header("Content-type", "text/plain")
+ self.end_headers()
+ self.wfile.write("hello world")
+
+ def do_POST(self):
+ """Respond to a GET request."""
+ if self.path == "/empty":
+ self.finish()
+ return
+
+ if self.path == "/timeout":
+ time.sleep(2)
+
+ if self.path == "/error_403":
+ self.send_response(403)
+ else:
+ self.send_response(200)
+
+ self.send_header("Content-type", "text/plain")
+ self.end_headers()
+ self.wfile.write("hello post")
+
+
+class MyHttp(BaseHTTPServer.HTTPServer):
+ def __init__(self, server_address, RequestHandlerClass, bind_and_activate=False):
+ BaseHTTPServer.HTTPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate)
+ self.keep_running = True
+
+ def run(self):
+ self.server_bind()
+ self.server_activate()
+
+ with open(PID, 'w+') as f:
+ f.write(str(os.getpid()))
+ f.close()
+
+ while self.keep_running:
+ try:
+ self.handle_request()
+ except Exception:
+ pass
+
+ def stop(self):
+ self.keep_running = False
+ self.server_close()
+
+
+if __name__ == '__main__':
+ server_class = BaseHTTPServer.HTTPServer
+ httpd = MyHttp((HOST_NAME, PORT), MyHandler)
+ httpd.allow_reuse_address = True
+ httpd.timeout = 1
+
+ def alarm_handler(signum, frame):
+ httpd.stop()
+
+ signal.signal(signal.SIGALRM, alarm_handler)
+ signal.signal(signal.SIGTERM, alarm_handler)
+ signal.alarm(10)
+
+ try:
+ httpd.run()
+ except KeyboardInterrupt:
+ pass
+ httpd.server_close()