123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594 |
- /*-
- * Copyright 2019 Vsevolod Stakhov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "lua_common.h"
- #include "lua_thread_pool.h"
- #include "utlist.h"
- #include "unix-std.h"
- #include <math.h>
- #include <src/libutil/libev_helper.h>
-
- static const char *M = "rspamd lua udp";
-
- /***
- * @module rspamd_udp
- * Rspamd UDP module is available from the version 1.9.0 and represents a generic
- * UDP asynchronous client available from the LUA code.
- * This module is quite simple: it can either send requests to some address or
- * it can send requests and wait for replies, potentially handling retransmits.
- * @example
- local logger = require "rspamd_logger"
- local udp = require "rspamd_udp"
-
- rspamd_config.SYM = function(task)
- udp.sento{
- host = addr, -- must be ip address object (e.g. received by upstream module)
- port = 500,
- data = {'str1', 'str2'}, -- can be table, string or rspamd_text
- timeout = 0.5, -- default = 1s
- task = task, -- if has task
- session = session, -- optional
- ev_base = ev_base, -- if no task available
- -- You can include callback and then Rspamd will try to read replies
- callback = function(success, data)
- -- success is bool, data is either data or an error (string)
- end,
- retransmits = 0, -- Or more if retransmitting is necessary
- }
- end
- */
-
- static const double default_udp_timeout = 1.0;
-
- LUA_FUNCTION_DEF(udp, sendto);
-
- static const struct luaL_reg udp_libf[] = {
- LUA_INTERFACE_DEF(udp, sendto),
- {NULL, NULL}};
-
- struct lua_udp_cbdata {
- struct ev_loop *event_loop;
- struct rspamd_io_ev ev;
- struct rspamd_async_event *async_ev;
- struct rspamd_task *task;
- rspamd_mempool_t *pool;
- rspamd_inet_addr_t *addr;
- struct rspamd_symcache_dynamic_item *item;
- struct rspamd_async_session *s;
- struct iovec *iov;
- lua_State *L;
- unsigned int retransmits;
- unsigned int iovlen;
- int sock;
- int cbref;
- gboolean sent;
- };
-
- #define msg_debug_udp(...) rspamd_conditional_debug_fast(NULL, cbd->addr, \
- rspamd_lua_udp_log_id, "lua_udp", cbd->pool->tag.uid, \
- G_STRFUNC, \
- __VA_ARGS__)
-
- INIT_LOG_MODULE(lua_udp)
-
- static inline void
- lua_fill_iov(lua_State *L, rspamd_mempool_t *pool,
- struct iovec *iov, int pos)
- {
- if (lua_type(L, pos) == LUA_TUSERDATA) {
- struct rspamd_lua_text *t = lua_check_text(L, pos);
-
- if (t) {
- iov->iov_base = rspamd_mempool_alloc(pool, t->len);
- iov->iov_len = t->len;
- memcpy(iov->iov_base, t->start, t->len);
- }
- }
- else {
- const char *s;
- gsize len;
-
- s = lua_tolstring(L, pos, &len);
-
- iov->iov_base = rspamd_mempool_alloc(pool, len);
- iov->iov_len = len;
- memcpy(iov->iov_base, s, len);
- }
- }
-
- static void
- lua_udp_cbd_fin(gpointer p)
- {
- struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *) p;
-
- if (cbd->sock != -1) {
- rspamd_ev_watcher_stop(cbd->event_loop, &cbd->ev);
- close(cbd->sock);
- }
-
- if (cbd->addr) {
- rspamd_inet_address_free(cbd->addr);
- }
-
- if (cbd->cbref) {
- luaL_unref(cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
- }
- }
-
- static void
- lua_udp_maybe_free(struct lua_udp_cbdata *cbd)
- {
- if (cbd->item) {
- rspamd_symcache_item_async_dec_check(cbd->task, cbd->item, M);
- cbd->item = NULL;
- }
-
- if (cbd->async_ev) {
- rspamd_session_remove_event(cbd->s, lua_udp_cbd_fin, cbd);
- }
- else {
- lua_udp_cbd_fin(cbd);
- }
- }
-
-
- enum rspamd_udp_send_result {
- RSPAMD_SENT_OK,
- RSPAMD_SENT_RETRY,
- RSPAMD_SENT_FAILURE
- };
-
- static enum rspamd_udp_send_result
- lua_try_send_request(struct lua_udp_cbdata *cbd)
- {
- struct msghdr msg;
- int r;
-
- memset(&msg, 0, sizeof(msg));
- msg.msg_iov = cbd->iov;
- msg.msg_iovlen = cbd->iovlen;
- msg.msg_name = rspamd_inet_address_get_sa(cbd->addr, &msg.msg_namelen);
-
- r = sendmsg(cbd->sock, &msg, 0);
-
- if (r != -1) {
- return RSPAMD_SENT_OK;
- }
-
- if (errno == EAGAIN || errno == EINTR) {
- return RSPAMD_SENT_RETRY;
- }
-
- return RSPAMD_SENT_FAILURE;
- }
-
- static void
- lua_udp_maybe_push_error(struct lua_udp_cbdata *cbd, const char *err)
- {
- if (cbd->cbref != -1) {
- int top;
- lua_State *L = cbd->L;
-
- top = lua_gettop(L);
- lua_rawgeti(L, LUA_REGISTRYINDEX, cbd->cbref);
-
- /* Error message */
- lua_pushboolean(L, false);
- lua_pushstring(L, err);
-
- if (cbd->item) {
- rspamd_symcache_set_cur_item(cbd->task, cbd->item);
- }
-
- if (lua_pcall(L, 2, 0, 0) != 0) {
- msg_info("callback call failed: %s", lua_tostring(L, -1));
- }
-
- lua_settop(L, top);
- }
-
- lua_udp_maybe_free(cbd);
- }
-
- static void
- lua_udp_push_data(struct lua_udp_cbdata *cbd, const char *data,
- gssize len)
- {
- if (cbd->cbref != -1) {
- int top;
- lua_State *L = cbd->L;
-
- top = lua_gettop(L);
- lua_rawgeti(L, LUA_REGISTRYINDEX, cbd->cbref);
-
- /* Error message */
- lua_pushboolean(L, true);
- lua_pushlstring(L, data, len);
-
- if (cbd->item) {
- rspamd_symcache_set_cur_item(cbd->task, cbd->item);
- }
-
- if (lua_pcall(L, 2, 0, 0) != 0) {
- msg_info("callback call failed: %s", lua_tostring(L, -1));
- }
-
- lua_settop(L, top);
- }
-
- lua_udp_maybe_free(cbd);
- }
-
- static gboolean
- lua_udp_maybe_register_event(struct lua_udp_cbdata *cbd)
- {
- if (cbd->s && !cbd->async_ev) {
- if (cbd->item) {
- cbd->async_ev = rspamd_session_add_event_full(cbd->s, lua_udp_cbd_fin,
- cbd, M,
- rspamd_symcache_dyn_item_name(cbd->task, cbd->item));
- }
- else {
- cbd->async_ev = rspamd_session_add_event(cbd->s, lua_udp_cbd_fin,
- cbd, M);
- }
-
- if (!cbd->async_ev) {
- return FALSE;
- }
- }
-
- if (cbd->task && !cbd->item) {
- cbd->item = rspamd_symcache_get_cur_item(cbd->task);
- rspamd_symcache_item_async_inc(cbd->task, cbd->item, M);
- }
-
- return TRUE;
- }
-
- static void
- lua_udp_io_handler(int fd, short what, gpointer p)
- {
- struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *) p;
- gssize r;
-
- if (what == EV_TIMEOUT) {
- if (cbd->sent && cbd->retransmits > 0) {
- r = lua_try_send_request(cbd);
-
- if (r == RSPAMD_SENT_OK) {
- rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_READ);
- lua_udp_maybe_register_event(cbd);
- cbd->retransmits--;
- }
- else if (r == RSPAMD_SENT_FAILURE) {
- lua_udp_maybe_push_error(cbd, "write error");
- }
- else {
- cbd->retransmits--;
- rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_WRITE);
- }
- }
- else {
- if (!cbd->sent) {
- lua_udp_maybe_push_error(cbd, "sent timeout");
- }
- else {
- lua_udp_maybe_push_error(cbd, "read timeout");
- }
- }
- }
- else if (what == EV_WRITE) {
- r = lua_try_send_request(cbd);
-
- if (r == RSPAMD_SENT_OK) {
- if (cbd->cbref != -1) {
- rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_READ);
- cbd->sent = TRUE;
- }
- else {
- lua_udp_maybe_free(cbd);
- }
- }
- else if (r == RSPAMD_SENT_FAILURE) {
- lua_udp_maybe_push_error(cbd, "write error");
- }
- else {
- cbd->retransmits--;
- rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_WRITE);
- }
- }
- else if (what == EV_READ) {
- unsigned char udpbuf[4096];
- socklen_t slen;
- struct sockaddr *sa;
-
- sa = rspamd_inet_address_get_sa(cbd->addr, &slen);
-
- r = recvfrom(cbd->sock, udpbuf, sizeof(udpbuf), 0, sa, &slen);
-
- if (r == -1) {
- lua_udp_maybe_push_error(cbd, strerror(errno));
- }
- else {
- lua_udp_push_data(cbd, udpbuf, r);
- }
- }
- }
-
- /***
- * @function rspamd_udp.sendto({params})
- * This function simply sends data to an external UDP service
- *
- * - `task`: rspamd task objects (implies `pool`, `session` and `ev_base` arguments)
- * - `ev_base`: event base (if no task specified)
- * - `session`: events session (no task, optional)
- * - `pool`: memory pool (if no task specified)
- * - `host`: IP or name of the peer (required)
- * - `port`: remote port to use (if `host` has no port part this is required)
- * - `data`: a table of strings or `rspamd_text` objects that contains data pieces
- * - `retransmits`: number of retransmits if needed
- * - `callback`: optional callback if reply should be read
- * @return {boolean} true if request has been sent (additional string if it has not)
- */
- static int
- lua_udp_sendto(lua_State *L)
- {
- LUA_TRACE_POINT;
- const char *host;
- unsigned int port;
- struct ev_loop *ev_base = NULL;
- struct lua_udp_cbdata *cbd;
- struct rspamd_async_session *session = NULL;
- struct rspamd_task *task = NULL;
- rspamd_inet_addr_t *addr;
- rspamd_mempool_t *pool = NULL;
- double timeout = default_udp_timeout;
-
- if (lua_type(L, 1) == LUA_TTABLE) {
- lua_pushstring(L, "port");
- lua_gettable(L, -2);
-
- if (lua_type(L, -1) == LUA_TNUMBER) {
- port = lua_tointeger(L, -1);
- }
- else {
- /* We assume that it is a unix socket */
- port = 0;
- }
-
- lua_pop(L, 1);
-
- lua_pushstring(L, "host");
- lua_gettable(L, -2);
-
- if (lua_type(L, -1) == LUA_TSTRING) {
- host = luaL_checkstring(L, -1);
-
- if (rspamd_parse_inet_address(&addr,
- host, strlen(host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
- if (port != 0) {
- rspamd_inet_address_set_port(addr, port);
- }
- }
- else {
- lua_pop(L, 1);
- return luaL_error(L, "invalid host: %s", host);
- }
- }
- else if (lua_type(L, -1) == LUA_TUSERDATA) {
- struct rspamd_lua_ip *lip;
-
- lip = lua_check_ip(L, -1);
-
- if (lip == NULL || lip->addr == NULL) {
- lua_pop(L, 1);
- return luaL_error(L, "invalid host class");
- }
-
- addr = rspamd_inet_address_copy(lip->addr, NULL);
-
- if (port != 0) {
- rspamd_inet_address_set_port(addr, port);
- }
- }
- else {
- lua_pop(L, 1);
- return luaL_error(L, "invalid host");
- }
-
- lua_pop(L, 1);
-
- lua_pushstring(L, "task");
- lua_gettable(L, -2);
- if (lua_type(L, -1) == LUA_TUSERDATA) {
- task = lua_check_task(L, -1);
- ev_base = task->event_loop;
- session = task->s;
- pool = task->task_pool;
- }
- lua_pop(L, 1);
-
- if (task == NULL) {
- lua_pushstring(L, "ev_base");
- lua_gettable(L, -2);
- if (rspamd_lua_check_udata_maybe(L, -1, rspamd_ev_base_classname)) {
- ev_base = *(struct ev_loop **) lua_touserdata(L, -1);
- }
- else {
- ev_base = NULL;
- }
- lua_pop(L, 1);
-
- lua_pushstring(L, "session");
- lua_gettable(L, -2);
- if (rspamd_lua_check_udata_maybe(L, -1, rspamd_session_classname)) {
- session = *(struct rspamd_async_session **) lua_touserdata(L, -1);
- }
- else {
- session = NULL;
- }
- lua_pop(L, 1);
-
- lua_pushstring(L, "pool");
- lua_gettable(L, -2);
- if (rspamd_lua_check_udata_maybe(L, -1, rspamd_mempool_classname)) {
- pool = *(rspamd_mempool_t **) lua_touserdata(L, -1);
- }
- else {
- pool = NULL;
- }
- lua_pop(L, 1);
- }
-
- lua_pushstring(L, "timeout");
- lua_gettable(L, -2);
- if (lua_type(L, -1) == LUA_TNUMBER) {
- timeout = lua_tonumber(L, -1);
- }
- lua_pop(L, 1);
-
- if (!ev_base || !pool) {
- rspamd_inet_address_free(addr);
-
- return luaL_error(L, "invalid arguments");
- }
-
-
- cbd = rspamd_mempool_alloc0(pool, sizeof(*cbd));
- cbd->event_loop = ev_base;
- cbd->pool = pool;
- cbd->s = session;
- cbd->addr = addr;
- cbd->sock = rspamd_socket_create(rspamd_inet_address_get_af(addr),
- SOCK_DGRAM, 0, TRUE);
- cbd->cbref = -1;
- cbd->ev.timeout = timeout;
-
- if (cbd->sock == -1) {
- rspamd_inet_address_free(addr);
-
- return luaL_error(L, "cannot open socket: %s", strerror(errno));
- }
-
- cbd->L = L;
-
- gsize data_len;
-
- lua_pushstring(L, "data");
- lua_gettable(L, -2);
-
- if (lua_type(L, -1) == LUA_TTABLE) {
- data_len = rspamd_lua_table_size(L, -1);
- cbd->iov = rspamd_mempool_alloc(pool,
- sizeof(*cbd->iov) * data_len);
-
- for (int i = 0; i < data_len; i++) {
- lua_rawgeti(L, -1, i + 1);
- lua_fill_iov(L, pool, &cbd->iov[i], -1);
- lua_pop(L, 1);
- }
-
- cbd->iovlen = data_len;
- }
- else {
- cbd->iov = rspamd_mempool_alloc(pool, sizeof(*cbd->iov));
- cbd->iovlen = 1;
- lua_fill_iov(L, pool, cbd->iov, -1);
- }
-
- lua_pop(L, 1);
-
- lua_pushstring(L, "callback");
- lua_gettable(L, -2);
- if (lua_type(L, -1) == LUA_TFUNCTION) {
- cbd->cbref = luaL_ref(L, LUA_REGISTRYINDEX);
- }
- else {
- lua_pop(L, 1);
- }
-
- lua_pushstring(L, "retransmits");
- lua_gettable(L, -2);
- if (lua_type(L, -1) == LUA_TNUMBER) {
- cbd->retransmits = lua_tonumber(L, -1);
- }
- lua_pop(L, 1);
-
- enum rspamd_udp_send_result r;
-
- r = lua_try_send_request(cbd);
- if (r == RSPAMD_SENT_OK) {
- if (cbd->cbref == -1) {
- lua_udp_maybe_free(cbd);
- }
- else {
- if (!lua_udp_maybe_register_event(cbd)) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "session error");
- lua_udp_maybe_free(cbd);
-
- return 2;
- }
-
- rspamd_ev_watcher_init(&cbd->ev, cbd->sock, EV_READ,
- lua_udp_io_handler, cbd);
- rspamd_ev_watcher_start(cbd->event_loop, &cbd->ev, timeout);
- cbd->sent = TRUE;
- }
-
- lua_pushboolean(L, true);
- }
- else if (r == RSPAMD_SENT_FAILURE) {
- lua_pushboolean(L, false);
- lua_pushstring(L, strerror(errno));
- lua_udp_maybe_free(cbd);
-
- return 2;
- }
- else {
- rspamd_ev_watcher_init(&cbd->ev, cbd->sock, EV_WRITE,
- lua_udp_io_handler, cbd);
- rspamd_ev_watcher_start(cbd->event_loop, &cbd->ev, timeout);
-
- if (!lua_udp_maybe_register_event(cbd)) {
- lua_pushboolean(L, false);
- lua_pushstring(L, "session error");
- lua_udp_maybe_free(cbd);
-
- return 2;
- }
- }
- }
- else {
- return luaL_error(L, "invalid arguments");
- }
-
- return 1;
- }
-
- static int
- lua_load_udp(lua_State *L)
- {
- lua_newtable(L);
- luaL_register(L, NULL, udp_libf);
-
- return 1;
- }
-
- void luaopen_udp(lua_State *L)
- {
- rspamd_lua_add_preload(L, "rspamd_udp", lua_load_udp);
- }
|