]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Add clickhouse plugin
author: Vsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 23 Nov 2016 13:14:01 +0000 (13:14 +0000)
committer: Vsevolod Stakhov <vsevolod@highsecure.ru>
Wed, 23 Nov 2016 13:14:01 +0000 (13:14 +0000)
src/plugins/lua/clickhouse.lua [new file with mode: 0644]

diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
new file mode 100644 (file)
index 0000000..659c5d5
--- /dev/null
@@ -0,0 +1,464 @@
+--[[
+Copyright (c) 2016, Vsevolod Stakhov <vsevolod@highsecure.ru>
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+]]--
+
+local rspamd_logger = require 'rspamd_logger'
+local rspamd_http = require "rspamd_http"
+
+local rows = {} -- pending INSERT for the main table; element 1 is the statement prefix added by clickhouse_first_row
+local attachment_rows = {} -- pending INSERT for the attachments table, same layout as `rows`
+local urls_rows = {} -- pending INSERT for the urls table, same layout as `rows`
+local specific_rows = {} -- extra main-format batches, keyed by the table name returned by settings.from_map
+local nrows = 0 -- messages collected since the last flush; reset to 0 in clickhouse_collect after sending
+
+local settings = { -- built-in defaults; overridden key by key from the 'clickhouse' config section
+  limit = 1000, -- clickhouse_collect flushes once nrows exceeds this value
+  server = "localhost:8123", -- appended to 'http://' to form the request URL
+  timeout = 5.0, -- passed as `timeout` to rspamd_http.request (presumably seconds; confirm)
+  bayes_spam_symbols = {'BAYES_SPAM'}, -- any of these present => IsBayes = 'spam'
+  bayes_ham_symbols = {'BAYES_HAM'}, -- any of these present => IsBayes = 'ham' (checked after spam, so it wins)
+  fann_symbols = {'FANN_SCORE'}, -- score > 0 => IsFann = 'spam', otherwise 'ham'
+  fuzzy_symbols = {'FUZZY_DENIED'}, -- any of these present => IsFuzzy = 'deny'
+  whitelist_symbols = {'WHITELIST_DKIM', 'WHITELIST_SPF_DKIM', 'WHITELIST_DMARC'}, -- score < 0 => 'whitelist', otherwise 'blacklist'
+  dkim_allow_symbols = {'R_DKIM_ALLOW'}, -- present => IsDkim = 'allow'; options of the first one drive from_map lookups
+  dkim_reject_symbols = {'R_DKIM_REJECT'}, -- present => IsDkim = 'reject' (checked after allow, so it wins)
+  dmarc_allow_symbols = {'DMARC_POLICY_ALLOW'}, -- present => IsDmarc = 'allow'
+  dmarc_reject_symbols = {'DMARC_POLICY_REJECT', 'DMARC_POLICY_QUARANTINE'}, -- present => IsDmarc = 'reject' (checked after allow)
+  table = 'rspamd', -- main table name
+  attachments_table = 'rspamd_attachments', -- attachments table name; a false/nil value skips its INSERT prefix
+  urls_table = 'rspamd_urls', -- urls table name; a false/nil value skips its INSERT prefix
+  ipmask = 19, -- argument to ip:apply_mask() before the sender IP is stored
+  full_urls = false, -- true: store u:get_text(); false: store u:get_host()
+  from_tables = nil -- optional map URL; when set, a 'regexp' map is registered as settings.from_map
+}
+
+-- Builds the leading part of an INSERT statement for a table that uses the
+-- main row layout. The column order here must match the value order produced
+-- by clickhouse_collect.
+-- @param tname name of the destination table
+-- @return the string 'INSERT INTO <tname> (<columns>) VALUES '
+local function clickhouse_main_row(tname)
+  local columns = table.concat({
+    'Date', 'TS', 'From', 'MimeFrom', 'IP', 'Score', 'NRcpt', 'Size',
+    'IsWhitelist', 'IsBayes', 'IsFuzzy', 'IsFann', 'IsDkim', 'IsDmarc',
+    'NUrls', 'Action', 'FromUser', 'MimeUser', 'RcptUser', 'RcptDomain',
+    'ListId', 'Digest',
+  }, ',')
+
+  return string.format('INSERT INTO %s (%s) VALUES ', tname, columns)
+end
+
+-- Builds the leading part of an INSERT statement for the attachments table.
+-- @param tname name of the destination table
+-- @return the string 'INSERT INTO <tname> (<columns>) VALUES '
+local function clickhouse_attachments_row(tname)
+  local columns = table.concat({
+    'Date',
+    'Digest',
+    'Attachments.FileName',
+    'Attachments.ContentType',
+    'Attachments.Length',
+    'Attachments.Digest',
+  }, ',')
+
+  return string.format('INSERT INTO %s (%s) VALUES ', tname, columns)
+end
+
+-- Builds the leading part of an INSERT statement for the urls table.
+-- The table name comes from the `tname` argument, like the other row
+-- builders, rather than being read from the module settings.
+-- @param tname name of the destination table
+-- @return the string 'INSERT INTO <tname> (<columns>) VALUES '
+local function clickhouse_urls_row(tname)
+  local urls_fields = {
+    'Date',
+    'Digest',
+    'Urls.Tld',
+    'Urls.Url',
+  }
+  local elt = string.format('INSERT INTO %s (%s) VALUES ',
+    tname, table.concat(urls_fields, ','))
+  return elt
+end
+
+-- Seeds each batch with its INSERT prefix so that later VALUES tuples can
+-- simply be appended. The attachments and urls batches are only seeded when
+-- the corresponding table name is configured.
+local function clickhouse_first_row()
+  rows[#rows + 1] = clickhouse_main_row(settings['table'])
+
+  local att_table = settings['attachments_table']
+  if att_table then
+    attachment_rows[#attachment_rows + 1] = clickhouse_attachments_row(att_table)
+  end
+
+  local urls_table = settings['urls_table']
+  if urls_table then
+    urls_rows[#urls_rows + 1] = clickhouse_urls_row(urls_table)
+  end
+end
+
+-- Looks for the first symbol from `symbols` that the task has.
+-- @param task object providing has_symbol(name) and get_symbol(name)
+-- @param symbols list of symbol names, checked in order
+-- @param need_score when truthy, return the score of the match instead of true
+-- @return false when no symbol matched; otherwise true, or the `score` field
+--   of the first entry returned by task:get_symbol when need_score is set
+local function clickhouse_check_symbol(task, symbols, need_score)
+  for _, name in ipairs(symbols) do
+    if task:has_symbol(name) then
+      if not need_score then
+        return true
+      end
+
+      local first = task:get_symbol(name)[1]
+      return first['score']
+    end
+  end
+
+  return false
+end
+
+-- Sends every pending batch to the ClickHouse server over HTTP.
+-- The main batch is always sent; the attachments, urls and per-table batches
+-- are sent only when they hold at least one VALUES tuple in addition to the
+-- INSERT prefix. The batches themselves are not modified here; the caller
+-- resets them.
+-- @param task task used as the context for the HTTP requests and for logging
+local function clickhouse_send_data(task)
+  local server = settings['server']
+
+  -- Posts a single batch and logs the outcome.
+  -- `batch` is a list whose first element is the INSERT prefix,
+  -- `what` is a short description used in the "cannot make request" message.
+  local function send_batch(batch, what)
+    -- The first element is the statement prefix, not a data row
+    local nsent = #batch - 1
+
+    local function http_cb(err_message, code, body, headers)
+      if code ~= 200 or err_message then
+        -- %s is used for the status code, as elsewhere in this file; whether
+        -- `code` is always a number on transport errors is not visible here
+        rspamd_logger.errx(task, "cannot send data to clickhouse server %s: %s:%s",
+          server, code, err_message)
+      else
+        -- Report what was actually sent rather than the configured limit
+        rspamd_logger.infox(task, "sent %s rows to clickhouse server %s",
+          nsent, server)
+      end
+    end
+
+    if not rspamd_http.request({
+      task = task,
+      url = 'http://' .. server,
+      body = table.concat(batch, ' '),
+      callback = http_cb,
+      mime_type = 'text/plain',
+      timeout = settings['timeout'],
+    }) then
+      rspamd_logger.errx(task, "cannot send %s to clickhouse server %s: cannot make request",
+        what, server)
+    end
+  end
+
+  send_batch(rows, 'data')
+
+  if #attachment_rows > 1 then
+    send_batch(attachment_rows, 'attachments')
+  end
+
+  if #urls_rows > 1 then
+    send_batch(urls_rows, 'urls')
+  end
+
+  for k,specific in pairs(specific_rows) do
+    if #specific > 1 then
+      send_batch(specific, 'data for domain ' .. tostring(k))
+    end
+  end
+end
+
+-- Prepares a value for embedding inside a single-quoted SQL string literal:
+-- single quotes and backslashes are prefixed with a backslash and the result
+-- is lower-cased.
+-- @param str string to escape; nil or false yields ''
+-- @return the escaped, lower-cased string
+local function clickhouse_quote(str)
+  if not str then
+    return ''
+  end
+
+  -- Assigning to a single local keeps only the string result of gsub
+  local escaped = str:gsub('[\'\\]', '\\%1')
+  return escaped:lower()
+end
+
+-- Postfilter callback: turns one scanned message into VALUES tuples and
+-- appends them to the pending batches.
+--  * one tuple for the main table (and for every per-domain table selected
+--    through settings.from_map by the DKIM allow symbol options);
+--  * one tuple for the attachments table when a part has a file name;
+--  * one tuple for the urls table when the message has URLs.
+-- Once more than settings.limit messages are pending, everything is sent and
+-- the batches are re-initialised.
+-- @param task the task being processed
+local function clickhouse_collect(task)
+  -- Envelope sender, falling back to HELO when there is no usable domain
+  local from_domain = ''
+  local from_user = ''
+  if task:has_from('smtp') then
+    local from = task:get_from('smtp')[1]
+
+    if from then
+      -- A missing domain is normalised to '' so the HELO fallback applies
+      from_domain = from['domain'] or ''
+      from_user = from['user']
+    end
+
+    if from_domain == '' then
+      if task:get_helo() then
+        from_domain = task:get_helo()
+      end
+    end
+  else
+    if task:get_helo() then
+      from_domain = task:get_helo()
+    end
+  end
+
+  -- Header (MIME) sender
+  local mime_domain = ''
+  local mime_user = ''
+  if task:has_from('mime') then
+    local from = task:get_from('mime')[1]
+    if from then
+      mime_domain = from['domain']
+      mime_user = from['user']
+    end
+  end
+
+  -- Sender IP with settings.ipmask applied
+  local ip_str = 'undefined'
+  local ip = task:get_from_ip()
+  if ip and ip:is_valid() then
+    local ipnet = ip:apply_mask(settings['ipmask'])
+    ip_str = ipnet:to_string()
+  end
+
+  -- Only the first envelope recipient is stored
+  local rcpt_user = ''
+  local rcpt_domain = ''
+  if task:has_recipients('smtp') then
+    local rcpt = task:get_recipients('smtp')[1]
+    rcpt_user = rcpt['user']
+    rcpt_domain = rcpt['domain']
+  end
+
+  local list_id = ''
+  local lh = task:get_header('List-Id')
+  if lh then
+    list_id = lh
+  end
+
+  local score = task:get_metric_score('default')[1];
+  local bayes = 'unknown';
+  local fuzzy = 'unknown';
+  local fann = 'unknown';
+  local whitelist = 'unknown';
+  local dkim = 'unknown';
+  local dmarc = 'unknown';
+
+  local ret
+
+  -- For the paired checks below the later one overrides the earlier one
+  ret = clickhouse_check_symbol(task, settings['bayes_spam_symbols'], false)
+  if ret then
+    bayes = 'spam'
+  end
+
+  ret = clickhouse_check_symbol(task, settings['bayes_ham_symbols'], false)
+  if ret then
+    bayes = 'ham'
+  end
+
+  -- The sign of the symbol score decides the class
+  ret = clickhouse_check_symbol(task, settings['fann_symbols'], true)
+  if ret then
+    if ret > 0 then
+      fann = 'spam'
+    else
+      fann = 'ham'
+    end
+  end
+
+
+  ret = clickhouse_check_symbol(task, settings['whitelist_symbols'], true)
+  if ret then
+    if ret < 0 then
+      whitelist = 'whitelist'
+    else
+      whitelist = 'blacklist'
+    end
+  end
+
+  ret = clickhouse_check_symbol(task, settings['fuzzy_symbols'], false)
+  if ret then
+    fuzzy = 'deny'
+  end
+
+  ret = clickhouse_check_symbol(task, settings['dkim_allow_symbols'], false)
+  if ret then
+    dkim = 'allow'
+  end
+
+  ret = clickhouse_check_symbol(task, settings['dkim_reject_symbols'], false)
+  if ret then
+    dkim = 'reject'
+  end
+
+  ret = clickhouse_check_symbol(task, settings['dmarc_allow_symbols'], false)
+  if ret then
+    dmarc = 'allow'
+  end
+
+  ret = clickhouse_check_symbol(task, settings['dmarc_reject_symbols'], false)
+  if ret then
+    dmarc = 'reject'
+  end
+
+  local nrcpts = 0
+  if task:has_recipients('smtp') then
+    nrcpts = #task:get_recipients('smtp')
+  end
+
+  local nurls = 0
+  if task:has_urls(true) then
+    nurls = #task:get_urls(true)
+  end
+
+  local timestamp = task:get_date({
+    format = 'connect',
+    gmt = false
+  })
+
+  -- Value order must match the column list in clickhouse_main_row
+  local elt = string.format("(today(),%d,'%s','%s','%s',%.2f,%d,%d,'%s','%s','%s','%s','%s','%s',%d,'%s','%s','%s','%s','%s','%s','%s')",
+        timestamp,
+        clickhouse_quote(from_domain), clickhouse_quote(mime_domain), ip_str, score,
+        nrcpts, task:get_size(), whitelist, bayes, fuzzy, fann,
+        dkim, dmarc, nurls, task:get_metric_action('default'),
+        clickhouse_quote(from_user), clickhouse_quote(mime_user),
+        clickhouse_quote(rcpt_user), clickhouse_quote(rcpt_domain),
+        clickhouse_quote(list_id), task:get_digest())
+  table.insert(rows, elt)
+
+  if settings['from_map'] and dkim == 'allow' then
+    -- Use dkim
+    local das = task:get_symbol(settings['dkim_allow_symbols'][1])
+    if das and das[1] and das[1]['options'] then
+      -- Tables that already received this message, so that several DKIM
+      -- domains mapping to the same table do not duplicate the row
+      local seen = {}
+      for _,dkim_domain in ipairs(das[1]['options']) do
+        local specific = settings.from_map:get_key(dkim_domain)
+        if specific and not seen[specific] then
+          seen[specific] = true
+          if not specific_rows[specific] then
+            local first = clickhouse_main_row(specific)
+            specific_rows[specific] = {first}
+          end
+          table.insert(specific_rows[specific], elt)
+        end
+      end
+    end
+
+  end
+
+  -- Attachments step
+  local attachments_fnames = {}
+  local attachments_ctypes = {}
+  local attachments_lengths = {}
+  local attachments_digests = {}
+  for _,part in ipairs(task:get_parts()) do
+    local fname = part:get_filename()
+
+    -- Only parts that carry a file name are recorded
+    if fname then
+      table.insert(attachments_fnames, string.format("'%s'", clickhouse_quote(fname)))
+      local mime_type, mime_subtype = part:get_type()
+      table.insert(attachments_ctypes, string.format("'%s/%s'",
+        clickhouse_quote(mime_type), clickhouse_quote(mime_subtype)))
+      table.insert(attachments_lengths, string.format("%s", tostring(part:get_length())))
+      -- Only the first 16 characters of the part digest are stored
+      table.insert(attachments_digests, string.format("'%s'", string.sub(part:get_digest(), 1, 16)))
+    end
+  end
+
+  if #attachments_fnames > 0 then
+    elt = string.format("(today(),'%s',[%s],[%s],[%s],[%s])",
+      task:get_digest(),
+      table.concat(attachments_fnames, ','),
+      table.concat(attachments_ctypes, ','),
+      table.concat(attachments_lengths, ','),
+      table.concat(attachments_digests, ','))
+    table.insert(attachment_rows, elt)
+  end
+
+  -- Urls step
+  local urls_tlds = {}
+  local urls_urls = {}
+  if task:has_urls(false) then
+    for _,u in ipairs(task:get_urls()) do
+      table.insert(urls_tlds, string.format("'%s'", clickhouse_quote(u:get_tld())))
+      if settings['full_urls'] then
+        table.insert(urls_urls, string.format("'%s'",
+          clickhouse_quote(u:get_text())))
+      else
+        table.insert(urls_urls, string.format("'%s'",
+          clickhouse_quote(u:get_host())))
+      end
+    end
+  end
+
+  if #urls_tlds > 0 then
+    elt = string.format("(today(),'%s',[%s],[%s])",
+      task:get_digest(),
+      table.concat(urls_tlds, ','),
+      table.concat(urls_urls, ','))
+    table.insert(urls_rows, elt)
+  end
+
+  nrows = nrows + 1
+
+  -- Flush and start fresh batches once the limit is exceeded
+  if nrows > settings['limit'] then
+    clickhouse_send_data(task)
+    nrows = 0
+    rows = {}
+    attachment_rows = {}
+    urls_rows = {}
+    specific_rows = {}
+    clickhouse_first_row()
+  end
+end
+
+-- Module initialisation: runs only when a 'clickhouse' section is configured
+local opts = rspamd_config:get_all_opt('clickhouse')
+if opts then
+  -- Configured values override the built-in defaults key by key
+  for name, value in pairs(opts) do
+    settings[name] = value
+  end
+
+  if settings['server'] then
+    -- Optional map used to route rows to additional per-domain tables
+    if settings['from_tables'] then
+      settings['from_map'] = rspamd_config:add_map({
+        url = settings['from_tables'],
+        description = 'clickhouse specific domains',
+        type = 'regexp'
+      })
+    end
+
+    clickhouse_first_row()
+
+    rspamd_config:register_symbol({
+      name = 'CLICKHOUSE_COLLECT',
+      type = 'postfilter',
+      callback = clickhouse_collect,
+      priority = 10
+    })
+
+    -- Send whatever is still pending when the finish script runs
+    rspamd_config:register_finish_script(function(task)
+      if nrows > 0 then
+        clickhouse_send_data(task)
+      end
+    end)
+  else
+    rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
+  end
+end