You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

lua_clickhouse.lua 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. --[[
  2. Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>
  3. Copyright (c) 2018, Mikhail Galanin <mgalanin@mimecast.com>
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ]]--
  14. --[[[
  15. -- @module lua_clickhouse
  16. -- This module contains Clickhouse access functions
  17. --]]
  18. local rspamd_logger = require "rspamd_logger"
  19. local rspamd_http = require "rspamd_http"
  20. local lua_util = require "lua_util"
  -- Module-level state: the table returned to callers of this module, the
  -- module name passed to the debug logger, and the request timeout used
  -- whenever `settings.timeout` is not provided.
  local exports, N, default_timeout = {}, 'clickhouse', 10.0
  -- Replaces every whitespace character of `query` with the URL escape `%20`.
  -- Only whitespace is escaped; all other characters are passed through as is.
  -- Returns exactly one value: the substitution count produced by gsub is
  -- dropped so it cannot leak into a caller's argument list.
  local function escape_spaces(query)
    local escaped = query:gsub('%s', '%%20')
    return escaped
  end
  -- Renders a Lua number as text for a Clickhouse request.
  -- Integral values are printed through math.floor so that they never carry
  -- a fractional suffix; everything else is printed with plain tostring.
  --
  -- The integrality test compares against math.floor directly. The previous
  -- `(a + 2^52) - 2^52 == a` trick wrongly accepted negative half-integers
  -- (2^52 - 0.5 is exactly representable), so -0.5 was emitted as "-1".
  local function ch_number(a)
    local floored = math.floor(a)
    if floored == a then
      -- Integer
      return tostring(floored)
    end
    return tostring(a)
  end
  -- Escapes a string so it can be embedded into a TabSeparated row or into a
  -- single-quoted Clickhouse literal.
  -- Single quote, backslash, newline, tab and carriage return are replaced by
  -- their backslash escapes (carriage return was previously left raw).
  -- A nil/false argument yields an empty string.
  -- Returns exactly one value (the gsub substitution count is dropped).
  local function clickhouse_quote(str)
    if not str then
      return ''
    end
    local escaped = str:gsub('[\'\\\n\t\r]', {
      ['\''] = [[\']],
      ['\\'] = [[\\]],
      ['\n'] = [[\n]],
      ['\t'] = [[\t]],
      ['\r'] = [[\r]],
    })
    return escaped
  end
  -- Converts an array to a string suitable for clickhouse.
  -- String elements are escaped with clickhouse_quote and wrapped in single
  -- quotes, numbers are rendered with ch_number, any other element is passed
  -- to table.concat unchanged. Elements are joined with commas.
  -- The input array is NOT modified: the rendered elements are collected in a
  -- fresh table, so serialising the same array twice gives the same result
  -- instead of quoting already-quoted strings again.
  local function array_to_string(ar)
    local rendered = {}
    for i, elt in ipairs(ar) do
      local kind = type(elt)
      if kind == 'string' then
        rendered[i] = '\'' .. clickhouse_quote(elt) .. '\''
      elseif kind == 'number' then
        rendered[i] = ch_number(elt)
      else
        rendered[i] = elt
      end
    end
    return table.concat(rendered, ',')
  end
  -- Converts a row into TSV, taking extra care about arrays.
  -- Table elements become `[...]` using array_to_string, numbers are rendered
  -- with ch_number and everything else goes through clickhouse_quote.
  -- Fields are joined with a tab character.
  -- The input row is NOT modified: fields are rendered into a fresh table, so
  -- a row that is serialised again (e.g. re-sent) is not escaped twice.
  local function row_to_tsv(row)
    local fields = {}
    for i, elt in ipairs(row) do
      local kind = type(elt)
      if kind == 'table' then
        fields[i] = '[' .. array_to_string(elt) .. ']'
      elseif kind == 'number' then
        fields[i] = ch_number(elt)
      else
        fields[i] = clickhouse_quote(elt)
      end
    end
    return table.concat(fields, '\t')
  end
  -- Parses JSONEachRow reply from CH.
  -- `data` is split on newlines and every line longer than one character is
  -- parsed as a separate document with UCL. Lines that fail to parse are
  -- logged through `params.log_obj` and skipped.
  -- Returns an array of parsed rows; a nil `data` yields an empty array.
  local function parse_clickhouse_response_json_eachrow(params, data)
    local ucl = require "ucl"
    if data == nil then
      -- clickhouse returned no data (i.e. empty result set): exiting
      return {}
    end

    -- Parses a single line, returning nil (after logging) on failure
    local function parse_string(s)
      local parser = ucl.parser()
      local res, err = parser:parse_string(s)
      if not res then
        rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
        return nil
      end
      return parser:get_object()
    end

    -- iterate over rows and parse; ipairs (not pairs) is used so that the
    -- parsed rows are guaranteed to keep the order of the reply lines
    local ch_rows = lua_util.str_split(data, "\n")
    local parsed_rows = {}
    for _, plain_row in ipairs(ch_rows) do
      if #plain_row > 1 then
        local parsed_row = parse_string(plain_row)
        if parsed_row then
          parsed_rows[#parsed_rows + 1] = parsed_row
        end
      end
    end
    return parsed_rows
  end
  -- Parses JSON reply from CH.
  -- Returns two values: an error string (or nil) and the parsed object.
  -- A nil `data` is a valid empty reply and gives `nil, {}`; a reply that UCL
  -- cannot parse is logged through `params.log_obj` and gives `'bad json', {}`.
  local function parse_clickhouse_response_json(params, data)
    local ucl = require "ucl"
    if data == nil then
      -- clickhouse returned no data (i.e. empty result set) considered valid!
      return nil, {}
    end

    local parser = ucl.parser()
    local parsed_ok, parse_err = parser:parse_string(data)
    local json
    if parsed_ok then
      json = parser:get_object()
    else
      rspamd_logger.errx(params.log_obj, 'Parser error: %s', parse_err)
    end

    if json then
      return nil, json
    end
    return 'bad json', {}
  end
  -- Helper to generate HTTP closure for select requests.
  -- The returned callback:
  --  * on a non-200 code or a transport error marks the upstream as failed and
  --    reports through `fail_cb(params, err_message, data)` (or the error log
  --    when no `fail_cb` is given);
  --  * otherwise marks the upstream as ok, parses the body as JSONEachRow and
  --    passes the rows to `ok_cb(params, rows)` (or writes a debug message
  --    when no `ok_cb` is given).
  local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        if not err_message then err_message = data end
        local ip_addr = upstream:get_addr():to_string(true)
        if fail_cb then
          fail_cb(params, err_message, data)
        else
          rspamd_logger.errx(params.log_obj,
            "request failed on clickhouse server %s: %s",
            ip_addr, err_message)
        end
        upstream:fail()
        return
      end

      upstream:ok()
      local rows = parse_clickhouse_response_json_eachrow(params, data)
      if rows then
        if ok_cb then
          ok_cb(params, rows)
        else
          -- An empty reply may come with a nil body, which the parser accepts
          -- as an empty result set: do not index it when building the message
          local printable = (data or ''):gsub('[\n%s]+', ' ')
          lua_util.debugm(N, params.log_obj,
            "http_select_cb ok: %s, %s, %s, %s", err_message, code,
            printable, _)
        end
      elseif fail_cb then
        fail_cb(params, 'failed to parse reply', data)
      else
        local ip_addr = upstream:get_addr():to_string(true)
        rspamd_logger.errx(params.log_obj,
          "request failed on clickhouse server %s: %s",
          ip_addr, 'failed to parse reply')
      end
    end
    return http_cb
  end
  -- Helper to generate HTTP closure for insert/generic requests.
  -- The returned callback:
  --  * on a non-200 code or a transport error marks the upstream as failed and
  --    reports through `fail_cb(params, err_message, data)` (or the error log
  --    when no `fail_cb` is given);
  --  * otherwise marks the upstream as ok and, when `ok_cb` is given, parses
  --    the body as JSON and calls `ok_cb(params, parsed)`. A missing or blank
  --    body is reported as an empty table. A body that cannot be parsed is
  --    reported through `fail_cb` (or the error log when it is absent).
  local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        if not err_message then err_message = data end
        local ip_addr = upstream:get_addr():to_string(true)
        if fail_cb then
          fail_cb(params, err_message, data)
        else
          rspamd_logger.errx(params.log_obj,
            "request failed on clickhouse server %s: %s",
            ip_addr, err_message)
        end
        upstream:fail()
        return
      end

      upstream:ok()

      if not ok_cb then
        -- The body may be nil for an empty reply: do not index it blindly
        local printable = (data or ''):gsub('[\n%s]+', ' ')
        lua_util.debugm(N, params.log_obj,
          "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
          printable, _)
        return
      end

      -- A reply without any content carries nothing to parse and is a
      -- valid, empty result
      if data == nil or not data:find('%S') then
        ok_cb(params, {})
        return
      end

      -- The parser takes (params, data); params provides the log object
      local err, parsed = parse_clickhouse_response_json(params, data)
      if not err then
        ok_cb(params, parsed)
      elseif fail_cb then
        fail_cb(params, err, data)
      else
        local ip_addr = upstream:get_addr():to_string(true)
        rspamd_logger.errx(params.log_obj,
          "request failed on clickhouse server %s: %s",
          ip_addr, err)
      end
    end
    return http_cb
  end
  --[[[
  -- @function lua_clickhouse.select(upstream, settings, params, query,
        ok_cb, fail_cb)
  -- Make an asynchronous select request to clickhouse
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (a module default is used when absent)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name put into the URL ('default' when absent)
  -- @param {params} HTTP request params; copied, and `params.url` (if set)
  --   is used instead of the URL built from the upstream address
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return the value returned by rspamd_http.request for the request
  -- @example
  --
  --]]
  exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
    -- Work on a copy so the caller's params table stays untouched
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.log_obj = params.task or params.config
    req.body = query
    req.mime_type = 'text/plain'
    req.gzip = settings.use_gzip
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.callback = mk_http_select_cb(upstream, req, ok_cb, fail_cb)

    lua_util.debugm(N, req.log_obj, "clickhouse select request: %s", req.body)

    if not req.url then
      local scheme = settings.use_https and 'https://' or "http://"
      local ip_addr = upstream:get_addr():to_string(true)
      local db_name = escape_spaces(settings.database or 'default')
      req.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
        scheme, ip_addr, db_name)
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.select_sync(upstream, settings, params, query,
        ok_cb, fail_cb)
  -- Make a select request to clickhouse and return its result directly
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (a module default is used when absent)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name put into the URL ('default' when absent)
  -- @param {params} HTTP request params; copied, and `params.url` (if set)
  --   is used instead of the URL built from the upstream address
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb accepted for signature compatibility, not used
  -- @param {function} fail_cb accepted for signature compatibility, not used
  -- @return
  --   {string} error message if exists
  --   nil | {rows} | {http_response}
  --   (request error: `err, nil`; non-200 reply: `content, response`;
  --   success: `nil, rows`)
  -- @example
  --
  --]]
  exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb)
    local http_params = {}
    for k,v in pairs(params) do http_params[k] = v end

    http_params.gzip = settings.use_gzip
    http_params.mime_type = 'text/plain'
    http_params.timeout = settings.timeout or default_timeout
    http_params.no_ssl_verify = settings.no_ssl_verify
    http_params.user = settings.user
    http_params.password = settings.password
    http_params.body = query
    http_params.log_obj = params.task or params.config

    lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)

    if not http_params.url then
      local connect_prefix = "http://"
      if settings.use_https then
        connect_prefix = 'https://'
      end
      local ip_addr = upstream:get_addr():to_string(true)
      local database = settings.database or 'default'
      http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
        connect_prefix, ip_addr, (escape_spaces(database)))
    end

    local err, response = rspamd_http.request(http_params)

    if err then
      return err, nil
    elseif response.code ~= 200 then
      return response.content, response
    end

    lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response)
    -- The parser logs through `log_obj`, which is only set on http_params
    -- (the caller's params table is never modified), so pass http_params
    local rows = parse_clickhouse_response_json_eachrow(http_params, response.content)
    return nil, rows
  end
  --[[[
  -- @function lua_clickhouse.insert(upstream, settings, params, query, rows,
        ok_cb, fail_cb)
  -- Insert data rows to clickhouse
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (a module default is used when absent)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name put into the URL ('default' when absent)
  -- @param {params} HTTP request params; copied, and `params.url` (if set)
  --   is used instead of the URL built from the upstream address
  -- @param {string} query insert query (passed in `query` request element with spaces escaped)
  -- @param {table|mixed} rows mix of strings, numbers or tables (for arrays)
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return the value returned by rspamd_http.request for the request
  -- @example
  --
  --]]
  exports.insert = function (upstream, settings, params, query, rows,
                             ok_cb, fail_cb)
    local fun = require "fun"

    -- Work on a copy so the caller's params table stays untouched
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    -- Every row becomes one TSV line; the body is sent as two chunks, the
    -- joined lines followed by a terminating newline
    local tsv_lines = fun.totable(fun.map(row_to_tsv, rows))

    req.method = 'POST'
    req.mime_type = 'text/plain'
    req.body = {table.concat(tsv_lines, '\n'), '\n'}
    req.gzip = settings.use_gzip
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.log_obj = params.task or params.config
    req.callback = mk_http_insert_cb(upstream, req, ok_cb, fail_cb)

    if not req.url then
      local scheme = settings.use_https and 'https://' or "http://"
      local ip_addr = upstream:get_addr():to_string(true)
      local db_name = escape_spaces(settings.database or 'default')
      local escaped_query = escape_spaces(query)
      req.url = string.format('%s%s/?database=%s&query=%s%%20FORMAT%%20TabSeparated',
        scheme, ip_addr, db_name, escaped_query)
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.generic(upstream, settings, params, query,
        ok_cb, fail_cb)
  -- Make a generic asynchronous request to Clickhouse (e.g. alter)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (a module default is used when absent)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name put into the URL ('default' when absent)
  -- @param {params} HTTP request params; copied, and `params.url` (if set)
  --   is used instead of the URL built from the upstream address
  -- @param {string} query Clickhouse query (passed in HTTP body)
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return the value returned by rspamd_http.request for the request
  -- @example
  --
  --]]
  exports.generic = function (upstream, settings, params, query,
                              ok_cb, fail_cb)
    -- Work on a copy so the caller's params table stays untouched
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.body = query
    req.mime_type = 'text/plain'
    req.log_obj = params.task or params.config
    req.gzip = settings.use_gzip
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.callback = mk_http_insert_cb(upstream, req, ok_cb, fail_cb)

    if not req.url then
      local scheme = settings.use_https and 'https://' or "http://"
      local ip_addr = upstream:get_addr():to_string(true)
      local db_name = escape_spaces(settings.database or 'default')
      req.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
        scheme, ip_addr, db_name)
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.generic_sync(upstream, settings, params, query)
  -- Make a generic request to Clickhouse (e.g. alter) and return its result
  -- directly
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (a module default is used when absent)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name put into the URL ('default' when absent)
  -- @param {params} HTTP request params; copied, and `params.url` (if set)
  --   is used instead of the URL built from the upstream address
  -- @param {string} query Clickhouse query (passed in HTTP body)
  -- @return
  --   {string} error message if exists
  --   nil | {object} | {http_response}
  --   (request error: `err, nil`; non-200 reply: `content, response`;
  --   unparseable reply: `'bad json', nil`; success: `nil, object`)
  -- @example
  --
  --]]
  exports.generic_sync = function (upstream, settings, params, query)
    local http_params = {}
    for k,v in pairs(params) do http_params[k] = v end

    http_params.gzip = settings.use_gzip
    http_params.mime_type = 'text/plain'
    http_params.timeout = settings.timeout or default_timeout
    http_params.no_ssl_verify = settings.no_ssl_verify
    http_params.user = settings.user
    http_params.password = settings.password
    http_params.log_obj = params.task or params.config
    http_params.body = query

    if not http_params.url then
      local connect_prefix = "http://"
      if settings.use_https then
        connect_prefix = 'https://'
      end
      local ip_addr = upstream:get_addr():to_string(true)
      local database = settings.database or 'default'
      http_params.url = string.format('%s%s/?database=%s&default_format=JSON',
        connect_prefix, ip_addr, (escape_spaces(database)))
    end

    local err, response = rspamd_http.request(http_params)

    if err then
      return err, nil
    elseif response.code ~= 200 then
      return response.content, response
    end

    lua_util.debugm(N, http_params.log_obj, "clickhouse generic response: %1", response)
    -- The parser logs through `log_obj`, which is only set on http_params
    -- (the caller's params table is never modified), so pass http_params
    local e, obj = parse_clickhouse_response_json(http_params, response.content)
    if e then
      return e, nil
    end
    return nil, obj
  end
  437. return exports