aboutsummaryrefslogtreecommitdiffstats
path: root/src/lua/lua_tcp.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2015-05-18 16:54:55 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2015-05-18 16:54:55 +0100
commit51edc2517af40c42381f983f2aa7175853c1208e (patch)
tree17cde12ff17aa5c52560592b336e4f73508550ee /src/lua/lua_tcp.c
parent453a65c386075e803ebdddd01f3145a0f221a12a (diff)
downloadrspamd-51edc2517af40c42381f983f2aa7175853c1208e.tar.gz
rspamd-51edc2517af40c42381f983f2aa7175853c1208e.zip
Implement IO in lua tcp.
Diffstat (limited to 'src/lua/lua_tcp.c')
-rw-r--r--src/lua/lua_tcp.c161
1 files changed, 161 insertions, 0 deletions
diff --git a/src/lua/lua_tcp.c b/src/lua/lua_tcp.c
index 20330299e..a5fb6a882 100644
--- a/src/lua/lua_tcp.c
+++ b/src/lua/lua_tcp.c
@@ -27,6 +27,8 @@
#include "dns.h"
#include "utlist.h"
+static void lua_tcp_handler (int fd, short what, gpointer ud);
+
LUA_FUNCTION_DEF (tcp, request);
static const struct luaL_reg tcp_libf[] = {
@@ -43,6 +45,8 @@ struct lua_tcp_cbdata {
rspamd_inet_addr_t *addr;
rspamd_mempool_t *pool;
struct iovec *iov;
+ GString *in;
+ struct event ev;
gint fd;
gint cbref;
guint iovlen;
@@ -74,6 +78,7 @@ lua_tcp_fin (gpointer arg)
luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
if (cbd->fd != -1) {
+ event_del (&cbd->ev);
close (cbd->fd);
}
@@ -106,6 +111,158 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, const char *err)
}
}
+static void
+lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const gchar *str, gsize len)
+{
+ struct rspamd_lua_text *t;
+
+ lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+ /* Error */
+ lua_pushnil (cbd->L);
+ /* Body */
+ t = lua_newuserdata (cbd->L, sizeof (*t));
+ rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
+ t->start = str;
+ t->len = len;
+
+ if (lua_pcall (cbd->L, 2, 0, 0) != 0) {
+ msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+ }
+
+ if (!cbd->partial) {
+ lua_tcp_maybe_free (cbd);
+ }
+}
+
+static void
+lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
+{
+ struct iovec *start;
+ guint niov, i;
+ gint flags = 0;
+ gsize remain;
+ gssize r;
+ struct iovec *cur_iov;
+ struct msghdr msg;
+
+ if (cbd->pos == cbd->total) {
+ goto call_finish_handler;
+ }
+
+ start = &cbd->iov[0];
+ niov = cbd->iovlen;
+ remain = cbd->pos;
+ /* We know that niov is small enough for that */
+ cur_iov = alloca (niov * sizeof (struct iovec));
+ memcpy (cur_iov, cbd->iov, niov * sizeof (struct iovec));
+ for (i = 0; i < cbd->iovlen && remain > 0; i++) {
+ /* Find out the first iov required */
+ start = &cur_iov[i];
+ if (start->iov_len <= remain) {
+ remain -= start->iov_len;
+ start = &cur_iov[i + 1];
+ niov--;
+ }
+ else {
+ start->iov_base = (void *)((char *)start->iov_base + remain);
+ start->iov_len -= remain;
+ remain = 0;
+ }
+ }
+
+ memset (&msg, 0, sizeof (msg));
+ msg.msg_iov = start;
+ msg.msg_iovlen = MIN (IOV_MAX, niov);
+ g_assert (niov > 0);
+#ifdef MSG_NOSIGNAL
+ flags = MSG_NOSIGNAL;
+#endif
+ r = sendmsg (cbd->fd, &msg, flags);
+
+ if (r == -1) {
+ lua_tcp_push_error (cbd, "IO write error");
+ lua_tcp_maybe_free (cbd);
+ return;
+ }
+ else {
+ cbd->pos += r;
+ }
+
+ if (cbd->pos >= cbd->total) {
+ goto call_finish_handler;
+ }
+ else {
+ /* Want to write more */
+ event_add (&cbd->ev, &cbd->tv);
+ }
+
+ return;
+
+call_finish_handler:
+
+ if (!cbd->partial) {
+ cbd->in = g_string_sized_new (BUFSIZ);
+ rspamd_mempool_add_destructor (cbd->pool, rspamd_gstring_free_hard,
+ cbd->in);
+ }
+
+ event_del (&cbd->ev);
+ event_set (&cbd->ev, cbd->fd, EV_READ | EV_PERSIST, lua_tcp_handler, cbd);
+ event_base_set (cbd->ev_base, &cbd->ev);
+ event_add (&cbd->ev, &cbd->tv);
+}
+
+static void
+lua_tcp_handler (int fd, short what, gpointer ud)
+{
+ struct lua_tcp_cbdata *cbd = ud;
+ gchar inbuf[BUFSIZ];
+ gssize r;
+
+ if (what == EV_READ) {
+ g_assert (cbd->partial || cbd->in != NULL);
+
+ r = read (cbd->fd, inbuf, sizeof (inbuf));
+
+ if (r <= 0) {
+ /*
+ * We actually can have connection reset here, so we just check if
+ * the cumulative buffer is not empty
+ */
+ if (cbd->partial) {
+ if (r < 0) {
+ lua_tcp_push_error (cbd, strerror (errno));
+ }
+ }
+ else {
+ if (cbd->in->len > 0) {
+ lua_tcp_push_data (cbd, cbd->in->str, cbd->in->len);
+ }
+ else {
+ lua_tcp_push_error (cbd, "IO read error");
+ }
+ }
+
+ lua_tcp_maybe_free (cbd);
+ }
+ else {
+ if (cbd->partial) {
+ lua_tcp_push_data (cbd, inbuf, r);
+ }
+ else {
+ g_string_append_len (cbd->in, inbuf, r);
+ }
+ }
+ }
+ else if (what == EV_WRITE) {
+ lua_tcp_write_helper (cbd);
+ }
+ else {
+ lua_tcp_push_error (cbd, "IO timeout");
+ lua_tcp_maybe_free (cbd);
+ }
+}
+
static gboolean
lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
{
@@ -120,6 +277,10 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
}
cbd->fd = fd;
+ event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
+ event_base_set (cbd->ev_base, &cbd->ev);
+ event_add (&cbd->ev, &cbd->tv);
+
return TRUE;
}