Browse Source

[Project] Migrate CH data to a fat table

tags/1.8.0
Vsevolod Stakhov 5 years ago
parent
commit
1f4e2f7f68
1 changed files with 73 additions and 155 deletions
  1. 73
    155
      src/plugins/lua/clickhouse.lua

+ 73
- 155
src/plugins/lua/clickhouse.lua View File

@@ -28,13 +28,7 @@ if confighelp then
return
end

local main_rows = {}
local attachment_rows = {}
local urls_rows = {}
local emails_rows = {}
--local specific_rows = {}
local asn_rows = {}
local symbols_rows = {}
local data_rows = {}
local custom_rows = {}
local nrows = 0
local schema_version = 2 -- Current schema version
@@ -143,7 +137,7 @@ local migrations = {
}


local function clickhouse_main_row(tname)
local function clickhouse_main_row(res)
local fields = {
'Date',
'TS',
@@ -168,73 +162,52 @@ local function clickhouse_main_row(tname)
'ListId',
'Digest'
}
local elt = string.format('INSERT INTO %s (%s) ',
tname, table.concat(fields, ','))

return elt
for _,v in ipairs(fields) do table.insert(res, v) end
end

local function clickhouse_attachments_row(tname)
local attachement_fields = {
'Date',
'Digest',
local function clickhouse_attachments_row(res)
local fields = {
'Attachments.FileName',
'Attachments.ContentType',
'Attachments.Length',
'Attachments.Digest',
}
local elt = string.format('INSERT INTO %s (%s) ',
tname, table.concat(attachement_fields, ','))
return elt

for _,v in ipairs(fields) do table.insert(res, v) end
end

local function clickhouse_urls_row(tname)
local urls_fields = {
'Date',
'Digest',
local function clickhouse_urls_row(res)
local fields = {
'Urls.Tld',
'Urls.Url',
}
local elt = string.format('INSERT INTO %s (%s) ',
tname, table.concat(urls_fields, ','))
return elt
for _,v in ipairs(fields) do table.insert(res, v) end
end

local function clickhouse_emails_row(tname)
local emails_fields = {
'Date',
'Digest',
local function clickhouse_emails_row(res)
local fields = {
'Emails',
}
local elt = string.format('INSERT INTO %s (%s) ',
tname, table.concat(emails_fields, ','))
return elt
for _,v in ipairs(fields) do table.insert(res, v) end
end

local function clickhouse_symbols_row(tname)
local symbols_fields = {
'Date',
'Digest',
local function clickhouse_symbols_row(res)
local fields = {
'Symbols.Names',
'Symbols.Scores',
'Symbols.Options',
}
local elt = string.format('INSERT INTO %s (%s) ',
tname, table.concat(symbols_fields, ','))
return elt
for _,v in ipairs(fields) do table.insert(res, v) end
end

local function clickhouse_asn_row(tname)
local asn_fields = {
'Date',
'Digest',
local function clickhouse_asn_row(res)
local fields = {
'ASN',
'Country',
'IPNet',
}
local elt = string.format('INSERT INTO %s (%s) ',
tname, table.concat(asn_fields, ','))
return elt
for _,v in ipairs(fields) do table.insert(res, v) end
end

local function today(ts)
@@ -291,34 +264,19 @@ local function clickhouse_send_data(task)
end
end

send_data('generic data', main_rows,
clickhouse_main_row(settings['table']))
local fields = {}
clickhouse_main_row(fields)
clickhouse_attachments_row(fields)
clickhouse_urls_row(fields)
clickhouse_emails_row(fields)
clickhouse_asn_row(fields)


if #attachment_rows > 1 then
send_data('attachments data', attachment_rows,
clickhouse_attachments_row(settings.attachments_table))
end

if #urls_rows > 1 then
send_data('urls data', urls_rows,
clickhouse_urls_row(settings.urls_table))
end

if #emails_rows > 1 then
send_data('emails data', emails_rows,
clickhouse_emails_row(settings.emails_table))
if settings.enable_symbols then
clickhouse_symbols_row(fields)
end

if #asn_rows > 1 then
send_data('asn data', asn_rows,
clickhouse_asn_row(settings.asn_table))
end

if #symbols_rows > 1 then
send_data('symbols data', symbols_rows,
clickhouse_symbols_row(settings.symbols_table))
end
send_data('generic data', data_rows,
string.format('INSERT INTO rspamd (%s)', table.concat(fields, ',')))

for k,crows in pairs(custom_rows) do
if #crows > 1 then
@@ -477,7 +435,7 @@ local function clickhouse_collect(task)
local action = task:get_metric_action('default')
local digest = task:get_digest()

table.insert(main_rows, {
local row = {
today(timestamp),
timestamp,
from_domain,
@@ -499,25 +457,8 @@ local function clickhouse_collect(task)
rcpt_user,
rcpt_domain,
list_id,
task:get_digest()
})

--[[ TODO: has been broken
if settings['from_map'] and dkim == 'allow' then
-- Use dkim
local das = task:get_symbol(settings['dkim_allow_symbols'][1])
if ((das or E)[1] or E).options then
for _,dkim_domain in ipairs(das[1]['options']) do
local specific = settings.from_map:get_key(dkim_domain)
if specific then
specific_rows[specific] = {}
table.insert(specific_rows[specific], elt)
end
end
end

end
--]]
digest
}

-- Attachments step
local attachments_fnames = {}
@@ -538,14 +479,15 @@ local function clickhouse_collect(task)
end

if #attachments_fnames > 0 then
table.insert(attachment_rows, {
today(timestamp),
digest,
attachments_fnames,
attachments_ctypes,
attachments_lengths,
attachments_digests,
})
table.insert(row, attachments_fnames)
table.insert(row, attachments_ctypes)
table.insert(row, attachments_lengths)
table.insert(row, attachments_digests)
else
table.insert(row, {})
table.insert(row, {})
table.insert(row, {})
table.insert(row, {})
end

-- Urls step
@@ -563,58 +505,43 @@ local function clickhouse_collect(task)
end

if #urls_tlds > 0 then
table.insert(urls_rows, {
today(timestamp),
digest,
urls_tlds,
urls_urls
})
table.insert(row, urls_tlds)
table.insert(row, urls_urls)
else
table.insert(row, {})
table.insert(row, {})
end

-- Emails step
local emails = {}
if task:has_urls(true) then
for _,u in ipairs(task:get_emails()) do
table.insert(emails,
string.format('%s@%s', u:get_user(), u:get_host()))
end
end

if #emails > 0 then
table.insert(emails_rows, {
today(timestamp),
digest,
emails,
})
table.insert(row, fun.totable(fun.map(function(u)
return string.format('%s@%s', u:get_user(), u:get_host())
end, task:get_emails())))
else
table.insert(row, {})
end

-- ASN information
if settings['asn_table'] then
local asn, country, ipnet = '--', '--', '--'
local pool = task:get_mempool()
ret = pool:get_variable("asn")
if ret then
asn = ret
end
ret = pool:get_variable("country")
if ret then
country = ret:sub(1, 2)
end
ret = pool:get_variable("ipnet")
if ret then
ipnet = ret
end
table.insert(asn_rows, {
today(timestamp),
digest,
asn,
country,
ipnet
})
local asn, country, ipnet = '--', '--', '--'
local pool = task:get_mempool()
ret = pool:get_variable("asn")
if ret then
asn = ret
end
ret = pool:get_variable("country")
if ret then
country = ret:sub(1, 2)
end
ret = pool:get_variable("ipnet")
if ret then
ipnet = ret
end
table.insert(row, asn)
table.insert(row, country)
table.insert(row, ipnet)

-- Symbols info
if settings.enable_symbols and settings['symbols_table'] then
if settings.enable_symbols then
local symbols = task:get_symbols_all()
local syms_tab = {}
local scores_tab = {}
@@ -630,14 +557,9 @@ local function clickhouse_collect(task)
table.insert(options_tab, '');
end
end

table.insert(symbols_rows, {
today(timestamp),
digest,
syms_tab,
scores_tab,
options_tab
})
table.insert(row, syms_tab)
table.insert(row, scores_tab)
table.insert(row, options_tab)
end

-- Custom data
@@ -647,17 +569,13 @@ local function clickhouse_collect(task)
end

nrows = nrows + 1
table.insert(data_rows, row)
rspamd_logger.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit)

if nrows > settings['limit'] then
clickhouse_send_data(task)
nrows = 0
main_rows = {}
attachment_rows = {}
urls_rows = {}
emails_rows = {}
asn_rows = {}
symbols_rows = {}
data_rows = {}
custom_rows = {}
end
end

Loading…
Cancel
Save