]> source.dussan.org Git - rspamd.git/commitdiff
[Minor] use callback helpers to avoid conflicts between coroutine- and callback-based...
authorMikhail Galanin <mgalanin@mimecast.com>
Fri, 17 Aug 2018 10:16:45 +0000 (11:16 +0100)
committerMikhail Galanin <mgalanin@mimecast.com>
Fri, 17 Aug 2018 10:16:45 +0000 (11:16 +0100)
src/lua/lua_dns.c
src/lua/lua_http.c
src/lua/lua_redis.c
src/lua/lua_tcp.c

index 451f3f717267dffe79b228ab94d847cbac6fb6af..045b2f1deaf64b3b9cd1dda4bafdab83a6e82a45 100644 (file)
@@ -78,7 +78,6 @@ lua_check_dns_resolver (lua_State * L)
 }
 
 struct lua_dns_cbdata {
-       lua_State *L;
        struct thread_entry *thread;
        struct rspamd_task *task;
        struct rspamd_dns_resolver *resolver;
@@ -142,15 +141,22 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg)
        struct rspamd_dns_resolver **presolver;
        struct rdns_reply_entry *elt;
        rspamd_inet_addr_t *addr;
+       lua_State *L;
+       struct lua_callback_state cbs;
 
        if (cd->cbref != -1) {
-               lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->cbref);
+               lua_thread_pool_prepare_callback (cd->resolver->cfg->lua_thread_pool, &cbs);
+               L = cbs.L;
+
+               lua_rawgeti (L, LUA_REGISTRYINDEX, cd->cbref);
 
-               presolver = lua_newuserdata (cd->L, sizeof (gpointer));
-               rspamd_lua_setclass (cd->L, "rspamd{resolver}", -1);
+               presolver = lua_newuserdata (L, sizeof (gpointer));
+               rspamd_lua_setclass (L, "rspamd{resolver}", -1);
 
                *presolver = cd->resolver;
-               lua_pushstring (cd->L, cd->to_resolve);
+               lua_pushstring (L, cd->to_resolve);
+       } else {
+               L = cd->thread->lua_state;
        }
 
        /*
@@ -161,72 +167,72 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg)
                        naddrs ++;
                }
 
-               lua_createtable (cd->L, naddrs, 0);
+               lua_createtable (L, naddrs, 0);
 
                LL_FOREACH (reply->entries, elt)
                {
                        switch (elt->type) {
                        case RDNS_REQUEST_A:
                                addr = rspamd_inet_address_new (AF_INET, &elt->content.a.addr);
-                               rspamd_lua_ip_push (cd->L, addr);
+                               rspamd_lua_ip_push (L, addr);
                                rspamd_inet_address_free (addr);
-                               lua_rawseti (cd->L, -2, ++i);
+                               lua_rawseti (L, -2, ++i);
                                break;
                        case RDNS_REQUEST_AAAA:
                                addr = rspamd_inet_address_new (AF_INET6, &elt->content.aaa.addr);
-                               rspamd_lua_ip_push (cd->L, addr);
+                               rspamd_lua_ip_push (L, addr);
                                rspamd_inet_address_free (addr);
-                               lua_rawseti (cd->L, -2, ++i);
+                               lua_rawseti (L, -2, ++i);
                                break;
                        case RDNS_REQUEST_NS:
-                               lua_pushstring (cd->L, elt->content.ns.name);
-                               lua_rawseti (cd->L, -2, ++i);
+                               lua_pushstring (L, elt->content.ns.name);
+                               lua_rawseti (L, -2, ++i);
                                break;
                        case RDNS_REQUEST_PTR:
-                               lua_pushstring (cd->L, elt->content.ptr.name);
-                               lua_rawseti (cd->L, -2, ++i);
+                               lua_pushstring (L, elt->content.ptr.name);
+                               lua_rawseti (L, -2, ++i);
                                break;
                        case RDNS_REQUEST_TXT:
                        case RDNS_REQUEST_SPF:
-                               lua_pushstring (cd->L, elt->content.txt.data);
-                               lua_rawseti (cd->L, -2, ++i);
+                               lua_pushstring (L, elt->content.txt.data);
+                               lua_rawseti (L, -2, ++i);
                                break;
                        case RDNS_REQUEST_MX:
                                /* mx['name'], mx['priority'] */
