You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

lua_worker.c 9.8KB


  1. /* Copyright (c) 2010-2012, Vsevolod Stakhov
  2. * All rights reserved.
  3. *
  4. * Redistribution and use in source and binary forms, with or without
  5. * modification, are permitted provided that the following conditions are met:
  6. * * Redistributions of source code must retain the above copyright
  7. * notice, this list of conditions and the following disclaimer.
  8. * * Redistributions in binary form must reproduce the above copyright
  9. * notice, this list of conditions and the following disclaimer in the
  10. * documentation and/or other materials provided with the distribution.
  11. *
  12. * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
  13. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  14. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  15. * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
  16. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  17. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  18. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  19. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  20. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  21. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  22. */
  23. #include "config.h"
  24. #include "util.h"
  25. #include "rspamd.h"
  26. #include "libserver/worker_util.h"
  27. #include "protocol.h"
  28. #include "upstream.h"
  29. #include "cfg_file.h"
  30. #include "url.h"
  31. #include "message.h"
  32. #include "map.h"
  33. #include "dns.h"
  34. #include "unix-std.h"
  35. #include "lua/lua_common.h"
  36. #ifdef WITH_GPERF_TOOLS
  37. # include <glib/gprintf.h>
  38. #endif
  39. /* 60 seconds for worker's IO */
  40. #define DEFAULT_WORKER_IO_TIMEOUT 60000
  41. gpointer init_lua_worker (struct rspamd_config *cfg);
  42. void start_lua_worker (struct rspamd_worker *worker);
  43. worker_t lua_worker = {
  44. "lua", /* Name */
  45. init_lua_worker, /* Init function */
  46. start_lua_worker, /* Start function */
  47. TRUE, /* Has socket */
  48. FALSE, /* Non unique */
  49. FALSE, /* Non threaded */
  50. TRUE, /* Killable */
  51. SOCK_STREAM /* TCP socket */
  52. };
  53. /*
  54. * Worker's context
  55. */
  56. struct rspamd_lua_worker_ctx {
  57. /* DNS resolver */
  58. struct rspamd_dns_resolver *resolver;
  59. /* Events base */
  60. struct event_base *ev_base;
  61. /* Other params */
  62. GHashTable *params;
  63. /* Lua script to load */
  64. gchar *file;
  65. /* Lua state */
  66. lua_State *L;
  67. /* Callback for accept */
  68. gint cbref_accept;
  69. /* Callback for finishing */
  70. gint cbref_fin;
  71. /* Config file */
  72. struct rspamd_config *cfg;
  73. /* The rest options */
  74. ucl_object_t *opts;
  75. };
  76. /* Lua bindings */
  77. LUA_FUNCTION_DEF (worker, get_ev_base);
  78. LUA_FUNCTION_DEF (worker, register_accept_callback);
  79. LUA_FUNCTION_DEF (worker, register_exit_callback);
  80. LUA_FUNCTION_DEF (worker, get_option);
  81. LUA_FUNCTION_DEF (worker, get_resolver);
  82. LUA_FUNCTION_DEF (worker, get_cfg);
  83. static const struct luaL_reg lua_workerlib_m[] = {
  84. LUA_INTERFACE_DEF (worker, get_ev_base),
  85. LUA_INTERFACE_DEF (worker, register_accept_callback),
  86. LUA_INTERFACE_DEF (worker, register_exit_callback),
  87. LUA_INTERFACE_DEF (worker, get_option),
  88. LUA_INTERFACE_DEF (worker, get_resolver),
  89. LUA_INTERFACE_DEF (worker, get_cfg),
  90. {"__tostring", rspamd_lua_class_tostring},
  91. {NULL, NULL}
  92. };
  93. /* Basic functions of LUA API for worker object */
  94. static gint
  95. luaopen_lua_worker (lua_State * L)
  96. {
  97. rspamd_lua_new_class (L, "rspamd{worker}", lua_workerlib_m);
  98. luaL_register (L, "rspamd_worker", null_reg);
  99. lua_pop (L, 1); /* remove metatable from stack */
  100. return 1;
  101. }
  102. struct rspamd_lua_worker_ctx *
  103. lua_check_lua_worker (lua_State * L)
  104. {
  105. void *ud = luaL_checkudata (L, 1, "rspamd{worker}");
  106. luaL_argcheck (L, ud != NULL, 1, "'worker' expected");
  107. return ud ? *((struct rspamd_lua_worker_ctx **)ud) : NULL;
  108. }
  109. static int
  110. lua_worker_get_ev_base (lua_State *L)
  111. {
  112. struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
  113. struct event_base **pbase;
  114. if (ctx) {
  115. pbase = lua_newuserdata (L, sizeof (struct event_base *));
  116. rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
  117. *pbase = ctx->ev_base;
  118. }
  119. else {
  120. lua_pushnil (L);
  121. }
  122. return 1;
  123. }
  124. static int
  125. lua_worker_register_accept_callback (lua_State *L)
  126. {
  127. struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
  128. if (ctx) {
  129. if (!lua_isfunction (L, 2)) {
  130. msg_err ("invalid callback passed");
  131. lua_pushnil (L);
  132. }
  133. else {
  134. lua_pushvalue (L, 2);
  135. ctx->cbref_accept = luaL_ref (L, LUA_REGISTRYINDEX);
  136. return 0;
  137. }
  138. }
  139. else {
  140. lua_pushnil (L);
  141. }
  142. return 1;
  143. }
  144. static int
  145. lua_worker_register_exit_callback (lua_State *L)
  146. {
  147. struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
  148. if (ctx) {
  149. if (!lua_isfunction (L, 2)) {
  150. msg_err ("invalid callback passed");
  151. lua_pushnil (L);
  152. }
  153. else {
  154. lua_pushvalue (L, 2);
  155. ctx->cbref_fin = luaL_ref (L, LUA_REGISTRYINDEX);
  156. return 0;
  157. }
  158. }
  159. else {
  160. lua_pushnil (L);
  161. }
  162. return 1;
  163. }
  164. /* XXX: This fucntions should be rewritten completely */
  165. static int
  166. lua_worker_get_option (lua_State *L)
  167. {
  168. struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
  169. const ucl_object_t *val;
  170. const gchar *name;
  171. if (ctx) {
  172. name = luaL_checkstring (L, 2);
  173. if (name == NULL) {
  174. msg_err ("no name specified");
  175. lua_pushnil (L);
  176. }
  177. else {
  178. val = ucl_object_find_key (ctx->opts, name);
  179. if (val == NULL) {
  180. lua_pushnil (L);
  181. }
  182. else {
  183. ucl_object_push_lua (L, val, TRUE);
  184. }
  185. }
  186. }
  187. else {
  188. lua_pushnil (L);
  189. }
  190. return 1;
  191. }
  192. static int
  193. lua_worker_get_resolver (lua_State *L)
  194. {
  195. struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
  196. struct rspamd_dns_resolver **presolver;
  197. if (ctx) {
  198. presolver = lua_newuserdata (L, sizeof (gpointer));
  199. rspamd_lua_setclass (L, "rspamd{resolver}", -1);
  200. *presolver = ctx->resolver;
  201. }
  202. else {
  203. lua_pushnil (L);
  204. }
  205. return 1;
  206. }
  207. static int
  208. lua_worker_get_cfg (lua_State *L)
  209. {
  210. struct rspamd_lua_worker_ctx *ctx = lua_check_lua_worker (L);
  211. struct rspamd_config **pcfg;
  212. if (ctx) {
  213. pcfg = lua_newuserdata (L, sizeof (gpointer));
  214. rspamd_lua_setclass (L, "rspamd{config}", -1);
  215. *pcfg = ctx->cfg;
  216. }
  217. else {
  218. lua_pushnil (L);
  219. }
  220. return 1;
  221. }
  222. /* End of lua API */
  223. /*
  224. * Accept new connection and construct task
  225. */
  226. static void
  227. lua_accept_socket (gint fd, short what, void *arg)
  228. {
  229. struct rspamd_worker *worker = (struct rspamd_worker *) arg;
  230. struct rspamd_lua_worker_ctx *ctx, **pctx;
  231. gint nfd;
  232. lua_State *L;
  233. rspamd_inet_addr_t *addr;
  234. ctx = worker->ctx;
  235. L = ctx->L;
  236. if ((nfd =
  237. rspamd_accept_from_socket (fd, &addr)) == -1) {
  238. msg_warn ("accept failed: %s", strerror (errno));
  239. return;
  240. }
  241. /* Check for EAGAIN */
  242. if (nfd == 0) {
  243. return;
  244. }
  245. msg_info ("accepted connection from %s port %d",
  246. rspamd_inet_address_to_string (addr),
  247. rspamd_inet_address_get_port (addr));
  248. /* Call finalizer function */
  249. lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_accept);
  250. pctx = lua_newuserdata (L, sizeof (gpointer));
  251. rspamd_lua_setclass (L, "rspamd{worker}", -1);
  252. *pctx = ctx;
  253. lua_pushinteger (L, nfd);
  254. rspamd_lua_ip_push (L, addr);
  255. lua_pushinteger (L, 0);
  256. if (lua_pcall (L, 4, 0, 0) != 0) {
  257. msg_info ("call to worker accept failed: %s", lua_tostring (L, -1));
  258. }
  259. rspamd_inet_address_destroy (addr);
  260. }
  261. static gboolean
  262. rspamd_lua_worker_parser (ucl_object_t *obj, gpointer ud)
  263. {
  264. struct rspamd_lua_worker_ctx *ctx = ud;
  265. ctx->opts = obj;
  266. return TRUE;
  267. }
  268. gpointer
  269. init_lua_worker (struct rspamd_config *cfg)
  270. {
  271. struct rspamd_lua_worker_ctx *ctx;
  272. GQuark type;
  273. type = g_quark_try_string ("lua");
  274. ctx = g_malloc0 (sizeof (struct rspamd_lua_worker_ctx));
  275. ctx->params = g_hash_table_new_full (rspamd_str_hash,
  276. rspamd_str_equal,
  277. g_free,
  278. (GDestroyNotify)g_list_free);
  279. rspamd_rcl_register_worker_option (cfg, type, "file",
  280. rspamd_rcl_parse_struct_string, ctx,
  281. G_STRUCT_OFFSET (struct rspamd_lua_worker_ctx, file), 0);
  282. rspamd_rcl_register_worker_parser (cfg, type, rspamd_lua_worker_parser,
  283. ctx);
  284. return ctx;
  285. }
  286. /*
  287. * Start worker process
  288. */
  289. void
  290. start_lua_worker (struct rspamd_worker *worker)
  291. {
  292. struct rspamd_lua_worker_ctx *ctx = worker->ctx, **pctx;
  293. lua_State *L;
  294. #ifdef WITH_PROFILER
  295. extern void _start (void), etext (void);
  296. monstartup ((u_long) & _start, (u_long) & etext);
  297. #endif
  298. ctx->ev_base = rspamd_prepare_worker (worker,
  299. "lua_worker",
  300. lua_accept_socket);
  301. L = worker->srv->cfg->lua_state;
  302. ctx->L = L;
  303. ctx->cfg = worker->srv->cfg;
  304. ctx->resolver = dns_resolver_init (worker->srv->logger,
  305. ctx->ev_base,
  306. worker->srv->cfg);
  307. /* Open worker's lib */
  308. luaopen_lua_worker (L);
  309. if (ctx->file == NULL) {
  310. msg_err ("No lua script defined, so no reason to exist");
  311. exit (EXIT_SUCCESS);
  312. }
  313. if (access (ctx->file, R_OK) == -1) {
  314. msg_err ("Error reading lua script %s: %s", ctx->file,
  315. strerror (errno));
  316. exit (EXIT_SUCCESS);
  317. }
  318. pctx = lua_newuserdata (L, sizeof (gpointer));
  319. rspamd_lua_setclass (L, "rspamd{worker}", -1);
  320. lua_setglobal (L, "rspamd_worker");
  321. *pctx = ctx;
  322. if (luaL_dofile (L, ctx->file) != 0) {
  323. msg_err ("Error executing lua script %s: %s", ctx->file,
  324. lua_tostring (L, -1));
  325. exit (EXIT_SUCCESS);
  326. }
  327. if (ctx->cbref_accept == 0) {
  328. msg_err ("No accept function defined, so no reason to exist");
  329. exit (EXIT_SUCCESS);
  330. }
  331. /* Maps events */
  332. rspamd_map_watch (worker->srv->cfg, ctx->ev_base);
  333. event_base_loop (ctx->ev_base, 0);
  334. rspamd_worker_block_signals ();
  335. luaL_unref (L, LUA_REGISTRYINDEX, ctx->cbref_accept);
  336. if (ctx->cbref_fin != 0) {
  337. /* Call finalizer function */
  338. lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->cbref_fin);
  339. pctx = lua_newuserdata (L, sizeof (gpointer));
  340. rspamd_lua_setclass (L, "rspamd{worker}", -1);
  341. *pctx = ctx;
  342. if (lua_pcall (L, 1, 0, 0) != 0) {
  343. msg_info ("call to worker finalizer failed: %s", lua_tostring (L,
  344. -1));
  345. }
  346. /* Free resources */
  347. luaL_unref (L, LUA_REGISTRYINDEX, ctx->cbref_fin);
  348. }
  349. rspamd_log_close (worker->srv->logger);
  350. exit (EXIT_SUCCESS);
  351. }
  352. /*
  353. * vi:ts=4
  354. */