You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

elastic.lua 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. --[[
  2. Copyright (c) 2017, Veselin Iordanov
  3. Copyright (c) 2022, Vsevolod Stakhov <vsevolod@rspamd.com>
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ]]--
  14. local rspamd_logger = require 'rspamd_logger'
  15. local rspamd_http = require "rspamd_http"
  16. local lua_util = require "lua_util"
  17. local util = require "rspamd_util"
  18. local ucl = require "ucl"
  19. local rspamd_redis = require "lua_redis"
  20. local upstream_list = require "rspamd_upstream_list"
  21. local lua_settings = require "lua_settings"
  -- Configuration-help mode: no runtime state or registration is needed.
  if confighelp then return end

  -- Buffered documents waiting for the next bulk push, and their count
  local rows, nrows = {}, 0
  -- Consecutive failed bulk sends, compared against settings.max_fail
  local failed_sends = 0
  -- Index template body, loaded from settings.template_file at configuration time
  local elastic_template
  local redis_params
  local N = "elastic"
  -- Shared empty table used as a nil-safe fallback when indexing optional values
  local E = {}
  local HOSTNAME = util.get_hostname()
  -- URL scheme prepended to every ES request; switched by use_https
  local connect_prefix = 'http://'
  -- Cleared by failed setup/health checks; elastic_collect does nothing once false
  local enabled = true
  -- `_nodes/<metric>` queried for ingest-geoip; 'modules' when ingest_module is set
  local ingest_geoip_type = 'plugins'

  -- Defaults; any key may be overridden by the `elastic` options section
  local settings = {
    -- buffering and indexing
    limit = 500,
    index_pattern = 'rspamd-%Y.%m.%d',
    template_file = rspamd_paths['SHAREDIR'] .. '/elastic/rspamd_template.json',
    kibana_file = rspamd_paths['SHAREDIR'] .. '/elastic/kibana.json',
    key_prefix = 'elastic-',
    expire = 3600,
    -- transport
    timeout = 5.0,
    failover = false,
    import_kibana = false,
    use_https = false,
    use_gzip = true,
    allow_local = false,
    -- authentication (unset by default)
    user = nil,
    password = nil,
    no_ssl_verify = false,
    -- error handling and cluster capabilities
    max_fail = 3,
    ingest_module = false,
    elasticsearch_version = 6,
  }
  --- Read the entire contents of a file in binary mode.
  -- @param path filesystem path to read
  -- @return the file contents as a string, or nil when the file cannot be opened
  local function read_file(path)
    local fh = io.open(path, "rb")
    if fh == nil then
      return nil
    end
    local data = fh:read("*a")
    fh:close()
    return data
  end
  --- Flush the buffered rows to Elasticsearch with a single `_bulk` request.
  -- Every row is sent as an `index` action (into the index named by
  -- settings.index_pattern, through the `rspamd-geoip` pipeline) followed by
  -- the row encoded as compact JSON.
  -- @param task rspamd task, used for logging and for the HTTP request
  -- @return whatever rspamd_http.request returns; elastic_collect treats a
  --   truthy value as a successful send
  local function elastic_send_data(task)
    local es_index = os.date(settings['index_pattern'])

    -- The action line is identical for every row, so build it once.
    -- `_type` is only sent when elasticsearch_version is below 7.
    local action_line
    if settings.elasticsearch_version >= 7 then
      action_line = '{ "index" : { "_index" : "' .. es_index ..
          '","pipeline": "rspamd-geoip"} }'
    else
      action_line = '{ "index" : { "_index" : "' .. es_index ..
          '", "_type" : "_doc" ,"pipeline": "rspamd-geoip"} }'
    end

    local lines = {}
    for _, row in ipairs(rows) do
      lines[#lines + 1] = action_line
      lines[#lines + 1] = ucl.to_format(row, 'json-compact')
    end
    lines[#lines + 1] = '' -- the bulk body must end with a newline
    local bulk_json = table.concat(lines, "\n")

    -- elastic_collect resets nrows as soon as this call returns a truthy
    -- value, i.e. before the async callback runs; capture the count now.
    local sent_rows = nrows

    local upstream = settings.upstream:get_upstream_round_robin()
    local ip_addr = upstream:get_addr():to_string(true)
    local push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk'

    local function http_callback(err, code, _, _)
      if err then
        rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s",
            push_url, err, failed_sends, settings.max_fail)
      elseif code ~= 200 then
        -- err is nil in this branch, so only the HTTP code is reported
        rspamd_logger.infox(task,
            "cannot push data to elastic backend (%s): wrong http code %s; failed attempts: %s/%s",
            push_url, code, failed_sends, settings.max_fail)
      else
        lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES",
            sent_rows, #bulk_json)
      end
    end

    return rspamd_http.request({
      url = push_url,
      headers = {
        ['Content-Type'] = 'application/x-ndjson',
      },
      body = bulk_json,
      callback = http_callback,
      task = task,
      method = 'post',
      gzip = settings.use_gzip,
      no_ssl_verify = settings.no_ssl_verify,
      user = settings.user,
      password = settings.password,
      timeout = settings.timeout,
    })
  end
  --- Collect the `rspamd_meta` document for a scanned task.
  -- Gathers connection, envelope, header, symbol, ASN, settings and timing
  -- information. Missing values are replaced with placeholders ('unknown',
  -- 0, '' or '127.0.0.1') so documents share a common set of fields.
  -- @param task rspamd task object
  -- @return table with the collected metadata
  local function get_general_metadata(task)
    local meta = {}

    -- Connecting host; loopback is recorded when the task has no valid IP
    local conn_ip = task:get_ip()
    if not (conn_ip and conn_ip:is_valid()) then
      meta.ip = '127.0.0.1'
    else
      meta.is_local = conn_ip:is_local()
      meta.ip = tostring(conn_ip)
    end

    -- A parseable X-Originating-IP header marks the message as webmail
    meta.webmail = false
    meta.sender_ip = 'unknown'
    local originating = task:get_header('X-Originating-IP')
    if originating then
      -- drop every '[' and ']' character before parsing
      originating = (originating:gsub('[%[%]]', ''))
      local rspamd_ip = require "rspamd_ip"
      local parsed_ip = rspamd_ip.from_string(originating)
      if parsed_ip and parsed_ip:is_valid() then
        meta.webmail = true
        meta.sender_ip = originating -- the string form, not the ip object
      end
    end

    -- Messages with a task user are classified as outbound
    local user = task:get_user() or 'unknown'
    meta.direction = (user ~= 'unknown') and "Outbound" or "Inbound"
    meta.user = user
    meta.qid = task:get_queue_id() or 'unknown'
    meta.action = task:get_metric_action()
    meta.rspamd_server = HOSTNAME
    meta.score = task:get_metric_score()[1]

    -- SMTP recipients as a flat list of addresses
    local smtp_rcpts = task:get_recipients('smtp')
    if smtp_rcpts then
      local addrs = {}
      for _, rcpt in ipairs(smtp_rcpts) do
        addrs[#addrs + 1] = rcpt['addr']
      end
      meta.rcpt = addrs
    else
      meta.rcpt = 'unknown'
    end

    -- `addr` of the first entry of an address list, or nil
    local function first_addr(list)
      return ((list or E)[1] or E).addr
    end
    meta.from = first_addr(task:get_from { 'smtp', 'orig' }) or 'unknown'
    meta.mime_from = first_addr(task:get_from { 'mime', 'orig' }) or 'unknown'

    meta.symbols = task:get_symbols_all()

    -- ASN data stored in the task mempool variables
    local pool = task:get_mempool()
    local asn = {}
    meta.asn = asn
    asn.country = pool:get_variable("country") or 'unknown'
    asn.asn = pool:get_variable("asn") or 0
    asn.ipnet = pool:get_variable("ipnet") or 'unknown'

    -- Decoded values of every occurrence of a header, or 'unknown'
    local function header_values(name)
      local hdrs = task:get_header_full(name)
      if not hdrs then
        return 'unknown'
      end
      local values = {}
      for _, h in ipairs(hdrs) do
        values[#values + 1] = h.decoded
      end
      return values
    end
    meta.header_from = header_values('from')
    meta.header_to = header_values('to')
    meta.header_subject = header_values('subject')
    meta.header_date = header_values('date')
    meta.message_id = task:get_message_id()
    meta.hostname = task:get_hostname() or 'unknown'

    -- Settings name resolved from the numeric settings id ('' when unknown)
    local settings_name = ''
    local settings_id = task:get_settings_id()
    if settings_id then
      local known = lua_settings.settings_by_id(settings_id)
      if known and known.name then
        settings_name = known.name
      end
    end
    meta.settings_id = settings_name

    -- Real scan time in milliseconds; negative values (clock skew) become 0
    local scan_ms = math.floor(task:get_scan_time() * 1000)
    if scan_ms < 0 then
      rspamd_logger.messagex(task,
          'clock skew detected for message: %s ms real scan time (reset to 0)',
          scan_ms)
      scan_ms = 0
    end
    meta.scan_time = scan_ms

    return meta
  end
  -- Drop all buffered rows and clear the failure counter
  local function reset_buffer()
    nrows = 0
    rows = {}
    failed_sends = 0
  end

  --- Symbol callback: buffer the task's metadata and flush it to ES in bulk.
  -- Rows are accumulated until their count exceeds settings.limit. After
  -- more than settings.max_fail unsuccessful sends, the buffer is discarded.
  -- @param task rspamd task object
  local function elastic_collect(task)
    -- Nothing to do when disabled or when the task asks to be skipped
    if not enabled or task:has_flag('skip') then
      return
    end
    -- rspamc/controller traffic is ignored unless allow_local is set
    if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then
      return
    end

    rows[#rows + 1] = {
      ['rspamd_meta'] = get_general_metadata(task),
      ['@timestamp'] = tostring(util.get_time() * 1000),
    }
    nrows = nrows + 1

    if nrows > settings['limit'] then
      lua_util.debugm(N, task, 'send elastic search rows: %s', nrows)
      if elastic_send_data(task) then
        reset_buffer()
        return
      end

      failed_sends = failed_sends + 1
      if failed_sends > settings.max_fail then
        rspamd_logger.errx(task, 'cannot send %s rows to ES %s times, stop trying',
            nrows, failed_sends)
        reset_buffer()
      end
    end
  end
  -- Raw `elastic` configuration section (may be nil when not configured)
  local opts = rspamd_config:get_all_opt(N)
  --- Verify that every ES node provides the ingest-geoip processor.
  -- Queries `/_nodes/<ingest_geoip_type>` on one upstream. Sets
  -- `enabled = false` if the request fails, the reply cannot be parsed,
  -- the reply has no `nodes` map, or any node lacks `ingest-geoip`.
  -- @param cfg rspamd config object
  -- @param ev_base event base used for the asynchronous request
  local function check_elastic_server(cfg, ev_base, _)
    local upstream = settings.upstream:get_upstream_round_robin()
    local ip_addr = upstream:get_addr():to_string(true)
    local plugins_url = connect_prefix .. ip_addr .. '/_nodes/' .. ingest_geoip_type

    -- Search both the `plugins` and `modules` lists of a node entry.
    -- Either list may be absent, depending on the queried metric.
    local function node_has_geoip(node_info)
      for _, list_name in ipairs({ 'plugins', 'modules' }) do
        local list = node_info[list_name]
        if type(list) == 'table' then
          for _, plugin in pairs(list) do
            if type(plugin) == 'table' and plugin['name'] == 'ingest-geoip' then
              return true
            end
          end
        end
      end
      return false
    end

    local function http_callback(err, code, body, _)
      if code ~= 200 then
        rspamd_logger.errx('cannot get plugins from %s: %s(%s) (%s)', plugins_url,
            err, code, body)
        enabled = false
        return
      end

      local parser = ucl.parser()
      local res, ucl_err = parser:parse_string(body or '')
      if not res then
        rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
            plugins_url, ucl_err)
        enabled = false
        return
      end

      local obj = parser:get_object()
      local nodes = type(obj) == 'table' and obj['nodes'] or nil
      if type(nodes) ~= 'table' then
        rspamd_logger.infox(rspamd_config,
            'no nodes information in reply from %s, disabling module', plugins_url)
        enabled = false
        return
      end

      for node, value in pairs(nodes) do
        if type(value) ~= 'table' or not node_has_geoip(value) then
          rspamd_logger.infox(rspamd_config,
              'Unable to find ingest-geoip on %1 node, disabling module', node)
          enabled = false
          return
        end
        lua_util.debugm(N, "ingest-geoip plugin has been found")
      end
    end

    rspamd_http.request({
      url = plugins_url,
      ev_base = ev_base,
      config = cfg,
      method = 'get',
      callback = http_callback,
      no_ssl_verify = settings.no_ssl_verify,
      user = settings.user,
      password = settings.password,
      timeout = settings.timeout,
    })
  end
  -- import ingest pipeline and kibana dashboard/visualization
  --- One-time cluster setup, performed only by the primary controller.
  -- Creates the `rspamd-geoip` ingest pipeline and checks (HEAD) whether the
  -- `rspamd` index template exists. If the check does not return 200, it
  -- PUTs `elastic_template`. Once the template is in place, it optionally
  -- bulk-imports the Kibana objects from settings.kibana_file.
  -- Failed steps set `enabled = false`.
  -- @param cfg rspamd config object
  -- @param ev_base event base used for the asynchronous requests
  -- @param worker current worker; only the primary controller proceeds
  local function initial_setup(cfg, ev_base, worker)
    if not worker:is_primary_controller() then
      return
    end
    local upstream = settings.upstream:get_upstream_round_robin()
    local ip_addr = upstream:get_addr():to_string(true)

    -- Issue a request to the selected node, adding the connection and auth
    -- options shared by every setup call
    local function es_request(params)
      params.ev_base = ev_base
      params.config = cfg
      params.no_ssl_verify = settings.no_ssl_verify
      params.user = settings.user
      params.password = settings.password
      params.timeout = settings.timeout
      return rspamd_http.request(params)
    end

    -- Build the ndjson body for the Kibana bulk import. Entries without
    -- `_type`/`_id` are skipped rather than raising on nil concatenation.
    local function kibana_bulk_body(items)
      local lines = {}
      for _, item in ipairs(items) do
        local item_type = type(item) == 'table' and item['_type'] or nil
        local item_id = type(item) == 'table' and item['_id'] or nil
        if item_type and item_id then
          lines[#lines + 1] = '{ "index" : { "_index" : ".kibana", "_type" : "doc" ,"_id": "' ..
              item_type .. ':' .. item_id .. '"} }'
          lines[#lines + 1] = ucl.to_format(item['_source'], 'json-compact')
        else
          rspamd_logger.infox(rspamd_config, 'skipping kibana object without _type/_id')
        end
      end
      lines[#lines + 1] = '' -- the bulk body must end with a newline
      return table.concat(lines, "\n")
    end

    -- add kibana dashboard and visualizations
    local function push_kibana_template()
      if not settings['import_kibana'] then
        return
      end
      local kibana_mappings = read_file(settings['kibana_file'])
      if not kibana_mappings then
        rspamd_logger.infox(rspamd_config, 'kibana template file %s not found', settings['kibana_file'])
        return
      end
      local parser = ucl.parser()
      local res, parser_err = parser:parse_string(kibana_mappings)
      if not res then
        rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s',
            parser_err)
        enabled = false
        return
      end
      local kibana_url = connect_prefix .. ip_addr .. '/.kibana/_bulk'
      es_request({
        url = kibana_url,
        headers = {
          ['Content-Type'] = 'application/x-ndjson',
        },
        body = kibana_bulk_body(parser:get_object()),
        method = 'post',
        gzip = settings.use_gzip,
        callback = function(err, code, body, _)
          if code ~= 200 then
            rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', kibana_url,
                err, code, body)
            enabled = false
          else
            lua_util.debugm(N, 'pushed kibana template: %s', body)
          end
        end,
      })
    end

    if not enabled then
      return
    end

    -- create ingest pipeline
    local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/rspamd-geoip'
    local template = {
      description = "Add geoip info for rspamd",
      processors = {
        {
          geoip = {
            field = "rspamd_meta.ip",
            target_field = "rspamd_meta.geoip"
          }
        }
      }
    }
    es_request({
      url = geoip_url,
      callback = function(err, code, body, _)
        if code ~= 200 then
          rspamd_logger.errx('cannot get data from %s: %s(%s) (%s)',
              geoip_url, err, code, body)
          enabled = false
        end
      end,
      headers = {
        ['Content-Type'] = 'application/json',
      },
      gzip = settings.use_gzip,
      body = ucl.to_format(template, 'json-compact'),
      method = 'put',
    })

    -- create template mappings if not exist
    local template_url = connect_prefix .. ip_addr .. '/_template/rspamd'
    local function http_template_put_callback(err, code, body, _)
      if code ~= 200 then
        rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)',
            template_url, err, code, body)
        enabled = false
      else
        lua_util.debugm(N, 'pushed rspamd template: %s', body)
        push_kibana_template()
      end
    end
    es_request({
      url = template_url,
      method = 'head',
      callback = function(_, code, _, _)
        if code == 200 then
          push_kibana_template()
          return
        end
        es_request({
          url = template_url,
          body = elastic_template,
          method = 'put',
          headers = {
            ['Content-Type'] = 'application/json',
          },
          gzip = settings.use_gzip,
          callback = http_template_put_callback,
        })
      end,
    })
  end
  -- Module bootstrap: apply options, validate them and register the
  -- collector. Runs only when both a redis server and an `elastic` options
  -- section are configured.
  redis_params = rspamd_redis.parse_redis_server('elastic')
  if redis_params and opts then
    for opt_name, opt_value in pairs(opts) do
      settings[opt_name] = opt_value
    end

    local servers = settings['server'] or settings['servers']
    if not servers then
      rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
      lua_util.disable_module(N, "config")
    else
      if settings.use_https then
        connect_prefix = 'https://'
      end
      if settings.ingest_module then
        ingest_geoip_type = 'modules'
      end

      -- 9200 is passed as the default port for the upstream list
      settings.upstream = upstream_list.create(rspamd_config, servers, 9200)
      if not settings.upstream then
        rspamd_logger.errx('cannot parse elastic address: %s', servers)
        lua_util.disable_module(N, "config")
        return
      end

      if not settings['template_file'] then
        rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
        lua_util.disable_module(N, "config")
        return
      end

      elastic_template = read_file(settings['template_file'])
      if not elastic_template then
        rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module',
            settings['template_file'])
        lua_util.disable_module(N, "config")
        return
      end

      rspamd_config:register_symbol({
        name = 'ELASTIC_COLLECT',
        type = 'idempotent',
        callback = elastic_collect,
        flags = 'empty,explicit_disable,ignore_passthrough',
        augmentations = { string.format("timeout=%f", settings.timeout) },
      })

      -- On scanner workers: check ES requirements, then import mappings,
      -- pipeline and visualizations
      rspamd_config:add_on_load(function(cfg, ev_base, worker)
        if not worker:is_scanner() then
          return
        end
        check_elastic_server(cfg, ev_base, worker)
        initial_setup(cfg, ev_base, worker)
      end)
    end
  end