You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

lua_clickhouse.lua 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. --[[
  2. Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>
  3. Copyright (c) 2018, Mikhail Galanin <mgalanin@mimecast.com>
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ]]--
  14. --[[[
  15. -- @module lua_clickhouse
  16. -- This module contains Clickhouse access functions
  17. --]]
  18. local rspamd_logger = require "rspamd_logger"
  19. local rspamd_http = require "rspamd_http"
  20. local lua_util = require "lua_util"
  -- Table collecting the public functions; it is returned at the end of the module
  local exports = {}
  -- Tag handed to lua_util.debugm for this module's debug messages
  local N = "clickhouse"
  -- Fallback for the HTTP request timeout when settings.timeout is not set
  -- (presumably seconds - confirm against rspamd_http)
  local default_timeout = 10.0
  -- Percent-encodes whitespace so that a string can be embedded in a URL.
  -- Every character matched by the `%s` pattern class becomes `%20`; no other
  -- character is touched.
  -- @param {string} query string to encode
  -- @return {string} the encoded string (exactly one value)
  local function escape_spaces(query)
    -- The outer parentheses drop gsub's second result (the substitution count)
    -- so it cannot leak into a caller's argument list.
    return (query:gsub('%s', '%%20'))
  end
  -- Renders a Lua number as text for a Clickhouse request.
  -- Values that pass the integer test are run through math.floor before
  -- tostring; everything else is handed to tostring unchanged.
  -- @param {number} a value to render
  -- @return {string} textual form of the number
  local function ch_number(a)
    -- integer test used by this module: the value must survive a round trip
    -- through +2^52 / -2^52 unchanged
    local integral = (a + 2^52) - 2^52 == a

    if not integral then
      return tostring(a)
    end

    return tostring(math.floor(a))
  end
  -- Replacement for every character that must not appear raw inside a quoted
  -- string: quotes and backslashes would end or corrupt the literal, while a
  -- raw tab, newline or carriage return would break a tab-separated row.
  local clickhouse_quote_escapes = {
    ['\''] = '\\\'',
    ['\\'] = '\\\\',
    ['\n'] = '\\n',
    ['\t'] = '\\t',
    ['\r'] = '\\r',
  }

  -- Escapes a string for use inside a single-quoted Clickhouse literal.
  -- The result is lower-cased, as in the original implementation.
  -- @param {string|nil} str string to escape
  -- @return {string} escaped, lower-cased string; '' when `str` is nil or false
  local function clickhouse_quote(str)
    if not str then
      return ''
    end

    -- `:lower()` applies to the first gsub result only, so a single value is returned
    return str:gsub('[\'\\\n\t\r]', clickhouse_quote_escapes):lower()
  end
  -- Converts an array to a comma-separated string suitable for Clickhouse.
  -- Strings are escaped with clickhouse_quote and wrapped in single quotes,
  -- numbers are rendered with ch_number, any other element is passed to
  -- table.concat as is.
  -- The input array is left untouched: serialising the same array twice (for
  -- instance when it is shared between rows) gives the same result both times
  -- instead of quoting the already quoted strings again.
  -- @param {table} ar array of strings and/or numbers
  -- @return {string} elements joined with ','
  local function array_to_string(ar)
    local parts = {}

    for i, elt in ipairs(ar) do
      local kind = type(elt)

      if kind == 'string' then
        parts[i] = '\'' .. clickhouse_quote(elt) .. '\''
      elseif kind == 'number' then
        parts[i] = ch_number(elt)
      else
        parts[i] = elt
      end
    end

    return table.concat(parts, ',')
  end
  -- Converts a row into a tab-separated line, taking extra care about arrays.
  -- Table elements become '[' .. array_to_string(elt) .. ']', numbers are
  -- rendered with ch_number, every other element is passed to table.concat
  -- unchanged (strings are NOT escaped here).
  -- The caller's row is not modified; the line is built from a fresh table.
  -- @param {table} row array of strings, numbers or nested arrays
  -- @return {string} elements joined with a tab character
  local function row_to_tsv(row)
    local cells = {}

    for i, elt in ipairs(row) do
      local kind = type(elt)

      if kind == 'table' then
        cells[i] = '[' .. array_to_string(elt) .. ']'
      elseif kind == 'number' then
        cells[i] = ch_number(elt)
      else
        cells[i] = elt
      end
    end

    return table.concat(cells, '\t')
  end
  -- Parses a JSONEachRow reply from Clickhouse: one JSON document per line.
  -- Lines of length <= 1 are skipped. A line that fails to parse is logged
  -- through rspamd_logger.errx (using params.log_obj) and skipped as well.
  -- @param {table} params request params; only `log_obj` is read
  -- @param {string|nil} data reply body; nil means an empty result set
  -- @return {table} array of parsed rows in reply order (possibly empty)
  local function parse_clickhouse_response_json_eachrow(params, data)
    if data == nil then
      -- clickhouse returned no data (i.e. empty result set): exiting
      return {}
    end

    local ucl = require "ucl"
    local parsed_rows = {}

    -- ipairs keeps the order of the reply lines; pairs gives no such guarantee
    for _, plain_row in ipairs(lua_util.str_split(data, "\n")) do
      if #plain_row > 1 then
        local parser = ucl.parser()
        local res, err = parser:parse_string(plain_row)

        if res then
          local parsed_row = parser:get_object()
          if parsed_row then
            parsed_rows[#parsed_rows + 1] = parsed_row
          end
        else
          rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
        end
      end
    end

    return parsed_rows
  end
  -- Parses a reply from Clickhouse that consists of a single JSON document.
  -- @param {table} params request params; only `log_obj` is read (for error logging)
  -- @param {string|nil} data reply body
  -- @return {string|nil} error: nil on success, 'bad json' when parsing fails
  -- @return {table} the parsed object; an empty table when there is no body
  --   or when parsing fails
  local function parse_clickhouse_response_json(params, data)
    local ucl = require "ucl"

    if data == nil then
      -- no body at all is treated as a valid, empty result set
      return nil, {}
    end

    local parser = ucl.parser()
    local parsed_ok, parse_err = parser:parse_string(data)

    if not parsed_ok then
      rspamd_logger.errx(params.log_obj, 'Parser error: %s', parse_err)
      return 'bad json', {}
    end

    local obj = parser:get_object()
    if not obj then
      return 'bad json', {}
    end

    return nil, obj
  end
  -- Builds the HTTP callback used for select requests.
  -- The returned closure:
  --  * on a transport error or a non-200 code calls fail_cb(params, err, data)
  --    (or logs the error when fail_cb is absent) and marks the upstream as
  --    failed; when there is no transport error the reply body is used as the
  --    error text;
  --  * otherwise marks the upstream as ok, parses the body as JSONEachRow and
  --    calls ok_cb(params, rows); without ok_cb the reply is only written to
  --    the debug log.
  -- @param {upstream} upstream server the request was sent to
  -- @param {table} params request params; passed to the callbacks, `log_obj` is used for logging
  -- @param {function|nil} ok_cb success callback
  -- @param {function|nil} fail_cb failure callback
  -- @return {function} callback taking (err_message, code, data, headers)
  local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        if not err_message then err_message = data end
        local ip_addr = upstream:get_addr():to_string(true)

        if fail_cb then
          fail_cb(params, err_message, data)
        else
          rspamd_logger.errx(params.log_obj,
              "request failed on clickhouse server %s: %s",
              ip_addr, err_message)
        end

        upstream:fail()
        return
      end

      upstream:ok()
      local rows = parse_clickhouse_response_json_eachrow(params, data)

      if rows then
        if ok_cb then
          ok_cb(params, rows)
        else
          -- a missing body is a valid empty reply, so it must not break logging;
          -- assigning to a local keeps only the first gsub result
          local printable = (data or ''):gsub('[\n%s]+', ' ')
          lua_util.debugm(N, params.log_obj,
              "http_select_cb ok: %s, %s, %s, %s", err_message, code,
              printable, _)
        end
      elseif fail_cb then
        -- defensive path: only reached if the parser returns no table at all
        fail_cb(params, 'failed to parse reply', data)
      else
        local ip_addr = upstream:get_addr():to_string(true)
        rspamd_logger.errx(params.log_obj,
            "request failed on clickhouse server %s: %s",
            ip_addr, 'failed to parse reply')
      end
    end

    return http_cb
  end
  -- Builds the HTTP callback used for insert and generic requests.
  -- The returned closure:
  --  * on a transport error or a non-200 code calls fail_cb(params, err, data)
  --    (or logs the error when fail_cb is absent) and marks the upstream as
  --    failed; when there is no transport error the reply body is used as the
  --    error text;
  --  * otherwise marks the upstream as ok. With ok_cb set, an empty body
  --    yields ok_cb(params, {}); a non-empty body is parsed as JSON and passed
  --    to ok_cb, or reported through fail_cb / the error log when it cannot be
  --    parsed. Without ok_cb the reply is only written to the debug log.
  -- @param {upstream} upstream server the request was sent to
  -- @param {table} params request params; passed to the callbacks, `log_obj` is used for logging
  -- @param {function|nil} ok_cb success callback
  -- @param {function|nil} fail_cb failure callback
  -- @return {function} callback taking (err_message, code, data, headers)
  local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        if not err_message then err_message = data end
        local ip_addr = upstream:get_addr():to_string(true)

        if fail_cb then
          fail_cb(params, err_message, data)
        else
          rspamd_logger.errx(params.log_obj,
              "request failed on clickhouse server %s: %s",
              ip_addr, err_message)
        end

        upstream:fail()
        return
      end

      upstream:ok()

      if not ok_cb then
        -- assigning to a local keeps only the first gsub result;
        -- a missing body must not break logging
        local printable = (data or ''):gsub('[\n%s]+', ' ')
        lua_util.debugm(N, params.log_obj,
            "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
            printable, _)
        return
      end

      if data == nil or not data:find('%S') then
        -- nothing but whitespace in the reply: report success with an empty
        -- result instead of feeding an empty string to the JSON parser
        ok_cb(params, {})
        return
      end

      -- the parser takes (params, data); params carries log_obj for error logging
      local err, parsed = parse_clickhouse_response_json(params, data)

      if not err then
        ok_cb(params, parsed)
      elseif fail_cb then
        fail_cb(params, err, data)
      else
        rspamd_logger.errx(params.log_obj,
            "request failed on clickhouse server %s: %s",
            upstream:get_addr():to_string(true), err)
      end
    end

    return http_cb
  end
  --[[[
  -- @function lua_clickhouse.select(upstream, settings, params, query, ok_cb, fail_cb)
  -- Make a callback-driven select request to clickhouse
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (default_timeout when unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name ('default' when unset)
  -- @param {table} params HTTP request params; copied into the request, `task` or
  --   `config` is used as the logging object, a preset `url` is used as is
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb called as ok_cb(request_params, rows) on success
  -- @param {function} fail_cb called as fail_cb(request_params, err_message, data) on error
  -- @return whatever rspamd_http.request returns for the request
  --]]
  exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
    -- work on a copy so that the caller's params table stays untouched
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.callback = mk_http_select_cb(upstream, req, ok_cb, fail_cb)
    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.body = query
    req.log_obj = params.task or params.config

    lua_util.debugm(N, req.log_obj, "clickhouse select request: %s", req.body)

    if not req.url then
      -- no URL given by the caller: build it from the upstream address
      local scheme = settings.use_https and 'https://' or "http://"
      local addr = upstream:get_addr():to_string(true)

      req.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          scheme, addr, escape_spaces(settings.database or 'default'))
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.select_sync(upstream, settings, params, query, ok_cb, fail_cb)
  -- Make a select request to clickhouse and return its result directly
  -- (no callback is attached to the request)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (default_timeout when unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name ('default' when unset)
  -- @param {table} params HTTP request params; copied into the request, `task` or
  --   `config` is used as the logging object, a preset `url` is used as is
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb accepted for signature compatibility, not used
  -- @param {function} fail_cb accepted for signature compatibility, not used
  -- @return {string|nil} error message if any: the request error, or the reply
  --   body when the HTTP code is not 200
  -- @return nil on request error | {http_response} on a non-200 code | {rows} on success
  --]]
  exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb)
    local http_params = {}

    for k,v in pairs(params) do http_params[k] = v end

    http_params.gzip = settings.use_gzip
    http_params.mime_type = 'text/plain'
    http_params.timeout = settings.timeout or default_timeout
    http_params.no_ssl_verify = settings.no_ssl_verify
    http_params.user = settings.user
    http_params.password = settings.password
    http_params.body = query
    http_params.log_obj = params.task or params.config

    lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)

    if not http_params.url then
      local connect_prefix = "http://"
      if settings.use_https then
        connect_prefix = 'https://'
      end
      local ip_addr = upstream:get_addr():to_string(true)
      local database = settings.database or 'default'
      http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          connect_prefix, ip_addr, escape_spaces(database))
    end

    local err, response = rspamd_http.request(http_params)

    if err then
      return err, nil
    elseif response.code ~= 200 then
      return response.content, response
    end

    lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response)
    -- the parser logs through `log_obj`, which is set on http_params only;
    -- passing the caller's params here would lose the logging context
    local rows = parse_clickhouse_response_json_eachrow(http_params, response.content)
    return nil, rows
  end
  --[[[
  -- @function lua_clickhouse.insert(upstream, settings, params, query, rows, ok_cb, fail_cb)
  -- Insert data rows to clickhouse
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (default_timeout when unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name ('default' when unset)
  -- @param {table} params HTTP request params; copied into the request, `task` or
  --   `config` is used as the logging object, a preset `url` is used as is
  -- @param {string} query insert query (passed in `query` request element with
  --   spaces escaped, followed by ` FORMAT TabSeparated`)
  -- @param {table|mixed} rows mix of strings, numbers or tables (for arrays)
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return whatever rspamd_http.request returns for the request
  --]]
  exports.insert = function (upstream, settings, params, query, rows,
      ok_cb, fail_cb)
    local fun = require "fun"

    -- work on a copy so that the caller's params table stays untouched
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.callback = mk_http_insert_cb(upstream, req, ok_cb, fail_cb)
    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.method = 'POST'

    -- body: one tab-separated line per row, plus a final newline chunk
    local tsv_lines = fun.totable(fun.map(row_to_tsv, rows))
    req.body = { table.concat(tsv_lines, '\n'), '\n' }
    req.log_obj = params.task or params.config

    if not req.url then
      -- no URL given by the caller: build it from the upstream address
      local scheme = settings.use_https and 'https://' or "http://"
      local addr = upstream:get_addr():to_string(true)
      local db_name = settings.database or 'default'

      req.url = string.format('%s%s/?database=%s&query=%s%%20FORMAT%%20TabSeparated',
          scheme,
          addr,
          escape_spaces(db_name),
          escape_spaces(query))
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.generic(upstream, settings, params, query, ok_cb, fail_cb)
  -- Make a generic callback-driven request to Clickhouse (e.g. alter)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (default_timeout when unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name ('default' when unset)
  -- @param {table} params HTTP request params; copied into the request, `task` or
  --   `config` is used as the logging object, a preset `url` is used as is
  -- @param {string} query Clickhouse query (passed in HTTP body)
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return whatever rspamd_http.request returns for the request
  --]]
  exports.generic = function (upstream, settings, params, query,
      ok_cb, fail_cb)
    -- work on a copy so that the caller's params table stays untouched
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.callback = mk_http_insert_cb(upstream, req, ok_cb, fail_cb)
    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.log_obj = params.task or params.config
    req.body = query

    if not req.url then
      -- no URL given by the caller: build it from the upstream address
      local scheme = settings.use_https and 'https://' or "http://"
      local addr = upstream:get_addr():to_string(true)

      req.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          scheme, addr, escape_spaces(settings.database or 'default'))
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.generic_sync(upstream, settings, params, query)
  -- Make a generic request to Clickhouse (e.g. alter) and return its result
  -- directly (no callback is attached to the request)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (default_timeout when unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name ('default' when unset)
  -- @param {table} params HTTP request params; copied into the request, `task` or
  --   `config` is used as the logging object, a preset `url` is used as is
  -- @param {string} query Clickhouse query (passed in HTTP body)
  -- @return {string|nil} error message if any: the request error, the reply
  --   body when the HTTP code is not 200, or the JSON parser error
  -- @return nil on request/parse error | {http_response} on a non-200 code |
  --   {table} parsed JSON reply on success
  --]]
  exports.generic_sync = function (upstream, settings, params, query)
    local http_params = {}

    for k,v in pairs(params) do http_params[k] = v end

    http_params.gzip = settings.use_gzip
    http_params.mime_type = 'text/plain'
    http_params.timeout = settings.timeout or default_timeout
    http_params.no_ssl_verify = settings.no_ssl_verify
    http_params.user = settings.user
    http_params.password = settings.password
    http_params.log_obj = params.task or params.config
    http_params.body = query

    if not http_params.url then
      local connect_prefix = "http://"
      if settings.use_https then
        connect_prefix = 'https://'
      end
      local ip_addr = upstream:get_addr():to_string(true)
      local database = settings.database or 'default'
      http_params.url = string.format('%s%s/?database=%s&default_format=JSON',
          connect_prefix, ip_addr, escape_spaces(database))
    end

    local err, response = rspamd_http.request(http_params)

    if err then
      return err, nil
    elseif response.code ~= 200 then
      return response.content, response
    end

    lua_util.debugm(N, http_params.log_obj, "clickhouse generic response: %1", response)
    -- the parser logs through `log_obj`, which is set on http_params only;
    -- passing the caller's params here would lose the logging context
    local e, obj = parse_clickhouse_response_json(http_params, response.content)

    if e then
      return e, nil
    end

    return nil, obj
  end
  430. return exports