]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Store emails in Clickhouse
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 21 Mar 2018 12:22:49 +0000 (12:22 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 21 Mar 2018 13:09:52 +0000 (13:09 +0000)
src/plugins/lua/clickhouse.lua

index d64e698079e0ca4c4d47a5e417a4388700d3c7ab..590478d25c0990f88aba16bf7b1e6f0f44d0608f 100644 (file)
@@ -29,6 +29,7 @@ local E = {}
 local rows = {}
 local attachment_rows = {}
 local urls_rows = {}
+local emails_rows = {}
 local specific_rows = {}
 local asn_rows = {}
 local symbols_rows = {}
@@ -198,6 +199,17 @@ local function clickhouse_urls_row(tname)
   return elt
 end
 
+local function clickhouse_emails_row(tname)
+  local emails_fields = {
+    'Date',
+    'Digest',
+    'Emails',
+  }
+  local elt = string.format('INSERT INTO %s (%s) VALUES ',
+      tname, table.concat(emails_fields, ','))
+  return elt
+end
+
 local function clickhouse_symbols_row(tname)
   local symbols_fields = {
     'Date',
@@ -234,6 +246,10 @@ local function clickhouse_first_row()
     table.insert(urls_rows,
       clickhouse_urls_row(settings['urls_table']))
   end
+  if settings['emails_table'] then
+    table.insert(emails_rows,
+        clickhouse_emails_row(settings['emails_table']))
+  end
   if settings['asn_table'] then
     table.insert(asn_rows,
       clickhouse_asn_row(settings['asn_table']))
@@ -320,6 +336,20 @@ local function clickhouse_send_data(task)
         settings['server'])
     end
   end
+  if #emails_rows > 1 then
+    body = table.concat(emails_rows, ' ')
+    if not rspamd_http.request({
+      task = task,
+      url = connect_prefix .. ip_addr,
+      body = body,
+      callback = gen_http_cb('emails data', #emails_rows),
+      mime_type = 'text/plain',
+      timeout = settings['timeout'],
+    }) then
+      rspamd_logger.errx(task, "cannot send emails to clickhouse server %s: cannot make request",
+          settings['server'])
+    end
+  end
   if #asn_rows > 1 then
     body = table.concat(asn_rows, ' ')
     if not rspamd_http.request({
@@ -574,7 +604,7 @@ local function clickhouse_collect(task)
   local urls_tlds = {}
   local urls_urls = {}
   if task:has_urls(false) then
-    for _,u in ipairs(task:get_urls()) do
+    for _,u in ipairs(task:get_urls(false)) do
       table.insert(urls_tlds, string.format("'%s'", clickhouse_quote(u:get_tld())))
       if settings['full_urls'] then
         table.insert(urls_urls, string.format("'%s'",
@@ -594,6 +624,21 @@ local function clickhouse_collect(task)
     table.insert(urls_rows, elt)
   end
 
+  -- Emails step
+  local emails = {}
+  if task:has_urls(true) then
+    for _,u in ipairs(task:get_emails()) do
+      table.insert(emails, string.format("'%s'", clickhouse_quote(u:get_text())))
+    end
+  end
+
+  if #emails > 0 then
+    elt = string.format("(today(),'%s',[%s])",
+        task:get_digest(),
+        table.concat(emails, ','))
+    table.insert(emails_rows, elt)
+  end
+
   -- ASN information
   if settings['asn_table'] then
     local asn, country, ipnet = '--', '--', '--'