diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-08-08 13:59:52 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-08-08 14:01:37 +0100 |
commit | 1f4e2f7f68499430cc33eb4f4c1cfbd597703e50 (patch) | |
tree | 13f29cab84efa7c07ba8492cf1c4816645d972e0 /src/plugins | |
parent | dfac7cd80b6050cfb291e57be2e9ee44c6a68e30 (diff) | |
download | rspamd-1f4e2f7f68499430cc33eb4f4c1cfbd597703e50.tar.gz rspamd-1f4e2f7f68499430cc33eb4f4c1cfbd597703e50.zip |
[Project] Migrate CH data to a fat table
Diffstat (limited to 'src/plugins')
-rw-r--r-- | src/plugins/lua/clickhouse.lua | 228 |
1 files changed, 73 insertions, 155 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 54128235b..6eab86961 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -28,13 +28,7 @@ if confighelp then return end -local main_rows = {} -local attachment_rows = {} -local urls_rows = {} -local emails_rows = {} ---local specific_rows = {} -local asn_rows = {} -local symbols_rows = {} +local data_rows = {} local custom_rows = {} local nrows = 0 local schema_version = 2 -- Current schema version @@ -143,7 +137,7 @@ local migrations = { } -local function clickhouse_main_row(tname) +local function clickhouse_main_row(res) local fields = { 'Date', 'TS', @@ -168,73 +162,52 @@ local function clickhouse_main_row(tname) 'ListId', 'Digest' } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_attachments_row(tname) - local attachement_fields = { - 'Date', - 'Digest', +local function clickhouse_attachments_row(res) + local fields = { 'Attachments.FileName', 'Attachments.ContentType', 'Attachments.Length', 'Attachments.Digest', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(attachement_fields, ',')) - return elt + + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_urls_row(tname) - local urls_fields = { - 'Date', - 'Digest', +local function clickhouse_urls_row(res) + local fields = { 'Urls.Tld', 'Urls.Url', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(urls_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_emails_row(tname) - local emails_fields = { - 'Date', - 'Digest', +local function clickhouse_emails_row(res) + local fields = { 'Emails', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(emails_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_symbols_row(tname) - local symbols_fields = { - 'Date', - 'Digest', +local function clickhouse_symbols_row(res) + local fields = { 'Symbols.Names', 'Symbols.Scores', 'Symbols.Options', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(symbols_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_asn_row(tname) - local asn_fields = { - 'Date', - 'Digest', +local function clickhouse_asn_row(res) + local fields = { 'ASN', 'Country', 'IPNet', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(asn_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end local function today(ts) @@ -291,34 +264,19 @@ local function clickhouse_send_data(task) end end - send_data('generic data', main_rows, - clickhouse_main_row(settings['table'])) + local fields = {} + clickhouse_main_row(fields) + clickhouse_attachments_row(fields) + clickhouse_urls_row(fields) + clickhouse_emails_row(fields) + clickhouse_asn_row(fields) - - if #attachment_rows > 1 then - send_data('attachments data', attachment_rows, - clickhouse_attachments_row(settings.attachments_table)) - end - - if #urls_rows > 1 then - send_data('urls data', urls_rows, - clickhouse_urls_row(settings.urls_table)) - end - - if #emails_rows > 1 then - send_data('emails data', emails_rows, - clickhouse_emails_row(settings.emails_table)) + if settings.enable_symbols then + clickhouse_symbols_row(fields) end - if #asn_rows > 1 then - send_data('asn data', asn_rows, - clickhouse_asn_row(settings.asn_table)) - end - - if #symbols_rows > 1 then - send_data('symbols data', symbols_rows, - clickhouse_symbols_row(settings.symbols_table)) - end + send_data('generic data', data_rows, + string.format('INSERT INTO rspamd (%s)', table.concat(fields, ','))) for k,crows in pairs(custom_rows) do if #crows > 1 then @@ -477,7 +435,7 @@ local function clickhouse_collect(task) local action = task:get_metric_action('default') local digest = task:get_digest() - table.insert(main_rows, { + local row = { today(timestamp), timestamp, from_domain, @@ -499,25 +457,8 @@ local function clickhouse_collect(task) rcpt_user, rcpt_domain, list_id, - task:get_digest() - }) - ---[[ TODO: has been broken - if settings['from_map'] and dkim == 'allow' then - -- Use dkim - local das = task:get_symbol(settings['dkim_allow_symbols'][1]) - if ((das or E)[1] or E).options then - for _,dkim_domain in ipairs(das[1]['options']) do - local specific = settings.from_map:get_key(dkim_domain) - if specific then - specific_rows[specific] = {} - table.insert(specific_rows[specific], elt) - end - end - end - - end ---]] + digest + } -- Attachments step local attachments_fnames = {} @@ -538,14 +479,15 @@ local function clickhouse_collect(task) end if #attachments_fnames > 0 then - table.insert(attachment_rows, { - today(timestamp), - digest, - attachments_fnames, - attachments_ctypes, - attachments_lengths, - attachments_digests, - }) + table.insert(row, attachments_fnames) + table.insert(row, attachments_ctypes) + table.insert(row, attachments_lengths) + table.insert(row, attachments_digests) + else + table.insert(row, {}) + table.insert(row, {}) + table.insert(row, {}) + table.insert(row, {}) end -- Urls step @@ -563,58 +505,43 @@ local function clickhouse_collect(task) end if #urls_tlds > 0 then - table.insert(urls_rows, { - today(timestamp), - digest, - urls_tlds, - urls_urls - }) + table.insert(row, urls_tlds) + table.insert(row, urls_urls) + else + table.insert(row, {}) + table.insert(row, {}) end -- Emails step - local emails = {} if task:has_urls(true) then - for _,u in ipairs(task:get_emails()) do - table.insert(emails, - string.format('%s@%s', u:get_user(), u:get_host())) - end - end - - if #emails > 0 then - table.insert(emails_rows, { - today(timestamp), - digest, - emails, - }) + table.insert(row, fun.totable(fun.map(function(u) + return string.format('%s@%s', u:get_user(), u:get_host()) + end, task:get_emails()))) + else + table.insert(row, {}) end -- ASN information - if settings['asn_table'] then - local asn, country, ipnet = '--', '--', '--' - local pool = task:get_mempool() - ret = pool:get_variable("asn") - if ret then - asn = ret - end - ret = pool:get_variable("country") - if ret then - country = ret:sub(1, 2) - end - ret = pool:get_variable("ipnet") - if ret then - ipnet = ret - end - table.insert(asn_rows, { - today(timestamp), - digest, - asn, - country, - ipnet - }) + local asn, country, ipnet = '--', '--', '--' + local pool = task:get_mempool() + ret = pool:get_variable("asn") + if ret then + asn = ret end + ret = pool:get_variable("country") + if ret then + country = ret:sub(1, 2) + end + ret = pool:get_variable("ipnet") + if ret then + ipnet = ret + end + table.insert(row, asn) + table.insert(row, country) + table.insert(row, ipnet) -- Symbols info - if settings.enable_symbols and settings['symbols_table'] then + if settings.enable_symbols then local symbols = task:get_symbols_all() local syms_tab = {} local scores_tab = {} @@ -630,14 +557,9 @@ local function clickhouse_collect(task) table.insert(options_tab, ''); end end - - table.insert(symbols_rows, { - today(timestamp), - digest, - syms_tab, - scores_tab, - options_tab - }) + table.insert(row, syms_tab) + table.insert(row, scores_tab) + table.insert(row, options_tab) end -- Custom data @@ -647,17 +569,13 @@ local function clickhouse_collect(task) end nrows = nrows + 1 + table.insert(data_rows, row) rspamd_logger.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit) if nrows > settings['limit'] then clickhouse_send_data(task) nrows = 0 - main_rows = {} - attachment_rows = {} - urls_rows = {} - emails_rows = {} - asn_rows = {} - symbols_rows = {} + data_rows = {} custom_rows = {} end end |