You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

lua_udp.c 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594
  1. /*-
  2. * Copyright 2019 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "lua_common.h"
  17. #include "lua_thread_pool.h"
  18. #include "utlist.h"
  19. #include "unix-std.h"
  20. #include <math.h>
  21. #include <src/libutil/libev_helper.h>
  22. static const char *M = "rspamd lua udp";
  23. /***
  24. * @module rspamd_udp
  25. * Rspamd UDP module is available from the version 1.9.0 and represents a generic
  26. * UDP asynchronous client available from the LUA code.
  27. * This module is quite simple: it can either send requests to some address or
  28. * it can send requests and wait for replies, potentially handling retransmits.
  29. * @example
  30. local logger = require "rspamd_logger"
  31. local udp = require "rspamd_udp"
  32. rspamd_config.SYM = function(task)
  33. udp.sento{
  34. host = addr, -- must be ip address object (e.g. received by upstream module)
  35. port = 500,
  36. data = {'str1', 'str2'}, -- can be table, string or rspamd_text
  37. timeout = 0.5, -- default = 1s
  38. task = task, -- if has task
  39. session = session, -- optional
  40. ev_base = ev_base, -- if no task available
  41. -- You can include callback and then Rspamd will try to read replies
  42. callback = function(success, data)
  43. -- success is bool, data is either data or an error (string)
  44. end,
  45. retransmits = 0, -- Or more if retransmitting is necessary
  46. }
  47. end
  48. */
  49. static const double default_udp_timeout = 1.0;
  50. LUA_FUNCTION_DEF(udp, sendto);
  51. static const struct luaL_reg udp_libf[] = {
  52. LUA_INTERFACE_DEF(udp, sendto),
  53. {NULL, NULL}};
  54. struct lua_udp_cbdata {
  55. struct ev_loop *event_loop;
  56. struct rspamd_io_ev ev;
  57. struct rspamd_async_event *async_ev;
  58. struct rspamd_task *task;
  59. rspamd_mempool_t *pool;
  60. rspamd_inet_addr_t *addr;
  61. struct rspamd_symcache_dynamic_item *item;
  62. struct rspamd_async_session *s;
  63. struct iovec *iov;
  64. lua_State *L;
  65. unsigned int retransmits;
  66. unsigned int iovlen;
  67. int sock;
  68. int cbref;
  69. gboolean sent;
  70. };
  71. #define msg_debug_udp(...) rspamd_conditional_debug_fast(NULL, cbd->addr, \
  72. rspamd_lua_udp_log_id, "lua_udp", cbd->pool->tag.uid, \
  73. G_STRFUNC, \
  74. __VA_ARGS__)
  75. INIT_LOG_MODULE(lua_udp)
  76. static inline void
  77. lua_fill_iov(lua_State *L, rspamd_mempool_t *pool,
  78. struct iovec *iov, int pos)
  79. {
  80. if (lua_type(L, pos) == LUA_TUSERDATA) {
  81. struct rspamd_lua_text *t = lua_check_text(L, pos);
  82. if (t) {
  83. iov->iov_base = rspamd_mempool_alloc(pool, t->len);
  84. iov->iov_len = t->len;
  85. memcpy(iov->iov_base, t->start, t->len);
  86. }
  87. }
  88. else {
  89. const char *s;
  90. gsize len;
  91. s = lua_tolstring(L, pos, &len);
  92. iov->iov_base = rspamd_mempool_alloc(pool, len);
  93. iov->iov_len = len;
  94. memcpy(iov->iov_base, s, len);
  95. }
  96. }
  97. static void
  98. lua_udp_cbd_fin(gpointer p)
  99. {
  100. struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *) p;
  101. if (cbd->sock != -1) {
  102. rspamd_ev_watcher_stop(cbd->event_loop, &cbd->ev);
  103. close(cbd->sock);
  104. }
  105. if (cbd->addr) {
  106. rspamd_inet_address_free(cbd->addr);
  107. }
  108. if (cbd->cbref) {
  109. luaL_unref(cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
  110. }
  111. }
  112. static void
  113. lua_udp_maybe_free(struct lua_udp_cbdata *cbd)
  114. {
  115. if (cbd->item) {
  116. rspamd_symcache_item_async_dec_check(cbd->task, cbd->item, M);
  117. cbd->item = NULL;
  118. }
  119. if (cbd->async_ev) {
  120. rspamd_session_remove_event(cbd->s, lua_udp_cbd_fin, cbd);
  121. }
  122. else {
  123. lua_udp_cbd_fin(cbd);
  124. }
  125. }
  126. enum rspamd_udp_send_result {
  127. RSPAMD_SENT_OK,
  128. RSPAMD_SENT_RETRY,
  129. RSPAMD_SENT_FAILURE
  130. };
  131. static enum rspamd_udp_send_result
  132. lua_try_send_request(struct lua_udp_cbdata *cbd)
  133. {
  134. struct msghdr msg;
  135. int r;
  136. memset(&msg, 0, sizeof(msg));
  137. msg.msg_iov = cbd->iov;
  138. msg.msg_iovlen = cbd->iovlen;
  139. msg.msg_name = rspamd_inet_address_get_sa(cbd->addr, &msg.msg_namelen);
  140. r = sendmsg(cbd->sock, &msg, 0);
  141. if (r != -1) {
  142. return RSPAMD_SENT_OK;
  143. }
  144. if (errno == EAGAIN || errno == EINTR) {
  145. return RSPAMD_SENT_RETRY;
  146. }
  147. return RSPAMD_SENT_FAILURE;
  148. }
  149. static void
  150. lua_udp_maybe_push_error(struct lua_udp_cbdata *cbd, const char *err)
  151. {
  152. if (cbd->cbref != -1) {
  153. int top;
  154. lua_State *L = cbd->L;
  155. top = lua_gettop(L);
  156. lua_rawgeti(L, LUA_REGISTRYINDEX, cbd->cbref);
  157. /* Error message */
  158. lua_pushboolean(L, false);
  159. lua_pushstring(L, err);
  160. if (cbd->item) {
  161. rspamd_symcache_set_cur_item(cbd->task, cbd->item);
  162. }
  163. if (lua_pcall(L, 2, 0, 0) != 0) {
  164. msg_info("callback call failed: %s", lua_tostring(L, -1));
  165. }
  166. lua_settop(L, top);
  167. }
  168. lua_udp_maybe_free(cbd);
  169. }
  170. static void
  171. lua_udp_push_data(struct lua_udp_cbdata *cbd, const char *data,
  172. gssize len)
  173. {
  174. if (cbd->cbref != -1) {
  175. int top;
  176. lua_State *L = cbd->L;
  177. top = lua_gettop(L);
  178. lua_rawgeti(L, LUA_REGISTRYINDEX, cbd->cbref);
  179. /* Error message */
  180. lua_pushboolean(L, true);
  181. lua_pushlstring(L, data, len);
  182. if (cbd->item) {
  183. rspamd_symcache_set_cur_item(cbd->task, cbd->item);
  184. }
  185. if (lua_pcall(L, 2, 0, 0) != 0) {
  186. msg_info("callback call failed: %s", lua_tostring(L, -1));
  187. }
  188. lua_settop(L, top);
  189. }
  190. lua_udp_maybe_free(cbd);
  191. }
  192. static gboolean
  193. lua_udp_maybe_register_event(struct lua_udp_cbdata *cbd)
  194. {
  195. if (cbd->s && !cbd->async_ev) {
  196. if (cbd->item) {
  197. cbd->async_ev = rspamd_session_add_event_full(cbd->s, lua_udp_cbd_fin,
  198. cbd, M,
  199. rspamd_symcache_dyn_item_name(cbd->task, cbd->item));
  200. }
  201. else {
  202. cbd->async_ev = rspamd_session_add_event(cbd->s, lua_udp_cbd_fin,
  203. cbd, M);
  204. }
  205. if (!cbd->async_ev) {
  206. return FALSE;
  207. }
  208. }
  209. if (cbd->task && !cbd->item) {
  210. cbd->item = rspamd_symcache_get_cur_item(cbd->task);
  211. rspamd_symcache_item_async_inc(cbd->task, cbd->item, M);
  212. }
  213. return TRUE;
  214. }
  215. static void
  216. lua_udp_io_handler(int fd, short what, gpointer p)
  217. {
  218. struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *) p;
  219. gssize r;
  220. if (what == EV_TIMEOUT) {
  221. if (cbd->sent && cbd->retransmits > 0) {
  222. r = lua_try_send_request(cbd);
  223. if (r == RSPAMD_SENT_OK) {
  224. rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_READ);
  225. lua_udp_maybe_register_event(cbd);
  226. cbd->retransmits--;
  227. }
  228. else if (r == RSPAMD_SENT_FAILURE) {
  229. lua_udp_maybe_push_error(cbd, "write error");
  230. }
  231. else {
  232. cbd->retransmits--;
  233. rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_WRITE);
  234. }
  235. }
  236. else {
  237. if (!cbd->sent) {
  238. lua_udp_maybe_push_error(cbd, "sent timeout");
  239. }
  240. else {
  241. lua_udp_maybe_push_error(cbd, "read timeout");
  242. }
  243. }
  244. }
  245. else if (what == EV_WRITE) {
  246. r = lua_try_send_request(cbd);
  247. if (r == RSPAMD_SENT_OK) {
  248. if (cbd->cbref != -1) {
  249. rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_READ);
  250. cbd->sent = TRUE;
  251. }
  252. else {
  253. lua_udp_maybe_free(cbd);
  254. }
  255. }
  256. else if (r == RSPAMD_SENT_FAILURE) {
  257. lua_udp_maybe_push_error(cbd, "write error");
  258. }
  259. else {
  260. cbd->retransmits--;
  261. rspamd_ev_watcher_reschedule(cbd->event_loop, &cbd->ev, EV_WRITE);
  262. }
  263. }
  264. else if (what == EV_READ) {
  265. unsigned char udpbuf[4096];
  266. socklen_t slen;
  267. struct sockaddr *sa;
  268. sa = rspamd_inet_address_get_sa(cbd->addr, &slen);
  269. r = recvfrom(cbd->sock, udpbuf, sizeof(udpbuf), 0, sa, &slen);
  270. if (r == -1) {
  271. lua_udp_maybe_push_error(cbd, strerror(errno));
  272. }
  273. else {
  274. lua_udp_push_data(cbd, udpbuf, r);
  275. }
  276. }
  277. }
  278. /***
  279. * @function rspamd_udp.sendto({params})
  280. * This function simply sends data to an external UDP service
  281. *
  282. * - `task`: rspamd task objects (implies `pool`, `session` and `ev_base` arguments)
  283. * - `ev_base`: event base (if no task specified)
  284. * - `session`: events session (no task, optional)
  285. * - `pool`: memory pool (if no task specified)
  286. * - `host`: IP or name of the peer (required)
  287. * - `port`: remote port to use (if `host` has no port part this is required)
  288. * - `data`: a table of strings or `rspamd_text` objects that contains data pieces
  289. * - `retransmits`: number of retransmits if needed
  290. * - `callback`: optional callback if reply should be read
  291. * @return {boolean} true if request has been sent (additional string if it has not)
  292. */
  293. static int
  294. lua_udp_sendto(lua_State *L)
  295. {
  296. LUA_TRACE_POINT;
  297. const char *host;
  298. unsigned int port;
  299. struct ev_loop *ev_base = NULL;
  300. struct lua_udp_cbdata *cbd;
  301. struct rspamd_async_session *session = NULL;
  302. struct rspamd_task *task = NULL;
  303. rspamd_inet_addr_t *addr;
  304. rspamd_mempool_t *pool = NULL;
  305. double timeout = default_udp_timeout;
  306. if (lua_type(L, 1) == LUA_TTABLE) {
  307. lua_pushstring(L, "port");
  308. lua_gettable(L, -2);
  309. if (lua_type(L, -1) == LUA_TNUMBER) {
  310. port = lua_tointeger(L, -1);
  311. }
  312. else {
  313. /* We assume that it is a unix socket */
  314. port = 0;
  315. }
  316. lua_pop(L, 1);
  317. lua_pushstring(L, "host");
  318. lua_gettable(L, -2);
  319. if (lua_type(L, -1) == LUA_TSTRING) {
  320. host = luaL_checkstring(L, -1);
  321. if (rspamd_parse_inet_address(&addr,
  322. host, strlen(host), RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
  323. if (port != 0) {
  324. rspamd_inet_address_set_port(addr, port);
  325. }
  326. }
  327. else {
  328. lua_pop(L, 1);
  329. return luaL_error(L, "invalid host: %s", host);
  330. }
  331. }
  332. else if (lua_type(L, -1) == LUA_TUSERDATA) {
  333. struct rspamd_lua_ip *lip;
  334. lip = lua_check_ip(L, -1);
  335. if (lip == NULL || lip->addr == NULL) {
  336. lua_pop(L, 1);
  337. return luaL_error(L, "invalid host class");
  338. }
  339. addr = rspamd_inet_address_copy(lip->addr, NULL);
  340. if (port != 0) {
  341. rspamd_inet_address_set_port(addr, port);
  342. }
  343. }
  344. else {
  345. lua_pop(L, 1);
  346. return luaL_error(L, "invalid host");
  347. }
  348. lua_pop(L, 1);
  349. lua_pushstring(L, "task");
  350. lua_gettable(L, -2);
  351. if (lua_type(L, -1) == LUA_TUSERDATA) {
  352. task = lua_check_task(L, -1);
  353. ev_base = task->event_loop;
  354. session = task->s;
  355. pool = task->task_pool;
  356. }
  357. lua_pop(L, 1);
  358. if (task == NULL) {
  359. lua_pushstring(L, "ev_base");
  360. lua_gettable(L, -2);
  361. if (rspamd_lua_check_udata_maybe(L, -1, rspamd_ev_base_classname)) {
  362. ev_base = *(struct ev_loop **) lua_touserdata(L, -1);
  363. }
  364. else {
  365. ev_base = NULL;
  366. }
  367. lua_pop(L, 1);
  368. lua_pushstring(L, "session");
  369. lua_gettable(L, -2);
  370. if (rspamd_lua_check_udata_maybe(L, -1, rspamd_session_classname)) {
  371. session = *(struct rspamd_async_session **) lua_touserdata(L, -1);
  372. }
  373. else {
  374. session = NULL;
  375. }
  376. lua_pop(L, 1);
  377. lua_pushstring(L, "pool");
  378. lua_gettable(L, -2);
  379. if (rspamd_lua_check_udata_maybe(L, -1, rspamd_mempool_classname)) {
  380. pool = *(rspamd_mempool_t **) lua_touserdata(L, -1);
  381. }
  382. else {
  383. pool = NULL;
  384. }
  385. lua_pop(L, 1);
  386. }
  387. lua_pushstring(L, "timeout");
  388. lua_gettable(L, -2);
  389. if (lua_type(L, -1) == LUA_TNUMBER) {
  390. timeout = lua_tonumber(L, -1);
  391. }
  392. lua_pop(L, 1);
  393. if (!ev_base || !pool) {
  394. rspamd_inet_address_free(addr);
  395. return luaL_error(L, "invalid arguments");
  396. }
  397. cbd = rspamd_mempool_alloc0(pool, sizeof(*cbd));
  398. cbd->event_loop = ev_base;
  399. cbd->pool = pool;
  400. cbd->s = session;
  401. cbd->addr = addr;
  402. cbd->sock = rspamd_socket_create(rspamd_inet_address_get_af(addr),
  403. SOCK_DGRAM, 0, TRUE);
  404. cbd->cbref = -1;
  405. cbd->ev.timeout = timeout;
  406. if (cbd->sock == -1) {
  407. rspamd_inet_address_free(addr);
  408. return luaL_error(L, "cannot open socket: %s", strerror(errno));
  409. }
  410. cbd->L = L;
  411. gsize data_len;
  412. lua_pushstring(L, "data");
  413. lua_gettable(L, -2);
  414. if (lua_type(L, -1) == LUA_TTABLE) {
  415. data_len = rspamd_lua_table_size(L, -1);
  416. cbd->iov = rspamd_mempool_alloc(pool,
  417. sizeof(*cbd->iov) * data_len);
  418. for (int i = 0; i < data_len; i++) {
  419. lua_rawgeti(L, -1, i + 1);
  420. lua_fill_iov(L, pool, &cbd->iov[i], -1);
  421. lua_pop(L, 1);
  422. }
  423. cbd->iovlen = data_len;
  424. }
  425. else {
  426. cbd->iov = rspamd_mempool_alloc(pool, sizeof(*cbd->iov));
  427. cbd->iovlen = 1;
  428. lua_fill_iov(L, pool, cbd->iov, -1);
  429. }
  430. lua_pop(L, 1);
  431. lua_pushstring(L, "callback");
  432. lua_gettable(L, -2);
  433. if (lua_type(L, -1) == LUA_TFUNCTION) {
  434. cbd->cbref = luaL_ref(L, LUA_REGISTRYINDEX);
  435. }
  436. else {
  437. lua_pop(L, 1);
  438. }
  439. lua_pushstring(L, "retransmits");
  440. lua_gettable(L, -2);
  441. if (lua_type(L, -1) == LUA_TNUMBER) {
  442. cbd->retransmits = lua_tonumber(L, -1);
  443. }
  444. lua_pop(L, 1);
  445. enum rspamd_udp_send_result r;
  446. r = lua_try_send_request(cbd);
  447. if (r == RSPAMD_SENT_OK) {
  448. if (cbd->cbref == -1) {
  449. lua_udp_maybe_free(cbd);
  450. }
  451. else {
  452. if (!lua_udp_maybe_register_event(cbd)) {
  453. lua_pushboolean(L, false);
  454. lua_pushstring(L, "session error");
  455. lua_udp_maybe_free(cbd);
  456. return 2;
  457. }
  458. rspamd_ev_watcher_init(&cbd->ev, cbd->sock, EV_READ,
  459. lua_udp_io_handler, cbd);
  460. rspamd_ev_watcher_start(cbd->event_loop, &cbd->ev, timeout);
  461. cbd->sent = TRUE;
  462. }
  463. lua_pushboolean(L, true);
  464. }
  465. else if (r == RSPAMD_SENT_FAILURE) {
  466. lua_pushboolean(L, false);
  467. lua_pushstring(L, strerror(errno));
  468. lua_udp_maybe_free(cbd);
  469. return 2;
  470. }
  471. else {
  472. rspamd_ev_watcher_init(&cbd->ev, cbd->sock, EV_WRITE,
  473. lua_udp_io_handler, cbd);
  474. rspamd_ev_watcher_start(cbd->event_loop, &cbd->ev, timeout);
  475. if (!lua_udp_maybe_register_event(cbd)) {
  476. lua_pushboolean(L, false);
  477. lua_pushstring(L, "session error");
  478. lua_udp_maybe_free(cbd);
  479. return 2;
  480. }
  481. }
  482. }
  483. else {
  484. return luaL_error(L, "invalid arguments");
  485. }
  486. return 1;
  487. }
  488. static int
  489. lua_load_udp(lua_State *L)
  490. {
  491. lua_newtable(L);
  492. luaL_register(L, NULL, udp_libf);
  493. return 1;
  494. }
  495. void luaopen_udp(lua_State *L)
  496. {
  497. rspamd_lua_add_preload(L, "rspamd_udp", lua_load_udp);
  498. }