Browse Source

[Minor] use callback helpers to avoid conflicts between coroutine- and callback-based code

tags/1.8.0
Mikhail Galanin 5 years ago
parent
commit
15c7adc671
4 changed files with 187 additions and 126 deletions
  1. 71
    58
      src/lua/lua_dns.c
  2. 39
    22
      src/lua/lua_http.c
  3. 26
    15
      src/lua/lua_redis.c
  4. 51
    31
      src/lua/lua_tcp.c

+ 71
- 58
src/lua/lua_dns.c View File

@@ -78,7 +78,6 @@ lua_check_dns_resolver (lua_State * L)
}

struct lua_dns_cbdata {
lua_State *L;
struct thread_entry *thread;
struct rspamd_task *task;
struct rspamd_dns_resolver *resolver;
@@ -142,15 +141,22 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg)
struct rspamd_dns_resolver **presolver;
struct rdns_reply_entry *elt;
rspamd_inet_addr_t *addr;
lua_State *L;
struct lua_callback_state cbs;

if (cd->cbref != -1) {
lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->cbref);
lua_thread_pool_prepare_callback (cd->resolver->cfg->lua_thread_pool, &cbs);
L = cbs.L;

lua_rawgeti (L, LUA_REGISTRYINDEX, cd->cbref);

presolver = lua_newuserdata (cd->L, sizeof (gpointer));
rspamd_lua_setclass (cd->L, "rspamd{resolver}", -1);
presolver = lua_newuserdata (L, sizeof (gpointer));
rspamd_lua_setclass (L, "rspamd{resolver}", -1);

*presolver = cd->resolver;
lua_pushstring (cd->L, cd->to_resolve);
lua_pushstring (L, cd->to_resolve);
} else {
L = cd->thread->lua_state;
}

/*
@@ -161,72 +167,72 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg)
naddrs ++;
}

lua_createtable (cd->L, naddrs, 0);
lua_createtable (L, naddrs, 0);

LL_FOREACH (reply->entries, elt)
{
switch (elt->type) {
case RDNS_REQUEST_A:
addr = rspamd_inet_address_new (AF_INET, &elt->content.a.addr);
rspamd_lua_ip_push (cd->L, addr);
rspamd_lua_ip_push (L, addr);
rspamd_inet_address_free (addr);
lua_rawseti (cd->L, -2, ++i);
lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_AAAA:
addr = rspamd_inet_address_new (AF_INET6, &elt->content.aaa.addr);
rspamd_lua_ip_push (cd->L, addr);
rspamd_lua_ip_push (L, addr);
rspamd_inet_address_free (addr);
lua_rawseti (cd->L, -2, ++i);
lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_NS:
lua_pushstring (cd->L, elt->content.ns.name);
lua_rawseti (cd->L, -2, ++i);
lua_pushstring (L, elt->content.ns.name);
lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_PTR:
lua_pushstring (cd->L, elt->content.ptr.name);
lua_rawseti (cd->L, -2, ++i);
lua_pushstring (L, elt->content.ptr.name);
lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_TXT:
case RDNS_REQUEST_SPF:
lua_pushstring (cd->L, elt->content.txt.data);
lua_rawseti (cd->L, -2, ++i);
lua_pushstring (L, elt->content.txt.data);
lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_MX:
/* mx['name'], mx['priority'] */
lua_createtable (cd->L, 0, 2);
rspamd_lua_table_set (cd->L, "name", elt->content.mx.name);
lua_pushstring (cd->L, "priority");
lua_pushinteger (cd->L, elt->content.mx.priority);
lua_settable (cd->L, -3);
lua_createtable (L, 0, 2);
rspamd_lua_table_set (L, "name", elt->content.mx.name);
lua_pushstring (L, "priority");
lua_pushinteger (L, elt->content.mx.priority);
lua_settable (L, -3);

