Browse Source

[Rework] Rework Clickhouse plugin to use the new API

tags/1.8.0
Vsevolod Stakhov 5 years ago
parent
commit
3028e8571a
1 changed files with 117 additions and 234 deletions
  1. 117
    234
      src/plugins/lua/clickhouse.lua

+ 117
- 234
src/plugins/lua/clickhouse.lua View File

@@ -28,13 +28,11 @@ if confighelp then
return
end

local E = {}

local main_rows = {}
local attachment_rows = {}
local urls_rows = {}
local emails_rows = {}
local specific_rows = {}
--local specific_rows = {}
local asn_rows = {}
local symbols_rows = {}
local custom_rows = {}
@@ -252,34 +250,8 @@ local function clickhouse_asn_row(tname)
return elt
end

local function clickhouse_first_row()
table.insert(main_rows, clickhouse_main_row(settings['table']))
if settings['attachments_table'] then
table.insert(attachment_rows,
clickhouse_attachments_row(settings['attachments_table']))
end
if settings['urls_table'] then
table.insert(urls_rows,
clickhouse_urls_row(settings['urls_table']))
end
if settings['emails_table'] then
table.insert(emails_rows,
clickhouse_emails_row(settings['emails_table']))
end
if settings['asn_table'] then
table.insert(asn_rows,
clickhouse_asn_row(settings['asn_table']))
end
if settings.enable_symbols and settings['symbols_table'] then
table.insert(symbols_rows,
clickhouse_symbols_row(settings['symbols_table']))
end

for k,rule in pairs(settings.custom_rules) do
if not custom_rows[k] then custom_rows[k] = {} end
table.insert(custom_rows[k],
rule.first_row())
end
--- Formats a UNIX timestamp as a `YYYY-MM-DD` date string.
-- The format has no `!` prefix, so os.date renders the date in the local
-- timezone of the process.
-- @param ts timestamp in seconds (may carry a fractional part); when nil,
--   os.date uses the current time
-- @return the formatted date string
local function today(ts)
  if ts ~= nil then
    -- Lua 5.3+ refuses floats with a fractional part in os.date ("number has
    -- no integer representation"); the sub-second part is irrelevant for a
    -- date, so drop it before formatting.
    ts = math.floor(ts)
  end

  return os.date('%Y-%m-%d', ts)
end

local function clickhouse_check_symbol(task, symbols, need_score)
@@ -301,174 +273,73 @@ local function clickhouse_send_data(task)
local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)

local function gen_http_cb(what, how_many)
return function (err_message, code, data, _)
if code ~= 200 or err_message then
if not err_message then err_message = data end
rspamd_logger.errx(task, "cannot send %s data to clickhouse server %s: %s",
what, ip_addr, err_message)
upstream:fail()
else
rspamd_logger.infox(task, "sent %s rows of %s to clickhouse server %s",
how_many - 1, what, ip_addr)
upstream:ok()
end
-- Builds the success callback for one batch of rows.
-- `what` is a human-readable label of the data kind and `how_many` is the
-- number of rows in the batch; both are only used for the log line.
-- The returned callback ignores its two arguments, logs the delivery and
-- reports success on the selected upstream.
local function gen_success_cb(what, how_many)
  local function on_success(_, _)
    rspamd_logger.infox(task, "sent %s rows of %s to clickhouse server %s",
        how_many, what, ip_addr)
    upstream:ok()
  end

  return on_success
end

