]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Lua_udp: Implement fully functional client
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 21 Jan 2019 16:26:11 +0000 (16:26 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 21 Jan 2019 16:26:11 +0000 (16:26 +0000)
src/lua/lua_udp.c

index 3b54c5e9ec70525b2619e329e8630fbfa5509eef..1bff702a9630709e20ecab07412d4a46d1c24a65 100644 (file)
@@ -54,16 +54,21 @@ static const struct luaL_reg udp_libf[] = {
 
 struct lua_udp_cbdata {
        struct event io;
+       struct timeval tv;
        struct event_base *ev_base;
+       struct rspamd_async_event *async_ev;
+       struct rspamd_task *task;
        rspamd_mempool_t *pool;
        rspamd_inet_addr_t *addr;
        struct rspamd_symcache_item *item;
        struct rspamd_async_session *s;
        struct iovec *iov;
        lua_State *L;
+       guint retransmits;
        guint iovlen;
        gint sock;
        gint cbref;
+       gboolean sent;
 };
 
 #define msg_debug_udp(...)  rspamd_conditional_debug_fast (NULL, cbd->addr, \
@@ -110,6 +115,26 @@ lua_udp_cbd_fin (gpointer p)
        if (cbd->addr) {
                rspamd_inet_address_free (cbd->addr);
        }
+
+       if (cbd->cbref) {
+               luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+       }
+}
+
+static void
+lua_udp_maybe_free (struct lua_udp_cbdata *cbd)
+{
+       if (cbd->item) {
+               rspamd_symcache_item_async_dec_check (cbd->task, cbd->item, M);
+               cbd->item = NULL;
+       }
+
+       if (cbd->async_ev) {
+               rspamd_session_remove_event (cbd->s, lua_udp_cbd_fin, cbd);
+       }
+       else {
+               lua_udp_cbd_fin (cbd);
+       }
 }
 
 
@@ -143,44 +168,186 @@ lua_try_send_request (struct lua_udp_cbdata *cbd)
        return RSPAMD_SENT_FAILURE;
 }
 
