From 6c28de22d121ad73faca2e8cc15f049c510dd068 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Thu, 29 Dec 2016 15:50:29 +0000 Subject: [PATCH] [Rework] Rework lua_tcp to allow TCP dialog - Now, lua_tcp has a chain of read and write events that are processed in order - The old API wasn't touched, however, new style API will be possible - Partial lua_tcp might be broken, so I need to revisit all plugins that use lua_tcp Issue: #1224 --- src/lua/lua_tcp.c | 593 +++++++++++++++++++++++++++++++++------------- 1 file changed, 431 insertions(+), 162 deletions(-) diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c index 7be257b51..b3276b709 100644 --- a/src/lua/lua_tcp.c +++ b/src/lua/lua_tcp.c @@ -19,7 +19,6 @@ #include "ref.h" #include "unix-std.h" -static void lua_tcp_handler (int fd, short what, gpointer ud); /*** * @module rspamd_tcp * Rspamd TCP module represents generic TCP asynchronous client available from LUA code. @@ -77,32 +76,64 @@ static const struct luaL_reg tcp_libm[] = { {NULL, NULL} }; +struct lua_tcp_read_handler { + gchar *stop_pattern; + gint cbref; +}; + +struct lua_tcp_write_handler { + struct iovec *iov; + guint iovlen; + guint pos; + guint total; + gint cbref; +}; + +enum lua_tcp_handler_type { + LUA_WANT_WRITE = 0, + LUA_WANT_READ, +}; + +struct lua_tcp_handler { + union { + struct lua_tcp_read_handler r; + struct lua_tcp_write_handler w; + } h; + enum lua_tcp_handler_type type; +}; + +struct lua_tcp_dtor { + rspamd_mempool_destruct_t dtor; + void *data; + struct lua_tcp_dtor *next; +}; + +#define LUA_TCP_FLAG_PARTIAL (1 << 0) +#define LUA_TCP_FLAG_SHUTDOWN (1 << 2) +#define LUA_TCP_FLAG_CONNECTED (1 << 3) + struct lua_tcp_cbdata { lua_State *L; struct rspamd_async_session *session; struct event_base *ev_base; struct timeval tv; rspamd_inet_addr_t *addr; - rspamd_mempool_t *pool; - struct iovec *iov; GByteArray *in; - gchar *stop_pattern; + GQueue *handlers; + gint fd; + gint connect_cb; + guint port; + guint flags; struct rspamd_async_watcher *w; struct event ev; + struct lua_tcp_dtor *dtors; ref_entry_t ref; - gint fd; - gint cbref; - gint connect_cb; - guint iovlen; - guint pos; - guint total; - guint16 port; - gboolean partial; - gboolean do_shutdown; - gboolean do_read; - gboolean connected; }; +static void lua_tcp_handler (int fd, short what, gpointer ud); +static void lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, + gboolean can_read, gboolean can_write); + static const int default_tcp_timeout = 5000; static struct rspamd_dns_resolver * @@ -117,12 +148,51 @@ lua_tcp_global_resolver (struct event_base *ev_base) return global_resolver; } +static gboolean +lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd) +{ + struct lua_tcp_handler *hdl; + + hdl = g_queue_pop_head (cbd->handlers); + + if (hdl == NULL) { + /* We are done */ + return FALSE; + } + + if (hdl->type == LUA_WANT_READ) { + if (hdl->h.r.cbref) { + luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.r.cbref); + } + + if (hdl->h.r.stop_pattern) { + g_free (hdl->h.r.stop_pattern); + } + } + else { + if (hdl->h.w.cbref) { + luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.w.cbref); + } + + if (hdl->h.w.iov) { + g_free (hdl->h.w.iov); + } + } + + g_slice_free1 (sizeof (*hdl), hdl); + + return TRUE; +} + static void lua_tcp_fin (gpointer arg) { struct lua_tcp_cbdata *cbd = (struct lua_tcp_cbdata *)arg; + struct lua_tcp_dtor *dtor, *dttmp; - luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); + if (cbd->connect_cb) { + luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb); + } if (cbd->fd != -1) { event_del (&cbd->ev); @@ -133,6 +203,14 @@ lua_tcp_fin (gpointer arg) rspamd_inet_address_destroy (cbd->addr); } + while (lua_tcp_shift_handler (cbd)) {} + + LL_FOREACH_SAFE (cbd->dtors, dtor, dttmp) { + dtor->dtor (dtor->data); + g_slice_free1 (sizeof (*dtor), dtor); + } + + g_byte_array_unref (cbd->in); g_slice_free1 (sizeof (struct lua_tcp_cbdata), cbd); } @@ -161,25 +239,40 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, const char *err, ...) { va_list ap; struct lua_tcp_cbdata **pcbd; + struct lua_tcp_handler *hdl; + gint cbref; - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); + hdl = g_queue_peek_head (cbd->handlers); - /* Error message */ - va_start (ap, err); - lua_pushvfstring (cbd->L, err, ap); - va_end (ap); + g_assert (hdl != NULL); - /* Body */ - lua_pushnil (cbd->L); - /* Connection */ - pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd)); - *pcbd = cbd; - rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1); - REF_RETAIN (cbd); + if (hdl->type == LUA_WANT_READ) { + cbref = hdl->h.r.cbref; + } + else { + cbref = hdl->h.w.cbref; + } - if (lua_pcall (cbd->L, 3, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); - lua_pop (cbd->L, 1); + if (cbref != -1) { + lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref); + + /* Error message */ + va_start (ap, err); + lua_pushvfstring (cbd->L, err, ap); + va_end (ap); + + /* Body */ + lua_pushnil (cbd->L); + /* Connection */ + pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd)); + *pcbd = cbd; + rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1); + REF_RETAIN (cbd); + + if (lua_pcall (cbd->L, 3, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); + lua_pop (cbd->L, 1); + } } REF_RELEASE (cbd); @@ -190,31 +283,63 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len) { struct rspamd_lua_text *t; struct lua_tcp_cbdata **pcbd; + struct lua_tcp_handler *hdl; + gint cbref; - lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); - /* Error */ - lua_pushnil (cbd->L); - /* Body */ - t = lua_newuserdata (cbd->L, sizeof (*t)); - rspamd_lua_setclass (cbd->L, "rspamd{text}", -1); - t->start = (const gchar *)str; - t->len = len; - t->flags = 0; - /* Connection */ - pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd)); - *pcbd = cbd; - rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1); + hdl = g_queue_peek_head (cbd->handlers); - REF_RETAIN (cbd); + g_assert (hdl != NULL); - if (lua_pcall (cbd->L, 3, 0, 0) != 0) { - msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); - lua_pop (cbd->L, 1); + if (hdl->type == LUA_WANT_READ) { + cbref = hdl->h.r.cbref; + } + else { + cbref = hdl->h.w.cbref; + } + + if (cbref != -1) { + lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref); + /* Error */ + lua_pushnil (cbd->L); + /* Body */ + + if (hdl->type == LUA_WANT_READ) { + t = lua_newuserdata (cbd->L, sizeof (*t)); + rspamd_lua_setclass (cbd->L, "rspamd{text}", -1); + t->start = (const gchar *)str; + t->len = len; + t->flags = 0; + } + /* Connection */ + pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd)); + *pcbd = cbd; + rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1); + + REF_RETAIN (cbd); + + if (lua_pcall (cbd->L, 3, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1)); + lua_pop (cbd->L, 1); + } } REF_RELEASE (cbd); } +static void +lua_tcp_plan_read (struct lua_tcp_cbdata *cbd) +{ + event_del (&cbd->ev); +#ifdef EV_CLOSED + event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST|EV_CLOSED, + lua_tcp_handler, cbd); +#else + event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST, lua_tcp_handler, cbd); +#endif + event_base_set (cbd->ev_base, &cbd->ev); + event_add (&cbd->ev, &cbd->tv); +} + static void lua_tcp_write_helper (struct lua_tcp_cbdata *cbd) { @@ -224,20 +349,27 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd) gsize remain; gssize r; struct iovec *cur_iov; + struct lua_tcp_handler *hdl; + struct lua_tcp_write_handler *wh; struct msghdr msg; - if (cbd->pos == cbd->total) { + hdl = g_queue_peek_head (cbd->handlers); + + g_assert (hdl != NULL && hdl->type == LUA_WANT_WRITE); + wh = &hdl->h.w; + + if (wh->pos == wh->total) { goto call_finish_handler; } - start = &cbd->iov[0]; - niov = cbd->iovlen; - remain = cbd->pos; + start = &wh->iov[0]; + niov = wh->iovlen; + remain = wh->pos; /* We know that niov is small enough for that */ cur_iov = alloca (niov * sizeof (struct iovec)); - memcpy (cur_iov, cbd->iov, niov * sizeof (struct iovec)); + memcpy (cur_iov, wh->iov, niov * sizeof (struct iovec)); - for (i = 0; i < cbd->iovlen && remain > 0; i++) { + for (i = 0; i < wh->iovlen && remain > 0; i++) { /* Find out the first iov required */ start = &cur_iov[i]; if (start->iov_len <= remain) { @@ -264,15 +396,16 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd) if (r == -1) { lua_tcp_push_error (cbd, "IO write error while trying to write %d " "bytes: %s", (gint)remain, strerror (errno)); - REF_RELEASE (cbd); + lua_tcp_shift_handler (cbd); + lua_tcp_plan_handler_event (cbd, TRUE, FALSE); return; } else { - cbd->pos += r; + wh->pos += r; } - if (cbd->pos >= cbd->total) { + if (wh->pos >= wh->total) { goto call_finish_handler; } else { @@ -284,93 +417,132 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd) call_finish_handler: - if (cbd->do_shutdown) { + if ((cbd->flags & LUA_TCP_FLAG_SHUTDOWN)) { /* Half close the connection */ shutdown (cbd->fd, SHUT_WR); + cbd->flags &= ~LUA_TCP_FLAG_SHUTDOWN; } - if (cbd->do_read) { - event_del (&cbd->ev); -#ifdef EV_CLOSED - event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST|EV_CLOSED, - lua_tcp_handler, cbd); -#else - event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST, lua_tcp_handler, cbd); -#endif - event_base_set (cbd->ev_base, &cbd->ev); - event_add (&cbd->ev, &cbd->tv); - } - else { - lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len); - REF_RELEASE (cbd); - } + lua_tcp_push_data (cbd, NULL, 0); + lua_tcp_shift_handler (cbd); + lua_tcp_plan_handler_event (cbd, TRUE, TRUE); } -static void -lua_tcp_handler (int fd, short what, gpointer ud) +static gboolean +lua_tcp_process_read_handler (struct lua_tcp_cbdata *cbd, + struct lua_tcp_read_handler *rh) { - struct lua_tcp_cbdata *cbd = ud; - gchar inbuf[8192]; - gssize r; guint slen; - gint so_error = 0; - socklen_t so_len = sizeof (so_error); + goffset pos; - REF_RETAIN (cbd); + if (rh->stop_pattern) { + slen = strlen (rh->stop_pattern); - if (what == EV_READ) { - g_assert (cbd->partial || cbd->in != NULL); + if (cbd->in->len >= slen) { + if ((pos = rspamd_substring_search (cbd->in->data, cbd->in->len, + rh->stop_pattern, slen)) != -1) { + lua_tcp_push_data (cbd, cbd->in->data, pos); - r = read (cbd->fd, inbuf, sizeof (inbuf)); + if (pos + slen < cbd->in->len) { + /* We have a leftover */ + memmove (cbd->in->data, cbd->in->data + pos + slen, + cbd->in->len - (pos + slen)); + lua_tcp_shift_handler (cbd); + } + else { + lua_tcp_shift_handler (cbd); - if (r <= 0) { - /* - * We actually can have connection reset here, so we just check if - * the cumulative buffer is not empty - */ - if (cbd->partial) { - if (r < 0) { - lua_tcp_push_error (cbd, "IO read error while trying to read %d " - "bytes: %s", (gint)sizeof (inbuf), - strerror (errno)); + return TRUE; } } else { - if (cbd->in->len > 0) { - lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len); - } - else { - lua_tcp_push_error (cbd, "IO read error while trying to write %d " - "bytes: %s", (gint)sizeof (inbuf), - strerror (errno)); - } + /* Plan new read */ + lua_tcp_plan_read (cbd); } + } + } - REF_RELEASE (cbd); + return FALSE; +} + +static void +lua_tcp_process_read (struct lua_tcp_cbdata *cbd, + guchar *in, gssize r) +{ + struct lua_tcp_handler *hdl; + struct lua_tcp_read_handler *rh; + + hdl = g_queue_peek_head (cbd->handlers); + + g_assert (hdl != NULL && hdl->type == LUA_WANT_READ); + rh = &hdl->h.r; + + if (r > 0) { + if (cbd->flags & LUA_TCP_FLAG_PARTIAL) { + lua_tcp_push_data (cbd, in, r); + /* Plan next event */ + lua_tcp_shift_handler (cbd); + lua_tcp_shift_handler (cbd); } else { - if (cbd->partial) { - lua_tcp_push_data (cbd, inbuf, r); + g_byte_array_append (cbd->in, in, r); + + if (!lua_tcp_process_read_handler (cbd, rh)) { + /* Plan more read */ + lua_tcp_plan_read (cbd); } else { - g_byte_array_append (cbd->in, inbuf, r); + /* Go towards the next handler */ + lua_tcp_plan_handler_event (cbd, TRUE, TRUE); + } + } + } + else if (r == 0) { + /* EOF */ + if (cbd->in->len > 0) { + /* We have some data to process */ + lua_tcp_process_read_handler (cbd, rh); + } + else { + lua_tcp_push_error (cbd, "IO read error: connection terminated"); + } - if (cbd->stop_pattern) { - slen = strlen (cbd->stop_pattern); + lua_tcp_plan_handler_event (cbd, FALSE, TRUE); + } + else { + /* An error occurred */ + if (errno == EAGAIN || errno == EINTR) { + /* Restart call */ + lua_tcp_plan_read (cbd); - if (cbd->in->len >= slen) { - if (memcmp (cbd->stop_pattern, cbd->in->data + - (cbd->in->len - slen), slen) == 0) { - lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len); - REF_RELEASE (cbd); - } - } - } - } + return; } + + /* Fatal error */ + lua_tcp_push_error (cbd, "IO read error while trying to read data: %s", + strerror (errno)); + + REF_RELEASE (cbd); + } +} + +static void +lua_tcp_handler (int fd, short what, gpointer ud) +{ + struct lua_tcp_cbdata *cbd = ud; + guchar inbuf[8192]; + gssize r; + gint so_error = 0; + socklen_t so_len = sizeof (so_error); + + REF_RETAIN (cbd); + + if (what == EV_READ) { + r = read (cbd->fd, inbuf, sizeof (inbuf)); + lua_tcp_process_read (cbd, inbuf, r); } else if (what == EV_WRITE) { - if (!cbd->connected) { + if (!(cbd->flags & LUA_TCP_FLAG_CONNECTED)) { if (getsockopt (fd, SOL_SOCKET, SO_ERROR, &so_error, &so_len) == -1) { lua_tcp_push_error (cbd, "Cannot get socket error: %s", strerror (errno)); @@ -384,7 +556,7 @@ lua_tcp_handler (int fd, short what, gpointer ud) goto out; } else { - cbd->connected = TRUE; + cbd->flags |= LUA_TCP_FLAG_CONNECTED; if (cbd->connect_cb != -1) { struct lua_tcp_cbdata **pcbd; @@ -422,6 +594,69 @@ out: REF_RELEASE (cbd); } +static void +lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read, + gboolean can_write) +{ + struct lua_tcp_handler *hdl; + + hdl = g_queue_peek_head (cbd->handlers); + + if (hdl == NULL) { + /* We are finished with a connection */ + REF_RELEASE (cbd); + } + else { + if (hdl->type == LUA_WANT_READ) { + /* We need to check if we have some leftover in the buffer */ + if (cbd->in->len > 0) { + if (lua_tcp_process_read_handler (cbd, &hdl->h.r)) { + /* We can go to the next handler */ + lua_tcp_shift_handler (cbd); + lua_tcp_plan_handler_event (cbd, can_read, can_write); + } + } + else { + if (can_read) { + /* We need to plan a new event */ + event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd); + event_base_set (cbd->ev_base, &cbd->ev); + event_add (&cbd->ev, &cbd->tv); + } + else { + /* Cannot read more */ + lua_tcp_push_error (cbd, "EOF, cannot read more data"); + lua_tcp_shift_handler (cbd); + lua_tcp_plan_handler_event (cbd, can_read, can_write); + } + } + } + else { + /* + * We need to plan write event if there is something in the + * write request + */ + if (hdl->h.w.pos < hdl->h.w.total) { + if (can_write) { + event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd); + event_base_set (cbd->ev_base, &cbd->ev); + event_add (&cbd->ev, &cbd->tv); + } + else { + /* Cannot read more */ + lua_tcp_push_error (cbd, "EOF, cannot read more data"); + lua_tcp_shift_handler (cbd); + lua_tcp_plan_handler_event (cbd, can_read, can_write); + } + } + else { + /* We shouldn't have empty write handlers */ + g_assert_not_reached (); + } + } + } +} + static gboolean lua_tcp_make_connection (struct lua_tcp_cbdata *cbd) { @@ -434,11 +669,9 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd) msg_info ("cannot connect to %s", rspamd_inet_address_to_string (cbd->addr)); return FALSE; } - cbd->fd = fd; - event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd); - event_base_set (cbd->ev_base, &cbd->ev); - event_add (&cbd->ev, &cbd->tv); + cbd->fd = fd; + lua_tcp_plan_handler_event (cbd, TRUE, TRUE); return TRUE; } @@ -476,12 +709,13 @@ lua_tcp_dns_handler (struct rdns_reply *reply, gpointer ud) } static gboolean -lua_tcp_arg_toiovec (lua_State *L, gint pos, rspamd_mempool_t *pool, +lua_tcp_arg_toiovec (lua_State *L, gint pos, struct lua_tcp_cbdata *cbd, struct iovec *vec) { struct rspamd_lua_text *t; gsize len; const gchar *str; + struct lua_tcp_dtor *dtor; if (lua_type (L, pos) == LUA_TUSERDATA) { t = lua_check_text (L, pos); @@ -493,7 +727,10 @@ lua_tcp_arg_toiovec (lua_State *L, gint pos, rspamd_mempool_t *pool, if (t->flags & RSPAMD_TEXT_FLAG_OWN) { /* Steal ownership */ t->flags = 0; - rspamd_mempool_add_destructor (pool, g_free, (void *)t->start); + dtor = g_slice_alloc0 (sizeof (*dtor)); + dtor->dtor = g_free; + dtor->data = (void *)t->start; + LL_PREPEND (cbd->dtors, dtor); } } else { @@ -503,7 +740,10 @@ lua_tcp_arg_toiovec (lua_State *L, gint pos, rspamd_mempool_t *pool, } else if (lua_type (L, pos) == LUA_TSTRING) { str = luaL_checklstring (L, pos, &len); - vec->iov_base = rspamd_mempool_alloc (pool, len); + vec->iov_base = g_malloc (len); + dtor = g_slice_alloc0 (sizeof (*dtor)); + dtor->dtor = g_free; + dtor->data = (void *)vec->iov_base; memcpy (vec->iov_base, str, len); vec->iov_len = len; } @@ -551,7 +791,6 @@ lua_tcp_request (lua_State *L) struct rspamd_dns_resolver *resolver; struct rspamd_async_session *session; struct rspamd_task *task = NULL; - rspamd_mempool_t *pool; struct iovec *iov = NULL; guint niov = 0, total_out; gdouble timeout = default_tcp_timeout; @@ -585,6 +824,8 @@ lua_tcp_request (lua_State *L) } cbref = luaL_ref (L, LUA_REGISTRYINDEX); + cbd = g_slice_alloc0 (sizeof (*cbd)); + lua_pushstring (L, "task"); lua_gettable (L, -2); if (lua_type (L, -1) == LUA_TUSERDATA) { @@ -592,7 +833,6 @@ lua_tcp_request (lua_State *L) ev_base = task->ev_base; resolver = task->resolver; session = task->s; - pool = task->task_pool; } lua_pop (L, 1); @@ -607,16 +847,6 @@ lua_tcp_request (lua_State *L) } lua_pop (L, 1); - lua_pushstring (L, "pool"); - lua_gettable (L, -2); - if (rspamd_lua_check_udata (L, -1, "rspamd{mempool}")) { - pool = *(rspamd_mempool_t **)lua_touserdata (L, -1); - } - else { - pool = NULL; - } - lua_pop (L, 1); - lua_pushstring (L, "resolver"); lua_gettable (L, -2); if (rspamd_lua_check_udata_maybe (L, -1, "rspamd{resolver}")) { @@ -638,10 +868,6 @@ lua_tcp_request (lua_State *L) lua_pop (L, 1); } - if (pool == NULL) { - return luaL_error (L, "tcp request has no memory pool associated"); - } - lua_pushstring (L, "timeout"); lua_gettable (L, -2); if (lua_type (L, -1) == LUA_TNUMBER) { @@ -652,7 +878,7 @@ lua_tcp_request (lua_State *L) lua_pushstring (L, "stop_pattern"); lua_gettable (L, -2); if (lua_type (L, -1) == LUA_TSTRING) { - stop_pattern = rspamd_mempool_strdup (pool, lua_tostring (L, -1)); + stop_pattern = g_strdup (lua_tostring (L, -1)); } lua_pop (L, 1); @@ -693,13 +919,16 @@ lua_tcp_request (lua_State *L) tp = lua_type (L, -1); if (tp == LUA_TSTRING || tp == LUA_TUSERDATA) { - iov = rspamd_mempool_alloc (pool, sizeof (*iov)); + iov = g_malloc (sizeof (*iov)); niov = 1; - if (!lua_tcp_arg_toiovec (L, -1, pool, iov)) { + if (!lua_tcp_arg_toiovec (L, -1, cbd, iov)) { lua_pop (L, 1); msg_err ("tcp request has bad data argument"); lua_pushboolean (L, FALSE); + g_free (iov); + g_slice_free1 (sizeof (*cbd), cbd); + return 1; } @@ -713,15 +942,18 @@ lua_tcp_request (lua_State *L) lua_pop (L, 1); } - iov = rspamd_mempool_alloc (pool, sizeof (*iov) * niov); + iov = g_malloc (sizeof (*iov) * niov); lua_pushnil (L); niov = 0; while (lua_next (L, -2) != 0) { - if (!lua_tcp_arg_toiovec (L, -1, pool, &iov[niov])) { + if (!lua_tcp_arg_toiovec (L, -1, cbd, &iov[niov])) { lua_pop (L, 2); msg_err ("tcp request has bad data argument at pos %d", niov); lua_pushboolean (L, FALSE); + g_free (iov); + g_slice_free1 (sizeof (*cbd), cbd); + return 1; } @@ -741,27 +973,64 @@ lua_tcp_request (lua_State *L) return 1; } - cbd = g_slice_alloc0 (sizeof (*cbd)); cbd->L = L; - cbd->cbref = cbref; + + if (total_out > 0) { + struct lua_tcp_handler *wh; + + wh = g_slice_alloc0 (sizeof (*wh)); + wh->type = LUA_WANT_WRITE; + wh->h.w.iov = iov; + wh->h.w.iovlen = niov; + wh->h.w.total = total_out; + wh->h.w.pos = 0; + /* Cannot set write handler here */ + wh->h.w.cbref = -1; + + if (cbref != -1 && !do_read) { + /* We have write only callback */ + wh->h.w.cbref = cbref; + } + else { + /* We have simple client callback */ + wh->h.w.cbref = -1; + } + + g_queue_push_tail (cbd->handlers, wh); + } + cbd->ev_base = ev_base; msec_to_tv (timeout, &cbd->tv); cbd->fd = -1; - cbd->pool = pool; - cbd->partial = partial; - cbd->do_shutdown = do_shutdown; - cbd->iov = iov; - cbd->iovlen = niov; - cbd->total = total_out; - cbd->pos = 0; cbd->port = port; - cbd->stop_pattern = stop_pattern; + + if (do_read) { + cbd->in = g_byte_array_sized_new (8192); + } + else { + /* Save some space... */ + cbd->in = g_byte_array_new (); + } + + if (partial) { + cbd->flags |= LUA_TCP_FLAG_PARTIAL; + } + + if (do_shutdown) { + cbd->flags |= LUA_TCP_FLAG_SHUTDOWN; + } + + if (do_read) { + struct lua_tcp_handler *rh; + + rh = g_slice_alloc0 (sizeof (*rh)); + rh->type = LUA_WANT_READ; + rh->h.r.cbref = cbref; + rh->h.r.stop_pattern = stop_pattern; + g_queue_push_tail (cbd->handlers, rh); + } + cbd->connect_cb = conn_cbref; - cbd->in = g_byte_array_new (); - cbd->do_read = do_read; - rspamd_mempool_add_destructor (cbd->pool, - (rspamd_mempool_destruct_t)g_byte_array_unref, - cbd->in); REF_INIT_RETAIN (cbd, lua_tcp_maybe_free); if (session) { -- 2.39.5