]> source.dussan.org Git - rspamd.git/commitdiff
Clickhouse tables retention
authorMikhail Galanin <mgalanin@mimecast.com>
Thu, 26 Jul 2018 10:52:09 +0000 (11:52 +0100)
committerMikhail Galanin <mgalanin@mimecast.com>
Thu, 26 Jul 2018 14:21:13 +0000 (15:21 +0100)
conf/modules.d/clickhouse.conf
src/plugins/lua/clickhouse.lua

index 3dbc3b60c491e247f2365d3b11fe091b80d9104f..c35352a03c54179b355c85a3b3f4249438777547 100644 (file)
@@ -51,6 +51,18 @@ clickhouse {
   #dmarc_allow_symbols = ["DMARC_POLICY_ALLOW"];
   #dmarc_reject_symbols = ["DMARC_POLICY_REJECT", "DMARC_POLICY_QUARANTINE"];
 
+  #retention {
+  #  # disabled by default
+  #  enable = true;
+  #  # drop | detach, please refer to ClickHouse docs for details
+  #  # http://clickhouse-docs.readthedocs.io/en/latest/query_language/queries.html#manipulations-with-partitions-and-parts
+  #  method = "drop";
+  #  # how many month the data should be kept in ClickHouse
+  #  period_months = 3;
+  #  # how often run the cleanup process
+  #  run_every = "7d";
+  #}
+
   .include(try=true,priority=5) "${DBDIR}/dynamic/clickhouse.conf"
   .include(try=true,priority=1,duplicate=merge) "$LOCAL_CONFDIR/local.d/clickhouse.conf"
   .include(try=true,priority=10) "$LOCAL_CONFDIR/override.d/clickhouse.conf"
index 913fedf20130817acc998154b58afb47223da245..2d32982bd0cec211fe108bc0823a115c2e47680f 100644 (file)
@@ -18,6 +18,9 @@ local rspamd_logger = require 'rspamd_logger'
 local rspamd_http = require "rspamd_http"
 local rspamd_lua_utils = require "lua_util"
 local upstream_list = require "rspamd_upstream_list"
+local lua_util = require "lua_util"
+local ucl = require "ucl"
+
 local N = "clickhouse"
 
 if confighelp then
@@ -66,10 +69,16 @@ local settings = {
   user = nil,
   password = nil,
   no_ssl_verify = false,
+  retention = {
+    enable = false,
+    method = 'detach',
+    period_months = 3,
+    run_every = '7d',
+  }
 }
 
 local clickhouse_schema = {
-rspamd = [[
+table = [[
 CREATE TABLE IF NOT EXISTS ${table}
 (
     Date Date,
@@ -97,7 +106,7 @@ CREATE TABLE IF NOT EXISTS ${table}
 ) ENGINE = MergeTree(Date, (TS, From), 8192)
 ]],
 
-  attachments = [[
+attachments_table = [[
 CREATE TABLE IF NOT EXISTS ${attachments_table} (
     Date Date,
     Digest FixedString(32),
@@ -108,7 +117,7 @@ CREATE TABLE IF NOT EXISTS ${attachments_table} (
 ) ENGINE = MergeTree(Date, Digest, 8192)
 ]],
 
-  urls = [[
+urls_table = [[
 CREATE TABLE IF NOT EXISTS ${urls_table} (
     Date Date,
     Digest FixedString(32),
@@ -117,7 +126,7 @@ CREATE TABLE IF NOT EXISTS ${urls_table} (
 ) ENGINE = MergeTree(Date, Digest, 8192)
 ]],
 
-  emails = [[
+emails_table = [[
 CREATE TABLE IF NOT EXISTS ${emails_table} (
     Date Date,
     Digest FixedString(32),
@@ -125,7 +134,7 @@ CREATE TABLE IF NOT EXISTS ${emails_table} (
 ) ENGINE = MergeTree(Date, Digest, 8192)
 ]],
 
-  asn = [[
+asn_table = [[
 CREATE TABLE IF NOT EXISTS ${asn_table} (
     Date Date,
     Digest FixedString(32),
@@ -135,7 +144,7 @@ CREATE TABLE IF NOT EXISTS ${asn_table} (
 ) ENGINE = MergeTree(Date, Digest, 8192)
 ]],
 
-  symbols = [[
+symbols_table = [[
 CREATE TABLE IF NOT EXISTS ${symbols_table} (
     Date Date,
     Digest FixedString(32),
@@ -741,6 +750,174 @@ local function clickhouse_collect(task)
   end
 end
 
+local function mk_remove_http_cb(upstream, params, ok_cb)
+  local function do_remove_http_cb(err_message, code, data, _)
+    if code ~= 200 or err_message then
+      if not err_message then err_message = data end
+      local ip_addr = upstream:get_addr():to_string(true)
+      rspamd_logger.errx(rspamd_config, "request failed on clickhouse server %s: %s",
+              ip_addr, err_message)
+      upstream:fail()
+    else
+      upstream:ok()
+      if (ok_cb) then
+        rspamd_logger.debugm(N, params.ev_base, "do_remove_http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
+        ok_cb(params.ev_base, params.config, data)
+      end
+    end
+  end
+  return do_remove_http_cb
+end
+
+local function clickhouse_request(upstream, ok_cb, params)
+  rspamd_logger.infox(rspamd_config, "clickhouse_request: %s", params.body) -- TODO: make debugm
+
+  params.callback = mk_remove_http_cb(upstream, params, ok_cb)
+  params.gzip = settings.use_gzip
+  params.mime_type = 'text/plain'
+  params.timeout = settings['timeout']
+  params.no_ssl_verify = settings.no_ssl_verify
+  params.user = settings.user
+  params.password = settings.password
+  if not params.url then
+    local ip_addr = upstream:get_addr():to_string(true)
+    params.url = connect_prefix .. ip_addr
+  end
+  return rspamd_http.request(params)
+end
+
+local function do_remove_partition(ev_base, cfg, table_name, partition_id)
+  rspamd_logger.debugm(N, ev_base, "removing partition %s.%s", table_name, partition_id)
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION ${partition_id}"
+  local remove_method = (settings.retention.method == 'drop') and 'DROP' or 'DETACH'
+  local sql_params = {
+    ['table_name']     = table_name,
+    ['remove_method']  = remove_method,
+    ['partition_id']   = partition_id
+  }
+
+  local sql = rspamd_lua_utils.template(remove_partition_sql, sql_params)
+
+  local ch_params = {
+    body = sql,
+    ev_base = ev_base,
+    cfg = cfg,
+  }
+
+  if not clickhouse_request(upstream, nil, ch_params) then
+    rspamd_logger.errx(ev_base, "cannot send data to clickhouse server %s: cannot make request",
+            settings['server'])
+  end
+end
+
+local function parse_clickhouse_response(ev_base, cfg, data)
+  rspamd_logger.debugm(N, ev_base, "got clickhouse response: %s", data)
+  if data == nil then
+    -- clickhouse returned no data: exiting
+    return
+  end
+  local function parse_string(s)
+    local parser = ucl.parser()
+    local res, err = parser:parse_string(s)
+    if not res then
+      rspamd_logger.errx(ev_base, 'Parser error: %s', err)
+      return nil
+    end
+    return parser:get_object()
+  end
+
+  -- iterate over rows
+  local ch_rows = lua_util.str_split(data, "\n")
+  for _, plain_row in pairs(ch_rows) do
+    if plain_row and plain_row:len() > 1 then
+      local parsed_row = parse_string(plain_row)
+      do_remove_partition(ev_base, cfg, parsed_row.table, parsed_row.partition_id)
+    end
+  end
+end
+
+
+--[[
+  nil   - file is not writable, do not perform removal
+  0     - it's time to perform removal
+  <int> - how many seconds wait until next run
+]]
+local function get_last_removal_ago(ev_base)
+  local ts_file = string.format('%s/%s', rspamd_paths['DBDIR'], 'clickhouse_retention_run')
+  local f, err = io.open(ts_file, 'r')
+  local write_file
+  local last_ts
+
+  if err then
+    rspamd_logger.debugm(N, ev_base, 'Failed to open %s: %s', ts_file, err)
+  else
+    last_ts = tonumber(f:read('*number'))
+    f:close()
+  end
+
+  write_file, err = io.open(ts_file, 'w')
+  if err then
+    rspamd_logger.errx(ev_base, 'Failed to open %s, will not perform retention: %s', ts_file, err)
+    return nil
+  end
+
+  local current_ts = os.time()
+
+  if last_ts == nil or (last_ts + settings.retention.period) <= current_ts then
+    local res
+    res, err = write_file:write(tostring(current_ts))
+    if err then
+      rspamd_logger.errx(ev_base, 'Failed to write %s, will not perform retention: %s', ts_file, err)
+      return nil
+    end
+    write_file:close()
+    return 0
+  end
+
+  return (last_ts + settings.retention.period) - current_ts
+end
+
+local function clickhouse_remove_old_partitions(cfg, ev_base)
+  local last_time_ago = get_last_removal_ago(ev_base)
+  if last_time_ago == nil then
+    rspamd_logger.errx(ev_base, "Failed to get last run time. Disabling retention")
+    return false
+  elseif last_time_ago ~= 0 then
+    return last_time_ago
+  end
+
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local ip_addr = upstream:get_addr():to_string(true)
+
+  local partition_to_remove_sql = "SELECT distinct partition_id, table FROM system.parts WHERE table in ('${tables}') and max_date < toDate(now() - interval ${month} month);"
+
+  local table_names = {}
+  for table_name,_ in pairs(clickhouse_schema) do
+    table.insert(table_names, settings[table_name])
+  end
+  local tables = table.concat(table_names, "', '")
+  local sql_params = {
+    tables = tables,
+    month  = settings.retention.period_months,
+  }
+  local sql = rspamd_lua_utils.template(partition_to_remove_sql, sql_params)
+
+  local ch_params = {
+    body = sql,
+    url  = string.format("%s%s/?default_format=JSONEachRow", connect_prefix, ip_addr),
+    ev_base = ev_base,
+    config = cfg,
+  }
+  if not clickhouse_request(upstream, parse_clickhouse_response, ch_params) then
+    rspamd_logger.errx(ev_base, "cannot send data to clickhouse server %s: cannot make request",
+            settings['server'])
+  end
+
+  -- settings.retention.period is added on initialisation, see below
+  return settings.retention.period
+end
+
 local opts = rspamd_config:get_all_opt('clickhouse')
 if opts then
     for k,v in pairs(opts) do
@@ -821,6 +998,30 @@ if opts then
               send_req(tab, rspamd_lua_utils.template(sql, settings))
             end
           end
+
+          if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then
+            rspamd_logger.errx(rspamd_config, "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention",
+                    settings.retention.method)
+            settings.retention.enable = false
+          end
+          if settings.retention.enable and settings.retention.period_months < 1 or settings.retention.period_months > 1000 then
+            rspamd_logger.errx(rspamd_config, "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention",
+                    settings.retention.period_months)
+            settings.retention.enable = false
+          end
+          local period = lua_util.parse_time_interval(settings.retention.run_every)
+          if settings.retention.enable and period == nil then
+            rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention",
+                    settings.retention.run_every)
+            settings.retention.enable = false
+          end
+
+          if settings.retention.enable then
+            settings.retention.period = period
+            rspamd_logger.infox(rspamd_config, "retention will be performed each %s seconds for %s month with method %s",
+                    period, settings.retention.period_months, settings.retention.method)
+            rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false)
+          end
         end
       end)
     end