aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMikhail Galanin <mgalanin@mimecast.com>2018-08-08 16:05:11 +0100
committerMikhail Galanin <mgalanin@mimecast.com>2018-08-08 16:05:11 +0100
commit99ebeb7c1d3e9f32d46664ea2e91fa731d430e31 (patch)
tree0b976ba58af3ef0a8469b74ca01721560c5a4616 /src
parent2c501018c04ca0a972c600b1324be21545e9f133 (diff)
parenta756a6ef6a58c3062def2a8a0795498ab8568f94 (diff)
downloadrspamd-99ebeb7c1d3e9f32d46664ea2e91fa731d430e31.tar.gz
rspamd-99ebeb7c1d3e9f32d46664ea2e91fa731d430e31.zip
Merge branch 'master' into lua-coroutine-model
Diffstat (limited to 'src')
-rw-r--r--src/libutil/ssl_util.c12
-rw-r--r--src/plugins/lua/clickhouse.lua522
2 files changed, 282 insertions, 252 deletions
diff --git a/src/libutil/ssl_util.c b/src/libutil/ssl_util.c
index a90bd5e36..1eab5821f 100644
--- a/src/libutil/ssl_util.c
+++ b/src/libutil/ssl_util.c
@@ -746,6 +746,18 @@ void
rspamd_ssl_connection_free (struct rspamd_ssl_connection *conn)
{
if (conn) {
+ /*
+ * SSL_RECEIVED_SHUTDOWN tells SSL_shutdown to act as if we had already
+ * received a close notify from the other end. SSL_shutdown will then
+ * send the final close notify in reply. The other end will receive the
+ * close notify and send theirs. By this time, we will have already
+ * closed the socket and the other end's real close notify will never be
+ * received. In effect, both sides will think that they have completed a
+ * clean shutdown and keep their sessions valid. This strategy will fail
+ * if the socket is not ready for writing, in which case this hack will
+ * lead to an unclean shutdown and lost session on the other end.
+ */
+ SSL_set_shutdown (conn->ssl, SSL_RECEIVED_SHUTDOWN);
SSL_shutdown (conn->ssl);
SSL_free (conn->ssl);
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 444465814..c2375d765 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -15,7 +15,6 @@ limitations under the License.
]]--
local rspamd_logger = require 'rspamd_logger'
-local rspamd_http = require "rspamd_http"
local rspamd_lua_utils = require "lua_util"
local upstream_list = require "rspamd_upstream_list"
local lua_util = require "lua_util"
@@ -28,16 +27,10 @@ if confighelp then
return
end
-local main_rows = {}
-local attachment_rows = {}
-local urls_rows = {}
-local emails_rows = {}
---local specific_rows = {}
-local asn_rows = {}
-local symbols_rows = {}
+local data_rows = {}
local custom_rows = {}
local nrows = 0
-local connect_prefix = 'http://'
+local schema_version = 2 -- Current schema version
local settings = {
limit = 1000,
@@ -52,12 +45,6 @@ local settings = {
dmarc_allow_symbols = {'DMARC_POLICY_ALLOW'},
dmarc_reject_symbols = {'DMARC_POLICY_REJECT', 'DMARC_POLICY_QUARANTINE'},
stop_symbols = {},
- table = 'rspamd',
- attachments_table = 'rspamd_attachments',
- urls_table = 'rspamd_urls',
- emails_table = 'rspamd_emails',
- symbols_table = 'rspamd_symbols',
- asn_table = 'rspamd_asn',
ipmask = 19,
ipmask6 = 48,
full_urls = false,
@@ -78,9 +65,9 @@ local settings = {
}
}
-local clickhouse_schema = {
-table = [[
-CREATE TABLE IF NOT EXISTS ${table}
+--- @language SQL
+local clickhouse_schema = {[[
+CREATE TABLE rspamd
(
Date Date,
TS DateTime,
@@ -103,60 +90,52 @@ CREATE TABLE IF NOT EXISTS ${table}
RcptUser String,
RcptDomain String,
ListId String,
- Digest FixedString(32)
-) ENGINE = MergeTree(Date, (TS, From), 8192)
-]],
-
-attachments_table = [[
-CREATE TABLE IF NOT EXISTS ${attachments_table} (
- Date Date,
- Digest FixedString(32),
`Attachments.FileName` Array(String),
`Attachments.ContentType` Array(String),
`Attachments.Length` Array(UInt32),
- `Attachments.Digest` Array(FixedString(16))
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-urls_table = [[
-CREATE TABLE IF NOT EXISTS ${urls_table} (
- Date Date,
- Digest FixedString(32),
+ `Attachments.Digest` Array(FixedString(16)),
`Urls.Tld` Array(String),
- `Urls.Url` Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-emails_table = [[
-CREATE TABLE IF NOT EXISTS ${emails_table} (
- Date Date,
- Digest FixedString(32),
- Emails Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-asn_table = [[
-CREATE TABLE IF NOT EXISTS ${asn_table} (
- Date Date,
- Digest FixedString(32),
+ `Urls.Url` Array(String),
+ Emails Array(String),
ASN String,
Country FixedString(2),
- IPNet String
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-symbols_table = [[
-CREATE TABLE IF NOT EXISTS ${symbols_table} (
- Date Date,
- Digest FixedString(32),
+ IPNet String,
`Symbols.Names` Array(String),
`Symbols.Scores` Array(Float64),
- `Symbols.Options` Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]]
+ `Symbols.Options` Array(String),
+ Digest FixedString(32)
+) ENGINE = MergeTree(Date, (TS, From), 8192)
+]],
+[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
+[[INSERT INTO rspamd_version (Version) Values (2)]],
}
-local function clickhouse_main_row(tname)
+-- This describes SQL queries to migrate between versions
+local migrations = {
+ [1] = {
+ -- Move to a wide fat table
+ [[ALTER TABLE rspamd
+ ADD COLUMN `Attachments.FileName` Array(String) AFTER ListId,
+ ADD COLUMN `Attachments.ContentType` Array(String) AFTER `Attachments.FileName`,
+ ADD COLUMN `Attachments.Length` Array(UInt32) AFTER `Attachments.ContentType`,
+ ADD COLUMN `Attachments.Digest` Array(FixedString(16)) AFTER `Attachments.Length`,
+ ADD COLUMN `Urls.Tld` Array(String) AFTER `Attachments.Digest`,
+ ADD COLUMN `Urls.Url` Array(String) AFTER `Urls.Tld`,
+ ADD COLUMN Emails Array(String) AFTER `Urls.Url`,
+ ADD COLUMN ASN String AFTER Emails,
+ ADD COLUMN Country FixedString(2) AFTER ASN,
+ ADD COLUMN IPNet String AFTER Country,
+ ADD COLUMN `Symbols.Names` Array(String) AFTER IPNet,
+ ADD COLUMN `Symbols.Scores` Array(Float64) AFTER `Symbols.Names`,
+ ADD COLUMN `Symbols.Options` Array(String) AFTER `Symbols.Scores`]],
+ -- Add explicit version
+ [[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
+ [[INSERT INTO rspamd_version (Version) Values (2)]],
+ }
+}
+
+
+local function clickhouse_main_row(res)
local fields = {
'Date',
'TS',
@@ -181,73 +160,52 @@ local function clickhouse_main_row(tname)
'ListId',
'Digest'
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_attachments_row(tname)
- local attachement_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_attachments_row(res)
+ local fields = {
'Attachments.FileName',
'Attachments.ContentType',
'Attachments.Length',
'Attachments.Digest',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(attachement_fields, ','))
- return elt
+
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_urls_row(tname)
- local urls_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_urls_row(res)
+ local fields = {
'Urls.Tld',
'Urls.Url',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(urls_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_emails_row(tname)
- local emails_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_emails_row(res)
+ local fields = {
'Emails',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(emails_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_symbols_row(tname)
- local symbols_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_symbols_row(res)
+ local fields = {
'Symbols.Names',
'Symbols.Scores',
'Symbols.Options',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(symbols_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_asn_row(tname)
- local asn_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_asn_row(res)
+ local fields = {
'ASN',
'Country',
'IPNet',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(asn_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
local function today(ts)
@@ -304,34 +262,19 @@ local function clickhouse_send_data(task)
end
end
- send_data('generic data', main_rows,
- clickhouse_main_row(settings['table']))
-
+ local fields = {}
+ clickhouse_main_row(fields)
+ clickhouse_attachments_row(fields)
+ clickhouse_urls_row(fields)
+ clickhouse_emails_row(fields)
+ clickhouse_asn_row(fields)
- if #attachment_rows > 1 then
- send_data('attachments data', attachment_rows,
- clickhouse_attachments_row(settings.attachments_table))
+ if settings.enable_symbols then
+ clickhouse_symbols_row(fields)
end
- if #urls_rows > 1 then
- send_data('urls data', urls_rows,
- clickhouse_urls_row(settings.urls_table))
- end
-
- if #emails_rows > 1 then
- send_data('emails data', emails_rows,
- clickhouse_emails_row(settings.emails_table))
- end
-
- if #asn_rows > 1 then
- send_data('asn data', asn_rows,
- clickhouse_asn_row(settings.asn_table))
- end
-
- if #symbols_rows > 1 then
- send_data('symbols data', symbols_rows,
- clickhouse_symbols_row(settings.symbols_table))
- end
+ send_data('generic data', data_rows,
+ string.format('INSERT INTO rspamd (%s)', table.concat(fields, ',')))
for k,crows in pairs(custom_rows) do
if #crows > 1 then
@@ -490,7 +433,7 @@ local function clickhouse_collect(task)
local action = task:get_metric_action('default')
local digest = task:get_digest()
- table.insert(main_rows, {
+ local row = {
today(timestamp),
timestamp,
from_domain,
@@ -512,25 +455,8 @@ local function clickhouse_collect(task)
rcpt_user,
rcpt_domain,
list_id,
- task:get_digest()
- })
-
---[[ TODO: has been broken
- if settings['from_map'] and dkim == 'allow' then
- -- Use dkim
- local das = task:get_symbol(settings['dkim_allow_symbols'][1])
- if ((das or E)[1] or E).options then
- for _,dkim_domain in ipairs(das[1]['options']) do
- local specific = settings.from_map:get_key(dkim_domain)
- if specific then
- specific_rows[specific] = {}
- table.insert(specific_rows[specific], elt)
- end
- end
- end
-
- end
---]]
+ digest
+ }
-- Attachments step
local attachments_fnames = {}
@@ -551,14 +477,15 @@ local function clickhouse_collect(task)
end
if #attachments_fnames > 0 then
- table.insert(attachment_rows, {
- today(timestamp),
- digest,
- attachments_fnames,
- attachments_ctypes,
- attachments_lengths,
- attachments_digests,
- })
+ table.insert(row, attachments_fnames)
+ table.insert(row, attachments_ctypes)
+ table.insert(row, attachments_lengths)
+ table.insert(row, attachments_digests)
+ else
+ table.insert(row, {})
+ table.insert(row, {})
+ table.insert(row, {})
+ table.insert(row, {})
end
-- Urls step
@@ -576,58 +503,43 @@ local function clickhouse_collect(task)
end
if #urls_tlds > 0 then
- table.insert(urls_rows, {
- today(timestamp),
- digest,
- urls_tlds,
- urls_urls
- })
+ table.insert(row, urls_tlds)
+ table.insert(row, urls_urls)
+ else
+ table.insert(row, {})
+ table.insert(row, {})
end
-- Emails step
- local emails = {}
if task:has_urls(true) then
- for _,u in ipairs(task:get_emails()) do
- table.insert(emails,
- string.format('%s@%s', u:get_user(), u:get_host()))
- end
- end
-
- if #emails > 0 then
- table.insert(emails_rows, {
- today(timestamp),
- digest,
- emails,
- })
+ table.insert(row, fun.totable(fun.map(function(u)
+ return string.format('%s@%s', u:get_user(), u:get_host())
+ end, task:get_emails())))
+ else
+ table.insert(row, {})
end
-- ASN information
- if settings['asn_table'] then
- local asn, country, ipnet = '--', '--', '--'
- local pool = task:get_mempool()
- ret = pool:get_variable("asn")
- if ret then
- asn = ret
- end
- ret = pool:get_variable("country")
- if ret then
- country = ret:sub(1, 2)
- end
- ret = pool:get_variable("ipnet")
- if ret then
- ipnet = ret
- end
- table.insert(asn_rows, {
- today(timestamp),
- digest,
- asn,
- country,
- ipnet
- })
+ local asn, country, ipnet = '--', '--', '--'
+ local pool = task:get_mempool()
+ ret = pool:get_variable("asn")
+ if ret then
+ asn = ret
+ end
+ ret = pool:get_variable("country")
+ if ret then
+ country = ret:sub(1, 2)
+ end
+ ret = pool:get_variable("ipnet")
+ if ret then
+ ipnet = ret
end
+ table.insert(row, asn)
+ table.insert(row, country)
+ table.insert(row, ipnet)
-- Symbols info
- if settings.enable_symbols and settings['symbols_table'] then
+ if settings.enable_symbols then
local symbols = task:get_symbols_all()
local syms_tab = {}
local scores_tab = {}
@@ -643,14 +555,9 @@ local function clickhouse_collect(task)
table.insert(options_tab, '');
end
end
-
- table.insert(symbols_rows, {
- today(timestamp),
- digest,
- syms_tab,
- scores_tab,
- options_tab
- })
+ table.insert(row, syms_tab)
+ table.insert(row, scores_tab)
+ table.insert(row, options_tab)
end
-- Custom data
@@ -660,17 +567,13 @@ local function clickhouse_collect(task)
end
nrows = nrows + 1
+ table.insert(data_rows, row)
rspamd_logger.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit)
if nrows > settings['limit'] then
clickhouse_send_data(task)
nrows = 0
- main_rows = {}
- attachment_rows = {}
- urls_rows = {}
- emails_rows = {}
- asn_rows = {}
- symbols_rows = {}
+ data_rows = {}
custom_rows = {}
end
end
@@ -803,6 +706,163 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
return settings.retention.period
end
+local function upload_clickhouse_schema(upstream, ev_base, cfg)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ -- Apply schema sequentially
+ local function sql_recursor(i)
+ if clickhouse_schema[i] then
+ local sql = clickhouse_schema[i]
+ local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+ function(_, _)
+ rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s',
+ i, upstream:get_addr():to_string(true))
+ sql_recursor(i + 1)
+ end,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot upload schema '%s' on clickhouse server %s: %s",
+ sql, upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: cannot make request",
+ sql, upstream:get_addr():to_string(true))
+ end
+ end
+ end
+
+ sql_recursor(1)
+end
+
+local function maybe_apply_migrations(upstream, ev_base, cfg, version)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ -- Apply migrations sequentially
+ local function migration_recursor(i)
+ if i < schema_version then
+ if migrations[i] then
+ -- We also need to apply statements sequentially
+ local function sql_recursor(j)
+ if migrations[i][j] then
+ local sql = migrations[i][j]
+ local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+ function(_, _)
+ rspamd_logger.infox(rspamd_config,
+ 'applied migration to version %s from version %s: %s',
+ i + 1, version, sql:gsub('[\n%s]+', ' '))
+ if j == #migrations[i] then
+ -- Go to the next migration
+ migration_recursor(i + 1)
+ else
+ -- Apply the next statement
+ sql_recursor(j + 1)
+ end
+ end ,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot apply migration %s: '%s' on clickhouse server %s: %s",
+ i, sql, upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config,
+ "cannot apply migration %s: '%s' on clickhouse server %s: cannot make request",
+ i, sql, upstream:get_addr():to_string(true))
+ end
+ end
+ end
+
+ sql_recursor(1)
+ else
+ -- Try another migration
+ migration_recursor(i + 1)
+ end
+ end
+ end
+
+ migration_recursor(version)
+end
+
+local function check_rspamd_table(upstream, ev_base, cfg)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ local sql = [[EXISTS TABLE rspamd]]
+ local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+ function(_, rows)
+ if rows[1] and rows[1].result then
+ if tonumber(rows[1].result) == 1 then
+ -- Apply migration
+ rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
+ maybe_apply_migrations(upstream, ev_base, cfg, 1)
+ else
+ -- Upload schema
+ rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
+ upload_clickhouse_schema(upstream, ev_base, cfg)
+ end
+ else
+ rspamd_logger.errx(rspamd_config,
+ "unexpected reply on EXISTS command from server %s: %s",
+ upstream:get_addr():to_string(true), rows)
+ end
+ end ,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot check if rspamd table exists on clickhouse server %s: %s",
+ upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: cannot make request",
+ upstream:get_addr():to_string(true))
+ end
+end
+
+
+local function check_clickhouse_upstream(upstream, ev_base, cfg)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ -- If we have some custom rules, we just send its schema to the upstream
+ for k,rule in pairs(settings.custom_rules) do
+ if rule.schema then
+ local sql = rspamd_lua_utils.template(rule.schema, settings)
+ local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+ nil,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot send custom schema %s to clickhouse server %s: %s",
+ k, upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request",
+ k, upstream:get_addr():to_string(true))
+ end
+ end
+ end
+
+ -- Now check the main schema and apply migrations if needed
+ local sql = [[SELECT MAX(Version) as v FROM rspamd_version]]
+ local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+ function(_, rows)
+ local version = tonumber(rows[1].v)
+ maybe_apply_migrations(upstream, ev_base, cfg, version)
+ end,
+ function(_, err)
+ -- It might be either no rspamd table or version 1
+ rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table')
+ check_rspamd_table(upstream, ev_base, cfg)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: cannot make request",
+ upstream:get_addr():to_string(true))
+ end
+end
+
local opts = rspamd_config:get_all_opt('clickhouse')
if opts then
for k,v in pairs(opts) do
@@ -856,9 +916,6 @@ if opts then
else
settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables',
'regexp', 'clickhouse specific domains')
- if settings.use_https then
- connect_prefix = 'https://'
- end
settings.upstream = upstream_list.create(rspamd_config,
settings['server'] or settings['servers'], 8123)
@@ -888,46 +945,7 @@ if opts then
local upstreams = settings.upstream:all_upstreams()
for _,up in ipairs(upstreams) do
- local ip_addr = up:get_addr():to_string(true)
-
- local function send_req(elt, sql)
- local function http_cb(err_message, code, data, _)
- if code ~= 200 or err_message then
- if not err_message then err_message = data end
- rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: %s",
- elt, ip_addr, err_message)
- up:fail()
- else
- up:ok()
- end
- end
-
- if not rspamd_http.request({
- ev_base = ev_base,
- config = cfg,
- url = connect_prefix .. ip_addr,
- body = sql,
- callback = http_cb,
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- }) then
- rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request",
- elt, ip_addr)
- end
- end
-
- for tab,sql in pairs(clickhouse_schema) do
- send_req(tab, rspamd_lua_utils.template(sql, settings))
- end
-
- for k,rule in pairs(settings.custom_rules) do
- if rule.schema then
- send_req(k, rspamd_lua_utils.template(rule.schema, settings))
- end
- end
+ check_clickhouse_upstream(up, ev_base, cfg)
end
if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then