You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

lua_clickhouse.lua 15KB


  1. --[[
  2. Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>
  3. Copyright (c) 2018, Mikhail Galanin <mgalanin@mimecast.com>
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ]]--
  14. --[[[
  15. -- @module lua_clickhouse
  16. -- This module contains Clickhouse access functions
  17. --]]
  18. local rspamd_logger = require "rspamd_logger"
  19. local rspamd_http = require "rspamd_http"
  20. local lua_util = require "lua_util"
  21. local rspamd_text = require "rspamd_text"
  -- Table of public functions; it is returned at the very end of the module
  local exports = {}
  -- Module tag handed to lua_util.debugm for debug logging
  local N = "clickhouse"
  -- Fallback used whenever settings.timeout is not provided
  local default_timeout = 10.0
  -- Replaces every whitespace character (Lua pattern class %s) with `%20`
  -- so that the text can be embedded into a request URL.
  -- No other characters are encoded.
  -- @param {string} query text to embed into the URL
  -- @return {string} the text with whitespace replaced (exactly one value)
  local function escape_spaces(query)
    -- gsub returns (string, count); keep only the string so the count cannot
    -- leak into a caller's argument list when this call is the last argument
    local escaped = query:gsub('%s', '%%20')
    return escaped
  end
  -- Converts a Lua number into its textual form for Clickhouse.
  -- Whole values are printed through math.floor (no fractional part),
  -- everything else goes through plain tostring().
  -- @param {number} a value to convert
  -- @return {string} textual representation of the number
  local function ch_number(a)
    -- math.floor(a) == a is an exact integrality test. The former
    -- (a+2^52)-2^52 rounding trick is only valid for non-negative values:
    -- negative halves such as -0.5 or -1.5 survived the round trip unchanged,
    -- were treated as integers and then floored to -1 / -2.
    local floored = math.floor(a)
    if floored == a then
      -- Integer
      return tostring(floored)
    end
    return tostring(a)
  end
  -- Backslash-escapes the characters that are special inside Clickhouse
  -- string values: single quote, backslash, newline, tab and carriage return.
  -- @param {string} str text to escape; nil/false yields an empty string
  -- @return {string} escaped text (exactly one value)
  local function clickhouse_quote(str)
    if not str then
      return ''
    end
    -- A raw carriage return must be escaped as well, otherwise a value
    -- containing "\r\n" is emitted with an unescaped CR in front of the
    -- escaped newline.
    -- gsub returns (string, count); only the string is handed back.
    local quoted = str:gsub('[\'\\\n\t\r]', {
      ['\''] = [[\']],
      ['\\'] = [[\\]],
      ['\n'] = [[\n]],
      ['\t'] = [[\t]],
      ['\r'] = [[\r]],
    })
    return quoted
  end
  -- Converts an array to a string suitable for clickhouse.
  -- Strings are escaped with clickhouse_quote and wrapped in single quotes,
  -- numbers are rendered with ch_number, any other element is passed to
  -- table.concat unchanged. Elements are joined with commas.
  -- The input array is NOT modified: writing the quoted values back into
  -- `ar` made a second serialisation of the same array quote its strings twice.
  -- @param {table} ar array of strings and/or numbers
  -- @return {string} comma separated list of the converted elements
  local function array_to_string(ar)
    local parts = {}
    for i, elt in ipairs(ar) do
      local elt_type = type(elt)
      if elt_type == 'string' then
        parts[i] = '\'' .. clickhouse_quote(elt) .. '\''
      elseif elt_type == 'number' then
        parts[i] = ch_number(elt)
      else
        parts[i] = elt
      end
    end
    return table.concat(parts, ',')
  end
  -- Serialises one row as a tab separated line.
  -- Every cell of `row` is rewritten in place: tables become a bracketed
  -- list built by array_to_string, numbers go through ch_number and anything
  -- else is escaped with clickhouse_quote. The converted cells are then
  -- joined with tabs by rspamd_text.fromtable.
  -- @param {table} row array of strings, numbers or tables (for arrays)
  -- @return the value produced by rspamd_text.fromtable(row, '\t')
  local function row_to_tsv(row)
    for idx, value in ipairs(row) do
      local kind = type(value)
      local cell
      if kind == 'number' then
        cell = ch_number(value)
      elseif kind == 'table' then
        cell = '[' .. array_to_string(value) .. ']'
      else
        cell = clickhouse_quote(value)
      end
      row[idx] = cell
    end

    return rspamd_text.fromtable(row, '\t')
  end

  exports.row_to_tsv = row_to_tsv
  -- Parses JSONEachRow reply from CH: one JSON document per line.
  -- Lines of length <= 1 are skipped, lines that fail to parse are logged
  -- through rspamd_logger.errx and dropped.
  -- @param {table} params request params; params.log_obj is used for logging
  -- @param {string} data raw reply body, nil is treated as an empty result set
  -- @return {table} array of parsed rows in the order they appear in the reply
  local function parse_clickhouse_response_json_eachrow(params, data)
    local ucl = require "ucl"

    if data == nil then
      -- clickhouse returned no data (i.e. empty result set): exiting
      return {}
    end

    -- Parses a single line, returns the parsed object or nil on error
    local function parse_string(s)
      local parser = ucl.parser()
      local res, err = parser:parse_string(s)
      if not res then
        rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
        return nil
      end
      return parser:get_object()
    end

    local ch_rows = lua_util.str_split(data, "\n")
    local parsed_rows = {}
    -- ipairs (not pairs): the order of the reply rows must be preserved and
    -- pairs does not guarantee any traversal order
    for _, plain_row in ipairs(ch_rows) do
      if plain_row:len() > 1 then
        local parsed_row = parse_string(plain_row)
        if parsed_row then
          parsed_rows[#parsed_rows + 1] = parsed_row
        end
      end
    end

    return parsed_rows
  end
  -- Parses a reply from CH that consists of a single JSON document.
  -- @param {table} params request params; params.log_obj is used for logging
  -- @param {string} data raw reply body
  -- @return
  --    {string} error: nil on success, 'bad json' if the body cannot be parsed
  --    {table} parsed object; an empty table for a nil body or on error
  local function parse_clickhouse_response_json(params, data)
    local ucl = require "ucl"

    -- clickhouse returned no data (i.e. empty result set) considered valid!
    if data == nil then
      return nil, {}
    end

    local parser = ucl.parser()
    local parsed_ok, parse_err = parser:parse_string(data)
    if not parsed_ok then
      rspamd_logger.errx(params.log_obj, 'Parser error: %s', parse_err)
      return 'bad json', {}
    end

    local json = parser:get_object()
    if json then
      return nil, json
    end

    return 'bad json', {}
  end
  -- Helper to generate HTTP closure for select requests.
  -- The returned callback marks the upstream as failed or ok, parses a
  -- successful reply as JSONEachRow and hands the rows to ok_cb.
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} params HTTP request params (params.log_obj is used for logging)
  -- @param {function} ok_cb optional, called as ok_cb(params, rows)
  -- @param {function} fail_cb optional, called as fail_cb(params, err_message, data)
  -- @return {function} callback taking (err_message, code, data, headers)
  local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        -- Without a transport error the reply body is the error text
        if not err_message then err_message = data end
        local ip_addr = upstream:get_addr():to_string(true)

        if fail_cb then
          fail_cb(params, err_message, data)
        else
          rspamd_logger.errx(params.log_obj,
              "request failed on clickhouse server %s: %s",
              ip_addr, err_message)
        end
        upstream:fail()
        return
      end

      upstream:ok()
      local rows = parse_clickhouse_response_json_eachrow(params, data)

      if rows then
        if ok_cb then
          ok_cb(params, rows)
        else
          -- data is nil for an empty reply (the parser handles that case
          -- explicitly), so it must not be indexed unconditionally here
          local printable = ''
          if data then
            printable = data:gsub('[\n%s]+', ' ')
          end
          lua_util.debugm(N, params.log_obj,
              "http_select_cb ok: %s, %s, %s, %s", err_message, code,
              printable, _)
        end
      else
        -- Kept as a safety net: the row parser currently always returns a table
        if fail_cb then
          fail_cb(params, 'failed to parse reply', data)
        else
          local ip_addr = upstream:get_addr():to_string(true)
          rspamd_logger.errx(params.log_obj,
              "request failed on clickhouse server %s: %s",
              ip_addr, 'failed to parse reply')
        end
      end
    end

    return http_cb
  end
  -- Helper to generate HTTP closure for insert and generic requests.
  -- The returned callback marks the upstream as failed or ok and, when ok_cb
  -- is given, parses a non-empty reply as JSON before calling it.
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} params HTTP request params (params.log_obj is used for logging)
  -- @param {function} ok_cb optional, called as ok_cb(params, parsed_reply)
  -- @param {function} fail_cb optional, called as fail_cb(params, err, data)
  -- @return {function} callback taking (err_message, code, data, headers)
  local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        -- Without a transport error the reply body is the error text
        if not err_message then err_message = data end
        local ip_addr = upstream:get_addr():to_string(true)

        if fail_cb then
          fail_cb(params, err_message, data)
        else
          rspamd_logger.errx(params.log_obj,
              "request failed on clickhouse server %s: %s",
              ip_addr, err_message)
        end
        upstream:fail()
        return
      end

      upstream:ok()

      if not ok_cb then
        -- data may be nil for an empty reply, do not index it unconditionally
        local printable = ''
        if data then
          printable = data:gsub('[\n%s]+', ' ')
        end
        lua_util.debugm(N, params.log_obj,
            "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
            printable, _)
        return
      end

      -- An empty body is a valid reply and is reported as an empty result,
      -- which is what ok_cb received for every reply before the parser was
      -- called correctly
      if data == nil or #data == 0 then
        ok_cb(params, {})
        return
      end

      -- The parser takes (params, data); it used to be called with `data`
      -- only, which made it see a nil body and never parse anything
      local err, parsed = parse_clickhouse_response_json(params, data)

      if not err then
        ok_cb(params, parsed)
      elseif fail_cb then
        fail_cb(params, err, data)
      else
        -- fail_cb is optional: log instead of calling a nil value
        local ip_addr = upstream:get_addr():to_string(true)
        rspamd_logger.errx(params.log_obj,
            "request failed on clickhouse server %s: %s",
            ip_addr, err)
      end
    end

    return http_cb
  end
  --[[[
  -- @function lua_clickhouse.select(upstream, settings, params, query,
        ok_cb, fail_cb)
  -- Make select request to clickhouse; the reply is handled by the callback
  -- built with mk_http_select_cb (JSONEachRow rows are passed to ok_cb)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (default_timeout is used if unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name ('default' if unset)
  -- @param {params} HTTP request params; copied, an explicit `url` is kept as is
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return whatever rspamd_http.request returns
  -- @example
  --
  --]]
  exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
    -- Work on a copy of the caller's params
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.callback = mk_http_select_cb(upstream, req, ok_cb, fail_cb)
    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.body = query
    req.log_obj = params.task or params.config

    lua_util.debugm(N, req.log_obj, "clickhouse select request: %s", req.body)

    -- Build the URL from the upstream address unless the caller supplied one
    if not req.url then
      local scheme = settings.use_https and 'https://' or "http://"
      local addr = upstream:get_addr():to_string(true)
      local db_name = settings.database or 'default'
      req.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          scheme, addr, escape_spaces(db_name))
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.select_sync(upstream, settings, params, query,
        ok_cb, fail_cb)
  -- Make select request to clickhouse and return the result directly
  -- instead of using callbacks
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (default_timeout is used if unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name ('default' if unset)
  -- @param {params} HTTP request params; copied, an explicit `url` is kept as is
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb accepted for signature parity, not used here
  -- @param {function} fail_cb accepted for signature parity, not used here
  -- @return
  --    {string} error message if exists (request error or non-200 reply body)
  --    nil | {rows} | {http_response}
  -- @example
  --
  --]]
  exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb)
    local request = {}
    for name, val in pairs(params) do
      request[name] = val
    end

    request.gzip = settings.use_gzip
    request.mime_type = 'text/plain'
    request.timeout = settings.timeout or default_timeout
    request.no_ssl_verify = settings.no_ssl_verify
    request.user = settings.user
    request.password = settings.password
    request.body = query
    request.log_obj = params.task or params.config

    lua_util.debugm(N, request.log_obj, "clickhouse select request: %s", request.body)

    if not request.url then
      local prefix = settings.use_https and 'https://' or "http://"
      local server = upstream:get_addr():to_string(true)
      local db = settings.database or 'default'
      request.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          prefix, server, escape_spaces(db))
    end

    local err, response = rspamd_http.request(request)

    -- The request itself failed
    if err then
      return err, nil
    end

    -- The server answered with an error: the body is the error text
    if response.code ~= 200 then
      return response.content, response
    end

    lua_util.debugm(N, request.log_obj, "clickhouse select response: %1", response)
    local rows = parse_clickhouse_response_json_eachrow(params, response.content)
    return nil, rows
  end
  --[[[
  -- @function lua_clickhouse.insert(upstream, settings, params, query, rows,
        ok_cb, fail_cb)
  -- Insert data rows to clickhouse
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (default_timeout is used if unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name ('default' if unset)
  -- @param {params} HTTP request params; copied, an explicit `url` is kept as is
  -- @param {string} query insert query (passed in `query` request element with
  --   spaces escaped and ` FORMAT TabSeparated` appended)
  -- @param {table|mixed} rows rows to send; they are joined with newlines
  --   by rspamd_text.fromtable and sent as the request body
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return whatever rspamd_http.request returns
  -- @example
  --
  --]]
  exports.insert = function (upstream, settings, params, query, rows,
        ok_cb, fail_cb)
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.callback = mk_http_insert_cb(upstream, req, ok_cb, fail_cb)
    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.method = 'POST'
    -- Rows separated by newlines, followed by a terminating newline
    local payload = rspamd_text.fromtable(rows, '\n')
    req.body = {payload, '\n'}
    req.log_obj = params.task or params.config

    -- The insert statement travels in the URL, the data in the body
    if not req.url then
      local scheme = settings.use_https and 'https://' or "http://"
      local addr = upstream:get_addr():to_string(true)
      local db_name = settings.database or 'default'
      req.url = string.format('%s%s/?database=%s&query=%s%%20FORMAT%%20TabSeparated',
          scheme,
          addr,
          escape_spaces(db_name),
          escape_spaces(query))
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.generic(upstream, settings, params, query,
        ok_cb, fail_cb)
  -- Make a generic request to Clickhouse (e.g. alter)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (default_timeout is used if unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name ('default' if unset)
  -- @param {params} HTTP request params; copied, an explicit `url` is kept as is
  -- @param {string} query Clickhouse query (passed in HTTP body)
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return whatever rspamd_http.request returns
  -- @example
  --
  --]]
  exports.generic = function (upstream, settings, params, query,
        ok_cb, fail_cb)
    local request = {}
    for name, val in pairs(params) do
      request[name] = val
    end

    -- Replies are handled by the same closure that serves inserts
    request.callback = mk_http_insert_cb(upstream, request, ok_cb, fail_cb)
    request.gzip = settings.use_gzip
    request.mime_type = 'text/plain'
    request.timeout = settings.timeout or default_timeout
    request.no_ssl_verify = settings.no_ssl_verify
    request.user = settings.user
    request.password = settings.password
    request.log_obj = params.task or params.config
    request.body = query

    if not request.url then
      local prefix = settings.use_https and 'https://' or "http://"
      local server = upstream:get_addr():to_string(true)
      local db = settings.database or 'default'
      request.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          prefix, server, escape_spaces(db))
    end

    return rspamd_http.request(request)
  end
  --[[[
  -- @function lua_clickhouse.generic_sync(upstream, settings, params, query,
        ok_cb, fail_cb)
  -- Make a generic request to Clickhouse (e.g. alter) and return the result
  -- directly instead of using callbacks
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * timeout: request timeout (default_timeout is used if unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  --   * use_https: build an https:// URL instead of http://
  --   * database: database name ('default' if unset)
  -- @param {params} HTTP request params; copied, an explicit `url` is kept as is
  -- @param {string} query Clickhouse query (passed in HTTP body)
  -- @return
  --    {string} error message if exists (request error, non-200 reply body
  --      or 'bad json')
  --    nil | {table} parsed JSON reply | {http_response}
  -- @example
  --
  --]]
  exports.generic_sync = function (upstream, settings, params, query)
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.log_obj = params.task or params.config
    req.body = query

    if not req.url then
      local scheme = settings.use_https and 'https://' or "http://"
      local addr = upstream:get_addr():to_string(true)
      local db_name = settings.database or 'default'
      req.url = string.format('%s%s/?database=%s&default_format=JSON',
          scheme, addr, escape_spaces(db_name))
    end

    local err, response = rspamd_http.request(req)

    -- The request itself failed
    if err then
      return err, nil
    end

    -- The server answered with an error: the body is the error text
    if response.code ~= 200 then
      return response.content, response
    end

    lua_util.debugm(N, req.log_obj, "clickhouse generic response: %1", response)
    local parse_err, obj = parse_clickhouse_response_json(params, response.content)
    if parse_err then
      return parse_err, nil
    end

    return nil, obj
  end
  436. return exports