summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-06 17:56:17 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-08-06 18:00:34 +0100
commit3028e8571abd3ca3112d36710776b66c1009051c (patch)
treebbf0c9e3d46deb3b1b46287957fdc926dd1ee9cb /src
parentf1f676fadcf4fe650c666184317d8603dfa4214c (diff)
downloadrspamd-3028e8571abd3ca3112d36710776b66c1009051c.tar.gz
rspamd-3028e8571abd3ca3112d36710776b66c1009051c.zip
[Rework] Rework Clickhouse plugin to use the new API
Diffstat (limited to 'src')
-rw-r--r--src/plugins/lua/clickhouse.lua351
1 files changed, 117 insertions, 234 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index a4fd07034..3eebd5b5d 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -28,13 +28,11 @@ if confighelp then
return
end
-local E = {}
-
local main_rows = {}
local attachment_rows = {}
local urls_rows = {}
local emails_rows = {}
-local specific_rows = {}
+--local specific_rows = {}
local asn_rows = {}
local symbols_rows = {}
local custom_rows = {}
@@ -252,34 +250,8 @@ local function clickhouse_asn_row(tname)
return elt
end
-local function clickhouse_first_row()
- table.insert(main_rows, clickhouse_main_row(settings['table']))
- if settings['attachments_table'] then
- table.insert(attachment_rows,
- clickhouse_attachments_row(settings['attachments_table']))
- end
- if settings['urls_table'] then
- table.insert(urls_rows,
- clickhouse_urls_row(settings['urls_table']))
- end
- if settings['emails_table'] then
- table.insert(emails_rows,
- clickhouse_emails_row(settings['emails_table']))
- end
- if settings['asn_table'] then
- table.insert(asn_rows,
- clickhouse_asn_row(settings['asn_table']))
- end
- if settings.enable_symbols and settings['symbols_table'] then
- table.insert(symbols_rows,
- clickhouse_symbols_row(settings['symbols_table']))
- end
-
- for k,rule in pairs(settings.custom_rules) do
- if not custom_rows[k] then custom_rows[k] = {} end
- table.insert(custom_rows[k],
- rule.first_row())
- end
+local function today(ts)
+ return os.date('%Y-%m-%d', ts)
end
local function clickhouse_check_symbol(task, symbols, need_score)
@@ -301,174 +273,73 @@ local function clickhouse_send_data(task)
local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)
- local function gen_http_cb(what, how_many)
- return function (err_message, code, data, _)
- if code ~= 200 or err_message then
- if not err_message then err_message = data end
- rspamd_logger.errx(task, "cannot send %s data to clickhouse server %s: %s",
- what, ip_addr, err_message)
- upstream:fail()
- else
- rspamd_logger.infox(task, "sent %s rows of %s to clickhouse server %s",
- how_many - 1, what, ip_addr)
- upstream:ok()
- end
+ local function gen_success_cb(what, how_many)
+ return function (_, _)
+ rspamd_logger.infox(task, "sent %s rows of %s to clickhouse server %s",
+ how_many, what, ip_addr)
+ upstream:ok()
end
end
- local body = table.concat(main_rows, ' ')
- if not rspamd_http.request({
- task = task,
- url = connect_prefix .. ip_addr,
- body = body,
- callback = gen_http_cb('generic data', #main_rows),
- gzip = settings.use_gzip,
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- }) then
- rspamd_logger.errx(task, "cannot send data to clickhouse server %s: cannot make request",
- settings['server'])
+ local function gen_fail_cb(what, how_many)
+ return function (_, err)
+ rspamd_logger.errx(task, "cannot send %s rows of %s data to clickhouse server %s: %s",
+ how_many, what, ip_addr, err)
+ upstream:fail()
+ end
end
- if #attachment_rows > 1 then
- body = table.concat(attachment_rows, ' ')
- if not rspamd_http.request({
+ local function send_data(what, tbl, query)
+ local ch_params = {
task = task,
- url = connect_prefix .. ip_addr,
- body = body,
- callback = gen_http_cb('attachments data', #attachment_rows),
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- }) then
- rspamd_logger.errx(task, "cannot send attachments to clickhouse server %s: cannot make request",
- settings['server'])
+ }
+ local ret = lua_clickhouse.insert(upstream, settings, ch_params,
+ query, tbl,
+ gen_success_cb(what, #tbl),
+ gen_fail_cb(what, #tbl))
+ if not ret then
+ rspamd_logger.errx(task, "cannot send %s rows of %s data to clickhouse server %s: %s",
+ #tbl, what, ip_addr, 'cannot make HTTP request')
end
end
+
+ send_data('generic data', main_rows,
+ clickhouse_main_row(settings['table']))
+
+
+ if #attachment_rows > 1 then
+ send_data('attachments data', attachment_rows,
+ clickhouse_attachments_row(settings.attachments_table))
+ end
+
if #urls_rows > 1 then
- body = table.concat(urls_rows, ' ')
- if not rspamd_http.request({
- task = task,
- url = connect_prefix .. ip_addr,
- body = body,
- callback = gen_http_cb('urls data', #urls_rows),
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- }) then
- rspamd_logger.errx(task, "cannot send urls to clickhouse server %s: cannot make request",
- settings['server'])
- end
+ send_data('urls data', urls_rows,
+ clickhouse_urls_row(settings.urls_table))
end
+
if #emails_rows > 1 then
- body = table.concat(emails_rows, ' ')
- if not rspamd_http.request({
- task = task,
- url = connect_prefix .. ip_addr,
- body = body,
- callback = gen_http_cb('emails data', #emails_rows),
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- }) then
- rspamd_logger.errx(task, "cannot send emails to clickhouse server %s: cannot make request",
- settings['server'])
- end
+ send_data('emails data', emails_rows,
+ clickhouse_emails_row(settings.emails_table))
end
+
if #asn_rows > 1 then
- body = table.concat(asn_rows, ' ')
- if not rspamd_http.request({
- task = task,
- url = connect_prefix .. ip_addr,
- body = body,
- callback = gen_http_cb('asn data', #asn_rows),
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- }) then
- rspamd_logger.errx(task, "cannot send asn info to clickhouse server %s: cannot make request",
- settings['server'])
- end
+ send_data('asn data', asn_rows,
+ clickhouse_asn_row(settings.asn_table))
end
if #symbols_rows > 1 then
- body = table.concat(symbols_rows, ' ')
- if not rspamd_http.request({
- task = task,
- url = connect_prefix .. ip_addr,
- body = body,
- callback = gen_http_cb('symbols data', #symbols_rows),
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- }) then
- rspamd_logger.errx(task, "cannot send symbols info to clickhouse server %s: cannot make request",
- settings['server'])
- end
- end
-
- for k,specific in pairs(specific_rows) do
- if #specific > 1 then
- body = table.concat(specific, ' ')
- if not rspamd_http.request({
- task = task,
- url = connect_prefix .. ip_addr,
- body = body,
- callback = gen_http_cb('domain specific data ('..k..')', #specific),
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- }) then
- rspamd_logger.errx(task, "cannot send data for domain %s to clickhouse server %s: cannot make request",
- k, settings['server'])
- end
- end
+ send_data('symbols data', symbols_rows,
+ clickhouse_symbols_row(settings.symbols_table))
end
for k,crows in pairs(custom_rows) do
if #crows > 1 then
- body = table.concat(crows, ' ')
- if not rspamd_http.request({
- task = task,
- url = connect_prefix .. ip_addr,
- body = body,
- callback = gen_http_cb('custom data ('..k..')', #crows),
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- }) then
- rspamd_logger.errx(task, "cannot send custom data %s to clickhouse server %s: cannot make request",
- k, settings['server'])
- end
+ send_data('custom data ('..k..')', settings.custom_rules[k].first_row(),
+ crows)
end
end
end
-local function clickhouse_quote(str)
- if str then
- return str:gsub('[\'\\]', '\\%1'):lower()
- else
- return ''
- end
-end
-
local function clickhouse_collect(task)
if not settings.allow_local and rspamd_lua_utils.is_rspamc_or_controller(task) then return end
@@ -615,16 +486,34 @@ local function clickhouse_collect(task)
gmt = false
})
- local elt = string.format("(today(),%d,'%s','%s','%s',%.2f,%d,%d,'%s','%s','%s','%s','%s','%s',%d,'%s','%s','%s','%s','%s','%s','%s')",
- timestamp,
- clickhouse_quote(from_domain), clickhouse_quote(mime_domain), ip_str, score,
- nrcpts, task:get_size(), whitelist, bayes, fuzzy, fann,
- dkim, dmarc, nurls, task:get_metric_action('default'),
- clickhouse_quote(from_user), clickhouse_quote(mime_user),
- clickhouse_quote(rcpt_user), clickhouse_quote(rcpt_domain),
- clickhouse_quote(list_id), task:get_digest())
- table.insert(main_rows, elt)
+ local action = task:get_metric_action('default')
+
+ table.insert(main_rows, {
+ today(timestamp),
+ timestamp,
+ from_domain,
+ mime_domain,
+ ip_str,
+ score,
+ nrcpts,
+ task:get_size(),
+ whitelist,
+ bayes,
+ fuzzy,
+ fann,
+ dkim,
+ dmarc,
+ nurls,
+ action,
+ from_user,
+ mime_user,
+ rcpt_user,
+ rcpt_domain,
+ list_id,
+ task:get_digest()
+ })
+--[[ TODO: has been broken
if settings['from_map'] and dkim == 'allow' then
-- Use dkim
local das = task:get_symbol(settings['dkim_allow_symbols'][1])
@@ -632,16 +521,14 @@ local function clickhouse_collect(task)
for _,dkim_domain in ipairs(das[1]['options']) do
local specific = settings.from_map:get_key(dkim_domain)
if specific then
- if not specific_rows[specific] then
- local first = clickhouse_main_row(specific)
- specific_rows[specific] = {first}
- end
+ specific_rows[specific] = {}
table.insert(specific_rows[specific], elt)
end
end
end
end
+--]]
-- Attachments step
local attachments_fnames = {}
@@ -652,23 +539,24 @@ local function clickhouse_collect(task)
local fname = part:get_filename()
if fname then
- table.insert(attachments_fnames, string.format("'%s'", clickhouse_quote(fname)))
+ table.insert(attachments_fnames, fname)
local type, subtype = part:get_type()
- table.insert(attachments_ctypes, string.format("'%s/%s'",
- clickhouse_quote(type), clickhouse_quote(subtype)))
- table.insert(attachments_lengths, string.format("%s", tostring(part:get_length())))
- table.insert(attachments_digests, string.format("'%s'", string.sub(part:get_digest(), 1, 16)))
+ table.insert(attachments_ctypes, string.format("%s/%s",
+ type, subtype))
+ table.insert(attachments_lengths, part:get_length())
+ table.insert(attachments_digests, string.sub(part:get_digest(), 1, 16))
end
end
if #attachments_fnames > 0 then
- elt = string.format("(today(),'%s',[%s],[%s],[%s],[%s])",
+ table.insert(attachment_rows, {
+ today(timestamp),
task:get_digest(),
- table.concat(attachments_fnames, ','),
- table.concat(attachments_ctypes, ','),
- table.concat(attachments_lengths, ','),
- table.concat(attachments_digests, ','))
- table.insert(attachment_rows, elt)
+ attachments_fnames,
+ attachments_ctypes,
+ attachments_lengths,
+ attachments_digests,
+ })
end
-- Urls step
@@ -676,40 +564,39 @@ local function clickhouse_collect(task)
local urls_urls = {}
if task:has_urls(false) then
for _,u in ipairs(task:get_urls(false)) do
- table.insert(urls_tlds, string.format("'%s'", clickhouse_quote(u:get_tld())))
+ table.insert(urls_tlds, u:get_tld())
if settings['full_urls'] then
- table.insert(urls_urls, string.format("'%s'",
- clickhouse_quote(u:get_text())))
+ table.insert(urls_urls, u:get_text())
else
- table.insert(urls_urls, string.format("'%s'",
- clickhouse_quote(u:get_host())))
+ table.insert(urls_urls, u:get_host())
end
end
end
if #urls_tlds > 0 then
- elt = string.format("(today(),'%s',[%s],[%s])",
+ table.insert(urls_rows, {
+ today(timestamp),
task:get_digest(),
- table.concat(urls_tlds, ','),
- table.concat(urls_urls, ','))
- table.insert(urls_rows, elt)
+ urls_tlds,
+ urls_urls
+ })
end
-- Emails step
local emails = {}
if task:has_urls(true) then
for _,u in ipairs(task:get_emails()) do
- table.insert(emails, string.format("'%s'", clickhouse_quote(
- string.format('%s@%s', u:get_user(), u:get_host())
- )))
+ table.insert(emails,
+ string.format('%s@%s', u:get_user(), u:get_host()))
end
end
if #emails > 0 then
- elt = string.format("(today(),'%s',[%s])",
- task:get_digest(),
- table.concat(emails, ','))
- table.insert(emails_rows, elt)
+ table.insert(emails_rows, {
+ today(timestamp),
+ task:get_digest(),
+ emails,
+ })
end
-- ASN information
@@ -728,10 +615,13 @@ local function clickhouse_collect(task)
if ret then
ipnet = ret
end
- elt = string.format("(today(),'%s','%s','%s','%s')",
+ table.insert(asn_rows, {
+ today(timestamp),
task:get_digest(),
- clickhouse_quote(asn), clickhouse_quote(country), clickhouse_quote(ipnet))
- table.insert(asn_rows, elt)
+ asn,
+ country,
+ ipnet
+ })
end
-- Symbols info
@@ -742,25 +632,22 @@ local function clickhouse_collect(task)
local options_tab = {}
for _,s in ipairs(symbols) do
- table.insert(syms_tab, string.format("'%s'",
- clickhouse_quote(s.name or '')))
- table.insert(scores_tab, string.format('%.3f', s.score))
+ table.insert(syms_tab, s.name or '')
+ table.insert(scores_tab, s.score)
if s.options then
- table.insert(options_tab, string.format("'%s'",
- clickhouse_quote(table.concat(s.options, ','))))
+ table.insert(options_tab, table.concat(s.options, ','))
else
table.insert(options_tab, "''");
end
end
- elt = string.format("(today(),'%s',[%s],[%s],[%s])",
- task:get_digest(),
- table.concat(syms_tab, ','),
- table.concat(scores_tab, ','),
- table.concat(options_tab, ','))
-
- table.insert(symbols_rows, elt)
+ table.insert(symbols_rows, {
+ today(timestamp),
+ syms_tab,
+ scores_tab,
+ options_tab
+ })
end
-- Custom data
@@ -778,11 +665,9 @@ local function clickhouse_collect(task)
attachment_rows = {}
urls_rows = {}
emails_rows = {}
- specific_rows = {}
asn_rows = {}
symbols_rows = {}
custom_rows = {}
- clickhouse_first_row()
end
end
@@ -981,8 +866,6 @@ if opts then
return
end
-
- clickhouse_first_row()
rspamd_config:register_symbol({
name = 'CLICKHOUSE_COLLECT',
type = 'idempotent',