-                               lua_createtable (cd->L, 0, 2);
-                               rspamd_lua_table_set (cd->L, "name", elt->content.mx.name);
-                               lua_pushstring (cd->L, "priority");
-                               lua_pushinteger (cd->L, elt->content.mx.priority);
-                               lua_settable (cd->L, -3);
+                               lua_createtable (L, 0, 2);
+                               rspamd_lua_table_set (L, "name", elt->content.mx.name);
+                               lua_pushstring (L, "priority");
+                               lua_pushinteger (L, elt->content.mx.priority);
+                               lua_settable (L, -3);
 
-                               lua_rawseti (cd->L, -2, ++i);
+                               lua_rawseti (L, -2, ++i);
                                break;
                        case RDNS_REQUEST_SOA:
-                               lua_createtable (cd->L, 0, 7);
-                               rspamd_lua_table_set (cd->L, "ns", elt->content.soa.mname);
-                               rspamd_lua_table_set (cd->L, "contact", elt->content.soa.admin);
-                               lua_pushstring (cd->L, "serial");
-                               lua_pushinteger (cd->L, elt->content.soa.serial);
-                               lua_settable (cd->L, -3);
-                               lua_pushstring (cd->L, "refresh");
-                               lua_pushinteger (cd->L, elt->content.soa.refresh);
-                               lua_settable (cd->L, -3);
-                               lua_pushstring (cd->L, "retry");
-                               lua_pushinteger (cd->L, elt->content.soa.retry);
-                               lua_settable (cd->L, -3);
-                               lua_pushstring (cd->L, "expiry");
-                               lua_pushinteger (cd->L, elt->content.soa.expire);
-                               lua_settable (cd->L, -3);
+                               lua_createtable (L, 0, 7);
+                               rspamd_lua_table_set (L, "ns", elt->content.soa.mname);
+                               rspamd_lua_table_set (L, "contact", elt->content.soa.admin);
+                               lua_pushstring (L, "serial");
+                               lua_pushinteger (L, elt->content.soa.serial);
+                               lua_settable (L, -3);
+                               lua_pushstring (L, "refresh");
+                               lua_pushinteger (L, elt->content.soa.refresh);
+                               lua_settable (L, -3);
+                               lua_pushstring (L, "retry");
+                               lua_pushinteger (L, elt->content.soa.retry);
+                               lua_settable (L, -3);
+                               lua_pushstring (L, "expiry");
+                               lua_pushinteger (L, elt->content.soa.expire);
+                               lua_settable (L, -3);
                                /* Negative TTL */
-                               lua_pushstring (cd->L, "nx");
-                               lua_pushinteger (cd->L, elt->content.soa.minimum);
-                               lua_settable (cd->L, -3);
+                               lua_pushstring (L, "nx");
+                               lua_pushinteger (L, elt->content.soa.minimum);
+                               lua_settable (L, -3);
 
-                               lua_rawseti (cd->L, -2, ++i);
+                               lua_rawseti (L, -2, ++i);
                                break;
                        }
                }
-               lua_pushnil (cd->L);
+               lua_pushnil (L);
        }
 
        if (cd->cbref != -1) {
@@ -239,25 +245,27 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg)
                 * 6 - reply->authenticated
                 */
                if (reply->code != RDNS_RC_NOERROR) {
-                       lua_pushnil (cd->L);
-                       lua_pushstring (cd->L, rdns_strerror (reply->code));
+                       lua_pushnil (L);
+                       lua_pushstring (L, rdns_strerror (reply->code));
                }
                if (cd->user_str != NULL) {
-                       lua_pushstring (cd->L, cd->user_str);
+                       lua_pushstring (L, cd->user_str);
                }
                else {
-                       lua_pushnil (cd->L);
+                       lua_pushnil (L);
                }
 
-               lua_pushboolean (cd->L, reply->authenticated);
+               lua_pushboolean (L, reply->authenticated);
 
