From 767b1029a3617fa88122879d3b7402210485da05 Mon Sep 17 00:00:00 2001
From: Vsevolod Stakhov
Date: Mon, 31 Oct 2016 10:41:08 +0000
Subject: [PATCH] [Feature] Support compressed maps

---
Review notes (free-form section, not part of the commit message):
 * Maps whose location ends in ".zstd" are flagged as compressed and are
   inflated with the zstd streaming API before being passed to
   map->read_callback (HTTP body, local file and cached copy).
 * The decompression loops keep calling ZSTD_decompressStream() while the
   output buffer is full, so that data still buffered inside the decoder is
   flushed even after all input has been consumed. Stopping as soon as the
   input is exhausted could drop the tail of a map.
 * ZSTD_createDStream() is checked for NULL before use.
 * Output buffer growth uses integer arithmetic instead of floating point.
 * NOTE(review): the initial buffer size is taken from the frame header
   (ZSTD_getDecompressedSize) and handed to g_malloc() unchecked; consider
   capping it if maps can come from untrusted hosts.
 * NOTE(review): the decompression code is repeated three times; a shared
   static helper would be a natural follow-up.
 * NOTE(review): commit id and blob ids on the "index" lines are those of
   the originally generated patch.

 src/libutil/map.c         | 260 ++++++++++++++++++++++++++++++++++++--
 src/libutil/map_private.h |   1 +
 2 files changed, 253 insertions(+), 8 deletions(-)

diff --git a/src/libutil/map.c b/src/libutil/map.c
index 5ef04b9f7..52fc6a011 100644
--- a/src/libutil/map.c
+++ b/src/libutil/map.c
@@ -26,6 +26,7 @@
 #include "unix-std.h"
 #include "http_parser.h"
 #include "libutil/regexp.h"
+#include "contrib/zstd/zstd.h"
 
 #ifdef WITH_HYPERSCAN
 #include "hs.h"
@@ -480,9 +481,84 @@ read_data:
 		goto err;
 	}
 
