]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Support compressed maps
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 31 Oct 2016 10:41:08 +0000 (10:41 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 31 Oct 2016 10:41:08 +0000 (10:41 +0000)
src/libutil/map.c
src/libutil/map_private.h

index 5ef04b9f7d540d3ea3dbb50a94d563a8dcb8cdf1..52fc6a0115a5aac59532345955bd98cc70853df8 100644 (file)
@@ -26,6 +26,7 @@
 #include "unix-std.h"
 #include "http_parser.h"
 #include "libutil/regexp.h"
+#include "contrib/zstd/zstd.h"
 
 #ifdef WITH_HYPERSCAN
 #include "hs.h"
@@ -480,9 +481,61 @@ read_data:
                        goto err;
                }
 
-               msg_info_map ("read map data from %s (%z bytes)", cbd->data->host,
-                               dlen);
-               map->read_callback (in, cbd->data_len, &cbd->periodic->cbdata, TRUE);
+               if (cbd->bk->is_compressed) {
+                       ZSTD_DStream *zstream;
+                       ZSTD_inBuffer zin;
+                       ZSTD_outBuffer zout;
+                       guchar *out;
+                       gsize outlen, r;
+
+                       zstream = ZSTD_createDStream ();
+                       ZSTD_initDStream (zstream);
+
+                       zin.pos = 0;
+                       zin.src = in;
+                       zin.size = dlen;
+
+                       if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
+                               outlen = ZSTD_DStreamOutSize ();
+                       }
+
+                       out = g_malloc (outlen);
+
+                       zout.dst = out;
+                       zout.pos = 0;
+                       zout.size = outlen;
+
+                       while (zin.pos < zin.size) {
+                               r = ZSTD_decompressStream (zstream, &zout, &zin);
+
+                               if (ZSTD_isError (r)) {
+                                       msg_err_map ("cannot decompress data: %s",
+                                                       ZSTD_getErrorName (r));
+                                       ZSTD_freeDStream (zstream);
+                                       g_free (out);
+                                       goto err;
+                               }
+
+                               if (zout.pos == zout.size) {
+                                       /* We need to extend output buffer */
+                                       zout.size = zout.size * 1.5 + 1.0;
+                                       out = g_realloc (zout.dst, zout.size);
+                                       zout.dst = out;
+                               }
+                       }
+
+                       ZSTD_freeDStream (zstream);
+                       msg_info_map ("read map data from %s (%z bytes compressed, "
+                                       "%z uncompressed)", cbd->data->host,
+                                                               dlen, zout.pos);
+                       map->read_callback (out, zout.pos, &cbd->periodic->cbdata, TRUE);
+                       g_free (out);
+               }
+               else {
+                       msg_info_map ("read map data from %s (%z bytes)", cbd->data->host,
+                                       dlen);
+                       map->read_callback (in, cbd->data_len, &cbd->periodic->cbdata, TRUE);
+               }
 
                /*
                 * We know that a map is in the locked state
@@ -578,7 +631,62 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
        }
 
        if (len > 0) {
-               map->read_callback (bytes, len, &periodic->cbdata, TRUE);
+               if (bk->is_compressed) {
+                       ZSTD_DStream *zstream;
+                       ZSTD_inBuffer zin;
+                       ZSTD_outBuffer zout;
+                       guchar *out;
+                       gsize outlen, r;
+
+                       zstream = ZSTD_createDStream ();
+                       ZSTD_initDStream (zstream);
+
+                       zin.pos = 0;
+                       zin.src = bytes;
+                       zin.size = len;
+
+                       if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
+                               outlen = ZSTD_DStreamOutSize ();
+                       }
+
+                       out = g_malloc (outlen);
+
+                       zout.dst = out;
+                       zout.pos = 0;
+                       zout.size = outlen;
+
+                       while (zin.pos < zin.size) {
+                               r = ZSTD_decompressStream (zstream, &zout, &zin);
+
+                               if (ZSTD_isError (r)) {
+                                       msg_err_map ("cannot decompress data: %s",
+                                                       ZSTD_getErrorName (r));
+                                       ZSTD_freeDStream (zstream);
+                                       g_free (out);
+                                       munmap (bytes, len);
+                                       return FALSE;
+                               }
+
+                               if (zout.pos == zout.size) {
+                                       /* We need to extend output buffer */
+                                       zout.size = zout.size * 1.5 + 1.0;
+                                       out = g_realloc (zout.dst, zout.size);
+                                       zout.dst = out;
+                               }
+                       }
+
+                       ZSTD_freeDStream (zstream);
+                       msg_info_map ("read map data from %s (%z bytes compressed, "
+                                       "%z uncompressed)", data->filename,
+                                       len, zout.pos);
+                       map->read_callback (out, zout.pos, &periodic->cbdata, TRUE);
+                       g_free (out);
+               }
+               else {
+                       msg_info_map ("read map data from %s (%z bytes)", data->filename,
+                                       len);
+                       map->read_callback (bytes, len, &periodic->cbdata, TRUE);
+               }
        }
 
        munmap (bytes, len);
