|
|
@@ -36,6 +36,7 @@ local emails_rows = {} |
|
|
|
local specific_rows = {} |
|
|
|
local asn_rows = {} |
|
|
|
local symbols_rows = {} |
|
|
|
local custom_rows = {} |
|
|
|
local nrows = 0 |
|
|
|
local connect_prefix = 'http://' |
|
|
|
|
|
|
@@ -69,6 +70,7 @@ local settings = { |
|
|
|
user = nil, |
|
|
|
password = nil, |
|
|
|
no_ssl_verify = false, |
|
|
|
custom_rules = {}, |
|
|
|
retention = { |
|
|
|
enable = false, |
|
|
|
method = 'detach', |
|
|
@@ -271,6 +273,12 @@ local function clickhouse_first_row() |
|
|
|
table.insert(symbols_rows, |
|
|
|
clickhouse_symbols_row(settings['symbols_table'])) |
|
|
|
end |
|
|
|
|
|
|
|
for k,rule in pairs(settings.custom_rules) do |
|
|
|
if not custom_rows[k] then custom_rows[k] = {} end |
|
|
|
table.insert(custom_rows[k], |
|
|
|
rule.first_row()) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
local function clickhouse_check_symbol(task, symbols, need_score) |
|
|
@@ -430,6 +438,26 @@ local function clickhouse_send_data(task) |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
for k,crows in pairs(custom_rows) do |
|
|
|
if #crows > 1 then |
|
|
|
body = table.concat(crows, ' ') |
|
|
|
if not rspamd_http.request({ |
|
|
|
task = task, |
|
|
|
url = connect_prefix .. ip_addr, |
|
|
|
body = body, |
|
|
|
callback = gen_http_cb('custom data ('..k..')', #crows), |
|
|
|
mime_type = 'text/plain', |
|
|
|
timeout = settings['timeout'], |
|
|
|
no_ssl_verify = settings.no_ssl_verify, |
|
|
|
user = settings.user, |
|
|
|
password = settings.password, |
|
|
|
}) then |
|
|
|
rspamd_logger.errx(task, "cannot send custom data %s to clickhouse server %s: cannot make request", |
|
|
|
k, settings['server']) |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
local function clickhouse_quote(str) |
|
|
@@ -734,6 +762,12 @@ local function clickhouse_collect(task) |
|
|
|
table.insert(symbols_rows, elt) |
|
|
|
end |
|
|
|
|
|
|
|
-- Custom data |
|
|
|
for k,rule in pairs(settings.custom_rules) do |
|
|
|
if not custom_rows[k] then custom_rows[k] = {} end |
|
|
|
table.insert(custom_rows[k], rule.get_row(task)) |
|
|
|
end |
|
|
|
|
|
|
|
nrows = nrows + 1 |
|
|
|
|
|
|
|
if nrows > settings['limit'] then |
|
|
@@ -746,6 +780,7 @@ local function clickhouse_collect(task) |
|
|
|
specific_rows = {} |
|
|
|
asn_rows = {} |
|
|
|
symbols_rows = {} |
|
|
|
custom_rows = {} |
|
|
|
clickhouse_first_row() |
|
|
|
end |
|
|
|
end |
|
|
@@ -921,7 +956,48 @@ end |
|
|
|
local opts = rspamd_config:get_all_opt('clickhouse') |
|
|
|
if opts then |
|
|
|
for k,v in pairs(opts) do |
|
|
|
settings[k] = v |
|
|
|
if k == 'custom_rules' then |
|
|
|
if not v[1] then |
|
|
|
v = {v} |
|
|
|
end |
|
|
|
|
|
|
|
for i,rule in ipairs(v) do |
|
|
|
if rule.schema and rule.first_row and rule.get_row then |
|
|
|
local first_row, get_row |
|
|
|
local loadstring = loadstring or load |
|
|
|
local ret, res_or_err = pcall(loadstring(rule.first_row)) |
|
|
|
|
|
|
|
if not ret or type(res_or_err) ~= 'function' then |
|
|
|
rspamd_logger.errx(rspamd_config, 'invalid first_row (%s) - must be a function', |
|
|
|
res_or_err) |
|
|
|
else |
|
|
|
first_row = res_or_err |
|
|
|
end |
|
|
|
|
|
|
|
ret, res_or_err = pcall(loadstring(rule.get_row)) |
|
|
|
|
|
|
|
if not ret or type(res_or_err) ~= 'function' then |
|
|
|
rspamd_logger.errx(rspamd_config, 'invalid get_row (%s) - must be a function', |
|
|
|
res_or_err) |
|
|
|
else |
|
|
|
get_row = res_or_err |
|
|
|
end |
|
|
|
|
|
|
|
if first_row and get_row then |
|
|
|
local name = rule.name or tostring(i) |
|
|
|
settings.custom_rules[name] = { |
|
|
|
schema = rule.schema, |
|
|
|
first_row = first_row, |
|
|
|
get_row = get_row, |
|
|
|
} |
|
|
|
end |
|
|
|
else |
|
|
|
rspamd_logger.errx(rspamd_config, 'custom rule has no required attributes: schema, first_row and get_row') |
|
|
|
end |
|
|
|
end |
|
|
|
else |
|
|
|
settings[k] = v |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
if not settings['server'] and not settings['servers'] then |
|
|
@@ -944,6 +1020,7 @@ if opts then |
|
|
|
return |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
clickhouse_first_row() |
|
|
|
rspamd_config:register_symbol({ |
|
|
|
name = 'CLICKHOUSE_COLLECT', |
|
|
@@ -997,6 +1074,12 @@ if opts then |
|
|
|
for tab,sql in pairs(clickhouse_schema) do |
|
|
|
send_req(tab, rspamd_lua_utils.template(sql, settings)) |
|
|
|
end |
|
|
|
|
|
|
|
for k,rule in pairs(settings.custom_rules) do |
|
|
|
if rule.schema then |
|
|
|
send_req(k, rspamd_lua_utils.template(rule.schema, settings)) |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then |