You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

lua_clickhouse.lua 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. --[[
  2. Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>
  3. Copyright (c) 2018, Mikhail Galanin <mgalanin@mimecast.com>
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ]]--
  14. local rspamd_logger = require "rspamd_logger"
  15. local rspamd_http = require "rspamd_http"
  16. local lua_util = require "lua_util"
  -- Module-level state:
  --   exports         - table collecting the public API of this module
  --   N               - module tag passed to lua_util.debugm
  --   default_timeout - request timeout used when settings.timeout is unset
  local exports, N, default_timeout = {}, 'clickhouse', 10.0
  -- Percent-encodes `query` so it can be embedded as a URL query-string
  -- parameter value.
  -- * Any whitespace character becomes `%20` (same as before).
  -- * Every other byte outside the RFC 3986 unreserved set (A-Z a-z 0-9 - . _ ~)
  --   is encoded as `%XX`, so `%`, `&`, `#`, `+` etc. inside the SQL text can
  --   no longer truncate or corrupt the request URL.
  -- Returns exactly one value: the encoded string (a bare gsub would also
  -- return the replacement count).
  local function escape_spaces(query)
    local encoded = query:gsub('[^A-Za-z0-9%-%._~]', function(c)
      if c:match('%s') then
        return '%20'
      end
      return string.format('%%%02X', c:byte())
    end)

    return encoded
  end
  -- Formats a Lua number for ClickHouse TSV fields and array literals.
  -- * Lua 5.3+ integer subtype: tostring() is exact over the whole int64 range.
  -- * Integral floats: written with "%.0f", i.e. plain digits. The previous
  --   tostring(math.floor(a)) went through Lua 5.1/LuaJIT's "%.14g" number
  --   formatting, which prints integers >= 1e14 as e.g. "1e+15".
  -- * Everything else (fractions, NaN) falls back to tostring().
  -- The integrality test is math.floor(a) == a. The old (a+2^52)-2^52 == a
  -- check wrongly accepted negative half-values such as -0.5 and then floored
  -- them to -1.
  local function ch_number(a)
    if math.type and math.type(a) == 'integer' then
      return tostring(a)
    end

    if a == 0 then
      -- normalise -0.0 to "0"
      return '0'
    end

    -- NaN fails this comparison; +/-inf passes and formats as "inf"/"-inf"
    if math.floor(a) == a then
      return string.format('%.0f', a)
    end

    return tostring(a)
  end
  -- Escapes a string for use inside a single-quoted ClickHouse literal that is
  -- itself placed in a TabSeparated field (see array_to_string).
  -- Single quotes and backslashes are backslash-escaped. Tab, newline and
  -- carriage return are written as \t, \n, \r so they cannot split the TSV
  -- row or column. The result is lower-cased, as before.
  -- Returns '' for a nil/false input.
  local function clickhouse_quote(str)
    if not str then
      return ''
    end

    local escaped = str:gsub('[\'\\\t\n\r]', {
      ['\''] = '\\\'',
      ['\\'] = '\\\\',
      ['\t'] = '\\t',
      ['\n'] = '\\n',
      ['\r'] = '\\r',
    })

    return escaped:lower()
  end
  -- Converts an array to the comma-separated element list of a ClickHouse
  -- array literal (the caller adds the surrounding brackets).
  -- Strings are quoted via clickhouse_quote and numbers formatted via
  -- ch_number. Other values are passed through to table.concat unchanged.
  -- Builds a new table instead of rewriting `ar` in place, so the caller's
  -- data is untouched. Previously an array shared between rows or reused
  -- later was quoted a second time.
  local function array_to_string(ar)
    local out = {}

    for i, elt in ipairs(ar) do
      local t = type(elt)
      if t == 'string' then
        out[i] = '\'' .. clickhouse_quote(elt) .. '\''
      elseif t == 'number' then
        out[i] = ch_number(elt)
      else
        out[i] = elt
      end
    end

    return table.concat(out, ',')
  end
  -- Converts one row (a list of column values) into a TabSeparated line
  -- without the trailing newline.
  -- Tables become ClickHouse array literals (via array_to_string) and numbers
  -- are formatted with ch_number. Other values are emitted as-is.
  -- Works on a copy, so the caller's row table is not modified.
  -- NOTE(review): plain string columns are not escaped here, so a tab,
  -- newline or backslash inside one would corrupt the TSV; presumably callers
  -- pre-escape them. Confirm before relying on it.
  local function row_to_tsv(row)
    local fields = {}

    for i, elt in ipairs(row) do
      local t = type(elt)
      if t == 'table' then
        fields[i] = '[' .. array_to_string(elt) .. ']'
      elseif t == 'number' then
        fields[i] = ch_number(elt)
      else
        fields[i] = elt
      end
    end

    return table.concat(fields, '\t')
  end
  -- Parses a JSONEachRow reply from ClickHouse (one JSON object per line).
  -- @param params table whose `log_obj` field is used to log parse errors
  -- @param data raw reply body; nil means ClickHouse returned no data
  --   (empty result set)
  -- @return list of parsed row objects, in the order they appear in the reply
  --   (possibly empty). Lines that fail to parse are logged and skipped.
  local function parse_clickhouse_response(params, data)
    local ucl = require "ucl"

    if data == nil then
      -- clickhouse returned no data (i.e. empty result set): exiting
      return {}
    end

    -- Parses a single JSON line; logs and returns nil on error
    local function parse_string(s)
      local parser = ucl.parser()
      local res, err = parser:parse_string(s)
      if not res then
        rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
        return nil
      end
      return parser:get_object()
    end

    local parsed_rows = {}

    -- ipairs rather than pairs: pairs gives no ordering guarantee even for
    -- integer keys, and row order matters (e.g. for ORDER BY queries)
    for _, plain_row in ipairs(lua_util.str_split(data, "\n")) do
      -- skip empty or trivial lines (e.g. produced by a trailing newline)
      if plain_row and plain_row:len() > 1 then
        local parsed_row = parse_string(plain_row)
        if parsed_row then
          parsed_rows[#parsed_rows + 1] = parsed_row
        end
      end
    end

    return parsed_rows
  end
  -- Builds the rspamd_http callback used by `select`.
  -- On a transport error or a non-200 status:
  --   * the error is passed to fail_cb(params, err_message, data), or logged
  --     if there is no fail_cb;
  --   * the upstream is marked as failed.
  --   When there is no transport error message, the reply body is used.
  -- Otherwise:
  --   * the upstream is marked ok;
  --   * the JSONEachRow body is parsed and the rows are handed to
  --     ok_cb(params, rows), or debug-logged if there is no ok_cb.
  local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
    -- Routes a failure to fail_cb, or to the error log when none was given
    local function report_failure(err_message, data)
      if fail_cb then
        fail_cb(params, err_message, data)
      else
        local ip_addr = upstream:get_addr():to_string(true)
        rspamd_logger.errx(params.log_obj,
            "request failed on clickhouse server %s: %s",
            ip_addr, err_message)
      end
    end

    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        if not err_message then err_message = data end
        report_failure(err_message, data)
        upstream:fail()
      else
        upstream:ok()
        local rows = parse_clickhouse_response(params, data)

        if rows then
          if ok_cb then
            ok_cb(params, rows)
          else
            -- data is nil for an empty result set, so guard the gsub: debugm's
            -- arguments are evaluated even when debug logging is disabled
            local flat = data and (data:gsub('[\n%s]+', ' ')) or ''
            lua_util.debugm(N, params.log_obj,
                "http_select_cb ok: %s, %s, %s, %s", err_message, code,
                flat, _)
          end
        else
          report_failure('failed to parse reply', data)
        end
      end
    end

    return http_cb
  end
  -- Builds the rspamd_http callback used by `insert` and `generic`.
  -- On a transport error or a non-200 status:
  --   * the error is passed to fail_cb(params, err_message, data), or logged
  --     if there is no fail_cb;
  --   * the upstream is marked as failed.
  --   When there is no transport error message, the reply body is used.
  -- Otherwise:
  --   * the upstream is marked ok;
  --   * ok_cb(params, data) receives the raw reply body, or the body is
  --     debug-logged if there is no ok_cb.
  local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
    local function http_cb(err_message, code, data, _)
      if code ~= 200 or err_message then
        if not err_message then err_message = data end

        if fail_cb then
          fail_cb(params, err_message, data)
        else
          local ip_addr = upstream:get_addr():to_string(true)
          rspamd_logger.errx(params.log_obj,
              "request failed on clickhouse server %s: %s",
              ip_addr, err_message)
        end

        upstream:fail()
      else
        upstream:ok()

        if ok_cb then
          ok_cb(params, data)
        else
          -- data may be nil when ClickHouse replies with an empty body (the
          -- same case parse_clickhouse_response handles); debugm's arguments
          -- are evaluated eagerly, so an unguarded data:gsub would raise here
          local flat = data and (data:gsub('[\n%s]+', ' ')) or ''
          lua_util.debugm(N, params.log_obj,
              "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
              flat, _)
        end
      end
    end

    return http_cb
  end
  --[[[
  -- @function lua_clickhouse.select(upstream, settings, params, query, ok_cb, fail_cb)
  -- Make an asynchronous select request to clickhouse. The reply is requested
  -- in JSONEachRow format and parsed into a list of rows for ok_cb.
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * use_https: connect with https:// instead of http://
  -- * timeout: request timeout (module default_timeout when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- @param {table} params HTTP request params, shallow-copied. `task` or
  --   `config` is used for logging; a preset `url` replaces the upstream URL.
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb called as ok_cb(params, rows) on success
  -- @param {function} fail_cb called as fail_cb(params, err_message, data) on error
  -- @return the result of rspamd_http.request (presumably whether the request
  --   was started; verify against rspamd_http)
  -- @example
  --
  --]]
  exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.callback = mk_http_select_cb(upstream, req, ok_cb, fail_cb)
    req.mime_type = 'text/plain'
    req.gzip = settings.use_gzip
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user, req.password = settings.user, settings.password
    req.body = query
    req.log_obj = params.task or params.config

    lua_util.debugm(N, req.log_obj, "clickhouse select request: %s", req.body)

    if not req.url then
      local scheme = settings.use_https and 'https://' or 'http://'
      local ip_addr = upstream:get_addr():to_string(true)
      req.url = scheme .. ip_addr .. '/?default_format=JSONEachRow'
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.select_sync(upstream, settings, params, query, ok_cb, fail_cb)
  -- Make a synchronous select request to clickhouse (JSONEachRow reply)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * use_https: connect with https:// instead of http://
  -- * timeout: request timeout (module default_timeout when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- @param {table} params HTTP request params, shallow-copied. `task` or
  --   `config` is used for logging; a preset `url` replaces the upstream URL.
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb currently ignored (kept for signature parity with select)
  -- @param {function} fail_cb currently ignored (kept for signature parity with select)
  -- @return
  -- {string} error message if exists: the transport error, or the reply body
  --   on a non-200 status
  -- nil | {rows} | {http_response}: parsed rows on success, the raw response
  --   on a non-200 status, nil on a transport error
  -- @example
  --
  --]]
  exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb)
    local http_params = {}

    for k,v in pairs(params) do http_params[k] = v end

    http_params.gzip = settings.use_gzip
    http_params.mime_type = 'text/plain'
    http_params.timeout = settings.timeout or default_timeout
    http_params.no_ssl_verify = settings.no_ssl_verify
    http_params.user = settings.user
    http_params.password = settings.password
    http_params.body = query
    http_params.log_obj = params.task or params.config

    lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)

    if not http_params.url then
      local connect_prefix = "http://"
      if settings.use_https then
        connect_prefix = 'https://'
      end
      local ip_addr = upstream:get_addr():to_string(true)
      http_params.url = connect_prefix .. ip_addr .. '/?default_format=JSONEachRow'
    end

    -- No callback is set; this code relies on rspamd_http.request returning
    -- (err, response) directly in that case
    local err, response = rspamd_http.request(http_params)

    if err then
      return err, nil
    elseif response.code ~= 200 then
      return response.content, response
    end

    lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response)
    -- Pass http_params (which carries log_obj), not the caller's params, so
    -- parse errors are logged against the task/config
    local rows = parse_clickhouse_response(http_params, response.content)

    return nil, rows
  end
  --[[[
  -- @function lua_clickhouse.insert(upstream, settings, params, query, rows, ok_cb, fail_cb)
  -- Insert data rows to clickhouse using the TabSeparated format
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * use_https: connect with https:// instead of http://
  -- * timeout: request timeout (module default_timeout when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- @param {table} params HTTP request params, shallow-copied. `task` or
  --   `config` is used for logging; a preset `url` replaces the generated URL.
  -- @param {string} query insert statement such as "INSERT INTO tbl". It is
  --   URL-escaped with escape_spaces and sent in the `query` URL parameter,
  --   followed by " FORMAT TabSeparated".
  -- @param {table} rows list of rows; each row is a list of strings, numbers or
  --   tables (arrays). Each row becomes one TSV line in the POST body.
  -- @param {function} ok_cb called as ok_cb(params, data) on success
  -- @param {function} fail_cb called as fail_cb(params, err_message, data) on error
  -- @return the result of rspamd_http.request (presumably whether the request
  --   was started; verify against rspamd_http)
  -- @example
  --
  --]]
  exports.insert = function (upstream, settings, params, query, rows,
                             ok_cb, fail_cb)
    local fun = require "fun"
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.callback = mk_http_insert_cb(upstream, req, ok_cb, fail_cb)
    req.mime_type = 'text/plain'
    req.method = 'POST'
    req.gzip = settings.use_gzip
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user, req.password = settings.user, settings.password

    -- One TSV line per row, joined by newlines, plus a terminating newline
    local tsv_lines = fun.totable(fun.map(row_to_tsv, rows))
    req.body = { table.concat(tsv_lines, '\n'), '\n' }
    req.log_obj = params.task or params.config

    if not req.url then
      local scheme = settings.use_https and 'https://' or 'http://'
      local ip_addr = upstream:get_addr():to_string(true)
      req.url = string.format('%s%s/?query=%s%%20FORMAT%%20TabSeparated',
          scheme, ip_addr, escape_spaces(query))
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.generic(upstream, settings, params, query, ok_cb, fail_cb)
  -- Make a generic asynchronous request to Clickhouse (e.g. alter)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * use_https: connect with https:// instead of http://
  -- * timeout: request timeout (module default_timeout when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- @param {table} params HTTP request params, shallow-copied. `task` or
  --   `config` is used for logging; a preset `url` replaces the upstream URL.
  -- @param {string} query Clickhouse query, sent as the HTTP request body
  -- @param {function} ok_cb called as ok_cb(params, data) with the raw reply body
  -- @param {function} fail_cb called as fail_cb(params, err_message, data) on error
  -- @return the result of rspamd_http.request (presumably whether the request
  --   was started; verify against rspamd_http)
  -- @example
  --
  --]]
  exports.generic = function (upstream, settings, params, query,
                              ok_cb, fail_cb)
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.callback = mk_http_insert_cb(upstream, req, ok_cb, fail_cb)
    req.mime_type = 'text/plain'
    req.gzip = settings.use_gzip
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user, req.password = settings.user, settings.password
    req.log_obj = params.task or params.config
    req.body = query

    if not req.url then
      local scheme = settings.use_https and 'https://' or 'http://'
      local ip_addr = upstream:get_addr():to_string(true)
      req.url = scheme .. ip_addr .. '/?default_format=JSONEachRow'
    end

    return rspamd_http.request(req)
  end
  --[[[
  -- @function lua_clickhouse.generic_sync(upstream, settings, params, query)
  -- Make a generic synchronous request to Clickhouse (e.g. alter)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * use_https: connect with https:// instead of http://
  -- * timeout: request timeout (module default_timeout when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- @param {table} params HTTP request params, shallow-copied. `task` or
  --   `config` is used for logging; a preset `url` replaces the upstream URL.
  -- @param {string} query Clickhouse query, sent as the HTTP request body
  -- @return whatever rspamd_http.request returns without a callback
  --   (select_sync treats this as err, response). The status code is NOT
  --   checked here; callers must inspect the response themselves.
  -- @example
  --
  --]]
  exports.generic_sync = function (upstream, settings, params, query)
    local req = {}
    for key, value in pairs(params) do
      req[key] = value
    end

    req.mime_type = 'text/plain'
    req.gzip = settings.use_gzip
    req.timeout = settings.timeout or default_timeout
    req.no_ssl_verify = settings.no_ssl_verify
    req.user, req.password = settings.user, settings.password
    req.log_obj = params.task or params.config
    req.body = query

    if not req.url then
      local scheme = settings.use_https and 'https://' or 'http://'
      local ip_addr = upstream:get_addr():to_string(true)
      req.url = scheme .. ip_addr .. '/?default_format=JSONEachRow'
    end

    return rspamd_http.request(req)
  end
  377. return exports