From 1e3f1f0c2d454ee283c05c70912993cf199e0fe1 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 17 Feb 2020 10:04:19 +0000 Subject: [PATCH] [Project] Clickhouse: Add extra columns concept --- src/plugins/lua/clickhouse.lua | 112 ++++++++++++++++++++++++++++++++- 1 file changed, 110 insertions(+), 2 deletions(-) diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 442aee4c1..ab1693b48 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -91,7 +91,8 @@ local settings = { method = 'detach', period_months = 3, run_every = '7d', - } + }, + extra_columns = {}, } --- @language SQL @@ -350,6 +351,10 @@ local function clickhouse_asn_row(res) for _,v in ipairs(fields) do table.insert(res, v) end end +local function clickhouse_extra_columns(res) + for _,v in ipairs(settings.extra_columns) do table.insert(res, v.name) end +end + local function today(ts) return os.date('!%Y-%m-%d', ts) end @@ -430,6 +435,10 @@ local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows) clickhouse_groups_row(fields) end + if #settings.extra_columns > 0 then + clickhouse_extra_columns(fields) + end + send_data('generic data', gen_rows, string.format('INSERT INTO rspamd (%s)', table.concat(fields, ','))) @@ -831,6 +840,19 @@ local function clickhouse_collect(task) table.insert(row, gr_scores_tab) end + -- Extra columns + if #settings.extra_columns > 0 then + for _,col in ipairs(settings.extra_columns) do + local elts = col.selector(task) + + if elts then + table.insert(row, elts) + else + table.insert(row, col.default_value) + end + end + end + -- Custom data for k,rule in pairs(settings.custom_rules) do if not custom_rows[k] then custom_rows[k] = {} end @@ -1107,6 +1129,47 @@ local function maybe_apply_migrations(upstream, ev_base, cfg, version) migration_recursor(version) end +local function add_extra_columns(upstream, ev_base, cfg) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + -- Apply migrations sequentially + local function columns_recursor(i) + if i <= #settings.extra_columns then + local col = settings.extra_columns[i] + local prev_column + if i == 1 then + prev_column = 'Digest' + else + prev_column = settings.extra_columns[i - 1].name + end + local sql = string.format('ALTER TABLE rspamd ADD COLUMN IF NOT EXISTS `%s` %s AFTER `%s`', + col.name, col.type, prev_column) + local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, + function(_, _) + rspamd_logger.infox(rspamd_config, + 'added extra column %s (%s) after %s', + col.name, col.type, prev_column) + -- Apply the next statement + columns_recursor(i + 1) + end , + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot apply add column alter %s: '%s' on clickhouse server %s: %s", + i, sql, upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, + "cannot apply add column alter %s: '%s' on clickhouse server %s: cannot make request", + i, sql, upstream:get_addr():to_string(true)) + end + end + end + + columns_recursor(1) +end + local function check_rspamd_table(upstream, ev_base, cfg) local ch_params = { ev_base = ev_base, @@ -1173,6 +1236,10 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg) local version = tonumber(rows[1].v) maybe_apply_migrations(upstream, ev_base, cfg, version) end + + if #settings.extra_columns > 0 then + add_extra_columns(upstream, ev_base, cfg) + end end local opts = rspamd_config:get_all_opt('clickhouse') @@ -1223,7 +1290,7 @@ if opts then end end else - settings[k] = v + settings[k] = lua_util.deepcopy(v) end end @@ -1251,6 +1318,47 @@ if opts then settings.exceptions, N) end + if settings.extra_columns then + -- Check sanity and create selector closures + local lua_selectors = require "lua_selectors" + + for col_name,col_data in pairs(settings.extra_columns) do + if not col_data.selector or not col_data.type then + rspamd_logger.errx(rspamd_config, 'cannot add clickhouse extra row %s: no type or no selector', + col_name) + else + local selector = lua_selectors.create_selector_closure(rspamd_config, + col_data.selector, col_data.delimiter or '', false) + + if not selector then + rspamd_logger.errx(rspamd_config, 'cannot add clickhouse extra row %s: bad selector: %s', + col_name, col_data.selector) + -- Remove column + settings.extra_columns[col_name] = nil + else + if not col_data.default_value then + if col_data.type:lower():match('Array') then + col_data.default_value = {} + else + col_data.default_value = '' + end + end + end + end + end + + -- Convert extra columns from a map to an array sorted by column name to + -- preserve strict order when doing altering + local extra_cols = {} + for col_name,col_data in pairs(settings.extra_columns) do + local nelt = lua_util.shallowcopy(col_data) + nelt.name = col_name + extra_cols[#extra_cols + 1] = nelt + end + table.sort(extra_cols, function(c1, c2) return c1.name < c2.name end) + settings.extra_columns = extra_cols + end + rspamd_config:register_symbol({ name = 'CLICKHOUSE_COLLECT', type = 'idempotent', -- 2.39.5