You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

lua_clickhouse.lua 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. --[[
  2. Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>
  3. Copyright (c) 2018, Mikhail Galanin <mgalanin@mimecast.com>
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ]]--
  14. --[[[
  15. -- @module lua_clickhouse
  16. -- This module contains Clickhouse access functions
  17. --]]
  18. local rspamd_logger = require "rspamd_logger"
  19. local rspamd_http = require "rspamd_http"
  20. local lua_util = require "lua_util"
  21. local exports = {}
  22. local N = 'clickhouse'
  23. local default_timeout = 10.0
  -- Percent-encodes whitespace so that a string can be placed in a URL query.
  -- Every character matched by the `%s` class becomes `%20`; nothing else is
  -- escaped.
  -- @param {string} query text to encode
  -- @return {string} the encoded text (exactly one value)
  local function escape_spaces(query)
    -- The parentheses drop gsub's second result (the substitution count), so
    -- it cannot leak into a caller's argument list.
    return (query:gsub('%s', '%%20'))
  end
  -- Renders a Lua number as text for Clickhouse.
  -- Integral values are printed as plain decimal digits without a fractional
  -- part or exponent; everything else goes through `tostring`.
  -- @param {number} a value to render
  -- @return {string} textual representation
  local function ch_number(a)
    -- `math.floor(a) == a` is an exact integrality test. The previous
    -- `(a + 2^52) - 2^52 == a` trick also accepted negative halves such as
    -- -1.5, which were then floored to -2.
    if math.floor(a) == a then
      if a == 0 then
        -- Covers negative zero as well
        return '0'
      end
      -- '%.0f' prints all digits, whereas tostring() on Lua 5.1/LuaJIT
      -- switches to exponent notation from 1e15. The range check keeps
      -- infinities and huge magnitudes on the tostring() path below.
      if a > -2^63 and a < 2^63 then
        return string.format('%.0f', a)
      end
    end

    return tostring(a)
  end
  -- Replacement table used by clickhouse_quote: each key is a raw character,
  -- each value is its backslash-escaped spelling.
  local clickhouse_quote_map = {
    ['\''] = '\\\'',
    ['\\'] = '\\\\',
    ['\n'] = '\\n',
    ['\t'] = '\\t',
    ['\r'] = '\\r',
  }

  -- Escapes a string for use inside a single-quoted Clickhouse literal and
  -- lower-cases it.
  -- Single quotes and backslashes get a leading backslash. Tab, newline and
  -- carriage return are written as `\t`, `\n` and `\r`, so that a value can
  -- never introduce a raw column or row separator into tab-separated data.
  -- @param {string|nil} str value to escape
  -- @return {string} escaped, lower-cased text; '' when `str` is nil or false
  local function clickhouse_quote(str)
    if not str then
      return ''
    end

    -- Assigning to a single local keeps only the string result of gsub
    local escaped = str:gsub('[\'\\\n\t\r]', clickhouse_quote_map)
    return escaped:lower()
  end
  -- Converts an array to a string suitable for clickhouse.
  -- Strings are escaped with `clickhouse_quote` and wrapped in single quotes,
  -- numbers are rendered with `ch_number`, and the items are joined with ','.
  -- The input array is not modified, so the same table can be serialized
  -- more than once without its strings being quoted twice.
  -- @param {table} ar sequence of strings and/or numbers
  -- @return {string} comma separated items (without surrounding brackets)
  local function array_to_string(ar)
    local items = {}

    for i, elt in ipairs(ar) do
      local kind = type(elt)
      if kind == 'string' then
        items[i] = '\'' .. clickhouse_quote(elt) .. '\''
      elseif kind == 'number' then
        items[i] = ch_number(elt)
      else
        -- Other types are passed through unchanged
        items[i] = elt
      end
    end

    return table.concat(items, ',')
  end
  -- Converts a row into TSV, taking extra care about arrays.
  -- Nested tables become `[...]` via `array_to_string`, and numbers are
  -- rendered with `ch_number`. All other cells (plain strings in particular)
  -- are emitted verbatim.
  -- NOTE(review): plain string cells are not escaped here, so presumably the
  -- caller must ensure they contain no tabs or newlines — confirm.
  -- The input row is not modified, so nested arrays stay tables for the caller.
  -- @param {table} row sequence of cell values
  -- @return {string} cells joined with tab characters
  local function row_to_tsv(row)
    local cells = {}

    for i, elt in ipairs(row) do
      local kind = type(elt)
      if kind == 'table' then
        cells[i] = '[' .. array_to_string(elt) .. ']'
      elseif kind == 'number' then
        cells[i] = ch_number(elt)
      else
        cells[i] = elt
      end
    end

    return table.concat(cells, '\t')
  end
  -- Parses JSONEachRow reply from CH.
  -- The reply is split on newlines and every line longer than one character
  -- is parsed with UCL. Lines that fail to parse are logged through
  -- `params.log_obj` and skipped.
  -- @param {table} params table whose `log_obj` field is used for error logging
  -- @param {string|nil} data raw reply body
  -- @return {table} array of parsed rows in reply order (empty for nil `data`)
  local function parse_clickhouse_response(params, data)
    if data == nil then
      -- clickhouse returned no data (i.e. empty result set): exiting
      return {}
    end

    -- The parser is only loaded once there is something to parse
    local ucl = require "ucl"

    local function parse_string(s)
      local parser = ucl.parser()
      local res, err = parser:parse_string(s)
      if not res then
        rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
        return nil
      end
      return parser:get_object()
    end

    -- ipairs guarantees that rows come out in the order they were received;
    -- pairs gives no ordering guarantee
    local parsed_rows = {}
    for _, plain_row in ipairs(lua_util.str_split(data, "\n")) do
      if plain_row:len() > 1 then
        local parsed_row = parse_string(plain_row)
        if parsed_row then
          parsed_rows[#parsed_rows + 1] = parsed_row
        end
      end
    end

    return parsed_rows
  end
  -- Helper to generate HTTP closure for select requests.
  -- The returned callback marks the upstream as failed or ok, parses a
  -- successful reply with `parse_clickhouse_response`, and hands the outcome
  -- to `ok_cb`/`fail_cb`. Without a callback the outcome is only logged.
  -- @param {upstream} upstream object providing get_addr(), ok() and fail()
  -- @param {table} params request params; `params.log_obj` is used for logging
  -- @param {function} ok_cb optional, called as ok_cb(params, rows)
  -- @param {function} fail_cb optional, called as fail_cb(params, err, data)
  -- @return {function} callback taking (err_message, code, data, headers)
  local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
    local function report_failure(reason, data)
      if fail_cb then
        fail_cb(params, reason, data)
      else
        -- The address is only needed for the log line, so resolve it here
        local ip_addr = upstream:get_addr():to_string(true)
        rspamd_logger.errx(params.log_obj,
            "request failed on clickhouse server %s: %s",
            ip_addr, reason)
      end
    end

    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        -- For a non-200 reply without a transport error, the body is the
        -- error text
        if not err_message then err_message = data end
        report_failure(err_message, data)
        upstream:fail()
        return
      end

      upstream:ok()
      local rows = parse_clickhouse_response(params, data)

      if not rows then
        report_failure('failed to parse reply', data)
        return
      end

      if ok_cb then
        ok_cb(params, rows)
      else
        -- An empty reply has no body: do not index nil when flattening it
        local flat = data
        if data ~= nil then
          flat = data:gsub('[\n%s]+', ' ')
        end
        lua_util.debugm(N, params.log_obj,
            "http_select_cb ok: %s, %s, %s, %s", err_message, code,
            flat, _)
      end
    end

    return http_cb
  end
  -- Helper to generate HTTP closure for insert and generic requests.
  -- The returned callback marks the upstream as failed or ok and forwards the
  -- raw reply body to `ok_cb`/`fail_cb`. Without a callback the outcome is
  -- only logged.
  -- @param {upstream} upstream object providing get_addr(), ok() and fail()
  -- @param {table} params request params; `params.log_obj` is used for logging
  -- @param {function} ok_cb optional, called as ok_cb(params, data)
  -- @param {function} fail_cb optional, called as fail_cb(params, err, data)
  -- @return {function} callback taking (err_message, code, data, headers)
  local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        -- For a non-200 reply without a transport error, the body is the
        -- error text
        if not err_message then err_message = data end

        if fail_cb then
          fail_cb(params, err_message, data)
        else
          -- The address is only needed for the log line, so resolve it here
          local ip_addr = upstream:get_addr():to_string(true)
          rspamd_logger.errx(params.log_obj,
              "request failed on clickhouse server %s: %s",
              ip_addr, err_message)
        end

        upstream:fail()
        return
      end

      upstream:ok()

      if ok_cb then
        ok_cb(params, data)
      else
        -- A successful reply may carry no body: do not index nil when
        -- flattening it
        local flat = data
        if data ~= nil then
          flat = data:gsub('[\n%s]+', ' ')
        end
        lua_util.debugm(N, params.log_obj,
            "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
            flat, _)
      end
    end

    return http_cb
  end
  --[[[
  -- @function lua_clickhouse.select(upstream, settings, params, query,
        ok_cb, fail_cb)
  -- Make an asynchronous select request to clickhouse
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * use_https: connect with https instead of http
  --   * timeout: request timeout (10 seconds when unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  -- @param {table} params HTTP request params; copied, never modified.
  --   `task`/`config` is used for logging, `database` selects the database
  --   ('default' when unset), and a preset `url` is used as is
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return whatever `rspamd_http.request` returns for the request
  --]]
  exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
    -- Shallow copy: the settings below must not end up in the caller's table
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.callback = mk_http_select_cb(upstream, req, ok_cb, fail_cb)
    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.body = query
    req.log_obj = params.task or params.config

    lua_util.debugm(N, req.log_obj, "clickhouse select request: %s", req.body)

    if not req.url then
      local scheme = settings.use_https and 'https://' or "http://"
      local addr = upstream:get_addr():to_string(true)
      local db = escape_spaces(params.database or 'default')
      req.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          scheme, addr, db)
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.select_sync(upstream, settings, params, query,
        ok_cb, fail_cb)
  -- Make a select request to clickhouse and return its outcome directly
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * use_https: connect with https instead of http
  --   * timeout: request timeout (10 seconds when unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  -- @param {table} params HTTP request params; copied, never modified.
  --   `task`/`config` is used for logging, `database` selects the database
  --   ('default' when unset), and a preset `url` is used as is
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb accepted for signature parity with `select`;
  --   this function never calls it
  -- @param {function} fail_cb accepted for signature parity with `select`;
  --   this function never calls it
  -- @return two values, one of:
  --   * err, nil                -- the request itself reported an error
  --   * reply body, response    -- the server answered with a non-200 code
  --   * nil, rows               -- success; rows is an array of parsed rows
  --]]
  exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb)
    local http_params = {}
    for k,v in pairs(params) do http_params[k] = v end

    http_params.gzip = settings.use_gzip
    http_params.mime_type = 'text/plain'
    http_params.timeout = settings.timeout or default_timeout
    http_params.no_ssl_verify = settings.no_ssl_verify
    http_params.user = settings.user
    http_params.password = settings.password
    http_params.body = query
    http_params.log_obj = params.task or params.config

    lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)

    if not http_params.url then
      local connect_prefix = "http://"
      if settings.use_https then
        connect_prefix = 'https://'
      end
      local ip_addr = upstream:get_addr():to_string(true)
      local database = params.database or 'default'
      -- Parentheses keep only the string result of escape_spaces
      http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          connect_prefix, ip_addr, (escape_spaces(database)))
    end

    -- No callback is set on the request; the result is returned directly
    local err, response = rspamd_http.request(http_params)

    if err then
      return err, nil
    elseif response.code ~= 200 then
      return response.content, response
    else
      lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response)
      -- Pass http_params, not params: only the copy carries `log_obj`, which
      -- the parser uses to report malformed rows
      local rows = parse_clickhouse_response(http_params, response.content)
      return nil, rows
    end
  end
  --[[[
  -- @function lua_clickhouse.insert(upstream, settings, params, query, rows,
        ok_cb, fail_cb)
  -- Insert data rows to clickhouse
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * use_https: connect with https instead of http
  --   * timeout: request timeout (10 seconds when unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  -- @param {table} params HTTP request params; copied, never modified.
  --   `task`/`config` is used for logging, `database` selects the database
  --   ('default' when unset), and a preset `url` is used as is
  -- @param {string} query insert query (passed in `query` request element with
  --   spaces escaped and ` FORMAT TabSeparated` appended; not used when
  --   `params.url` is preset)
  -- @param {table|mixed} rows rows to send; each row is a mix of strings,
  --   numbers or tables (for arrays)
  -- @param {function} ok_cb callback to be called in case of success
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return whatever `rspamd_http.request` returns for the request
  --]]
  exports.insert = function (upstream, settings, params, query, rows,
                             ok_cb, fail_cb)
    local fun = require "fun"

    -- Shallow copy: the settings below must not end up in the caller's table
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.callback = mk_http_insert_cb(upstream, req, ok_cb, fail_cb)
    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.method = 'POST'

    -- One TSV line per row; the body is sent as two chunks so that the last
    -- line is newline-terminated too
    local tsv_lines = fun.totable(fun.map(function(one_row)
      return row_to_tsv(one_row)
    end, rows))
    req.body = { table.concat(tsv_lines, '\n'), '\n' }
    req.log_obj = params.task or params.config

    if not req.url then
      local scheme = settings.use_https and 'https://' or "http://"
      local addr = upstream:get_addr():to_string(true)
      local db = escape_spaces(params.database or 'default')
      local escaped_query = escape_spaces(query)
      req.url = string.format('%s%s/?database=%s&query=%s%%20FORMAT%%20TabSeparated',
          scheme, addr, db, escaped_query)
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.generic(upstream, settings, params, query,
        ok_cb, fail_cb)
  -- Make a generic asynchronous request to Clickhouse (e.g. alter)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * use_https: connect with https instead of http
  --   * timeout: request timeout (10 seconds when unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  -- @param {table} params HTTP request params; copied, never modified.
  --   `task`/`config` is used for logging, `database` selects the database
  --   ('default' when unset), and a preset `url` is used as is
  -- @param {string} query Clickhouse query (passed in HTTP body)
  -- @param {function} ok_cb callback to be called in case of success; receives
  --   the raw reply body
  -- @param {function} fail_cb callback to be called in case of some error
  -- @return whatever `rspamd_http.request` returns for the request
  --]]
  exports.generic = function (upstream, settings, params, query,
                              ok_cb, fail_cb)
    -- Shallow copy: the settings below must not end up in the caller's table
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.callback = mk_http_insert_cb(upstream, req, ok_cb, fail_cb)
    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.log_obj = params.task or params.config
    req.body = query

    if not req.url then
      local scheme = settings.use_https and 'https://' or "http://"
      local addr = upstream:get_addr():to_string(true)
      local db = escape_spaces(params.database or 'default')
      req.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          scheme, addr, db)
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.generic_sync(upstream, settings, params, query)
  -- Make a generic request to Clickhouse (e.g. alter) without a callback
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  --   * use_gzip: use gzip compression
  --   * use_https: connect with https instead of http
  --   * timeout: request timeout (10 seconds when unset)
  --   * no_ssl_verify: skip SSL verification
  --   * user: HTTP user
  --   * password: HTTP password
  -- @param {table} params HTTP request params; copied, never modified.
  --   `task`/`config` is used for logging, `database` selects the database
  --   ('default' when unset), and a preset `url` is used as is
  -- @param {string} query Clickhouse query (passed in HTTP body)
  -- @return whatever `rspamd_http.request` returns for a request made without
  --   a callback
  --]]
  exports.generic_sync = function (upstream, settings, params, query)
    -- Shallow copy: the settings below must not end up in the caller's table
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.gzip = settings.use_gzip
    req.mime_type = 'text/plain'
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user = settings.user
    req.password = settings.password
    req.log_obj = params.task or params.config
    req.body = query

    if not req.url then
      local scheme = settings.use_https and 'https://' or "http://"
      local addr = upstream:get_addr():to_string(true)
      local db = escape_spaces(params.database or 'default')
      req.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
          scheme, addr, db)
    end

    return rspamd_http.request(req)
  end
  391. return exports