You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

log_helper.c 6.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "libutil/util.h"
  18. #include "libserver/cfg_file.h"
  19. #include "libserver/cfg_rcl.h"
  20. #include "libserver/worker_util.h"
  21. #include "libserver/rspamd_control.h"
  22. #include "libutil/addr.h"
  23. #include "lua/lua_common.h"
  24. #include "unix-std.h"
  25. #include "utlist.h"
  26. #ifdef HAVE_GLOB_H
  27. #include <glob.h>
  28. #endif
  29. static gpointer init_log_helper (struct rspamd_config *cfg);
  30. static void start_log_helper (struct rspamd_worker *worker);
  31. worker_t log_helper_worker = {
  32. "log_helper", /* Name */
  33. init_log_helper, /* Init function */
  34. start_log_helper, /* Start function */
  35. RSPAMD_WORKER_UNIQUE | RSPAMD_WORKER_KILLABLE,
  36. RSPAMD_WORKER_SOCKET_NONE, /* No socket */
  37. RSPAMD_WORKER_VER /* Version info */
  38. };
  39. static const guint64 rspamd_log_helper_magic = 0x1090bb46aaa74c9aULL;
  40. /*
  41. * Worker's context
  42. */
  43. struct log_helper_ctx {
  44. guint64 magic;
  45. /* Events base */
  46. struct event_base *ev_base;
  47. /* DNS resolver */
  48. struct rspamd_dns_resolver *resolver;
  49. /* Config */
  50. struct rspamd_config *cfg;
  51. /* END OF COMMON PART */
  52. struct event log_ev;
  53. struct rspamd_worker_lua_script *scripts;
  54. lua_State *L;
  55. gint pair[2];
  56. };
  57. static gpointer
  58. init_log_helper (struct rspamd_config *cfg)
  59. {
  60. struct log_helper_ctx *ctx;
  61. GQuark type;
  62. type = g_quark_try_string ("log_helper");
  63. (void)type;
  64. ctx = rspamd_mempool_alloc (cfg->cfg_pool, sizeof (*ctx));
  65. ctx->magic = rspamd_log_helper_magic;
  66. ctx->cfg = cfg;
  67. return ctx;
  68. }
  69. static void
  70. rspamd_log_helper_read (gint fd, short what, gpointer ud)
  71. {
  72. struct log_helper_ctx *ctx = ud;
  73. guchar buf[8192];
  74. gssize r;
  75. guint32 n, i, nextra;
  76. struct rspamd_protocol_log_message_sum *sm;
  77. struct rspamd_worker_lua_script *sc;
  78. struct rspamd_config **pcfg;
  79. struct event_base **pevbase;
  80. r = read (fd, buf, sizeof (buf));
  81. if (r >= (gssize)sizeof (struct rspamd_protocol_log_message_sum)) {
  82. memcpy (&n, buf, sizeof (n));
  83. memcpy (&nextra, buf + sizeof (n), sizeof (nextra));
  84. if (n + nextra !=
  85. (r - sizeof (*sm)) / sizeof (struct rspamd_protocol_log_symbol_result)) {
  86. msg_warn ("cannot read data from log pipe: bad length: %d elements "
  87. "announced but %d available", n + nextra,
  88. (gint)((r - sizeof (*sm)) /
  89. sizeof (struct rspamd_protocol_log_symbol_result)));
  90. }
  91. else {
  92. sm = g_malloc (r);
  93. memcpy (sm, buf, r);
  94. DL_FOREACH (ctx->scripts, sc) {
  95. lua_rawgeti (ctx->L, LUA_REGISTRYINDEX, sc->cbref);
  96. lua_pushnumber (ctx->L, sm->score);
  97. lua_pushnumber (ctx->L, sm->required_score);
  98. lua_createtable (ctx->L, n, 0);
  99. for (i = 0; i < n; i ++) {
  100. lua_createtable (ctx->L, 2, 0);
  101. lua_pushinteger (ctx->L, sm->results[i].id);
  102. lua_rawseti (ctx->L, -2, 1);
  103. lua_pushnumber (ctx->L, sm->results[i].score);
  104. lua_rawseti (ctx->L, -2, 2);
  105. lua_rawseti (ctx->L, -2, (i + 1));
  106. }
  107. pcfg = lua_newuserdata (ctx->L, sizeof (*pcfg));
  108. *pcfg = ctx->cfg;
  109. rspamd_lua_setclass (ctx->L, "rspamd{config}", -1);
  110. lua_pushinteger (ctx->L, sm->settings_id);
  111. lua_createtable (ctx->L, nextra, 0);
  112. for (i = 0; i < nextra; i ++) {
  113. lua_createtable (ctx->L, 2, 0);
  114. lua_pushinteger (ctx->L, sm->results[i + n].id);
  115. lua_rawseti (ctx->L, -2, 1);
  116. lua_pushnumber (ctx->L, sm->results[i + n].score);
  117. lua_rawseti (ctx->L, -2, 2);
  118. lua_rawseti (ctx->L, -2, (i + 1));
  119. }
  120. pevbase = lua_newuserdata (ctx->L, sizeof (*pevbase));
  121. *pevbase = ctx->ev_base;
  122. rspamd_lua_setclass (ctx->L, "rspamd{ev_base}", -1);
  123. if (lua_pcall (ctx->L, 7, 0, 0) != 0) {
  124. msg_err ("error executing log handler code: %s",
  125. lua_tostring (ctx->L, -1));
  126. lua_pop (ctx->L, 1);
  127. }
  128. }
  129. g_free (sm);
  130. }
  131. }
  132. else if (r == -1) {
  133. if (errno != EAGAIN && errno != EINTR) {
  134. msg_warn ("cannot read data from log pipe: %s", strerror (errno));
  135. event_del (&ctx->log_ev);
  136. }
  137. }
  138. else if (r == 0) {
  139. msg_warn ("cannot read data from log pipe: EOF");
  140. event_del (&ctx->log_ev);
  141. }
  142. }
  143. static void
  144. rspamd_log_helper_reply_handler (struct rspamd_worker *worker,
  145. struct rspamd_srv_reply *rep, gint rep_fd,
  146. gpointer ud)
  147. {
  148. struct log_helper_ctx *ctx = ud;
  149. close (ctx->pair[1]);
  150. msg_info ("start waiting for log events");
  151. event_set (&ctx->log_ev, ctx->pair[0], EV_READ | EV_PERSIST,
  152. rspamd_log_helper_read, ctx);
  153. event_base_set (ctx->ev_base, &ctx->log_ev);
  154. event_add (&ctx->log_ev, NULL);
  155. }
  156. static void
  157. start_log_helper (struct rspamd_worker *worker)
  158. {
  159. struct log_helper_ctx *ctx = worker->ctx;
  160. gssize r = -1;
  161. gint nscripts = 0;
  162. struct rspamd_worker_lua_script *tmp;
  163. static struct rspamd_srv_command srv_cmd;
  164. ctx->ev_base = rspamd_prepare_worker (worker,
  165. "log_helper",
  166. NULL);
  167. ctx->cfg = worker->srv->cfg;
  168. ctx->scripts = worker->cf->scripts;
  169. ctx->L = ctx->cfg->lua_state;
  170. ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
  171. ctx->ev_base,
  172. worker->srv->cfg);
  173. rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
  174. ctx->ev_base, ctx->resolver->r);
  175. DL_COUNT (worker->cf->scripts, tmp, nscripts);
  176. msg_info ("started log_helper worker with %d scripts", nscripts);
  177. r = rspamd_socketpair (ctx->pair, FALSE);
  178. if (r == -1) {
  179. msg_err ("cannot create socketpair: %s, exiting now", strerror (errno));
  180. /* Prevent new processes spawning */
  181. exit (EXIT_SUCCESS);
  182. }
  183. memset (&srv_cmd, 0, sizeof (srv_cmd));
  184. srv_cmd.type = RSPAMD_SRV_LOG_PIPE;
  185. srv_cmd.cmd.log_pipe.type = RSPAMD_LOG_PIPE_SYMBOLS;
  186. /* Wait for startup being completed */
  187. rspamd_mempool_lock_mutex (worker->srv->start_mtx);
  188. rspamd_srv_send_command (worker, ctx->ev_base, &srv_cmd, ctx->pair[1],
  189. rspamd_log_helper_reply_handler, ctx);
  190. rspamd_mempool_unlock_mutex (worker->srv->start_mtx);
  191. rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base,
  192. worker);
  193. event_base_loop (ctx->ev_base, 0);
  194. close (ctx->pair[0]);
  195. rspamd_worker_block_signals ();
  196. REF_RELEASE (ctx->cfg);
  197. rspamd_log_close (worker->srv->logger, TRUE);
  198. exit (EXIT_SUCCESS);
  199. }