From f28249c5855506a2e089fc79a429f59cef88b085 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 8 May 2017 17:27:18 +0100 Subject: [Feature] Add compression/decompression to proxy --- src/rspamd_proxy.c | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) (limited to 'src/rspamd_proxy.c') diff --git a/src/rspamd_proxy.c b/src/rspamd_proxy.c index acbdafdb7..d485851bf 100644 --- a/src/rspamd_proxy.c +++ b/src/rspamd_proxy.c @@ -33,6 +33,7 @@ #include "ottery.h" #include "unix-std.h" #include "libserver/milter.h" +#include "contrib/zstd/zstd.h" /* Rotate keys each minute by default */ #define DEFAULT_ROTATION_TIME 60.0 @@ -902,6 +903,110 @@ proxy_session_dtor (struct rspamd_proxy_session *session) g_slice_free1 (sizeof (*session), session); } +static void +proxy_request_compress (struct rspamd_http_message *msg) +{ + guint flags; + ZSTD_CCtx *zctx; + rspamd_fstring_t *body; + const gchar *in; + gsize inlen; + + flags = rspamd_http_message_get_flags (msg); + + if (!rspamd_http_message_find_header (msg, "compression")) { + if ((flags & RSPAMD_HTTP_FLAG_SHMEM) || + !(flags & RSPAMD_HTTP_FLAG_HAS_BODY)) { + /* Cannot compress shared or empty message */ + return; + } + + in = rspamd_http_message_get_body (msg, &inlen); + + if (in == NULL || inlen == 0) { + return; + } + + body = rspamd_fstring_sized_new (ZSTD_compressBound (inlen)); + zctx = ZSTD_createCCtx (); + body->len = ZSTD_compressCCtx (zctx, body->str, body->allocated, + in, inlen, 1); + + if (ZSTD_isError (body->len)) { + msg_err ("compression error"); + rspamd_fstring_free (body); + ZSTD_freeCCtx (zctx); + + return; + } + + ZSTD_freeCCtx (zctx); + rspamd_http_message_set_body_from_fstring_steal (msg, body); + rspamd_http_message_add_header (msg, "Compression", "zstd"); + } +} + +static void +proxy_request_decompress (struct rspamd_http_message *msg) +{ + rspamd_fstring_t *body; + const gchar *in; + gsize inlen, outlen, r; + ZSTD_DStream *zstream; + ZSTD_inBuffer zin; + ZSTD_outBuffer zout; + + if (rspamd_http_message_find_header (msg, "compression")) { + in = rspamd_http_message_get_body (msg, &inlen); + + if (in == NULL || inlen == 0) { + return; + } + + zstream = ZSTD_createDStream (); + ZSTD_initDStream (zstream); + + zin.pos = 0; + zin.src = in; + zin.size = inlen; + + if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) { + outlen = ZSTD_DStreamOutSize (); + } + + body = rspamd_fstring_sized_new (outlen); + zout.dst = body->str; + zout.pos = 0; + zout.size = outlen; + + while (zin.pos < zin.size) { + r = ZSTD_decompressStream (zstream, &zout, &zin); + + if (ZSTD_isError (r)) { + msg_err ("Decompression error: %s", ZSTD_getErrorName (r)); + ZSTD_freeDStream (zstream); + rspamd_fstring_free (body); + + return; + } + + if (zout.pos == zout.size) { + /* We need to extend output buffer */ + zout.size = zout.size * 1.5 + 1.0; + body = rspamd_fstring_grow (body, zout.size); + zout.size = body->allocated; + zout.dst = body->str; + } + } + + ZSTD_freeDStream (zstream); + rspamd_http_message_set_body_from_fstring_steal (msg, body); + rspamd_http_message_remove_header (msg, "Compression"); + } + + return; +} + static struct rspamd_proxy_session * proxy_session_refresh (struct rspamd_proxy_session *session) { -- cgit v1.2.3