--[[
Copyright (c) 2018, Vsevolod Stakhov
Copyright (c) 2018, Mikhail Galanin

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
]]--

-- Provenance: lualib/lua_clickhouse.lua as added by commit
-- ea7bb4922e4f2fa5ae64d0c5d740fbd3e8f55a3a
-- ("[Project] Start Clickhouse utilities library", 3 Aug 2018).

local rspamd_logger = require "rspamd_logger"
local rspamd_http = require "rspamd_http"

local exports = {}
-- Module name passed to rspamd_logger.debugm
local N = 'clickhouse'

-- Request timeout used when settings.timeout is not provided
local default_timeout = 10.0

-- Replaces every whitespace character of `query` with the literal text '%20'.
-- Not referenced by any other function in this module yet.
-- @param {string} query text to escape
-- @return {string} escaped text (exactly one value)
local function escape_spaces(query)
  -- The parentheses drop gsub's second result (the substitution count), so
  -- it cannot leak into a caller's argument list.
  return (query:gsub('%s', '%%20'))
end

-- Prepares a string for embedding into a single-quoted clickhouse literal:
-- prefixes each single quote and backslash with a backslash and lowercases
-- the result.
-- @param {string|nil} str input string
-- @return {string} escaped, lowercased string; '' when `str` is nil or false
local function clickhouse_quote(str)
  if str then
    return str:gsub('[\'\\]', '\\%1'):lower()
  end

  return ''
end

-- Converts an array to a string suitable for clickhouse.
-- String elements are quoted with clickhouse_quote and wrapped in single
-- quotes; every other element goes through tostring(). Elements are joined
-- with ','. The input array is left untouched.
-- @param {table} ar array of values (iterated with ipairs)
-- @return {string} comma separated list of elements
local function array_to_string(ar)
  local parts = {}

  for i, elt in ipairs(ar) do
    if type(elt) == 'string' then
      parts[i] = '\'' .. clickhouse_quote(elt) .. '\''
    else
      parts[i] = tostring(elt)
    end
  end

  return table.concat(parts, ',')
end

-- Converts a row into TSV, taking extra care about arrays.
-- Table elements are rendered as '[' .. array_to_string(elt) .. ']', all
-- other elements go through tostring(). The input row is left untouched.
-- Not referenced by any other function in this module yet.
-- @param {table} row array of column values (iterated with ipairs)
-- @return {string} tab separated line
local function row_to_tsv(row)
  local fields = {}

  for i, elt in ipairs(row) do
    if type(elt) == 'table' then
      fields[i] = '[' .. array_to_string(elt) .. ']'
    else
      fields[i] = tostring(elt) -- Assume there are no tabs there
    end
  end

  return table.concat(fields, '\t')
end

-- Parses a clickhouse reply body: splits it on newlines and feeds every line
-- longer than one character to the UCL parser.
-- Lines that fail to parse are logged and skipped.
-- @param {table} params request params; params.log_obj is used for logging
-- @param {string|nil} data reply body
-- @return {table} array of parsed rows in reply order (empty if data is nil)
local function parse_clickhouse_response(params, data)
  local lua_util = require "lua_util"
  local ucl = require "ucl"

  rspamd_logger.debugm(N, params.log_obj, "got clickhouse response: %s", data)
  if data == nil then
    -- clickhouse returned no data (i.e. empty result set): exiting
    return {}
  end

  -- Parses a single line; returns the parsed object or nil on parser error
  local function parse_string(s)
    local parser = ucl.parser()
    local res, err = parser:parse_string(s)
    if not res then
      rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
      return nil
    end
    return parser:get_object()
  end

  -- iterate over rows and parse
  local ch_rows = lua_util.str_split(data, "\n")
  local parsed_rows = {}
  -- ipairs (not pairs) so that rows are visited in a defined order
  -- NOTE(review): assumes str_split returns a plain sequence - confirm
  for _, plain_row in ipairs(ch_rows) do
    if plain_row and plain_row:len() > 1 then
      local parsed_row = parse_string(plain_row)
      if parsed_row then
        table.insert(parsed_rows, parsed_row)
      end
    end
  end

  return parsed_rows
