diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2021-10-15 11:43:00 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2021-10-15 11:43:00 +0100 |
commit | a30ec7cc13007a579fff471cbd9e77c57fedd7a1 (patch) | |
tree | c641be9211900f4aba8bf602e86a77fc3714ff61 /src/plugins/lua/aws_s3.lua | |
parent | f2298a00267d69c4ee03c15fcf800b4e71105192 (diff) | |
download | rspamd-a30ec7cc13007a579fff471cbd9e77c57fedd7a1.tar.gz rspamd-a30ec7cc13007a579fff471cbd9e77c57fedd7a1.zip |
[Feature] S3: Allow to store structured data in messagepack
Diffstat (limited to 'src/plugins/lua/aws_s3.lua')
-rw-r--r-- | src/plugins/lua/aws_s3.lua | 136 |
1 files changed, 97 insertions, 39 deletions
diff --git a/src/plugins/lua/aws_s3.lua b/src/plugins/lua/aws_s3.lua index bdfa4f402..7ec65eb5b 100644 --- a/src/plugins/lua/aws_s3.lua +++ b/src/plugins/lua/aws_s3.lua @@ -29,6 +29,8 @@ local settings = { s3_secret_key = nil, s3_key_id = nil, s3_timeout = 10, + save_raw = true, + save_structure = false, } local settings_schema = ts.shape{ @@ -40,13 +42,11 @@ local settings_schema = ts.shape{ enabled = ts.boolean:is_optional(), fail_action = ts.string:is_optional(), zstd_compress = ts.boolean:is_optional(), + save_raw = ts.boolean:is_optional(), + save_structure = ts.boolean:is_optional(), } -local function s3_aws_callback(task) - local uri = string.format('https://%s.s3.amazonaws.com', settings.s3_bucket) - -- Create a nonce - local nonce = rspamd_text.randombytes(16):base32() - local queue_id = task:get_queue_id() +local function raw_data(task, nonce, queue_id) local ext, content, content_type if settings.zstd_compress then @@ -59,50 +59,108 @@ local function s3_aws_callback(task) content_type = 'message/rfc-822' end + local path = string.format('/%s-%s.%s', queue_id, nonce, ext) + + return path, content, content_type +end + +local function structured_data(task, nonce, queue_id) + local ext, content, content_type + local lua_mime = require "lua_mime" + local ucl = require "ucl" + + if settings.zstd_compress then + ext = 'msgpack.zst' + content = rspamd_util.zstd_compress(ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack')) + content_type = 'application/zstd' + else + ext = 'msgpack' + content = ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack') + content_type = 'application/msgpack' + end + + local path = string.format('/%s-%s.%s', queue_id, nonce, ext) + + return path, content, content_type +end + +local function s3_aws_callback(task) + local uri = string.format('https://%s.s3.amazonaws.com', settings.s3_bucket) + -- Create a nonce + local nonce = rspamd_text.randombytes(16):base32() + local queue_id = task:get_queue_id() if not queue_id then queue_id = rspamd_text.randombytes(8):base32() end - local path = string.format('/%s-%s.%s', queue_id, nonce, ext) -- Hack to pass host local aws_host = string.format('%s.s3.amazonaws.com', settings.s3_bucket) - local hdrs = lua_aws.aws_request_enrich({ - region = settings.s3_region, - headers = { - ['Content-Type'] = content_type, - ['Host'] = aws_host - }, - uri = path, - key_id = settings.s3_key_id, - secret_key = settings.s3_secret_key, - method = 'PUT', - }, content) - - local function s3_http_callback(http_err, code, body, headers) - - if http_err then - if settings.fail_action then - task:set_pre_result(settings.fail_action, - string.format('S3 save failed: %s', http_err), N, - nil, nil, 'least') + local function gen_s3_http_callback(path) + return function (http_err, code, body, headers) + + if http_err then + if settings.fail_action then + task:set_pre_result(settings.fail_action, + string.format('S3 save failed: %s', http_err), N, + nil, nil, 'least') + end + rspamd_logger.errx(task, 'cannot save %s to AWS S3: %s', path, http_err) + else + rspamd_logger.messagex(task, 'saved message successfully in S3 object %s', path) end - rspamd_logger.errx(task, 'cannot save %s to AWS S3: %s', path, http_err) - else - rspamd_logger.messagex(task, 'saved message successfully in S3 object %s', path) + lua_util.debugm(N, task, 'obj=%s, err=%s, code=%s, body=%s, headers=%s', + path, http_err, code, body, headers) end - lua_util.debugm(N, task, 'obj=%s, err=%s, code=%s, body=%s, headers=%s', - path, http_err, code, body, headers) end - rspamd_http.request({ - url = uri .. path, - task = task, - method = 'PUT', - body = content, - callback = s3_http_callback, - headers = hdrs, - timeout = settings.s3_timeout, - }) + if settings.save_raw then + local path, content, content_type = raw_data(task, nonce, queue_id) + local hdrs = lua_aws.aws_request_enrich({ + region = settings.s3_region, + headers = { + ['Content-Type'] = content_type, + ['Host'] = aws_host + }, + uri = path, + key_id = settings.s3_key_id, + secret_key = settings.s3_secret_key, + method = 'PUT', + }, content) + rspamd_http.request({ + url = uri .. path, + task = task, + method = 'PUT', + body = content, + callback = gen_s3_http_callback(path), + headers = hdrs, + timeout = settings.s3_timeout, + }) + end + if settings.save_structure then + local path, content, content_type = structured_data(task, nonce, queue_id) + local hdrs = lua_aws.aws_request_enrich({ + region = settings.s3_region, + headers = { + ['Content-Type'] = content_type, + ['Host'] = aws_host + }, + uri = path, + key_id = settings.s3_key_id, + secret_key = settings.s3_secret_key, + method = 'PUT', + }, content) + rspamd_http.request({ + url = uri .. path, + task = task, + method = 'PUT', + body = content, + callback = gen_s3_http_callback(path), + headers = hdrs, + timeout = settings.s3_timeout, + }) + end + + end local opts = rspamd_config:get_all_opt('aws_s3') |