You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "libutil/util.h"
  18. #include "libutil/map.h"
  19. #include "libutil/upstream.h"
  20. #include "libserver/protocol.h"
  21. #include "libserver/cfg_file.h"
  22. #include "libserver/url.h"
  23. #include "libserver/dns.h"
  24. #include "libmime/message.h"
  25. #include "rspamd.h"
  26. #include "libserver/worker_util.h"
  27. #include "keypairs_cache.h"
  28. #include "ottery.h"
  29. #include "unix-std.h"
  30. /* Rotate keys each minute by default */
  31. #define DEFAULT_ROTATION_TIME 60.0
  32. gpointer init_http_proxy (struct rspamd_config *cfg);
  33. void start_http_proxy (struct rspamd_worker *worker);
  34. worker_t http_proxy_worker = {
  35. "http_proxy", /* Name */
  36. init_http_proxy, /* Init function */
  37. start_http_proxy, /* Start function */
  38. TRUE, /* Has socket */
  39. FALSE, /* Non unique */
  40. FALSE, /* Non threaded */
  41. TRUE, /* Killable */
  42. SOCK_STREAM, /* TCP socket */
  43. RSPAMD_WORKER_VER
  44. };
  45. struct rspamd_http_upstream {
  46. gchar *name;
  47. struct upstream_list *u;
  48. struct rspamd_cryptobox_pubkey *key;
  49. };
  50. struct http_proxy_ctx {
  51. gdouble timeout;
  52. struct timeval io_tv;
  53. struct rspamd_config *cfg;
  54. /* DNS resolver */
  55. struct rspamd_dns_resolver *resolver;
  56. /* Events base */
  57. struct event_base *ev_base;
  58. /* Encryption key for clients */
  59. struct rspamd_cryptobox_keypair *key;
  60. /* Keys cache */
  61. struct rspamd_keypair_cache *keys_cache;
  62. /* Upstreams to use */
  63. GHashTable *upstreams;
  64. /* Default upstream */
  65. struct rspamd_http_upstream *default_upstream;
  66. /* Local rotating keypair for upstreams */
  67. struct rspamd_cryptobox_keypair *local_key;
  68. struct event rotate_ev;
  69. gdouble rotate_tm;
  70. };
  71. struct http_proxy_session {
  72. struct http_proxy_ctx *ctx;
  73. struct event_base *ev_base;
  74. struct rspamd_cryptobox_keypair *local_key;
  75. struct rspamd_cryptobox_pubkey *remote_key;
  76. struct upstream *up;
  77. gint client_sock;
  78. gint backend_sock;
  79. rspamd_inet_addr_t *client_addr;
  80. struct rspamd_http_connection *client_conn;
  81. struct rspamd_http_connection *backend_conn;
  82. struct rspamd_dns_resolver *resolver;
  83. gboolean replied;
  84. };
  85. static GQuark
  86. http_proxy_quark (void)
  87. {
  88. return g_quark_from_static_string ("http-proxy");
  89. }
  90. static gboolean
  91. http_proxy_parse_upstream (rspamd_mempool_t *pool,
  92. const ucl_object_t *obj,
  93. gpointer ud,
  94. struct rspamd_rcl_section *section,
  95. GError **err)
  96. {
  97. const ucl_object_t *elt;
  98. struct rspamd_http_upstream *up = NULL;
  99. struct http_proxy_ctx *ctx;
  100. struct rspamd_rcl_struct_parser *pd = ud;
  101. ctx = pd->user_struct;
  102. if (ucl_object_type (obj) != UCL_OBJECT) {
  103. g_set_error (err, http_proxy_quark (), 100,
  104. "upstream option must be an object");
  105. return FALSE;
  106. }
  107. elt = ucl_object_find_key (obj, "name");
  108. if (elt == NULL) {
  109. g_set_error (err, http_proxy_quark (), 100,
  110. "upstream option must have some name definition");
  111. return FALSE;
  112. }
  113. up = g_slice_alloc0 (sizeof (*up));
  114. up->name = g_strdup (ucl_object_tostring (elt));
  115. elt = ucl_object_find_key (obj, "key");
  116. if (elt != NULL) {
  117. up->key = rspamd_pubkey_from_base32 (ucl_object_tostring (elt), 0,
  118. RSPAMD_KEYPAIR_KEX, RSPAMD_CRYPTOBOX_MODE_25519);
  119. if (up->key == NULL) {
  120. g_set_error (err, http_proxy_quark (), 100,
  121. "cannot read upstream key");
  122. goto err;
  123. }
  124. }
  125. elt = ucl_object_find_key (obj, "hosts");
  126. if (elt == NULL) {
  127. g_set_error (err, http_proxy_quark (), 100,
  128. "upstream option must have some hosts definition");
  129. goto err;
  130. }
  131. up->u = rspamd_upstreams_create (ctx->cfg->ups_ctx);
  132. if (!rspamd_upstreams_from_ucl (up->u, elt, 11333, NULL)) {
  133. g_set_error (err, http_proxy_quark (), 100,
  134. "upstream has bad hosts definition");
  135. goto err;
  136. }
  137. elt = ucl_object_find_key (obj, "default");
  138. if (elt && ucl_object_toboolean (elt)) {
  139. ctx->default_upstream = up;
  140. }
  141. g_hash_table_insert (ctx->upstreams, up->name, up);
  142. return TRUE;
  143. err:
  144. if (up) {
  145. g_free (up->name);
  146. rspamd_upstreams_destroy (up->u);
  147. if (up->key) {
  148. rspamd_pubkey_unref (up->key);
  149. }
  150. g_slice_free1 (sizeof (*up), up);
  151. }
  152. return FALSE;
  153. }
  154. gpointer
  155. init_http_proxy (struct rspamd_config *cfg)
  156. {
  157. struct http_proxy_ctx *ctx;
  158. GQuark type;
  159. type = g_quark_try_string ("http_proxy");
  160. ctx = g_malloc0 (sizeof (struct http_proxy_ctx));
  161. ctx->timeout = 5.0;
  162. ctx->upstreams = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
  163. ctx->rotate_tm = DEFAULT_ROTATION_TIME;
  164. ctx->cfg = cfg;
  165. rspamd_rcl_register_worker_option (cfg,
  166. type,
  167. "timeout",
  168. rspamd_rcl_parse_struct_time,
  169. ctx,
  170. G_STRUCT_OFFSET (struct http_proxy_ctx,
  171. timeout),
  172. RSPAMD_CL_FLAG_TIME_FLOAT,
  173. "IO timeout");
  174. rspamd_rcl_register_worker_option (cfg,
  175. type,
  176. "rotate",
  177. rspamd_rcl_parse_struct_time,
  178. ctx,
  179. G_STRUCT_OFFSET (struct http_proxy_ctx,
  180. rotate_tm),
  181. RSPAMD_CL_FLAG_TIME_FLOAT,
  182. "Rotation keys time, default: "
  183. G_STRINGIFY (DEFAULT_ROTATION_TIME) " seconds");
  184. rspamd_rcl_register_worker_option (cfg,
  185. type,
  186. "keypair",
  187. rspamd_rcl_parse_struct_keypair,
  188. ctx,
  189. G_STRUCT_OFFSET (struct http_proxy_ctx,
  190. key),
  191. 0,
  192. "Server's keypair");
  193. rspamd_rcl_register_worker_option (cfg,
  194. type,
  195. "upstream",
  196. http_proxy_parse_upstream,
  197. ctx,
  198. 0,
  199. 0,
  200. "List of upstreams");
  201. return ctx;
  202. }
  203. static void
  204. proxy_session_cleanup (struct http_proxy_session *session)
  205. {
  206. rspamd_inet_address_destroy (session->client_addr);
  207. if (session->backend_conn) {
  208. rspamd_http_connection_unref (session->backend_conn);
  209. }
  210. if (session->client_conn) {
  211. rspamd_http_connection_unref (session->client_conn);
  212. }
  213. close (session->backend_sock);
  214. close (session->client_sock);
  215. g_slice_free1 (sizeof (*session), session);
  216. }
  217. static void
  218. proxy_client_write_error (struct http_proxy_session *session, gint code)
  219. {
  220. struct rspamd_http_message *reply;
  221. reply = rspamd_http_new_message (HTTP_RESPONSE);
  222. reply->code = code;
  223. rspamd_http_connection_write_message (session->client_conn,
  224. reply, NULL, NULL, session, session->client_sock,
  225. &session->ctx->io_tv, session->ev_base);
  226. }
  227. static void
  228. proxy_backend_error_handler (struct rspamd_http_connection *conn, GError *err)
  229. {
  230. struct http_proxy_session *session = conn->ud;
  231. msg_info ("abnormally closing connection from backend: %s, error: %s",
  232. rspamd_inet_address_to_string (rspamd_upstream_addr (session->up)),
  233. err->message);
  234. rspamd_http_connection_reset (session->backend_conn);
  235. /* Terminate session immediately */
  236. proxy_client_write_error (session, err->code);
  237. }
  238. static gint
  239. proxy_backend_finish_handler (struct rspamd_http_connection *conn,
  240. struct rspamd_http_message *msg)
  241. {
  242. struct http_proxy_session *session = conn->ud;
  243. rspamd_http_connection_steal_msg (session->backend_conn);
  244. rspamd_http_message_remove_header (msg, "Content-Length");
  245. rspamd_http_message_remove_header (msg, "Key");
  246. rspamd_http_connection_reset (session->backend_conn);
  247. rspamd_http_connection_write_message (session->client_conn,
  248. msg, NULL, NULL, session, session->client_sock,
  249. &session->ctx->io_tv, session->ev_base);
  250. return 0;
  251. }
  252. static void
  253. proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err)
  254. {
  255. struct http_proxy_session *session = conn->ud;
  256. msg_info ("abnormally closing connection from: %s, error: %s",
  257. rspamd_inet_address_to_string (session->client_addr), err->message);
  258. /* Terminate session immediately */
  259. proxy_session_cleanup (session);
  260. }
  261. static gint
  262. proxy_client_finish_handler (struct rspamd_http_connection *conn,
  263. struct rspamd_http_message *msg)
  264. {
  265. struct http_proxy_session *session = conn->ud;
  266. struct rspamd_http_upstream *backend = NULL;
  267. const rspamd_ftok_t *host;
  268. gchar hostbuf[512];
  269. if (!session->replied) {
  270. host = rspamd_http_message_find_header (msg, "Host");
  271. if (host == NULL) {
  272. backend = session->ctx->default_upstream;
  273. }
  274. else {
  275. rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf)));
  276. backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf);
  277. if (backend == NULL) {
  278. backend = session->ctx->default_upstream;
  279. }
  280. }
  281. if (backend == NULL) {
  282. /* No backend */
  283. msg_err ("cannot find upstream for %s", host ? hostbuf : "default");
  284. goto err;
  285. }
  286. else {
  287. session->up = rspamd_upstream_get (backend->u,
  288. RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
  289. if (session->up == NULL) {
  290. msg_err ("cannot select upstream for %s", host ? hostbuf : "default");
  291. goto err;
  292. }
  293. session->backend_sock = rspamd_inet_address_connect (
  294. rspamd_upstream_addr (session->up), SOCK_STREAM, TRUE);
  295. if (session->backend_sock == -1) {
  296. msg_err ("cannot connect upstream for %s", host ? hostbuf : "default");
  297. rspamd_upstream_fail (session->up);
  298. goto err;
  299. }
  300. rspamd_http_connection_steal_msg (session->client_conn);
  301. rspamd_http_message_remove_header (msg, "Content-Length");
  302. rspamd_http_message_remove_header (msg, "Key");
  303. rspamd_http_connection_reset (session->client_conn);
  304. session->backend_conn = rspamd_http_connection_new (
  305. NULL,
  306. proxy_backend_error_handler,
  307. proxy_backend_finish_handler,
  308. RSPAMD_HTTP_CLIENT_SIMPLE,
  309. RSPAMD_HTTP_CLIENT,
  310. session->ctx->keys_cache);
  311. rspamd_http_connection_set_key (session->backend_conn,
  312. session->ctx->local_key);
  313. msg->peer_key = rspamd_pubkey_ref (backend->key);
  314. session->replied = TRUE;
  315. rspamd_http_connection_write_message (session->backend_conn,
  316. msg, NULL, NULL, session, session->backend_sock,
  317. &session->ctx->io_tv, session->ev_base);
  318. }
  319. }
  320. else {
  321. proxy_session_cleanup (session);
  322. }
  323. return 0;
  324. err:
  325. session->replied = TRUE;
  326. proxy_client_write_error (session, 404);
  327. return 0;
  328. }
  329. static void
  330. proxy_accept_socket (gint fd, short what, void *arg)
  331. {
  332. struct rspamd_worker *worker = (struct rspamd_worker *) arg;
  333. struct http_proxy_ctx *ctx;
  334. rspamd_inet_addr_t *addr;
  335. struct http_proxy_session *session;
  336. gint nfd;
  337. ctx = worker->ctx;
  338. if ((nfd =
  339. rspamd_accept_from_socket (fd, &addr)) == -1) {
  340. msg_warn ("accept failed: %s", strerror (errno));
  341. return;
  342. }
  343. /* Check for EAGAIN */
  344. if (nfd == 0) {
  345. return;
  346. }
  347. msg_info ("accepted connection from %s port %d",
  348. rspamd_inet_address_to_string (addr),
  349. rspamd_inet_address_get_port (addr));
  350. session = g_slice_alloc0 (sizeof (*session));
  351. session->client_sock = nfd;
  352. session->client_addr = addr;
  353. session->resolver = ctx->resolver;
  354. session->client_conn = rspamd_http_connection_new (
  355. NULL,
  356. proxy_client_error_handler,
  357. proxy_client_finish_handler,
  358. 0,
  359. RSPAMD_HTTP_SERVER,
  360. ctx->keys_cache);
  361. session->ev_base = ctx->ev_base;
  362. session->ctx = ctx;
  363. if (ctx->key) {
  364. rspamd_http_connection_set_key (session->client_conn, ctx->key);
  365. }
  366. rspamd_http_connection_read_message (session->client_conn,
  367. session,
  368. nfd,
  369. &ctx->io_tv,
  370. ctx->ev_base);
  371. }
  372. static void
  373. proxy_rotate_key (gint fd, short what, void *arg)
  374. {
  375. struct timeval rot_tv;
  376. struct http_proxy_ctx *ctx = arg;
  377. gpointer kp;
  378. double_to_tv (ctx->rotate_tm, &rot_tv);
  379. rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec);
  380. event_del (&ctx->rotate_ev);
  381. event_add (&ctx->rotate_ev, &rot_tv);
  382. kp = ctx->local_key;
  383. ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
  384. RSPAMD_CRYPTOBOX_MODE_25519);
  385. rspamd_keypair_unref (kp);
  386. }
  387. void
  388. start_http_proxy (struct rspamd_worker *worker)
  389. {
  390. struct http_proxy_ctx *ctx = worker->ctx;
  391. struct timeval rot_tv;
  392. ctx->ev_base = rspamd_prepare_worker (worker, "http_proxy",
  393. proxy_accept_socket);
  394. rspamd_map_watch (worker->srv->cfg, ctx->ev_base);
  395. ctx->resolver = dns_resolver_init (worker->srv->logger,
  396. ctx->ev_base,
  397. worker->srv->cfg);
  398. double_to_tv (ctx->timeout, &ctx->io_tv);
  399. rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
  400. ctx->ev_base, ctx->resolver->r);
  401. /* XXX: stupid default */
  402. ctx->keys_cache = rspamd_keypair_cache_new (256);
  403. ctx->local_key = rspamd_keypair_new (RSPAMD_KEYPAIR_KEX,
  404. RSPAMD_CRYPTOBOX_MODE_25519);
  405. double_to_tv (ctx->rotate_tm, &rot_tv);
  406. rot_tv.tv_sec += ottery_rand_range (rot_tv.tv_sec);
  407. event_set (&ctx->rotate_ev, -1, EV_TIMEOUT, proxy_rotate_key, ctx);
  408. event_base_set (ctx->ev_base, &ctx->rotate_ev);
  409. event_add (&ctx->rotate_ev, &rot_tv);
  410. event_base_loop (ctx->ev_base, 0);
  411. rspamd_worker_block_signals ();
  412. g_mime_shutdown ();
  413. rspamd_log_close (worker->srv->logger);
  414. if (ctx->key) {
  415. rspamd_keypair_unref (ctx->key);
  416. }
  417. rspamd_keypair_cache_destroy (ctx->keys_cache);
  418. exit (EXIT_SUCCESS);
  419. }