You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

lua_udp.c 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. /*-
  2. * Copyright 2019 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "lua_common.h"
  17. #include "lua_thread_pool.h"
  18. #include "utlist.h"
  19. #include "unix-std.h"
  20. #include <math.h>
  21. static const gchar *M = "rspamd lua udp";
  22. /***
  23. * @module rspamd_udp
  24. * Rspamd UDP module represents generic UDP asynchronous client available from LUA code.
  25. * This module is quite simple: it can either send requests to some address or
  26. * it can send requests and wait for replies, potentially handling retransmits.
  27. * @example
  28. local logger = require "rspamd_logger"
  29. local udp = require "rspamd_udp"
  30. rspamd_config.SYM = function(task)
  31. udp.sendto({
  32. host = addr, -- must be ip address object (e.g. received by upstream module)
  33. port = 500,
  34. data = data, -- can be table, string or rspamd_text
  35. timeout = 0.5, -- default = 1s
  36. task = task, -- if has task
  37. session = session, -- optional
  38. ev_base = ev_base, -- if no task available
  39. })
  40. end
  41. */
  42. static const double default_udp_timeout = 1.0;
  43. LUA_FUNCTION_DEF (udp, sendto);
  44. static const struct luaL_reg udp_libf[] = {
  45. LUA_INTERFACE_DEF (udp, sendto),
  46. {NULL, NULL}
  47. };
  48. struct lua_udp_cbdata {
  49. struct event io;
  50. struct timeval tv;
  51. struct event_base *ev_base;
  52. struct rspamd_async_event *async_ev;
  53. struct rspamd_task *task;
  54. rspamd_mempool_t *pool;
  55. rspamd_inet_addr_t *addr;
  56. struct rspamd_symcache_item *item;
  57. struct rspamd_async_session *s;
  58. struct iovec *iov;
  59. lua_State *L;
  60. guint retransmits;
  61. guint iovlen;
  62. gint sock;
  63. gint cbref;
  64. gboolean sent;
  65. };
  66. #define msg_debug_udp(...) rspamd_conditional_debug_fast (NULL, cbd->addr, \
  67. rspamd_lua_udp_log_id, "lua_udp", cbd->pool->tag.uid, \
  68. G_STRFUNC, \
  69. __VA_ARGS__)
  70. INIT_LOG_MODULE(lua_udp)
  71. static inline void
  72. lua_fill_iov (lua_State *L, rspamd_mempool_t *pool,
  73. struct iovec *iov, gint pos)
  74. {
  75. if (lua_type (L, pos) == LUA_TUSERDATA) {
  76. struct rspamd_lua_text *t = lua_check_text (L, pos);
  77. if (t) {
  78. iov->iov_base = rspamd_mempool_alloc (pool, t->len);
  79. iov->iov_len = t->len;
  80. memcpy (iov->iov_base, t->start, t->len);
  81. }
  82. }
  83. else {
  84. const gchar *s;
  85. gsize len;
  86. s = lua_tolstring (L, pos, &len);
  87. iov->iov_base = rspamd_mempool_alloc (pool, len);
  88. iov->iov_len = len;
  89. memcpy (iov->iov_base, s, len);
  90. }
  91. }
  92. static void
  93. lua_udp_cbd_fin (gpointer p)
  94. {
  95. struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *)p;
  96. if (cbd->sock != -1) {
  97. if (rspamd_event_pending (&cbd->io, EV_READ|EV_WRITE)) {
  98. event_del (&cbd->io);
  99. }
  100. close (cbd->sock);
  101. }
  102. if (cbd->addr) {
  103. rspamd_inet_address_free (cbd->addr);
  104. }
  105. if (cbd->cbref) {
  106. luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
  107. }
  108. }
  109. static void
  110. lua_udp_maybe_free (struct lua_udp_cbdata *cbd)
  111. {
  112. if (cbd->item) {
  113. rspamd_symcache_item_async_dec_check (cbd->task, cbd->item, M);
  114. cbd->item = NULL;
  115. }
  116. if (cbd->async_ev) {
  117. rspamd_session_remove_event (cbd->s, lua_udp_cbd_fin, cbd);
  118. }
  119. else {
  120. lua_udp_cbd_fin (cbd);
  121. }
  122. }
  /* Outcome of a single attempt to write the request (lua_try_send_request) */
  enum rspamd_udp_send_result {
      RSPAMD_SENT_OK,      /* sendmsg succeeded */
      RSPAMD_SENT_RETRY,   /* sendmsg failed with EAGAIN or EINTR */
      RSPAMD_SENT_FAILURE  /* sendmsg failed with any other error */
  };
  128. static enum rspamd_udp_send_result
  129. lua_try_send_request (struct lua_udp_cbdata *cbd)
  130. {
  131. struct msghdr msg;
  132. gint r;
  133. memset (&msg, 0, sizeof (msg));
  134. msg.msg_iov = cbd->iov;
  135. msg.msg_iovlen = cbd->iovlen;
  136. msg.msg_name = rspamd_inet_address_get_sa (cbd->addr, &msg.msg_namelen);
  137. r = sendmsg (cbd->sock, &msg, 0);
  138. if (r != -1) {
  139. return RSPAMD_SENT_OK;
  140. }
  141. if (errno == EAGAIN || errno == EINTR) {
  142. return RSPAMD_SENT_RETRY;
  143. }
  144. return RSPAMD_SENT_FAILURE;
  145. }
  146. static void
  147. lua_udp_maybe_push_error (struct lua_udp_cbdata *cbd, const gchar *err)
  148. {
  149. if (cbd->cbref != -1) {
  150. gint top;
  151. lua_State *L = cbd->L;
  152. top = lua_gettop (L);
  153. lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
  154. /* Error message */
  155. lua_pushboolean (L, false);
  156. lua_pushstring (L, err);
  157. if (cbd->item) {
  158. rspamd_symcache_set_cur_item (cbd->task, cbd->item);
  159. }
  160. if (lua_pcall (L, 2, 0, 0) != 0) {
  161. msg_info ("callback call failed: %s", lua_tostring (L, -1));
  162. }
  163. lua_settop (L, top);
  164. }
  165. lua_udp_maybe_free (cbd);
  166. }
  167. static void
  168. lua_udp_push_data (struct lua_udp_cbdata *cbd, const gchar *data,
  169. gssize len)
  170. {
  171. if (cbd->cbref != -1) {
  172. gint top;
  173. lua_State *L = cbd->L;
  174. top = lua_gettop (L);
  175. lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref);
  176. /* Error message */
  177. lua_pushboolean (L, true);
  178. lua_pushlstring (L, data, len);
  179. if (cbd->item) {
  180. rspamd_symcache_set_cur_item (cbd->task, cbd->item);
  181. }
  182. if (lua_pcall (L, 2, 0, 0) != 0) {
  183. msg_info ("callback call failed: %s", lua_tostring (L, -1));
  184. }
  185. lua_settop (L, top);
  186. }
  187. lua_udp_maybe_free (cbd);
  188. }
  189. static gboolean
  190. lua_udp_maybe_register_event (struct lua_udp_cbdata *cbd)
  191. {
  192. if (cbd->s && !cbd->async_ev) {
  193. cbd->async_ev = rspamd_session_add_event (cbd->s, lua_udp_cbd_fin,
  194. cbd, M);
  195. if (!cbd->async_ev) {
  196. return FALSE;
  197. }
  198. }
  199. if (cbd->task && !cbd->item) {
  200. cbd->item = rspamd_symcache_get_cur_item (cbd->task);
  201. rspamd_symcache_item_async_inc (cbd->task, cbd->item, M);
  202. }
  203. return TRUE;
  204. }
  205. static void
  206. lua_udp_io_handler (gint fd, short what, gpointer p)
  207. {
  208. struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *)p;
  209. lua_State *L;
  210. gssize r;
  211. L = cbd->L;
  212. event_del (&cbd->io);
  213. if (what == EV_TIMEOUT) {
  214. if (cbd->sent && cbd->retransmits > 0) {
  215. r = lua_try_send_request (cbd);
  216. if (r == RSPAMD_SENT_OK) {
  217. event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
  218. event_base_set (cbd->ev_base, &cbd->io);
  219. event_add (&cbd->io, &cbd->tv);
  220. lua_udp_maybe_register_event (cbd);
  221. cbd->retransmits --;
  222. }
  223. else if (r == RSPAMD_SENT_FAILURE) {
  224. lua_udp_maybe_push_error (cbd, "write error");
  225. }
  226. else {
  227. cbd->retransmits --;
  228. event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
  229. event_base_set (cbd->ev_base, &cbd->io);
  230. event_add (&cbd->io, &cbd->tv);
  231. }
  232. }
  233. else {
  234. if (!cbd->sent) {
  235. lua_udp_maybe_push_error (cbd, "sent timeout");
  236. }
  237. else {
  238. lua_udp_maybe_push_error (cbd, "read timeout");
  239. }
  240. }
  241. }
  242. else if (what == EV_WRITE) {
  243. r = lua_try_send_request (cbd);
  244. if (r == RSPAMD_SENT_OK) {
  245. if (cbd->cbref != -1) {
  246. event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
  247. event_base_set (cbd->ev_base, &cbd->io);
  248. event_add (&cbd->io, &cbd->tv);
  249. cbd->sent = TRUE;
  250. }
  251. else {
  252. lua_udp_maybe_free (cbd);
  253. }
  254. }
  255. else if (r == RSPAMD_SENT_FAILURE) {
  256. lua_udp_maybe_push_error (cbd, "write error");
  257. }
  258. else {
  259. cbd->retransmits --;
  260. event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
  261. event_base_set (cbd->ev_base, &cbd->io);
  262. event_add (&cbd->io, &cbd->tv);
  263. }
  264. }
  265. else if (what == EV_READ) {
  266. guchar udpbuf[4096];
  267. socklen_t slen;
  268. struct sockaddr *sa;
  269. sa = rspamd_inet_address_get_sa (cbd->addr, &slen);
  270. r = recvfrom (cbd->sock, udpbuf, sizeof (udpbuf), 0, sa, &slen);
  271. if (r == -1) {
  272. lua_udp_maybe_push_error (cbd, strerror (errno));
  273. }
  274. else {
  275. lua_udp_push_data (cbd, udpbuf, r);
  276. }
  277. }
  278. }
  279. /***
  280. * @function rspamd_udp.sendto({params})
  281. * This function simply sends data to an external UDP service
  282. *
  283. * - `task`: rspamd task objects (implies `pool`, `session`, `ev_base` and `resolver` arguments)
  284. * - `ev_base`: event base (if no task specified)
  285. * - `session`: events session (no task)
  286. * - `host`: IP or name of the peer (required)
  287. * - `port`: remote port to use (if `host` has no port part this is required)
  288. * - `data`: a table of strings or `rspamd_text` objects that contains data pieces
  289. * @return {boolean} true if request has been sent (additional string if it has not)
  290. */
  291. static gint
  292. lua_udp_sendto (lua_State *L) {
  293. LUA_TRACE_POINT;
  294. const gchar *host;
  295. guint port;
  296. struct event_base *ev_base = NULL;
  297. struct lua_udp_cbdata *cbd;
  298. struct rspamd_async_session *session = NULL;
  299. struct rspamd_task *task = NULL;
  300. rspamd_inet_addr_t *addr;
  301. rspamd_mempool_t *pool = NULL;
  302. gdouble timeout = default_udp_timeout;
  303. if (lua_type (L, 1) == LUA_TTABLE) {
  304. lua_pushstring (L, "port");
  305. lua_gettable (L, -2);
  306. if (lua_type (L, -1) == LUA_TNUMBER) {
  307. port = luaL_checknumber (L, -1);
  308. }
  309. else {
  310. /* We assume that it is a unix socket */
  311. port = 0;
  312. }
  313. lua_pop (L, 1);
  314. lua_pushstring (L, "host");
  315. lua_gettable (L, -2);
  316. if (lua_type (L, -1) == LUA_TSTRING) {
  317. host = luaL_checkstring (L, -1);
  318. if (rspamd_parse_inet_address (&addr, host, 0)) {
  319. if (port != 0) {
  320. rspamd_inet_address_set_port (addr, port);
  321. }
  322. }
  323. else {
  324. lua_pop (L, 1);
  325. return luaL_error (L, "invalid host: %s", host);
  326. }
  327. }
  328. else if (lua_type (L, -1) == LUA_TUSERDATA) {
  329. struct rspamd_lua_ip *lip;
  330. lip = lua_check_ip (L, -1);
  331. if (lip == NULL || lip->addr == NULL) {
  332. lua_pop (L, 1);
  333. return luaL_error (L, "invalid host class");
  334. }
  335. addr = rspamd_inet_address_copy (lip->addr);
  336. if (port != 0) {
  337. rspamd_inet_address_set_port (addr, port);
  338. }
  339. }
  340. else {
  341. lua_pop (L, 1);
  342. return luaL_error (L, "invalid host");
  343. }
  344. lua_pop (L, 1);
  345. lua_pushstring (L, "task");
  346. lua_gettable (L, -2);
  347. if (lua_type (L, -1) == LUA_TUSERDATA) {
  348. task = lua_check_task (L, -1);
  349. ev_base = task->ev_base;
  350. session = task->s;
  351. pool = task->task_pool;
  352. }
  353. lua_pop (L, 1);
  354. if (task == NULL) {
  355. lua_pushstring (L, "ev_base");
  356. lua_gettable (L, -2);
  357. if (rspamd_lua_check_udata_maybe (L, -1, "rspamd{ev_base}")) {
  358. ev_base = *(struct event_base **) lua_touserdata (L, -1);
  359. } else {
  360. ev_base = NULL;
  361. }
  362. lua_pop (L, 1);
  363. lua_pushstring (L, "session");
  364. lua_gettable (L, -2);
  365. if (rspamd_lua_check_udata_maybe (L, -1, "rspamd{session}")) {
  366. session = *(struct rspamd_async_session **) lua_touserdata (L, -1);
  367. } else {
  368. session = NULL;
  369. }
  370. lua_pop (L, 1);
  371. lua_pushstring (L, "session");
  372. lua_gettable (L, -2);
  373. if (rspamd_lua_check_udata_maybe (L, -1, "rspamd{mempool}")) {
  374. pool = *(rspamd_mempool_t **) lua_touserdata (L, -1);
  375. } else {
  376. pool = NULL;
  377. }
  378. lua_pop (L, 1);
  379. }
  380. lua_pushstring (L, "timeout");
  381. lua_gettable (L, -2);
  382. if (lua_type (L, -1) == LUA_TNUMBER) {
  383. timeout = lua_tonumber (L, -1);
  384. }
  385. lua_pop (L, 1);
  386. if (!ev_base || !pool) {
  387. rspamd_inet_address_free (addr);
  388. return luaL_error (L, "invalid arguments");
  389. }
  390. if (!ev_base || !pool) {
  391. rspamd_inet_address_free (addr);
  392. return luaL_error (L, "invalid arguments");
  393. }
  394. cbd = rspamd_mempool_alloc0 (pool, sizeof (*cbd));
  395. cbd->ev_base = ev_base;
  396. cbd->pool = pool;
  397. cbd->s = session;
  398. cbd->addr = addr;
  399. cbd->sock = rspamd_socket_create (rspamd_inet_address_get_af (addr),
  400. SOCK_DGRAM, 0, TRUE);
  401. cbd->cbref = -1;
  402. double_to_tv (timeout, &cbd->tv);
  403. if (cbd->sock == -1) {
  404. rspamd_inet_address_free (addr);
  405. return luaL_error (L, "cannot open socket: %s", strerror (errno));
  406. }
  407. cbd->L = L;
  408. gsize data_len;
  409. lua_pushstring (L, "data");
  410. lua_gettable (L, -2);
  411. if (lua_type (L, -1) == LUA_TTABLE) {
  412. data_len = rspamd_lua_table_size (L, -1);
  413. cbd->iov = rspamd_mempool_alloc (pool,
  414. sizeof (*cbd->iov) * data_len);
  415. for (int i = 0; i < data_len; i ++) {
  416. lua_rawgeti (L, -1, i + 1);
  417. lua_fill_iov (L, pool, &cbd->iov[i], -1);
  418. lua_pop (L, 1);
  419. }
  420. cbd->iovlen = data_len;
  421. }
  422. else {
  423. cbd->iov = rspamd_mempool_alloc (pool, sizeof (*cbd->iov));
  424. cbd->iovlen = 1;
  425. lua_fill_iov (L, pool, cbd->iov, -1);
  426. }
  427. lua_pop (L, 1);
  428. lua_pushstring (L, "callback");
  429. lua_gettable (L, -2);
  430. if (lua_type (L, -1) == LUA_TFUNCTION) {
  431. cbd->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
  432. }
  433. else {
  434. lua_pop (L, 1);
  435. }
  436. lua_pushstring (L, "retransmits");
  437. lua_gettable (L, -2);
  438. if (lua_type (L, -1) == LUA_TNUMBER) {
  439. cbd->retransmits = lua_tonumber (L, -1);
  440. }
  441. lua_pop (L, 1);
  442. enum rspamd_udp_send_result r;
  443. r = lua_try_send_request (cbd);
  444. if (r == RSPAMD_SENT_OK) {
  445. if (cbd->cbref == -1) {
  446. lua_udp_maybe_free (cbd);
  447. }
  448. else {
  449. if (!lua_udp_maybe_register_event (cbd)) {
  450. lua_pushboolean (L, false);
  451. lua_pushstring (L, "session error");
  452. lua_udp_maybe_free (cbd);
  453. return 2;
  454. }
  455. event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd);
  456. event_base_set (cbd->ev_base, &cbd->io);
  457. event_add (&cbd->io, &cbd->tv);
  458. cbd->sent = TRUE;
  459. }
  460. lua_pushboolean (L, true);
  461. }
  462. else if (r == RSPAMD_SENT_FAILURE) {
  463. lua_pushboolean (L, false);
  464. lua_pushstring (L, strerror (errno));
  465. lua_udp_maybe_free (cbd);
  466. return 2;
  467. }
  468. else {
  469. event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd);
  470. event_base_set (cbd->ev_base, &cbd->io);
  471. event_add (&cbd->io, &cbd->tv);
  472. if (!lua_udp_maybe_register_event (cbd)) {
  473. lua_pushboolean (L, false);
  474. lua_pushstring (L, "session error");
  475. lua_udp_maybe_free (cbd);
  476. return 2;
  477. }
  478. }
  479. }
  480. else {
  481. return luaL_error (L, "invalid arguments");
  482. }
  483. return 1;
  484. }
  485. static gint
  486. lua_load_udp (lua_State * L)
  487. {
  488. lua_newtable (L);
  489. luaL_register (L, NULL, udp_libf);
  490. return 1;
  491. }
  492. void
  493. luaopen_udp (lua_State * L)
  494. {
  495. rspamd_lua_add_preload (L, "rspamd_udp", lua_load_udp);
  496. }