From 6c18baf8309664f5cafb3d61fa2f2376f4347cd6 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 15 Apr 2019 17:32:44 +0100 Subject: [PATCH] [Fix] Lua_clickhouse: Fix CH errors processing --- lualib/lua_clickhouse.lua | 65 +++++++++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua index 96ea59f02..7064ac925 100644 --- a/lualib/lua_clickhouse.lua +++ b/lualib/lua_clickhouse.lua @@ -78,7 +78,7 @@ local function row_to_tsv(row) end -- Parses JSONEachRow reply from CH -local function parse_clickhouse_response(params, data) +local function parse_clickhouse_response_json_eachrow(params, data) local ucl = require "ucl" if data == nil then @@ -111,6 +111,38 @@ local function parse_clickhouse_response(params, data) return parsed_rows end +-- Parses JSON reply from CH +local function parse_clickhouse_response_json(params, data) + local ucl = require "ucl" + + if data == nil then + -- clickhouse returned no data (i.e. empty result set): exiting + return 'no data', {} + end + + if data:match('DB::Exception') then + return data, {} + end + + local function parse_string(s) + local parser = ucl.parser() + local res, err = parser:parse_string(s) + if not res then + rspamd_logger.errx(params.log_obj, 'Parser error: %s', err) + return nil + end + return parser:get_object() + end + + local json = parse_string(data) + + if not json then + return 'bad json', {} + end + + return parsed_rows +end + -- Helper to generate HTTP closure local function mk_http_select_cb(upstream, params, ok_cb, fail_cb) local function http_cb(err_message, code, data, _) @@ -128,7 +160,7 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb) upstream:fail() else upstream:ok() - local rows = parse_clickhouse_response(params, data) + local rows = parse_clickhouse_response_json_eachrow(params, data) if rows then if ok_cb then @@ -173,7 +205,14 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb) upstream:ok() if ok_cb then - ok_cb(params, data) + local err,parsed = parse_clickhouse_response_json(data) + + if err then + fail_cb(params, err, data) + else + ok_cb(params, parsed) + end + else lua_util.debugm(N, params.log_obj, "http_insert_cb ok: %s, %s, %s, %s", err_message, code, @@ -291,7 +330,7 @@ exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_c return response.content, response else lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response) - local rows = parse_clickhouse_response(params, response.content) + local rows = parse_clickhouse_response_json_eachrow(params, response.content) return nil, rows end end @@ -440,11 +479,25 @@ exports.generic_sync = function (upstream, settings, params, query) end local ip_addr = upstream:get_addr():to_string(true) local database = settings.database or 'default' - http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow', + http_params.url = string.format('%s%s/?database=%s&default_format=JSON', connect_prefix, ip_addr, escape_spaces(database)) end - return rspamd_http.request(http_params) + local err, response = rspamd_http.request(http_params) + + if err then + return err, nil + elseif response.code ~= 200 then + return response.content, response + else + lua_util.debugm(N, http_params.log_obj, "clickhouse generic response: %1", response) + local e,obj = parse_clickhouse_response_json(params, response.content) + + if e then + return e,nil + end + return nil, obj + end end return exports \ No newline at end of file -- 2.39.5