-               if (lua_pcall (cd->L, 6, 0, 0) != 0) {
-                       msg_info ("call to dns callback failed: %s", lua_tostring (cd->L, -1));
-                       lua_pop (cd->L, 1);
+               if (lua_pcall (L, 6, 0, 0) != 0) {
+                       msg_info ("call to dns callback failed: %s", lua_tostring (L, -1));
+                       lua_pop (L, 1);
                }
 
                /* Unref function */
-               luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->cbref);
+               luaL_unref (L, LUA_REGISTRYINDEX, cd->cbref);
+
+               lua_thread_pool_restore_callback (&cbs);
        } else {
                /*
                 * 1 - true | false in the case of error
@@ -269,18 +277,21 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg)
                 * }
                 */
                if (reply->code != RDNS_RC_NOERROR) {
-                       lua_pushboolean (cd->L, false);
-                       lua_pushstring (cd->L, rdns_strerror (reply->code));
+                       lua_pushboolean (L, false);
+                       lua_pushstring (L, rdns_strerror (reply->code));
                }
                else {
-                       lua_pushboolean (cd->L, reply->authenticated);
-                       lua_setfield (cd->L, -3, "authenticated");
+                       lua_pushboolean (L, reply->authenticated);
+                       lua_setfield (L, -3, "authenticated");
 
                        /* result 1 - not and error */
-                       lua_pushboolean (cd->L, true);
+                       lua_pushboolean (L, true);
                        /* push table into stack, result 2 - results itself */
-                       lua_pushvalue (cd->L, -3);
+                       lua_pushvalue (L, -3);
                }
+
+               g_assert (L == cd->thread->lua_state);
+
                lua_resume_thread (cd->task, cd->thread, 2);
        }
 
@@ -364,10 +375,12 @@ lua_dns_resolver_resolve_common (lua_State *L,
                pool = task->task_pool;
                session = task->s;
        }
