From: Vsevolod Stakhov Date: Fri, 3 Aug 2018 18:28:10 +0000 (+0100) Subject: [Project] Implement insert method X-Git-Tag: 1.8.0~315 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=e0a0714d1d71443e9e61b27fc638186c6434bac9;p=rspamd.git [Project] Implement insert method --- diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua index 94cf1c5fb..fa94e4a43 100644 --- a/lualib/lua_clickhouse.lua +++ b/lualib/lua_clickhouse.lua @@ -62,6 +62,7 @@ local function row_to_tsv(row) return table.concat(row, '\t') end +-- Parses JSONEachRow reply from CH local function parse_clickhouse_response(params, data) local lua_util = require "lua_util" local ucl = require "ucl" @@ -97,6 +98,7 @@ local function parse_clickhouse_response(params, data) return parsed_rows end +-- Helper to generate HTTP closure local function mk_http_select_cb(upstream, params, ok_cb, fail_cb) local function http_cb(err_message, code, data, _) if code ~= 200 or err_message then @@ -131,8 +133,36 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb) return http_cb end +-- Helper to generate HTTP closure +local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb) + local function http_cb(err_message, code, data, _) + if code ~= 200 or err_message then + if not err_message then err_message = data end + local ip_addr = upstream:get_addr():to_string(true) + rspamd_logger.errx(params.log_obj, + "request failed on clickhouse server %s: %s", + ip_addr, err_message) + + if fail_cb then + fail_cb(params, err_message, data) + end + upstream:fail() + else + upstream:ok() + rspamd_logger.debugm(N, params.log_obj, + "http_cb ok: %s, %s, %s, %s", err_message, code, data, _) + + if ok_cb then + ok_cb(params, data) + end + end + end + + return http_cb +end + --[[[ --- @function lua_clickhouse.select_request(upstream, settings, params, query, +-- @function lua_clickhouse.select(upstream, settings, params, query, ok_cb, fail_cb) -- Make select request to clickhouse -- @param {upstream} upstream clickhouse server upstream @@ -143,14 +173,14 @@ end -- * user: HTTP user -- * password: HTTP password -- @param {params} HTTP request params +-- @param {string} query select query (passed in HTTP body) -- @param {function} ok_cb callback to be called in case of success -- @param {function} fail_cb callback to be called in case of some error -- @return {boolean} whether a connection was successful -- @example -- --]] - -exports.select_request = function (upstream, settings, params, query, ok_cb, fail_cb) +exports.select = function (upstream, settings, params, query, ok_cb, fail_cb) local http_params = {} for k,v in pairs(params) do http_params[k] = v end @@ -165,7 +195,7 @@ exports.select_request = function (upstream, settings, params, query, ok_cb, fai http_params.body = query http_params.log_obj = params.task or params.config - rspamd_logger.debugm(N, http_params.log_obj, "clickhouse_request: %s", params.body) + rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body) if not http_params.url then local connect_prefix = "http://" @@ -179,4 +209,56 @@ exports.select_request = function (upstream, settings, params, query, ok_cb, fai return rspamd_http.request(http_params) end +--[[[ +-- @function lua_clickhouse.insert(upstream, settings, params, query, rows, + ok_cb, fail_cb) +-- Insert data rows to clickhouse +-- @param {upstream} upstream clickhouse server upstream +-- @param {table} settings global settings table: +-- * use_gsip: use gzip compression +-- * timeout: request timeout +-- * no_ssl_verify: skip SSL verification +-- * user: HTTP user +-- * password: HTTP password +-- @param {params} HTTP request params +-- @param {string} query select query (passed in `query` request element with spaces escaped) +-- @param {table|mixed} rows mix of strings, numbers or tables (for arrays) +-- @param {function} ok_cb callback to be called in case of success +-- @param {function} fail_cb callback to be called in case of some error +-- @return {boolean} whether a connection was successful +-- @example +-- +--]] +exports.insert = function (upstream, settings, params, query, rows, + ok_cb, fail_cb) + local fun = require "fun" + local http_params = {} + + for k,v in pairs(params) do http_params[k] = v end + + http_params.callback = mk_http_insert_cb(upstream, http_params, ok_cb, fail_cb) + http_params.gzip = settings.use_gzip + http_params.mime_type = 'text/plain' + http_params.timeout = settings.timeout or default_timeout + http_params.no_ssl_verify = settings.no_ssl_verify + http_params.user = settings.user + http_params.password = settings.password + http_params.body = table.concat(fun.totable(fun.map(function(row) + return row_to_tsv(row) + end), rows), '\n') + http_params.log_obj = params.task or params.config + + if not http_params.url then + local connect_prefix = "http://" + if settings.use_https then + connect_prefix = 'https://' + end + local ip_addr = upstream:get_addr():to_string(true) + http_params.url = string.format('%s%s/?query=%s', connect_prefix, + ip_addr, escape_spaces(query)) + end + + rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body) +end + return exports \ No newline at end of file