You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

lua_clickhouse.lua 16KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. --[[
  2. Copyright (c) 2022, Vsevolod Stakhov <vsevolod@rspamd.com>
  3. Copyright (c) 2018, Mikhail Galanin <mgalanin@mimecast.com>
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ]]--
  14. --[[[
  15. -- @module lua_clickhouse
  16. -- This module contains Clickhouse access functions
  17. --]]
  18. local rspamd_logger = require "rspamd_logger"
  19. local rspamd_http = require "rspamd_http"
  20. local lua_util = require "lua_util"
  21. local rspamd_text = require "rspamd_text"
  22. local exports = {}
  23. local N = 'clickhouse'
  24. local default_timeout = 10.0
  -- Replaces every whitespace character (the Lua `%s` class: space, tab,
  -- newline, ...) in `query` with `%20` so it can be placed in a URL.
  -- Other URL-special characters are NOT encoded.
  -- @param {string} query text to escape
  -- @return {string} the escaped text (exactly one value)
  local function escape_spaces(query)
    -- The extra parentheses drop gsub's second result (the match count), so a
    -- call placed last in an argument list does not leak a stray argument.
    return (query:gsub('%s', '%%20'))
  end
  -- Formats a Lua number for ClickHouse TSV rows and array literals.
  -- Integral values are printed without a fractional part; everything else
  -- (fractions, NaN) is printed with tostring().
  -- @param {number} a value to format
  -- @return {string} textual representation
  local function ch_number(a)
    -- math.floor(a) == a is an exact integrality test. The former trick
    -- `(a + 2^52) - 2^52 == a` wrongly accepted negative halves such as -0.5
    -- (2^52 - 0.5 is exactly representable), which were then floored to -1.
    if math.floor(a) == a then
      return tostring(math.floor(a))
    end
    return tostring(a)
  end
  -- Backslash-escape sequences used by clickhouse_quote; built once instead
  -- of on every call.
  local clickhouse_escapes = {
    ['\''] = [[\']],
    ['\\'] = [[\\]],
    ['\n'] = [[\n]],
    ['\t'] = [[\t]],
    ['\r'] = [[\r]],
  }

  -- Escapes a string for ClickHouse: single quote, backslash, newline, tab
  -- and carriage return are replaced by their backslash escape sequences.
  -- @param {string|nil} str value to escape; nil/false yield ''
  -- @return {string} escaped value (exactly one value)
  local function clickhouse_quote(str)
    if not str then
      return ''
    end
    -- Parenthesised so gsub's substitution count is not returned as well
    return (str:gsub('[\'\\\n\t\r]', clickhouse_escapes))
  end
  -- Converts an array into the body of a ClickHouse array literal (the caller
  -- adds the surrounding brackets). Strings and userdata (via tostring) become
  -- single-quoted, escaped literals; numbers are formatted with ch_number.
  -- Elements of any other type are copied as-is, so table.concat raises an
  -- error for e.g. booleans or nested tables. Iteration stops at the first nil.
  -- The input table is not modified: results go to a fresh buffer.
  -- @param {table} ar sequence of values
  -- @return {string} comma-separated elements
  local function array_to_string(ar)
    local out = {}
    for i, elt in ipairs(ar) do
      local t = type(elt)
      if t == 'string' then
        out[i] = string.format('\'%s\'', clickhouse_quote(elt))
      elseif t == 'userdata' then
        out[i] = string.format('\'%s\'', clickhouse_quote(tostring(elt)))
      elseif t == 'number' then
        out[i] = ch_number(elt)
      else
        out[i] = elt
      end
    end
    return table.concat(out, ',')
  end
  -- Converts a row (sequence of column values) into one TabSeparated line:
  -- tables become ClickHouse array literals, numbers are formatted with
  -- ch_number, userdata is stringified and escaped, and everything else is
  -- escaped with clickhouse_quote. Iteration stops at the first nil.
  -- The caller's row is not modified: columns are built in a new table.
  -- @param {table} row sequence of column values
  -- @return {rspamd_text} columns joined with '\t'
  local function row_to_tsv(row)
    local columns = {}
    for i, elt in ipairs(row) do
      local t = type(elt)
      if t == 'table' then
        columns[i] = '[' .. array_to_string(elt) .. ']'
      elseif t == 'number' then
        columns[i] = ch_number(elt)
      elseif t == 'userdata' then
        columns[i] = clickhouse_quote(tostring(elt))
      else
        columns[i] = clickhouse_quote(elt)
      end
    end
    return rspamd_text.fromtable(columns, '\t')
  end
  exports.row_to_tsv = row_to_tsv
  -- Parses a JSONEachRow reply from ClickHouse (one JSON object per line).
  -- Lines of length 0 or 1 are ignored; lines that fail to parse are logged
  -- and skipped. Each decoded row is handed to `row_cb` when it is given,
  -- otherwise rows are accumulated into the returned array.
  -- @param {table} params request params; `params.log_obj` is used for logging
  -- @param {userdata|nil} data reply body with a :lines() iterator
  --   (presumably rspamd_text); nil means an empty result set
  -- @param {function} row_cb optional per-row consumer
  -- @return {table} decoded rows (always a table; empty when `row_cb`
  --   consumes the rows or `data` is nil)
  local function parse_clickhouse_response_json_eachrow(params, data, row_cb)
    local ucl = require "ucl"

    if data == nil then
      -- clickhouse returned no data (i.e. empty result set): nothing to parse
      return {}
    end

    local collected = {}

    for line in data:lines() do
      if line and #line > 1 then
        -- A fresh parser per line: each line is an independent JSON document
        local parser = ucl.parser()
        local ok, err
        if type(line) == 'string' then
          ok, err = parser:parse_string(line)
        else
          ok, err = parser:parse_text(line)
        end

        if not ok then
          rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
        else
          local row = parser:get_object()
          if row and row_cb then
            row_cb(row)
          elseif row then
            collected[#collected + 1] = row
          end
        end
      end
    end

    return collected
  end
  -- Parses a single JSON document returned by ClickHouse.
  -- @param {table} params request params; `params.log_obj` is used for logging
  -- @param {string|userdata|nil} data reply body; nil (empty result set) is
  --   considered a valid reply
  -- @return {string|nil} 'bad json' on failure, nil on success
  -- @return {table} decoded object, or {} for an empty body or on failure
  local function parse_clickhouse_response_json(params, data)
    local ucl = require "ucl"

    if data == nil then
      -- clickhouse returned no data (i.e. empty result set) considered valid!
      return nil, {}
    end

    local parser = ucl.parser()
    local ok, err
    if type(data) == 'string' then
      ok, err = parser:parse_string(data)
    else
      ok, err = parser:parse_text(data)
    end

    if not ok then
      rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
      return 'bad json', {}
    end

    local decoded = parser:get_object()
    if decoded then
      return nil, decoded
    end
    return 'bad json', {}
  end
  -- Helper to generate the HTTP callback used by `select`.
  -- On a transport error or a non-200 reply the upstream is marked as failed
  -- and `fail_cb(params, err_message, data)` is called (or the error is logged
  -- when no fail_cb is given). On success the upstream is marked ok, the
  -- JSONEachRow body is parsed and `ok_cb(params, rows)` is called (or a debug
  -- message is logged when no ok_cb is given).
  -- @param {upstream} upstream server the request is sent to
  -- @param {table} params request params passed back to the callbacks
  --   (`params.log_obj` is used for logging)
  -- @param {function} ok_cb optional success callback
  -- @param {function} fail_cb optional failure callback
  -- @param {function} row_cb optional per-row callback forwarded to the parser
  -- @return {function} callback suitable for rspamd_http.request
  local function mk_http_select_cb(upstream, params, ok_cb, fail_cb, row_cb)
    -- Reports an error through fail_cb, or logs it when fail_cb is absent
    local function report_failure(err_message, data)
      if fail_cb then
        fail_cb(params, err_message, data)
      else
        local ip_addr = upstream:get_addr():to_string(true)
        rspamd_logger.errx(params.log_obj,
            "request failed on clickhouse server %s: %s",
            ip_addr, err_message)
      end
    end

    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        -- Without a transport error the reply body carries the error text
        report_failure(err_message or data, data)
        upstream:fail()
        return
      end

      upstream:ok()
      local rows = parse_clickhouse_response_json_eachrow(params, data, row_cb)
      if not rows then
        report_failure('failed to parse reply', data)
      elseif ok_cb then
        ok_cb(params, rows)
      else
        -- `data` may be nil (empty result set) or an opaque rspamd_text body,
        -- neither of which is a plain Lua string; stringify it before gsub.
        -- These arguments are evaluated even when debug logging is disabled.
        local flat = data and tostring(data):gsub('[\n%s]+', ' ') or ''
        lua_util.debugm(N, params.log_obj,
            "http_select_cb ok: %s, %s, %s, %s", err_message, code, flat, _)
      end
    end

    return http_cb
  end
  -- Helper to generate the HTTP callback used by `insert` and `generic`.
  -- On a transport error or a non-200 reply the upstream is marked as failed
  -- and `fail_cb(params, err_message, data)` is called (or the error is logged
  -- when no fail_cb is given). On success the upstream is marked ok; when
  -- `ok_cb` is set the body is decoded as a single JSON document and passed
  -- as `ok_cb(params, parsed)`, otherwise a debug message is logged.
  -- @param {upstream} upstream server the request is sent to
  -- @param {table} params request params passed back to the callbacks
  --   (`params.log_obj` is used for logging)
  -- @param {function} ok_cb optional success callback
  -- @param {function} fail_cb optional failure callback
  -- @return {function} callback suitable for rspamd_http.request
  local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
    -- Reports an error through fail_cb, or logs it when fail_cb is absent
    local function report_failure(err_message, data)
      if fail_cb then
        fail_cb(params, err_message, data)
      else
        local ip_addr = upstream:get_addr():to_string(true)
        rspamd_logger.errx(params.log_obj,
            "request failed on clickhouse server %s: %s",
            ip_addr, err_message)
      end
    end

    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        -- Without a transport error the reply body carries the error text
        report_failure(err_message or data, data)
        upstream:fail()
        return
      end

      upstream:ok()
      if ok_cb then
        -- `params` must come first: the parser takes (params, data) and uses
        -- params.log_obj; passing only `data` silently skipped parsing.
        local err, parsed = parse_clickhouse_response_json(params, data)
        if err then
          -- fail_cb is optional, so fall back to logging instead of crashing
          report_failure(err, data)
        else
          ok_cb(params, parsed)
        end
      else
        -- `data` may be nil for an empty reply; these arguments are evaluated
        -- even when debug logging is disabled, so guard before gsub.
        local flat = data and tostring(data):gsub('[\n%s]+', ' ') or ''
        lua_util.debugm(N, params.log_obj,
            "http_insert_cb ok: %s, %s, %s, %s", err_message, code, flat, _)
      end
    end

    return http_cb
  end
  --[[[
  -- @function lua_clickhouse.select(upstream, settings, params, query,
  --   ok_cb, fail_cb, row_cb)
  -- Make an asynchronous select request to clickhouse. The query is sent in
  -- the HTTP body and the reply is requested in JSONEachRow format.
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * timeout: request timeout (module default when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- * use_https: connect with https:// instead of http://
  -- * database: database name ('default' when unset)
  -- @param {table} params HTTP request params; shallow-copied, not modified.
  --   `params.url` overrides the generated URL; `params.task` or
  --   `params.config` is used as the log object
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @param {function} row_cb optional callback to be called on each parsed
  --   data row (instead of table insertion)
  -- @return {boolean} whether a connection was successful
  --]]
  exports.select = function(upstream, settings, params, query, ok_cb, fail_cb, row_cb)
    -- Work on a shallow copy so the caller's table stays untouched
    local req = {}
    for key, val in pairs(params) do
      req[key] = val
    end

    req.callback = mk_http_select_cb(upstream, req, ok_cb, fail_cb, row_cb)
    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.body = query
    req.log_obj = params.task or params.config
    req.opaque_body = true

    lua_util.debugm(N, req.log_obj, "clickhouse select request: %s", req.body)

    if not req.url then
      local scheme = settings.use_https and 'https://' or 'http://'
      local addr = upstream:get_addr():to_string(true)
      local db = escape_spaces(settings.database or 'default')
      req.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          scheme, addr, db)
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.select_sync(upstream, settings, params, query,
  --   row_cb)
  -- Make a select request to clickhouse without a callback: the result of
  -- rspamd_http.request is awaited and returned directly. The query is sent in
  -- the HTTP body and the reply is requested in JSONEachRow format.
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * timeout: request timeout (module default when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- * use_https: connect with https:// instead of http://
  -- * database: database name ('default' when unset)
  -- @param {table} params HTTP request params; shallow-copied, not modified.
  --   `params.url` overrides the generated URL; `params.task` or
  --   `params.config` is used as the log object
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} row_cb optional callback to be called on each parsed
  --   data row (instead of table insertion)
  -- @return
  -- {string} error message if any (transport error, or reply body on non-200)
  -- nil (transport error) | {http_response} (non-200) | {rows} (success)
  --]]
  exports.select_sync = function(upstream, settings, params, query, row_cb)
    local http_params = {}
    for k, v in pairs(params) do
      http_params[k] = v
    end
    http_params.gzip = settings.use_gzip
    http_params.mime_type = 'text/plain'
    http_params.timeout = settings.timeout or default_timeout
    http_params.no_ssl_verify = settings.no_ssl_verify
    http_params.user = settings.user
    http_params.password = settings.password
    http_params.body = query
    http_params.log_obj = params.task or params.config
    http_params.opaque_body = true

    lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)

    if not http_params.url then
      local connect_prefix = "http://"
      if settings.use_https then
        connect_prefix = 'https://'
      end
      local ip_addr = upstream:get_addr():to_string(true)
      local database = escape_spaces(settings.database or 'default')
      http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          connect_prefix, ip_addr, database)
    end

    local err, response = rspamd_http.request(http_params)
    if err then
      return err, nil
    elseif response.code ~= 200 then
      return response.content, response
    end

    lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response)
    -- Pass http_params, not the caller's params: only the copy carries
    -- log_obj, which the parser uses when reporting bad rows.
    local rows = parse_clickhouse_response_json_eachrow(http_params, response.content, row_cb)
    return nil, rows
  end
  --[[[
  -- @function lua_clickhouse.insert(upstream, settings, params, query, rows,
  --   ok_cb, fail_cb)
  -- Insert data rows to clickhouse (asynchronous POST, TabSeparated format)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * timeout: request timeout (module default when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- * use_https: connect with https:// instead of http://
  -- * database: database name ('default' when unset)
  -- @param {table} params HTTP request params; shallow-copied, not modified.
  --   `params.url` overrides the generated URL; `params.task` or
  --   `params.config` is used as the log object
  -- @param {string} query insert query (passed in the `query` URL element with
  --   whitespace escaped; ` FORMAT TabSeparated` is appended)
  -- @param {table} rows array of already encoded TSV lines (e.g. results of
  --   row_to_tsv), joined with '\n' via rspamd_text.fromtable
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return {boolean} whether a connection was successful
  --]]
  exports.insert = function(upstream, settings, params, query, rows,
                            ok_cb, fail_cb)
    -- Work on a shallow copy so the caller's table stays untouched
    local req = {}
    for key, val in pairs(params) do
      req[key] = val
    end

    req.callback = mk_http_insert_cb(upstream, req, ok_cb, fail_cb)
    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.method = 'POST'
    -- Rows are newline-joined and a trailing newline terminates the last one
    req.body = { rspamd_text.fromtable(rows, '\n'), '\n' }
    req.log_obj = params.task or params.config

    if not req.url then
      local scheme = settings.use_https and 'https://' or 'http://'
      local addr = upstream:get_addr():to_string(true)
      local db = escape_spaces(settings.database or 'default')
      local encoded_query = escape_spaces(query)
      req.url = string.format('%s%s/?database=%s&query=%s%%20FORMAT%%20TabSeparated',
          scheme, addr, db, encoded_query)
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.generic(upstream, settings, params, query,
  --   ok_cb, fail_cb)
  -- Make a generic asynchronous request to Clickhouse (e.g. alter)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * timeout: request timeout (module default when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- * use_https: connect with https:// instead of http://
  -- * database: database name ('default' when unset)
  -- @param {table} params HTTP request params; shallow-copied, not modified.
  --   `params.url` overrides the generated URL; `params.task` or
  --   `params.config` is used as the log object
  -- @param {string} query Clickhouse query (passed in HTTP body; the URL
  --   requests default_format=JSONEachRow)
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return {boolean} whether a connection was successful
  --]]
  exports.generic = function(upstream, settings, params, query,
                             ok_cb, fail_cb)
    -- Work on a shallow copy so the caller's table stays untouched
    local req = {}
    for key, val in pairs(params) do
      req[key] = val
    end

    req.callback = mk_http_insert_cb(upstream, req, ok_cb, fail_cb)
    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.log_obj = params.task or params.config
    req.body = query

    if not req.url then
      local scheme = settings.use_https and 'https://' or 'http://'
      local addr = upstream:get_addr():to_string(true)
      local db = escape_spaces(settings.database or 'default')
      req.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          scheme, addr, db)
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.generic_sync(upstream, settings, params, query)
  -- Make a generic request to Clickhouse (e.g. alter) without a callback: the
  -- result of rspamd_http.request is awaited and returned directly. The reply
  -- is requested in JSON format and decoded as a single JSON document.
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * timeout: request timeout (module default when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- * use_https: connect with https:// instead of http://
  -- * database: database name ('default' when unset)
  -- @param {table} params HTTP request params; shallow-copied, not modified.
  --   `params.url` overrides the generated URL; `params.task` or
  --   `params.config` is used as the log object
  -- @param {string} query Clickhouse query (passed in HTTP body)
  -- @return
  -- {string} error message if any (transport error, reply body on non-200,
  --   or 'bad json')
  -- nil (transport error / bad json) | {http_response} (non-200) |
  --   {decoded JSON object} (success)
  --]]
  exports.generic_sync = function(upstream, settings, params, query)
    local http_params = {}
    for k, v in pairs(params) do
      http_params[k] = v
    end
    http_params.gzip = settings.use_gzip
    http_params.mime_type = 'text/plain'
    http_params.timeout = settings.timeout or default_timeout
    http_params.no_ssl_verify = settings.no_ssl_verify
    http_params.user = settings.user
    http_params.password = settings.password
    http_params.log_obj = params.task or params.config
    http_params.body = query

    if not http_params.url then
      local connect_prefix = "http://"
      if settings.use_https then
        connect_prefix = 'https://'
      end
      local ip_addr = upstream:get_addr():to_string(true)
      local database = escape_spaces(settings.database or 'default')
      http_params.url = string.format('%s%s/?database=%s&default_format=JSON',
          connect_prefix, ip_addr, database)
    end

    local err, response = rspamd_http.request(http_params)
    if err then
      return err, nil
    elseif response.code ~= 200 then
      return response.content, response
    end

    lua_util.debugm(N, http_params.log_obj, "clickhouse generic response: %1", response)
    -- Pass http_params, not the caller's params: only the copy carries
    -- log_obj, which the parser uses when reporting malformed JSON.
    local e, obj = parse_clickhouse_response_json(http_params, response.content)
    if e then
      return e, nil
    end
    return nil, obj
  end

  return exports