123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506 |
- --[[
- Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>
- Copyright (c) 2018, Mikhail Galanin <mgalanin@mimecast.com>
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ]]--
-
- --[[[
- -- @module lua_clickhouse
- -- This module contains Clickhouse access functions
- --]]
-
- local rspamd_logger = require "rspamd_logger"
- local rspamd_http = require "rspamd_http"
- local lua_util = require "lua_util"
- local rspamd_text = require "rspamd_text"
-
-- Module state: the table of exported functions, the module name passed to
-- lua_util.debugm, and the HTTP timeout used when settings.timeout is unset.
local exports, N, default_timeout = {}, 'clickhouse', 10.0
-
-- Percent-encodes a string so it can be embedded as a URL query-string value
-- (the database name and the INSERT query are placed in the request URL).
-- Only whitespace used to be escaped, so characters such as '&', '#', '+',
-- '%' or '=' could corrupt the URL. Every byte outside the RFC 3986
-- "unreserved" set (A-Z a-z 0-9 - . _ ~) is now encoded as %XX; a space
-- still becomes %20. The historic name is kept for existing callers.
-- @param {string} query text to encode
-- @return {string} encoded text (a single value, unlike a bare gsub call)
local function escape_spaces(query)
  local encoded = query:gsub('[^A-Za-z0-9%-._~]', function(c)
    return string.format('%%%02X', string.byte(c))
  end)
  return encoded
end
-
-- Formats a Lua number as a ClickHouse numeric literal.
-- Integral values are written in plain decimal, with no exponent and no
-- fractional part. Every other value (fractions, NaN, infinities) goes
-- through tostring().
-- The previous integrality test, (a + 2^52) - 2^52 == a, also accepted
-- negative halves such as -0.5 or -2.5, which were then floored to -1 / -3.
-- Comparing against math.floor() avoids that.
-- @param {number} a value to format
-- @return {string} textual representation
local function ch_number(a)
  -- Lua 5.3+ integer subtype: tostring() is already exact
  if math.type and math.type(a) == 'integer' then
    return tostring(a)
  end

  -- Finite integral float: '%.0f' prints every digit, whereas tostring()
  -- switches to exponent notation for large magnitudes
  if a == math.floor(a) and a > -math.huge and a < math.huge then
    return string.format('%.0f', a)
  end

  return tostring(a)
end
-
-- Replacement table for characters that must be backslash-escaped inside
-- ClickHouse TabSeparated fields and single-quoted string literals.
local clickhouse_escapes = {
  ['\''] = [[\']],
  ['\\'] = [[\\]],
  ['\n'] = [[\n]],
  ['\r'] = [[\r]],
  ['\t'] = [[\t]],
}

-- Escapes a string for ClickHouse TSV data and quoted literals.
-- Backslash, single quote, tab, line feed and carriage return are replaced by
-- their backslash escape sequences. A raw '\r' was previously left as is.
-- @param {string|nil} str text to escape
-- @return {string} escaped text ('' when str is nil or false)
local function clickhouse_quote(str)
  if not str then
    return ''
  end

  -- Parenthesised so that only the string is returned, not gsub's count
  return (str:gsub('[\'\\\n\r\t]', clickhouse_escapes))
end
-
-- Converts a Lua array into the comma-separated body of a ClickHouse array
-- literal; the caller adds the surrounding brackets.
-- Strings are escaped with clickhouse_quote and wrapped in single quotes.
-- Numbers are formatted with ch_number. Any other value is passed to
-- table.concat unchanged.
-- The input table is not modified: the result is built in a fresh table.
-- Previously the caller's array was rewritten in place, so converting the
-- same data twice escaped its strings twice.
-- @param {table} ar array of strings and/or numbers
-- @return {string} comma-separated elements
local function array_to_string(ar)
  local out = {}

  for i, elt in ipairs(ar) do
    local elt_type = type(elt)
    if elt_type == 'string' then
      out[i] = '\'' .. clickhouse_quote(elt) .. '\''
    elseif elt_type == 'number' then
      out[i] = ch_number(elt)
    else
      out[i] = elt
    end
  end

  return table.concat(out, ',')
end
-
-- Converts a row into one TabSeparated line, without a trailing newline.
-- Nested tables become ClickHouse array literals ('[...]'). Numbers are
-- formatted with ch_number. Every other value is escaped with
-- clickhouse_quote.
-- The input row is left untouched; columns are collected in a new table.
-- Previously the row was rewritten in place, so converting the same row
-- twice escaped its values twice.
-- @param {table} row array of column values
-- @return result of rspamd_text.fromtable: the columns joined with '\t'
local function row_to_tsv(row)
  local columns = {}

  for i, elt in ipairs(row) do
    local elt_type = type(elt)
    if elt_type == 'table' then
      columns[i] = '[' .. array_to_string(elt) .. ']'
    elseif elt_type == 'number' then
      columns[i] = ch_number(elt)
    else
      columns[i] = clickhouse_quote(elt)
    end
  end

  return rspamd_text.fromtable(columns, '\t')
end

exports.row_to_tsv = row_to_tsv
-
-- Parses a JSONEachRow reply from ClickHouse, where each line is parsed as a
-- separate document. Lines shorter than two characters are ignored. Lines
-- that fail to parse are logged and skipped.
-- @param {table} params request params; params.log_obj is used for logging
-- @param {string|nil} data raw reply body
-- @return {table} array of parsed rows in reply order ({} when data is nil)
local function parse_clickhouse_response_json_eachrow(params, data)
  local ucl = require "ucl"

  if data == nil then
    -- clickhouse returned no data (i.e. empty result set): exiting
    return {}
  end

  local function parse_string(s)
    local parser = ucl.parser()
    local res, err = parser:parse_string(s)
    if not res then
      rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
      return nil
    end
    return parser:get_object()
  end

  local parsed_rows = {}
  -- ipairs rather than pairs: pairs does not guarantee the visiting order,
  -- and rows must keep the order ClickHouse returned them in.
  -- Assumes lua_util.str_split returns a sequence (array of parts).
  for _, plain_row in ipairs(lua_util.str_split(data, "\n")) do
    if plain_row and plain_row:len() > 1 then
      local parsed_row = parse_string(plain_row)
      if parsed_row then
        parsed_rows[#parsed_rows + 1] = parsed_row
      end
    end
  end

  return parsed_rows
end
-
-- Parses a reply consisting of a single JSON document (e.g. default_format=JSON).
-- @param {table} params request params; params.log_obj is used for logging
-- @param {string|nil} data raw reply body
-- @return error string ('bad json') or nil, then the parsed object
--   ({} when data is nil or when parsing fails)
local function parse_clickhouse_response_json(params, data)
  local ucl = require "ucl"

  -- A missing body (empty result set) is a valid, empty reply
  if data == nil then
    return nil, {}
  end

  local parser = ucl.parser()
  local parsed_ok, parse_err = parser:parse_string(data)
  if not parsed_ok then
    rspamd_logger.errx(params.log_obj, 'Parser error: %s', parse_err)
    return 'bad json', {}
  end

  local obj = parser:get_object()
  if not obj then
    return 'bad json', {}
  end

  return nil, obj
end
-
-- Builds the rspamd_http callback used for asynchronous select requests.
-- On a transport error or a status other than 200:
--   * the upstream is marked failed;
--   * fail_cb(params, err, data) is called, or the error is logged when
--     fail_cb is nil.
-- On success:
--   * the upstream is marked ok;
--   * the JSONEachRow body is parsed and passed to ok_cb(params, rows), or
--     logged at debug level when ok_cb is nil.
-- @param {upstream} upstream server the request is sent to
-- @param {table} params request params handed back to the callbacks
-- @param {function} ok_cb optional success callback
-- @param {function} fail_cb optional error callback
-- @return {function} callback suitable for rspamd_http.request
local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
  local function http_cb(err_message, code, data, _)
    if code ~= 200 or err_message then
      if not err_message then err_message = data end
      -- Mark the upstream before running user code, mirroring the ok path
      upstream:fail()

      if fail_cb then
        fail_cb(params, err_message, data)
      else
        rspamd_logger.errx(params.log_obj,
            "request failed on clickhouse server %s: %s",
            upstream:get_addr():to_string(true), err_message)
      end
    else
      upstream:ok()
      local rows = parse_clickhouse_response_json_eachrow(params, data)

      if rows then
        if ok_cb then
          ok_cb(params, rows)
        else
          -- data may be nil (empty result set), so it must not be indexed
          lua_util.debugm(N, params.log_obj,
              "http_select_cb ok: %s, %s, %s, %s", err_message, code,
              data and data:gsub('[\n%s]+', ' ') or '', _)
        end
      else
        if fail_cb then
          fail_cb(params, 'failed to parse reply', data)
        else
          rspamd_logger.errx(params.log_obj,
              "request failed on clickhouse server %s: %s",
              upstream:get_addr():to_string(true), 'failed to parse reply')
        end
      end
    end
  end

  return http_cb
end
-
-- Builds the rspamd_http callback used for asynchronous insert and generic
-- requests.
-- On a transport error or a status other than 200:
--   * the upstream is marked failed;
--   * fail_cb(params, err, data) is called, or the error is logged when
--     fail_cb is nil.
-- On success:
--   * the upstream is marked ok;
--   * when ok_cb is set, the body is parsed as a single JSON document and
--     passed to ok_cb(params, parsed); a parse error is reported like an
--     HTTP failure.
-- Previously the parser was called without `params`, so the reply was never
-- parsed and ok_cb always received {}. fail_cb was also called without a
-- nil check.
-- @param {upstream} upstream server the request is sent to
-- @param {table} params request params handed back to the callbacks
-- @param {function} ok_cb optional success callback
-- @param {function} fail_cb optional error callback
-- @return {function} callback suitable for rspamd_http.request
local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
  -- Hands an error to fail_cb, or logs it when no fail_cb was supplied
  local function report_failure(err_message, data)
    if fail_cb then
      fail_cb(params, err_message, data)
    else
      rspamd_logger.errx(params.log_obj,
          "request failed on clickhouse server %s: %s",
          upstream:get_addr():to_string(true), err_message)
    end
  end

  local function http_cb(err_message, code, data, _)
    if code ~= 200 or err_message then
      if not err_message then err_message = data end
      upstream:fail()
      report_failure(err_message, data)
    else
      upstream:ok()

      if ok_cb then
        local err, parsed
        if data == nil or not tostring(data):find('%S') then
          -- An empty (or whitespace-only) body is an empty successful
          -- result, not a JSON parse error
          err, parsed = nil, {}
        else
          err, parsed = parse_clickhouse_response_json(params, data)
        end

        if err then
          report_failure(err, data)
        else
          ok_cb(params, parsed)
        end
      else
        -- data may be nil, so it must not be indexed unconditionally
        lua_util.debugm(N, params.log_obj,
            "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
            data and data:gsub('[\n%s]+', ' ') or '', _)
      end
    end
  end

  return http_cb
end
-
--[[[
-- @function lua_clickhouse.select(upstream, settings, params, query,
--   ok_cb, fail_cb)
-- Make an asynchronous select request to clickhouse; the reply is requested
-- in JSONEachRow format and parsed into an array of rows
-- @param {upstream} upstream clickhouse server upstream
-- @param {table} settings global settings table:
-- * use_gzip: use gzip compression
-- * timeout: request timeout (defaults to 10.0)
-- * no_ssl_verify: skip SSL verification
-- * user: HTTP user
-- * password: HTTP password
-- * use_https: connect with https:// instead of http://
-- * database: database name ('default' when unset)
-- @param {table} params HTTP request params, shallow-copied; a `url` field
--   replaces the generated URL, and `task` or `config` is used for logging
-- @param {string} query select query (passed in HTTP body)
-- @param {function} ok_cb callback(params, rows) called on success
-- @param {function} fail_cb callback(params, err, data) called on error
-- @return result of rspamd_http.request (presumably whether the request was started)
-- @example
--
--]]
exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
  -- Work on a copy so the caller's table is left untouched
  local http_params = {}
  for key, value in pairs(params) do
    http_params[key] = value
  end

  http_params.log_obj = params.task or params.config
  http_params.body = query
  http_params.mime_type = 'text/plain'
  http_params.gzip = settings.use_gzip
  http_params.timeout = settings.timeout or default_timeout
  http_params.no_ssl_verify = settings.no_ssl_verify
  http_params.user = settings.user
  http_params.password = settings.password
  http_params.callback = mk_http_select_cb(upstream, http_params, ok_cb, fail_cb)

  lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)

  if not http_params.url then
    local scheme = settings.use_https and 'https://' or 'http://'
    local database = settings.database or 'default'
    http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
        scheme, upstream:get_addr():to_string(true), escape_spaces(database))
  end

  return rspamd_http.request(http_params)
end
-
--[[[
-- @function lua_clickhouse.select_sync(upstream, settings, params, query,
--   ok_cb, fail_cb)
-- Make a synchronous select request to clickhouse; the reply is requested
-- in JSONEachRow format and parsed into an array of rows
-- @param {upstream} upstream clickhouse server upstream
-- @param {table} settings global settings table:
-- * use_gzip: use gzip compression
-- * timeout: request timeout (defaults to 10.0)
-- * no_ssl_verify: skip SSL verification
-- * user: HTTP user
-- * password: HTTP password
-- * use_https: connect with https:// instead of http://
-- * database: database name ('default' when unset)
-- @param {table} params HTTP request params, shallow-copied; a `url` field
--   replaces the generated URL, and `task` or `config` is used for logging
-- @param {string} query select query (passed in HTTP body)
-- @param {function} ok_cb unused; accepted for signature compatibility
-- @param {function} fail_cb unused; accepted for signature compatibility
-- @return
--   {string} error message, or nil on success
--   {rows} parsed rows on success, the HTTP response on a non-200 status,
--   nil on a transport error
-- @example
--
--]]
exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb)
  local http_params = {}

  for k,v in pairs(params) do http_params[k] = v end

  http_params.gzip = settings.use_gzip
  http_params.mime_type = 'text/plain'
  http_params.timeout = settings.timeout or default_timeout
  http_params.no_ssl_verify = settings.no_ssl_verify
  http_params.user = settings.user
  http_params.password = settings.password
  http_params.body = query
  http_params.log_obj = params.task or params.config

  lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)

  if not http_params.url then
    local connect_prefix = "http://"
    if settings.use_https then
      connect_prefix = 'https://'
    end
    local ip_addr = upstream:get_addr():to_string(true)
    local database = settings.database or 'default'
    http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
        connect_prefix, ip_addr, escape_spaces(database))
  end

  local err, response = rspamd_http.request(http_params)

  if err then
    return err, nil
  elseif response.code ~= 200 then
    return response.content, response
  end

  lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response)
  -- Parse with http_params, not params: only the copy carries log_obj, so
  -- parse errors are logged against the task/config like in async select
  local rows = parse_clickhouse_response_json_eachrow(http_params, response.content)
  return nil, rows
