From ee078eb929c5c9c8a91da600836c3a932f956ce2 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 21 Mar 2018 12:22:49 +0000 Subject: [PATCH] [Feature] Store emails in Clickhouse --- src/plugins/lua/clickhouse.lua | 47 +++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index d64e69807..590478d25 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -29,6 +29,7 @@ local E = {} local rows = {} local attachment_rows = {} local urls_rows = {} +local emails_rows = {} local specific_rows = {} local asn_rows = {} local symbols_rows = {} @@ -198,6 +199,17 @@ local function clickhouse_urls_row(tname) return elt end +local function clickhouse_emails_row(tname) + local emails_fields = { + 'Date', + 'Digest', + 'Emails', + } + local elt = string.format('INSERT INTO %s (%s) VALUES ', + tname, table.concat(emails_fields, ',')) + return elt +end + local function clickhouse_symbols_row(tname) local symbols_fields = { 'Date', @@ -234,6 +246,10 @@ local function clickhouse_first_row() table.insert(urls_rows, clickhouse_urls_row(settings['urls_table'])) end + if settings['emails_table'] then + table.insert(emails_rows, + clickhouse_emails_row(settings['emails_table'])) + end if settings['asn_table'] then table.insert(asn_rows, clickhouse_asn_row(settings['asn_table'])) @@ -320,6 +336,20 @@ local function clickhouse_send_data(task) settings['server']) end end + if #emails_rows > 1 then + body = table.concat(emails_rows, ' ') + if not rspamd_http.request({ + task = task, + url = connect_prefix .. ip_addr, + body = body, + callback = gen_http_cb('emails data', #emails_rows), + mime_type = 'text/plain', + timeout = settings['timeout'], + }) then + rspamd_logger.errx(task, "cannot send emails to clickhouse server %s: cannot make request", + settings['server']) + end + end if #asn_rows > 1 then body = table.concat(asn_rows, ' ') if not rspamd_http.request({ @@ -574,7 +604,7 @@ local function clickhouse_collect(task) local urls_tlds = {} local urls_urls = {} if task:has_urls(false) then - for _,u in ipairs(task:get_urls()) do + for _,u in ipairs(task:get_urls(false)) do table.insert(urls_tlds, string.format("'%s'", clickhouse_quote(u:get_tld()))) if settings['full_urls'] then table.insert(urls_urls, string.format("'%s'", @@ -594,6 +624,21 @@ local function clickhouse_collect(task) table.insert(urls_rows, elt) end + -- Emails step + local emails = {} + if task:has_urls(true) then + for _,u in ipairs(task:get_emails()) do + table.insert(emails, string.format("'%s'", clickhouse_quote(u:get_text()))) + end + end + + if #emails > 0 then + elt = string.format("(today(),'%s',[%s])", + task:get_digest(), + table.concat(emails, ',')) + table.insert(emails_rows, elt) + end + -- ASN information if settings['asn_table'] then local asn, country, ipnet = '--', '--', '--' -- 2.39.5