-	msg_info_map ("read map data from %s (%z bytes)", cbd->data->host,
-			dlen);
-	map->read_callback (in, cbd->data_len, &cbd->periodic->cbdata, TRUE);
+	if (cbd->bk->is_compressed) {
+		/* Map body is a zstd stream: inflate it before parsing */
+		ZSTD_DStream *zstream;
+		ZSTD_inBuffer zin;
+		ZSTD_outBuffer zout;
+		guchar *out;
+		gsize outlen, r;
+
+		zstream = ZSTD_createDStream ();
+
+		if (zstream == NULL) {
+			msg_err_map ("cannot create decompression stream");
+			goto err;
+		}
+
+		ZSTD_initDStream (zstream);
+
+		zin.pos = 0;
+		zin.src = in;
+		zin.size = dlen;
+
+		/* Zero means no usable size in the frame: start with a default */
+		if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
+			outlen = ZSTD_DStreamOutSize ();
+		}
+
+		out = g_malloc (outlen);
+
+		zout.dst = out;
+		zout.pos = 0;
+		zout.size = outlen;
+
+		for (;;) {
+			r = ZSTD_decompressStream (zstream, &zout, &zin);
+
+			if (ZSTD_isError (r)) {
+				msg_err_map ("cannot decompress data: %s",
+						ZSTD_getErrorName (r));
+				ZSTD_freeDStream (zstream);
+				g_free (out);
+				goto err;
+			}
+
+			if (r == 0 && zin.pos == zin.size) {
+				/* Frame is fully decoded and flushed, no input left */
+				break;
+			}
+
+			if (zout.pos == zout.size) {
+				/*
+				 * Output is full: the decoder may still hold buffered
+				 * data even if all input is consumed, so grow the
+				 * buffer and call it again to flush the remainder
+				 */
+				zout.size += zout.size / 2 + 1;
+				out = g_realloc (zout.dst, zout.size);
+				zout.dst = out;
+				continue;
+			}
+
+			if (zin.pos == zin.size) {
+				/* Nothing more to feed and nothing more to flush */
+				break;
+			}
+		}
+
+		ZSTD_freeDStream (zstream);
+		msg_info_map ("read map data from %s (%z bytes compressed, "
+				"%z uncompressed)", cbd->data->host,
+				dlen, zout.pos);
+		map->read_callback (out, zout.pos, &cbd->periodic->cbdata, TRUE);
+		g_free (out);
+	}
+	else {
+		msg_info_map ("read map data from %s (%z bytes)", cbd->data->host,
+				dlen);
+		map->read_callback (in, cbd->data_len, &cbd->periodic->cbdata, TRUE);
+	}
 
 	/*
 	 * We know that a map is in the locked state
@@ -578,7 +654,86 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
 	}
 
 	if (len > 0) {
-		map->read_callback (bytes, len, &periodic->cbdata, TRUE);
+		if (bk->is_compressed) {
+			/* File content is a zstd stream: inflate it before parsing */
+			ZSTD_DStream *zstream;
+			ZSTD_inBuffer zin;
+			ZSTD_outBuffer zout;
+			guchar *out;
+			gsize outlen, r;
+
+			zstream = ZSTD_createDStream ();
+
+			if (zstream == NULL) {
+				msg_err_map ("cannot create decompression stream");
+				munmap (bytes, len);
+				return FALSE;
+			}
+
+			ZSTD_initDStream (zstream);
+
+			zin.pos = 0;
+			zin.src = bytes;
+			zin.size = len;
+
+			/* Zero means no usable size in the frame: start with a default */
+			if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
+				outlen = ZSTD_DStreamOutSize ();
+			}
+
+			out = g_malloc (outlen);
+
+			zout.dst = out;
+			zout.pos = 0;
+			zout.size = outlen;
+
+			for (;;) {
+				r = ZSTD_decompressStream (zstream, &zout, &zin);
+
+				if (ZSTD_isError (r)) {
+					msg_err_map ("cannot decompress data: %s",
+							ZSTD_getErrorName (r));
+					ZSTD_freeDStream (zstream);
+					g_free (out);
+					munmap (bytes, len);
+					return FALSE;
+				}
+
+				if (r == 0 && zin.pos == zin.size) {
+					/* Frame is fully decoded and flushed, no input left */
+					break;
+				}
+
+				if (zout.pos == zout.size) {
+					/*
+					 * Output is full: the decoder may still hold buffered
+					 * data even if all input is consumed, so grow the
+					 * buffer and call it again to flush the remainder
+					 */
+					zout.size += zout.size / 2 + 1;
+					out = g_realloc (zout.dst, zout.size);
+					zout.dst = out;
+					continue;
+				}
+
+				if (zin.pos == zin.size) {
+					/* Nothing more to feed and nothing more to flush */
+					break;
+				}
+			}
+
+			ZSTD_freeDStream (zstream);
+			msg_info_map ("read map data from %s (%z bytes compressed, "
+					"%z uncompressed)", data->filename,
+					len, zout.pos);
+			map->read_callback (out, zout.pos, &periodic->cbdata, TRUE);
+			g_free (out);
+		}
+		else {
+			msg_info_map ("read map data from %s (%z bytes)", data->filename,
+					len);
+			map->read_callback (bytes, len, &periodic->cbdata, TRUE);
+		}
 	}
 
 	munmap (bytes, len);
@@ -716,7 +871,7 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
 }
 
 static gboolean