end

-- Builds the HTTP callback used for select requests.
-- On failure (an error message is set or the code is not 200) the callback
-- logs the error, calls fail_cb(params, err_message, data) and then marks the
-- upstream as failed. On success it marks the upstream as ok, parses the
-- body and calls ok_cb(params, rows).
-- @param {upstream} upstream clickhouse server upstream
-- @param {table} params request params handed back to the callbacks
-- @param {function} ok_cb optional success callback
-- @param {function} fail_cb optional failure callback
-- @return {function} callback taking (err_message, code, data, extra)
local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
  local function http_cb(err_message, code, data, extra)
    if code ~= 200 or err_message then
      -- No transport error: report the reply body as the error text
      if not err_message then err_message = data end
      local ip_addr = upstream:get_addr():to_string(true)
      rspamd_logger.errx(params.log_obj,
        "request failed on clickhouse server %s: %s",
        ip_addr, err_message)

      if fail_cb then
        fail_cb(params, err_message, data)
      end
      upstream:fail()
    else
      upstream:ok()
      rspamd_logger.debugm(N, params.log_obj,
        "http_cb ok: %s, %s, %s, %s", err_message, code, data, extra)
      local rows = parse_clickhouse_response(params, data)

      if rows then
        if ok_cb then
          ok_cb(params, rows)
        end
      else
        -- Defensive only: parse_clickhouse_response always returns a table
        if fail_cb then
          fail_cb(params, 'failed to parse reply', data)
        end
      end
    end
  end

  return http_cb
end

--[[[
-- @function lua_clickhouse.select_request(upstream, settings, params, query,
      ok_cb, fail_cb)
-- Make select request to clickhouse
-- @param {upstream} upstream clickhouse server upstream
-- @param {table} settings global settings table:
--   * use_gzip: use gzip compression
--   * use_https: build the default url with https:// instead of http://
--   * timeout: request timeout (10.0 if not set)
--   * no_ssl_verify: skip SSL verification
--   * user: HTTP user
--   * password: HTTP password
-- @param {table} params HTTP request params; copied into a new table, so the
--   caller's table is not modified. `task` or `config` is used as the logging
--   object. If `url` is not set, it is built from the upstream address.
-- @param {string} query query text, sent as the request body
-- @param {function} ok_cb callback to be called in case of success, as
--   ok_cb(http_params, rows) where http_params is the copied params table
-- @param {function} fail_cb callback to be called in case of some error, as
--   fail_cb(http_params, err_message, data)
-- @return {boolean} whether a connection was successful (the result of
--   rspamd_http.request is returned as is)
-- @example
-- lua_clickhouse.select_request(upstream, settings, {task = task}, query,
--   function(_, rows) end,
--   function(_, err_message) end)
--]]
exports.select_request = function (upstream, settings, params, query, ok_cb, fail_cb)
  local http_params = {}

  for k,v in pairs(params) do http_params[k] = v end

  -- The callback keeps a reference to http_params, so the fields assigned
  -- below (log_obj in particular) are visible to it when the reply arrives.
  http_params.callback = mk_http_select_cb(upstream, http_params, ok_cb, fail_cb)
  http_params.gzip = settings.use_gzip
  http_params.mime_type = 'text/plain'
  http_params.timeout = settings.timeout or default_timeout
  http_params.no_ssl_verify = settings.no_ssl_verify
  http_params.user = settings.user
  http_params.password = settings.password
  http_params.body = query
  http_params.log_obj = params.task or params.config

  -- Log the body that is actually sent (http_params.body), not params.body
  rspamd_logger.debugm(N, http_params.log_obj, "clickhouse_request: %s",
    http_params.body)

  if not http_params.url then
    local connect_prefix = "http://"
    if settings.use_https then
      connect_prefix = 'https://'
    end
    local ip_addr = upstream:get_addr():to_string(true)
    http_params.url = connect_prefix .. ip_addr .. '/?default_format=JSONEachRow'
  end

  return rspamd_http.request(http_params)
end

return exports