lua_rawseti (cd->L, -2, ++i);
lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_SOA:
lua_createtable (cd->L, 0, 7);
rspamd_lua_table_set (cd->L, "ns", elt->content.soa.mname);
rspamd_lua_table_set (cd->L, "contact", elt->content.soa.admin);
lua_pushstring (cd->L, "serial");
lua_pushinteger (cd->L, elt->content.soa.serial);
lua_settable (cd->L, -3);
lua_pushstring (cd->L, "refresh");
lua_pushinteger (cd->L, elt->content.soa.refresh);
lua_settable (cd->L, -3);
lua_pushstring (cd->L, "retry");
lua_pushinteger (cd->L, elt->content.soa.retry);
lua_settable (cd->L, -3);
lua_pushstring (cd->L, "expiry");
lua_pushinteger (cd->L, elt->content.soa.expire);
lua_settable (cd->L, -3);
lua_createtable (L, 0, 7);
rspamd_lua_table_set (L, "ns", elt->content.soa.mname);
rspamd_lua_table_set (L, "contact", elt->content.soa.admin);
lua_pushstring (L, "serial");
lua_pushinteger (L, elt->content.soa.serial);
lua_settable (L, -3);
lua_pushstring (L, "refresh");
lua_pushinteger (L, elt->content.soa.refresh);
lua_settable (L, -3);
lua_pushstring (L, "retry");
lua_pushinteger (L, elt->content.soa.retry);
lua_settable (L, -3);
lua_pushstring (L, "expiry");
lua_pushinteger (L, elt->content.soa.expire);
lua_settable (L, -3);
/* Negative TTL */
lua_pushstring (cd->L, "nx");
lua_pushinteger (cd->L, elt->content.soa.minimum);
lua_settable (cd->L, -3);
lua_pushstring (L, "nx");
lua_pushinteger (L, elt->content.soa.minimum);
lua_settable (L, -3);

lua_rawseti (cd->L, -2, ++i);
lua_rawseti (L, -2, ++i);
break;
}
}
lua_pushnil (cd->L);
lua_pushnil (L);
}

if (cd->cbref != -1) {
@@ -239,25 +245,27 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg)
* 6 - reply->authenticated
*/
if (reply->code != RDNS_RC_NOERROR) {
lua_pushnil (cd->L);
lua_pushstring (cd->L, rdns_strerror (reply->code));
lua_pushnil (L);
lua_pushstring (L, rdns_strerror (reply->code));
}
if (cd->user_str != NULL) {
lua_pushstring (cd->L, cd->user_str);
lua_pushstring (L, cd->user_str);
}
else {
lua_pushnil (cd->L);
lua_pushnil (L);
}

lua_pushboolean (cd->L, reply->authenticated);
lua_pushboolean (L, reply->authenticated);

if (lua_pcall (cd->L, 6, 0, 0) != 0) {
msg_info ("call to dns callback failed: %s", lua_tostring (cd->L, -1));
lua_pop (cd->L, 1);
if (lua_pcall (L, 6, 0, 0) != 0) {
msg_info ("call to dns callback failed: %s", lua_tostring (L, -1));
lua_pop (L, 1);
}

/* Unref function */
luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->cbref);
luaL_unref (L, LUA_REGISTRYINDEX, cd->cbref);

lua_thread_pool_restore_callback (&cbs);
} else {
/*
* 1 - true | false in the case of error
@@ -269,18 +277,21 @@ lua_dns_callback (struct rdns_reply *reply, gpointer arg)
* }
*/
if (reply->code != RDNS_RC_NOERROR) {
lua_pushboolean (cd->L, false);
lua_pushstring (cd->L, rdns_strerror (reply->code));
lua_pushboolean (L, false);
lua_pushstring (L, rdns_strerror (reply->code));
}
else {
lua_pushboolean (cd->L, reply->authenticated);
lua_setfield (cd->L, -3, "authenticated");
lua_pushboolean (L, reply->authenticated);
lua_setfield (L, -3, "authenticated");

/* result 1 - not and error */
lua_pushboolean (cd->L, true);
lua_pushboolean (L, true);
/* push table into stack, result 2 - results itself */
lua_pushvalue (cd->L, -3);
lua_pushvalue (L, -3);
}

g_assert (L == cd->thread->lua_state);

lua_resume_thread (cd->task, cd->thread, 2);
}