@@ -716,7 +824,7 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
 }
 
 static gboolean
-rspamd_map_read_cached (struct rspamd_map *map,
+rspamd_map_read_cached (struct rspamd_map *map, struct rspamd_map_backend *bk,
                struct map_periodic_cbdata *periodic, const gchar *host)
 {
        gsize len;
@@ -738,8 +846,63 @@ rspamd_map_read_cached (struct rspamd_map *map,
                return FALSE;
        }
 
-       map->read_callback (in, map->cache->len, &periodic->cbdata, TRUE);
-       msg_info_map ("read map data from %s (cached) (%z bytes)", host, len);
+       if (bk->is_compressed) {
+               ZSTD_DStream *zstream;
+               ZSTD_inBuffer zin;
+               ZSTD_outBuffer zout;
+               guchar *out;
+               gsize outlen, r;
+
+               zstream = ZSTD_createDStream ();
+               ZSTD_initDStream (zstream);
+
+               zin.pos = 0;
+               zin.src = in;
+               zin.size = len;
+
+               if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
+                       outlen = ZSTD_DStreamOutSize ();
+               }
+
+               out = g_malloc (outlen);
+
+               zout.dst = out;
+               zout.pos = 0;
+               zout.size = outlen;
+
+               while (zin.pos < zin.size) {
+                       r = ZSTD_decompressStream (zstream, &zout, &zin);
+
+                       if (ZSTD_isError (r)) {
+                               msg_err_map ("cannot decompress data: %s",
+                                               ZSTD_getErrorName (r));
+                               ZSTD_freeDStream (zstream);
+                               g_free (out);
+                               munmap (in, len);
+                               return FALSE;
+                       }
+
+                       if (zout.pos == zout.size) {
+                               /* We need to extend output buffer */
+                               zout.size = zout.size * 1.5 + 1.0;
+                               out = g_realloc (zout.dst, zout.size);
+                               zout.dst = out;
+                       }
+               }
+
+               ZSTD_freeDStream (zstream);
+               msg_info_map ("read map data from %s (cached) (%z bytes compressed, "
+                               "%z uncompressed)", host,
+                               len, zout.pos);
+               map->read_callback (out, zout.pos, &periodic->cbdata, TRUE);
+               g_free (out);
+       }
+       else {
+               msg_info_map ("read map data from %s (cached) (%z bytes)", host,
+                               len);
+               map->read_callback (in, len, &periodic->cbdata, TRUE);
+       }
+
        munmap (in, len);
 
        return TRUE;
@@ -775,7 +938,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, struct rspamd_map_backe
 
                        return;
                }
-               else if (rspamd_map_read_cached (map, periodic, data->host)) {
+               else if (rspamd_map_read_cached (map, bk, periodic, data->host)) {
                        /* Switch to the next backend */
                        periodic->cur_backend ++;
                        data->last_checked = map->cache->last_checked;
@@ -1198,6 +1361,7 @@ rspamd_map_parse_backend (struct rspamd_config *cfg, const gchar *map_line)
        struct file_map_data *fdata = NULL;
        struct http_map_data *hdata = NULL;
        struct http_parser_url up;
+       const gchar *end, *p;
        rspamd_ftok_t tok;
 
        bk = g_slice_alloc0 (sizeof (*bk));
@@ -1207,6 +1371,14 @@ rspamd_map_parse_backend (struct rspamd_config *cfg, const gchar *map_line)
                goto err;
        }
 
+       end = map_line + strlen (map_line);
+       if (end - map_line > 5) {
+               p = end - 5;
+               if (g_ascii_strcasecmp (p, ".zstd") == 0) {
+                       bk->is_compressed = TRUE;
+               }
+       }
+
        /* Now check for each proto separately */
        if (bk->protocol == MAP_PROTO_FILE) {
                fdata = g_slice_alloc0 (sizeof (struct file_map_data));
index 194aef2ef933cec2ae3146bef4588a9e478c3c20..379dc8b50bbc9d4568bdb72b4556b5c3899e6c4d 100644 (file)
@@ -50,6 +50,7 @@ enum fetch_proto {
 struct rspamd_map_backend {
        enum fetch_proto protocol;
        gboolean is_signed;
+       gboolean is_compressed;
        struct rspamd_cryptobox_pubkey *trusted_pubkey;
        union {
                struct file_map_data *fd;