]> source.dussan.org Git - rspamd.git/commitdiff
[Project] Implement insert method
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 3 Aug 2018 18:28:10 +0000 (19:28 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Fri, 3 Aug 2018 18:28:33 +0000 (19:28 +0100)
lualib/lua_clickhouse.lua

index 94cf1c5fb58abe4f5bc20a520c6c745fe4bc019d..fa94e4a43533b36b40fa3ed5a708e9ddab4231dc 100644 (file)
@@ -62,6 +62,7 @@ local function row_to_tsv(row)
   return table.concat(row, '\t')
 end
 
+-- Parses JSONEachRow reply from CH
 local function parse_clickhouse_response(params, data)
   local lua_util = require "lua_util"
   local ucl = require "ucl"
@@ -97,6 +98,7 @@ local function parse_clickhouse_response(params, data)
   return parsed_rows
 end
 
+-- Helper to generate HTTP closure
 local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
   local function http_cb(err_message, code, data, _)
     if code ~= 200 or err_message then
@@ -131,8 +133,36 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
   return http_cb
 end
 
+-- Helper to generate HTTP closure
+local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
+  local function http_cb(err_message, code, data, _)
+    if code ~= 200 or err_message then
+      if not err_message then err_message = data end
+      local ip_addr = upstream:get_addr():to_string(true)
+      rspamd_logger.errx(params.log_obj,
+          "request failed on clickhouse server %s: %s",
+          ip_addr, err_message)
+
+      if fail_cb then
+        fail_cb(params, err_message, data)
+      end
+      upstream:fail()
+    else
+      upstream:ok()
+      rspamd_logger.debugm(N, params.log_obj,
+          "http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
+
+      if ok_cb then
+        ok_cb(params, data)
+      end
+    end
+  end
+
+  return http_cb
+end
+
 --[[[
--- @function lua_clickhouse.select_request(upstream, settings, params, query,
+-- @function lua_clickhouse.select(upstream, settings, params, query,
       ok_cb, fail_cb)
 -- Make select request to clickhouse
 -- @param {upstream} upstream clickhouse server upstream
@@ -143,14 +173,14 @@ end
 --   * user: HTTP user
 --   * password: HTTP password
 -- @param {params} HTTP request params
+-- @param {string} query select query (passed in HTTP body)
 -- @param {function} ok_cb callback to be called in case of success
 -- @param {function} fail_cb callback to be called in case of some error
 -- @return {boolean} whether a connection was successful
 -- @example
 --
 --]]
-
-exports.select_request = function (upstream, settings, params, query, ok_cb, fail_cb)
+exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
   local http_params = {}
 
   for k,v in pairs(params) do http_params[k] = v end
@@ -165,7 +195,7 @@ exports.select_request = function (upstream, settings, params, query, ok_cb, fai
   http_params.body = query
   http_params.log_obj = params.task or params.config
 
-  rspamd_logger.debugm(N, http_params.log_obj, "clickhouse_request: %s", params.body)
+  rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body)
 
   if not http_params.url then
     local connect_prefix = "http://"
@@ -179,4 +209,56 @@ exports.select_request = function (upstream, settings, params, query, ok_cb, fai
   return rspamd_http.request(http_params)
 end
 
+--[[[
+-- @function lua_clickhouse.insert(upstream, settings, params, query, rows,
+      ok_cb, fail_cb)
+-- Insert data rows to clickhouse
+-- @param {upstream} upstream clickhouse server upstream
+-- @param {table} settings global settings table:
+--   * use_gsip: use gzip compression
+--   * timeout: request timeout
+--   * no_ssl_verify: skip SSL verification
+--   * user: HTTP user
+--   * password: HTTP password
+-- @param {params} HTTP request params
+-- @param {string} query select query (passed in `query` request element with spaces escaped)
+-- @param {table|mixed} rows mix of strings, numbers or tables (for arrays)
+-- @param {function} ok_cb callback to be called in case of success
+-- @param {function} fail_cb callback to be called in case of some error
+-- @return {boolean} whether a connection was successful
+-- @example
+--
+--]]
+exports.insert = function (upstream, settings, params, query, rows,
+                              ok_cb, fail_cb)
+  local fun = require "fun"
+  local http_params = {}
+
+  for k,v in pairs(params) do http_params[k] = v end
+
+  http_params.callback = mk_http_insert_cb(upstream, http_params, ok_cb, fail_cb)
+  http_params.gzip = settings.use_gzip
+  http_params.mime_type = 'text/plain'
+  http_params.timeout = settings.timeout or default_timeout
+  http_params.no_ssl_verify = settings.no_ssl_verify
+  http_params.user = settings.user
+  http_params.password = settings.password
+  http_params.body = table.concat(fun.totable(fun.map(function(row)
+    return row_to_tsv(row)
+  end), rows), '\n')
+  http_params.log_obj = params.task or params.config
+
+  if not http_params.url then
+    local connect_prefix = "http://"
+    if settings.use_https then
+      connect_prefix = 'https://'
+    end
+    local ip_addr = upstream:get_addr():to_string(true)
+    http_params.url = string.format('%s%s/?query=%s', connect_prefix,
+        ip_addr, escape_spaces(query))
+  end
+
+  rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body)
+end
+
 return exports
\ No newline at end of file