You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

aws_s3.lua 7.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. --[[
  2. Copyright (c) 2022, Vsevolod Stakhov <vsevolod@rspamd.com>
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ]]--
  13. local N = "aws_s3"
  14. local lua_util = require "lua_util"
  15. local lua_aws = require "lua_aws"
  16. local rspamd_logger = require "rspamd_logger"
  17. local ts = (require "tableshape").types
  18. local rspamd_text = require "rspamd_text"
  19. local rspamd_http = require "rspamd_http"
  20. local rspamd_util = require "rspamd_util"
  -- Built-in defaults for the plugin. They are merged with the user supplied
  -- `aws_s3` options and validated against the schema before use.
  -- Entries set to nil have no default and are listed for reference only.
  local settings = {
    -- Where to store objects
    s3_host = 's3.amazonaws.com',
    s3_region = 'us-east-1',
    s3_bucket = nil,

    -- Credentials used to sign requests
    s3_key_id = nil,
    s3_secret_key = nil,

    -- Passed as `timeout` to every HTTP request (presumably seconds -- confirm)
    s3_timeout = 10,

    -- What to export
    save_raw = true,
    save_structure = false,
    -- Parts whose content length reaches this value are stored as separate
    -- objects when the structured form is saved (disabled when nil)
    inline_content_limit = nil,
  }
  -- Validation schema applied to the merged settings table.
  local settings_schema = ts.shape({
    -- Mandatory connection parameters
    s3_host = ts.string,
    s3_region = ts.string,
    s3_bucket = ts.string,
    s3_key_id = ts.string,
    s3_secret_key = ts.string,
    -- Either a plain number or a string converted by parse_time_interval
    s3_timeout = ts.number + (ts.string / lua_util.parse_time_interval),

    -- Optional behaviour switches
    enabled = ts.boolean:is_optional(),
    fail_action = ts.string:is_optional(),
    zstd_compress = ts.boolean:is_optional(),
    save_raw = ts.boolean:is_optional(),
    save_structure = ts.boolean:is_optional(),
    inline_content_limit = ts.number:is_optional(),
  })
  --- Build the S3 object for the unmodified message.
  -- @param task task whose content is exported
  -- @param nonce random string making the object name unique
  -- @param queue_id queue id (or its random replacement) used as name prefix
  -- @return object path, object body, value for the Content-Type header
  local function raw_data(task, nonce, queue_id)
    local body = task:get_content()
    -- NOTE(review): the registered media type is spelled `message/rfc822`;
    -- the value below is kept as-is -- confirm before changing.
    local suffix, mime = 'eml', 'message/rfc-822'

    if settings.zstd_compress then
      body = rspamd_util.zstd_compress(body)
      suffix, mime = 'eml.zst', 'application/zstd'
    end

    return string.format('/%s-%s.%s', queue_id, nonce, suffix), body, mime
  end
  --- Return the file extension for an object, adding `.zst` when
  -- zstd compression is enabled in the settings.
  -- @param base extension without the compression suffix
  -- @return extension to use in the object name
  local function gen_ext(base)
    if settings.zstd_compress then
      return base .. '.zst'
    end

    return base
  end
  --- Move the content of a part out of the structured message.
  -- The (optionally compressed) content is stored in `external_refs` under a
  -- freshly generated object path; the part itself keeps only that path.
  -- @param task current task (not used here)
  -- @param nonce per-message random string
  -- @param queue_id queue id used as object name prefix
  -- @param part part table; `content` is cleared and `content_path` is set
  -- @param external_refs table mapping object path -> object body (filled in)
  -- @return the generated object path
  local function convert_to_ref(task, nonce, queue_id, part, external_refs)
    local object_path = string.format('/%s-%s-%s.%s', queue_id, nonce,
        rspamd_text.randombytes(8):base32(), gen_ext('raw'))

    local payload = part.content
    if settings.zstd_compress then
      payload = rspamd_util.zstd_compress(payload)
    end
    external_refs[object_path] = payload

    -- Replace the inline content with a reference to the external object
    part.content = nil
    part.content_path = object_path

    return object_path
  end
  --- Build the S3 object holding the structured (msgpack) form of a message.
  -- Parts whose content length reaches `inline_content_limit` are detached
  -- into separate objects that are returned in the last result.
  -- @param task task to convert
  -- @param nonce per-message random string
  -- @param queue_id queue id used as object name prefix
  -- @return object path, object body, Content-Type value, and a table mapping
  --   external object path -> body for every detached part
  local function structured_data(task, nonce, queue_id)
    local external_refs = {}
    local lua_mime = require "lua_mime"
    local ucl = require "ucl"

    local message_split = lua_mime.message_to_ucl(task)

    local limit = settings.inline_content_limit
    if limit and limit > 0 then
      for idx, part in ipairs(message_split.parts or {}) do
        local body = part.content
        if body and #body >= limit then
          local ref = convert_to_ref(task, nonce, queue_id, part, external_refs)
          lua_util.debugm(N, task, "convert part number %s to a reference %s",
              idx, ref)
        end
      end
    end

    -- Serialise after the loop so that detached parts are already references
    local content = ucl.to_format(message_split, 'msgpack')
    local content_type = 'application/msgpack'
    if settings.zstd_compress then
      content = rspamd_util.zstd_compress(content)
      content_type = 'application/zstd'
    end

    local path = string.format('/%s-%s.%s', queue_id, nonce, gen_ext('msgpack'))

    return path, content, content_type, external_refs
  end
  --- Symbol callback: upload the message to the configured S3 bucket.
  -- Depending on the settings it stores the raw message (`save_raw`), the
  -- structured msgpack form (`save_structure`) and, for the latter, every part
  -- that was detached into a separate object.
  -- Each upload is a signed PUT request; a failed upload is logged and, when
  -- `fail_action` is configured, reported as a pre-result on the task.
  -- @param task task being processed
  local function s3_aws_callback(task)
    local uri = string.format('https://%s.%s', settings.s3_bucket, settings.s3_host)
    -- Create a nonce
    local nonce = rspamd_text.randombytes(16):base32()
    local queue_id = task:get_queue_id()
    if not queue_id then
      queue_id = rspamd_text.randombytes(8):base32()
    end
    -- Hack to pass host
    local aws_host = string.format('%s.%s', settings.s3_bucket, settings.s3_host)

    -- Build the HTTP completion handler for a single uploaded object
    local function gen_s3_http_callback(path, what)
      return function(http_err, code, body, headers)
        -- A reply without a transport error is not necessarily a success:
        -- anything outside of 2xx means that the object has not been stored
        local failure = http_err
        if not failure then
          local status = tonumber(code)
          if not status or status < 200 or status >= 300 then
            failure = string.format('unexpected HTTP code %s', tostring(code))
          end
        end

        if failure then
          if settings.fail_action then
            task:set_pre_result(settings.fail_action,
                string.format('S3 save failed: %s', failure), N,
                nil, nil, 'least')
          end
          rspamd_logger.errx(task, 'cannot save %s to AWS S3: %s', path, failure)
        else
          rspamd_logger.messagex(task, 'saved %s successfully in S3 object %s', what, path)
        end
        lua_util.debugm(N, task, 'obj=%s, err=%s, code=%s, body=%s, headers=%s',
            path, http_err, code, body, headers)
      end
    end

    -- Sign and send one PUT request storing `body` under `path`
    local function put_object(path, body, content_type, what)
      local hdrs = lua_aws.aws_request_enrich({
        region = settings.s3_region,
        headers = {
          ['Content-Type'] = content_type,
          ['Host'] = aws_host,
        },
        uri = path,
        key_id = settings.s3_key_id,
        secret_key = settings.s3_secret_key,
        method = 'PUT',
      }, body)

      rspamd_http.request({
        url = uri .. path,
        task = task,
        upstream = settings.upstreams:get_upstream_round_robin(),
        method = 'PUT',
        body = body,
        callback = gen_s3_http_callback(path, what),
        headers = hdrs,
        timeout = settings.s3_timeout,
      })
    end

    if settings.save_raw then
      local path, content, content_type = raw_data(task, nonce, queue_id)
      put_object(path, content, content_type, 'raw message')
    end

    if settings.save_structure then
      local path, content, content_type, external_refs = structured_data(task, nonce, queue_id)
      put_object(path, content, content_type, 'structured message')

      -- Detached parts hold raw part bytes, not msgpack, so they must not
      -- inherit the content type of the structured document
      local part_content_type = 'application/octet-stream'
      if settings.zstd_compress then
        part_content_type = 'application/zstd'
      end

      for ref, part_content in pairs(external_refs) do
        put_object(ref, part_content, part_content_type, 'part content')
      end
    end
  end
  -- Module initialisation: read and validate the configuration, prepare the
  -- upstreams and register the export symbol.
  local opts = rspamd_config:get_all_opt('aws_s3')
  if not opts then
    -- Nothing is configured for this module
    return
  end

  settings = lua_util.override_defaults(settings, opts)

  local res, err = settings_schema:transform(settings)
  if not res then
    rspamd_logger.warnx(rspamd_config, 'plugin is misconfigured: %s', err)
    lua_util.disable_module(N, "config")
    return
  end

  -- From now on use the validated (and transformed) settings
  settings = res
  rspamd_logger.infox(rspamd_config, 'enabled AWS s3 dump to %s', settings.s3_bucket)

  local bucket_url = string.format('https://%s.%s', settings.s3_bucket, settings.s3_host)
  settings.upstreams = lua_util.http_upstreams_by_url(rspamd_config:get_mempool(),
      bucket_url)
  if not settings.upstreams then
    rspamd_logger.warnx(rspamd_config, 'cannot parse hostname: %s', bucket_url)
    lua_util.disable_module(N, "config")
    return
  end

  -- The symbol type and priority depend on whether a fail action is configured
  local symbol_type, symbol_priority = 'idempotent', nil
  if settings.fail_action then
    symbol_type = 'postfilter'
    symbol_priority = lua_util.symbols_priorities.high
  end

  rspamd_config:register_symbol({
    name = 'EXPORT_AWS_S3',
    type = symbol_type,
    callback = s3_aws_callback,
    priority = symbol_priority,
    flags = 'empty,explicit_disable,ignore_passthrough,nostat',
  })