-rspamd_map_read_cached (struct rspamd_map *map,
+rspamd_map_read_cached (struct rspamd_map *map, struct rspamd_map_backend *bk,
 		struct map_periodic_cbdata *periodic, const gchar *host)
 {
 	gsize len;
@@ -738,8 +893,87 @@ rspamd_map_read_cached (struct rspamd_map *map,
 		return FALSE;
 	}
 
-	map->read_callback (in, map->cache->len, &periodic->cbdata, TRUE);
-	msg_info_map ("read map data from %s (cached) (%z bytes)", host, len);
+	if (bk->is_compressed) {
+		/* Cached copy is a zstd stream: inflate it before parsing */
+		ZSTD_DStream *zstream;
+		ZSTD_inBuffer zin;
+		ZSTD_outBuffer zout;
+		guchar *out;
+		gsize outlen, r;
+
+		zstream = ZSTD_createDStream ();
+
+		if (zstream == NULL) {
+			msg_err_map ("cannot create decompression stream");
+			munmap (in, len);
+			return FALSE;
+		}
+
+		ZSTD_initDStream (zstream);
+
+		zin.pos = 0;
+		zin.src = in;
+		zin.size = len;
+
+		/* Zero means no usable size in the frame: start with a default */
+		if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
+			outlen = ZSTD_DStreamOutSize ();
+		}
+
+		out = g_malloc (outlen);
+
+		zout.dst = out;
+		zout.pos = 0;
+		zout.size = outlen;
+
+		for (;;) {
+			r = ZSTD_decompressStream (zstream, &zout, &zin);
+
+			if (ZSTD_isError (r)) {
+				msg_err_map ("cannot decompress data: %s",
+						ZSTD_getErrorName (r));
+				ZSTD_freeDStream (zstream);
+				g_free (out);
+				munmap (in, len);
+				return FALSE;
+			}
+
+			if (r == 0 && zin.pos == zin.size) {
+				/* Frame is fully decoded and flushed, no input left */
+				break;
+			}
+
+			if (zout.pos == zout.size) {
+				/*
+				 * Output is full: the decoder may still hold buffered
+				 * data even if all input is consumed, so grow the
+				 * buffer and call it again to flush the remainder
+				 */
+				zout.size += zout.size / 2 + 1;
+				out = g_realloc (zout.dst, zout.size);
+				zout.dst = out;
+				continue;
+			}
+
+			if (zin.pos == zin.size) {
+				/* Nothing more to feed and nothing more to flush */
+				break;
+			}
+		}
+
+		ZSTD_freeDStream (zstream);
+		msg_info_map ("read map data from %s (cached) (%z bytes compressed, "
+				"%z uncompressed)", host,
+				len, zout.pos);
+		map->read_callback (out, zout.pos, &periodic->cbdata, TRUE);
+		g_free (out);
+	}
+	else {
+		msg_info_map ("read map data from %s (cached) (%z bytes)", host,
+				len);
+		map->read_callback (in, len, &periodic->cbdata, TRUE);
+	}
+
 	munmap (in, len);
 
 	return TRUE;
@@ -775,7 +1009,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, struct rspamd_map_backe
 
 			return;
 		}
-		else if (rspamd_map_read_cached (map, periodic, data->host)) {
+		else if (rspamd_map_read_cached (map, bk, periodic, data->host)) {
 			/* Switch to the next backend */
 			periodic->cur_backend ++;
 			data->last_checked = map->cache->last_checked;
@@ -1198,6 +1432,7 @@ rspamd_map_parse_backend (struct rspamd_config *cfg, const gchar *map_line)
 	struct file_map_data *fdata = NULL;
 	struct http_map_data *hdata = NULL;
 	struct http_parser_url up;
+	const gchar *end, *p;
 	rspamd_ftok_t tok;
 
 	bk = g_slice_alloc0 (sizeof (*bk));
@@ -1207,6 +1442,15 @@ rspamd_map_parse_backend (struct rspamd_config *cfg, const gchar *map_line)
 		goto err;
 	}
 
+	/* A map line ending with ".zstd" (any case) marks a compressed map */
+	end = map_line + strlen (map_line);
+	if (end - map_line > 5) {
+		p = end - 5;
+		if (g_ascii_strcasecmp (p, ".zstd") == 0) {
+			bk->is_compressed = TRUE;
+		}
+	}
+
 	/* Now check for each proto separately */
 	if (bk->protocol == MAP_PROTO_FILE) {
 		fdata = g_slice_alloc0 (sizeof (struct file_map_data));
diff --git a/src/libutil/map_private.h b/src/libutil/map_private.h
index 194aef2ef..379dc8b50 100644
--- a/src/libutil/map_private.h
+++ b/src/libutil/map_private.h
@@ -50,6 +50,7 @@ enum fetch_proto {
 struct rspamd_map_backend {
 	enum fetch_proto protocol;
 	gboolean is_signed;
+	gboolean is_compressed;
 	struct rspamd_cryptobox_pubkey *trusted_pubkey;
 	union {
 		struct file_map_data *fd;
-- 
2.39.5