+static void
+lua_udp_maybe_push_error (struct lua_udp_cbdata *cbd, const gchar *err)
+{
+       if (cbd->cbref != -1) {
+               gint top;
+               lua_State *L = cbd->L;
+
+               top = lua_gettop (L);
+               lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
+
+               /* Error message */
+               lua_pushboolean (L, false);
+               lua_pushstring (L, err);
+
+               if (cbd->item) {
+                       rspamd_symcache_set_cur_item (cbd->task, cbd->item);
+               }
+
+               if (lua_pcall (L, 2, 0, 0) != 0) {
+                       msg_info ("callback call failed: %s", lua_tostring (L, -1));
+               }
+
+               lua_settop (L, top);
+       }
+
+       lua_udp_maybe_free (cbd);
+}
+
+static void
+lua_udp_push_data (struct lua_udp_cbdata *cbd, const gchar *data,
+               gssize len)
+{
+       if (cbd->cbref != -1) {
+               gint top;
+               lua_State *L = cbd->L;
+
+               top = lua_gettop (L);
+               lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
+
+               /* Error message */
+               lua_pushboolean (L, true);
+               lua_pushlstring (L, data, len);
+
+               if (cbd->item) {
+                       rspamd_symcache_set_cur_item (cbd->task, cbd->item);
+               }
+
+               if (lua_pcall (L, 2, 0, 0) != 0) {
+                       msg_info ("callback call failed: %s", lua_tostring (L, -1));
+               }
+
+               lua_settop (L, top);
+       }
+
+       lua_udp_maybe_free (cbd);
+}
+
+static gboolean
+lua_udp_maybe_register_event (struct lua_udp_cbdata *cbd)
+{
+       if (cbd->s) {
+               cbd->async_ev = rspamd_session_add_event (cbd->s, lua_udp_cbd_fin,
+                               cbd, M);
+
+               if (!cbd->async_ev) {
+                       return FALSE;
+               }
+       }
+
+       if (cbd->task) {
+               cbd->item = rspamd_symcache_get_cur_item (cbd->task);
+               rspamd_symcache_item_async_inc (cbd->task, cbd->item, M);
+       }
+
+       return TRUE;
+}
+
+static void
+lua_udp_io_handler (gint fd, short what, gpointer p)
+{
+       struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *)p;
+       lua_State *L;
+       gssize r;
+
+       L = cbd->L;
+
+       if (what == EV_TIMEOUT) {
+               if (cbd->sent  && cbd->retransmits > 0) {
+                       r = lua_try_send_request (cbd);
+
+                       if (r == RSPAMD_SENT_OK) {
+                               event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
+                               event_base_set (cbd->ev_base, &cbd->io);
+                               event_add (&cbd->io, &cbd->tv);
+                               lua_udp_maybe_register_event (cbd);
+                       }
+                       else if (r == RSPAMD_SENT_FAILURE) {
+                               lua_udp_maybe_push_error (cbd, "write error");
+                       }
+                       else {
+                               cbd->retransmits --;
+                               event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
+                               event_base_set (cbd->ev_base, &cbd->io);
+                               event_add (&cbd->io, &cbd->tv);
+                       }
+               }
+               else {
+                       if (!cbd->sent) {
+                               lua_udp_maybe_push_error (cbd, "sent timeout");
+                       }
+                       else {
+                               lua_udp_maybe_push_error (cbd, "read timeout");
+                       }
+               }
+       }
+       else if (what == EV_WRITE) {
+               r = lua_try_send_request (cbd);
+
+               if (r == RSPAMD_SENT_OK) {
+                       if (cbd->cbref != -1) {
+                               event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
+                               event_base_set (cbd->ev_base, &cbd->io);
+                               event_add (&cbd->io, &cbd->tv);
+                               cbd->sent = TRUE;
+                       }
+                       else {
+                               lua_udp_maybe_free (cbd);
+                       }
+               }
+               else if (r == RSPAMD_SENT_FAILURE) {
+                       lua_udp_maybe_push_error (cbd, "write error");
+               }
+               else {
+                       cbd->retransmits --;
+                       event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
+                       event_base_set (cbd->ev_base, &cbd->io);
+                       event_add (&cbd->io, &cbd->tv);
+               }
+       }
+       else if (what == EV_READ) {
+               guchar udpbuf[4096];
+               socklen_t slen;
+               struct sockaddr *sa;
+
+               sa = rspamd_inet_address_get_sa (cbd->addr, &slen);
+
+               r = recvfrom (cbd->sock, udpbuf, sizeof (udpbuf), 0, sa, &slen);
+
+               if (r == -1) {
+                       lua_udp_maybe_push_error (cbd, strerror (errno));
+               }
+               else {
+                       lua_udp_push_data (cbd, udpbuf, r);
+               }
+       }
+}
 
 /***
- * @function rspamd_tcp.request({params})
- * This function creates and sends TCP request to the specified host and port,
- * resolves hostname (if needed) and invokes continuation callback upon data received
- * from the remote peer. This function accepts table of arguments with the following
- * attributes
+ * @function rspamd_udp.sendto({params})
+ * This function simply sends data to an external UDP service
  *
  * - `task`: rspamd task objects (implies `pool`, `session`, `ev_base` and `resolver` arguments)
  * - `ev_base`: event base (if no task specified)
  * - `session`: events session (no task)
  * - `host`: IP or name of the peer (required)
- * - `port`: remote port to use
+ * - `port`: remote port to use (if `host` has no port part this is required)
  * - `data`: a table of strings or `rspamd_text` objects that contains data pieces
- * - `callback`: continuation function (required)
- * - `on_connect`: callback called on connection success
- * - `timeout`: floating point value that specifies timeout for IO operations in **seconds**
- * - `partial`: boolean flag that specifies that callback should be called on any data portion received
- * - `stop_pattern`: stop reading on finding a certain pattern (e.g. \r\n.\r\n for smtp)
- * - `shutdown`: half-close socket after writing (boolean: default false)
- * - `read`: read response after sending request (boolean: default true)
- * @return {boolean} true if request has been sent
+ * @return {boolean} true if request has been sent (additional string if it has not)
  */
 static gint
 lua_udp_sendto (lua_State *L) {
        LUA_TRACE_POINT;
        const gchar *host;
        guint port;
-       gint cbref, tp, conn_cbref = -1;
        struct event_base *ev_base = NULL;
        struct lua_udp_cbdata *cbd;
        struct rspamd_async_session *session = NULL;
        struct rspamd_task *task = NULL;
        rspamd_inet_addr_t *addr;
        rspamd_mempool_t *pool = NULL;
-       struct iovec *iov = NULL;
-       guint niov = 0, total_out;
-       guint64 h;
        gdouble timeout = default_udp_timeout;
 
        if (lua_type (L, 1) == LUA_TTABLE) {
@@ -204,7 +371,9 @@ lua_udp_sendto (lua_State *L) {
                        host = luaL_checkstring (L, -1);
 
                        if (rspamd_parse_inet_address (&addr, host, 0)) {
-                               rspamd_inet_address_set_port (addr, port);
+                               if (port != 0) {
+                                       rspamd_inet_address_set_port (addr, port);
+                               }
                        }
                        else {
                                lua_pop (L, 1);
@@ -222,6 +391,10 @@ lua_udp_sendto (lua_State *L) {
                        }
 
                        addr = rspamd_inet_address_copy (lip->addr);
+
+                       if (port != 0) {
+                               rspamd_inet_address_set_port (addr, port);
+                       }
                }
                else {
                        lua_pop (L, 1);
@@ -276,6 +449,14 @@ lua_udp_sendto (lua_State *L) {
                }
                lua_pop (L, 1);
 
+               if (!ev_base || !pool) {
+                       rspamd_inet_address_free (addr);
+
+                       return luaL_error (L, "invalid arguments");
+               }
+
+
+
                if (!ev_base || !pool) {
                        rspamd_inet_address_free (addr);
 
@@ -289,6 +470,8 @@ lua_udp_sendto (lua_State *L) {
                cbd->addr = addr;
                cbd->sock = rspamd_socket_create (rspamd_inet_address_get_af (addr),
                                SOCK_DGRAM, 0, TRUE);
+               cbd->cbref = -1;
+               double_to_tv (timeout, &cbd->tv);
 
                if (cbd->sock == -1) {
                        rspamd_inet_address_free (addr);
@@ -301,6 +484,8 @@ lua_udp_sendto (lua_State *L) {
                gsize data_len;
 
                lua_pushstring (L, "data");
+               lua_gettable (L, -2);
+
                if (lua_type (L, -1) == LUA_TTABLE) {
                        data_len = rspamd_lua_table_size (L, -1);
                        cbd->iov = rspamd_mempool_alloc (pool,
@@ -322,22 +507,65 @@ lua_udp_sendto (lua_State *L) {
 
                lua_pop (L, 1);
 
+               lua_pushstring (L, "callback");
+               lua_gettable (L, -2);
+               if (lua_type (L, -1) == LUA_TFUNCTION) {
+                       cbd->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
+               }
+               else {
+                       lua_pop (L, 1);
+               }
+
+               lua_pushstring (L, "retransmits");
+               lua_gettable (L, -2);
+               if (lua_type (L, -1) == LUA_TNUMBER) {
+                       cbd->retransmits = lua_tonumber (L, -1);
+               }
+               lua_pop (L, 1);
+
                enum rspamd_udp_send_result r;
 
                r = lua_try_send_request (cbd);
                if (r == RSPAMD_SENT_OK) {
+                       if (cbd->cbref == -1) {
+                               lua_udp_maybe_free (cbd);
+                       }
+                       else {
+                               if (!lua_udp_maybe_register_event (cbd)) {
+                                       lua_pushboolean (L, false);
+                                       lua_pushstring (L, "session error");
+                                       lua_udp_maybe_free (cbd);
+
+                                       return 2;
+                               }
+
+                               event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
+                               event_base_set (cbd->ev_base, &cbd->io);
+                               event_add (&cbd->io, &cbd->tv);
+                               cbd->sent = TRUE;
+                       }
+
                        lua_pushboolean (L, true);
-                       lua_udp_cbd_fin (cbd);
                }
                else if (r == RSPAMD_SENT_FAILURE) {
                        lua_pushboolean (L, false);
                        lua_pushstring (L, strerror (errno));
-                       lua_udp_cbd_fin (cbd);
+                       lua_udp_maybe_free (cbd);
 
                        return 2;
                }
                else {
-                       /* TODO: add waiting */
+                       event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
+                       event_base_set (cbd->ev_base, &cbd->io);
+                       event_add (&cbd->io, &cbd->tv);
+
+                       if (!lua_udp_maybe_register_event (cbd)) {
+                               lua_pushboolean (L, false);
+                               lua_pushstring (L, "session error");
+                               lua_udp_maybe_free (cbd);
+
+                               return 2;
+                       }
                }
        }
        else {