]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] ASN support in Clickhouse module 1174/head
authorAndrew Lewis <nerf@judo.za.org>
Thu, 24 Nov 2016 14:14:37 +0000 (16:14 +0200)
committerAndrew Lewis <nerf@judo.za.org>
Thu, 24 Nov 2016 14:14:37 +0000 (16:14 +0200)
src/plugins/lua/clickhouse.lua

index 09e634b276a7503916e7e95e52e190f18fe4ed04..4465a9a6edf62351fa1e1a15f0af94ea2302fb5e 100644 (file)
@@ -21,6 +21,7 @@ local rows = {}
 local attachment_rows = {}
 local urls_rows = {}
 local specific_rows = {}
+local asn_rows = {}
 local nrows = 0
 
 local settings = {
@@ -89,6 +90,14 @@ CREATE TABLE rspamd_urls (
     `Urls.Tld` Array(String),
     `Urls.Url` Array(String)
 ) ENGINE = MergeTree(Date, Digest, 8192)
+
+CREATE TABLE rspamd_asn (
+    Date Date,
+    Digest FixedString(32),
+    ASN String,
+    Country FixedString(2),
+    IPNet String
+) ENGINE = MergeTree(Date, Digest, 8192)
 ]]
 
 local function clickhouse_main_row(tname)
@@ -148,6 +157,19 @@ local function clickhouse_urls_row(tname)
   return elt
 end
 
+local function clickhouse_asn_row(tname)
+  local asn_fields = {
+    'Date',
+    'Digest',
+    'ASN',
+    'Country',
+    'IPNet',
+  }
+  local elt = string.format('INSERT INTO %s (%s) VALUES ',
+    tname, table.concat(asn_fields, ','))
+  return elt
+end
+
 local function clickhouse_first_row()
   table.insert(rows, clickhouse_main_row(settings['table']))
   if settings['attachments_table'] then
@@ -158,6 +180,10 @@ local function clickhouse_first_row()
     table.insert(urls_rows,
       clickhouse_urls_row(settings['urls_table']))
   end
+  if settings['asn_table'] then
+    table.insert(asn_rows,
+      clickhouse_asn_row(settings['asn_table']))
+  end
 end
 
 local function clickhouse_check_symbol(task, symbols, need_score)
@@ -227,6 +253,20 @@ local function clickhouse_send_data(task)
         settings['server'])
     end
   end
+  if #asn_rows > 1 then
+    body = table.concat(asn_rows, ' ')
+    if not rspamd_http.request({
+      task = task,
+      url = 'http://' .. settings['server'],
+      body = body,
+      callback = http_cb,
+      mime_type = 'text/plain',
+      timeout = settings['timeout'],
+    }) then
+      rspamd_logger.errx(task, "cannot send asn info to clickhouse server %s: cannot make request",
+        settings['server'])
+    end
+  end
 
   for k,specific in pairs(specific_rows) do
     if #specific > 1 then
@@ -471,6 +511,28 @@ local function clickhouse_collect(task)
     table.insert(urls_rows, elt)
   end
 
+  -- ASN information
+  if settings['asn_table'] then
+    local asn, country, ipnet, ret = 'unknown', 'unknown', 'unknown'
+    local pool = task:get_mempool()
+    ret = pool:get_variable("asn")
+    if ret then
+      asn = ret
+    end
+    ret = pool:get_variable("country")
+    if ret then
+      country = ret
+    end
+    ret = pool:get_variable("ipnet")
+    if ret then
+      ipnet = ret
+    end
+    elt = string.format("(today(),'%s','%s','%s','%s')",
+      task:get_digest(),
+      clickhouse_quote(asn), clickhouse_quote(country), clickhouse_quote(ipnet))
+    table.insert(asn_rows, elt)
+  end
+
   nrows = nrows + 1
 
   if nrows > settings['limit'] then
@@ -480,6 +542,7 @@ local function clickhouse_collect(task)
     attachment_rows = {}
     urls_rows = {}
     specific_rows = {}
+    asn_rows = {}
     clickhouse_first_row()
   end
 end