]> source.dussan.org Git - rspamd.git/commitdiff
[Minor] Reworked clickhouse routines using new API
authorMikhail Galanin <mgalanin@mimecast.com>
Fri, 24 Aug 2018 08:18:06 +0000 (09:18 +0100)
committerMikhail Galanin <mgalanin@mimecast.com>
Fri, 24 Aug 2018 08:18:06 +0000 (09:18 +0100)
lualib/lua_clickhouse.lua
src/plugins/lua/clickhouse.lua

index cf6e9d89423ae193e8fe556c3f42152b97e7c86a..e14518ca6a9575f3e83eed2dd85a5648e8dc1b4f 100644 (file)
@@ -228,6 +228,65 @@ exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
   return rspamd_http.request(http_params)
 end
 
+--[[[
+-- @function lua_clickhouse.select_sync(upstream, settings, params, query,
+      ok_cb, fail_cb)
+-- Make select request to clickhouse
+-- @param {upstream} upstream clickhouse server upstream
+-- @param {table} settings global settings table:
+--   * use_gzip: use gzip compression
+--   * timeout: request timeout
+--   * no_ssl_verify: skip SSL verification
+--   * user: HTTP user
+--   * password: HTTP password
+-- @param {params} HTTP request params
+-- @param {string} query select query (passed in HTTP body)
+-- @param {function} ok_cb not invoked by this synchronous variant (the result is returned instead)
+-- @param {function} fail_cb not invoked by this synchronous variant (the error is returned instead)
+-- @return
+--          {string} error message if exists
+--          nil | {rows} | {http_response}
+-- @example
+--
+--]]
+exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb)
+  -- Start from a shallow copy of the caller's params, then lay the
+  -- settings-derived fields on top of it (ok_cb/fail_cb are not referenced)
+  local req = {}
+  for key, value in pairs(params) do
+    req[key] = value
+  end
+
+  req.body = query
+  req.mime_type = 'text/plain'
+  req.gzip = settings.use_gzip
+  req.timeout = settings.timeout or default_timeout
+  req.no_ssl_verify = settings.no_ssl_verify
+  req.user = settings.user
+  req.password = settings.password
+  req.log_obj = params.task or params.config
+
+  lua_util.debugm(N, req.log_obj, "clickhouse select request: %s", req.body)
+
+  -- Build the URL from the upstream address only when the caller gave none
+  if not req.url then
+    local scheme = settings.use_https and 'https://' or "http://"
+    local addr = upstream:get_addr():to_string(true)
+    req.url = scheme .. addr .. '/?default_format=JSONEachRow'
+  end
+
+  local err, response = rspamd_http.request(req)
+
+  if err then
+    return err, nil
+  end
+
+  if response.code ~= 200 then
+    -- Non-200 reply: the body is handed back as the error message,
+    -- together with the raw response
+    return response.content, response
+  end
+
+  lua_util.debugm(N, req.log_obj, "clickhouse select response: %1", response)
+  local parsed = parse_clickhouse_response(params, response.content)
+  return nil, parsed
+end
+
 --[[[
 -- @function lua_clickhouse.insert(upstream, settings, params, query, rows,
       ok_cb, fail_cb)
@@ -330,5 +389,47 @@ exports.generic = function (upstream, settings, params, query,
   return rspamd_http.request(http_params)
 end
 
+--[[[
+-- @function lua_clickhouse.generic_sync(upstream, settings, params, query)
+-- Make a generic synchronous request to Clickhouse (e.g. alter)
+-- @param {upstream} upstream clickhouse server upstream
+-- @param {table} settings global settings table:
+--   * use_gzip: use gzip compression
+--   * use_https: connect with https:// instead of http://
+--   * timeout: request timeout
+--   * no_ssl_verify: skip SSL verification
+--   * user: HTTP user
+--   * password: HTTP password
+-- @param {table} params HTTP request params (copied into the request; a
+--   `url` given here is used instead of the upstream address)
+-- @param {string} query Clickhouse query (passed in HTTP body)
+-- @return
+--          {string} error message if exists (request error, or the reply
+--                   body when the HTTP code is not 200)
+--          nil | {http_response}
+-- @example
+--
+--]]
+exports.generic_sync = function (upstream, settings, params, query)
+  local http_params = {}
+
+  for k,v in pairs(params) do http_params[k] = v end
+
+  http_params.gzip = settings.use_gzip
+  http_params.mime_type = 'text/plain'
+  http_params.timeout = settings.timeout or default_timeout
+  http_params.no_ssl_verify = settings.no_ssl_verify
+  http_params.user = settings.user
+  http_params.password = settings.password
+  http_params.log_obj = params.task or params.config
+  http_params.body = query
+
+  if not http_params.url then
+    local connect_prefix = "http://"
+    if settings.use_https then
+      connect_prefix = 'https://'
+    end
+    local ip_addr = upstream:get_addr():to_string(true)
+    http_params.url = connect_prefix .. ip_addr .. '/?default_format=JSONEachRow'
+  end
+
+  local err, response = rspamd_http.request(http_params)
+
+  if err then
+    return err, nil
+  elseif response.code ~= 200 then
+    -- A non-200 reply must surface as an error (as select_sync does),
+    -- otherwise callers that only test `err` would report success
+    return response.content, response
+  end
+
+  return nil, response
+end
 
 return exports
\ No newline at end of file
index c6657ae7c13da6aebd85befe992b924cdddef043..346ea2e97936a9e155e52c98ea47c75ee1f568fd 100644 (file)
@@ -594,28 +594,22 @@ local function do_remove_partition(ev_base, cfg, table_name, partition_id)
   local ch_params = {
     body = sql,
     ev_base = ev_base,
-    cfg = cfg,
+    config = cfg,
   }
 
-  local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
-      function(_, rows)
-        rspamd_logger.infox(rspamd_config,
-            'detached partition %s:%s on server %s', table_name, partition_id,
-            settings['server'])
-      end,
-      function(_, err)
-        rspamd_logger.errx(rspamd_config,
-            "cannot detach partition %s:%s from server %s: %s",
-            table_name, partition_id,
-            settings['server'], err)
-      end)
-
-  if not ret then
+  local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
+  if err then
     rspamd_logger.errx(rspamd_config,
-        "cannot detach partition %s:%s from server %s: cannot make request",
-        table_name, partition_id,
-        settings['server'])
+      "cannot detach partition %s:%s from server %s: %s",
+      table_name, partition_id,
+      settings['server'], err)
+    return
   end
+
+  rspamd_logger.infox(rspamd_config,
+      'detached partition %s:%s on server %s', table_name, partition_id,
+      settings['server'])
+
 end
 
 --[[
@@ -670,10 +664,7 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
   local upstream = settings.upstream:get_upstream_round_robin()
   local partition_to_remove_sql = "SELECT distinct partition, table FROM system.parts WHERE table in ('${tables}') and max_date <= toDate(now() - interval ${month} month);"
 
-  local table_names = {}
-  for table_name,_ in pairs(clickhouse_schema) do
-    table.insert(table_names, settings[table_name])
-  end
+  local table_names = {'rspamd'}
   local tables = table.concat(table_names, "', '")
   local sql_params = {
     tables = tables,
@@ -686,20 +677,15 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
     ev_base = ev_base,
     config = cfg,
   }
-  local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
-      function(_, rows)
-        fun.each(function(row)
-          do_remove_partition(ev_base, cfg, row.table, row.partition)
-        end, rows)
-      end,
-      function(_, err)
-        rspamd_logger.errx(rspamd_config,
-            "cannot send data to clickhouse server %s: %s",
-            settings['server'], err)
-      end)
-  if not ret then
-    rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
-            settings['server'])
+  local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql)
+  if err then
+    rspamd_logger.errx(rspamd_config,
+      "cannot send data to clickhouse server %s: %s",
+      settings['server'], err)
+  else
+    fun.each(function(row)
+      do_remove_partition(ev_base, cfg, row.table, row.partition)
+    end, rows)
   end
 
   -- settings.retention.period is added on initialisation, see below
@@ -711,29 +697,20 @@ local function upload_clickhouse_schema(upstream, ev_base, cfg)
     ev_base = ev_base,
     config = cfg,
   }
+
   -- Apply schema sequentially
-  local function sql_recursor(i)
-    if clickhouse_schema[i] then
-      local sql = clickhouse_schema[i]
-      local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
-          function(_, _)
-            rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s',
-                i, upstream:get_addr():to_string(true))
-            sql_recursor(i + 1)
-          end,
-          function(_, err)
-            rspamd_logger.errx(rspamd_config,
-                "cannot upload schema '%s' on clickhouse server %s: %s",
-                sql, upstream:get_addr():to_string(true), err)
-          end)
-      if not ret then
-        rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: cannot make request",
-            sql, upstream:get_addr():to_string(true))
-      end
+  for i,v in ipairs(clickhouse_schema) do
+    local sql = v
+    local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
+
+    if err then
+      rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: %s",
+        sql, upstream:get_addr():to_string(true), err)
+      return
     end
