You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

lua_clickhouse.lua 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. --[[
  2. Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>
  3. Copyright (c) 2018, Mikhail Galanin <mgalanin@mimecast.com>
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. ]]--
  14. local rspamd_logger = require "rspamd_logger"
  15. local rspamd_http = require "rspamd_http"
  -- Module name passed to rspamd_logger.debugm by the helpers below
  local N = 'clickhouse'
  -- Timeout used when settings.timeout is not given (presumably seconds —
  -- confirm against rspamd_http)
  local default_timeout = 10.0
  -- Public module table, filled with select/insert/generic
  local exports = {}
  --- Percent-encodes a query for the `query=` URL parameter.
  -- Every whitespace character becomes `%20` (as before). Characters that
  -- would otherwise change how the URL is parsed (`%`, `&`, `#`, `+`) are
  -- percent-encoded as well, so they reach ClickHouse unchanged.
  -- @param {string} query raw SQL text
  -- @return {string} encoded text (exactly one value, not gsub's count)
  local function escape_spaces(query)
    local encoded = query:gsub('[%s%%&#+]', function(c)
      if c:match('^%s$') then
        return '%20'
      end
      return string.format('%%%02X', c:byte())
    end)
    return encoded
  end
  --- Formats a number for ClickHouse TSV fields and array literals.
  -- Integral values are printed as plain decimal digits (no exponent, no
  -- fractional part). Anything else (fractions, inf, NaN) falls back to
  -- tostring().
  -- @param {number} a value to format
  -- @return {string}
  local function ch_number(a)
    -- Lua 5.3+ native integers are already printed exactly
    if math.type and math.type(a) == 'integer' then
      return tostring(a)
    end
    -- Finite integral double: `a - a == 0` rules out inf and NaN.
    -- The old `(a+2^52)-2^52 == a` test also accepted negative
    -- half-integers (-0.5 became "-1"). tostring() switches to exponent
    -- notation for large values (e.g. "1e+15") on Lua 5.1/LuaJIT, while
    -- '%.0f' prints every digit.
    if a - a == 0 and a == math.floor(a) then
      if a == 0 then
        return '0' -- normalize -0.0
      end
      return string.format('%.0f', a)
    end
    return tostring(a)
  end
  -- Replacement for each character that may not appear raw inside a
  -- single-quoted ClickHouse literal embedded in a TSV field
  local ch_quote_escapes = {
    ['\''] = '\\\'',
    ['\\'] = '\\\\',
    ['\t'] = '\\t',
    ['\n'] = '\\n',
    ['\r'] = '\\r',
  }

  --- Escapes a string for use inside a single-quoted ClickHouse literal.
  -- Quotes and backslashes are backslash-escaped. Tab, newline and CR are
  -- written as `\t`, `\n` and `\r`, so a value cannot split the TSV row
  -- that insert builds. The result is lower-cased, as before.
  -- NOTE(review): lower-casing is surprising for a quoting helper but
  -- callers may rely on it — confirm before removing.
  -- @param {string|nil} str value to escape
  -- @return {string} escaped string, or '' when str is nil/false
  local function clickhouse_quote(str)
    if not str then
      return ''
    end
    local escaped = str:gsub('[\'\\\t\n\r]', ch_quote_escapes)
    return escaped:lower()
  end
  --- Converts a Lua array into the inner text of a ClickHouse array literal
  -- (without the surrounding brackets), e.g. {'a', 1} -> "'a',1".
  -- Strings are quoted with clickhouse_quote, numbers are formatted with
  -- ch_number, and any other value is handed to table.concat unchanged.
  -- The input table is NOT modified. The previous version rewrote it in
  -- place, which re-quoted an array that was shared between rows or passed
  -- in again by the caller.
  -- @param {table} ar array of strings/numbers
  -- @return {string}
  local function array_to_string(ar)
    local out = {}
    for i, elt in ipairs(ar) do
      local t = type(elt)
      if t == 'string' then
        out[i] = '\'' .. clickhouse_quote(elt) .. '\''
      elseif t == 'number' then
        out[i] = ch_number(elt)
      else
        out[i] = elt
      end
    end
    return table.concat(out, ',')
  end
  --- Converts one row (an array of fields) into a TabSeparated line.
  -- Table fields become ClickHouse array literals ("[...]"), number fields
  -- go through ch_number, and anything else is emitted as-is.
  -- The input row is NOT modified (the previous version overwrote its
  -- fields in place).
  -- NOTE(review): plain string fields are not TSV-escaped here; presumably
  -- callers guarantee they contain no tab/newline/backslash — confirm.
  -- @param {table} row array of fields
  -- @return {string} fields joined with '\t'
  local function row_to_tsv(row)
    local fields = {}
    for i, elt in ipairs(row) do
      local t = type(elt)
      if t == 'table' then
        fields[i] = '[' .. array_to_string(elt) .. ']'
      elseif t == 'number' then
        fields[i] = ch_number(elt)
      else
        fields[i] = elt
      end
    end
    return table.concat(fields, '\t')
  end
  --- Parses a JSONEachRow reply from ClickHouse.
  -- Each line longer than one byte is parsed with UCL. Lines that fail to
  -- parse are logged via rspamd_logger.errx and skipped.
  -- @param {table} params request params (params.log_obj is used for logging)
  -- @param {string|nil} data raw reply body
  -- @return {table} array of parsed rows in reply order ({} when data is nil)
  local function parse_clickhouse_response(params, data)
    if data == nil then
      -- clickhouse returned no data (i.e. empty result set): exiting
      return {}
    end

    local ucl = require "ucl"

    local function parse_row(s)
      local parser = ucl.parser()
      local res, err = parser:parse_string(s)
      if not res then
        rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
        return nil
      end
      return parser:get_object()
    end

    local parsed_rows = {}
    -- gmatch visits lines in reply order; the previous pairs() walk over a
    -- split array did not guarantee order (matters for ORDER BY queries)
    for plain_row in tostring(data):gmatch('[^\n]+') do
      if #plain_row > 1 then
        local parsed_row = parse_row(plain_row)
        if parsed_row then
          parsed_rows[#parsed_rows + 1] = parsed_row
        end
      end
    end

    return parsed_rows
  end
  --- Builds the rspamd_http callback used for SELECT requests.
  -- On a transport error or a non-200 code:
  --   * fail_cb(params, err_message, data) is called (err_message falls back
  --     to the reply body), or the error is logged when there is no fail_cb;
  --   * the upstream is then marked failed.
  -- On success:
  --   * the upstream is marked ok;
  --   * the JSONEachRow body is parsed;
  --   * ok_cb(params, rows) is called, or a debug line is logged.
  -- @param {upstream} upstream server the request goes to
  -- @param {table} params request params handed back to the callbacks
  -- @param {function|nil} ok_cb success callback
  -- @param {function|nil} fail_cb failure callback
  -- @return {function} callback(err_message, code, data, headers)
  local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
    local function http_cb(err_message, code, data, headers)
      if code ~= 200 or err_message then
        if not err_message then err_message = data end
        if fail_cb then
          fail_cb(params, err_message, data)
        else
          rspamd_logger.errx(params.log_obj,
              "request failed on clickhouse server %s: %s",
              upstream:get_addr():to_string(true), err_message)
        end
        upstream:fail()
        return
      end

      upstream:ok()
      local rows = parse_clickhouse_response(params, data)
      if rows then
        if ok_cb then
          ok_cb(params, rows)
        else
          -- data may be nil (empty result set). Log arguments are evaluated
          -- even when debug logging is off, so data:gsub would raise.
          local flat = tostring(data or ''):gsub('[\n%s]+', ' ')
          rspamd_logger.debugm(N, params.log_obj,
              "http_select_cb ok: %s, %s, %s, %s", err_message, code,
              flat, headers)
        end
      elseif fail_cb then
        -- defensive: parse_clickhouse_response currently always returns a table
        fail_cb(params, 'failed to parse reply', data)
      else
        rspamd_logger.errx(params.log_obj,
            "request failed on clickhouse server %s: %s",
            upstream:get_addr():to_string(true), 'failed to parse reply')
      end
    end

    return http_cb
  end
  --- Builds the rspamd_http callback used for INSERT and generic requests.
  -- On a transport error or a non-200 code:
  --   * fail_cb(params, err_message, data) is called (err_message falls back
  --     to the reply body), or the error is logged when there is no fail_cb;
  --   * the upstream is then marked failed.
  -- On success:
  --   * the upstream is marked ok;
  --   * ok_cb(params, data) receives the raw reply body, or a debug line is
  --     logged.
  -- @param {upstream} upstream server the request goes to
  -- @param {table} params request params handed back to the callbacks
  -- @param {function|nil} ok_cb success callback
  -- @param {function|nil} fail_cb failure callback
  -- @return {function} callback(err_message, code, data, headers)
  local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
    local function http_cb(err_message, code, data, headers)
      if code ~= 200 or err_message then
        if not err_message then err_message = data end
        if fail_cb then
          fail_cb(params, err_message, data)
        else
          rspamd_logger.errx(params.log_obj,
              "request failed on clickhouse server %s: %s",
              upstream:get_addr():to_string(true), err_message)
        end
        upstream:fail()
        return
      end

      upstream:ok()
      if ok_cb then
        ok_cb(params, data)
      else
        -- Insert replies are typically empty, so data may be nil. Log
        -- arguments are evaluated even when debug logging is off, so
        -- data:gsub would raise.
        local flat = tostring(data or ''):gsub('[\n%s]+', ' ')
        rspamd_logger.debugm(N, params.log_obj,
            "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
            flat, headers)
      end
    end

    return http_cb
  end
  --[[[
  -- @function lua_clickhouse.select(upstream, settings, params, query,
  --   ok_cb, fail_cb)
  -- Make a select request to clickhouse; the reply is parsed as JSONEachRow
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * use_https: connect with https:// instead of http://
  -- * timeout: request timeout (defaults to 10.0 when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- @param {table} params HTTP request params (copied, not modified; a
  --   `url` field overrides the generated URL, and `task` or `config` is
  --   used as the log object)
  -- @param {string} query select query (passed in HTTP body)
  -- @param {function} ok_cb called as ok_cb(params, rows) on success
  -- @param {function} fail_cb called as fail_cb(params, err, data) on error
  -- @return {boolean} whether a connection was successful
  --]]
  exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
    local http_params = {}
    for k, v in pairs(params) do http_params[k] = v end
    http_params.callback = mk_http_select_cb(upstream, http_params, ok_cb, fail_cb)
    http_params.gzip = settings.use_gzip
    http_params.mime_type = 'text/plain'
    http_params.timeout = settings.timeout or default_timeout
    http_params.no_ssl_verify = settings.no_ssl_verify
    http_params.user = settings.user
    http_params.password = settings.password
    http_params.body = query
    http_params.log_obj = params.task or params.config
    rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s",
        http_params.body)

    -- Callers may supply their own URL; otherwise target the upstream address
    if not http_params.url then
      local connect_prefix = settings.use_https and 'https://' or 'http://'
      local ip_addr = upstream:get_addr():to_string(true)
      http_params.url = connect_prefix .. ip_addr .. '/?default_format=JSONEachRow'
    end

    return rspamd_http.request(http_params)
  end
  --[[[
  -- @function lua_clickhouse.insert(upstream, settings, params, query, rows,
  --   ok_cb, fail_cb)
  -- Insert data rows to clickhouse (POST, TabSeparated body)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * use_https: connect with https:// instead of http://
  -- * timeout: request timeout (defaults to 10.0 when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- @param {table} params HTTP request params (copied, not modified; a
  --   `url` field overrides the generated URL, and `task` or `config` is
  --   used as the log object)
  -- @param {string} query insert query, e.g. `INSERT INTO tbl (a, b)`; sent in
  --   the `query` URL parameter (via escape_spaces), followed by
  --   ` FORMAT TabSeparated`
  -- @param {table} rows array of rows; each row is an array of strings,
  --   numbers or tables (for arrays)
  -- @param {function} ok_cb called as ok_cb(params, data) on success
  -- @param {function} fail_cb called as fail_cb(params, err, data) on error
  -- @return {boolean} whether a connection was successful
  --]]
  exports.insert = function (upstream, settings, params, query, rows,
      ok_cb, fail_cb)
    local http_params = {}
    for k, v in pairs(params) do http_params[k] = v end
    http_params.callback = mk_http_insert_cb(upstream, http_params, ok_cb, fail_cb)
    http_params.gzip = settings.use_gzip
    http_params.mime_type = 'text/plain'
    http_params.timeout = settings.timeout or default_timeout
    http_params.no_ssl_verify = settings.no_ssl_verify
    http_params.user = settings.user
    http_params.password = settings.password
    http_params.method = 'POST'

    -- One TSV line per row in array order, then a trailing newline
    -- (plain ipairs loop instead of the third-party `fun` map/totable)
    local tsv_rows = {}
    for i, row in ipairs(rows) do
      tsv_rows[i] = row_to_tsv(row)
    end
    http_params.body = {table.concat(tsv_rows, '\n'), '\n'}
    http_params.log_obj = params.task or params.config

    -- Callers may supply their own URL; otherwise target the upstream address
    if not http_params.url then
      local connect_prefix = settings.use_https and 'https://' or 'http://'
      local ip_addr = upstream:get_addr():to_string(true)
      http_params.url = string.format('%s%s/?query=%s%%20FORMAT%%20TabSeparated',
          connect_prefix,
          ip_addr,
          escape_spaces(query))
    end

    return rspamd_http.request(http_params)
  end
  --[[[
  -- @function lua_clickhouse.generic(upstream, settings, params, query,
  --   ok_cb, fail_cb)
  -- Make a generic request to Clickhouse (e.g. alter)
  -- @param {upstream} upstream clickhouse server upstream
  -- @param {table} settings global settings table:
  -- * use_gzip: use gzip compression
  -- * use_https: connect with https:// instead of http://
  -- * timeout: request timeout (defaults to 10.0 when unset)
  -- * no_ssl_verify: skip SSL verification
  -- * user: HTTP user
  -- * password: HTTP password
  -- @param {table} params HTTP request params (copied, not modified; a
  --   `url` field overrides the generated URL, and `task` or `config` is
  --   used as the log object)
  -- @param {string} query Clickhouse query (passed in HTTP body)
  -- @param {function} ok_cb called as ok_cb(params, data) with the raw reply
  -- @param {function} fail_cb called as fail_cb(params, err, data) on error
  -- @return {boolean} whether a connection was successful
  --]]
  exports.generic = function (upstream, settings, params, query,
      ok_cb, fail_cb)
    local http_params = {}
    for k, v in pairs(params) do http_params[k] = v end
    -- The raw reply is passed through to ok_cb, like insert
    http_params.callback = mk_http_insert_cb(upstream, http_params, ok_cb, fail_cb)
    http_params.gzip = settings.use_gzip
    http_params.mime_type = 'text/plain'
    http_params.timeout = settings.timeout or default_timeout
    http_params.no_ssl_verify = settings.no_ssl_verify
    http_params.user = settings.user
    http_params.password = settings.password
    http_params.log_obj = params.task or params.config
    http_params.body = query

    -- Callers may supply their own URL; otherwise target the upstream address
    if not http_params.url then
      local connect_prefix = settings.use_https and 'https://' or 'http://'
      local ip_addr = upstream:get_addr():to_string(true)
      http_params.url = connect_prefix .. ip_addr .. '/?default_format=JSONEachRow'
    end

    return rspamd_http.request(http_params)
  end
  -- Module value: the exports table holding select, insert and generic
  return exports