]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Aws_s3: Allow to store large parts separately
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 8 Dec 2021 14:57:57 +0000 (14:57 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 8 Dec 2021 14:57:57 +0000 (14:57 +0000)
src/plugins/lua/aws_s3.lua

index 1a78730868f2ea477c864c030591679ac05914ae..ec45dca7c4ba2220bf7a90c866e5bd2019c30f91 100644 (file)
@@ -32,6 +32,7 @@ local settings = {
   s3_timeout = 10,
   save_raw = true,
   save_structure = false,
+  inline_content_limit = nil,
 }
 
 local settings_schema = ts.shape{
@@ -46,6 +47,7 @@ local settings_schema = ts.shape{
   zstd_compress = ts.boolean:is_optional(),
   save_raw = ts.boolean:is_optional(),
   save_structure = ts.boolean:is_optional(),
+  inline_content_limit = (ts.integer + ts.string / tonumber):is_optional(),
 }
 
 local function raw_data(task, nonce, queue_id)
@@ -66,24 +68,61 @@ local function raw_data(task, nonce, queue_id)
   return path, content, content_type
 end
 
+local function gen_ext()
+  local ext = 'msgpack'
+  if settings.zstd_compress then
+    ext = 'msgpack.zst'
+  end
+
+  return ext
+end
+
+local function convert_to_ref(task, nonce, queue_id, part, external_refs)
+  local path = string.format('/%s-%s-%s.%s', queue_id, nonce,
+      rspamd_text.randombytes(8):base32(), gen_ext())
+  local content = part.content
+
+  if settings.zstd_compress then
+    external_refs[path] = rspamd_util.zstd_compress(content)
+  else
+    external_refs[path] = content
+  end
+
+  part.content = nil
+  part.content_path = path
+
+  return path
+end
+
 local function structured_data(task, nonce, queue_id)
-  local ext, content, content_type
+  local content, content_type
+  local external_refs = {}
   local lua_mime = require "lua_mime"
   local ucl = require "ucl"
 
+  local message_split = lua_mime.message_to_ucl(task)
+
+  if settings.inline_content_limit and settings.inline_content_limit > 0 then
+    for i,part in ipairs(message_split.parts() or {}) do
+      if part.content and #part.content >= settings.inline_content_limit then
+        local ref = convert_to_ref(task, nonce, queue_id, part, external_refs)
+        lua_util.debugm(N, task, "convert part number %s to a reference %s",
+          i, ref)
+      end
+    end
+  end
+
   if settings.zstd_compress then
-    ext = 'msgpack.zst'
     content = rspamd_util.zstd_compress(ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack'))
     content_type = 'application/zstd'
   else
-    ext = 'msgpack'
     content = ucl.to_format(lua_mime.message_to_ucl(task), 'msgpack')
     content_type = 'application/msgpack'
   end
 
-  local path = string.format('/%s-%s.%s', queue_id, nonce, ext)
+  local path = string.format('/%s-%s.%s', queue_id, nonce, gen_ext())
 
-  return path, content, content_type
+  return path, content, content_type, external_refs
 end
 
 local function s3_aws_callback(task)
@@ -97,7 +136,7 @@ local function s3_aws_callback(task)
   -- Hack to pass host
   local aws_host = string.format('%s.%s', settings.s3_bucket, settings.s3_host)
 
-  local function gen_s3_http_callback(path)
+  local function gen_s3_http_callback(path, what)
     return function (http_err, code, body, headers)
 
       if http_err then
@@ -108,7 +147,7 @@ local function s3_aws_callback(task)
         end
         rspamd_logger.errx(task, 'cannot save %s to AWS S3: %s', path, http_err)
       else
-        rspamd_logger.messagex(task, 'saved message successfully in S3 object %s', path)
+        rspamd_logger.messagex(task, 'saved %s successfully in S3 object %s', what, path)
       end
       lua_util.debugm(N, task, 'obj=%s, err=%s, code=%s, body=%s, headers=%s',
           path, http_err, code, body, headers)
@@ -133,13 +172,13 @@ local function s3_aws_callback(task)
       task = task,
       method = 'PUT',
       body = content,
-      callback = gen_s3_http_callback(path),
+      callback = gen_s3_http_callback(path, 'raw message'),
       headers = hdrs,
       timeout = settings.s3_timeout,
     })
   end
   if settings.save_structure then
-    local path, content, content_type = structured_data(task, nonce, queue_id)
+    local path, content, content_type, external_refs = structured_data(task, nonce, queue_id)
     local hdrs = lua_aws.aws_request_enrich({
       region = settings.s3_region,
       headers = {
@@ -156,10 +195,22 @@ local function s3_aws_callback(task)
       task = task,
       method = 'PUT',
       body = content,
-      callback = gen_s3_http_callback(path),
+      callback = gen_s3_http_callback(path, 'structured message'),
       headers = hdrs,
       timeout = settings.s3_timeout,
     })
+
+    for _,ref in ipairs(external_refs) do
+      rspamd_http.request({
+        url = uri .. ref,
+        task = task,
+        method = 'PUT',
+        body = content,
+        callback = gen_s3_http_callback(ref, 'part content'),
+        headers = hdrs,
+        timeout = settings.s3_timeout,
+      })
+    end
   end