You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

elastic.lua 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. --[[
  2. Copyright (c) 2017, Veselin Iordanov
  3. Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ]]--
  14. local rspamd_logger = require 'rspamd_logger'
  15. local rspamd_http = require "rspamd_http"
  16. local lua_util = require "lua_util"
  17. local util = require "rspamd_util"
  18. local ucl = require "ucl"
  19. local rspamd_redis = require "lua_redis"
  20. local upstream_list = require "rspamd_upstream_list"
  -- Skip the rest of the plugin when the `confighelp` global is set.
  if confighelp then
    return
  end

  -- Module-level state shared by the functions below.
  local rows, nrows = {}, 0     -- buffered documents and their count
  local failed_sends = 0        -- consecutive failed bulk pushes
  local elastic_template        -- template body read from settings.template_file
  local redis_params            -- result of lua_redis.parse_redis_server
  local N = "elastic"           -- module name used for logging/disabling
  local E = {}                  -- shared empty table for nil-safe indexing
  local connect_prefix = 'http://'
  local enabled = true

  -- Defaults; every key may be overridden from the `elastic` config section.
  local settings = {
    -- buffering: a bulk push is attempted once more than `limit` rows are held,
    -- and the buffer is dropped after more than `max_fail` failed pushes
    limit = 500,
    max_fail = 3,

    -- index naming and bundled resources
    index_pattern = 'rspamd-%Y.%m.%d',
    template_file = rspamd_paths.SHAREDIR .. '/elastic/rspamd_template.json',
    kibana_file = rspamd_paths.SHAREDIR .. '/elastic/kibana.json',
    import_kibana = false,

    -- HTTP transport options passed to rspamd_http.request
    timeout = 5.0,
    use_https = false,
    use_gzip = true,
    no_ssl_verify = false,
    user = nil,
    password = nil,

    -- when false, tasks from rspamc/controller are not collected
    allow_local = false,

    -- options not referenced by the code visible in this file
    key_prefix = 'elastic-',
    expire = 3600,
    failover = false,
  }
  --- Read a whole file in binary mode.
  -- @param path path of the file to read
  -- @return the file content, or nil when the file cannot be opened
  local function read_file(path)
    local fh = io.open(path, "rb")
    if fh == nil then
      return nil
    end

    local data = fh:read("*a")
    fh:close()

    return data
  end
  --- Push every buffered row to Elasticsearch through the bulk endpoint.
  -- Reads the module-level `rows`, `nrows`, `failed_sends` and `settings`;
  -- it does not modify the buffer (the caller decides whether to reset it).
  -- @param task task used for the HTTP request and for logging
  -- @return true when the backend answered with HTTP 200, false otherwise
  local function elastic_send_data(task)
    local es_index = os.date(settings['index_pattern'])

    -- The action line is identical for every document, so build it once
    local action_line = '{ "index" : { "_index" : "' .. es_index ..
        '", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }'

    local tbl = {}
    -- `rows` is a sequence filled with table.insert, so ipairs keeps order
    for _, value in ipairs(rows) do
      tbl[#tbl + 1] = action_line
      tbl[#tbl + 1] = ucl.to_format(value, 'json-compact')
    end
    tbl[#tbl + 1] = '' -- For last \n

    local upstream = settings.upstream:get_upstream_round_robin()
    local ip_addr = upstream:get_addr():to_string(true)
    local push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk'
    local bulk_json = table.concat(tbl, "\n")

    local err, response = rspamd_http.request({
      url = push_url,
      headers = {
        ['Content-Type'] = 'application/x-ndjson',
      },
      body = bulk_json,
      task = task,
      method = 'post',
      gzip = settings.use_gzip,
      no_ssl_verify = settings.no_ssl_verify,
      user = settings.user,
      password = settings.password,
      timeout = settings.timeout,
    })

    if err then
      rspamd_logger.infox(task,
          "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s",
          push_url, err, failed_sends, settings.max_fail)
      return false
    end

    if response.code ~= 200 then
      -- `err` is always nil in this branch: report the actual status code and
      -- the reply body instead of logging a meaningless nil
      rspamd_logger.infox(task,
          "cannot push data to elastic backend (%s): wrong http code %s (%s); failed attempts: %s/%s",
          push_url, response.code, response.content, failed_sends, settings.max_fail)
      return false
    end

    lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES",
        nrows, #bulk_json)
    return true
  end
  --- Build the `rspamd_meta` document describing a scanned task.
  -- @param task task object to inspect
  -- @return table with ip/webmail/is_local, direction, user, qid, action,
  --         score, rcpt, from, symbols, asn, selected headers, message_id
  --         and hostname fields
  local function get_general_metadata(task)
    local r = {}
    local ip_addr = task:get_ip()
    r.webmail = false

    if ip_addr and ip_addr:is_valid() then
      r.is_local = ip_addr:is_local()
      -- Default to the connecting address; replaced below for webmail
      r.ip = tostring(ip_addr)

      local origin = task:get_header('X-Originating-IP')
      if origin then
        -- The header may come as "[1.2.3.4]" or as a bare "1.2.3.4": strip only
        -- brackets and whitespace instead of blindly dropping the first and
        -- last characters, which corrupted bare addresses
        origin = (string.gsub(origin, '[%[%]%s]', ''))
        local rspamd_ip = require "rspamd_ip"
        local test = rspamd_ip.from_string(origin)
        if test and test:is_valid() then
          r.webmail = true
          r.ip = origin
        end
      end
    else
      r.ip = '127.0.0.1'
    end

    r.user = task:get_user() or 'unknown'
    r.qid = task:get_queue_id() or 'unknown'
    r.action = task:get_metric_action('default')
    -- A known (authenticated) user marks the message as outbound
    if r.user ~= 'unknown' then
      r.direction = "Outbound"
    else
      r.direction = "Inbound"
    end

    r.score = task:get_metric_score('default')[1]

    local rcpt = task:get_recipients('smtp')
    if rcpt then
      local l = {}
      for _, a in ipairs(rcpt) do
        l[#l + 1] = a['addr']
      end
      r.rcpt = l
    else
      r.rcpt = 'unknown'
    end

    local from = task:get_from('smtp')
    if from and from[1] and from[1].addr then
      r.from = from[1].addr
    else
      r.from = 'unknown'
    end

    r.symbols = task:get_symbols_all()

    local pool = task:get_mempool()
    r.asn = {
      country = pool:get_variable("country") or 'unknown',
      asn = pool:get_variable("asn") or 0,
      ipnet = pool:get_variable("ipnet") or 'unknown',
    }

    -- Collect the decoded values of every occurrence of a header,
    -- or the string 'unknown' when the header is absent
    local function process_header(name)
      local hdr = task:get_header_full(name)
      if not hdr then
        return 'unknown'
      end
      local l = {}
      for _, h in ipairs(hdr) do
        l[#l + 1] = h.decoded
      end
      return l
    end

    r.header_from = process_header('from')
    r.header_to = process_header('to')
    r.header_subject = process_header('subject')
    r.header_date = process_header('date')
    r.message_id = task:get_message_id()
    r.hostname = task:get_hostname() or 'unknown'

    return r
  end
  --- Idempotent symbol callback: buffer one row per task and flush the
  -- buffer to Elasticsearch once it holds more than `settings.limit` rows.
  -- @param task task being processed
  local function elastic_collect(task)
    if not enabled or task:has_flag('skip') then
      return
    end
    if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then
      return
    end

    -- Forget everything buffered so far and clear the failure counter
    local function reset_buffer()
      nrows = 0
      rows = {}
      failed_sends = 0
    end

    local meta = get_general_metadata(task)
    local stamp = tostring(util.get_time() * 1000)
    table.insert(rows, { ['rspamd_meta'] = meta, ['@timestamp'] = stamp })
    nrows = nrows + 1

    if not (nrows > settings['limit']) then
      return
    end

    lua_util.debugm(N, task, 'send elastic search rows: %s', nrows)
    if elastic_send_data(task) then
      reset_buffer()
      return
    end

    -- Keep the rows for a retry until too many pushes have failed
    failed_sends = failed_sends + 1
    if failed_sends > settings.max_fail then
      rspamd_logger.errx(task, 'cannot send %s rows to ES %s times, stop trying',
          nrows, failed_sends)
      reset_buffer()
    end
  end
  -- Plugin options from the `elastic` configuration section (may be nil)
  local opts = rspamd_config:get_all_opt('elastic')

  --- Ask the Elasticsearch node list for installed plugins and disable the
  -- module (`enabled = false`) unless every node reports `ingest-geoip`.
  -- @param cfg rspamd configuration object passed to the HTTP request
  -- @param ev_base event base passed to the HTTP request
  local function check_elastic_server(cfg, ev_base, _)
    local upstream = settings.upstream:get_upstream_round_robin()
    local ip_addr = upstream:get_addr():to_string(true)
    local plugins_url = connect_prefix .. ip_addr .. '/_nodes/plugins'

    local function http_callback(err, code, body, _)
      if code ~= 200 then
        rspamd_logger.errx('cannot get plugins from %s: %s(%s) (%s)', plugins_url,
            err, code, body)
        enabled = false
        return
      end

      local parser = ucl.parser()
      local res, ucl_err = parser:parse_string(body)
      if not res then
        rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
            plugins_url, ucl_err)
        enabled = false;
        return
      end

      local obj = parser:get_object()
      -- A reply without a `nodes` table cannot confirm the requirement;
      -- treat it like an unparsable reply instead of raising in pairs(nil)
      if type(obj) ~= 'table' or type(obj['nodes']) ~= 'table' then
        rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
            plugins_url, 'no nodes in reply')
        enabled = false
        return
      end

      for node, value in pairs(obj['nodes']) do
        local plugin_found = false
        -- A node that lists no plugins simply does not have ingest-geoip
        local plugins = type(value) == 'table' and value['plugins'] or {}
        for _, plugin in pairs(plugins) do
          if plugin['name'] == 'ingest-geoip' then
            plugin_found = true
            -- debugm expects a log object (here the config) before the format
            lua_util.debugm(N, rspamd_config, "ingest-geoip plugin has been found")
            break
          end
        end
        if not plugin_found then
          rspamd_logger.infox(rspamd_config,
              'Unable to find ingest-geoip on %1 node, disabling module', node)
          enabled = false
          return
        end
      end
    end

    rspamd_http.request({
      url = plugins_url,
      ev_base = ev_base,
      config = cfg,
      method = 'get',
      callback = http_callback,
      no_ssl_verify = settings.no_ssl_verify,
      user = settings.user,
      password = settings.password,
      timeout = settings.timeout,
    })
  end
  --- Import the ingest pipeline, the index template and (optionally) the
  -- Kibana dashboard/visualizations. Does nothing unless the worker is the
  -- primary controller. Any failed request sets `enabled = false`.
  -- @param cfg rspamd configuration object passed to the HTTP requests
  -- @param ev_base event base passed to the HTTP requests
  -- @param worker worker object the on_load hook was invoked for
  local function initial_setup(cfg, ev_base, worker)
    if not worker:is_primary_controller() then return end

    local upstream = settings.upstream:get_upstream_round_robin()
    local ip_addr = upstream:get_addr():to_string(true)

    -- Add kibana dashboard and visualizations (only when import_kibana is set)
    local function push_kibana_template()
      if not settings['import_kibana'] then
        return
      end

      local kibana_mappings = read_file(settings['kibana_file'])
      if not kibana_mappings then
        rspamd_logger.infox(rspamd_config, 'kibana template file %s not found', settings['kibana_file'])
        return
      end

      local parser = ucl.parser()
      local res, parser_err = parser:parse_string(kibana_mappings)
      if not res then
        rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s',
            parser_err)
        enabled = false
        return
      end

      local obj = parser:get_object()
      local tbl = {}
      for _, item in ipairs(obj) do
        -- Skip malformed entries instead of raising on nil concatenation
        if type(item) == 'table' and item['_type'] and item['_id'] and item['_source'] then
          tbl[#tbl + 1] = '{ "index" : { "_index" : ".kibana", "_type" : "doc" ,"_id": "' ..
              item['_type'] .. ':' .. item["_id"] .. '"} }'
          tbl[#tbl + 1] = ucl.to_format(item['_source'], 'json-compact')
        else
          rspamd_logger.infox(rspamd_config,
              'skip malformed kibana template entry in %s', settings['kibana_file'])
        end
      end
      tbl[#tbl + 1] = '' -- For last \n

      local kibana_url = connect_prefix .. ip_addr .. '/.kibana/_bulk'
      local function kibana_template_callback(err, code, body, _)
        if code ~= 200 then
          rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', kibana_url,
              err, code, body)
          enabled = false
        else
          -- debugm expects a log object (here the config) before the format
          lua_util.debugm(N, rspamd_config, 'pushed kibana template: %s', body)
        end
      end

      rspamd_http.request({
        url = kibana_url,
        ev_base = ev_base,
        config = cfg,
        headers = {
          ['Content-Type'] = 'application/x-ndjson',
        },
        body = table.concat(tbl, "\n"),
        method = 'post',
        gzip = settings.use_gzip,
        callback = kibana_template_callback,
        no_ssl_verify = settings.no_ssl_verify,
        user = settings.user,
        password = settings.password,
        timeout = settings.timeout,
      })
    end

    if not enabled then
      return
    end

    -- create ingest pipeline
    local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/rspamd-geoip'
    local function geoip_cb(err, code, body, _)
      if code ~= 200 then
        rspamd_logger.errx('cannot get data from %s: %s(%s) (%s)',
            geoip_url, err, code, body)
        enabled = false
      end
    end
    local template = {
      description = "Add geoip info for rspamd",
      processors = {
        {
          geoip = {
            field = "rspamd_meta.ip",
            target_field = "rspamd_meta.geoip"
          }
        }
      }
    }
    rspamd_http.request({
      url = geoip_url,
      ev_base = ev_base,
      config = cfg,
      callback = geoip_cb,
      headers = {
        ['Content-Type'] = 'application/json',
      },
      gzip = settings.use_gzip,
      body = ucl.to_format(template, 'json-compact'),
      method = 'put',
      no_ssl_verify = settings.no_ssl_verify,
      user = settings.user,
      password = settings.password,
      timeout = settings.timeout,
    })

    -- create template mappings if not exist
    local template_url = connect_prefix .. ip_addr .. '/_template/rspamd'
    local function http_template_put_callback(err, code, body, _)
      if code ~= 200 then
        rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)',
            template_url, err, code, body)
        enabled = false
      else
        -- debugm expects a log object (here the config) before the format
        lua_util.debugm(N, rspamd_config, 'pushed rspamd template: %s', body)
        push_kibana_template()
      end
    end

    -- HEAD reply: anything but 200 means the template has to be uploaded first
    local function http_template_exist_callback(_, code, _, _)
      if code ~= 200 then
        rspamd_http.request({
          url = template_url,
          ev_base = ev_base,
          config = cfg,
          body = elastic_template,
          method = 'put',
          headers = {
            ['Content-Type'] = 'application/json',
          },
          gzip = settings.use_gzip,
          callback = http_template_put_callback,
          no_ssl_verify = settings.no_ssl_verify,
          user = settings.user,
          password = settings.password,
          timeout = settings.timeout,
        })
      else
        push_kibana_template()
      end
    end

    rspamd_http.request({
      url = template_url,
      ev_base = ev_base,
      config = cfg,
      method = 'head',
      callback = http_template_exist_callback,
      no_ssl_verify = settings.no_ssl_verify,
      user = settings.user,
      password = settings.password,
      timeout = settings.timeout,
    })
  end
  -- Module activation: merge user options into the defaults, resolve the
  -- upstream list, load the index template and register the collector.
  redis_params = rspamd_redis.parse_redis_server('elastic')
  -- NOTE(review): redis_params is only used as a gate here; nothing visible in
  -- this file talks to redis — confirm whether redis is really required.
  if redis_params and opts then
    for k, v in pairs(opts) do
      settings[k] = v
    end

    local servers = settings['server'] or settings['servers']
    if not servers then
      rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
      lua_util.disable_module(N, "config")
    else
      if settings.use_https then
        connect_prefix = 'https://'
      end

      -- 9200 is passed as the default port for the upstream list
      settings.upstream = upstream_list.create(rspamd_config, servers, 9200)
      if not settings.upstream then
        rspamd_logger.errx(rspamd_config, 'cannot parse elastic address: %s',
            servers)
        lua_util.disable_module(N, "config")
        return
      end

      if not settings['template_file'] then
        rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
        lua_util.disable_module(N, "config")
        return
      end

      elastic_template = read_file(settings['template_file']);
      if not elastic_template then
        rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module',
            settings['template_file'])
        lua_util.disable_module(N, "config")
        return
      end

      rspamd_config:register_symbol({
        name = 'ELASTIC_COLLECT',
        type = 'idempotent',
        callback = elastic_collect,
        priority = 10,
        flags = 'empty',
      })

      rspamd_config:add_on_load(function(cfg, ev_base, worker)
        if worker:is_scanner() then
          check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
        end
        -- initial_setup returns immediately unless the worker is the primary
        -- controller, so it must not be nested under the scanner check:
        -- nested there, the pipeline/template import could only run for a
        -- worker that is both a scanner and the primary controller.
        initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
      end)
    end
  end