+       else if (!session || !pool) {
+               return luaL_error (L, "invalid arguments: either 'task' or 'session'/'mempool' should be set");
+       }
 
        if (pool != NULL && to_resolve != NULL) {
                cbdata = rspamd_mempool_alloc0 (pool, sizeof (struct lua_dns_cbdata));
-               cbdata->L = L;
                cbdata->resolver = resolver;
                cbdata->cbref = cbref;
                cbdata->user_str = rspamd_mempool_strdup (pool, user_str);
index c292428c2c858f498730a8a55526d7a90039f058..64617be9b8e75dcb8a149d98a4122c15f64feb83 100644 (file)
@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 #include "lua_common.h"
+#include "lua_thread_pool.h"
 #include "http_private.h"
 #include "unix-std.h"
 #include "zlib.h"
@@ -56,7 +57,6 @@ static const struct luaL_reg httplib_m[] = {
 #define RSPAMD_LUA_HTTP_FLAG_NOVERIFY (1 << 1)
 
 struct lua_http_cbdata {
-       lua_State *L;
        struct rspamd_http_connection *conn;
        struct rspamd_async_session *session;
        struct rspamd_async_watcher *w;
@@ -96,7 +96,7 @@ lua_http_fin (gpointer arg)
 {
        struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg;
 
-       luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+       luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
        if (cbd->conn) {
                /* Here we already have a connection, so we need to unref it */
                rspamd_http_connection_unref (cbd->conn);
@@ -152,13 +152,22 @@ lua_http_maybe_free (struct lua_http_cbdata *cbd)
 static void
 lua_http_push_error (struct lua_http_cbdata *cbd, const char *err)
 {
-       lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
-       lua_pushstring (cbd->L, err);
+       struct lua_callback_state lcbd;
+       lua_State *L;
+
+       lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);
+
+       L = lcbd.L;
 
-       if (lua_pcall (cbd->L, 1, 0, 0) != 0) {
-               msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
-               lua_pop (cbd->L, 1);
+       lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
+       lua_pushstring (L, err);
+
+       if (lua_pcall (L, 1, 0, 0) != 0) {
+               msg_info ("callback call failed: %s", lua_tostring (L, -1));
+               lua_pop (L, 1);
        }
+
+       lua_thread_pool_restore_callback (&lcbd);
 }
 
 static void
@@ -179,51 +188,60 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
        const gchar *body;
        gsize body_len;
 
-       lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+       struct lua_callback_state lcbd;
+       lua_State *L;
+
+       lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);
+
+       L = lcbd.L;
+
+       lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
        /* Error */
-       lua_pushnil (cbd->L);
+       lua_pushnil (L);
        /* Reply code */
-       lua_pushinteger (cbd->L, msg->code);
+       lua_pushinteger (L, msg->code);
        /* Body */
        body = rspamd_http_message_get_body (msg, &body_len);
 
        if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) {
                struct rspamd_lua_text *t;
 
-               t = lua_newuserdata (cbd->L, sizeof (*t));
-               rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
+               t = lua_newuserdata (L, sizeof (*t));
+               rspamd_lua_setclass (L, "rspamd{text}", -1);
                t->start = body;
                t->len = body_len;
                t->flags = 0;
        }
        else {
                if (body_len > 0) {
-                       lua_pushlstring (cbd->L, body, body_len);
+                       lua_pushlstring (L, body, body_len);
                }
                else {
-                       lua_pushnil (cbd->L);
+                       lua_pushnil (L);
                }
        }
        /* Headers */
-       lua_newtable (cbd->L);
+       lua_newtable (L);
 
        HASH_ITER (hh, msg->headers, h, htmp) {
                /*
                 * Lowercase header name, as Lua cannot search in caseless matter
                 */
                rspamd_str_lc (h->combined->str, h->name.len);
-               lua_pushlstring (cbd->L, h->name.begin, h->name.len);
-               lua_pushlstring (cbd->L, h->value.begin, h->value.len);
-               lua_settable (cbd->L, -3);
+               lua_pushlstring (L, h->name.begin, h->name.len);
+               lua_pushlstring (L, h->value.begin, h->value.len);
+               lua_settable (L, -3);
        }
 
-       if (lua_pcall (cbd->L, 4, 0, 0) != 0) {
-               msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
-               lua_pop (cbd->L, 1);
+       if (lua_pcall (L, 4, 0, 0) != 0) {
+               msg_info ("callback call failed: %s", lua_tostring (L, -1));
+               lua_pop (L, 1);
        }
 
        lua_http_maybe_free (cbd);
 
+       lua_thread_pool_restore_callback (&lcbd);
+
        return 0;
 }
 
@@ -707,7 +725,6 @@ lua_http_request (lua_State *L)
        }
 
        cbd = g_malloc0 (sizeof (*cbd));
-       cbd->L = L;
        cbd->cbref = cbref;
        cbd->msg = msg;
        cbd->ev_base = ev_base;
index e5b97ebeb8e7e452558d959fefdcb1c4a53593a7..0fc9c43b7e4f3266bae57a2dea0fb3233b8bf886 100644 (file)
@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 #include "lua_common.h"
+#include "lua_thread_pool.h"
 #include "utlist.h"
 
 #include "contrib/hiredis/hiredis.h"
@@ -92,7 +93,7 @@ struct lua_redis_specific_userdata;
  */
 struct lua_redis_userdata {
        redisAsyncContext *ctx;
-       lua_State *L;
+       struct rspamd_task *task;
        struct rspamd_async_session *s;
        struct event_base *ev_base;
        struct rspamd_config *cfg;
@@ -191,7 +192,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
                        lua_redis_free_args (cur->args, cur->arglens, cur->nargs);
 
                        if (cur->cbref != -1) {
-                               luaL_unref (ud->L, LUA_REGISTRYINDEX, cur->cbref);
+                               luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref);
                        }
 
                        g_free (cur);
@@ -244,21 +245,27 @@ lua_redis_push_error (const gchar *err,
        gboolean connected)
 {
        struct lua_redis_userdata *ud = sp_ud->c;
+       struct lua_callback_state cbs;
 
        if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
                if (sp_ud->cbref != -1) {
+
+                       lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
+
                        /* Push error */
-                       lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+                       lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
 
                        /* String of error */
-                       lua_pushstring (ud->L, err);
+                       lua_pushstring (cbs.L, err);
                        /* Data is nil */
-                       lua_pushnil (ud->L);
+                       lua_pushnil (cbs.L);
 
-                       if (lua_pcall (ud->L, 2, 0, 0) != 0) {
-                               msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
-                               lua_pop (ud->L, 1);
+                       if (lua_pcall (cbs.L, 2, 0, 0) != 0) {
+                               msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1));
+                               lua_pop (cbs.L, 1);
                        }
+
+                       lua_thread_pool_restore_callback (&cbs);
                }
 
                sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
@@ -323,21 +330,25 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
                struct lua_redis_specific_userdata *sp_ud)
 {
        struct lua_redis_userdata *ud = sp_ud->c;
+       struct lua_callback_state cbs;
 
        if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
                if (sp_ud->cbref != -1) {
+                       lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
+
                        /* Push error */
-                       lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+                       lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
                        /* Error is nil */
-                       lua_pushnil (ud->L);
+                       lua_pushnil (cbs.L);
                        /* Data */
-                       lua_redis_push_reply (ud->L, r, ctx->flags & LUA_REDIS_TEXTDATA);
+                       lua_redis_push_reply (cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA);
 
-                       if (lua_pcall (ud->L, 2, 0, 0) != 0) {
-                               msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
-                               lua_pop (ud->L, 1);
+                       if (lua_pcall (cbs.L, 2, 0, 0) != 0) {
+                               msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1));
+                               lua_pop (cbs.L, 1);
                        }
 
+                       lua_thread_pool_restore_callback (&cbs);
                }
 
                sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
@@ -689,7 +700,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
                        ud->cfg = cfg;
                        ud->pool = cfg->redis_pool;
                        ud->ev_base = ev_base;
-                       ud->L = L;
+                       ud->task = task;
 
                        ret = TRUE;
                }
