You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

elastic.lua 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. --[[
  2. Copyright (c) 2017, Veselin Iordanov
  3. Copyright (c) 2022, Vsevolod Stakhov <vsevolod@rspamd.com>
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ]]--
  14. local rspamd_logger = require 'rspamd_logger'
  15. local rspamd_http = require "rspamd_http"
  16. local lua_util = require "lua_util"
  17. local util = require "rspamd_util"
  18. local ucl = require "ucl"
  19. local rspamd_redis = require "lua_redis"
  20. local upstream_list = require "rspamd_upstream_list"
  21. local lua_settings = require "lua_settings"
  -- Bail out early when rspamd only wants configuration help for plugins.
  if confighelp then
    return
  end

  -- Module identity and small shared constants.
  local N = "elastic"
  local E = {} -- empty table used for nil-safe chained indexing
  local HOSTNAME = util.get_hostname()

  -- Mutable module state shared by the functions below.
  local rows = {}                        -- rows buffered until the next bulk push
  local nrows = 0                        -- number of rows currently buffered
  local failed_sends = 0                 -- failed push attempts for the current buffer
  local elastic_template                 -- contents of settings.template_file
  local redis_params                     -- result of parse_redis_server('elastic')
  local connect_prefix = 'http://'       -- replaced by 'https://' when use_https is set
  local enabled = true                   -- cleared when the ES server checks fail
  local ingest_geoip_type = 'plugins'    -- replaced by 'modules' when ingest_module is set

  -- Default options; values from the `elastic` config section override them.
  local settings = {
    -- a bulk push is attempted once the buffer holds more rows than this
    limit = 500,
    -- passed to os.date() to build the index name
    index_pattern = 'rspamd-%Y.%m.%d',
    -- index template body uploaded to /_template/rspamd
    template_file = rspamd_paths.SHAREDIR .. '/elastic/rspamd_template.json',
    -- kibana objects uploaded when import_kibana is enabled
    kibana_file = rspamd_paths.SHAREDIR .. '/elastic/kibana.json',
    key_prefix = 'elastic-',
    expire = 3600,
    -- timeout handed to every HTTP request
    timeout = 5.0,
    failover = false,
    import_kibana = false,
    use_https = false,
    -- handed to the HTTP requests that carry a body
    use_gzip = true,
    -- when false, tasks coming from rspamc or the controller are not collected
    allow_local = false,
    -- optional HTTP credentials
    user = nil,
    password = nil,
    no_ssl_verify = false,
    -- buffered rows are dropped after more than this many failed pushes
    max_fail = 3,
    -- query /_nodes/modules instead of /_nodes/plugins
    ingest_module = false,
  }
  --- Read the entire contents of a file.
  -- @param path filesystem path; the file is opened in binary mode
  -- @return the file contents as a string, or nil when the file cannot be opened
  local function read_file(path)
    local handle = io.open(path, "rb")
    if handle == nil then
      return nil
    end

    local data = handle:read("*a")
    handle:close()

    return data
  end
  --- Push every buffered row to Elasticsearch in a single `_bulk` request.
  -- Each row is emitted as two NDJSON lines: an `index` action that routes the
  -- document through the `rspamd-geoip` pipeline, followed by the row itself.
  -- The HTTP outcome is only logged from the asynchronous callback; it is not
  -- reported back to the caller.
  -- @param task rspamd task used for logging and as the HTTP request context
  -- @return the result of `rspamd_http.request`; `elastic_collect` treats a
  --   truthy value as success and clears the buffer
  local function elastic_send_data(task)
    local es_index = os.date(settings['index_pattern'])
    -- Capture the row count now: the caller resets `nrows` as soon as the
    -- request is accepted, which happens before the callback can log it.
    local sent_rows = nrows

    local tbl = {}
    -- `rows` is only ever appended to, so it is a plain sequence; ipairs keeps
    -- the documents in insertion order.
    for _, value in ipairs(rows) do
      tbl[#tbl + 1] = '{ "index" : { "_index" : "' .. es_index ..
          '", "_type" : "_doc" ,"pipeline": "rspamd-geoip"} }'
      tbl[#tbl + 1] = ucl.to_format(value, 'json-compact')
    end
    tbl[#tbl + 1] = '' -- For last \n

    local upstream = settings.upstream:get_upstream_round_robin()
    local ip_addr = upstream:get_addr():to_string(true)
    local push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk'
    local bulk_json = table.concat(tbl, "\n")

    local function http_callback(err, code, body, _)
      if err then
        rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s",
            push_url, err, failed_sends, settings.max_fail)
      elseif code ~= 200 then
        -- Report the HTTP code and the reply body; `err` is always nil here.
        rspamd_logger.infox(task,
            "cannot push data to elastic backend (%s): wrong http code %s (%s); failed attempts: %s/%s",
            push_url, code, body, failed_sends, settings.max_fail)
      else
        lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES",
            sent_rows, #bulk_json)
      end
    end

    return rspamd_http.request({
      url = push_url,
      headers = {
        ['Content-Type'] = 'application/x-ndjson',
      },
      body = bulk_json,
      callback = http_callback,
      task = task,
      method = 'post',
      gzip = settings.use_gzip,
      no_ssl_verify = settings.no_ssl_verify,
      user = settings.user,
      password = settings.password,
      timeout = settings.timeout,
    })
  end
  --- Build the `rspamd_meta` document describing a scanned message.
  -- Collected fields: sender IP (or the X-Originating-IP value when it parses
  -- as an IP address), direction, authenticated user, queue id, action, score,
  -- envelope/MIME senders, recipients, all symbols, ASN data from the mempool,
  -- selected decoded headers, message id, hostname, settings id and scan time
  -- in milliseconds.
  -- @param task rspamd task being processed
  -- @return table with the collected metadata
  local function get_general_metadata(task)
    local r = {}
    local ip_addr = task:get_ip()

    r.webmail = false
    if ip_addr and ip_addr:is_valid() then
      r.is_local = ip_addr:is_local()
      r.ip = tostring(ip_addr)

      local origin = task:get_header('X-Originating-IP')
      if origin then
        -- The header value may or may not be wrapped in brackets; strip only
        -- optional brackets and surrounding whitespace so that both forms are
        -- recognised (blindly cutting the first and last character mangles an
        -- un-bracketed address).
        origin = string.match(origin, '^%s*%[?(.-)%]?%s*$')
        local rspamd_ip = require "rspamd_ip"
        local test = rspamd_ip.from_string(origin)
        if test and test:is_valid() then
          r.webmail = true
          r.ip = origin
        end
      end
    else
      r.ip = '127.0.0.1'
    end

    r.direction = "Inbound"
    r.user = task:get_user() or 'unknown'
    r.qid = task:get_queue_id() or 'unknown'
    r.action = task:get_metric_action()
    r.rspamd_server = HOSTNAME
    -- A known authenticated user marks the message as outbound
    if r.user ~= 'unknown' then
      r.direction = "Outbound"
    end

    local s = task:get_metric_score()[1]
    r.score = s

    local rcpt = task:get_recipients('smtp')
    if rcpt then
      local l = {}
      for _, a in ipairs(rcpt) do
        table.insert(l, a['addr'])
      end
      r.rcpt = l
    else
      r.rcpt = 'unknown'
    end

    -- `E` makes the chained lookups safe when a sender is absent
    local from = task:get_from { 'smtp', 'orig' }
    if ((from or E)[1] or E).addr then
      r.from = from[1].addr
    else
      r.from = 'unknown'
    end

    local mime_from = task:get_from { 'mime', 'orig' }
    if ((mime_from or E)[1] or E).addr then
      r.mime_from = mime_from[1].addr
    else
      r.mime_from = 'unknown'
    end

    local syminf = task:get_symbols_all()
    r.symbols = syminf

    r.asn = {}
    local pool = task:get_mempool()
    r.asn.country = pool:get_variable("country") or 'unknown'
    r.asn.asn = pool:get_variable("asn") or 0
    r.asn.ipnet = pool:get_variable("ipnet") or 'unknown'

    -- Returns the decoded values of every occurrence of header `name`,
    -- or 'unknown' when the header is absent
    local function process_header(name)
      local hdr = task:get_header_full(name)
      if hdr then
        local l = {}
        for _, h in ipairs(hdr) do
          table.insert(l, h.decoded)
        end
        return l
      else
        return 'unknown'
      end
    end

    r.header_from = process_header('from')
    r.header_to = process_header('to')
    r.header_subject = process_header('subject')
    r.header_date = process_header('date')
    r.message_id = task:get_message_id()

    local hname = task:get_hostname() or 'unknown'
    r.hostname = hname

    local settings_id = task:get_settings_id()
    if settings_id then
      -- Convert to string
      settings_id = lua_settings.settings_by_id(settings_id)
      if settings_id then
        settings_id = settings_id.name
      end
    end
    if not settings_id then
      settings_id = ''
    end
    r.settings_id = settings_id

    -- Scan time is stored in whole milliseconds; negative values are clamped
    local scan_real = task:get_scan_time()
    scan_real = math.floor(scan_real * 1000)
    if scan_real < 0 then
      rspamd_logger.messagex(task,
          'clock skew detected for message: %s ms real scan time (reset to 0)',
          scan_real)
      scan_real = 0
    end
    r.scan_time = scan_real

    return r
  end
  --- Idempotent symbol callback: buffer one row per task and flush the buffer
  -- to Elasticsearch once it holds more rows than `settings.limit`.
  -- When more than `settings.max_fail` pushes in a row fail, the buffered rows
  -- are dropped.
  -- @param task rspamd task being processed
  local function elastic_collect(task)
    if not enabled or task:has_flag('skip') then
      return
    end
    if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then
      return
    end

    -- Empties the buffer and resets the failure counter
    local function reset_buffer()
      nrows = 0
      rows = {}
      failed_sends = 0
    end

    rows[#rows + 1] = {
      ['rspamd_meta'] = get_general_metadata(task),
      ['@timestamp'] = tostring(util.get_time() * 1000),
    }
    nrows = nrows + 1

    -- Nothing to flush until the buffer grows past the limit
    if not (nrows > settings['limit']) then
      return
    end

    lua_util.debugm(N, task, 'send elastic search rows: %s', nrows)
    if elastic_send_data(task) then
      reset_buffer()
      return
    end

    failed_sends = failed_sends + 1
    if failed_sends > settings.max_fail then
      rspamd_logger.errx(task, 'cannot send %s rows to ES %s times, stop trying',
          nrows, failed_sends)
      reset_buffer()
    end
  end
  -- Options of this module's config section (N holds the section name, "elastic")
  local opts = rspamd_config:get_all_opt(N)
  --- Verify that every Elasticsearch node provides `ingest-geoip`.
  -- Queries `/_nodes/<ingest_geoip_type>` on one upstream and clears the
  -- module-level `enabled` flag when the request fails, the reply cannot be
  -- parsed, or any node lacks `ingest-geoip`.
  -- @param cfg rspamd config object passed to the HTTP request
  -- @param ev_base event base passed to the HTTP request
  local function check_elastic_server(cfg, ev_base, _)
    local upstream = settings.upstream:get_upstream_round_robin()
    local ip_addr = upstream:get_addr():to_string(true)
    local plugins_url = connect_prefix .. ip_addr .. '/_nodes/' .. ingest_geoip_type

    -- Returns true when `list` is a table holding a descriptor named ingest-geoip
    local function has_geoip(list)
      if type(list) ~= 'table' then
        return false
      end
      for _, plugin in pairs(list) do
        if type(plugin) == 'table' and plugin['name'] == 'ingest-geoip' then
          return true
        end
      end
      return false
    end

    local function http_callback(err, code, body, _)
      if code ~= 200 then
        rspamd_logger.errx('cannot get plugins from %s: %s(%s) (%s)', plugins_url,
            err, code, body)
        enabled = false
        return
      end

      local parser = ucl.parser()
      local res, ucl_err = parser:parse_string(body)
      if not res then
        rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
            plugins_url, ucl_err)
        enabled = false;
        return
      end

      local obj = parser:get_object()
      local nodes = type(obj) == 'table' and obj['nodes'] or nil
      if type(nodes) ~= 'table' then
        -- A reply without a node map cannot be validated; fail closed rather
        -- than raise an error from pairs(nil)
        rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
            plugins_url, 'no nodes object in reply')
        enabled = false
        return
      end

      for node, value in pairs(nodes) do
        -- Look under `plugins` and under the configured list name.
        -- NOTE(review): presumably the reply uses a `modules` list when
        -- ingest_module is set; confirm against the ES nodes info API.
        local plugin_found = type(value) == 'table'
            and (has_geoip(value['plugins']) or has_geoip(value[ingest_geoip_type]))
        if plugin_found then
          lua_util.debugm(N, "ingest-geoip plugin has been found")
        else
          rspamd_logger.infox(rspamd_config,
              'Unable to find ingest-geoip on %1 node, disabling module', node)
          enabled = false
          return
        end
      end
    end

    rspamd_http.request({
      url = plugins_url,
      ev_base = ev_base,
      config = cfg,
      method = 'get',
      callback = http_callback,
      no_ssl_verify = settings.no_ssl_verify,
      user = settings.user,
      password = settings.password,
      timeout = settings.timeout,
    })
  end
  --- Import the ingest pipeline, the index template and, optionally, the
  -- kibana dashboard/visualizations. Does nothing unless the worker is the
  -- primary controller.
  -- @param cfg rspamd config object passed to the HTTP requests
  -- @param ev_base event base passed to the HTTP requests
  -- @param worker worker object the on-load hook was invoked for
  local function initial_setup(cfg, ev_base, worker)
    if not worker:is_primary_controller() then
      return
    end

    local upstream = settings.upstream:get_upstream_round_robin()
    local ip_addr = upstream:get_addr():to_string(true)

    -- Fills in the options shared by every request issued here and sends it
    local function es_request(req)
      req.ev_base = ev_base
      req.config = cfg
      req.no_ssl_verify = settings.no_ssl_verify
      req.user = settings.user
      req.password = settings.password
      req.timeout = settings.timeout
      rspamd_http.request(req)
    end

    -- Uploads the kibana dashboard and visualizations when import_kibana is set
    local function push_kibana_template()
      if not settings['import_kibana'] then
        return
      end

      local kibana_mappings = read_file(settings['kibana_file'])
      if not kibana_mappings then
        rspamd_logger.infox(rspamd_config, 'kibana template file %s not found', settings['kibana_file'])
        return
      end

      local parser = ucl.parser()
      local res, parser_err = parser:parse_string(kibana_mappings)
      if not res then
        rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s',
            parser_err)
        enabled = false
        return
      end

      -- Two NDJSON lines per object: the index action, then the source document
      local bulk = {}
      for _, item in ipairs(parser:get_object()) do
        bulk[#bulk + 1] = '{ "index" : { "_index" : ".kibana", "_type" : "doc" ,"_id": "' ..
            item['_type'] .. ':' .. item["_id"] .. '"} }'
        bulk[#bulk + 1] = ucl.to_format(item['_source'], 'json-compact')
      end
      bulk[#bulk + 1] = '' -- For last \n

      local kibana_url = connect_prefix .. ip_addr .. '/.kibana/_bulk'

      local function kibana_template_callback(err, code, body, _)
        if code == 200 then
          lua_util.debugm(N, 'pushed kibana template: %s', body)
        else
          rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', kibana_url,
              err, code, body)
          enabled = false
        end
      end

      es_request({
        url = kibana_url,
        headers = {
          ['Content-Type'] = 'application/x-ndjson',
        },
        body = table.concat(bulk, "\n"),
        method = 'post',
        gzip = settings.use_gzip,
        callback = kibana_template_callback,
      })
    end

    if not enabled then
      return
    end

    -- create ingest pipeline
    local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/rspamd-geoip'

    local function geoip_cb(err, code, body, _)
      if code ~= 200 then
        rspamd_logger.errx('cannot get data from %s: %s(%s) (%s)',
            geoip_url, err, code, body)
        enabled = false
      end
    end

    local geoip_processor = {
      geoip = {
        field = "rspamd_meta.ip",
        target_field = "rspamd_meta.geoip"
      }
    }
    local template = {
      description = "Add geoip info for rspamd",
      processors = { geoip_processor }
    }

    es_request({
      url = geoip_url,
      callback = geoip_cb,
      headers = {
        ['Content-Type'] = 'application/json',
      },
      gzip = settings.use_gzip,
      body = ucl.to_format(template, 'json-compact'),
      method = 'put',
    })

    -- create template mappings if not exist
    local template_url = connect_prefix .. ip_addr .. '/_template/rspamd'

    local function http_template_put_callback(err, code, body, _)
      if code == 200 then
        lua_util.debugm(N, 'pushed rspamd template: %s', body)
        push_kibana_template()
      else
        rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)',
            template_url, err, code, body)
        enabled = false
      end
    end

    -- A 200 reply to HEAD means the template already exists; otherwise upload it
    local function http_template_exist_callback(_, code, _, _)
      if code == 200 then
        push_kibana_template()
        return
      end

      es_request({
        url = template_url,
        body = elastic_template,
        method = 'put',
        headers = {
          ['Content-Type'] = 'application/json',
        },
        gzip = settings.use_gzip,
        callback = http_template_put_callback,
      })
    end

    es_request({
      url = template_url,
      method = 'head',
      callback = http_template_exist_callback,
    })
  end
  -- Module configuration: merge user options over the defaults, validate them
  -- and register the collecting symbol together with the on-load setup hook.
  -- Nothing is registered unless both redis parameters and an `elastic`
  -- section are present.
  redis_params = rspamd_redis.parse_redis_server('elastic')

  if redis_params and opts then
    for name, value in pairs(opts) do
      settings[name] = value
    end

    -- Either option may carry the server list
    local servers = settings['server'] or settings['servers']

    if not servers then
      rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
      lua_util.disable_module(N, "config")
    else
      if settings.use_https then
        connect_prefix = 'https://'
      end
      if settings.ingest_module then
        ingest_geoip_type = 'modules'
      end

      -- 9200 is the default port handed to the upstream list
      settings.upstream = upstream_list.create(rspamd_config, servers, 9200)
      if not settings.upstream then
        rspamd_logger.errx('cannot parse elastic address: %s', servers)
        lua_util.disable_module(N, "config")
        return
      end

      local template_path = settings['template_file']
      if not template_path then
        rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
        lua_util.disable_module(N, "config")
        return
      end

      elastic_template = read_file(template_path)
      if not elastic_template then
        rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module',
            template_path)
        lua_util.disable_module(N, "config")
        return
      end

      rspamd_config:register_symbol({
        name = 'ELASTIC_COLLECT',
        type = 'idempotent',
        callback = elastic_collect,
        priority = 10,
        flags = 'empty,explicit_disable,ignore_passthrough',
      })

      rspamd_config:add_on_load(function(cfg, ev_base, worker)
        if not worker:is_scanner() then
          return
        end
        -- check for elasticsearch requirements
        check_elastic_server(cfg, ev_base, worker)
        -- import mappings pipeline and visualizations
        initial_setup(cfg, ev_base, worker)
      end)
    end
  end