diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-08-23 16:21:02 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-23 16:21:02 +0100 |
commit | e94ceeaf832692de3f3f212701c7a6dd74cd98d6 (patch) | |
tree | 6374c9c174798018dad81bdd999ed8cf57a98b07 | |
parent | f4d26bd0a17cb6e8c6839f4b8cda65257911a93e (diff) | |
parent | a4b08ac566aa631f4421114e4131b0098139a009 (diff) | |
download | rspamd-e94ceeaf832692de3f3f212701c7a6dd74cd98d6.tar.gz rspamd-e94ceeaf832692de3f3f212701c7a6dd74cd98d6.zip |
Merge pull request #2438 from negram/add-coroutine-to-http
Add coroutine to http
-rw-r--r-- | .circleci/config.yml | 2 | ||||
-rw-r--r-- | src/lua/lua_common.c | 2 | ||||
-rw-r--r-- | src/lua/lua_common.h | 2 | ||||
-rw-r--r-- | src/lua/lua_config.c | 190 | ||||
-rw-r--r-- | src/lua/lua_dns.c | 2 | ||||
-rw-r--r-- | src/lua/lua_http.c | 117 | ||||
-rw-r--r-- | src/lua/lua_thread_pool.c | 92 | ||||
-rw-r--r-- | src/lua/lua_thread_pool.h | 36 | ||||
-rw-r--r-- | test/functional/cases/210_clickhouse/001_migration.robot | 2 | ||||
-rw-r--r-- | test/functional/cases/220_http.robot | 65 | ||||
-rw-r--r-- | test/functional/configs/lua_test.conf | 4 | ||||
-rw-r--r-- | test/functional/configs/plugins.conf | 5 | ||||
-rw-r--r-- | test/functional/lib/rspamd.py | 36 | ||||
-rw-r--r-- | test/functional/lua/http.lua | 78 | ||||
-rwxr-xr-x | test/functional/util/dummy_fprot.py | 9 | ||||
-rwxr-xr-x | test/functional/util/dummy_http.py | 100 |
16 files changed, 585 insertions, 157 deletions
diff --git a/.circleci/config.yml b/.circleci/config.yml index 1df9ddc7f..3755257ba 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -93,7 +93,7 @@ jobs: - run: sudo apt-get install -qq libluajit-5.1-dev libpcre3-dev luarocks opendkim-tools python-pip redis-server - run: sudo apt-get install clickhouse-server - - run: sudo pip install demjson psutil robotframework requests + - run: sudo pip install demjson psutil robotframework requests http - run: sudo luarocks install luacheck - run: cd ../build diff --git a/src/lua/lua_common.c b/src/lua/lua_common.c index f446ec9e8..4f3c8046d 100644 --- a/src/lua/lua_common.c +++ b/src/lua/lua_common.c @@ -1507,7 +1507,7 @@ GString * rspamd_lua_get_traceback_string (lua_State *L) { GString *tb; - const gchar *msg = lua_tostring (L, 1); + const gchar *msg = lua_tostring (L, -1); tb = g_string_sized_new (100); g_string_append_printf (tb, "%s; trace:", msg); diff --git a/src/lua/lua_common.h b/src/lua/lua_common.h index 0a9527bb0..b84701bc1 100644 --- a/src/lua/lua_common.h +++ b/src/lua/lua_common.h @@ -434,7 +434,7 @@ lua_yield_thread (struct thread_entry *thread_entry, gint nresults); * @param narg */ void -lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg); +lua_resume_thread (struct thread_entry *thread_entry, gint narg); /* Paths defs */ #define RSPAMD_CONFDIR_INDEX "CONFDIR" diff --git a/src/lua/lua_config.c b/src/lua/lua_config.c index 6957ded7c..bf7c68ac5 100644 --- a/src/lua/lua_config.c +++ b/src/lua/lua_config.c @@ -1145,7 +1145,7 @@ lua_watcher_callback (gpointer session_data, gpointer ud) rspamd_session_get_watcher (task->s)); } else { - res = lua_tonumber (L, level + 1); + res = (gint)lua_tonumber (L, level + 1); } if (res) { @@ -1192,18 +1192,9 @@ lua_watcher_callback (gpointer session_data, gpointer ud) lua_settop (L, err_idx - 1); } -gint -lua_do_resume (lua_State *L, gint narg) -{ -#if LUA_VERSION_NUM < 503 - return lua_resume (L, narg); -#else - return lua_resume (L, NULL, narg); -#endif -} +static void lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret); -static void -lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret); +static void lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg); static void lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) @@ -1211,7 +1202,6 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) struct lua_callback_data *cd = ud; struct rspamd_task **ptask; struct thread_entry *thread_entry; - gint ret; thread_entry = lua_thread_pool_get (task->cfg->lua_thread_pool); @@ -1219,7 +1209,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) thread_entry->cd = cd; lua_State *thread = thread_entry->lua_state; - cd->stack_level = lua_gettop (cd->L); + cd->stack_level = lua_gettop (thread); if (cd->cb_is_ref) { lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref); @@ -1232,16 +1222,11 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud) rspamd_lua_setclass (thread, "rspamd{task}", -1); *ptask = task; - ret = lua_do_resume (thread, 1); + thread_entry->finish_callback = lua_metric_symbol_callback_return; + thread_entry->error_callback = lua_metric_symbol_callback_error; + thread_entry->task = task; - if (ret != LUA_YIELD) { - /* - LUA_YIELD state should not be handled here. - It should only happen when the thread initiated a asynchronous event and it will be restored as soon - the event is finished - */ - lua_metric_symbol_callback_return (task, thread_entry, ud, ret); - } + lua_thread_call (thread_entry, 1); } gint @@ -1252,132 +1237,105 @@ lua_yield_thread (struct thread_entry *thread_entry, gint nresults) return lua_yield (thread_entry->lua_state, nresults); } -void -lua_resume_thread (struct rspamd_task *task, struct thread_entry *thread_entry, gint narg) +static void +lua_metric_symbol_callback_error (struct thread_entry *thread_entry, int ret, const char *msg) { - g_assert (thread_entry->cd != NULL); - - /* - * The only state where we can resume from is LUA_YIELD - * Another acceptable status is OK (0) but in that case we should push function on stack - * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function. - */ - g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD); - - gint ret; - - lua_thread_pool_set_running_entry (task->cfg->lua_thread_pool, thread_entry); - ret = lua_do_resume (thread_entry->lua_state, narg); - - if (ret != LUA_YIELD) { - lua_metric_symbol_callback_return (task, thread_entry, thread_entry->cd, ret); - } + struct lua_callback_data *cd = thread_entry->cd; + struct rspamd_task *task = thread_entry->task; + msg_err_task ("call to (%s) failed (%d): %s", cd->symbol, ret, msg); } static void -lua_metric_symbol_callback_return (struct rspamd_task *task, struct thread_entry *thread_entry, gpointer ud, gint ret) +lua_metric_symbol_callback_return (struct thread_entry *thread_entry, int ret) { - GString *tb; - struct lua_callback_data *cd = ud; + struct lua_callback_data *cd = thread_entry->cd; + struct rspamd_task *task = thread_entry->task; int nresults; struct rspamd_symbol_result *s; - lua_State *thread = thread_entry->lua_state; - if (ret != 0) { + (void)ret; - tb = rspamd_lua_get_traceback_string (thread); - msg_err_task ("call to (%s) failed (%d): %v", cd->symbol, ret, tb); + lua_State *L = thread_entry->lua_state; - if (tb) { - g_string_free (tb, TRUE); - } - g_assert (lua_gettop (thread) >= cd->stack_level); - /* maybe there is a way to recover here. For now, just remove faulty thread */ - lua_thread_pool_terminate_entry (task->cfg->lua_thread_pool, thread_entry); - } - else { - nresults = lua_gettop (thread) - cd->stack_level; + nresults = lua_gettop (L) - cd->stack_level; - if (nresults >= 1) { - /* Function returned boolean, so maybe we need to insert result? */ - gint res = 0; - gint i; - gdouble flag = 1.0; - gint type; - struct lua_watcher_data *wd; + if (nresults >= 1) { + /* Function returned boolean, so maybe we need to insert result? */ + gint res = 0; + gint i; + gdouble flag = 1.0; + gint type; + struct lua_watcher_data *wd; - type = lua_type (thread, cd->stack_level + 1); + type = lua_type (L, cd->stack_level + 1); - if (type == LUA_TBOOLEAN) { - res = lua_toboolean (thread, cd->stack_level + 1); - } - else if (type == LUA_TFUNCTION) { - /* Function returned a closure that should be watched for */ - wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd)); - lua_pushvalue (thread /*cd->L*/, cd->stack_level + 1); - wd->cb_ref = luaL_ref (thread, LUA_REGISTRYINDEX); - wd->cbd = cd; - rspamd_session_watcher_push_callback (task->s, - rspamd_session_get_watcher (task->s), - lua_watcher_callback, wd); - /* - * We immediately pop watcher since we have not registered - * any async events from here - */ - rspamd_session_watcher_pop (task->s, - rspamd_session_get_watcher (task->s)); + if (type == LUA_TBOOLEAN) { + res = lua_toboolean (L, cd->stack_level + 1); + } + else if (type == LUA_TFUNCTION) { + /* Function returned a closure that should be watched for */ + wd = rspamd_mempool_alloc (task->task_pool, sizeof (*wd)); + lua_pushvalue (L /*cd->L*/, cd->stack_level + 1); + wd->cb_ref = luaL_ref (L, LUA_REGISTRYINDEX); + wd->cbd = cd; + rspamd_session_watcher_push_callback (task->s, + rspamd_session_get_watcher (task->s), + lua_watcher_callback, wd); + /* + * We immediately pop watcher since we have not registered + * any async events from here + */ + rspamd_session_watcher_pop (task->s, + rspamd_session_get_watcher (task->s)); + } + else { + res = lua_tonumber (L, cd->stack_level + 1); + } + + if (res) { + gint first_opt = 2; + + if (lua_type (L, cd->stack_level + 2) == LUA_TNUMBER) { + flag = lua_tonumber (L, cd->stack_level + 2); + /* Shift opt index */ + first_opt = 3; } else { - res = lua_tonumber (thread, cd->stack_level + 1); + flag = res; } - if (res) { - gint first_opt = 2; + s = rspamd_task_insert_result (task, cd->symbol, flag, NULL); - if (lua_type (thread, cd->stack_level + 2) == LUA_TNUMBER) { - flag = lua_tonumber (thread, cd->stack_level + 2); - /* Shift opt index */ - first_opt = 3; - } - else { - flag = res; - } + if (s) { + guint last_pos = lua_gettop (L); - s = rspamd_task_insert_result (task, cd->symbol, flag, NULL); + for (i = cd->stack_level + first_opt; i <= last_pos; i++) { + if (lua_type (L, i) == LUA_TSTRING) { + const char *opt = lua_tostring (L, i); - if (s) { - guint last_pos = lua_gettop (thread); + rspamd_task_add_result_option (task, s, opt); + } + else if (lua_type (L, i) == LUA_TTABLE) { + lua_pushvalue (L, i); - for (i = cd->stack_level + first_opt; i <= last_pos; i++) { - if (lua_type (thread, i) == LUA_TSTRING) { - const char *opt = lua_tostring (thread, i); + for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) { + const char *opt = lua_tostring (L, -1); rspamd_task_add_result_option (task, s, opt); } - else if (lua_type (thread, i) == LUA_TTABLE) { - lua_pushvalue (thread, i); - for (lua_pushnil (thread); lua_next (thread, -2); lua_pop (thread, 1)) { - const char *opt = lua_tostring (thread, -1); - - rspamd_task_add_result_option (task, s, opt); - } - - lua_pop (thread, 1); - } + lua_pop (L, 1); } } - } - lua_pop (thread, nresults); } - g_assert (lua_gettop (thread) == cd->stack_level); /* we properly cleaned up the stack */ - - lua_thread_pool_return (task->cfg->lua_thread_pool, thread_entry); + lua_pop (L, nresults); } + g_assert (lua_gettop (L) == cd->stack_level); /* we properly cleaned up the stack */ + cd->stack_level = 0; } diff --git a/src/lua/lua_dns.c b/src/lua/lua_dns.c index 0d12a137c..805921a03 100644 --- a/src/lua/lua_dns.c +++ b/src/lua/lua_dns.c @@ -147,7 +147,7 @@ lua_dns_callback (struct rdns_reply *reply, void *arg) lua_pushvalue (L, -3); } - lua_resume_thread (cbdata->task, cbdata->thread, 2); + lua_resume_thread (cbdata->thread, 2); if (cbdata->s) { rspamd_session_watcher_pop (cbdata->s, cbdata->w); diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index 64617be9b..3b0030506 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -75,6 +75,7 @@ struct lua_http_cbdata { gint flags; gint fd; gint cbref; + struct thread_entry *thread; }; static const int default_http_timeout = 5000; @@ -96,7 +97,10 @@ lua_http_fin (gpointer arg) { struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg; - luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref); + if (cbd->cbref != -1) { + luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref); + } + if (cbd->conn) { /* Here we already have a connection, so we need to unref it */ rspamd_http_connection_unref (cbd->conn); @@ -170,12 +174,19 @@ lua_http_push_error (struct lua_http_cbdata *cbd, const char *err) lua_thread_pool_restore_callback (&lcbd); } +static void lua_http_resume_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg, const char *err); + static void lua_http_error_handler (struct rspamd_http_connection *conn, GError *err) { struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud; - - lua_http_push_error (cbd, err->message); + if (cbd->cbref == -1) { + lua_http_resume_handler (conn, NULL, err->message); + } + else { + lua_http_push_error (cbd, err->message); + } lua_http_maybe_free (cbd); } @@ -191,6 +202,11 @@ lua_http_finish_handler (struct rspamd_http_connection *conn, struct lua_callback_state lcbd; lua_State *L; + if (cbd->cbref == -1) { + lua_http_resume_handler (conn, msg, NULL); + lua_http_maybe_free (cbd); + return 0; + } lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd); L = lcbd.L; @@ -245,6 +261,84 @@ lua_http_finish_handler (struct rspamd_http_connection *conn, return 0; } +/* + * resumes yielded thread + */ +static void +lua_http_resume_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg, const char *err) +{ + struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud; + lua_State *L = cbd->thread->lua_state; + const gchar *body; + gsize body_len; + struct rspamd_http_header *h, *htmp; + + msg_info ("T=%p, L=%p, status=%d, err=%s", cbd->thread, cbd->thread->lua_state, lua_status (cbd->thread->lua_state), err); + if (err) { + lua_pushstring (L, err); + lua_pushnil (L); + } + else { + /* + * 1 - nil (error) + * 2 - table: + * code (int) + * content (string) + * headers (table: header -> value) + */ + lua_pushnil (L); // error code + + lua_createtable (L, 0, 3); + + /* code */ + lua_pushliteral (L, "code"); + lua_pushinteger (L, msg->code); + lua_settable (L, -3); + + /* content */ + lua_pushliteral (L, "content"); + + body = rspamd_http_message_get_body (msg, &body_len); + if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) { + struct rspamd_lua_text *t; + + t = lua_newuserdata (L, sizeof (*t)); + rspamd_lua_setclass (L, "rspamd{text}", -1); + t->start = body; + t->len = body_len; + t->flags = 0; + } + else { + if (body_len > 0) { + lua_pushlstring (L, body, body_len); + } + else { + lua_pushnil (L); + } + } + lua_settable (L, -3); + + /* headers */ + lua_pushliteral (L, "headers"); + lua_newtable (L); + + HASH_ITER (hh, msg->headers, h, htmp) { + /* + * Lowercase header name, as Lua cannot search in caseless matter + */ + rspamd_str_lc (h->combined->str, h->name.len); + lua_pushlstring (L, h->name.begin, h->name.len); + lua_pushlstring (L, h->value.begin, h->value.len); + lua_settable (L, -3); + } + + lua_settable (L, -3); + } + + lua_resume_thread (cbd->thread, 2); +} + static gboolean lua_http_make_connection (struct lua_http_cbdata *cbd) { @@ -404,7 +498,7 @@ lua_http_request (lua_State *L) const gchar *url, *lua_body; rspamd_fstring_t *body = NULL; gchar *to_resolve; - gint cbref; + gint cbref = -1; gsize bodylen; gdouble timeout = default_http_timeout; gint flags = 0; @@ -464,11 +558,9 @@ lua_http_request (lua_State *L) lua_gettable (L, 1); if (url == NULL || lua_type (L, -1) != LUA_TFUNCTION) { lua_pop (L, 1); - msg_err ("http request has bad params"); - lua_pushboolean (L, FALSE); - return 1; + } else { + cbref = luaL_ref (L, LUA_REGISTRYINDEX); } - cbref = luaL_ref (L, LUA_REGISTRYINDEX); lua_pushstring (L, "task"); lua_gettable (L, 1); @@ -802,8 +894,13 @@ lua_http_request (lua_State *L) } } - lua_pushboolean (L, TRUE); - return 1; + if (cbd->cbref == -1) { + cbd->thread = lua_thread_pool_get_running_entry (cfg->lua_thread_pool); + return lua_yield_thread (cbd->thread, 0); + } else { + lua_pushboolean (L, TRUE); + return 1; + } } static gint diff --git a/src/lua/lua_thread_pool.c b/src/lua/lua_thread_pool.c index 979b31f6b..4c1681f96 100644 --- a/src/lua/lua_thread_pool.c +++ b/src/lua/lua_thread_pool.c @@ -91,6 +91,11 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa if (g_queue_get_length (pool->available_items) <= pool->max_items) { thread_entry->cd = NULL; + thread_entry->finish_callback = NULL; + thread_entry->error_callback = NULL; + thread_entry->task = NULL; + thread_entry->cfg = NULL; + g_queue_push_head (pool->available_items, thread_entry); } else { @@ -98,7 +103,7 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa } } -void +static void lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry) { struct thread_entry *ent = NULL; @@ -146,3 +151,88 @@ lua_thread_pool_restore_callback (struct lua_callback_state *cbs) lua_thread_pool_return (cbs->thread_pool, cbs->my_thread); lua_thread_pool_set_running_entry (cbs->thread_pool, cbs->previous_thread); } + +static gint +lua_do_resume (lua_State *L, gint narg) +{ +#if LUA_VERSION_NUM < 503 + return lua_resume (L, narg); +#else + return lua_resume (L, NULL, narg); +#endif +} + +static void lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg); + +void +lua_thread_call (struct thread_entry *thread_entry, int narg) +{ + g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */ + g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */ + + lua_resume_thread_internal (thread_entry, narg); +} + +void +lua_resume_thread (struct thread_entry *thread_entry, gint narg) +{ + /* + * The only state where we can resume from is LUA_YIELD + * Another acceptable status is OK (0) but in that case we should push function on stack + * to start the thread from, which is happening in lua_thread_call(), not in this function. + */ + g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD); + + lua_resume_thread_internal (thread_entry, narg); +} + +static void +lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg) +{ + gint ret; + struct lua_thread_pool *pool; + GString *tb; + struct rspamd_task *task; + + ret = lua_do_resume (thread_entry->lua_state, narg); + + if (ret != LUA_YIELD) { + /* + LUA_YIELD state should not be handled here. + It should only happen when the thread initiated a asynchronous event and it will be restored as soon + the event is finished + */ + + if (thread_entry->task) { + pool = thread_entry->task->cfg->lua_thread_pool; + } + else { + pool = thread_entry->cfg->lua_thread_pool; + } + if (ret == 0) { + if (thread_entry->finish_callback) { + thread_entry->finish_callback (thread_entry, ret); + } + lua_thread_pool_return (pool, thread_entry); + } + else { + tb = rspamd_lua_get_traceback_string (thread_entry->lua_state); + if (thread_entry->error_callback) { + thread_entry->error_callback (thread_entry, ret, tb->str); + } + else if (thread_entry->task) { + task = thread_entry->task; + msg_err_task ("lua call failed (%d): %v", ret, tb); + } + else { + msg_err ("lua call failed (%d): %v", ret, tb); + } + + if (tb) { + g_string_free (tb, TRUE); + } + /* maybe there is a way to recover here. For now, just remove faulty thread */ + lua_thread_pool_terminate_entry (pool, thread_entry); + } + } +}
\ No newline at end of file diff --git a/src/lua/lua_thread_pool.h b/src/lua/lua_thread_pool.h index b72b72e8d..c77f77455 100644 --- a/src/lua/lua_thread_pool.h +++ b/src/lua/lua_thread_pool.h @@ -3,13 +3,25 @@ #include <lua.h> +struct thread_entry; +struct lua_thread_pool; + +typedef void (*lua_thread_finish_t) (struct thread_entry *thread, int ret); +typedef void (*lua_thread_error_t) (struct thread_entry *thread, int ret, const char *msg); + struct thread_entry { lua_State *lua_state; gint thread_index; gpointer cd; -}; -struct thread_pool; + /* function to handle result of called method, can be NULL */ + lua_thread_finish_t finish_callback; + + /* function to log result, i.e. if you want to modify error logging message or somehow process this state, can be NUL */ + lua_thread_error_t error_callback; + struct rspamd_task *task; + struct rspamd_config *cfg; +}; struct lua_callback_state { lua_State *L; @@ -59,15 +71,6 @@ void lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry); /** - * Removes thread from Lua state. It should be done to dead (which ended with an error) threads only - * - * @param pool - * @param thread_entry - */ -void -lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry); - -/** * Currently running thread. Typically needed in yielding point - to fill-up continuation. * * @param pool @@ -102,5 +105,16 @@ lua_thread_pool_prepare_callback (struct lua_thread_pool *pool, struct lua_callb void lua_thread_pool_restore_callback (struct lua_callback_state *cbs); + +/** + * Acts like lua_call but the tread is able to suspend execution. + * As soon as the call is over, call either thread_entry::finish_callback or thread_entry::error_callback. + * + * @param thread_entry + * @param narg + */ +void +lua_thread_call (struct thread_entry *thread_entry, int narg); + #endif /* LUA_THREAD_POOL_H_ */ diff --git a/test/functional/cases/210_clickhouse/001_migration.robot b/test/functional/cases/210_clickhouse/001_migration.robot index 81cfa9dca..0d8730ba1 100644 --- a/test/functional/cases/210_clickhouse/001_migration.robot +++ b/test/functional/cases/210_clickhouse/001_migration.robot @@ -24,7 +24,7 @@ Migration Prepare rspamd - Sleep 1 #TODO: replace this check with waiting until migration finishes + Sleep 2 #TODO: replace this check with waiting until migration finishes Column should exist rspamd Symbols.Scores Column should exist rspamd Attachments.Digest diff --git a/test/functional/cases/220_http.robot b/test/functional/cases/220_http.robot new file mode 100644 index 000000000..a8f47faa8 --- /dev/null +++ b/test/functional/cases/220_http.robot @@ -0,0 +1,65 @@ +*** Settings *** +Test Setup Http Setup +Test Teardown Http Teardown +Library Process +Library ${TESTDIR}/lib/rspamd.py +Resource ${TESTDIR}/lib/rspamd.robot +Variables ${TESTDIR}/lib/vars.py + +*** Variables *** +# ${CONFIG} ${TESTDIR}/configs/http.conf +${URL_TLD} ${TESTDIR}/../lua/unit/test_tld.dat +${CONFIG} ${TESTDIR}/configs/lua_test.conf +${MESSAGE} ${TESTDIR}/messages/spam_message.eml +${REDIS_SCOPE} Suite +${RSPAMD_SCOPE} Suite + +*** Test Cases *** +Simple HTTP request + Check url /request get HTTP_DNS_200 HTTP_200 HTTP_CORO_DNS_200 HTTP_CORO_200 method_get hello world HTTP_CORO_200 (0.00)[hello world] + Check url /request post HTTP_DNS_200 HTTP_200 HTTP_CORO_DNS_200 HTTP_CORO_200 method_post hello post HTTP_CORO_DNS_200 (0.00)[hello post] + +*** Test Cases *** +HTTP request 403 + Check url /error_403 get HTTP_DNS_403 HTTP_403 HTTP_CORO_DNS_403 HTTP_CORO_403 method_get + Check url /error_403 post HTTP_DNS_403 HTTP_403 HTTP_CORO_DNS_403 HTTP_CORO_403 method_post + + +*** Test Cases *** +HTTP timeout + Check url /timeout get HTTP_ERROR HTTP_ERROR HTTP_CORO_DNS_ERROR HTTP_CORO_ERROR method_get IO timeout + Check url /timeout post HTTP_DNS_ERROR HTTP_ERROR HTTP_CORO_DNS_ERROR HTTP_CORO_ERROR method_post IO timeout + + +*** Test Cases *** +HTTP empty response + Check url /empty get HTTP_ERROR HTTP_ERROR HTTP_CORO_DNS_ERROR HTTP_CORO_ERROR method_get IO read error: unexpected EOF + Check url /empty post HTTP_DNS_ERROR HTTP_ERROR HTTP_CORO_DNS_ERROR HTTP_CORO_ERROR method_post IO read error: unexpected EOF + + +*** Keywords *** +Lua Setup + [Arguments] ${LUA_SCRIPT} + Set Global Variable ${LUA_SCRIPT} + Generic Setup + +Http Setup + Run Dummy Http + Lua Setup ${TESTDIR}/lua/http.lua + +Http Teardown + ${http_pid} = Get File /tmp/dummy_http.pid + Shutdown Process With Children ${http_pid} + Normal Teardown + +Run Dummy Http + [Arguments] + ${result} = Start Process ${TESTDIR}/util/dummy_http.py + Wait Until Created /tmp/dummy_http.pid + + +Check url + [Arguments] ${url} ${method} @{expect_results} + ${result} = Scan Message With Rspamc --header=url:${url} --header=method:${method} ${MESSAGE} + : FOR ${expect} IN @{expect_results} + \ Check Rspamc ${result} ${expect}
\ No newline at end of file diff --git a/test/functional/configs/lua_test.conf b/test/functional/configs/lua_test.conf index f01f07cad..af84a1578 100644 --- a/test/functional/configs/lua_test.conf +++ b/test/functional/configs/lua_test.conf @@ -10,6 +10,10 @@ options = { name = "example.com", type = "a"; replies = ["93.184.216.34"]; + }, { + name = "site.resolveme", + type = "a"; + replies = ["127.0.0.1"]; }] } } diff --git a/test/functional/configs/plugins.conf b/test/functional/configs/plugins.conf index eab954333..aaf27b61e 100644 --- a/test/functional/configs/plugins.conf +++ b/test/functional/configs/plugins.conf @@ -421,6 +421,11 @@ options = { name = "rspamd.tk", type = "txt"; replies = ["bio=a263adeab8acdcdb8b89e127b67d696061fdfbee"]; + }, + { + name = "fail1.org.org.za", + type = "txt"; + replies = ["v=spf1 redirect=www.dnssec-failed.org"]; }]; } } diff --git a/test/functional/lib/rspamd.py b/test/functional/lib/rspamd.py index 2010f4127..97ab516c1 100644 --- a/test/functional/lib/rspamd.py +++ b/test/functional/lib/rspamd.py @@ -8,11 +8,12 @@ import re import shutil import signal import socket -import string +import errno import sys import tempfile import time from robot.libraries.BuiltIn import BuiltIn +from robot.api import logger if sys.version_info > (3,): long = int @@ -167,32 +168,47 @@ def update_dictionary(a, b): a.update(b) return a -def shutdown_process(pid): +def shutdown_process(process): i = 0 while i < 100: try: - os.kill(pid, signal.SIGTERM) + os.kill(process.pid, signal.SIGTERM) except OSError as e: - assert e.errno == 3 + assert e.errno == errno.ESRCH return + + if process.status() == psutil.STATUS_ZOMBIE: + return + i += 1 time.sleep(0.1) + while i < 200: try: - os.kill(pid, signal.SIGKILL) + os.kill(process.pid, signal.SIGKILL) except OSError as e: - assert e.errno == 3 + assert e.errno == errno.ESRCH return + + if process.status() == psutil.STATUS_ZOMBIE: + return + i += 1 time.sleep(0.1) - assert False, "Failed to shutdown process %s" % pid + assert False, "Failed to shutdown process %d (%s)" % (process.pid, process.name()) + def shutdown_process_with_children(pid): pid = int(pid) - children = psutil.Process(pid=pid).children(recursive=False) - shutdown_process(pid) + try: + process = psutil.Process(pid=pid) + except psutil.NoSuchProcess: + return + children = process.children(recursive=False) + shutdown_process(process) for child in children: try: - shutdown_process(child.pid) + shutdown_process(child) except: pass + diff --git a/test/functional/lua/http.lua b/test/functional/lua/http.lua new file mode 100644 index 000000000..03c4ca6fc --- /dev/null +++ b/test/functional/lua/http.lua @@ -0,0 +1,78 @@ +local rspamd_http = require "rspamd_http" +local rspamd_logger = require "rspamd_logger" + +local function http_symbol(task) + + local url = tostring(task:get_request_header('url')) + local method = tostring(task:get_request_header('method')) + + task:insert_result('method_' .. method, 1.0) + + local function http_callback(err, code, body) + if err then + rspamd_logger.errx('http_callback error: ' .. err) + task:insert_result('HTTP_ERROR', 1.0, err) + else + task:insert_result('HTTP_' .. code, 1.0, body) + end + end + + local function http_dns_callback(err, code, body) + if err then + rspamd_logger.errx('http_dns_callback error: ' .. err) + task:insert_result('HTTP_DNS_ERROR', 1.0, err) + else + task:insert_result('HTTP_DNS_' .. code, 1.0, body) + end + end + + rspamd_http.request({ + url = 'http://127.0.0.1:18080' .. url, + task = task, + method = method, + callback = http_callback, + timeout = 1, + }) + + --[[ request to this address involved DNS resolver subsystem ]] + rspamd_http.request({ + url = 'http://site.resolveme:18080' .. url, + task = task, + method = method, + callback = http_dns_callback, + timeout = 1, + }) + + local err, response = rspamd_http.request({ + url = 'http://127.0.0.1:18080' .. url, + task = task, + method = method, + timeout = 1, + }) + + if not err then + task:insert_result('HTTP_CORO_' .. response.code, 1.0, response.content) + else + task:insert_result('HTTP_CORO_ERROR', 1.0, err) + end + + err, response = rspamd_http.request({ + url = 'http://site.resolveme:18080' .. url, + task = task, + method = method, + timeout = 1, + }) + + if not err then + task:insert_result('HTTP_CORO_DNS_' .. response.code, 1.0, response.content) + else + task:insert_result('HTTP_CORO_DNS_ERROR', 1.0, err) + end +end + +rspamd_config:register_symbol({ + name = 'SIMPLE_TEST', + score = 1.0, + callback = http_symbol, + no_squeeze = true +}) diff --git a/test/functional/util/dummy_fprot.py b/test/functional/util/dummy_fprot.py index c35f0628f..34725280b 100755 --- a/test/functional/util/dummy_fprot.py +++ b/test/functional/util/dummy_fprot.py @@ -1,14 +1,15 @@ #!/usr/bin/env python - -PID = "/tmp/dummy_fprot.pid" - import os import sys +import signal + + try: import SocketServer as socketserver except: import socketserver -import signal + +PID = "/tmp/dummy_fprot.pid" class MyTCPHandler(socketserver.BaseRequestHandler): diff --git a/test/functional/util/dummy_http.py b/test/functional/util/dummy_http.py new file mode 100755 index 000000000..4f8e67ffd --- /dev/null +++ b/test/functional/util/dummy_http.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python + +import BaseHTTPServer +import time +import os +import sys +import signal + +PORT = 18080 +HOST_NAME = '127.0.0.1' + +PID = "/tmp/dummy_http.pid" + + +class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler): + + def do_HEAD(self): + self.send_response(200) + self.send_header("Content-type", "text/html") + self.end_headers() + + def do_GET(self): + """Respond to a GET request.""" + if self.path == "/empty": + self.finish() + return + + if self.path == "/timeout": + time.sleep(2) + + if self.path == "/error_403": + self.send_response(403) + else: + self.send_response(200) + + self.send_header("Content-type", "text/plain") + self.end_headers() + self.wfile.write("hello world") + + def do_POST(self): + """Respond to a GET request.""" + if self.path == "/empty": + self.finish() + return + + if self.path == "/timeout": + time.sleep(2) + + if self.path == "/error_403": + self.send_response(403) + else: + self.send_response(200) + + self.send_header("Content-type", "text/plain") + self.end_headers() + self.wfile.write("hello post") + + +class MyHttp(BaseHTTPServer.HTTPServer): + def __init__(self, server_address, RequestHandlerClass, bind_and_activate=False): + BaseHTTPServer.HTTPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate) + self.keep_running = True + + def run(self): + self.server_bind() + self.server_activate() + + with open(PID, 'w+') as f: + f.write(str(os.getpid())) + f.close() + + while self.keep_running: + try: + self.handle_request() + except Exception: + pass + + def stop(self): + self.keep_running = False + self.server_close() + + +if __name__ == '__main__': + server_class = BaseHTTPServer.HTTPServer + httpd = MyHttp((HOST_NAME, PORT), MyHandler) + httpd.allow_reuse_address = True + httpd.timeout = 1 + + def alarm_handler(signum, frame): + httpd.stop() + + signal.signal(signal.SIGALRM, alarm_handler) + signal.signal(signal.SIGTERM, alarm_handler) + signal.alarm(10) + + try: + httpd.run() + except KeyboardInterrupt: + pass + httpd.server_close() |