index 797bdcc4eb919e32bfdb42bd0ae99e25a0a1b1a4..8d948c6d55ecd2eb5b134451a1b5a0a445ef208f 100644 (file)
@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 #include "lua_common.h"
+#include "lua_thread_pool.h"
 #include "utlist.h"
 #include "unix-std.h"
 
@@ -186,7 +187,6 @@ struct lua_tcp_dtor {
 #define LUA_TCP_FLAG_FINISHED (1 << 4)
 
 struct lua_tcp_cbdata {
-       lua_State *L;
        struct rspamd_async_session *session;
        struct rspamd_async_event *async_ev;
        struct event_base *ev_base;
@@ -203,6 +203,7 @@ struct lua_tcp_cbdata {
        struct event ev;
        struct lua_tcp_dtor *dtors;
        ref_entry_t ref;
+       struct rspamd_task *task;
 };
 
 #define msg_debug_tcp(...)  rspamd_conditional_debug_fast (NULL, cbd->addr, \
@@ -249,7 +250,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd)
 
        if (hdl->type == LUA_WANT_READ) {
                if (hdl->h.r.cbref) {
-                       luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.r.cbref);
+                       luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.r.cbref);
                }
 
                if (hdl->h.r.stop_pattern) {
@@ -258,7 +259,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd)
        }
        else {
                if (hdl->h.w.cbref) {
-                       luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.w.cbref);
+                       luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.w.cbref);
                }
 
                if (hdl->h.w.iov) {
@@ -280,7 +281,7 @@ lua_tcp_fin (gpointer arg)
        msg_debug_tcp ("finishing TCP connection");
 
        if (cbd->connect_cb) {
-               luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
+               luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, cbd->connect_cb);
        }
 
        if (cbd->fd != -1) {
@@ -338,6 +339,11 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
        struct lua_tcp_cbdata **pcbd;
        struct lua_tcp_handler *hdl;
        gint cbref, top;
+       struct lua_callback_state cbs;
+       lua_State *L;
+
+       lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+       L = cbs.L;
 
        va_start (ap, err);
 
@@ -356,27 +362,27 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
                }
 
                if (cbref != -1) {
-                       top = lua_gettop (cbd->L);
-                       lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+                       top = lua_gettop (L);
+                       lua_rawgeti (L, LUA_REGISTRYINDEX, cbref);
 
                        /* Error message */
                        va_copy (ap_copy, ap);
-                       lua_pushvfstring (cbd->L, err, ap_copy);
+                       lua_pushvfstring (L, err, ap_copy);
                        va_end (ap_copy);
 
                        /* Body */
-                       lua_pushnil (cbd->L);
+                       lua_pushnil (L);
                        /* Connection */
-                       pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+                       pcbd = lua_newuserdata (L, sizeof (*pcbd));
                        *pcbd = cbd;
-                       rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+                       rspamd_lua_setclass (L, "rspamd{tcp}", -1);
                        REF_RETAIN (cbd);
 
-                       if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
-                               msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+                       if (lua_pcall (L, 3, 0, 0) != 0) {
+                               msg_info ("callback call failed: %s", lua_tostring (L, -1));
                        }
 
-                       lua_settop (cbd->L, top);
+                       lua_settop (L, top);
 
                        REF_RELEASE (cbd);
                }
