summaryrefslogtreecommitdiffstats
path: root/lualib/lua_clickhouse.lua
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-03 19:14:33 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-03 19:28:33 +0100
commitea7bb4922e4f2fa5ae64d0c5d740fbd3e8f55a3a (patch)
treed8214b4e8ff35d3e61d61f4c2fb97ffff65a13ef /lualib/lua_clickhouse.lua
parentfd524ca77cb8e30b6d2ec8783fa834055e5f65ac (diff)
downloadrspamd-ea7bb4922e4f2fa5ae64d0c5d740fbd3e8f55a3a.tar.gz
rspamd-ea7bb4922e4f2fa5ae64d0c5d740fbd3e8f55a3a.zip
[Project] Start Clickhouse utilities library
Diffstat (limited to 'lualib/lua_clickhouse.lua')
-rw-r--r--lualib/lua_clickhouse.lua182
1 files changed, 182 insertions, 0 deletions
diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua
new file mode 100644
index 000000000..94cf1c5fb
--- /dev/null
+++ b/lualib/lua_clickhouse.lua
@@ -0,0 +1,182 @@
+--[[
+Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>
+Copyright (c) 2018, Mikhail Galanin <mgalanin@mimecast.com>
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+]]--
+
+local rspamd_logger = require "rspamd_logger"
+local rspamd_http = require "rspamd_http"
+
+local exports = {}
+local N = 'clickhouse'
+
+local default_timeout = 10.0
+
+local function escape_spaces(query)
+ return query:gsub('%s', '%%20')
+end
+
+local function clickhouse_quote(str)
+ if str then
+ return str:gsub('[\'\\]', '\\%1'):lower()
+ end
+
+ return ''
+end
+
+-- Converts an array to a string suitable for clickhouse
+local function array_to_string(ar)
+ for i,elt in ipairs(ar) do
+ if type(elt) == 'string' then
+ ar[i] = '\'' .. clickhouse_quote(elt) .. '\''
+ else
+ ar[i] = tostring(elt)
+ end
+ end
+
+ return table.concat(ar, ',')
+end
+
+-- Converts a row into TSV, taking extra care about arrays
+local function row_to_tsv(row)
+
+ for i,elt in ipairs(row) do
+ if type(elt) == 'table' then
+ row[i] = '[' .. array_to_string(elt) .. ']'
+ else
+ row[i] = tostring(elt) -- Assume there are no tabs there
+ end
+ end
+
+ return table.concat(row, '\t')
+end
+
+local function parse_clickhouse_response(params, data)
+ local lua_util = require "lua_util"
+ local ucl = require "ucl"
+
+ rspamd_logger.debugm(N, params.log_obj, "got clickhouse response: %s", data)
+ if data == nil then
+ -- clickhouse returned no data (i.e. empty result set): exiting
+ return {}
+ end
+
+ local function parse_string(s)
+ local parser = ucl.parser()
+ local res, err = parser:parse_string(s)
+ if not res then
+ rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
+ return nil
+ end
+ return parser:get_object()
+ end
+
+ -- iterate over rows and parse
+ local ch_rows = lua_util.str_split(data, "\n")
+ local parsed_rows = {}
+ for _, plain_row in pairs(ch_rows) do
+ if plain_row and plain_row:len() > 1 then
+ local parsed_row = parse_string(plain_row)
+ if parsed_row then
+ table.insert(parsed_rows, parsed_row)
+ end
+ end
+ end
+
+ return parsed_rows
+end
+
+local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
+ local function http_cb(err_message, code, data, _)
+ if code ~= 200 or err_message then
+ if not err_message then err_message = data end
+ local ip_addr = upstream:get_addr():to_string(true)
+ rspamd_logger.errx(params.log_obj,
+ "request failed on clickhouse server %s: %s",
+ ip_addr, err_message)
+
+ if fail_cb then
+ fail_cb(params, err_message, data)
+ end
+ upstream:fail()
+ else
+ upstream:ok()
+ rspamd_logger.debugm(N, params.log_obj,
+ "http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
+ local rows = parse_clickhouse_response(params, data)
+
+ if rows then
+ if ok_cb then
+ ok_cb(params, rows)
+ end
+ else
+ if fail_cb then
+ fail_cb(params, 'failed to parse reply', data)
+ end
+ end
+ end
+ end
+
+ return http_cb
+end
+
+--[[[
+-- @function lua_clickhouse.select_request(upstream, settings, params, query,
+ ok_cb, fail_cb)
+-- Make select request to clickhouse
+-- @param {upstream} upstream clickhouse server upstream
+-- @param {table} settings global settings table:
+-- * use_gsip: use gzip compression
+-- * timeout: request timeout
+-- * no_ssl_verify: skip SSL verification
+-- * user: HTTP user
+-- * password: HTTP password
+-- @param {params} HTTP request params
+-- @param {function} ok_cb callback to be called in case of success
+-- @param {function} fail_cb callback to be called in case of some error
+-- @return {boolean} whether a connection was successful
+-- @example
+--
+--]]
+
+exports.select_request = function (upstream, settings, params, query, ok_cb, fail_cb)
+ local http_params = {}
+
+ for k,v in pairs(params) do http_params[k] = v end
+
+ http_params.callback = mk_http_select_cb(upstream, http_params, ok_cb, fail_cb)
+ http_params.gzip = settings.use_gzip
+ http_params.mime_type = 'text/plain'
+ http_params.timeout = settings.timeout or default_timeout
+ http_params.no_ssl_verify = settings.no_ssl_verify
+ http_params.user = settings.user
+ http_params.password = settings.password
+ http_params.body = query
+ http_params.log_obj = params.task or params.config
+
+ rspamd_logger.debugm(N, http_params.log_obj, "clickhouse_request: %s", params.body)
+
+ if not http_params.url then
+ local connect_prefix = "http://"
+ if settings.use_https then
+ connect_prefix = 'https://'
+ end
+ local ip_addr = upstream:get_addr():to_string(true)
+ http_params.url = connect_prefix .. ip_addr .. '/?default_format=JSONEachRow'
+ end
+
+ return rspamd_http.request(http_params)
+end
+
+return exports \ No newline at end of file