@@ -364,10 +375,12 @@ lua_dns_resolver_resolve_common (lua_State *L,
pool = task->task_pool;
session = task->s;
}
else if (!session || !pool) {
return luaL_error (L, "invalid arguments: either 'task' or 'session'/'mempool' should be set");
}

if (pool != NULL && to_resolve != NULL) {
cbdata = rspamd_mempool_alloc0 (pool, sizeof (struct lua_dns_cbdata));
cbdata->L = L;
cbdata->resolver = resolver;
cbdata->cbref = cbref;
cbdata->user_str = rspamd_mempool_strdup (pool, user_str);

+ 39
- 22
src/lua/lua_http.c View File

@@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "lua_common.h"
#include "lua_thread_pool.h"
#include "http_private.h"
#include "unix-std.h"
#include "zlib.h"
@@ -56,7 +57,6 @@ static const struct luaL_reg httplib_m[] = {
#define RSPAMD_LUA_HTTP_FLAG_NOVERIFY (1 << 1)

struct lua_http_cbdata {
lua_State *L;
struct rspamd_http_connection *conn;
struct rspamd_async_session *session;
struct rspamd_async_watcher *w;
@@ -96,7 +96,7 @@ lua_http_fin (gpointer arg)
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg;

luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
if (cbd->conn) {
/* Here we already have a connection, so we need to unref it */
rspamd_http_connection_unref (cbd->conn);
@@ -152,13 +152,22 @@ lua_http_maybe_free (struct lua_http_cbdata *cbd)
static void
lua_http_push_error (struct lua_http_cbdata *cbd, const char *err)
{
lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
lua_pushstring (cbd->L, err);
struct lua_callback_state lcbd;
lua_State *L;

lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);

L = lcbd.L;

if (lua_pcall (cbd->L, 1, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
lua_pop (cbd->L, 1);
lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
lua_pushstring (L, err);

if (lua_pcall (L, 1, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
lua_pop (L, 1);
}

lua_thread_pool_restore_callback (&lcbd);
}

static void
@@ -179,51 +188,60 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
const gchar *body;
gsize body_len;

lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
struct lua_callback_state lcbd;
lua_State *L;

lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);

L = lcbd.L;

lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
/* Error */
lua_pushnil (cbd->L);
lua_pushnil (L);
/* Reply code */
lua_pushinteger (cbd->L, msg->code);
lua_pushinteger (L, msg->code);
/* Body */
body = rspamd_http_message_get_body (msg, &body_len);

if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) {
struct rspamd_lua_text *t;

t = lua_newuserdata (cbd->L, sizeof (*t));
rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
t = lua_newuserdata (L, sizeof (*t));
rspamd_lua_setclass (L, "rspamd{text}", -1);
t->start = body;
t->len = body_len;
t->flags = 0;
}
else {
if (body_len > 0) {
lua_pushlstring (cbd->L, body, body_len);
lua_pushlstring (L, body, body_len);
}
else {
lua_pushnil (cbd->L);
lua_pushnil (L);
}
}
/* Headers */
lua_newtable (cbd->L);
lua_newtable (L);

HASH_ITER (hh, msg->headers, h, htmp) {
/*
* Lowercase header name, as Lua cannot search in caseless matter
*/
rspamd_str_lc (h->combined->str, h->name.len);
lua_pushlstring (cbd->L, h->name.begin, h->name.len);
lua_pushlstring (cbd->L, h->value.begin, h->value.len);
lua_settable (cbd->L, -3);
lua_pushlstring (L, h->name.begin, h->name.len);
lua_pushlstring (L, h->value.begin, h->value.len);
lua_settable (L, -3);
}

if (lua_pcall (cbd->L, 4, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
lua_pop (cbd->L, 1);
if (lua_pcall (L, 4, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
lua_pop (L, 1);
}

lua_http_maybe_free (cbd);

lua_thread_pool_restore_callback (&lcbd);

return 0;
}

@@ -707,7 +725,6 @@ lua_http_request (lua_State *L)
}

cbd = g_malloc0 (sizeof (*cbd));
cbd->L = L;
cbd->cbref = cbref;
cbd->msg = msg;
cbd->ev_base = ev_base;

+ 26
- 15
src/lua/lua_redis.c View File

@@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "lua_common.h"
#include "lua_thread_pool.h"
#include "utlist.h"

#include "contrib/hiredis/hiredis.h"
@@ -92,7 +93,7 @@ struct lua_redis_specific_userdata;
*/
struct lua_redis_userdata {
redisAsyncContext *ctx;
lua_State *L;
struct rspamd_task *task;
struct rspamd_async_session *s;
struct event_base *ev_base;
struct rspamd_config *cfg;
@@ -191,7 +192,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
lua_redis_free_args (cur->args, cur->arglens, cur->nargs);

if (cur->cbref != -1) {
luaL_unref (ud->L, LUA_REGISTRYINDEX, cur->cbref);
luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref);
}

g_free (cur);
@@ -244,21 +245,27 @@ lua_redis_push_error (const gchar *err,
gboolean connected)
{
struct lua_redis_userdata *ud = sp_ud->c;
struct lua_callback_state cbs;

if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
if (sp_ud->cbref != -1) {

lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);

/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);

/* String of error */
lua_pushstring (ud->L, err);
lua_pushstring (cbs.L, err);
/* Data is nil */
lua_pushnil (ud->L);
lua_pushnil (cbs.L);

if (lua_pcall (ud->L, 2, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
lua_pop (ud->L, 1);
if (lua_pcall (cbs.L, 2, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1));
lua_pop (cbs.L, 1);
}

lua_thread_pool_restore_callback (&cbs);
}

sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
@@ -323,21 +330,25 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
struct lua_redis_specific_userdata *sp_ud)
{
struct lua_redis_userdata *ud = sp_ud->c;
struct lua_callback_state cbs;

if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
if (sp_ud->cbref != -1) {
lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);

/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
/* Error is nil */
lua_pushnil (ud->L);
lua_pushnil (cbs.L);
/* Data */
lua_redis_push_reply (ud->L, r, ctx->flags & LUA_REDIS_TEXTDATA);
lua_redis_push_reply (cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA);

if (lua_pcall (ud->L, 2, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
lua_pop (ud->L, 1);
if (lua_pcall (cbs.L, 2, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1));
lua_pop (cbs.L, 1);
}

lua_thread_pool_restore_callback (&cbs);
}

sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
@@ -689,7 +700,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
ud->cfg = cfg;
ud->pool = cfg->redis_pool;
ud->ev_base = ev_base;
ud->L = L;
ud->task = task;

ret = TRUE;
}

+ 51
- 31
src/lua/lua_tcp.c View File

@@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "lua_common.h"
#include "lua_thread_pool.h"
#include "utlist.h"
#include "unix-std.h"

@@ -186,7 +187,6 @@ struct lua_tcp_dtor {
#define LUA_TCP_FLAG_FINISHED (1 << 4)

struct lua_tcp_cbdata {
lua_State *L;
struct rspamd_async_session *session;
struct rspamd_async_event *async_ev;
struct event_base *ev_base;
@@ -203,6 +203,7 @@ struct lua_tcp_cbdata {
struct event ev;
struct lua_tcp_dtor *dtors;
ref_entry_t ref;
struct rspamd_task *task;
};

#define msg_debug_tcp(...) rspamd_conditional_debug_fast (NULL, cbd->addr, \
@@ -249,7 +250,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd)

if (hdl->type == LUA_WANT_READ) {
if (hdl->h.r.cbref) {
luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.r.cbref);
luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.r.cbref);
}

if (hdl->h.r.stop_pattern) {
@@ -258,7 +259,7 @@ lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd)
}
else {
if (hdl->h.w.cbref) {
luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.w.cbref);
luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.w.cbref);
}

if (hdl->h.w.iov) {
@@ -280,7 +281,7 @@ lua_tcp_fin (gpointer arg)
msg_debug_tcp ("finishing TCP connection");

if (cbd->connect_cb) {
luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, cbd->connect_cb);
}

if (cbd->fd != -1) {
@@ -338,6 +339,11 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
struct lua_tcp_cbdata **pcbd;
struct lua_tcp_handler *hdl;
gint cbref, top;
struct lua_callback_state cbs;
lua_State *L;

lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
L = cbs.L;

va_start (ap, err);

@@ -356,27 +362,27 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
}

if (cbref != -1) {
top = lua_gettop (cbd->L);
lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
top = lua_gettop (L);
lua_rawgeti (L, LUA_REGISTRYINDEX, cbref);

/* Error message */
va_copy (ap_copy, ap);
lua_pushvfstring (cbd->L, err, ap_copy);
lua_pushvfstring (L, err, ap_copy);
va_end (ap_copy);

/* Body */
lua_pushnil (cbd->L);
lua_pushnil (L);
/* Connection */
pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
rspamd_lua_setclass (L, "rspamd{tcp}", -1);
REF_RETAIN (cbd);

if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
if (lua_pcall (L, 3, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}

lua_settop (cbd->L, top);
lua_settop (L, top);

REF_RELEASE (cbd);
}
@@ -391,6 +397,8 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, gboolean is_fatal,
}

va_end (ap);

lua_thread_pool_restore_callback (&cbs);
}

static void
@@ -400,6 +408,11 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
struct lua_tcp_cbdata **pcbd;
struct lua_tcp_handler *hdl;
gint cbref, arg_cnt, top;
struct lua_callback_state cbs;
lua_State *L;

lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
L = cbs.L;

hdl = g_queue_peek_head (cbd->handlers);

@@ -413,15 +426,15 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
}

