Browse Source

[Feature] Support compressed maps

tags/1.4.0
Vsevolod Stakhov 7 years ago
parent
commit
767b1029a3
2 changed files with 181 additions and 8 deletions
  1. 180
    8
      src/libutil/map.c
  2. 1
    0
      src/libutil/map_private.h

+ 180
- 8
src/libutil/map.c View File

@@ -26,6 +26,7 @@
#include "unix-std.h"
#include "http_parser.h"
#include "libutil/regexp.h"
#include "contrib/zstd/zstd.h"

#ifdef WITH_HYPERSCAN
#include "hs.h"
@@ -480,9 +481,61 @@ read_data:
goto err;
}

msg_info_map ("read map data from %s (%z bytes)", cbd->data->host,
dlen);
map->read_callback (in, cbd->data_len, &cbd->periodic->cbdata, TRUE);
if (cbd->bk->is_compressed) {
ZSTD_DStream *zstream;
ZSTD_inBuffer zin;
ZSTD_outBuffer zout;
guchar *out;
gsize outlen, r;

zstream = ZSTD_createDStream ();
ZSTD_initDStream (zstream);

zin.pos = 0;
zin.src = in;
zin.size = dlen;

if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
outlen = ZSTD_DStreamOutSize ();
}

out = g_malloc (outlen);

zout.dst = out;
zout.pos = 0;
zout.size = outlen;

while (zin.pos < zin.size) {
r = ZSTD_decompressStream (zstream, &zout, &zin);

if (ZSTD_isError (r)) {
msg_err_map ("cannot decompress data: %s",
ZSTD_getErrorName (r));
ZSTD_freeDStream (zstream);
g_free (out);
goto err;
}

if (zout.pos == zout.size) {
/* We need to extend output buffer */
zout.size = zout.size * 1.5 + 1.0;
out = g_realloc (zout.dst, zout.size);
zout.dst = out;
}
}

ZSTD_freeDStream (zstream);
msg_info_map ("read map data from %s (%z bytes compressed, "
"%z uncompressed)", cbd->data->host,
dlen, zout.pos);
map->read_callback (out, zout.pos, &cbd->periodic->cbdata, TRUE);
g_free (out);
}
else {
msg_info_map ("read map data from %s (%z bytes)", cbd->data->host,
dlen);
map->read_callback (in, cbd->data_len, &cbd->periodic->cbdata, TRUE);
}

/*
* We know that a map is in the locked state
@@ -578,7 +631,62 @@ read_map_file (struct rspamd_map *map, struct file_map_data *data,
}

if (len > 0) {
map->read_callback (bytes, len, &periodic->cbdata, TRUE);
if (bk->is_compressed) {
ZSTD_DStream *zstream;
ZSTD_inBuffer zin;
ZSTD_outBuffer zout;
guchar *out;
gsize outlen, r;

zstream = ZSTD_createDStream ();
ZSTD_initDStream (zstream);

zin.pos = 0;
zin.src = bytes;
zin.size = len;

if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
outlen = ZSTD_DStreamOutSize ();
}

out = g_malloc (outlen);

zout.dst = out;
zout.pos = 0;
zout.size = outlen;

while (zin.pos < zin.size) {
r = ZSTD_decompressStream (zstream, &zout, &zin);

if (ZSTD_isError (r)) {
msg_err_map ("cannot decompress data: %s",
ZSTD_getErrorName (r));
ZSTD_freeDStream (zstream);
g_free (out);
munmap (bytes, len);
return FALSE;
}

if (zout.pos == zout.size) {
/* We need to extend output buffer */
zout.size = zout.size * 1.5 + 1.0;
out = g_realloc (zout.dst, zout.size);
zout.dst = out;
}
}

ZSTD_freeDStream (zstream);
msg_info_map ("read map data from %s (%z bytes compressed, "
"%z uncompressed)", data->filename,
len, zout.pos);
map->read_callback (out, zout.pos, &periodic->cbdata, TRUE);
g_free (out);
}
else {
msg_info_map ("read map data from %s (%z bytes)", data->filename,
len);
map->read_callback (bytes, len, &periodic->cbdata, TRUE);
}
}

