summaryrefslogtreecommitdiffstats
path: root/src/lua/lua_tcp.c
diff options
context:
space:
mode:
authorMikhail Galanin <mgalanin@mimecast.com>2018-08-17 11:16:45 +0100
committerMikhail Galanin <mgalanin@mimecast.com>2018-08-17 11:16:45 +0100
commit15c7adc671c7d8e22febab8e64e946f71e93738c (patch)
tree13f6905213d8c1e507d2276a40c76e5ff3158c70 /src/lua/lua_tcp.c
parent55afdd2905dd1d9f58982691a404c373e768d304 (diff)
downloadrspamd-15c7adc671c7d8e22febab8e64e946f71e93738c.tar.gz
rspamd-15c7adc671c7d8e22febab8e64e946f71e93738c.zip
[Minor] use callback helpers to avoid conflicts between coroutine- and callback-based code
Diffstat (limited to 'src/lua/lua_tcp.c')
-rw-r--r--src/lua/lua_tcp.c82
1 files changed, 51 insertions, 31 deletions
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c
index 797bdcc4e..8d948c6d5 100644
--- a/src/lua/lua_tcp.c
+++ b/src/lua/lua_tcp.c
@@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "lua_common.h"
+#include "lua_thread_pool.h"
#include "utlist.h"
#include "unix-std.h"
@@ -186,7 +187,6 @@ struct lua_tcp_dtor {
#define LUA_TCP_FLAG_FINISHED (1 << 4)
struct lua_tcp_cbdata {
- lua_State *L;
struct rspamd_async_session *session;
struct rspamd_async_event *async_ev;
struct event_base *ev_base;
@@ -203,6 +203,7 @@ struct lua_tcp_cbdata {
struct event ev;
struct lua_tcp_dtor *dtors;
ref_entry_t ref;
+ struct rspamd_task *task;
};
#define msg_debug_tcp(...) rspamd_conditional_debug_fast (NULL, cbd->addr, \
@@ -249,7 +250,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd)
if (hdl->type == LUA_WANT_READ) {
if (hdl->h.r.cbref) {
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.r.cbref);
+ luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.r.cbref);
}
if (hdl->h.r.stop_pattern) {
@@ -258,7 +259,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd)
}
else {
if (hdl->h.w.cbref) {
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.w.cbref);
+ luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.w.cbref);
}
if (hdl->h.w.iov) {
@@ -280,7 +281,7 @@ lua_tcp_fin (gpointer arg)
msg_debug_tcp ("finishing TCP connection");
if (cbd->connect_cb) {
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
+ luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, cbd->connect_cb);
}
if (cbd->fd != -1) {
@@ -338,6 +339,11 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
struct lua_tcp_cbdata **pcbd;
struct lua_tcp_handler *hdl;
gint cbref, top;
+ struct lua_callback_state cbs;
+ lua_State *L;
+
+ lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
va_start (ap, err);
@@ -356,27 +362,27 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
}
if (cbref != -1) {
- top = lua_gettop (cbd->L);
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+ top = lua_gettop (L);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbref);
/* Error message */
va_copy (ap_copy, ap);
- lua_pushvfstring (cbd->L, err, ap_copy);
+ lua_pushvfstring (L, err, ap_copy);
va_end (ap_copy);
/* Body */
- lua_pushnil (cbd->L);
+ lua_pushnil (L);
/* Connection */
- pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+ pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
- rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+ rspamd_lua_setclass (L, "rspamd{tcp}", -1);
REF_RETAIN (cbd);
- if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ if (lua_pcall (L, 3, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- lua_settop (cbd->L, top);
+ lua_settop (L, top);
REF_RELEASE (cbd);
}
@@ -391,6 +397,8 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
}
va_end (ap);
+
+ lua_thread_pool_restore_callback (&cbs);
}
static void
@@ -400,6 +408,11 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
struct lua_tcp_cbdata **pcbd;
struct lua_tcp_handler *hdl;
gint cbref, arg_cnt, top;
+ struct lua_callback_state cbs;
+ lua_State *L;
+
+ lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
hdl = g_queue_peek_head (cbd->handlers);
@@ -413,15 +426,15 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
}
if (cbref != -1) {
- top = lua_gettop (cbd->L);
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+ top = lua_gettop (L);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbref);
/* Error */
- lua_pushnil (cbd->L);
+ lua_pushnil (L);
/* Body */
if (hdl->type == LUA_WANT_READ) {
- t = lua_newuserdata (cbd->L, sizeof (*t));
- rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
+ t = lua_newuserdata (L, sizeof (*t));
+ rspamd_lua_setclass (L, "rspamd{text}", -1);
t->start = (const gchar *)str;
t->len = len;
t->flags = 0;
@@ -431,19 +444,21 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
arg_cnt = 2;
}
/* Connection */
- pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+ pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
- rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+ rspamd_lua_setclass (L, "rspamd{tcp}", -1);
REF_RETAIN (cbd);
- if (lua_pcall (cbd->L, arg_cnt, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ if (lua_pcall (L, arg_cnt, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- lua_settop (cbd->L, top);
+ lua_settop (L, top);
REF_RELEASE (cbd);
}
+
+ lua_thread_pool_restore_callback (&cbs);
}
static void
@@ -667,6 +682,8 @@ lua_tcp_handler (int fd, short what, gpointer ud)
gssize r;
gint so_error = 0;
socklen_t so_len = sizeof (so_error);
+ struct lua_callback_state cbs;
+ lua_State *L;
REF_RETAIN (cbd);
@@ -693,22 +710,25 @@ lua_tcp_handler (int fd, short what, gpointer ud)
else {
cbd->flags |= LUA_TCP_FLAG_CONNECTED;
+ lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
+
if (cbd->connect_cb != -1) {
struct lua_tcp_cbdata **pcbd;
gint top;
- top = lua_gettop (cbd->L);
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
- pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+ top = lua_gettop (L);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->connect_cb);
+ pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
REF_RETAIN (cbd);
- rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+ rspamd_lua_setclass (L, "rspamd{tcp}", -1);
- if (lua_pcall (cbd->L, 1, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ if (lua_pcall (L, 1, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- lua_settop (cbd->L, top);
+ lua_settop (L, top);
REF_RELEASE (cbd);
}
@@ -1174,7 +1194,7 @@ lua_tcp_request (lua_State *L)
return 1;
}
- cbd->L = L;
+ cbd->task = task;
h = rspamd_random_uint64_fast ();
rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);
cbd->handlers = g_queue_new ();