From: Mikhail Galanin Date: Fri, 17 Aug 2018 10:16:45 +0000 (+0100) Subject: [Minor] use callback helpers to avoid conflicts between coroutine- and callback-based... X-Git-Tag: 1.8.0~234^2~6 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=15c7adc671c7d8e22febab8e64e946f71e93738c;p=rspamd.git [Minor] use callback helpers to avoid conflicts between coroutine- and callback-based code --- diff --git a/src/lua/lua_dns.c b/src/lua/lua_dns.c index 451f3f717..045b2f1de 100644 --- a/src/lua/lua_dns.c +++ b/src/lua/lua_dns.c @@ -78,7 +78,6 @@ lua_check_dns_resolver (lua_State * L) } struct lua_dns_cbdata { - lua_State *L; struct thread_entry *thread; struct rspamd_task *task; struct rspamd_dns_resolver *resolver; @@ -142,15 +141,22 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg) struct rspamd_dns_resolver **presolver; struct rdns_reply_entry *elt; rspamd_inet_addr_t *addr; + lua_State *L; + struct lua_callback_state cbs; if (cd->cbref != -1) { - lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->cbref); + lua_thread_pool_prepare_callback (cd->resolver->cfg->lua_thread_pool, &cbs); + L = cbs.L; + + lua_rawgeti (L, LUA_REGISTRYINDEX, cd->cbref); - presolver = lua_newuserdata (cd->L, sizeof (gpointer)); - rspamd_lua_setclass (cd->L, "rspamd{resolver}", -1); + presolver = lua_newuserdata (L, sizeof (gpointer)); + rspamd_lua_setclass (L, "rspamd{resolver}", -1); *presolver = cd->resolver; - lua_pushstring (cd->L, cd->to_resolve); + lua_pushstring (L, cd->to_resolve); + } else { + L = cd->thread->lua_state; } /* @@ -161,72 +167,72 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg) naddrs ++; } - lua_createtable (cd->L, naddrs, 0); + lua_createtable (L, naddrs, 0); LL_FOREACH (reply->entries, elt) { switch (elt->type) { case RDNS_REQUEST_A: addr = rspamd_inet_address_new (AF_INET, &elt->content.a.addr); - rspamd_lua_ip_push (cd->L, addr); + rspamd_lua_ip_push (L, addr); rspamd_inet_address_free (addr); - lua_rawseti (cd->L, -2, ++i); + lua_rawseti (L, -2, ++i); break; case RDNS_REQUEST_AAAA: addr = rspamd_inet_address_new (AF_INET6, &elt->content.aaa.addr); - rspamd_lua_ip_push (cd->L, addr); + rspamd_lua_ip_push (L, addr); rspamd_inet_address_free (addr); - lua_rawseti (cd->L, -2, ++i); + lua_rawseti (L, -2, ++i); break; case RDNS_REQUEST_NS: - lua_pushstring (cd->L, elt->content.ns.name); - lua_rawseti (cd->L, -2, ++i); + lua_pushstring (L, elt->content.ns.name); + lua_rawseti (L, -2, ++i); break; case RDNS_REQUEST_PTR: - lua_pushstring (cd->L, elt->content.ptr.name); - lua_rawseti (cd->L, -2, ++i); + lua_pushstring (L, elt->content.ptr.name); + lua_rawseti (L, -2, ++i); break; case RDNS_REQUEST_TXT: case RDNS_REQUEST_SPF: - lua_pushstring (cd->L, elt->content.txt.data); - lua_rawseti (cd->L, -2, ++i); + lua_pushstring (L, elt->content.txt.data); + lua_rawseti (L, -2, ++i); break; case RDNS_REQUEST_MX: /* mx['name'], mx['priority'] */ - lua_createtable (cd->L, 0, 2); - rspamd_lua_table_set (cd->L, "name", elt->content.mx.name); - lua_pushstring (cd->L, "priority"); - lua_pushinteger (cd->L, elt->content.mx.priority); - lua_settable (cd->L, -3); + lua_createtable (L, 0, 2); + rspamd_lua_table_set (L, "name", elt->content.mx.name); + lua_pushstring (L, "priority"); + lua_pushinteger (L, elt->content.mx.priority); + lua_settable (L, -3); - lua_rawseti (cd->L, -2, ++i); + lua_rawseti (L, -2, ++i); break; case RDNS_REQUEST_SOA: - lua_createtable (cd->L, 0, 7); - rspamd_lua_table_set (cd->L, "ns", elt->content.soa.mname); - rspamd_lua_table_set (cd->L, "contact", elt->content.soa.admin); - lua_pushstring (cd->L, "serial"); - lua_pushinteger (cd->L, elt->content.soa.serial); - lua_settable (cd->L, -3); - lua_pushstring (cd->L, "refresh"); - lua_pushinteger (cd->L, elt->content.soa.refresh); - lua_settable (cd->L, -3); - lua_pushstring (cd->L, "retry"); - lua_pushinteger (cd->L, elt->content.soa.retry); - lua_settable (cd->L, -3); - lua_pushstring (cd->L, "expiry"); - lua_pushinteger (cd->L, elt->content.soa.expire); - lua_settable (cd->L, -3); + lua_createtable (L, 0, 7); + rspamd_lua_table_set (L, "ns", elt->content.soa.mname); + rspamd_lua_table_set (L, "contact", elt->content.soa.admin); + lua_pushstring (L, "serial"); + lua_pushinteger (L, elt->content.soa.serial); + lua_settable (L, -3); + lua_pushstring (L, "refresh"); + lua_pushinteger (L, elt->content.soa.refresh); + lua_settable (L, -3); + lua_pushstring (L, "retry"); + lua_pushinteger (L, elt->content.soa.retry); + lua_settable (L, -3); + lua_pushstring (L, "expiry"); + lua_pushinteger (L, elt->content.soa.expire); + lua_settable (L, -3); /* Negative TTL */ - lua_pushstring (cd->L, "nx"); - lua_pushinteger (cd->L, elt->content.soa.minimum); - lua_settable (cd->L, -3); + lua_pushstring (L, "nx"); + lua_pushinteger (L, elt->content.soa.minimum); + lua_settable (L, -3); - lua_rawseti (cd->L, -2, ++i); + lua_rawseti (L, -2, ++i); break; } } - lua_pushnil (cd->L); + lua_pushnil (L); } if (cd->cbref != -1) { @@ -239,25 +245,27 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg) * 6 - reply->authenticated */ if (reply->code != RDNS_RC_NOERROR) { - lua_pushnil (cd->L); - lua_pushstring (cd->L, rdns_strerror (reply->code)); + lua_pushnil (L); + lua_pushstring (L, rdns_strerror (reply->code)); } if (cd->user_str != NULL) { - lua_pushstring (cd->L, cd->user_str); + lua_pushstring (L, cd->user_str); } else { - lua_pushnil (cd->L); + lua_pushnil (L); } - lua_pushboolean (cd->L, reply->authenticated); + lua_pushboolean (L, reply->authenticated); - if (lua_pcall (cd->L, 6, 0, 0) != 0) { - msg_info ("call to dns callback failed: %s", lua_tostring (cd->L, -1)); - lua_pop (cd->L, 1); + if (lua_pcall (L, 6, 0, 0) != 0) { + msg_info ("call to dns callback failed: %s", lua_tostring (L, -1)); + lua_pop (L, 1); } /* Unref function */ - luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->cbref); + luaL_unref (L, LUA_REGISTRYINDEX, cd->cbref); + + lua_thread_pool_restore_callback (&cbs); } else { /* * 1 - true | false in the case of error @@ -269,18 +277,21 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg) * } */ if (reply->code != RDNS_RC_NOERROR) { - lua_pushboolean (cd->L, false); - lua_pushstring (cd->L, rdns_strerror (reply->code)); + lua_pushboolean (L, false); + lua_pushstring (L, rdns_strerror (reply->code)); } else { - lua_pushboolean (cd->L, reply->authenticated); - lua_setfield (cd->L, -3, "authenticated"); + lua_pushboolean (L, reply->authenticated); + lua_setfield (L, -3, "authenticated"); /* result 1 - not and error */ - lua_pushboolean (cd->L, true); + lua_pushboolean (L, true); /* push table into stack, result 2 - results itself */ - lua_pushvalue (cd->L, -3); + lua_pushvalue (L, -3); } + + g_assert (L == cd->thread->lua_state); + lua_resume_thread (cd->task, cd->thread, 2); } @@ -364,10 +375,12 @@ lua_dns_resolver_resolve_common (lua_State *L, pool = task->task_pool; session = task->s; } + else if (!session || !pool) { + return luaL_error (L, "invalid arguments: either 'task' or 'session'/'mempool' should be set"); + } if (pool != NULL && to_resolve != NULL) { cbdata = rspamd_mempool_alloc0 (pool, sizeof (struct lua_dns_cbdata)); - cbdata->L = L; cbdata->resolver = resolver; cbdata->cbref = cbref; cbdata->user_str = rspamd_mempool_strdup (pool, user_str); diff --git a/src/lua/lua_http.c b/src/lua/lua_http.c index c292428c2..64617be9b 100644 --- a/src/lua/lua_http.c +++ b/src/lua/lua_http.c @@ -14,6 +14,7 @@ * limitations under the License. */ #include "lua_common.h" +#include "lua_thread_pool.h" #include "http_private.h" #include "unix-std.h" #include "zlib.h" @@ -56,7 +57,6 @@ static const struct luaL_reg httplib_m[] = { #define RSPAMD_LUA_HTTP_FLAG_NOVERIFY (1 << 1) struct lua_http_cbdata { - lua_State *L; struct rspamd_http_connection *conn; struct rspamd_async_session *session; struct rspamd_async_watcher *w; @@ -96,7 +96,7 @@ lua_http_fin (gpointer arg) { struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg; - luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); + luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref); if (cbd->conn) { /* Here we already have a connection, so we need to unref it */ rspamd_http_connection_unref (cbd->conn); @@ -152,13 +152,22 @@ lua_http_maybe_free (struct lua_http_cbdata *cbd) static void lua_http_push_error (struct lua_http_cbdata *cbd, const char *err) { - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); - lua_pushstring (cbd->L, err); + struct lua_callback_state lcbd; + lua_State *L; + + lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd); + + L = lcbd.L; - if (lua_pcall (cbd->L, 1, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); - lua_pop (cbd->L, 1); + lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref); + lua_pushstring (L, err); + + if (lua_pcall (L, 1, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); + lua_pop (L, 1); } + + lua_thread_pool_restore_callback (&lcbd); } static void @@ -179,51 +188,60 @@ lua_http_finish_handler (struct rspamd_http_connection *conn, const gchar *body; gsize body_len; - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); + struct lua_callback_state lcbd; + lua_State *L; + + lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd); + + L = lcbd.L; + + lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref); /* Error */ - lua_pushnil (cbd->L); + lua_pushnil (L); /* Reply code */ - lua_pushinteger (cbd->L, msg->code); + lua_pushinteger (L, msg->code); /* Body */ body = rspamd_http_message_get_body (msg, &body_len); if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) { struct rspamd_lua_text *t; - t = lua_newuserdata (cbd->L, sizeof (*t)); - rspamd_lua_setclass (cbd->L, "rspamd{text}", -1); + t = lua_newuserdata (L, sizeof (*t)); + rspamd_lua_setclass (L, "rspamd{text}", -1); t->start = body; t->len = body_len; t->flags = 0; } else { if (body_len > 0) { - lua_pushlstring (cbd->L, body, body_len); + lua_pushlstring (L, body, body_len); } else { - lua_pushnil (cbd->L); + lua_pushnil (L); } } /* Headers */ - lua_newtable (cbd->L); + lua_newtable (L); HASH_ITER (hh, msg->headers, h, htmp) { /* * Lowercase header name, as Lua cannot search in caseless matter */ rspamd_str_lc (h->combined->str, h->name.len); - lua_pushlstring (cbd->L, h->name.begin, h->name.len); - lua_pushlstring (cbd->L, h->value.begin, h->value.len); - lua_settable (cbd->L, -3); + lua_pushlstring (L, h->name.begin, h->name.len); + lua_pushlstring (L, h->value.begin, h->value.len); + lua_settable (L, -3); } - if (lua_pcall (cbd->L, 4, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); - lua_pop (cbd->L, 1); + if (lua_pcall (L, 4, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); + lua_pop (L, 1); } lua_http_maybe_free (cbd); + lua_thread_pool_restore_callback (&lcbd); + return 0; } @@ -707,7 +725,6 @@ lua_http_request (lua_State *L) } cbd = g_malloc0 (sizeof (*cbd)); - cbd->L = L; cbd->cbref = cbref; cbd->msg = msg; cbd->ev_base = ev_base; diff --git a/src/lua/lua_redis.c b/src/lua/lua_redis.c index e5b97ebeb..0fc9c43b7 100644 --- a/src/lua/lua_redis.c +++ b/src/lua/lua_redis.c @@ -14,6 +14,7 @@ * limitations under the License. */ #include "lua_common.h" +#include "lua_thread_pool.h" #include "utlist.h" #include "contrib/hiredis/hiredis.h" @@ -92,7 +93,7 @@ struct lua_redis_specific_userdata; */ struct lua_redis_userdata { redisAsyncContext *ctx; - lua_State *L; + struct rspamd_task *task; struct rspamd_async_session *s; struct event_base *ev_base; struct rspamd_config *cfg; @@ -191,7 +192,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx) lua_redis_free_args (cur->args, cur->arglens, cur->nargs); if (cur->cbref != -1) { - luaL_unref (ud->L, LUA_REGISTRYINDEX, cur->cbref); + luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref); } g_free (cur); @@ -244,21 +245,27 @@ lua_redis_push_error (const gchar *err, gboolean connected) { struct lua_redis_userdata *ud = sp_ud->c; + struct lua_callback_state cbs; if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) { if (sp_ud->cbref != -1) { + + lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs); + /* Push error */ - lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); + lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); /* String of error */ - lua_pushstring (ud->L, err); + lua_pushstring (cbs.L, err); /* Data is nil */ - lua_pushnil (ud->L); + lua_pushnil (cbs.L); - if (lua_pcall (ud->L, 2, 0, 0) != 0) { - msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); - lua_pop (ud->L, 1); + if (lua_pcall (cbs.L, 2, 0, 0) != 0) { + msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1)); + lua_pop (cbs.L, 1); } + + lua_thread_pool_restore_callback (&cbs); } sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; @@ -323,21 +330,25 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx, struct lua_redis_specific_userdata *sp_ud) { struct lua_redis_userdata *ud = sp_ud->c; + struct lua_callback_state cbs; if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) { if (sp_ud->cbref != -1) { + lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs); + /* Push error */ - lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref); + lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref); /* Error is nil */ - lua_pushnil (ud->L); + lua_pushnil (cbs.L); /* Data */ - lua_redis_push_reply (ud->L, r, ctx->flags & LUA_REDIS_TEXTDATA); + lua_redis_push_reply (cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA); - if (lua_pcall (ud->L, 2, 0, 0) != 0) { - msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1)); - lua_pop (ud->L, 1); + if (lua_pcall (cbs.L, 2, 0, 0) != 0) { + msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1)); + lua_pop (cbs.L, 1); } + lua_thread_pool_restore_callback (&cbs); } sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; @@ -689,7 +700,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref) ud->cfg = cfg; ud->pool = cfg->redis_pool; ud->ev_base = ev_base; - ud->L = L; + ud->task = task; ret = TRUE; } diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index 797bdcc4e..8d948c6d5 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -14,6 +14,7 @@ * limitations under the License. */ #include "lua_common.h" +#include "lua_thread_pool.h" #include "utlist.h" #include "unix-std.h" @@ -186,7 +187,6 @@ struct lua_tcp_dtor { #define LUA_TCP_FLAG_FINISHED (1 << 4) struct lua_tcp_cbdata { - lua_State *L; struct rspamd_async_session *session; struct rspamd_async_event *async_ev; struct event_base *ev_base; @@ -203,6 +203,7 @@ struct lua_tcp_cbdata { struct event ev; struct lua_tcp_dtor *dtors; ref_entry_t ref; + struct rspamd_task *task; }; #define msg_debug_tcp(...) rspamd_conditional_debug_fast (NULL, cbd->addr, \ @@ -249,7 +250,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd) if (hdl->type == LUA_WANT_READ) { if (hdl->h.r.cbref) { - luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.r.cbref); + luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.r.cbref); } if (hdl->h.r.stop_pattern) { @@ -258,7 +259,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd) } else { if (hdl->h.w.cbref) { - luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.w.cbref); + luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.w.cbref); } if (hdl->h.w.iov) { @@ -280,7 +281,7 @@ lua_tcp_fin (gpointer arg) msg_debug_tcp ("finishing TCP connection"); if (cbd->connect_cb) { - luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb); + luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, cbd->connect_cb); } if (cbd->fd != -1) { @@ -338,6 +339,11 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, struct lua_tcp_cbdata **pcbd; struct lua_tcp_handler *hdl; gint cbref, top; + struct lua_callback_state cbs; + lua_State *L; + + lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs); + L = cbs.L; va_start (ap, err); @@ -356,27 +362,27 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, } if (cbref != -1) { - top = lua_gettop (cbd->L); - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref); + top = lua_gettop (L); + lua_rawgeti (L, LUA_REGISTRYINDEX, cbref); /* Error message */ va_copy (ap_copy, ap); - lua_pushvfstring (cbd->L, err, ap_copy); + lua_pushvfstring (L, err, ap_copy); va_end (ap_copy); /* Body */ - lua_pushnil (cbd->L); + lua_pushnil (L); /* Connection */ - pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd)); + pcbd = lua_newuserdata (L, sizeof (*pcbd)); *pcbd = cbd; - rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1); + rspamd_lua_setclass (L, "rspamd{tcp}", -1); REF_RETAIN (cbd); - if (lua_pcall (cbd->L, 3, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); + if (lua_pcall (L, 3, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - lua_settop (cbd->L, top); + lua_settop (L, top); REF_RELEASE (cbd); } @@ -391,6 +397,8 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal, } va_end (ap); + + lua_thread_pool_restore_callback (&cbs); } static void @@ -400,6 +408,11 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) struct lua_tcp_cbdata **pcbd; struct lua_tcp_handler *hdl; gint cbref, arg_cnt, top; + struct lua_callback_state cbs; + lua_State *L; + + lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs); + L = cbs.L; hdl = g_queue_peek_head (cbd->handlers); @@ -413,15 +426,15 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) } if (cbref != -1) { - top = lua_gettop (cbd->L); - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref); + top = lua_gettop (L); + lua_rawgeti (L, LUA_REGISTRYINDEX, cbref); /* Error */ - lua_pushnil (cbd->L); + lua_pushnil (L); /* Body */ if (hdl->type == LUA_WANT_READ) { - t = lua_newuserdata (cbd->L, sizeof (*t)); - rspamd_lua_setclass (cbd->L, "rspamd{text}", -1); + t = lua_newuserdata (L, sizeof (*t)); + rspamd_lua_setclass (L, "rspamd{text}", -1); t->start = (const gchar *)str; t->len = len; t->flags = 0; @@ -431,19 +444,21 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) arg_cnt = 2; } /* Connection */ - pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd)); + pcbd = lua_newuserdata (L, sizeof (*pcbd)); *pcbd = cbd; - rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1); + rspamd_lua_setclass (L, "rspamd{tcp}", -1); REF_RETAIN (cbd); - if (lua_pcall (cbd->L, arg_cnt, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); + if (lua_pcall (L, arg_cnt, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - lua_settop (cbd->L, top); + lua_settop (L, top); REF_RELEASE (cbd); } + + lua_thread_pool_restore_callback (&cbs); } static void @@ -667,6 +682,8 @@ lua_tcp_handler (int fd, short what, gpointer ud) gssize r; gint so_error = 0; socklen_t so_len = sizeof (so_error); + struct lua_callback_state cbs; + lua_State *L; REF_RETAIN (cbd); @@ -693,22 +710,25 @@ lua_tcp_handler (int fd, short what, gpointer ud) else { cbd->flags |= LUA_TCP_FLAG_CONNECTED; + lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs); + L = cbs.L; + if (cbd->connect_cb != -1) { struct lua_tcp_cbdata **pcbd; gint top; - top = lua_gettop (cbd->L); - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb); - pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd)); + top = lua_gettop (L); + lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->connect_cb); + pcbd = lua_newuserdata (L, sizeof (*pcbd)); *pcbd = cbd; REF_RETAIN (cbd); - rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1); + rspamd_lua_setclass (L, "rspamd{tcp}", -1); - if (lua_pcall (cbd->L, 1, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); + if (lua_pcall (L, 1, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); } - lua_settop (cbd->L, top); + lua_settop (L, top); REF_RELEASE (cbd); } @@ -1174,7 +1194,7 @@ lua_tcp_request (lua_State *L) return 1; } - cbd->L = L; + cbd->task = task; h = rspamd_random_uint64_fast (); rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h); cbd->handlers = g_queue_new ();