aboutsummaryrefslogtreecommitdiffstats
path: root/lualib/lua_clickhouse.lua
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-03 19:28:10 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-03 19:28:33 +0100
commite0a0714d1d71443e9e61b27fc638186c6434bac9 (patch)
tree07036ff1832c17f1842a3a795143c59738dfa6a6 /lualib/lua_clickhouse.lua
parentea7bb4922e4f2fa5ae64d0c5d740fbd3e8f55a3a (diff)
downloadrspamd-e0a0714d1d71443e9e61b27fc638186c6434bac9.tar.gz
rspamd-e0a0714d1d71443e9e61b27fc638186c6434bac9.zip
[Project] Implement insert method
Diffstat (limited to 'lualib/lua_clickhouse.lua')
-rw-r--r--lualib/lua_clickhouse.lua90
1 files changed, 86 insertions, 4 deletions
diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua
index 94cf1c5fb..fa94e4a43 100644
--- a/lualib/lua_clickhouse.lua
+++ b/lualib/lua_clickhouse.lua
@@ -62,6 +62,7 @@ local function row_to_tsv(row)
return table.concat(row, '\t')
end
+-- Parses JSONEachRow reply from CH
local function parse_clickhouse_response(params, data)
local lua_util = require "lua_util"
local ucl = require "ucl"
@@ -97,6 +98,7 @@ local function parse_clickhouse_response(params, data)
return parsed_rows
end
+-- Helper to generate HTTP closure
local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
local function http_cb(err_message, code, data, _)
if code ~= 200 or err_message then
@@ -131,8 +133,36 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
return http_cb
end
+-- Helper to generate HTTP closure
+local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
+ local function http_cb(err_message, code, data, _)
+ if code ~= 200 or err_message then
+ if not err_message then err_message = data end
+ local ip_addr = upstream:get_addr():to_string(true)
+ rspamd_logger.errx(params.log_obj,
+ "request failed on clickhouse server %s: %s",
+ ip_addr, err_message)
+
+ if fail_cb then
+ fail_cb(params, err_message, data)
+ end
+ upstream:fail()
+ else
+ upstream:ok()
+ rspamd_logger.debugm(N, params.log_obj,
+ "http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
+
+ if ok_cb then
+ ok_cb(params, data)
+ end
+ end
+ end
+
+ return http_cb
+end
+
--[[[
--- @function lua_clickhouse.select_request(upstream, settings, params, query,
+-- @function lua_clickhouse.select(upstream, settings, params, query,
ok_cb, fail_cb)
-- Make select request to clickhouse
-- @param {upstream} upstream clickhouse server upstream
@@ -143,14 +173,14 @@ end
-- * user: HTTP user
-- * password: HTTP password
-- @param {params} HTTP request params
+-- @param {string} query select query (passed in HTTP body)
-- @param {function} ok_cb callback to be called in case of success
-- @param {function} fail_cb callback to be called in case of some error
-- @return {boolean} whether a connection was successful
-- @example
--
--]]
-
-exports.select_request = function (upstream, settings, params, query, ok_cb, fail_cb)
+exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
local http_params = {}
for k,v in pairs(params) do http_params[k] = v end
@@ -165,7 +195,7 @@ exports.select_request = function (upstream, settings, params, query, ok_cb, fai
http_params.body = query
http_params.log_obj = params.task or params.config
- rspamd_logger.debugm(N, http_params.log_obj, "clickhouse_request: %s", params.body)
+ rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body)
if not http_params.url then
local connect_prefix = "http://"
@@ -179,4 +209,56 @@ exports.select_request = function (upstream, settings, params, query, ok_cb, fai
return rspamd_http.request(http_params)
end
+--[[[
+-- @function lua_clickhouse.insert(upstream, settings, params, query, rows,
+ ok_cb, fail_cb)
+-- Insert data rows to clickhouse
+-- @param {upstream} upstream clickhouse server upstream
+-- @param {table} settings global settings table:
+-- * use_gsip: use gzip compression
+-- * timeout: request timeout
+-- * no_ssl_verify: skip SSL verification
+-- * user: HTTP user
+-- * password: HTTP password
+-- @param {params} HTTP request params
+-- @param {string} query select query (passed in `query` request element with spaces escaped)
+-- @param {table|mixed} rows mix of strings, numbers or tables (for arrays)
+-- @param {function} ok_cb callback to be called in case of success
+-- @param {function} fail_cb callback to be called in case of some error
+-- @return {boolean} whether a connection was successful
+-- @example
+--
+--]]
+exports.insert = function (upstream, settings, params, query, rows,
+ ok_cb, fail_cb)
+ local fun = require "fun"
+ local http_params = {}
+
+ for k,v in pairs(params) do http_params[k] = v end
+
+ http_params.callback = mk_http_insert_cb(upstream, http_params, ok_cb, fail_cb)
+ http_params.gzip = settings.use_gzip
+ http_params.mime_type = 'text/plain'
+ http_params.timeout = settings.timeout or default_timeout
+ http_params.no_ssl_verify = settings.no_ssl_verify
+ http_params.user = settings.user
+ http_params.password = settings.password
+ http_params.body = table.concat(fun.totable(fun.map(function(row)
+ return row_to_tsv(row)
+ end), rows), '\n')
+ http_params.log_obj = params.task or params.config
+
+ if not http_params.url then
+ local connect_prefix = "http://"
+ if settings.use_https then
+ connect_prefix = 'https://'
+ end
+ local ip_addr = upstream:get_addr():to_string(true)
+ http_params.url = string.format('%s%s/?query=%s', connect_prefix,
+ ip_addr, escape_spaces(query))
+ end
+
+ rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body)
+end
+
return exports \ No newline at end of file