diff options
author | Mikhail Galanin <mgalanin@mimecast.com> | 2018-08-24 09:18:06 +0100 |
---|---|---|
committer | Mikhail Galanin <mgalanin@mimecast.com> | 2018-08-24 09:18:06 +0100 |
commit | 0fbaf1290c84643c94712ca26fd53872fddc5fc4 (patch) | |
tree | 045173eee17842da3d1bc7ded49c3fb7633a9817 | |
parent | 3c0b613742ac6fa4fc75f384387d69d753f8a682 (diff) | |
download | rspamd-0fbaf1290c84643c94712ca26fd53872fddc5fc4.tar.gz rspamd-0fbaf1290c84643c94712ca26fd53872fddc5fc4.zip |
[Minor] Reworked clickhouse routines using new API
-rw-r--r-- | lualib/lua_clickhouse.lua | 101 | ||||
-rw-r--r-- | src/plugins/lua/clickhouse.lua | 175 |
2 files changed, 171 insertions, 105 deletions
diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua index cf6e9d894..e14518ca6 100644 --- a/lualib/lua_clickhouse.lua +++ b/lualib/lua_clickhouse.lua @@ -229,6 +229,65 @@ exports.select = function (upstream, settings, params, query, ok_cb, fail_cb) end --[[[ +-- @function lua_clickhouse.select_sync(upstream, settings, params, query, + ok_cb, fail_cb) +-- Make select request to clickhouse +-- @param {upstream} upstream clickhouse server upstream +-- @param {table} settings global settings table: +-- * use_gsip: use gzip compression +-- * timeout: request timeout +-- * no_ssl_verify: skip SSL verification +-- * user: HTTP user +-- * password: HTTP password +-- @param {params} HTTP request params +-- @param {string} query select query (passed in HTTP body) +-- @param {function} ok_cb callback to be called in case of success +-- @param {function} fail_cb callback to be called in case of some error +-- @return +-- {string} error message if exists +-- nil | {rows} | {http_response} +-- @example +-- +--]] +exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb) + local http_params = {} + + for k,v in pairs(params) do http_params[k] = v end + + http_params.gzip = settings.use_gzip + http_params.mime_type = 'text/plain' + http_params.timeout = settings.timeout or default_timeout + http_params.no_ssl_verify = settings.no_ssl_verify + http_params.user = settings.user + http_params.password = settings.password + http_params.body = query + http_params.log_obj = params.task or params.config + + lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body) + + if not http_params.url then + local connect_prefix = "http://" + if settings.use_https then + connect_prefix = 'https://' + end + local ip_addr = upstream:get_addr():to_string(true) + http_params.url = connect_prefix .. ip_addr .. '/?default_format=JSONEachRow' + end + + local err, response = rspamd_http.request(http_params) + + if err then + return err, nil + elseif response.code ~= 200 then + return response.content, response + else + lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response) + local rows = parse_clickhouse_response(params, response.content) + return nil, rows + end +end + +--[[[ -- @function lua_clickhouse.insert(upstream, settings, params, query, rows, ok_cb, fail_cb) -- Insert data rows to clickhouse @@ -330,5 +389,47 @@ exports.generic = function (upstream, settings, params, query, return rspamd_http.request(http_params) end +--[[[ +-- @function lua_clickhouse.generic_sync(upstream, settings, params, query, + ok_cb, fail_cb) +-- Make a generic request to Clickhouse (e.g. alter) +-- @param {upstream} upstream clickhouse server upstream +-- @param {table} settings global settings table: +-- * use_gsip: use gzip compression +-- * timeout: request timeout +-- * no_ssl_verify: skip SSL verification +-- * user: HTTP user +-- * password: HTTP password +-- @param {params} HTTP request params +-- @param {string} query Clickhouse query (passed in `query` request element with spaces escaped) +-- @return {boolean} whether a connection was successful +-- @example +-- +--]] +exports.generic_sync = function (upstream, settings, params, query) + local http_params = {} + + for k,v in pairs(params) do http_params[k] = v end + + http_params.gzip = settings.use_gzip + http_params.mime_type = 'text/plain' + http_params.timeout = settings.timeout or default_timeout + http_params.no_ssl_verify = settings.no_ssl_verify + http_params.user = settings.user + http_params.password = settings.password + http_params.log_obj = params.task or params.config + http_params.body = query + + if not http_params.url then + local connect_prefix = "http://" + if settings.use_https then + connect_prefix = 'https://' + end + local ip_addr = upstream:get_addr():to_string(true) + http_params.url = connect_prefix .. ip_addr .. '/?default_format=JSONEachRow' + end + + return rspamd_http.request(http_params) +end return exports
\ No newline at end of file diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index c6657ae7c..346ea2e97 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -594,28 +594,22 @@ local function do_remove_partition(ev_base, cfg, table_name, partition_id) local ch_params = { body = sql, ev_base = ev_base, - cfg = cfg, + config = cfg, } - local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, - function(_, rows) - rspamd_logger.infox(rspamd_config, - 'detached partition %s:%s on server %s', table_name, partition_id, - settings['server']) - end, - function(_, err) - rspamd_logger.errx(rspamd_config, - "cannot detach partition %s:%s from server %s: %s", - table_name, partition_id, - settings['server'], err) - end) - - if not ret then + local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql) + if err then rspamd_logger.errx(rspamd_config, - "cannot detach partition %s:%s from server %s: cannot make request", - table_name, partition_id, - settings['server']) + "cannot detach partition %s:%s from server %s: %s", + table_name, partition_id, + settings['server'], err) + return end + + rspamd_logger.infox(rspamd_config, + 'detached partition %s:%s on server %s', table_name, partition_id, + settings['server']) + end --[[ @@ -670,10 +664,7 @@ local function clickhouse_remove_old_partitions(cfg, ev_base) local upstream = settings.upstream:get_upstream_round_robin() local partition_to_remove_sql = "SELECT distinct partition, table FROM system.parts WHERE table in ('${tables}') and max_date <= toDate(now() - interval ${month} month);" - local table_names = {} - for table_name,_ in pairs(clickhouse_schema) do - table.insert(table_names, settings[table_name]) - end + local table_names = {'rspamd'} local tables = table.concat(table_names, "', '") local sql_params = { tables = tables, @@ -686,20 +677,15 @@ local function clickhouse_remove_old_partitions(cfg, ev_base) ev_base = ev_base, config = cfg, } - local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, - function(_, rows) - fun.each(function(row) - do_remove_partition(ev_base, cfg, row.table, row.partition) - end, rows) - end, - function(_, err) - rspamd_logger.errx(rspamd_config, - "cannot send data to clickhouse server %s: %s", - settings['server'], err) - end) - if not ret then - rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request", - settings['server']) + local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql) + if err then + rspamd_logger.errx(rspamd_config, + "cannot send data to clickhouse server %s: %s", + settings['server'], err) + else + fun.each(function(row) + do_remove_partition(ev_base, cfg, row.table, row.partition) + end, rows) end -- settings.retention.period is added on initialisation, see below @@ -711,29 +697,20 @@ local function upload_clickhouse_schema(upstream, ev_base, cfg) ev_base = ev_base, config = cfg, } + -- Apply schema sequentially - local function sql_recursor(i) - if clickhouse_schema[i] then - local sql = clickhouse_schema[i] - local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, - function(_, _) - rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s', - i, upstream:get_addr():to_string(true)) - sql_recursor(i + 1) - end, - function(_, err) - rspamd_logger.errx(rspamd_config, - "cannot upload schema '%s' on clickhouse server %s: %s", - sql, upstream:get_addr():to_string(true), err) - end) - if not ret then - rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: cannot make request", - sql, upstream:get_addr():to_string(true)) - end + for i,v in ipairs(clickhouse_schema) do + local sql = v + local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql) + + if err then + rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: %s", + sql, upstream:get_addr():to_string(true), err) + return end + rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s', + i, upstream:get_addr():to_string(true)) end - - sql_recursor(1) end local function maybe_apply_migrations(upstream, ev_base, cfg, version) @@ -792,32 +769,27 @@ local function check_rspamd_table(upstream, ev_base, cfg) config = cfg, } local sql = [[EXISTS TABLE rspamd]] - local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, - function(_, rows) - if rows[1] and rows[1].result then - if tonumber(rows[1].result) == 1 then - -- Apply migration - rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration') - maybe_apply_migrations(upstream, ev_base, cfg, 1) - else - -- Upload schema - rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema') - upload_clickhouse_schema(upstream, ev_base, cfg) - end - else - rspamd_logger.errx(rspamd_config, - "unexpected reply on EXISTS command from server %s: %s", - upstream:get_addr():to_string(true), rows) - end - end , - function(_, err) - rspamd_logger.errx(rspamd_config, - "cannot check if rspamd table exists on clickhouse server %s: %s", - upstream:get_addr():to_string(true), err) - end) - if not ret then - rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: cannot make request", - upstream:get_addr():to_string(true)) + local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql) + if err then + rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: %s", + upstream:get_addr():to_string(true), err) + return + end + + if rows[1] and rows[1].result then + if tonumber(rows[1].result) == 1 then + -- Apply migration + rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration') + maybe_apply_migrations(upstream, ev_base, cfg, 1) + else + -- Upload schema + rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema') + upload_clickhouse_schema(upstream, ev_base, cfg) + end + else + rspamd_logger.errx(rspamd_config, + "unexpected reply on EXISTS command from server %s: %s", + upstream:get_addr():to_string(true), rows) end end @@ -831,35 +803,28 @@ local function check_clickhouse_upstream(upstream, ev_base, cfg) for k,rule in pairs(settings.custom_rules) do if rule.schema then local sql = rspamd_lua_utils.template(rule.schema, settings) - local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, - nil, - function(_, err) - rspamd_logger.errx(rspamd_config, - "cannot send custom schema %s to clickhouse server %s: %s", - k, upstream:get_addr():to_string(true), err) - end) - if not ret then - rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request", - k, upstream:get_addr():to_string(true)) + local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql) + if err then + rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request (%s)", + k, upstream:get_addr():to_string(true), err) end end end -- Now check the main schema and apply migrations if needed local sql = [[SELECT MAX(Version) as v FROM rspamd_version]] - local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, - function(_, rows) - local version = tonumber(rows[1].v) - maybe_apply_migrations(upstream, ev_base, cfg, version) - end, - function(_, err) - -- It might be either no rspamd table or version 1 - rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table') - check_rspamd_table(upstream, ev_base, cfg) - end) - if not ret then - rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: cannot make request", - upstream:get_addr():to_string(true)) + local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql) + if err then + if rows and rows.code == 404 then + rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table') + check_rspamd_table(upstream, ev_base, cfg) + else + rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: %s", + upstream:get_addr():to_string(true), err) + end + else + local version = tonumber(rows[1].v) + maybe_apply_migrations(upstream, ev_base, cfg, version) end end |