munmap (bytes, len);
@@ -716,7 +824,7 @@ rspamd_map_dns_callback (struct rdns_reply *reply, void *arg)
}

static gboolean
rspamd_map_read_cached (struct rspamd_map *map,
rspamd_map_read_cached (struct rspamd_map *map, struct rspamd_map_backend *bk,
struct map_periodic_cbdata *periodic, const gchar *host)
{
gsize len;
@@ -738,8 +846,63 @@ rspamd_map_read_cached (struct rspamd_map *map,
return FALSE;
}

map->read_callback (in, map->cache->len, &periodic->cbdata, TRUE);
msg_info_map ("read map data from %s (cached) (%z bytes)", host, len);
if (bk->is_compressed) {
ZSTD_DStream *zstream;
ZSTD_inBuffer zin;
ZSTD_outBuffer zout;
guchar *out;
gsize outlen, r;

zstream = ZSTD_createDStream ();
ZSTD_initDStream (zstream);

zin.pos = 0;
zin.src = in;
zin.size = len;

if ((outlen = ZSTD_getDecompressedSize (zin.src, zin.size)) == 0) {
outlen = ZSTD_DStreamOutSize ();
}

out = g_malloc (outlen);

zout.dst = out;
zout.pos = 0;
zout.size = outlen;

while (zin.pos < zin.size) {
r = ZSTD_decompressStream (zstream, &zout, &zin);

if (ZSTD_isError (r)) {
msg_err_map ("cannot decompress data: %s",
ZSTD_getErrorName (r));
ZSTD_freeDStream (zstream);
g_free (out);
munmap (in, len);
return FALSE;
}

if (zout.pos == zout.size) {
/* We need to extend output buffer */
zout.size = zout.size * 1.5 + 1.0;
out = g_realloc (zout.dst, zout.size);
zout.dst = out;
}
}

ZSTD_freeDStream (zstream);
msg_info_map ("read map data from %s (cached) (%z bytes compressed, "
"%z uncompressed)", host,
len, zout.pos);
map->read_callback (out, zout.pos, &periodic->cbdata, TRUE);
g_free (out);
}
else {
msg_info_map ("read map data from %s (cached) (%z bytes)", host,
len);
map->read_callback (in, len, &periodic->cbdata, TRUE);
}

munmap (in, len);

return TRUE;
@@ -775,7 +938,7 @@ rspamd_map_common_http_callback (struct rspamd_map *map, struct rspamd_map_backe

return;
}
else if (rspamd_map_read_cached (map, periodic, data->host)) {
else if (rspamd_map_read_cached (map, bk, periodic, data->host)) {
/* Switch to the next backend */
periodic->cur_backend ++;
data->last_checked = map->cache->last_checked;
@@ -1198,6 +1361,7 @@ rspamd_map_parse_backend (struct rspamd_config *cfg, const gchar *map_line)
struct file_map_data *fdata = NULL;
struct http_map_data *hdata = NULL;
struct http_parser_url up;
const gchar *end, *p;
rspamd_ftok_t tok;

bk = g_slice_alloc0 (sizeof (*bk));
@@ -1207,6 +1371,14 @@ rspamd_map_parse_backend (struct rspamd_config *cfg, const gchar *map_line)
goto err;
}

end = map_line + strlen (map_line);
if (end - map_line > 5) {
p = end - 5;
if (g_ascii_strcasecmp (p, ".zstd") == 0) {
bk->is_compressed = TRUE;
}
}

/* Now check for each proto separately */
if (bk->protocol == MAP_PROTO_FILE) {
fdata = g_slice_alloc0 (sizeof (struct file_map_data));

+ 1
- 0
src/libutil/map_private.h View File

@@ -50,6 +50,7 @@ enum fetch_proto {
struct rspamd_map_backend {
enum fetch_proto protocol;
gboolean is_signed;
gboolean is_compressed;
struct rspamd_cryptobox_pubkey *trusted_pubkey;
union {
struct file_map_data *fd;

Loading…
Cancel
Save