+    rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s',
+      i, upstream:get_addr():to_string(true))
   end
-
-  sql_recursor(1)
 end
 
 local function maybe_apply_migrations(upstream, ev_base, cfg, version)
@@ -792,32 +769,27 @@ local function check_rspamd_table(upstream, ev_base, cfg)
     config = cfg,
   }
   local sql = [[EXISTS TABLE rspamd]]
-  local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
-      function(_, rows)
-        if rows[1] and rows[1].result then
-          if tonumber(rows[1].result) == 1 then
-            -- Apply migration
-            rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
-            maybe_apply_migrations(upstream, ev_base, cfg, 1)
-          else
-            -- Upload schema
-            rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
-            upload_clickhouse_schema(upstream, ev_base, cfg)
-          end
-        else
-          rspamd_logger.errx(rspamd_config,
-              "unexpected reply on EXISTS command from server %s: %s",
-              upstream:get_addr():to_string(true), rows)
-        end
-      end ,
-      function(_, err)
-        rspamd_logger.errx(rspamd_config,
-            "cannot check if rspamd table exists on clickhouse server %s: %s",
-            upstream:get_addr():to_string(true), err)
-      end)
-  if not ret then
-    rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: cannot make request",
-        upstream:get_addr():to_string(true))
+  local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql)
+  if err then
+    rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: %s",
+      upstream:get_addr():to_string(true), err)
+    return
+  end
+
+  if rows[1] and rows[1].result then
+    if tonumber(rows[1].result) == 1 then
+      -- Apply migration
+      rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
+      maybe_apply_migrations(upstream, ev_base, cfg, 1)
+    else
+      -- Upload schema
+      rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
+      upload_clickhouse_schema(upstream, ev_base, cfg)
+    end
+  else
+    rspamd_logger.errx(rspamd_config,
+        "unexpected reply on EXISTS command from server %s: %s",
+        upstream:get_addr():to_string(true), rows)
   end
 end
 