if (cbref != -1) {
top = lua_gettop (cbd->L);
lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
top = lua_gettop (L);
lua_rawgeti (L, LUA_REGISTRYINDEX, cbref);
/* Error */
lua_pushnil (cbd->L);
lua_pushnil (L);
/* Body */

if (hdl->type == LUA_WANT_READ) {
t = lua_newuserdata (cbd->L, sizeof (*t));
rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
t = lua_newuserdata (L, sizeof (*t));
rspamd_lua_setclass (L, "rspamd{text}", -1);
t->start = (const gchar *)str;
t->len = len;
t->flags = 0;
@@ -431,19 +444,21 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
arg_cnt = 2;
}
/* Connection */
pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
rspamd_lua_setclass (L, "rspamd{tcp}", -1);

REF_RETAIN (cbd);

if (lua_pcall (cbd->L, arg_cnt, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
if (lua_pcall (L, arg_cnt, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}

lua_settop (cbd->L, top);
lua_settop (L, top);
REF_RELEASE (cbd);
}

lua_thread_pool_restore_callback (&cbs);
}

static void
@@ -667,6 +682,8 @@ lua_tcp_handler (int fd, short what, gpointer ud)
gssize r;
gint so_error = 0;
socklen_t so_len = sizeof (so_error);
struct lua_callback_state cbs;
lua_State *L;

REF_RETAIN (cbd);

@@ -693,22 +710,25 @@ lua_tcp_handler (int fd, short what, gpointer ud)
else {
cbd->flags |= LUA_TCP_FLAG_CONNECTED;

lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
L = cbs.L;

if (cbd->connect_cb != -1) {
struct lua_tcp_cbdata **pcbd;
gint top;

top = lua_gettop (cbd->L);
lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
top = lua_gettop (L);
lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->connect_cb);
pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
REF_RETAIN (cbd);
rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
rspamd_lua_setclass (L, "rspamd{tcp}", -1);

if (lua_pcall (cbd->L, 1, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
if (lua_pcall (L, 1, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}

lua_settop (cbd->L, top);
lua_settop (L, top);

REF_RELEASE (cbd);
}
@@ -1174,7 +1194,7 @@ lua_tcp_request (lua_State *L)
return 1;
}

cbd->L = L;
cbd->task = task;
h = rspamd_random_uint64_fast ();
rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);
cbd->handlers = g_queue_new ();

Loading…
Cancel
Save