aboutsummaryrefslogtreecommitdiffstats
path: root/lualib/lua_clickhouse.lua
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2019-04-15 17:32:44 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2019-04-15 17:32:44 +0100
commit6c18baf8309664f5cafb3d61fa2f2376f4347cd6 (patch)
tree10e78a50f97fbd11de8941b8552b0aa4d8ed2fcd /lualib/lua_clickhouse.lua
parentf550bb866341e4b48037d6bf946433956e060034 (diff)
downloadrspamd-6c18baf8309664f5cafb3d61fa2f2376f4347cd6.tar.gz
rspamd-6c18baf8309664f5cafb3d61fa2f2376f4347cd6.zip
[Fix] Lua_clickhouse: Fix CH errors processing
Diffstat (limited to 'lualib/lua_clickhouse.lua')
-rw-r--r--lualib/lua_clickhouse.lua65
1 files changed, 59 insertions, 6 deletions
diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua
index 96ea59f02..7064ac925 100644
--- a/lualib/lua_clickhouse.lua
+++ b/lualib/lua_clickhouse.lua
@@ -78,7 +78,7 @@ local function row_to_tsv(row)
end
-- Parses JSONEachRow reply from CH
-local function parse_clickhouse_response(params, data)
+local function parse_clickhouse_response_json_eachrow(params, data)
local ucl = require "ucl"
if data == nil then
@@ -111,6 +111,38 @@ local function parse_clickhouse_response(params, data)
return parsed_rows
end
+-- Parses JSON reply from CH
+local function parse_clickhouse_response_json(params, data)
+ local ucl = require "ucl"
+
+ if data == nil then
+ -- clickhouse returned no data (i.e. empty result set): exiting
+ return 'no data', {}
+ end
+
+ if data:match('DB::Exception') then
+ return data, {}
+ end
+
+ local function parse_string(s)
+ local parser = ucl.parser()
+ local res, err = parser:parse_string(s)
+ if not res then
+ rspamd_logger.errx(params.log_obj, 'Parser error: %s', err)
+ return nil
+ end
+ return parser:get_object()
+ end
+
+ local json = parse_string(data)
+
+ if not json then
+ return 'bad json', {}
+ end
+
+ return parsed_rows
+end
+
-- Helper to generate HTTP closure
local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
local function http_cb(err_message, code, data, _)
@@ -128,7 +160,7 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
upstream:fail()
else
upstream:ok()
- local rows = parse_clickhouse_response(params, data)
+ local rows = parse_clickhouse_response_json_eachrow(params, data)
if rows then
if ok_cb then
@@ -173,7 +205,14 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
upstream:ok()
if ok_cb then
- ok_cb(params, data)
+ local err,parsed = parse_clickhouse_response_json(data)
+
+ if err then
+ fail_cb(params, err, data)
+ else
+ ok_cb(params, parsed)
+ end
+
else
lua_util.debugm(N, params.log_obj,
"http_insert_cb ok: %s, %s, %s, %s", err_message, code,
@@ -291,7 +330,7 @@ exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_c
return response.content, response
else
lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response)
- local rows = parse_clickhouse_response(params, response.content)
+ local rows = parse_clickhouse_response_json_eachrow(params, response.content)
return nil, rows
end
end
@@ -440,11 +479,25 @@ exports.generic_sync = function (upstream, settings, params, query)
end
local ip_addr = upstream:get_addr():to_string(true)
local database = settings.database or 'default'
- http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
+ http_params.url = string.format('%s%s/?database=%s&default_format=JSON',
connect_prefix, ip_addr, escape_spaces(database))
end
- return rspamd_http.request(http_params)
+ local err, response = rspamd_http.request(http_params)
+
+ if err then
+ return err, nil
+ elseif response.code ~= 200 then
+ return response.content, response
+ else
+ lua_util.debugm(N, http_params.log_obj, "clickhouse generic response: %1", response)
+ local e,obj = parse_clickhouse_response_json(params, response.content)
+
+ if e then
+ return e,nil
+ end
+ return nil, obj
+ end
end
return exports \ No newline at end of file