}
struct lua_dns_cbdata {
- lua_State *L;
struct thread_entry *thread;
struct rspamd_task *task;
struct rspamd_dns_resolver *resolver;
struct rspamd_dns_resolver **presolver;
struct rdns_reply_entry *elt;
rspamd_inet_addr_t *addr;
+ lua_State *L;
+ struct lua_callback_state cbs;
if (cd->cbref != -1) {
- lua_rawgeti (cd->L, LUA_REGISTRYINDEX, cd->cbref);
+ lua_thread_pool_prepare_callback (cd->resolver->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
+
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cd->cbref);
- presolver = lua_newuserdata (cd->L, sizeof (gpointer));
- rspamd_lua_setclass (cd->L, "rspamd{resolver}", -1);
+ presolver = lua_newuserdata (L, sizeof (gpointer));
+ rspamd_lua_setclass (L, "rspamd{resolver}", -1);
*presolver = cd->resolver;
- lua_pushstring (cd->L, cd->to_resolve);
+ lua_pushstring (L, cd->to_resolve);
+ } else {
+ L = cd->thread->lua_state;
}
/*
naddrs ++;
}
- lua_createtable (cd->L, naddrs, 0);
+ lua_createtable (L, naddrs, 0);
LL_FOREACH (reply->entries, elt)
{
switch (elt->type) {
case RDNS_REQUEST_A:
addr = rspamd_inet_address_new (AF_INET, &elt->content.a.addr);
- rspamd_lua_ip_push (cd->L, addr);
+ rspamd_lua_ip_push (L, addr);
rspamd_inet_address_free (addr);
- lua_rawseti (cd->L, -2, ++i);
+ lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_AAAA:
addr = rspamd_inet_address_new (AF_INET6, &elt->content.aaa.addr);
- rspamd_lua_ip_push (cd->L, addr);
+ rspamd_lua_ip_push (L, addr);
rspamd_inet_address_free (addr);
- lua_rawseti (cd->L, -2, ++i);
+ lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_NS:
- lua_pushstring (cd->L, elt->content.ns.name);
- lua_rawseti (cd->L, -2, ++i);
+ lua_pushstring (L, elt->content.ns.name);
+ lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_PTR:
- lua_pushstring (cd->L, elt->content.ptr.name);
- lua_rawseti (cd->L, -2, ++i);
+ lua_pushstring (L, elt->content.ptr.name);
+ lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_TXT:
case RDNS_REQUEST_SPF:
- lua_pushstring (cd->L, elt->content.txt.data);
- lua_rawseti (cd->L, -2, ++i);
+ lua_pushstring (L, elt->content.txt.data);
+ lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_MX:
/* mx['name'], mx['priority'] */
- lua_createtable (cd->L, 0, 2);
- rspamd_lua_table_set (cd->L, "name", elt->content.mx.name);
- lua_pushstring (cd->L, "priority");
- lua_pushinteger (cd->L, elt->content.mx.priority);
- lua_settable (cd->L, -3);
+ lua_createtable (L, 0, 2);
+ rspamd_lua_table_set (L, "name", elt->content.mx.name);
+ lua_pushstring (L, "priority");
+ lua_pushinteger (L, elt->content.mx.priority);
+ lua_settable (L, -3);
- lua_rawseti (cd->L, -2, ++i);
+ lua_rawseti (L, -2, ++i);
break;
case RDNS_REQUEST_SOA:
- lua_createtable (cd->L, 0, 7);
- rspamd_lua_table_set (cd->L, "ns", elt->content.soa.mname);
- rspamd_lua_table_set (cd->L, "contact", elt->content.soa.admin);
- lua_pushstring (cd->L, "serial");
- lua_pushinteger (cd->L, elt->content.soa.serial);
- lua_settable (cd->L, -3);
- lua_pushstring (cd->L, "refresh");
- lua_pushinteger (cd->L, elt->content.soa.refresh);
- lua_settable (cd->L, -3);
- lua_pushstring (cd->L, "retry");
- lua_pushinteger (cd->L, elt->content.soa.retry);
- lua_settable (cd->L, -3);
- lua_pushstring (cd->L, "expiry");
- lua_pushinteger (cd->L, elt->content.soa.expire);
- lua_settable (cd->L, -3);
+ lua_createtable (L, 0, 7);
+ rspamd_lua_table_set (L, "ns", elt->content.soa.mname);
+ rspamd_lua_table_set (L, "contact", elt->content.soa.admin);
+ lua_pushstring (L, "serial");
+ lua_pushinteger (L, elt->content.soa.serial);
+ lua_settable (L, -3);
+ lua_pushstring (L, "refresh");
+ lua_pushinteger (L, elt->content.soa.refresh);
+ lua_settable (L, -3);
+ lua_pushstring (L, "retry");
+ lua_pushinteger (L, elt->content.soa.retry);
+ lua_settable (L, -3);
+ lua_pushstring (L, "expiry");
+ lua_pushinteger (L, elt->content.soa.expire);
+ lua_settable (L, -3);
/* Negative TTL */
- lua_pushstring (cd->L, "nx");
- lua_pushinteger (cd->L, elt->content.soa.minimum);
- lua_settable (cd->L, -3);
+ lua_pushstring (L, "nx");
+ lua_pushinteger (L, elt->content.soa.minimum);
+ lua_settable (L, -3);
- lua_rawseti (cd->L, -2, ++i);
+ lua_rawseti (L, -2, ++i);
break;
}
}
- lua_pushnil (cd->L);
+ lua_pushnil (L);
}
if (cd->cbref != -1) {
* 6 - reply->authenticated
*/
if (reply->code != RDNS_RC_NOERROR) {
- lua_pushnil (cd->L);
- lua_pushstring (cd->L, rdns_strerror (reply->code));
+ lua_pushnil (L);
+ lua_pushstring (L, rdns_strerror (reply->code));
}
if (cd->user_str != NULL) {
- lua_pushstring (cd->L, cd->user_str);
+ lua_pushstring (L, cd->user_str);
}
else {
- lua_pushnil (cd->L);
+ lua_pushnil (L);
}
- lua_pushboolean (cd->L, reply->authenticated);
+ lua_pushboolean (L, reply->authenticated);
- if (lua_pcall (cd->L, 6, 0, 0) != 0) {
- msg_info ("call to dns callback failed: %s", lua_tostring (cd->L, -1));
- lua_pop (cd->L, 1);
+ if (lua_pcall (L, 6, 0, 0) != 0) {
+ msg_info ("call to dns callback failed: %s", lua_tostring (L, -1));
+ lua_pop (L, 1);
}
/* Unref function */
- luaL_unref (cd->L, LUA_REGISTRYINDEX, cd->cbref);
+ luaL_unref (L, LUA_REGISTRYINDEX, cd->cbref);
+
+ lua_thread_pool_restore_callback (&cbs);
} else {
/*
* 1 - true | false in the case of error
* }
*/
if (reply->code != RDNS_RC_NOERROR) {
- lua_pushboolean (cd->L, false);
- lua_pushstring (cd->L, rdns_strerror (reply->code));
+ lua_pushboolean (L, false);
+ lua_pushstring (L, rdns_strerror (reply->code));
}
else {
- lua_pushboolean (cd->L, reply->authenticated);
- lua_setfield (cd->L, -3, "authenticated");
+ lua_pushboolean (L, reply->authenticated);
+ lua_setfield (L, -3, "authenticated");
/* result 1 - not and error */
- lua_pushboolean (cd->L, true);
+ lua_pushboolean (L, true);
/* push table into stack, result 2 - results itself */
- lua_pushvalue (cd->L, -3);
+ lua_pushvalue (L, -3);
}
+
+ g_assert (L == cd->thread->lua_state);
+
lua_resume_thread (cd->task, cd->thread, 2);
}
pool = task->task_pool;
session = task->s;
}
+ else if (!session || !pool) {
+ return luaL_error (L, "invalid arguments: either 'task' or 'session'/'mempool' should be set");
+ }
if (pool != NULL && to_resolve != NULL) {
cbdata = rspamd_mempool_alloc0 (pool, sizeof (struct lua_dns_cbdata));
- cbdata->L = L;
cbdata->resolver = resolver;
cbdata->cbref = cbref;
cbdata->user_str = rspamd_mempool_strdup (pool, user_str);
* limitations under the License.
*/
#include "lua_common.h"
+#include "lua_thread_pool.h"
#include "http_private.h"
#include "unix-std.h"
#include "zlib.h"
#define RSPAMD_LUA_HTTP_FLAG_NOVERIFY (1 << 1)
struct lua_http_cbdata {
- lua_State *L;
struct rspamd_http_connection *conn;
struct rspamd_async_session *session;
struct rspamd_async_watcher *w;
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg;
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+ luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
if (cbd->conn) {
/* Here we already have a connection, so we need to unref it */
rspamd_http_connection_unref (cbd->conn);
static void
lua_http_push_error (struct lua_http_cbdata *cbd, const char *err)
{
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
- lua_pushstring (cbd->L, err);
+ struct lua_callback_state lcbd;
+ lua_State *L;
+
+ lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);
+
+ L = lcbd.L;
- if (lua_pcall (cbd->L, 1, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
- lua_pop (cbd->L, 1);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
+ lua_pushstring (L, err);
+
+ if (lua_pcall (L, 1, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
+ lua_pop (L, 1);
}
+
+ lua_thread_pool_restore_callback (&lcbd);
}
static void
const gchar *body;
gsize body_len;
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+ struct lua_callback_state lcbd;
+ lua_State *L;
+
+ lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);
+
+ L = lcbd.L;
+
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
/* Error */
- lua_pushnil (cbd->L);
+ lua_pushnil (L);
/* Reply code */
- lua_pushinteger (cbd->L, msg->code);
+ lua_pushinteger (L, msg->code);
/* Body */
body = rspamd_http_message_get_body (msg, &body_len);
if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) {
struct rspamd_lua_text *t;
- t = lua_newuserdata (cbd->L, sizeof (*t));
- rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
+ t = lua_newuserdata (L, sizeof (*t));
+ rspamd_lua_setclass (L, "rspamd{text}", -1);
t->start = body;
t->len = body_len;
t->flags = 0;
}
else {
if (body_len > 0) {
- lua_pushlstring (cbd->L, body, body_len);
+ lua_pushlstring (L, body, body_len);
}
else {
- lua_pushnil (cbd->L);
+ lua_pushnil (L);
}
}
/* Headers */
- lua_newtable (cbd->L);
+ lua_newtable (L);
HASH_ITER (hh, msg->headers, h, htmp) {
/*
* Lowercase header name, as Lua cannot search in caseless matter
*/
rspamd_str_lc (h->combined->str, h->name.len);
- lua_pushlstring (cbd->L, h->name.begin, h->name.len);
- lua_pushlstring (cbd->L, h->value.begin, h->value.len);
- lua_settable (cbd->L, -3);
+ lua_pushlstring (L, h->name.begin, h->name.len);
+ lua_pushlstring (L, h->value.begin, h->value.len);
+ lua_settable (L, -3);
}
- if (lua_pcall (cbd->L, 4, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
- lua_pop (cbd->L, 1);
+ if (lua_pcall (L, 4, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
+ lua_pop (L, 1);
}
lua_http_maybe_free (cbd);
+ lua_thread_pool_restore_callback (&lcbd);
+
return 0;
}
}
cbd = g_malloc0 (sizeof (*cbd));
- cbd->L = L;
cbd->cbref = cbref;
cbd->msg = msg;
cbd->ev_base = ev_base;
* limitations under the License.
*/
#include "lua_common.h"
+#include "lua_thread_pool.h"
#include "utlist.h"
#include "contrib/hiredis/hiredis.h"
*/
struct lua_redis_userdata {
redisAsyncContext *ctx;
- lua_State *L;
+ struct rspamd_task *task;
struct rspamd_async_session *s;
struct event_base *ev_base;
struct rspamd_config *cfg;
lua_redis_free_args (cur->args, cur->arglens, cur->nargs);
if (cur->cbref != -1) {
- luaL_unref (ud->L, LUA_REGISTRYINDEX, cur->cbref);
+ luaL_unref (ud->cfg->lua_state, LUA_REGISTRYINDEX, cur->cbref);
}
g_free (cur);
gboolean connected)
{
struct lua_redis_userdata *ud = sp_ud->c;
+ struct lua_callback_state cbs;
if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
if (sp_ud->cbref != -1) {
+
+ lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
+
/* Push error */
- lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+ lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
/* String of error */
- lua_pushstring (ud->L, err);
+ lua_pushstring (cbs.L, err);
/* Data is nil */
- lua_pushnil (ud->L);
+ lua_pushnil (cbs.L);
- if (lua_pcall (ud->L, 2, 0, 0) != 0) {
- msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
- lua_pop (ud->L, 1);
+ if (lua_pcall (cbs.L, 2, 0, 0) != 0) {
+ msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1));
+ lua_pop (cbs.L, 1);
}
+
+ lua_thread_pool_restore_callback (&cbs);
}
sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
struct lua_redis_specific_userdata *sp_ud)
{
struct lua_redis_userdata *ud = sp_ud->c;
+ struct lua_callback_state cbs;
if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
if (sp_ud->cbref != -1) {
+ lua_thread_pool_prepare_callback (ud->cfg->lua_thread_pool, &cbs);
+
/* Push error */
- lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
+ lua_rawgeti (cbs.L, LUA_REGISTRYINDEX, sp_ud->cbref);
/* Error is nil */
- lua_pushnil (ud->L);
+ lua_pushnil (cbs.L);
/* Data */
- lua_redis_push_reply (ud->L, r, ctx->flags & LUA_REDIS_TEXTDATA);
+ lua_redis_push_reply (cbs.L, r, ctx->flags & LUA_REDIS_TEXTDATA);
- if (lua_pcall (ud->L, 2, 0, 0) != 0) {
- msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
- lua_pop (ud->L, 1);
+ if (lua_pcall (cbs.L, 2, 0, 0) != 0) {
+ msg_info ("call to callback failed: %s", lua_tostring (cbs.L, -1));
+ lua_pop (cbs.L, 1);
}
+ lua_thread_pool_restore_callback (&cbs);
}
sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
ud->cfg = cfg;
ud->pool = cfg->redis_pool;
ud->ev_base = ev_base;
- ud->L = L;
+ ud->task = task;
ret = TRUE;
}
* limitations under the License.
*/
#include "lua_common.h"
+#include "lua_thread_pool.h"
#include "utlist.h"
#include "unix-std.h"
#define LUA_TCP_FLAG_FINISHED (1 << 4)
struct lua_tcp_cbdata {
- lua_State *L;
struct rspamd_async_session *session;
struct rspamd_async_event *async_ev;
struct event_base *ev_base;
struct event ev;
struct lua_tcp_dtor *dtors;
ref_entry_t ref;
+ struct rspamd_task *task;
};
#define msg_debug_tcp(...) rspamd_conditional_debug_fast (NULL, cbd->addr, \
if (hdl->type == LUA_WANT_READ) {
if (hdl->h.r.cbref) {
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.r.cbref);
+ luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.r.cbref);
}
if (hdl->h.r.stop_pattern) {
}
else {
if (hdl->h.w.cbref) {
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.w.cbref);
+ luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, hdl->h.w.cbref);
}
if (hdl->h.w.iov) {
msg_debug_tcp ("finishing TCP connection");
if (cbd->connect_cb) {
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
+ luaL_unref (cbd->task->cfg->lua_state, LUA_REGISTRYINDEX, cbd->connect_cb);
}
if (cbd->fd != -1) {
struct lua_tcp_cbdata **pcbd;
struct lua_tcp_handler *hdl;
gint cbref, top;
+ struct lua_callback_state cbs;
+ lua_State *L;
+
+ lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
va_start (ap, err);
}
if (cbref != -1) {
- top = lua_gettop (cbd->L);
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+ top = lua_gettop (L);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbref);
/* Error message */
va_copy (ap_copy, ap);
- lua_pushvfstring (cbd->L, err, ap_copy);
+ lua_pushvfstring (L, err, ap_copy);
va_end (ap_copy);
/* Body */
- lua_pushnil (cbd->L);
+ lua_pushnil (L);
/* Connection */
- pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+ pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
- rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+ rspamd_lua_setclass (L, "rspamd{tcp}", -1);
REF_RETAIN (cbd);
- if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ if (lua_pcall (L, 3, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- lua_settop (cbd->L, top);
+ lua_settop (L, top);
REF_RELEASE (cbd);
}
}
va_end (ap);
+
+ lua_thread_pool_restore_callback (&cbs);
}
static void
struct lua_tcp_cbdata **pcbd;
struct lua_tcp_handler *hdl;
gint cbref, arg_cnt, top;
+ struct lua_callback_state cbs;
+ lua_State *L;
+
+ lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
hdl = g_queue_peek_head (cbd->handlers);
}
if (cbref != -1) {
- top = lua_gettop (cbd->L);
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+ top = lua_gettop (L);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbref);
/* Error */
- lua_pushnil (cbd->L);
+ lua_pushnil (L);
/* Body */
if (hdl->type == LUA_WANT_READ) {
- t = lua_newuserdata (cbd->L, sizeof (*t));
- rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
+ t = lua_newuserdata (L, sizeof (*t));
+ rspamd_lua_setclass (L, "rspamd{text}", -1);
t->start = (const gchar *)str;
t->len = len;
t->flags = 0;
arg_cnt = 2;
}
/* Connection */
- pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+ pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
- rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+ rspamd_lua_setclass (L, "rspamd{tcp}", -1);
REF_RETAIN (cbd);
- if (lua_pcall (cbd->L, arg_cnt, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ if (lua_pcall (L, arg_cnt, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- lua_settop (cbd->L, top);
+ lua_settop (L, top);
REF_RELEASE (cbd);
}
+
+ lua_thread_pool_restore_callback (&cbs);
}
static void
gssize r;
gint so_error = 0;
socklen_t so_len = sizeof (so_error);
+ struct lua_callback_state cbs;
+ lua_State *L;
REF_RETAIN (cbd);
else {
cbd->flags |= LUA_TCP_FLAG_CONNECTED;
+ lua_thread_pool_prepare_callback (cbd->task->cfg->lua_thread_pool, &cbs);
+ L = cbs.L;
+
if (cbd->connect_cb != -1) {
struct lua_tcp_cbdata **pcbd;
gint top;
- top = lua_gettop (cbd->L);
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
- pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+ top = lua_gettop (L);
+ lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->connect_cb);
+ pcbd = lua_newuserdata (L, sizeof (*pcbd));
*pcbd = cbd;
REF_RETAIN (cbd);
- rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+ rspamd_lua_setclass (L, "rspamd{tcp}", -1);
- if (lua_pcall (cbd->L, 1, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ if (lua_pcall (L, 1, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (L, -1));
}
- lua_settop (cbd->L, top);
+ lua_settop (L, top);
REF_RELEASE (cbd);
}
return 1;
}
- cbd->L = L;
+ cbd->task = task;
h = rspamd_random_uint64_fast ();
rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);
cbd->handlers = g_queue_new ();