diff options
Diffstat (limited to 'lualib')
-rw-r--r-- | lualib/lua_clickhouse.lua | 24 |
1 files changed, 15 insertions, 9 deletions
diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua index 09989131a..e0d4c5618 100644 --- a/lualib/lua_clickhouse.lua +++ b/lualib/lua_clickhouse.lua @@ -95,7 +95,7 @@ end exports.row_to_tsv = row_to_tsv -- Parses JSONEachRow reply from CH -local function parse_clickhouse_response_json_eachrow(params, data) +local function parse_clickhouse_response_json_eachrow(params, data, row_cb) local ucl = require "ucl" if data == nil then @@ -125,7 +125,11 @@ local function parse_clickhouse_response_json_eachrow(params, data) if plain_row and #plain_row > 1 then local parsed_row = parse_string(plain_row) if parsed_row then - table.insert(parsed_rows, parsed_row) + if row_cb then + row_cb(parsed_row) + else + table.insert(parsed_rows, parsed_row) + end end end end @@ -169,7 +173,7 @@ local function parse_clickhouse_response_json(params, data) end -- Helper to generate HTTP closure -local function mk_http_select_cb(upstream, params, ok_cb, fail_cb) +local function mk_http_select_cb(upstream, params, ok_cb, fail_cb, row_cb) local function http_cb(err_message, code, data, _) if code ~= 200 or err_message then if not err_message then err_message = data end @@ -185,7 +189,7 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb) upstream:fail() else upstream:ok() - local rows = parse_clickhouse_response_json_eachrow(params, data) + local rows = parse_clickhouse_response_json_eachrow(params, data, row_cb) if rows then if ok_cb then @@ -264,16 +268,17 @@ end -- @param {string} query select query (passed in HTTP body) -- @param {function} ok_cb callback to be called in case of success -- @param {function} fail_cb callback to be called in case of some error +-- @param {function} row_cb optional callback to be called on each parsed data row (instead of table insertion) -- @return {boolean} whether a connection was successful -- @example -- --]] -exports.select = function (upstream, settings, params, query, ok_cb, fail_cb) +exports.select = function (upstream, settings, params, query, ok_cb, fail_cb, row_cb) local http_params = {} for k,v in pairs(params) do http_params[k] = v end - http_params.callback = mk_http_select_cb(upstream, http_params, ok_cb, fail_cb) + http_params.callback = mk_http_select_cb(upstream, http_params, ok_cb, fail_cb, row_cb) http_params.gzip = settings.use_gzip http_params.mime_type = 'text/plain' http_params.timeout = settings.timeout or default_timeout @@ -302,7 +307,7 @@ end --[[[ -- @function lua_clickhouse.select_sync(upstream, settings, params, query, - ok_cb, fail_cb) + ok_cb, fail_cb, row_cb) -- Make select request to clickhouse -- @param {upstream} upstream clickhouse server upstream -- @param {table} settings global settings table: @@ -315,13 +320,14 @@ end -- @param {string} query select query (passed in HTTP body) -- @param {function} ok_cb callback to be called in case of success -- @param {function} fail_cb callback to be called in case of some error +-- @param {function} row_cb optional callback to be called on each parsed data row (instead of table insertion) -- @return -- {string} error message if exists -- nil | {rows} | {http_response} -- @example -- --]] -exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb) +exports.select_sync = function (upstream, settings, params, query, row_cb) local http_params = {} for k,v in pairs(params) do http_params[k] = v end @@ -357,7 +363,7 @@ exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_c return response.content, response else lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response) - local rows = parse_clickhouse_response_json_eachrow(params, response.content) + local rows = parse_clickhouse_response_json_eachrow(params, response.content, row_cb) return nil, rows end end |