Kaynağa Gözat

[Project] Lua_udp: Implement fully functional client

tags/1.9.0
Vsevolod Stakhov 5 yıl önce
ebeveyn
işleme
239f6045f9
1 değiştirilmiş dosya ile 250 ekleme ve 22 silme
  1. 250
    22
      src/lua/lua_udp.c

+ 250
- 22
src/lua/lua_udp.c Dosyayı Görüntüle

@@ -54,16 +54,21 @@ static const struct luaL_reg udp_libf[] = {

struct lua_udp_cbdata {
struct event io;
struct timeval tv;
struct event_base *ev_base;
struct rspamd_async_event *async_ev;
struct rspamd_task *task;
rspamd_mempool_t *pool;
rspamd_inet_addr_t *addr;
struct rspamd_symcache_item *item;
struct rspamd_async_session *s;
struct iovec *iov;
lua_State *L;
guint retransmits;
guint iovlen;
gint sock;
gint cbref;
gboolean sent;
};

#define msg_debug_udp(...) rspamd_conditional_debug_fast (NULL, cbd->addr, \
@@ -110,6 +115,26 @@ lua_udp_cbd_fin (gpointer p)
if (cbd->addr) {
rspamd_inet_address_free (cbd->addr);
}

if (cbd->cbref) {
luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
}
}

static void
lua_udp_maybe_free (struct lua_udp_cbdata *cbd)
{
if (cbd->item) {
rspamd_symcache_item_async_dec_check (cbd->task, cbd->item, M);
cbd->item = NULL;
}

if (cbd->async_ev) {
rspamd_session_remove_event (cbd->s, lua_udp_cbd_fin, cbd);
}
else {
lua_udp_cbd_fin (cbd);
}
}


@@ -143,44 +168,186 @@ lua_try_send_request (struct lua_udp_cbdata *cbd)
return RSPAMD_SENT_FAILURE;
}

static void
lua_udp_maybe_push_error (struct lua_udp_cbdata *cbd, const gchar *err)
{
if (cbd->cbref != -1) {
gint top;
lua_State *L = cbd->L;

top = lua_gettop (L);
lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);

/* Error message */
lua_pushboolean (L, false);
lua_pushstring (L, err);

if (cbd->item) {
rspamd_symcache_set_cur_item (cbd->task, cbd->item);
}

if (lua_pcall (L, 2, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}

lua_settop (L, top);
}

lua_udp_maybe_free (cbd);
}

static void
lua_udp_push_data (struct lua_udp_cbdata *cbd, const gchar *data,
gssize len)
{
if (cbd->cbref != -1) {
gint top;
lua_State *L = cbd->L;

top = lua_gettop (L);
lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);

/* Error message */
lua_pushboolean (L, true);
lua_pushlstring (L, data, len);

if (cbd->item) {
rspamd_symcache_set_cur_item (cbd->task, cbd->item);
}

if (lua_pcall (L, 2, 0, 0) != 0) {
msg_info ("callback call failed: %s", lua_tostring (L, -1));
}

lua_settop (L, top);
}

lua_udp_maybe_free (cbd);
}

static gboolean
lua_udp_maybe_register_event (struct lua_udp_cbdata *cbd)
{
if (cbd->s) {
cbd->async_ev = rspamd_session_add_event (cbd->s, lua_udp_cbd_fin,
cbd, M);

if (!cbd->async_ev) {
return FALSE;
}
}

if (cbd->task) {
cbd->item = rspamd_symcache_get_cur_item (cbd->task);
rspamd_symcache_item_async_inc (cbd->task, cbd->item, M);
}

return TRUE;
}

static void
lua_udp_io_handler (gint fd, short what, gpointer p)
{
struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *)p;
lua_State *L;
gssize r;

L = cbd->L;

if (what == EV_TIMEOUT) {
if (cbd->sent && cbd->retransmits > 0) {
r = lua_try_send_request (cbd);

if (r == RSPAMD_SENT_OK) {
event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
event_base_set (cbd->ev_base, &cbd->io);
event_add (&cbd->io, &cbd->tv);
lua_udp_maybe_register_event (cbd);
}
else if (r == RSPAMD_SENT_FAILURE) {
lua_udp_maybe_push_error (cbd, "write error");
}
else {
cbd->retransmits --;
event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
event_base_set (cbd->ev_base, &cbd->io);
event_add (&cbd->io, &cbd->tv);
}
}
else {
if (!cbd->sent) {
lua_udp_maybe_push_error (cbd, "sent timeout");
}
else {
lua_udp_maybe_push_error (cbd, "read timeout");
}
}
}
else if (what == EV_WRITE) {
r = lua_try_send_request (cbd);

if (r == RSPAMD_SENT_OK) {
if (cbd->cbref != -1) {
event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
event_base_set (cbd->ev_base, &cbd->io);
event_add (&cbd->io, &cbd->tv);
cbd->sent = TRUE;
}
else {
lua_udp_maybe_free (cbd);
}
}
else if (r == RSPAMD_SENT_FAILURE) {
lua_udp_maybe_push_error (cbd, "write error");
}
else {
cbd->retransmits --;
event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
event_base_set (cbd->ev_base, &cbd->io);
event_add (&cbd->io, &cbd->tv);
}
}
else if (what == EV_READ) {
guchar udpbuf[4096];
socklen_t slen;
struct sockaddr *sa;

sa = rspamd_inet_address_get_sa (cbd->addr, &slen);

r = recvfrom (cbd->sock, udpbuf, sizeof (udpbuf), 0, sa, &slen);

if (r == -1) {
lua_udp_maybe_push_error (cbd, strerror (errno));
}
else {
lua_udp_push_data (cbd, udpbuf, r);
}
}
}