@@ -831,35 +803,28 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg)
   for k,rule in pairs(settings.custom_rules) do
     if rule.schema then
       local sql = rspamd_lua_utils.template(rule.schema, settings)
-      local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
-          nil,
-          function(_, err)
-            rspamd_logger.errx(rspamd_config,
-                "cannot send custom schema %s to clickhouse server %s: %s",
-                k, upstream:get_addr():to_string(true), err)
-          end)
-      if not ret then
-        rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request",
-            k, upstream:get_addr():to_string(true))
+      local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
+      if err then
+        rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request (%s)",
+            k, upstream:get_addr():to_string(true), err)
       end
     end
   end
 
   -- Now check the main schema and apply migrations if needed
   local sql = [[SELECT MAX(Version) as v FROM rspamd_version]]
-  local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
-      function(_, rows)
-        local version = tonumber(rows[1].v)
-        maybe_apply_migrations(upstream, ev_base, cfg, version)
-      end,
-      function(_, err)
-        -- It might be either no rspamd table or version 1
-        rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table')
-        check_rspamd_table(upstream, ev_base, cfg)
-      end)
-  if not ret then
-    rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: cannot make request",
-        upstream:get_addr():to_string(true))
+  local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql)
+  if err then
+    if rows and rows.code == 404 then
+      rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table')
+      check_rspamd_table(upstream, ev_base, cfg)
+    else
+      rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: %s",
+        upstream:get_addr():to_string(true), err)
+    end
+  else
+    local version = tonumber(rows[1].v)
+    maybe_apply_migrations(upstream, ev_base, cfg, version)
   end
 end