return rspamd_http.request(http_params)
end
+--[[[
+-- @function lua_clickhouse.select_sync(upstream, settings, params, query,
+ ok_cb, fail_cb)
+-- Make select request to clickhouse
+-- @param {upstream} upstream clickhouse server upstream
+-- @param {table} settings global settings table:
+-- * use_gsip: use gzip compression
+-- * timeout: request timeout
+-- * no_ssl_verify: skip SSL verification
+-- * user: HTTP user
+-- * password: HTTP password
+-- @param {params} HTTP request params
+-- @param {string} query select query (passed in HTTP body)
+-- @param {function} ok_cb callback to be called in case of success
+-- @param {function} fail_cb callback to be called in case of some error
+-- @return
+-- {string} error message if exists
+-- nil | {rows} | {http_response}
+-- @example
+--
+--]]
+exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb)
+ local http_params = {}
+
+ for k,v in pairs(params) do http_params[k] = v end
+
+ http_params.gzip = settings.use_gzip
+ http_params.mime_type = 'text/plain'
+ http_params.timeout = settings.timeout or default_timeout
+ http_params.no_ssl_verify = settings.no_ssl_verify
+ http_params.user = settings.user
+ http_params.password = settings.password
+ http_params.body = query
+ http_params.log_obj = params.task or params.config
+
+ lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)
+
+ if not http_params.url then
+ local connect_prefix = "http://"
+ if settings.use_https then
+ connect_prefix = 'https://'
+ end
+ local ip_addr = upstream:get_addr():to_string(true)
+ http_params.url = connect_prefix .. ip_addr .. '/?default_format=JSONEachRow'
+ end
+
+ local err, response = rspamd_http.request(http_params)
+
+ if err then
+ return err, nil
+ elseif response.code ~= 200 then
+ return response.content, response
+ else
+ lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response)
+ local rows = parse_clickhouse_response(params, response.content)
+ return nil, rows
+ end
+end
+
--[[[
-- @function lua_clickhouse.insert(upstream, settings, params, query, rows,
ok_cb, fail_cb)
return rspamd_http.request(http_params)
end
+--[[[
+-- @function lua_clickhouse.generic_sync(upstream, settings, params, query,
+ ok_cb, fail_cb)
+-- Make a generic request to Clickhouse (e.g. alter)
+-- @param {upstream} upstream clickhouse server upstream
+-- @param {table} settings global settings table:
+-- * use_gsip: use gzip compression
+-- * timeout: request timeout
+-- * no_ssl_verify: skip SSL verification
+-- * user: HTTP user
+-- * password: HTTP password
+-- @param {params} HTTP request params
+-- @param {string} query Clickhouse query (passed in `query` request element with spaces escaped)
+-- @return {boolean} whether a connection was successful
+-- @example
+--
+--]]
+exports.generic_sync = function (upstream, settings, params, query)
+ local http_params = {}
+
+ for k,v in pairs(params) do http_params[k] = v end
+
+ http_params.gzip = settings.use_gzip
+ http_params.mime_type = 'text/plain'
+ http_params.timeout = settings.timeout or default_timeout
+ http_params.no_ssl_verify = settings.no_ssl_verify
+ http_params.user = settings.user
+ http_params.password = settings.password
+ http_params.log_obj = params.task or params.config
+ http_params.body = query
+
+ if not http_params.url then
+ local connect_prefix = "http://"
+ if settings.use_https then
+ connect_prefix = 'https://'
+ end
+ local ip_addr = upstream:get_addr():to_string(true)
+ http_params.url = connect_prefix .. ip_addr .. '/?default_format=JSONEachRow'
+ end
+
+ return rspamd_http.request(http_params)
+end
return exports
\ No newline at end of file
local ch_params = {
body = sql,
ev_base = ev_base,
- cfg = cfg,
+ config = cfg,
}
- local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
- function(_, rows)
- rspamd_logger.infox(rspamd_config,
- 'detached partition %s:%s on server %s', table_name, partition_id,
- settings['server'])
- end,
- function(_, err)
- rspamd_logger.errx(rspamd_config,
- "cannot detach partition %s:%s from server %s: %s",
- table_name, partition_id,
- settings['server'], err)
- end)
-
- if not ret then
+ local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
+ if err then
rspamd_logger.errx(rspamd_config,
- "cannot detach partition %s:%s from server %s: cannot make request",
- table_name, partition_id,
- settings['server'])
+ "cannot detach partition %s:%s from server %s: %s",
+ table_name, partition_id,
+ settings['server'], err)
+ return
end
+
+ rspamd_logger.infox(rspamd_config,
+ 'detached partition %s:%s on server %s', table_name, partition_id,
+ settings['server'])
+
end
--[[
local upstream = settings.upstream:get_upstream_round_robin()
local partition_to_remove_sql = "SELECT distinct partition, table FROM system.parts WHERE table in ('${tables}') and max_date <= toDate(now() - interval ${month} month);"
- local table_names = {}
- for table_name,_ in pairs(clickhouse_schema) do
- table.insert(table_names, settings[table_name])
- end
+ local table_names = {'rspamd'}
local tables = table.concat(table_names, "', '")
local sql_params = {
tables = tables,
ev_base = ev_base,
config = cfg,
}
- local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
- function(_, rows)
- fun.each(function(row)
- do_remove_partition(ev_base, cfg, row.table, row.partition)
- end, rows)
- end,
- function(_, err)
- rspamd_logger.errx(rspamd_config,
- "cannot send data to clickhouse server %s: %s",
- settings['server'], err)
- end)
- if not ret then
- rspamd_logger.errx(rspamd_config, "cannot send data to clickhouse server %s: cannot make request",
- settings['server'])
+ local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql)
+ if err then
+ rspamd_logger.errx(rspamd_config,
+ "cannot send data to clickhouse server %s: %s",
+ settings['server'], err)
+ else
+ fun.each(function(row)
+ do_remove_partition(ev_base, cfg, row.table, row.partition)
+ end, rows)
end
-- settings.retention.period is added on initialisation, see below
ev_base = ev_base,
config = cfg,
}
+
-- Apply schema sequentially
- local function sql_recursor(i)
- if clickhouse_schema[i] then
- local sql = clickhouse_schema[i]
- local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
- function(_, _)
- rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s',
- i, upstream:get_addr():to_string(true))
- sql_recursor(i + 1)
- end,
- function(_, err)
- rspamd_logger.errx(rspamd_config,
- "cannot upload schema '%s' on clickhouse server %s: %s",
- sql, upstream:get_addr():to_string(true), err)
- end)
- if not ret then
- rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: cannot make request",
- sql, upstream:get_addr():to_string(true))
- end
+ for i,v in ipairs(clickhouse_schema) do
+ local sql = v
+ local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
+
+ if err then
+ rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: %s",
+ sql, upstream:get_addr():to_string(true), err)
+ return
end
+ rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s',
+ i, upstream:get_addr():to_string(true))
end
-
- sql_recursor(1)
end
local function maybe_apply_migrations(upstream, ev_base, cfg, version)
config = cfg,
}
local sql = [[EXISTS TABLE rspamd]]
- local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
- function(_, rows)
- if rows[1] and rows[1].result then
- if tonumber(rows[1].result) == 1 then
- -- Apply migration
- rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
- maybe_apply_migrations(upstream, ev_base, cfg, 1)
- else
- -- Upload schema
- rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
- upload_clickhouse_schema(upstream, ev_base, cfg)
- end
- else
- rspamd_logger.errx(rspamd_config,
- "unexpected reply on EXISTS command from server %s: %s",
- upstream:get_addr():to_string(true), rows)
- end
- end ,
- function(_, err)
- rspamd_logger.errx(rspamd_config,
- "cannot check if rspamd table exists on clickhouse server %s: %s",
- upstream:get_addr():to_string(true), err)
- end)
- if not ret then
- rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: cannot make request",
- upstream:get_addr():to_string(true))
+ local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql)
+ if err then
+ rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: %s",
+ upstream:get_addr():to_string(true), err)
+ return
+ end
+
+ if rows[1] and rows[1].result then
+ if tonumber(rows[1].result) == 1 then
+ -- Apply migration
+ rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
+ maybe_apply_migrations(upstream, ev_base, cfg, 1)
+ else
+ -- Upload schema
+ rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
+ upload_clickhouse_schema(upstream, ev_base, cfg)
+ end
+ else
+ rspamd_logger.errx(rspamd_config,
+ "unexpected reply on EXISTS command from server %s: %s",
+ upstream:get_addr():to_string(true), rows)
end
end
for k,rule in pairs(settings.custom_rules) do
if rule.schema then
local sql = rspamd_lua_utils.template(rule.schema, settings)
- local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
- nil,
- function(_, err)
- rspamd_logger.errx(rspamd_config,
- "cannot send custom schema %s to clickhouse server %s: %s",
- k, upstream:get_addr():to_string(true), err)
- end)
- if not ret then
- rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request",
- k, upstream:get_addr():to_string(true))
+ local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
+ if err then
+ rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request (%s)",
+ k, upstream:get_addr():to_string(true), err)
end
end
end
-- Now check the main schema and apply migrations if needed
local sql = [[SELECT MAX(Version) as v FROM rspamd_version]]
- local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
- function(_, rows)
- local version = tonumber(rows[1].v)
- maybe_apply_migrations(upstream, ev_base, cfg, version)
- end,
- function(_, err)
- -- It might be either no rspamd table or version 1
- rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table')
- check_rspamd_table(upstream, ev_base, cfg)
- end)
- if not ret then
- rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: cannot make request",
- upstream:get_addr():to_string(true))
+ local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql)
+ if err then
+ if rows and rows.code == 404 then
+ rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table')
+ check_rspamd_table(upstream, ev_base, cfg)
+ else
+ rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: %s",
+ upstream:get_addr():to_string(true), err)
+ end
+ else
+ local version = tonumber(rows[1].v)
+ maybe_apply_migrations(upstream, ev_base, cfg, version)
end
end