/***
* @function rspamd_tcp.request({params})
* This function creates and sends TCP request to the specified host and port,
* resolves hostname (if needed) and invokes continuation callback upon data received
* from the remote peer. This function accepts table of arguments with the following
* attributes
* @function rspamd_udp.sendto({params})
* This function simply sends data to an external UDP service
*
* - `task`: rspamd task objects (implies `pool`, `session`, `ev_base` and `resolver` arguments)
* - `ev_base`: event base (if no task specified)
* - `session`: events session (no task)
* - `host`: IP or name of the peer (required)
* - `port`: remote port to use
* - `port`: remote port to use (if `host` has no port part this is required)
* - `data`: a table of strings or `rspamd_text` objects that contains data pieces
* - `callback`: continuation function (required)
* - `on_connect`: callback called on connection success
* - `timeout`: floating point value that specifies timeout for IO operations in **seconds**
* - `partial`: boolean flag that specifies that callback should be called on any data portion received
* - `stop_pattern`: stop reading on finding a certain pattern (e.g. \r\n.\r\n for smtp)
* - `shutdown`: half-close socket after writing (boolean: default false)
* - `read`: read response after sending request (boolean: default true)
* @return {boolean} true if request has been sent
* @return {boolean} true if request has been sent (additional string if it has not)
*/
static gint
lua_udp_sendto (lua_State *L) {
LUA_TRACE_POINT;
const gchar *host;
guint port;
gint cbref, tp, conn_cbref = -1;
struct event_base *ev_base = NULL;
struct lua_udp_cbdata *cbd;
struct rspamd_async_session *session = NULL;
struct rspamd_task *task = NULL;
rspamd_inet_addr_t *addr;
rspamd_mempool_t *pool = NULL;
struct iovec *iov = NULL;
guint niov = 0, total_out;
guint64 h;
gdouble timeout = default_udp_timeout;

if (lua_type (L, 1) == LUA_TTABLE) {
@@ -204,7 +371,9 @@ lua_udp_sendto (lua_State *L) {
host = luaL_checkstring (L, -1);

if (rspamd_parse_inet_address (&addr, host, 0)) {
rspamd_inet_address_set_port (addr, port);
if (port != 0) {
rspamd_inet_address_set_port (addr, port);
}
}
else {
lua_pop (L, 1);
@@ -222,6 +391,10 @@ lua_udp_sendto (lua_State *L) {
}

addr = rspamd_inet_address_copy (lip->addr);

if (port != 0) {
rspamd_inet_address_set_port (addr, port);
}
}
else {
lua_pop (L, 1);
@@ -276,6 +449,14 @@ lua_udp_sendto (lua_State *L) {
}
lua_pop (L, 1);

if (!ev_base || !pool) {
rspamd_inet_address_free (addr);

return luaL_error (L, "invalid arguments");
}



if (!ev_base || !pool) {
rspamd_inet_address_free (addr);

@@ -289,6 +470,8 @@ lua_udp_sendto (lua_State *L) {
cbd->addr = addr;
cbd->sock = rspamd_socket_create (rspamd_inet_address_get_af (addr),
SOCK_DGRAM, 0, TRUE);
cbd->cbref = -1;
double_to_tv (timeout, &cbd->tv);

if (cbd->sock == -1) {
rspamd_inet_address_free (addr);
@@ -301,6 +484,8 @@ lua_udp_sendto (lua_State *L) {
gsize data_len;

lua_pushstring (L, "data");
lua_gettable (L, -2);

if (lua_type (L, -1) == LUA_TTABLE) {
data_len = rspamd_lua_table_size (L, -1);
cbd->iov = rspamd_mempool_alloc (pool,
@@ -322,22 +507,65 @@ lua_udp_sendto (lua_State *L) {

lua_pop (L, 1);

lua_pushstring (L, "callback");
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TFUNCTION) {
cbd->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
}
else {
lua_pop (L, 1);
}

lua_pushstring (L, "retransmits");
lua_gettable (L, -2);
if (lua_type (L, -1) == LUA_TNUMBER) {
cbd->retransmits = lua_tonumber (L, -1);
}
lua_pop (L, 1);

enum rspamd_udp_send_result r;

r = lua_try_send_request (cbd);
if (r == RSPAMD_SENT_OK) {
if (cbd->cbref == -1) {
lua_udp_maybe_free (cbd);
}
else {
if (!lua_udp_maybe_register_event (cbd)) {
lua_pushboolean (L, false);
lua_pushstring (L, "session error");
lua_udp_maybe_free (cbd);

return 2;
}

event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
event_base_set (cbd->ev_base, &cbd->io);
event_add (&cbd->io, &cbd->tv);
cbd->sent = TRUE;
}

lua_pushboolean (L, true);
lua_udp_cbd_fin (cbd);
}
else if (r == RSPAMD_SENT_FAILURE) {
lua_pushboolean (L, false);
lua_pushstring (L, strerror (errno));
lua_udp_cbd_fin (cbd);
lua_udp_maybe_free (cbd);

return 2;
}
else {
/* TODO: add waiting */
event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
event_base_set (cbd->ev_base, &cbd->io);
event_add (&cbd->io, &cbd->tv);

if (!lua_udp_maybe_register_event (cbd)) {
lua_pushboolean (L, false);
lua_pushstring (L, "session error");
lua_udp_maybe_free (cbd);

return 2;
}
}
}
else {

Loading…
İptal
Kaydet