From: Vsevolod Stakhov Date: Wed, 8 Dec 2021 14:57:57 +0000 (+0000) Subject: [Feature] Aws_s3: Allow to store large parts separately X-Git-Tag: 3.2~177 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=75f1f90326f395b3726257e5b14c9f4c1bb6a41b;p=rspamd.git [Feature] Aws_s3: Allow to store large parts separately --- diff --git a/src/plugins/lua/aws_s3.lua b/src/plugins/lua/aws_s3.lua index 1a7873086..ec45dca7c 100644 --- a/src/plugins/lua/aws_s3.lua +++ b/src/plugins/lua/aws_s3.lua @@ -32,6 +32,7 @@ local settings = { s3_timeout = 10, save_raw = true, save_structure = false, + inline_content_limit = nil, } local settings_schema = ts.shape{ @@ -46,6 +47,7 @@ local settings_schema = ts.shape{ zstd_compress = ts.boolean:is_optional(), save_raw = ts.boolean:is_optional(), save_structure = ts.boolean:is_optional(), + inline_content_limit = (ts.integer + ts.string / tonumber):is_optional(), } local function raw_data(task, nonce, queue_id) @@ -66,24 +68,61 @@ local function raw_data(task, nonce, queue_id) return path, content, content_type end +local function gen_ext() + local ext = 'msgpack' + if settings.zstd_compress then + ext = 'msgpack.zst' + end + + return ext +end + +local function convert_to_ref(task, nonce, queue_id, part, external_refs) + local path = string.format('/%s-%s-%s.%s', queue_id, nonce, + rspamd_text.randombytes(8):base32(), gen_ext()) + local content = part.content + + if settings.zstd_compress then + external_refs[path] = rspamd_util.zstd_compress(content) + else + external_refs[path] = content + end + + part.content = nil + part.content_path = path + + return path +end + local function structured_data(task, nonce, queue_id) - local ext, content, content_type + local content, content_type + local external_refs = {} local lua_mime = require "lua_mime" local ucl = require "ucl" + local message_split = lua_mime.message_to_ucl(task) + + if settings.inline_content_limit and settings.inline_content_limit > 0 then + for i,part in ipairs(message_split.parts() or {}) do + if part.content and #part.content >= settings.inline_content_limit then + local ref = convert_to_ref(task, nonce, queue_id, part, external_refs) + lua_util.debugm(N, task, "convert part number %s to a reference %s", + i, ref) + end + end + end + if settings.zstd_compress then - ext = 'msgpack.zst' content = rspamd_util.zstd_compress(ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack')) content_type = 'application/zstd' else - ext = 'msgpack' content = ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack') content_type = 'application/msgpack' end - local path = string.format('/%s-%s.%s', queue_id, nonce, ext) + local path = string.format('/%s-%s.%s', queue_id, nonce, gen_ext()) - return path, content, content_type + return path, content, content_type, external_refs end local function s3_aws_callback(task) @@ -97,7 +136,7 @@ local function s3_aws_callback(task) -- Hack to pass host local aws_host = string.format('%s.%s', settings.s3_bucket, settings.s3_host) - local function gen_s3_http_callback(path) + local function gen_s3_http_callback(path, what) return function (http_err, code, body, headers) if http_err then @@ -108,7 +147,7 @@ local function s3_aws_callback(task) end rspamd_logger.errx(task, 'cannot save %s to AWS S3: %s', path, http_err) else - rspamd_logger.messagex(task, 'saved message successfully in S3 object %s', path) + rspamd_logger.messagex(task, 'saved %s successfully in S3 object %s', what, path) end lua_util.debugm(N, task, 'obj=%s, err=%s, code=%s, body=%s, headers=%s', path, http_err, code, body, headers) @@ -133,13 +172,13 @@ local function s3_aws_callback(task) task = task, method = 'PUT', body = content, - callback = gen_s3_http_callback(path), + callback = gen_s3_http_callback(path, 'raw message'), headers = hdrs, timeout = settings.s3_timeout, }) end if settings.save_structure then - local path, content, content_type = structured_data(task, nonce, queue_id) + local path, content, content_type, external_refs = structured_data(task, nonce, queue_id) local hdrs = lua_aws.aws_request_enrich({ region = settings.s3_region, headers = { @@ -156,10 +195,22 @@ local function s3_aws_callback(task) task = task, method = 'PUT', body = content, - callback = gen_s3_http_callback(path), + callback = gen_s3_http_callback(path, 'structured message'), headers = hdrs, timeout = settings.s3_timeout, }) + + for _,ref in ipairs(external_refs) do + rspamd_http.request({ + url = uri .. ref, + task = task, + method = 'PUT', + body = content, + callback = gen_s3_http_callback(ref, 'part content'), + headers = hdrs, + timeout = settings.s3_timeout, + }) + end end