about | summary | refs | log | tree | commit | diff | stats
path: root/src/plugins
diff options
context:
space:
mode:
author Vsevolod Stakhov <vsevolod@highsecure.ru> 2018-08-08 13:59:52 +0100
committer Vsevolod Stakhov <vsevolod@highsecure.ru> 2018-08-08 14:01:37 +0100
commit 1f4e2f7f68499430cc33eb4f4c1cfbd597703e50 (patch)
tree 13f29cab84efa7c07ba8492cf1c4816645d972e0 /src/plugins
parent dfac7cd80b6050cfb291e57be2e9ee44c6a68e30 (diff)
download rspamd-1f4e2f7f68499430cc33eb4f4c1cfbd597703e50.tar.gz
rspamd-1f4e2f7f68499430cc33eb4f4c1cfbd597703e50.zip
[Project] Migrate CH data to a fat table
Diffstat (limited to 'src/plugins')
-rw-r--r-- src/plugins/lua/clickhouse.lua 228
1 files changed, 73 insertions, 155 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 54128235b..6eab86961 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -28,13 +28,7 @@ if confighelp then
return
end
-local main_rows = {}
-local attachment_rows = {}
-local urls_rows = {}
-local emails_rows = {}
---local specific_rows = {}
-local asn_rows = {}
-local symbols_rows = {}
+local data_rows = {}
local custom_rows = {}
local nrows = 0
local schema_version = 2 -- Current schema version
@@ -143,7 +137,7 @@ local migrations = {
}
-local function clickhouse_main_row(tname)
+local function clickhouse_main_row(res)
local fields = {
'Date',
'TS',
@@ -168,73 +162,52 @@ local function clickhouse_main_row(tname)
'ListId',
'Digest'
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_attachments_row(tname)
- local attachement_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_attachments_row(res)
+ local fields = {
'Attachments.FileName',
'Attachments.ContentType',
'Attachments.Length',
'Attachments.Digest',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(attachement_fields, ','))
- return elt
+
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_urls_row(tname)
- local urls_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_urls_row(res)
+ local fields = {
'Urls.Tld',
'Urls.Url',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(urls_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_emails_row(tname)
- local emails_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_emails_row(res)
+ local fields = {
'Emails',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(emails_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_symbols_row(tname)
- local symbols_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_symbols_row(res)
+ local fields = {
'Symbols.Names',
'Symbols.Scores',
'Symbols.Options',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(symbols_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_asn_row(tname)
- local asn_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_asn_row(res)
+ local fields = {
'ASN',
'Country',
'IPNet',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(asn_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
local function today(ts)
@@ -291,34 +264,19 @@ local function clickhouse_send_data(task)
end
end
- send_data('generic data', main_rows,
- clickhouse_main_row(settings['table']))
+ local fields = {}
+ clickhouse_main_row(fields)
+ clickhouse_attachments_row(fields)
+ clickhouse_urls_row(fields)
+ clickhouse_emails_row(fields)
+ clickhouse_asn_row(fields)
-
- if #attachment_rows > 1 then
- send_data('attachments data', attachment_rows,
- clickhouse_attachments_row(settings.attachments_table))
- end
-
- if #urls_rows > 1 then
- send_data('urls data', urls_rows,
- clickhouse_urls_row(settings.urls_table))
- end
-
- if #emails_rows > 1 then
- send_data('emails data', emails_rows,
- clickhouse_emails_row(settings.emails_table))
+ if settings.enable_symbols then
+ clickhouse_symbols_row(fields)
end
- if #asn_rows > 1 then
- send_data('asn data', asn_rows,
- clickhouse_asn_row(settings.asn_table))
- end
-
- if #symbols_rows > 1 then
- send_data('symbols data', symbols_rows,
- clickhouse_symbols_row(settings.symbols_table))
- end
+ send_data('generic data', data_rows,
+ string.format('INSERT INTO rspamd (%s)', table.concat(fields, ',')))
for k,crows in pairs(custom_rows) do
if #crows > 1 then
@@ -477,7 +435,7 @@ local function clickhouse_collect(task)
local action = task:get_metric_action('default')
local digest = task:get_digest()
- table.insert(main_rows, {
+ local row = {
today(timestamp),
timestamp,
from_domain,
@@ -499,25 +457,8 @@ local function clickhouse_collect(task)
rcpt_user,
rcpt_domain,
list_id,
- task:get_digest()
- })
-
---[[ TODO: has been broken
- if settings['from_map'] and dkim == 'allow' then
- -- Use dkim
- local das = task:get_symbol(settings['dkim_allow_symbols'][1])
- if ((das or E)[1] or E).options then
- for _,dkim_domain in ipairs(das[1]['options']) do
- local specific = settings.from_map:get_key(dkim_domain)
- if specific then
- specific_rows[specific] = {}
- table.insert(specific_rows[specific], elt)
- end
- end
- end
-
- end
---]]
+ digest
+ }
-- Attachments step
local attachments_fnames = {}
@@ -538,14 +479,15 @@ local function clickhouse_collect(task)
end
if #attachments_fnames > 0 then
- table.insert(attachment_rows, {
- today(timestamp),
- digest,
- attachments_fnames,
- attachments_ctypes,
- attachments_lengths,
- attachments_digests,
- })
+ table.insert(row, attachments_fnames)
+ table.insert(row, attachments_ctypes)
+ table.insert(row, attachments_lengths)
+ table.insert(row, attachments_digests)
+ else
+ table.insert(row, {})
+ table.insert(row, {})
+ table.insert(row, {})
+ table.insert(row, {})
end
-- Urls step
@@ -563,58 +505,43 @@ local function clickhouse_collect(task)
end
if #urls_tlds > 0 then
- table.insert(urls_rows, {
- today(timestamp),
- digest,
- urls_tlds,
- urls_urls
- })
+ table.insert(row, urls_tlds)
+ table.insert(row, urls_urls)
+ else
+ table.insert(row, {})
+ table.insert(row, {})
end
-- Emails step
- local emails = {}
if task:has_urls(true) then
- for _,u in ipairs(task:get_emails()) do
- table.insert(emails,
- string.format('%s@%s', u:get_user(), u:get_host()))
- end
- end
-
- if #emails > 0 then
- table.insert(emails_rows, {
- today(timestamp),
- digest,
- emails,
- })
+ table.insert(row, fun.totable(fun.map(function(u)
+ return string.format('%s@%s', u:get_user(), u:get_host())
+ end, task:get_emails())))
+ else
+ table.insert(row, {})
end
-- ASN information
- if settings['asn_table'] then
- local asn, country, ipnet = '--', '--', '--'
- local pool = task:get_mempool()
- ret = pool:get_variable("asn")
- if ret then
- asn = ret
- end
- ret = pool:get_variable("country")
- if ret then
- country = ret:sub(1, 2)
- end
- ret = pool:get_variable("ipnet")
- if ret then
- ipnet = ret
- end
- table.insert(asn_rows, {
- today(timestamp),
- digest,
- asn,
- country,
- ipnet
- })
+ local asn, country, ipnet = '--', '--', '--'
+ local pool = task:get_mempool()
+ ret = pool:get_variable("asn")
+ if ret then
+ asn = ret
end
+ ret = pool:get_variable("country")
+ if ret then
+ country = ret:sub(1, 2)
+ end
+ ret = pool:get_variable("ipnet")
+ if ret then
+ ipnet = ret
+ end
+ table.insert(row, asn)
+ table.insert(row, country)
+ table.insert(row, ipnet)
-- Symbols info
- if settings.enable_symbols and settings['symbols_table'] then
+ if settings.enable_symbols then
local symbols = task:get_symbols_all()
local syms_tab = {}
local scores_tab = {}
@@ -630,14 +557,9 @@ local function clickhouse_collect(task)
table.insert(options_tab, '');
end
end
-
- table.insert(symbols_rows, {
- today(timestamp),
- digest,
- syms_tab,
- scores_tab,
- options_tab
- })
+ table.insert(row, syms_tab)
+ table.insert(row, scores_tab)
+ table.insert(row, options_tab)
end
-- Custom data
@@ -647,17 +569,13 @@ local function clickhouse_collect(task)
end
nrows = nrows + 1
+ table.insert(data_rows, row)
rspamd_logger.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit)
if nrows > settings['limit'] then
clickhouse_send_data(task)
nrows = 0
- main_rows = {}
- attachment_rows = {}
- urls_rows = {}
- emails_rows = {}
- asn_rows = {}
- symbols_rows = {}
+ data_rows = {}
custom_rows = {}
end
end