|
|
@@ -447,10 +447,10 @@ local function clickhouse_collect(task) |
|
|
|
nurls = #task:get_urls(true) |
|
|
|
end |
|
|
|
|
|
|
|
local timestamp = task:get_date({ |
|
|
|
local timestamp = math.floor(task:get_date({ |
|
|
|
format = 'connect', |
|
|
|
gmt = true, -- The only sane way to sync stuff with different timezones |
|
|
|
}) |
|
|
|
})) |
|
|
|
|
|
|
|
local action = task:get_metric_action('default') |
|
|
|
local digest = task:get_digest() |
|
|
@@ -731,18 +731,26 @@ local function upload_clickhouse_schema(upstream, ev_base, cfg) |
|
|
|
config = cfg, |
|
|
|
} |
|
|
|
|
|
|
|
local errored = false |
|
|
|
|
|
|
|
-- Apply schema sequentially |
|
|
|
fun.each(function(v) |
|
|
|
if errored then |
|
|
|
rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: due to previous errors", |
|
|
|
v, upstream:get_addr():to_string(true)) |
|
|
|
return |
|
|
|
end |
|
|
|
local sql = v |
|
|
|
local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql) |
|
|
|
local err, reply = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql) |
|
|
|
|
|
|
|
if err then |
|
|
|
rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: %s", |
|
|
|
sql, upstream:get_addr():to_string(true), err) |
|
|
|
errored = true |
|
|
|
return |
|
|
|
end |
|
|
|
rspamd_logger.debugm(N, rspamd_config, 'uploaded clickhouse schema element %s to %s', |
|
|
|
v, upstream:get_addr():to_string(true)) |
|
|
|
rspamd_logger.debugm(N, rspamd_config, 'uploaded clickhouse schema element %s to %s: %s', |
|
|
|
v, upstream:get_addr():to_string(true), reply) |
|
|
|
end, |
|
|
|
-- Also template schema version |
|
|
|
fun.map(function(v) |