local body = table.concat(main_rows, ' ')
if not rspamd_http.request({
task = task,
url = connect_prefix .. ip_addr,
body = body,
callback = gen_http_cb('generic data', #main_rows),
gzip = settings.use_gzip,
mime_type = 'text/plain',
timeout = settings['timeout'],
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
}) then
rspamd_logger.errx(task, "cannot send data to clickhouse server %s: cannot make request",
settings['server'])
-- Builds the failure callback for one batch of rows.
-- `what` is a human-readable label of the data kind and `how_many` is the
-- number of rows in the batch; both are only used for the log line.
-- The returned callback ignores its first argument, logs the error passed as
-- the second one and reports the failure on the selected upstream.
local function gen_fail_cb(what, how_many)
  local function on_failure(_, err)
    rspamd_logger.errx(task, "cannot send %s rows of %s data to clickhouse server %s: %s",
        how_many, what, ip_addr, err)
    upstream:fail()
  end

  return on_failure
end

if #attachment_rows > 1 then
body = table.concat(attachment_rows, ' ')
if not rspamd_http.request({
local function send_data(what, tbl, query)
local ch_params = {
task = task,
url = connect_prefix .. ip_addr,
body = body,
callback = gen_http_cb('attachments data', #attachment_rows),
mime_type = 'text/plain',
timeout = settings['timeout'],
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
}) then
rspamd_logger.errx(task, "cannot send attachments to clickhouse server %s: cannot make request",
settings['server'])
}
local ret = lua_clickhouse.insert(upstream, settings, ch_params,
query, tbl,
gen_success_cb(what, #tbl),
gen_fail_cb(what, #tbl))
if not ret then
rspamd_logger.errx(task, "cannot send %s rows of %s data to clickhouse server %s: %s",
#tbl, what, ip_addr, 'cannot make HTTP request')
end
end

send_data('generic data', main_rows,
clickhouse_main_row(settings['table']))


if #attachment_rows > 1 then
send_data('attachments data', attachment_rows,
clickhouse_attachments_row(settings.attachments_table))
end

if #urls_rows > 1 then
body = table.concat(urls_rows, ' ')
if not rspamd_http.request({
task = task,
url = connect_prefix .. ip_addr,
body = body,
callback = gen_http_cb('urls data', #urls_rows),
mime_type = 'text/plain',
timeout = settings['timeout'],
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
}) then
rspamd_logger.errx(task, "cannot send urls to clickhouse server %s: cannot make request",
settings['server'])
end
send_data('urls data', urls_rows,
clickhouse_urls_row(settings.urls_table))
end

if #emails_rows > 1 then
body = table.concat(emails_rows, ' ')
if not rspamd_http.request({
task = task,
url = connect_prefix .. ip_addr,
body = body,
callback = gen_http_cb('emails data', #emails_rows),
mime_type = 'text/plain',
timeout = settings['timeout'],
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
}) then
rspamd_logger.errx(task, "cannot send emails to clickhouse server %s: cannot make request",
settings['server'])
end
send_data('emails data', emails_rows,
clickhouse_emails_row(settings.emails_table))
end

if #asn_rows > 1 then
body = table.concat(asn_rows, ' ')
if not rspamd_http.request({
task = task,
url = connect_prefix .. ip_addr,
body = body,
callback = gen_http_cb('asn data', #asn_rows),
mime_type = 'text/plain',
timeout = settings['timeout'],
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
}) then
rspamd_logger.errx(task, "cannot send asn info to clickhouse server %s: cannot make request",
settings['server'])
end
send_data('asn data', asn_rows,
clickhouse_asn_row(settings.asn_table))
end

if #symbols_rows > 1 then
body = table.concat(symbols_rows, ' ')
if not rspamd_http.request({
task = task,
url = connect_prefix .. ip_addr,
body = body,
callback = gen_http_cb('symbols data', #symbols_rows),
mime_type = 'text/plain',
timeout = settings['timeout'],
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
}) then
rspamd_logger.errx(task, "cannot send symbols info to clickhouse server %s: cannot make request",
settings['server'])
end
end

for k,specific in pairs(specific_rows) do
if #specific > 1 then
body = table.concat(specific, ' ')
if not rspamd_http.request({
task = task,
url = connect_prefix .. ip_addr,
body = body,
callback = gen_http_cb('domain specific data ('..k..')', #specific),
mime_type = 'text/plain',
timeout = settings['timeout'],
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
}) then
rspamd_logger.errx(task, "cannot send data for domain %s to clickhouse server %s: cannot make request",
k, settings['server'])
end
end
send_data('symbols data', symbols_rows,
clickhouse_symbols_row(settings.symbols_table))
end

for k,crows in pairs(custom_rows) do
if #crows > 1 then
body = table.concat(crows, ' ')
if not rspamd_http.request({
task = task,
url = connect_prefix .. ip_addr,
body = body,
callback = gen_http_cb('custom data ('..k..')', #crows),
mime_type = 'text/plain',
timeout = settings['timeout'],
no_ssl_verify = settings.no_ssl_verify,
user = settings.user,
password = settings.password,
}) then
rspamd_logger.errx(task, "cannot send custom data %s to clickhouse server %s: cannot make request",
k, settings['server'])
end
-- send_data takes (what, tbl, query): the accumulated rows go second and the
-- rule's insert header (first_row()) goes last, matching the other
-- send_data calls in this function. The previous call had them swapped.
send_data('custom data ('..k..')', crows,
    settings.custom_rules[k].first_row())
end
end
end

--- Escapes a string for embedding in a single-quoted ClickHouse literal.
-- Single quotes and backslashes are prefixed with a backslash and the result
-- is lowercased.
-- @param str string to escape; nil or false yields an empty string
-- @return the escaped, lowercased string (always a single value)
local function clickhouse_quote(str)
  if not str then
    return ''
  end

  -- gsub also returns the substitution count; binding to one local drops it
  local escaped = str:gsub('[\'\\]', '\\%1')
  return escaped:lower()
end

local function clickhouse_collect(task)
if not settings.allow_local and rspamd_lua_utils.is_rspamc_or_controller(task) then return end

@@ -615,16 +486,34 @@ local function clickhouse_collect(task)
gmt = false
})

local elt = string.format("(today(),%d,'%s','%s','%s',%.2f,%d,%d,'%s','%s','%s','%s','%s','%s',%d,'%s','%s','%s','%s','%s','%s','%s')",
timestamp,
clickhouse_quote(from_domain), clickhouse_quote(mime_domain), ip_str, score,
nrcpts, task:get_size(), whitelist, bayes, fuzzy, fann,
dkim, dmarc, nurls, task:get_metric_action('default'),
clickhouse_quote(from_user), clickhouse_quote(mime_user),
clickhouse_quote(rcpt_user), clickhouse_quote(rcpt_domain),
clickhouse_quote(list_id), task:get_digest())
table.insert(main_rows, elt)
local action = task:get_metric_action('default')

table.insert(main_rows, {
today(timestamp),
timestamp,
from_domain,
mime_domain,
ip_str,
score,
nrcpts,
task:get_size(),
whitelist,
bayes,
fuzzy,
fann,
dkim,
dmarc,
nurls,
action,
from_user,
mime_user,
rcpt_user,
rcpt_domain,
list_id,
task:get_digest()
})

--[[ TODO: has been broken
if settings['from_map'] and dkim == 'allow' then
-- Use dkim
local das = task:get_symbol(settings['dkim_allow_symbols'][1])
@@ -632,16 +521,14 @@ local function clickhouse_collect(task)
for _,dkim_domain in ipairs(das[1]['options']) do
local specific = settings.from_map:get_key(dkim_domain)
if specific then
if not specific_rows[specific] then
local first = clickhouse_main_row(specific)
specific_rows[specific] = {first}
end
specific_rows[specific] = {}
table.insert(specific_rows[specific], elt)
end
end
end

end
--]]

-- Attachments step
local attachments_fnames = {}
@@ -652,23 +539,24 @@ local function clickhouse_collect(task)
local fname = part:get_filename()

if fname then
table.insert(attachments_fnames, string.format("'%s'", clickhouse_quote(fname)))
table.insert(attachments_fnames, fname)
local type, subtype = part:get_type()
table.insert(attachments_ctypes, string.format("'%s/%s'",
clickhouse_quote(type), clickhouse_quote(subtype)))
table.insert(attachments_lengths, string.format("%s", tostring(part:get_length())))
table.insert(attachments_digests, string.format("'%s'", string.sub(part:get_digest(), 1, 16)))
table.insert(attachments_ctypes, string.format("%s/%s",
type, subtype))
table.insert(attachments_lengths, part:get_length())
table.insert(attachments_digests, string.sub(part:get_digest(), 1, 16))
end
end

if #attachments_fnames > 0 then
elt = string.format("(today(),'%s',[%s],[%s],[%s],[%s])",
table.insert(attachment_rows, {
today(timestamp),
task:get_digest(),
table.concat(attachments_fnames, ','),
table.concat(attachments_ctypes, ','),
table.concat(attachments_lengths, ','),
table.concat(attachments_digests, ','))
table.insert(attachment_rows, elt)
attachments_fnames,
attachments_ctypes,
attachments_lengths,
attachments_digests,
})
end

-- Urls step
@@ -676,40 +564,39 @@ local function clickhouse_collect(task)
local urls_urls = {}
if task:has_urls(false) then
for _,u in ipairs(task:get_urls(false)) do
table.insert(urls_tlds, string.format("'%s'", clickhouse_quote(u:get_tld())))
table.insert(urls_tlds, u:get_tld())
if settings['full_urls'] then
table.insert(urls_urls, string.format("'%s'",
clickhouse_quote(u:get_text())))
table.insert(urls_urls, u:get_text())
else
table.insert(urls_urls, string.format("'%s'",
clickhouse_quote(u:get_host())))
table.insert(urls_urls, u:get_host())
end
end
end

if #urls_tlds > 0 then
elt = string.format("(today(),'%s',[%s],[%s])",
table.insert(urls_rows, {
today(timestamp),
task:get_digest(),
table.concat(urls_tlds, ','),
table.concat(urls_urls, ','))
table.insert(urls_rows, elt)
urls_tlds,
urls_urls
})
end

-- Emails step
local emails = {}
if task:has_urls(true) then
for _,u in ipairs(task:get_emails()) do
table.insert(emails, string.format("'%s'", clickhouse_quote(
string.format('%s@%s', u:get_user(), u:get_host())
)))
table.insert(emails,
string.format('%s@%s', u:get_user(), u:get_host()))
end
end

if #emails > 0 then
elt = string.format("(today(),'%s',[%s])",
task:get_digest(),
table.concat(emails, ','))
table.insert(emails_rows, elt)
table.insert(emails_rows, {
today(timestamp),
task:get_digest(),
emails,
})
end

-- ASN information
@@ -728,10 +615,13 @@ local function clickhouse_collect(task)
if ret then
ipnet = ret
end
elt = string.format("(today(),'%s','%s','%s','%s')",
table.insert(asn_rows, {
today(timestamp),
task:get_digest(),
clickhouse_quote(asn), clickhouse_quote(country), clickhouse_quote(ipnet))
table.insert(asn_rows, elt)
asn,
country,
ipnet
})
end

-- Symbols info
@@ -742,25 +632,22 @@ local function clickhouse_collect(task)
local options_tab = {}

for _,s in ipairs(symbols) do
table.insert(syms_tab, string.format("'%s'",
clickhouse_quote(s.name or '')))
table.insert(scores_tab, string.format('%.3f', s.score))
table.insert(syms_tab, s.name or '')
table.insert(scores_tab, s.score)

if s.options then
table.insert(options_tab, string.format("'%s'",
clickhouse_quote(table.concat(s.options, ','))))
table.insert(options_tab, table.concat(s.options, ','))
else
-- Values are no longer pre-quoted in this function (presumably the insert
-- helper escapes them - confirm), so a symbol without options contributes a
-- genuinely empty string instead of the two-character literal ''.
table.insert(options_tab, '')
end
end

elt = string.format("(today(),'%s',[%s],[%s],[%s])",
task:get_digest(),
table.concat(syms_tab, ','),
table.concat(scores_tab, ','),
table.concat(options_tab, ','))

table.insert(symbols_rows, elt)
-- Keep the message digest as the second column: the previous string-built
-- row included it, and every other per-message row built in this function
-- carries it. Without it the row is one column short.
table.insert(symbols_rows, {
  today(timestamp),
  task:get_digest(),
  syms_tab,
  scores_tab,
  options_tab
})
end

-- Custom data
@@ -778,11 +665,9 @@ local function clickhouse_collect(task)
attachment_rows = {}
urls_rows = {}
emails_rows = {}
specific_rows = {}
asn_rows = {}
symbols_rows = {}
custom_rows = {}
clickhouse_first_row()
end
end

@@ -981,8 +866,6 @@ if opts then
return
end


clickhouse_first_row()
rspamd_config:register_symbol({
name = 'CLICKHOUSE_COLLECT',
type = 'idempotent',

Loading…
Cancel
Save