end
-
--[[[
-- @function lua_clickhouse.insert(upstream, settings, params, query, rows,
--   ok_cb, fail_cb)
-- Insert data rows to clickhouse using the TabSeparated format
-- @param {upstream} upstream clickhouse server upstream
-- @param {table} settings global settings table:
-- * use_gzip: use gzip compression
-- * timeout: request timeout (defaults to 10.0)
-- * no_ssl_verify: skip SSL verification
-- * user: HTTP user
-- * password: HTTP password
-- * use_https: connect with https:// instead of http://
-- * database: database name ('default' when unset)
-- @param {table} params HTTP request params, shallow-copied; a `url` field
--   replaces the generated URL, and `task` or `config` is used for logging
-- @param {string} query insert query (URL-escaped into the `query` request
--   element, followed by ` FORMAT TabSeparated`)
-- @param {table} rows rows joined with '\n' via rspamd_text.fromtable and sent
--   as the POST body (presumably already TSV-encoded, e.g. by row_to_tsv)
-- @param {function} ok_cb callback(params, parsed_reply) called on success
-- @param {function} fail_cb callback(params, err, data) called on error
-- @return result of rspamd_http.request (presumably whether the request was started)
-- @example
--
--]]
exports.insert = function (upstream, settings, params, query, rows,
                           ok_cb, fail_cb)
  -- Work on a copy so the caller's table is left untouched
  local http_params = {}
  for key, value in pairs(params) do
    http_params[key] = value
  end

  http_params.log_obj = params.task or params.config
  http_params.method = 'POST'
  http_params.mime_type = 'text/plain'
  -- One row per line, with a final newline after the last row
  http_params.body = {rspamd_text.fromtable(rows, '\n'), '\n'}
  http_params.gzip = settings.use_gzip
  http_params.timeout = settings.timeout or default_timeout
  http_params.no_ssl_verify = settings.no_ssl_verify
  http_params.user = settings.user
  http_params.password = settings.password
  http_params.callback = mk_http_insert_cb(upstream, http_params, ok_cb, fail_cb)

  if not http_params.url then
    local scheme = settings.use_https and 'https://' or 'http://'
    http_params.url = string.format(
        '%s%s/?database=%s&query=%s%%20FORMAT%%20TabSeparated',
        scheme,
        upstream:get_addr():to_string(true),
        escape_spaces(settings.database or 'default'),
        escape_spaces(query))
  end

  return rspamd_http.request(http_params)
end
-
--[[[
-- @function lua_clickhouse.generic(upstream, settings, params, query,
--   ok_cb, fail_cb)
-- Make a generic asynchronous request to clickhouse (e.g. alter); any reply
-- is requested in JSON format
-- @param {upstream} upstream clickhouse server upstream
-- @param {table} settings global settings table:
-- * use_gzip: use gzip compression
-- * timeout: request timeout (defaults to 10.0)
-- * no_ssl_verify: skip SSL verification
-- * user: HTTP user
-- * password: HTTP password
-- * use_https: connect with https:// instead of http://
-- * database: database name ('default' when unset)
-- @param {table} params HTTP request params, shallow-copied; a `url` field
--   replaces the generated URL, and `task` or `config` is used for logging
-- @param {string} query clickhouse query (passed in HTTP body)
-- @param {function} ok_cb callback(params, parsed_reply) called on success
-- @param {function} fail_cb callback(params, err, data) called on error
-- @return result of rspamd_http.request (presumably whether the request was started)
-- @example
--
--]]
exports.generic = function (upstream, settings, params, query,
                            ok_cb, fail_cb)
  local http_params = {}

  for k,v in pairs(params) do http_params[k] = v end

  http_params.callback = mk_http_insert_cb(upstream, http_params, ok_cb, fail_cb)
  http_params.gzip = settings.use_gzip
  http_params.mime_type = 'text/plain'
  http_params.timeout = settings.timeout or default_timeout
  http_params.no_ssl_verify = settings.no_ssl_verify
  http_params.user = settings.user
  http_params.password = settings.password
  http_params.log_obj = params.task or params.config
  http_params.body = query

  if not http_params.url then
    local connect_prefix = "http://"
    if settings.use_https then
      connect_prefix = 'https://'
    end
    local ip_addr = upstream:get_addr():to_string(true)
    local database = settings.database or 'default'
    -- JSON (a single document) rather than JSONEachRow: the insert callback
    -- parses the reply as one JSON document, as generic_sync does
    http_params.url = string.format('%s%s/?database=%s&default_format=JSON',
        connect_prefix, ip_addr, escape_spaces(database))
  end

  return rspamd_http.request(http_params)
end
-
--[[[
-- @function lua_clickhouse.generic_sync(upstream, settings, params, query)
-- Make a generic synchronous request to clickhouse (e.g. alter); any reply
-- is requested in JSON format and parsed as a single document
-- @param {upstream} upstream clickhouse server upstream
-- @param {table} settings global settings table:
-- * use_gzip: use gzip compression
-- * timeout: request timeout (defaults to 10.0)
-- * no_ssl_verify: skip SSL verification
-- * user: HTTP user
-- * password: HTTP password
-- * use_https: connect with https:// instead of http://
-- * database: database name ('default' when unset)
-- @param {table} params HTTP request params, shallow-copied; a `url` field
--   replaces the generated URL, and `task` or `config` is used for logging
-- @param {string} query clickhouse query (passed in HTTP body)
-- @return
--   {string} error message, or nil on success
--   the parsed JSON reply on success ({} for an empty body), the HTTP
--   response on a non-200 status, nil otherwise
-- @example
--
--]]
exports.generic_sync = function (upstream, settings, params, query)
  local http_params = {}

  for k,v in pairs(params) do http_params[k] = v end

  http_params.gzip = settings.use_gzip
  http_params.mime_type = 'text/plain'
  http_params.timeout = settings.timeout or default_timeout
  http_params.no_ssl_verify = settings.no_ssl_verify
  http_params.user = settings.user
  http_params.password = settings.password
  http_params.log_obj = params.task or params.config
  http_params.body = query

  if not http_params.url then
    local connect_prefix = "http://"
    if settings.use_https then
      connect_prefix = 'https://'
    end
    local ip_addr = upstream:get_addr():to_string(true)
    local database = settings.database or 'default'
    http_params.url = string.format('%s%s/?database=%s&default_format=JSON',
        connect_prefix, ip_addr, escape_spaces(database))
  end

  local err, response = rspamd_http.request(http_params)

  if err then
    return err, nil
  elseif response.code ~= 200 then
    return response.content, response
  end

  lua_util.debugm(N, http_params.log_obj, "clickhouse generic response: %1", response)

  local content = response.content
  if content == nil or not tostring(content):find('%S') then
    -- An empty (or whitespace-only) body is returned as an empty result
    -- instead of being handed to the JSON parser
    return nil, {}
  end

  -- http_params (unlike params) carries log_obj for parser error logging
  local parse_err, obj = parse_clickhouse_response_json(http_params, content)
  if parse_err then
    return parse_err, nil
  end

  return nil, obj
end

return exports
|