@@ -391,6 +397,8 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
        }
 
        va_end (ap);
+
+       lua_thread_pool_restore_callback (&cbs);
 }
 
 static void
@@ -400,6 +408,11 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
        struct lua_tcp_cbdata **pcbd;
        struct lua_tcp_handler *hdl;
        gint cbref, arg_cnt, top;
+       struct lua_callback_state cbs;
+       lua_State *L;
+
+       lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+       L = cbs.L;
 
        hdl = g_queue_peek_head (cbd->handlers);
 
@@ -413,15 +426,15 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
        }
 
        if (cbref != -1) {
-               top = lua_gettop (cbd->L);
-               lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+               top = lua_gettop (L);
+               lua_rawgeti (L, LUA_REGISTRYINDEX, cbref);
                /* Error */
-               lua_pushnil (cbd->L);
+               lua_pushnil (L);
                /* Body */
 
                if (hdl->type == LUA_WANT_READ) {
-                       t = lua_newuserdata (cbd->L, sizeof (*t));
-                       rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
+                       t = lua_newuserdata (L, sizeof (*t));
+                       rspamd_lua_setclass (L, "rspamd{text}", -1);
                        t->start = (const gchar *)str;
                        t->len = len;
                        t->flags = 0;
@@ -431,19 +444,21 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
                        arg_cnt = 2;
                }
                /* Connection */
-               pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+               pcbd = lua_newuserdata (L, sizeof (*pcbd));
                *pcbd = cbd;
-               rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+               rspamd_lua_setclass (L, "rspamd{tcp}", -1);
 
                REF_RETAIN (cbd);
 
-               if (lua_pcall (cbd->L, arg_cnt, 0, 0) != 0) {
-                       msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+               if (lua_pcall (L, arg_cnt, 0, 0) != 0) {
+                       msg_info ("callback call failed: %s", lua_tostring (L, -1));
                }
 
-               lua_settop (cbd->L, top);
+               lua_settop (L, top);
                REF_RELEASE (cbd);
        }
+
+       lua_thread_pool_restore_callback (&cbs);
 }
 
 static void
@@ -667,6 +682,8 @@ lua_tcp_handler (int fd, short what, gpointer ud)
        gssize r;
        gint so_error = 0;
        socklen_t so_len = sizeof (so_error);
+       struct lua_callback_state cbs;
+       lua_State *L;
 
        REF_RETAIN (cbd);
 
@@ -693,22 +710,25 @@ lua_tcp_handler (int fd, short what, gpointer ud)
                        else {
                                cbd->flags |= LUA_TCP_FLAG_CONNECTED;
 
+                               lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+                               L = cbs.L;
+
                                if (cbd->connect_cb != -1) {
                                        struct lua_tcp_cbdata **pcbd;
                                        gint top;
 
-                                       top = lua_gettop (cbd->L);
-                                       lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
-                                       pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+                                       top = lua_gettop (L);
+                                       lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->connect_cb);
+                                       pcbd = lua_newuserdata (L, sizeof (*pcbd));
                                        *pcbd = cbd;
                                        REF_RETAIN (cbd);
-                                       rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+                                       rspamd_lua_setclass (L, "rspamd{tcp}", -1);
 
-                                       if (lua_pcall (cbd->L, 1, 0, 0) != 0) {
-                                               msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+                                       if (lua_pcall (L, 1, 0, 0) != 0) {
+                                               msg_info ("callback call failed: %s", lua_tostring (L, -1));
                                        }
 
-                                       lua_settop (cbd->L, top);
+                                       lua_settop (L, top);
 
                                        REF_RELEASE (cbd);
                                }
@@ -1174,7 +1194,7 @@ lua_tcp_request (lua_State *L)
                return 1;
        }
 
-       cbd->L = L;
+       cbd->task = task;
        h = rspamd_random_uint64_fast ();
        rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);
        cbd->handlers = g_queue_new ();