aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-12-29 15:50:29 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-12-29 18:58:16 +0000
commit6c28de22d121ad73faca2e8cc15f049c510dd068 (patch)
treece0d662157cbe51fbe074efa96d5b074211c64a6 /src
parent91085e2704de34f13ed6c3a4be4e53ea357f6ff4 (diff)
downloadrspamd-6c28de22d121ad73faca2e8cc15f049c510dd068.tar.gz
rspamd-6c28de22d121ad73faca2e8cc15f049c510dd068.zip
[Rework] Rework lua_tcp to allow TCP dialog
- Now, lua_tcp has a chain of read and write events that are processed in order - The old API wasn't touched, however, new style API will be possible - Partial lua_tcp might be broken, so I need to revisit all plugins that use lua_tcp Issue: #1224
Diffstat (limited to 'src')
-rw-r--r--src/lua/lua_tcp.c593
1 files changed, 431 insertions, 162 deletions
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c
index 7be257b51..b3276b709 100644
--- a/src/lua/lua_tcp.c
+++ b/src/lua/lua_tcp.c
@@ -19,7 +19,6 @@
#include "ref.h"
#include "unix-std.h"
-static void lua_tcp_handler (int fd, short what, gpointer ud);
/***
* @module rspamd_tcp
* Rspamd TCP module represents generic TCP asynchronous client available from LUA code.
@@ -77,32 +76,64 @@ static const struct luaL_reg tcp_libm[] = {
{NULL, NULL}
};
+struct lua_tcp_read_handler {
+ gchar *stop_pattern;
+ gint cbref;
+};
+
+struct lua_tcp_write_handler {
+ struct iovec *iov;
+ guint iovlen;
+ guint pos;
+ guint total;
+ gint cbref;
+};
+
+enum lua_tcp_handler_type {
+ LUA_WANT_WRITE = 0,
+ LUA_WANT_READ,
+};
+
+struct lua_tcp_handler {
+ union {
+ struct lua_tcp_read_handler r;
+ struct lua_tcp_write_handler w;
+ } h;
+ enum lua_tcp_handler_type type;
+};
+
+struct lua_tcp_dtor {
+ rspamd_mempool_destruct_t dtor;
+ void *data;
+ struct lua_tcp_dtor *next;
+};
+
+#define LUA_TCP_FLAG_PARTIAL (1 << 0)
+#define LUA_TCP_FLAG_SHUTDOWN (1 << 2)
+#define LUA_TCP_FLAG_CONNECTED (1 << 3)
+
struct lua_tcp_cbdata {
lua_State *L;
struct rspamd_async_session *session;
struct event_base *ev_base;
struct timeval tv;
rspamd_inet_addr_t *addr;
- rspamd_mempool_t *pool;
- struct iovec *iov;
GByteArray *in;
- gchar *stop_pattern;
+ GQueue *handlers;
+ gint fd;
+ gint connect_cb;
+ guint port;
+ guint flags;
struct rspamd_async_watcher *w;
struct event ev;
+ struct lua_tcp_dtor *dtors;
ref_entry_t ref;
- gint fd;
- gint cbref;
- gint connect_cb;
- guint iovlen;
- guint pos;
- guint total;
- guint16 port;
- gboolean partial;
- gboolean do_shutdown;
- gboolean do_read;
- gboolean connected;
};
+static void lua_tcp_handler (int fd, short what, gpointer ud);
+static void lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd,
+ gboolean can_read, gboolean can_write);
+
static const int default_tcp_timeout = 5000;
static struct rspamd_dns_resolver *
@@ -117,12 +148,51 @@ lua_tcp_global_resolver (struct event_base *ev_base)
return global_resolver;
}
+static gboolean
+lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd)
+{
+ struct lua_tcp_handler *hdl;
+
+ hdl = g_queue_pop_head (cbd->handlers);
+
+ if (hdl == NULL) {
+ /* We are done */
+ return FALSE;
+ }
+
+ if (hdl->type == LUA_WANT_READ) {
+ if (hdl->h.r.cbref) {
+ luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.r.cbref);
+ }
+
+ if (hdl->h.r.stop_pattern) {
+ g_free (hdl->h.r.stop_pattern);
+ }
+ }
+ else {
+ if (hdl->h.w.cbref) {
+ luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.w.cbref);
+ }
+
+ if (hdl->h.w.iov) {
+ g_free (hdl->h.w.iov);
+ }
+ }
+
+ g_slice_free1 (sizeof (*hdl), hdl);
+
+ return TRUE;
+}
+
static void
lua_tcp_fin (gpointer arg)
{
struct lua_tcp_cbdata *cbd = (struct lua_tcp_cbdata *)arg;
+ struct lua_tcp_dtor *dtor, *dttmp;
- luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+ if (cbd->connect_cb) {
+ luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
+ }
if (cbd->fd != -1) {
event_del (&cbd->ev);
@@ -133,6 +203,14 @@ lua_tcp_fin (gpointer arg)
rspamd_inet_address_destroy (cbd->addr);
}
+ while (lua_tcp_shift_handler (cbd)) {}
+
+ LL_FOREACH_SAFE (cbd->dtors, dtor, dttmp) {
+ dtor->dtor (dtor->data);
+ g_slice_free1 (sizeof (*dtor), dtor);
+ }
+
+ g_byte_array_unref (cbd->in);
g_slice_free1 (sizeof (struct lua_tcp_cbdata), cbd);
}
@@ -161,25 +239,40 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, const char *err, ...)
{
va_list ap;
struct lua_tcp_cbdata **pcbd;
+ struct lua_tcp_handler *hdl;
+ gint cbref;
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+ hdl = g_queue_peek_head (cbd->handlers);
- /* Error message */
- va_start (ap, err);
- lua_pushvfstring (cbd->L, err, ap);
- va_end (ap);
+ g_assert (hdl != NULL);
- /* Body */
- lua_pushnil (cbd->L);
- /* Connection */
- pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
- *pcbd = cbd;
- rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
- REF_RETAIN (cbd);
+ if (hdl->type == LUA_WANT_READ) {
+ cbref = hdl->h.r.cbref;
+ }
+ else {
+ cbref = hdl->h.w.cbref;
+ }
- if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
- lua_pop (cbd->L, 1);
+ if (cbref != -1) {
+ lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+
+ /* Error message */
+ va_start (ap, err);
+ lua_pushvfstring (cbd->L, err, ap);
+ va_end (ap);
+
+ /* Body */
+ lua_pushnil (cbd->L);
+ /* Connection */
+ pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+ *pcbd = cbd;
+ rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+ REF_RETAIN (cbd);
+
+ if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ lua_pop (cbd->L, 1);
+ }
}
REF_RELEASE (cbd);
@@ -190,32 +283,64 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
{
struct rspamd_lua_text *t;
struct lua_tcp_cbdata **pcbd;
+ struct lua_tcp_handler *hdl;
+ gint cbref;
- lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
- /* Error */
- lua_pushnil (cbd->L);
- /* Body */
- t = lua_newuserdata (cbd->L, sizeof (*t));
- rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
- t->start = (const gchar *)str;
- t->len = len;
- t->flags = 0;
- /* Connection */
- pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
- *pcbd = cbd;
- rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+ hdl = g_queue_peek_head (cbd->handlers);
- REF_RETAIN (cbd);
+ g_assert (hdl != NULL);
- if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
- msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
- lua_pop (cbd->L, 1);
+ if (hdl->type == LUA_WANT_READ) {
+ cbref = hdl->h.r.cbref;
+ }
+ else {
+ cbref = hdl->h.w.cbref;
+ }
+
+ if (cbref != -1) {
+ lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+ /* Error */
+ lua_pushnil (cbd->L);
+ /* Body */
+
+ if (hdl->type == LUA_WANT_READ) {
+ t = lua_newuserdata (cbd->L, sizeof (*t));
+ rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
+ t->start = (const gchar *)str;
+ t->len = len;
+ t->flags = 0;
+ }
+ /* Connection */
+ pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+ *pcbd = cbd;
+ rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+
+ REF_RETAIN (cbd);
+
+ if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ lua_pop (cbd->L, 1);
+ }
}
REF_RELEASE (cbd);
}
static void
+lua_tcp_plan_read (struct lua_tcp_cbdata *cbd)
+{
+ event_del (&cbd->ev);
+#ifdef EV_CLOSED
+ event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST|EV_CLOSED,
+ lua_tcp_handler, cbd);
+#else
+ event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST, lua_tcp_handler, cbd);
+#endif
+ event_base_set (cbd->ev_base, &cbd->ev);
+ event_add (&cbd->ev, &cbd->tv);
+}
+
+static void
lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
{
struct iovec *start;
@@ -224,20 +349,27 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
gsize remain;
gssize r;
struct iovec *cur_iov;
+ struct lua_tcp_handler *hdl;
+ struct lua_tcp_write_handler *wh;
struct msghdr msg;
- if (cbd->pos == cbd->total) {
+ hdl = g_queue_peek_head (cbd->handlers);
+
+ g_assert (hdl != NULL && hdl->type == LUA_WANT_WRITE);
+ wh = &hdl->h.w;
+
+ if (wh->pos == wh->total) {
goto call_finish_handler;
}
- start = &cbd->iov[0];
- niov = cbd->iovlen;
- remain = cbd->pos;
+ start = &wh->iov[0];
+ niov = wh->iovlen;
+ remain = wh->pos;
/* We know that niov is small enough for that */
cur_iov = alloca (niov * sizeof (struct iovec));
- memcpy (cur_iov, cbd->iov, niov * sizeof (struct iovec));
+ memcpy (cur_iov, wh->iov, niov * sizeof (struct iovec));
- for (i = 0; i < cbd->iovlen && remain > 0; i++) {
+ for (i = 0; i < wh->iovlen && remain > 0; i++) {
/* Find out the first iov required */
start = &cur_iov[i];
if (start->iov_len <= remain) {
@@ -264,15 +396,16 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
if (r == -1) {
lua_tcp_push_error (cbd, "IO write error while trying to write %d "
"bytes: %s", (gint)remain, strerror (errno));
- REF_RELEASE (cbd);
+ lua_tcp_shift_handler (cbd);
+ lua_tcp_plan_handler_event (cbd, TRUE, FALSE);
return;
}
else {
- cbd->pos += r;
+ wh->pos += r;
}
- if (cbd->pos >= cbd->total) {
+ if (wh->pos >= wh->total) {
goto call_finish_handler;
}
else {
@@ -284,93 +417,132 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
call_finish_handler:
- if (cbd->do_shutdown) {
+ if ((cbd->flags & LUA_TCP_FLAG_SHUTDOWN)) {
/* Half close the connection */
shutdown (cbd->fd, SHUT_WR);
+ cbd->flags &= ~LUA_TCP_FLAG_SHUTDOWN;
}
- if (cbd->do_read) {
- event_del (&cbd->ev);
-#ifdef EV_CLOSED
- event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST|EV_CLOSED,
- lua_tcp_handler, cbd);
-#else
- event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST, lua_tcp_handler, cbd);
-#endif
- event_base_set (cbd->ev_base, &cbd->ev);
- event_add (&cbd->ev, &cbd->tv);
- }
- else {
- lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len);
- REF_RELEASE (cbd);
- }
+ lua_tcp_push_data (cbd, NULL, 0);
+ lua_tcp_shift_handler (cbd);
+ lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
}
-static void
-lua_tcp_handler (int fd, short what, gpointer ud)
+static gboolean
+lua_tcp_process_read_handler (struct lua_tcp_cbdata *cbd,
+ struct lua_tcp_read_handler *rh)
{
- struct lua_tcp_cbdata *cbd = ud;
- gchar inbuf[8192];
- gssize r;
guint slen;
- gint so_error = 0;
- socklen_t so_len = sizeof (so_error);
+ goffset pos;
- REF_RETAIN (cbd);
+ if (rh->stop_pattern) {
+ slen = strlen (rh->stop_pattern);
- if (what == EV_READ) {
- g_assert (cbd->partial || cbd->in != NULL);
+ if (cbd->in->len >= slen) {
+ if ((pos = rspamd_substring_search (cbd->in->data, cbd->in->len,
+ rh->stop_pattern, slen)) != -1) {
+ lua_tcp_push_data (cbd, cbd->in->data, pos);
- r = read (cbd->fd, inbuf, sizeof (inbuf));
+ if (pos + slen < cbd->in->len) {
+ /* We have a leftover */
+ memmove (cbd->in->data, cbd->in->data + pos + slen,
+ cbd->in->len - (pos + slen));
+ lua_tcp_shift_handler (cbd);
+ }
+ else {
+ lua_tcp_shift_handler (cbd);
- if (r <= 0) {
- /*
- * We actually can have connection reset here, so we just check if
- * the cumulative buffer is not empty
- */
- if (cbd->partial) {
- if (r < 0) {
- lua_tcp_push_error (cbd, "IO read error while trying to read %d "
- "bytes: %s", (gint)sizeof (inbuf),
- strerror (errno));
+ return TRUE;
}
}
else {
- if (cbd->in->len > 0) {
- lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len);
- }
- else {
- lua_tcp_push_error (cbd, "IO read error while trying to write %d "
- "bytes: %s", (gint)sizeof (inbuf),
- strerror (errno));
- }
+ /* Plan new read */
+ lua_tcp_plan_read (cbd);
}
+ }
+ }
- REF_RELEASE (cbd);
+ return FALSE;
+}
+
+static void
+lua_tcp_process_read (struct lua_tcp_cbdata *cbd,
+ guchar *in, gssize r)
+{
+ struct lua_tcp_handler *hdl;
+ struct lua_tcp_read_handler *rh;
+
+ hdl = g_queue_peek_head (cbd->handlers);
+
+ g_assert (hdl != NULL && hdl->type == LUA_WANT_READ);
+ rh = &hdl->h.r;
+
+ if (r > 0) {
+ if (cbd->flags & LUA_TCP_FLAG_PARTIAL) {
+ lua_tcp_push_data (cbd, in, r);
+ /* Plan next event */
+ lua_tcp_shift_handler (cbd);
+ lua_tcp_shift_handler (cbd);
}
else {
- if (cbd->partial) {
- lua_tcp_push_data (cbd, inbuf, r);
+ g_byte_array_append (cbd->in, in, r);
+
+ if (!lua_tcp_process_read_handler (cbd, rh)) {
+ /* Plan more read */
+ lua_tcp_plan_read (cbd);
}
else {
- g_byte_array_append (cbd->in, inbuf, r);
+ /* Go towards the next handler */
+ lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
+ }
+ }
+ }
+ else if (r == 0) {
+ /* EOF */
+ if (cbd->in->len > 0) {
+ /* We have some data to process */
+ lua_tcp_process_read_handler (cbd, rh);
+ }
+ else {
+ lua_tcp_push_error (cbd, "IO read error: connection terminated");
+ }
- if (cbd->stop_pattern) {
- slen = strlen (cbd->stop_pattern);
+ lua_tcp_plan_handler_event (cbd, FALSE, TRUE);
+ }
+ else {
+ /* An error occurred */
+ if (errno == EAGAIN || errno == EINTR) {
+ /* Restart call */
+ lua_tcp_plan_read (cbd);
- if (cbd->in->len >= slen) {
- if (memcmp (cbd->stop_pattern, cbd->in->data +
- (cbd->in->len - slen), slen) == 0) {
- lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len);
- REF_RELEASE (cbd);
- }
- }
- }
- }
+ return;
}
+
+ /* Fatal error */
+ lua_tcp_push_error (cbd, "IO read error while trying to read data: %s",
+ strerror (errno));
+
+ REF_RELEASE (cbd);
+ }
+}
+
+static void
+lua_tcp_handler (int fd, short what, gpointer ud)
+{
+ struct lua_tcp_cbdata *cbd = ud;
+ guchar inbuf[8192];
+ gssize r;
+ gint so_error = 0;
+ socklen_t so_len = sizeof (so_error);
+
+ REF_RETAIN (cbd);
+
+ if (what == EV_READ) {
+ r = read (cbd->fd, inbuf, sizeof (inbuf));
+ lua_tcp_process_read (cbd, inbuf, r);
}
else if (what == EV_WRITE) {
- if (!cbd->connected) {
+ if (!(cbd->flags & LUA_TCP_FLAG_CONNECTED)) {
if (getsockopt (fd, SOL_SOCKET, SO_ERROR, &so_error, &so_len) == -1) {
lua_tcp_push_error (cbd, "Cannot get socket error: %s",
strerror (errno));
@@ -384,7 +556,7 @@ lua_tcp_handler (int fd, short what, gpointer ud)
goto out;
}
else {
- cbd->connected = TRUE;
+ cbd->flags |= LUA_TCP_FLAG_CONNECTED;
if (cbd->connect_cb != -1) {
struct lua_tcp_cbdata **pcbd;
@@ -422,6 +594,69 @@ out:
REF_RELEASE (cbd);
}
+static void
+lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
+ gboolean can_write)
+{
+ struct lua_tcp_handler *hdl;
+
+ hdl = g_queue_peek_head (cbd->handlers);
+
+ if (hdl == NULL) {
+ /* We are finished with a connection */
+ REF_RELEASE (cbd);
+ }
+ else {
+ if (hdl->type == LUA_WANT_READ) {
+ /* We need to check if we have some leftover in the buffer */
+ if (cbd->in->len > 0) {
+ if (lua_tcp_process_read_handler (cbd, &hdl->h.r)) {
+ /* We can go to the next handler */
+ lua_tcp_shift_handler (cbd);
+ lua_tcp_plan_handler_event (cbd, can_read, can_write);
+ }
+ }
+ else {
+ if (can_read) {
+ /* We need to plan a new event */
+ event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd);
+ event_base_set (cbd->ev_base, &cbd->ev);
+ event_add (&cbd->ev, &cbd->tv);
+ }
+ else {
+ /* Cannot read more */
+ lua_tcp_push_error (cbd, "EOF, cannot read more data");
+ lua_tcp_shift_handler (cbd);
+ lua_tcp_plan_handler_event (cbd, can_read, can_write);
+ }
+ }
+ }
+ else {
+ /*
+ * We need to plan write event if there is something in the
+ * write request
+ */
+ if (hdl->h.w.pos < hdl->h.w.total) {
+ if (can_write) {
+ event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
+ event_base_set (cbd->ev_base, &cbd->ev);
+ event_add (&cbd->ev, &cbd->tv);
+ }
+ else {
+ /* Cannot read more */
+ lua_tcp_push_error (cbd, "EOF, cannot read more data");
+ lua_tcp_shift_handler (cbd);
+ lua_tcp_plan_handler_event (cbd, can_read, can_write);
+ }
+ }
+ else {
+ /* We shouldn't have empty write handlers */
+ g_assert_not_reached ();
+ }
+ }
+ }
+}
+
static gboolean
lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
{
@@ -434,11 +669,9 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
msg_info ("cannot connect to %s", rspamd_inet_address_to_string (cbd->addr));
return FALSE;
}
- cbd->fd = fd;
- event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
- event_base_set (cbd->ev_base, &cbd->ev);
- event_add (&cbd->ev, &cbd->tv);
+ cbd->fd = fd;
+ lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
return TRUE;
}
@@ -476,12 +709,13 @@ lua_tcp_dns_handler (struct rdns_reply *reply, gpointer ud)
}
static gboolean
-lua_tcp_arg_toiovec (lua_State *L, gint pos, rspamd_mempool_t *pool,
+lua_tcp_arg_toiovec (lua_State *L, gint pos, struct lua_tcp_cbdata *cbd,
struct iovec *vec)
{
struct rspamd_lua_text *t;
gsize len;
const gchar *str;
+ struct lua_tcp_dtor *dtor;
if (lua_type (L, pos) == LUA_TUSERDATA) {
t = lua_check_text (L, pos);
@@ -493,7 +727,10 @@ lua_tcp_arg_toiovec (lua_State *L, gint pos, rspamd_mempool_t *pool,
if (t->flags & RSPAMD_TEXT_FLAG_OWN) {
/* Steal ownership */
t->flags = 0;
- rspamd_mempool_add_destructor (pool, g_free, (void *)t->start);
+ dtor = g_slice_alloc0 (sizeof (*dtor));
+ dtor->dtor = g_free;
+ dtor->data = (void *)t->start;
+ LL_PREPEND (cbd->dtors, dtor);
}
}
else {
@@ -503,7 +740,10 @@ lua_tcp_arg_toiovec (lua_State *L, gint pos, rspamd_mempool_t *pool,
}
else if (lua_type (L, pos) == LUA_TSTRING) {
str = luaL_checklstring (L, pos, &len);
- vec->iov_base = rspamd_mempool_alloc (pool, len);
+ vec->iov_base = g_malloc (len);
+ dtor = g_slice_alloc0 (sizeof (*dtor));
+ dtor->dtor = g_free;
+ dtor->data = (void *)vec->iov_base;
memcpy (vec->iov_base, str, len);
vec->iov_len = len;
}
@@ -551,7 +791,6 @@ lua_tcp_request (lua_State *L)
struct rspamd_dns_resolver *resolver;
struct rspamd_async_session *session;
struct rspamd_task *task = NULL;
- rspamd_mempool_t *pool;
struct iovec *iov = NULL;
guint niov = 0, total_out;
gdouble timeout = default_tcp_timeout;
@@ -585,6 +824,8 @@ lua_tcp_request (lua_State *L)
}
cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+ cbd = g_slice_alloc0 (sizeof (*cbd));
+
lua_pushstring (L, "task");
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TUSERDATA) {
@@ -592,7 +833,6 @@ lua_tcp_request (lua_State *L)
ev_base = task->ev_base;
resolver = task->resolver;
session = task->s;
- pool = task->task_pool;
}
lua_pop (L, 1);
@@ -607,16 +847,6 @@ lua_tcp_request (lua_State *L)
}
lua_pop (L, 1);
- lua_pushstring (L, "pool");
- lua_gettable (L, -2);
- if (rspamd_lua_check_udata (L, -1, "rspamd{mempool}")) {
- pool = *(rspamd_mempool_t **)lua_touserdata (L, -1);
- }
- else {
- pool = NULL;
- }
- lua_pop (L, 1);
-
lua_pushstring (L, "resolver");
lua_gettable (L, -2);
if (rspamd_lua_check_udata_maybe (L, -1, "rspamd{resolver}")) {
@@ -638,10 +868,6 @@ lua_tcp_request (lua_State *L)
lua_pop (L, 1);
}
- if (pool == NULL) {
- return luaL_error (L, "tcp request has no memory pool associated");
- }
-
lua_pushstring (L, "timeout");
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TNUMBER) {
@@ -652,7 +878,7 @@ lua_tcp_request (lua_State *L)
lua_pushstring (L, "stop_pattern");
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TSTRING) {
- stop_pattern = rspamd_mempool_strdup (pool, lua_tostring (L, -1));
+ stop_pattern = g_strdup (lua_tostring (L, -1));
}
lua_pop (L, 1);
@@ -693,13 +919,16 @@ lua_tcp_request (lua_State *L)
tp = lua_type (L, -1);
if (tp == LUA_TSTRING || tp == LUA_TUSERDATA) {
- iov = rspamd_mempool_alloc (pool, sizeof (*iov));
+ iov = g_malloc (sizeof (*iov));
niov = 1;
- if (!lua_tcp_arg_toiovec (L, -1, pool, iov)) {
+ if (!lua_tcp_arg_toiovec (L, -1, cbd, iov)) {
lua_pop (L, 1);
msg_err ("tcp request has bad data argument");
lua_pushboolean (L, FALSE);
+ g_free (iov);
+ g_slice_free1 (sizeof (*cbd), cbd);
+
return 1;
}
@@ -713,15 +942,18 @@ lua_tcp_request (lua_State *L)
lua_pop (L, 1);
}
- iov = rspamd_mempool_alloc (pool, sizeof (*iov) * niov);
+ iov = g_malloc (sizeof (*iov) * niov);
lua_pushnil (L);
niov = 0;
while (lua_next (L, -2) != 0) {
- if (!lua_tcp_arg_toiovec (L, -1, pool, &iov[niov])) {
+ if (!lua_tcp_arg_toiovec (L, -1, cbd, &iov[niov])) {
lua_pop (L, 2);
msg_err ("tcp request has bad data argument at pos %d", niov);
lua_pushboolean (L, FALSE);
+ g_free (iov);
+ g_slice_free1 (sizeof (*cbd), cbd);
+
return 1;
}
@@ -741,27 +973,64 @@ lua_tcp_request (lua_State *L)
return 1;
}
- cbd = g_slice_alloc0 (sizeof (*cbd));
cbd->L = L;
- cbd->cbref = cbref;
+
+ if (total_out > 0) {
+ struct lua_tcp_handler *wh;
+
+ wh = g_slice_alloc0 (sizeof (*wh));
+ wh->type = LUA_WANT_WRITE;
+ wh->h.w.iov = iov;
+ wh->h.w.iovlen = niov;
+ wh->h.w.total = total_out;
+ wh->h.w.pos = 0;
+ /* Cannot set write handler here */
+ wh->h.w.cbref = -1;
+
+ if (cbref != -1 && !do_read) {
+ /* We have write only callback */
+ wh->h.w.cbref = cbref;
+ }
+ else {
+ /* We have simple client callback */
+ wh->h.w.cbref = -1;
+ }
+
+ g_queue_push_tail (cbd->handlers, wh);
+ }
+
cbd->ev_base = ev_base;
msec_to_tv (timeout, &cbd->tv);
cbd->fd = -1;
- cbd->pool = pool;
- cbd->partial = partial;
- cbd->do_shutdown = do_shutdown;
- cbd->iov = iov;
- cbd->iovlen = niov;
- cbd->total = total_out;
- cbd->pos = 0;
cbd->port = port;
- cbd->stop_pattern = stop_pattern;
+
+ if (do_read) {
+ cbd->in = g_byte_array_sized_new (8192);
+ }
+ else {
+ /* Save some space... */
+ cbd->in = g_byte_array_new ();
+ }
+
+ if (partial) {
+ cbd->flags |= LUA_TCP_FLAG_PARTIAL;
+ }
+
+ if (do_shutdown) {
+ cbd->flags |= LUA_TCP_FLAG_SHUTDOWN;
+ }
+
+ if (do_read) {
+ struct lua_tcp_handler *rh;
+
+ rh = g_slice_alloc0 (sizeof (*rh));
+ rh->type = LUA_WANT_READ;
+ rh->h.r.cbref = cbref;
+ rh->h.r.stop_pattern = stop_pattern;
+ g_queue_push_tail (cbd->handlers, rh);
+ }
+
cbd->connect_cb = conn_cbref;
- cbd->in = g_byte_array_new ();
- cbd->do_read = do_read;
- rspamd_mempool_add_destructor (cbd->pool,
- (rspamd_mempool_destruct_t)g_byte_array_unref,
- cbd->in);
REF_INIT_RETAIN (cbd, lua_tcp_maybe_free);
if (session) {