You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

lua_upstream.c 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "lua_common.h"
  18. /***
  19. * @module rspamd_upstream_list
  20. * This module implements upstreams manipulation from LUA API. This functionality
  21. * can be used for load balancing using different strategies including:
  22. *
  23. * - round-robin: balance upstreams one by one selecting accordingly to their weight
  24. * - hash: use stable hashing algorithm to distribute values according to some static strings
  25. * - master-slave: always prefer upstream with higher priority unless it is not available
  26. *
  27. * Here is an example of upstreams manipulations:
  28. * @example
  29. local rspamd_logger = require "rspamd_logger"
  30. local rspamd_redis = require "rspamd_redis"
  31. local upstream_list = require "rspamd_upstream_list"
  32. local upstreams = upstream_list.create('127.0.0.1,10.0.0.1,10.0.0.2', 6379)
  33. local function sym_callback(task)
  34. local upstream = upstreams:get_upstream_by_hash(task:get_from()[1]['domain'])
  35. local function cb(task, err, data)
  36. if err then
  37. upstream:fail()
  38. else
  39. upstream:ok()
  40. end
  41. end
  42. local addr = upstream:get_addr()
  43. rspamd_redis.make_request(task, addr, cb,
  44. 'PUSH', {'key', 'value'})
  45. end
  46. */
  47. /* Upstream list functions */
  48. LUA_FUNCTION_DEF (upstream_list, create);
  49. LUA_FUNCTION_DEF (upstream_list, destroy);
  50. LUA_FUNCTION_DEF (upstream_list, all_upstreams);
  51. LUA_FUNCTION_DEF (upstream_list, get_upstream_by_hash);
  52. LUA_FUNCTION_DEF (upstream_list, get_upstream_round_robin);
  53. LUA_FUNCTION_DEF (upstream_list, get_upstream_master_slave);
  54. LUA_FUNCTION_DEF (upstream_list, add_watcher);
  55. static const struct luaL_reg upstream_list_m[] = {
  56. LUA_INTERFACE_DEF (upstream_list, get_upstream_by_hash),
  57. LUA_INTERFACE_DEF (upstream_list, get_upstream_round_robin),
  58. LUA_INTERFACE_DEF (upstream_list, get_upstream_master_slave),
  59. LUA_INTERFACE_DEF (upstream_list, all_upstreams),
  60. LUA_INTERFACE_DEF (upstream_list, add_watcher),
  61. {"__tostring", rspamd_lua_class_tostring},
  62. {"__gc", lua_upstream_list_destroy},
  63. {NULL, NULL}
  64. };
  65. static const struct luaL_reg upstream_list_f[] = {
  66. LUA_INTERFACE_DEF (upstream_list, create),
  67. {NULL, NULL}
  68. };
  69. /* Upstream functions */
  70. LUA_FUNCTION_DEF (upstream, ok);
  71. LUA_FUNCTION_DEF (upstream, fail);
  72. LUA_FUNCTION_DEF (upstream, get_addr);
  73. LUA_FUNCTION_DEF (upstream, destroy);
  74. static const struct luaL_reg upstream_m[] = {
  75. LUA_INTERFACE_DEF (upstream, ok),
  76. LUA_INTERFACE_DEF (upstream, fail),
  77. LUA_INTERFACE_DEF (upstream, get_addr),
  78. {"__tostring", rspamd_lua_class_tostring},
  79. {"__gc", lua_upstream_destroy},
  80. {NULL, NULL}
  81. };
  82. /* Upstream class */
  83. struct rspamd_lua_upstream {
  84. struct upstream *up;
  85. gint upref;
  86. };
  87. static struct rspamd_lua_upstream *
  88. lua_check_upstream (lua_State * L)
  89. {
  90. void *ud = rspamd_lua_check_udata (L, 1, "rspamd{upstream}");
  91. luaL_argcheck (L, ud != NULL, 1, "'upstream' expected");
  92. return ud ? (struct rspamd_lua_upstream *)ud : NULL;
  93. }
  94. /***
  95. * @method upstream:get_addr()
  96. * Get ip of upstream
  97. * @return {ip} ip address object
  98. */
  99. static gint
  100. lua_upstream_get_addr (lua_State *L)
  101. {
  102. LUA_TRACE_POINT;
  103. struct rspamd_lua_upstream *up = lua_check_upstream (L);
  104. if (up) {
  105. rspamd_lua_ip_push (L, rspamd_upstream_addr_next (up->up));
  106. }
  107. else {
  108. lua_pushnil (L);
  109. }
  110. return 1;
  111. }
  112. /***
  113. * @method upstream:fail()
  114. * Indicate upstream failure. After certain amount of failures during specified time frame, an upstream is marked as down and does not participate in rotations.
  115. */
  116. static gint
  117. lua_upstream_fail (lua_State *L)
  118. {
  119. LUA_TRACE_POINT;
  120. struct rspamd_lua_upstream *up = lua_check_upstream (L);
  121. gboolean fail_addr = FALSE;
  122. const gchar *reason = "unknown";
  123. if (up) {
  124. if (lua_isboolean (L, 2)) {
  125. fail_addr = lua_toboolean (L, 2);
  126. if (lua_isstring (L, 3)) {
  127. reason = lua_tostring (L, 3);
  128. }
  129. }
  130. else if (lua_isstring (L, 2)) {
  131. reason = lua_tostring (L, 2);
  132. }
  133. rspamd_upstream_fail (up->up, fail_addr, reason);
  134. }
  135. return 0;
  136. }
  137. /***
  138. * @method upstream:ok()
  139. * Indicates upstream success. Resets errors count for an upstream.
  140. */
  141. static gint
  142. lua_upstream_ok (lua_State *L)
  143. {
  144. LUA_TRACE_POINT;
  145. struct rspamd_lua_upstream *up = lua_check_upstream (L);
  146. if (up) {
  147. rspamd_upstream_ok (up->up);
  148. }
  149. return 0;
  150. }
  151. static gint
  152. lua_upstream_destroy (lua_State *L)
  153. {
  154. LUA_TRACE_POINT;
  155. struct rspamd_lua_upstream *up = lua_check_upstream (L);
  156. if (up) {
  157. /* Remove reference to the parent */
  158. luaL_unref (L, LUA_REGISTRYINDEX, up->upref);
  159. /* Upstream belongs to the upstream list, so no free here */
  160. }
  161. return 0;
  162. }
  163. /* Upstream list class */
  164. static struct upstream_list *
  165. lua_check_upstream_list (lua_State * L)
  166. {
  167. void *ud = rspamd_lua_check_udata (L, 1, "rspamd{upstream_list}");
  168. luaL_argcheck (L, ud != NULL, 1, "'upstream_list' expected");
  169. return ud ? *((struct upstream_list **)ud) : NULL;
  170. }
  171. static struct rspamd_lua_upstream *
  172. lua_push_upstream (lua_State * L, gint up_idx, struct upstream *up)
  173. {
  174. struct rspamd_lua_upstream *lua_ups;
  175. if (up_idx < 0) {
  176. up_idx = lua_gettop (L) + up_idx + 1;
  177. }
  178. lua_ups = lua_newuserdata (L, sizeof (*lua_ups));
  179. lua_ups->up = up;
  180. rspamd_lua_setclass (L, "rspamd{upstream}", -1);
  181. /* Store parent in the upstream to prevent gc */
  182. lua_pushvalue (L, up_idx);
  183. lua_ups->upref = luaL_ref (L, LUA_REGISTRYINDEX);
  184. return lua_ups;
  185. }
  186. /***
  187. * @function upstream_list.create(cfg, def, [default_port])
  188. * Create new upstream list from its string definition in form `<upstream>,<upstream>;<upstream>`
  189. * @param {rspamd_config} cfg configuration reference
  190. * @param {string} def upstream list definition
  191. * @param {number} default_port default port for upstreams
  192. * @return {upstream_list} upstream list structure
  193. */
  194. static gint
  195. lua_upstream_list_create (lua_State *L)
  196. {
  197. LUA_TRACE_POINT;
  198. struct upstream_list *new = NULL, **pnew;
  199. struct rspamd_config *cfg = NULL;
  200. const gchar *def;
  201. guint default_port = 0;
  202. gint top;
  203. if (lua_type (L, 1) == LUA_TUSERDATA) {
  204. cfg = lua_check_config (L, 1);
  205. top = 2;
  206. }
  207. else {
  208. top = 1;
  209. }
  210. if (lua_gettop (L) >= top + 1) {
  211. default_port = luaL_checknumber (L, top + 1);
  212. }
  213. if (lua_type (L, top) == LUA_TSTRING) {
  214. def = luaL_checkstring (L, top);
  215. new = rspamd_upstreams_create (cfg ? cfg->ups_ctx : NULL);
  216. if (rspamd_upstreams_parse_line (new, def, default_port, NULL)) {
  217. pnew = lua_newuserdata (L, sizeof (struct upstream_list *));
  218. rspamd_lua_setclass (L, "rspamd{upstream_list}", -1);
  219. *pnew = new;
  220. }
  221. else {
  222. rspamd_upstreams_destroy (new);
  223. lua_pushnil (L);
  224. }
  225. }
  226. else if (lua_type (L, top) == LUA_TTABLE) {
  227. new = rspamd_upstreams_create (cfg ? cfg->ups_ctx : NULL);
  228. pnew = lua_newuserdata (L, sizeof (struct upstream_list *));
  229. rspamd_lua_setclass (L, "rspamd{upstream_list}", -1);
  230. *pnew = new;
  231. lua_pushvalue (L, top);
  232. for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
  233. def = lua_tostring (L, -1);
  234. if (!def || !rspamd_upstreams_parse_line (new, def, default_port, NULL)) {
  235. msg_warn ("cannot parse upstream %s", def);
  236. }
  237. }
  238. lua_pop (L, 1);
  239. }
  240. else {
  241. return luaL_error (L, "invalid arguments");
  242. }
  243. return 1;
  244. }
  245. /**
  246. * Destroy a single upstream list object
  247. * @param L
  248. * @return
  249. */
  250. static gint
  251. lua_upstream_list_destroy (lua_State *L)
  252. {
  253. LUA_TRACE_POINT;
  254. struct upstream_list *upl = lua_check_upstream_list (L);
  255. rspamd_upstreams_destroy (upl);
  256. return 0;
  257. }
  258. /***
  259. * @method upstream_list:get_upstream_by_hash(key)
  260. * Get upstream by hash from key
  261. * @param {string} key a string used as input for stable hash algorithm
  262. * @return {upstream} upstream from a list corresponding to the given key
  263. */
  264. static gint
  265. lua_upstream_list_get_upstream_by_hash (lua_State *L)
  266. {
  267. LUA_TRACE_POINT;
  268. struct upstream_list *upl;
  269. struct upstream *selected;
  270. const gchar *key;
  271. gsize keyl;
  272. upl = lua_check_upstream_list (L);
  273. if (upl) {
  274. key = luaL_checklstring (L, 2, &keyl);
  275. if (key) {
  276. selected = rspamd_upstream_get (upl, RSPAMD_UPSTREAM_HASHED, key,
  277. (guint)keyl);
  278. if (selected) {
  279. lua_push_upstream (L, 1, selected);
  280. }
  281. else {
  282. lua_pushnil (L);
  283. }
  284. }
  285. else {
  286. lua_pushnil (L);
  287. }
  288. }
  289. else {
  290. return luaL_error (L, "invalid arguments");
  291. }
  292. return 1;
  293. }
  294. /***
  295. * @method upstream_list:get_upstream_round_robin()
  296. * Get upstream round robin (by current weight)
  297. * @return {upstream} upstream from a list in round-robin matter
  298. */
  299. static gint
  300. lua_upstream_list_get_upstream_round_robin (lua_State *L)
  301. {
  302. LUA_TRACE_POINT;
  303. struct upstream_list *upl;
  304. struct upstream *selected;
  305. upl = lua_check_upstream_list (L);
  306. if (upl) {
  307. selected = rspamd_upstream_get (upl, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
  308. if (selected) {
  309. lua_push_upstream (L, 1, selected);
  310. }
  311. else {
  312. lua_pushnil (L);
  313. }
  314. }
  315. else {
  316. return luaL_error (L, "invalid arguments");
  317. }
  318. return 1;
  319. }
  320. /***
  321. * @method upstream_list:get_upstream_master_slave()
  322. * Get upstream master slave order (by static priority)
  323. * @return {upstream} upstream from a list in master-slave order
  324. */
  325. static gint
  326. lua_upstream_list_get_upstream_master_slave (lua_State *L)
  327. {
  328. LUA_TRACE_POINT;
  329. struct upstream_list *upl;
  330. struct upstream *selected;
  331. upl = lua_check_upstream_list (L);
  332. if (upl) {
  333. selected = rspamd_upstream_get (upl, RSPAMD_UPSTREAM_MASTER_SLAVE,
  334. NULL,
  335. 0);
  336. if (selected) {
  337. lua_push_upstream (L, 1, selected);
  338. }
  339. else {
  340. lua_pushnil (L);
  341. }
  342. }
  343. else {
  344. return luaL_error (L, "invalid arguments");
  345. }
  346. return 1;
  347. }
  348. struct upstream_foreach_cbdata {
  349. lua_State *L;
  350. gint ups_pos;
  351. };
  352. static void lua_upstream_inserter (struct upstream *up, guint idx, void *ud)
  353. {
  354. struct upstream_foreach_cbdata *cbd = (struct upstream_foreach_cbdata *)ud;
  355. lua_push_upstream (cbd->L, cbd->ups_pos, up);
  356. lua_rawseti (cbd->L, -2, idx + 1);
  357. }
  358. /***
  359. * @method upstream_list:all_upstreams()
  360. * Returns all upstreams for this list
  361. * @return {table|upstream} all upstreams defined
  362. */
  363. static gint
  364. lua_upstream_list_all_upstreams (lua_State *L)
  365. {
  366. LUA_TRACE_POINT;
  367. struct upstream_list *upl;
  368. struct upstream_foreach_cbdata cbd;
  369. upl = lua_check_upstream_list (L);
  370. if (upl) {
  371. cbd.L = L;
  372. cbd.ups_pos = 1;
  373. lua_createtable (L, rspamd_upstreams_count (upl), 0);
  374. rspamd_upstreams_foreach (upl, lua_upstream_inserter, &cbd);
  375. }
  376. else {
  377. return luaL_error (L, "invalid arguments");
  378. }
  379. return 1;
  380. }
  381. static inline enum rspamd_upstreams_watch_event
  382. lua_str_to_upstream_flag (const gchar *str)
  383. {
  384. enum rspamd_upstreams_watch_event fl = 0;
  385. if (strcmp (str, "success") == 0) {
  386. fl = RSPAMD_UPSTREAM_WATCH_SUCCESS;
  387. }
  388. else if (strcmp (str, "failure") == 0) {
  389. fl = RSPAMD_UPSTREAM_WATCH_FAILURE;
  390. }
  391. else if (strcmp (str, "online") == 0) {
  392. fl = RSPAMD_UPSTREAM_WATCH_ONLINE;
  393. }
  394. else if (strcmp (str, "offline") == 0) {
  395. fl = RSPAMD_UPSTREAM_WATCH_OFFLINE;
  396. }
  397. else {
  398. msg_err ("invalid flag: %s", str);
  399. }
  400. return fl;
  401. }
  402. static inline const gchar *
  403. lua_upstream_flag_to_str (enum rspamd_upstreams_watch_event fl)
  404. {
  405. const gchar *res = "unknown";
  406. /* Works with single flags, not combinations */
  407. if (fl & RSPAMD_UPSTREAM_WATCH_SUCCESS) {
  408. res = "success";
  409. }
  410. else if (fl & RSPAMD_UPSTREAM_WATCH_FAILURE) {
  411. res = "failure";
  412. }
  413. else if (fl & RSPAMD_UPSTREAM_WATCH_ONLINE) {
  414. res = "online";
  415. }
  416. else if (fl & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
  417. res = "offline";
  418. }
  419. else {
  420. msg_err ("invalid flag: %d", fl);
  421. }
  422. return res;
  423. }
  424. struct rspamd_lua_upstream_watcher_cbdata {
  425. lua_State *L;
  426. gint cbref;
  427. gint parent_cbref; /* Reference to the upstream list */
  428. struct upstream_list *upl;
  429. };
  430. static void
  431. lua_upstream_watch_func (struct upstream *up,
  432. enum rspamd_upstreams_watch_event event,
  433. guint cur_errors,
  434. void *ud)
  435. {
  436. struct rspamd_lua_upstream_watcher_cbdata *cdata =
  437. (struct rspamd_lua_upstream_watcher_cbdata *)ud;
  438. lua_State *L;
  439. const gchar *what;
  440. gint err_idx;
  441. L = cdata->L;
  442. what = lua_upstream_flag_to_str (event);
  443. lua_pushcfunction (L, &rspamd_lua_traceback);
  444. err_idx = lua_gettop (L);
  445. lua_rawgeti (L, LUA_REGISTRYINDEX, cdata->cbref);
  446. lua_pushstring (L, what);
  447. struct rspamd_lua_upstream *lua_ups = lua_newuserdata (L, sizeof (*lua_ups));
  448. lua_ups->up = up;
  449. rspamd_lua_setclass (L, "rspamd{upstream}", -1);
  450. /* Store parent in the upstream to prevent gc */
  451. lua_rawgeti (L, LUA_REGISTRYINDEX, cdata->parent_cbref);
  452. lua_ups->upref = luaL_ref (L, LUA_REGISTRYINDEX);
  453. lua_pushinteger (L, cur_errors);
  454. if (lua_pcall (L, 3, 0, err_idx) != 0) {
  455. msg_err ("cannot call watch function for upstream: %s", lua_tostring (L, -1));
  456. lua_settop (L, 0);
  457. return;
  458. }
  459. lua_settop (L, 0);
  460. }
  461. static void
  462. lua_upstream_watch_dtor (gpointer ud)
  463. {
  464. struct rspamd_lua_upstream_watcher_cbdata *cdata =
  465. (struct rspamd_lua_upstream_watcher_cbdata *)ud;
  466. luaL_unref (cdata->L, LUA_REGISTRYINDEX, cdata->cbref);
  467. luaL_unref (cdata->L, LUA_REGISTRYINDEX, cdata->parent_cbref);
  468. g_free (cdata);
  469. }
  470. /***
  471. * @method upstream_list:add_watcher(what, cb)
  472. * Add new watcher to the upstream lists events (table or a string):
  473. * - `success` - called whenever upstream successfully used
  474. * - `failure` - called on upstream error
  475. * - `online` - called when upstream is being taken online from offline
  476. * - `offline` - called when upstream is being taken offline from online
  477. * Callback is a function: function(what, upstream, cur_errors) ... end
  478. * @example
  479. ups:add_watcher('success', function(what, up, cur_errors) ... end)
  480. ups:add_watcher({'online', 'offline'}, function(what, up, cur_errors) ... end)
  481. * @return nothing
  482. */
  483. static gint
  484. lua_upstream_list_add_watcher (lua_State *L)
  485. {
  486. LUA_TRACE_POINT;
  487. struct upstream_list *upl;
  488. upl = lua_check_upstream_list (L);
  489. if (upl &&
  490. (lua_type (L, 2) == LUA_TTABLE || lua_type (L, 2) == LUA_TSTRING) &&
  491. lua_type (L, 3) == LUA_TFUNCTION) {
  492. enum rspamd_upstreams_watch_event flags = 0;
  493. struct rspamd_lua_upstream_watcher_cbdata *cdata;
  494. if (lua_type (L, 2) == LUA_TSTRING) {
  495. flags = lua_str_to_upstream_flag (lua_tostring (L, 2));
  496. }
  497. else {
  498. for (lua_pushnil (L); lua_next (L, -2); lua_pop (L, 1)) {
  499. if (lua_isstring (L, -1)) {
  500. flags |= lua_str_to_upstream_flag (lua_tostring (L, -1));
  501. }
  502. else {
  503. lua_pop (L, 1);
  504. return luaL_error (L, "invalid arguments");
  505. }
  506. }
  507. }
  508. cdata = g_malloc0 (sizeof (*cdata));
  509. lua_pushvalue (L, 3); /* callback */
  510. cdata->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
  511. cdata->L = L;
  512. cdata->upl = upl;
  513. lua_pushvalue (L, 1); /* upstream list itself */
  514. cdata->parent_cbref = luaL_ref (L, LUA_REGISTRYINDEX);
  515. rspamd_upstreams_add_watch_callback (upl, flags,
  516. lua_upstream_watch_func, lua_upstream_watch_dtor, cdata);
  517. }
  518. else {
  519. return luaL_error (L, "invalid arguments");
  520. }
  521. return 0;
  522. }
  523. static gint
  524. lua_load_upstream_list (lua_State * L)
  525. {
  526. lua_newtable (L);
  527. luaL_register (L, NULL, upstream_list_f);
  528. return 1;
  529. }
  530. void
  531. luaopen_upstream (lua_State * L)
  532. {
  533. rspamd_lua_new_class (L, "rspamd{upstream_list}", upstream_list_m);
  534. lua_pop (L, 1);
  535. rspamd_lua_add_preload (L, "rspamd_upstream_list", lua_load_upstream_list);
  536. rspamd_lua_new_class (L, "rspamd{upstream}", upstream_m);
  537. lua_pop (L, 1);
  538. }