summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--conf/mime_types.inc4
-rw-r--r--contrib/librdns/resolver.c2
-rw-r--r--lualib/lua_clickhouse.lua35
-rw-r--r--src/libutil/ssl_util.c12
-rw-r--r--src/plugins/lua/clickhouse.lua522
-rw-r--r--test/functional/configs/plugins.conf230
6 files changed, 539 insertions, 266 deletions
diff --git a/conf/mime_types.inc b/conf/mime_types.inc
index e63dff09a..7c480d0fe 100644
--- a/conf/mime_types.inc
+++ b/conf/mime_types.inc
@@ -203,8 +203,8 @@ application/pidf+xml 0
application/pidf-diff+xml 0
application/pkcs10 0
application/pkcs12 0
-application/pkcs7-mime 0
-application/pkcs7-signature 0
+application/pkcs7-mime -1
+application/pkcs7-signature -1
application/pkcs8 0
application/pkix-attr-cert 0
application/pkix-cert 0
diff --git a/contrib/librdns/resolver.c b/contrib/librdns/resolver.c
index 3abb17304..b9b156c5e 100644
--- a/contrib/librdns/resolver.c
+++ b/contrib/librdns/resolver.c
@@ -994,6 +994,8 @@ void rdns_resolver_set_fake_reply (struct rdns_resolver *resolver,
abort ();
}
+ fake_rep->rcode = rcode;
+
memcpy (&fake_rep->key, srch, sizeof (*srch) + len);
if (reply) {
diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua
index 3f3c4de40..01d1f0b2b 100644
--- a/lualib/lua_clickhouse.lua
+++ b/lualib/lua_clickhouse.lua
@@ -76,7 +76,6 @@ local function parse_clickhouse_response(params, data)
local lua_util = require "lua_util"
local ucl = require "ucl"
- rspamd_logger.debugm(N, params.log_obj, "got clickhouse response: %s", data)
if data == nil then
-- clickhouse returned no data (i.e. empty result set): exiting
return {}
@@ -113,27 +112,35 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
if code ~= 200 or err_message then
if not err_message then err_message = data end
local ip_addr = upstream:get_addr():to_string(true)
- rspamd_logger.errx(params.log_obj,
- "request failed on clickhouse server %s: %s",
- ip_addr, err_message)
if fail_cb then
fail_cb(params, err_message, data)
+ else
+ rspamd_logger.errx(params.log_obj,
+ "request failed on clickhouse server %s: %s",
+ ip_addr, err_message)
end
upstream:fail()
else
upstream:ok()
- rspamd_logger.debugm(N, params.log_obj,
- "http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
local rows = parse_clickhouse_response(params, data)
if rows then
if ok_cb then
ok_cb(params, rows)
+ else
+ rspamd_logger.debugm(N, params.log_obj,
+ "http_select_cb ok: %s, %s, %s, %s", err_message, code,
+ data:gsub('[\n%s]+', ' '), _)
end
else
if fail_cb then
fail_cb(params, 'failed to parse reply', data)
+ else
+ local ip_addr = upstream:get_addr():to_string(true)
+ rspamd_logger.errx(params.log_obj,
+ "request failed on clickhouse server %s: %s",
+ ip_addr, 'failed to parse reply')
end
end
end
@@ -148,21 +155,24 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
if code ~= 200 or err_message then
if not err_message then err_message = data end
local ip_addr = upstream:get_addr():to_string(true)
- rspamd_logger.errx(params.log_obj,
- "request failed on clickhouse server %s: %s",
- ip_addr, err_message)
if fail_cb then
fail_cb(params, err_message, data)
+ else
+ rspamd_logger.errx(params.log_obj,
+ "request failed on clickhouse server %s: %s",
+ ip_addr, err_message)
end
upstream:fail()
else
upstream:ok()
- rspamd_logger.debugm(N, params.log_obj,
- "http_cb ok: %s, %s, %s, %s", err_message, code, data, _)
if ok_cb then
ok_cb(params, data)
+ else
+ rspamd_logger.debugm(N, params.log_obj,
+ "http_insert_cb ok: %s, %s, %s, %s", err_message, code,
+ data:gsub('[\n%s]+', ' '), _)
end
end
end
@@ -204,7 +214,7 @@ exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
http_params.body = query
http_params.log_obj = params.task or params.config
- rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body)
+ rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)
if not http_params.url then
local connect_prefix = "http://"
@@ -306,6 +316,7 @@ exports.generic = function (upstream, settings, params, query,
http_params.user = settings.user
http_params.password = settings.password
http_params.log_obj = params.task or params.config
+ http_params.body = query
if not http_params.url then
local connect_prefix = "http://"
diff --git a/src/libutil/ssl_util.c b/src/libutil/ssl_util.c
index a90bd5e36..1eab5821f 100644
--- a/src/libutil/ssl_util.c
+++ b/src/libutil/ssl_util.c
@@ -746,6 +746,18 @@ void
rspamd_ssl_connection_free (struct rspamd_ssl_connection *conn)
{
if (conn) {
+ /*
+ * SSL_RECEIVED_SHUTDOWN tells SSL_shutdown to act as if we had already
+ * received a close notify from the other end. SSL_shutdown will then
+ * send the final close notify in reply. The other end will receive the
+ * close notify and send theirs. By this time, we will have already
+ * closed the socket and the other end's real close notify will never be
+ * received. In effect, both sides will think that they have completed a
+ * clean shutdown and keep their sessions valid. This strategy will fail
+ * if the socket is not ready for writing, in which case this hack will
+ * lead to an unclean shutdown and lost session on the other end.
+ */
+ SSL_set_shutdown (conn->ssl, SSL_RECEIVED_SHUTDOWN);
SSL_shutdown (conn->ssl);
SSL_free (conn->ssl);
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 444465814..c2375d765 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -15,7 +15,6 @@ limitations under the License.
]]--
local rspamd_logger = require 'rspamd_logger'
-local rspamd_http = require "rspamd_http"
local rspamd_lua_utils = require "lua_util"
local upstream_list = require "rspamd_upstream_list"
local lua_util = require "lua_util"
@@ -28,16 +27,10 @@ if confighelp then
return
end
-local main_rows = {}
-local attachment_rows = {}
-local urls_rows = {}
-local emails_rows = {}
---local specific_rows = {}
-local asn_rows = {}
-local symbols_rows = {}
+local data_rows = {}
local custom_rows = {}
local nrows = 0
-local connect_prefix = 'http://'
+local schema_version = 2 -- Current schema version
local settings = {
limit = 1000,
@@ -52,12 +45,6 @@ local settings = {
dmarc_allow_symbols = {'DMARC_POLICY_ALLOW'},
dmarc_reject_symbols = {'DMARC_POLICY_REJECT', 'DMARC_POLICY_QUARANTINE'},
stop_symbols = {},
- table = 'rspamd',
- attachments_table = 'rspamd_attachments',
- urls_table = 'rspamd_urls',
- emails_table = 'rspamd_emails',
- symbols_table = 'rspamd_symbols',
- asn_table = 'rspamd_asn',
ipmask = 19,
ipmask6 = 48,
full_urls = false,
@@ -78,9 +65,9 @@ local settings = {
}
}
-local clickhouse_schema = {
-table = [[
-CREATE TABLE IF NOT EXISTS ${table}
+--- @language SQL
+local clickhouse_schema = {[[
+CREATE TABLE rspamd
(
Date Date,
TS DateTime,
@@ -103,60 +90,52 @@ CREATE TABLE IF NOT EXISTS ${table}
RcptUser String,
RcptDomain String,
ListId String,
- Digest FixedString(32)
-) ENGINE = MergeTree(Date, (TS, From), 8192)
-]],
-
-attachments_table = [[
-CREATE TABLE IF NOT EXISTS ${attachments_table} (
- Date Date,
- Digest FixedString(32),
`Attachments.FileName` Array(String),
`Attachments.ContentType` Array(String),
`Attachments.Length` Array(UInt32),
- `Attachments.Digest` Array(FixedString(16))
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-urls_table = [[
-CREATE TABLE IF NOT EXISTS ${urls_table} (
- Date Date,
- Digest FixedString(32),
+ `Attachments.Digest` Array(FixedString(16)),
`Urls.Tld` Array(String),
- `Urls.Url` Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-emails_table = [[
-CREATE TABLE IF NOT EXISTS ${emails_table} (
- Date Date,
- Digest FixedString(32),
- Emails Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-asn_table = [[
-CREATE TABLE IF NOT EXISTS ${asn_table} (
- Date Date,
- Digest FixedString(32),
+ `Urls.Url` Array(String),
+ Emails Array(String),
ASN String,
Country FixedString(2),
- IPNet String
-) ENGINE = MergeTree(Date, Digest, 8192)
-]],
-
-symbols_table = [[
-CREATE TABLE IF NOT EXISTS ${symbols_table} (
- Date Date,
- Digest FixedString(32),
+ IPNet String,
`Symbols.Names` Array(String),
`Symbols.Scores` Array(Float64),
- `Symbols.Options` Array(String)
-) ENGINE = MergeTree(Date, Digest, 8192)
-]]
+ `Symbols.Options` Array(String),
+ Digest FixedString(32)
+) ENGINE = MergeTree(Date, (TS, From), 8192)
+]],
+[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
+[[INSERT INTO rspamd_version (Version) Values (2)]],
}
-local function clickhouse_main_row(tname)
+-- This describes SQL queries to migrate between versions
+local migrations = {
+ [1] = {
+ -- Move to a wide fat table
+ [[ALTER TABLE rspamd
+ ADD COLUMN `Attachments.FileName` Array(String) AFTER ListId,
+ ADD COLUMN `Attachments.ContentType` Array(String) AFTER `Attachments.FileName`,
+ ADD COLUMN `Attachments.Length` Array(UInt32) AFTER `Attachments.ContentType`,
+ ADD COLUMN `Attachments.Digest` Array(FixedString(16)) AFTER `Attachments.Length`,
+ ADD COLUMN `Urls.Tld` Array(String) AFTER `Attachments.Digest`,
+ ADD COLUMN `Urls.Url` Array(String) AFTER `Urls.Tld`,
+ ADD COLUMN Emails Array(String) AFTER `Urls.Url`,
+ ADD COLUMN ASN String AFTER Emails,
+ ADD COLUMN Country FixedString(2) AFTER ASN,
+ ADD COLUMN IPNet String AFTER Country,
+ ADD COLUMN `Symbols.Names` Array(String) AFTER IPNet,
+ ADD COLUMN `Symbols.Scores` Array(Float64) AFTER `Symbols.Names`,
+ ADD COLUMN `Symbols.Options` Array(String) AFTER `Symbols.Scores`]],
+ -- Add explicit version
+ [[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
+ [[INSERT INTO rspamd_version (Version) Values (2)]],
+ }
+}
+
+
+local function clickhouse_main_row(res)
local fields = {
'Date',
'TS',
@@ -181,73 +160,52 @@ local function clickhouse_main_row(tname)
'ListId',
'Digest'
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_attachments_row(tname)
- local attachement_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_attachments_row(res)
+ local fields = {
'Attachments.FileName',
'Attachments.ContentType',
'Attachments.Length',
'Attachments.Digest',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(attachement_fields, ','))
- return elt
+
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_urls_row(tname)
- local urls_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_urls_row(res)
+ local fields = {
'Urls.Tld',
'Urls.Url',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(urls_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_emails_row(tname)
- local emails_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_emails_row(res)
+ local fields = {
'Emails',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(emails_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_symbols_row(tname)
- local symbols_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_symbols_row(res)
+ local fields = {
'Symbols.Names',
'Symbols.Scores',
'Symbols.Options',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(symbols_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
-local function clickhouse_asn_row(tname)
- local asn_fields = {
- 'Date',
- 'Digest',
+local function clickhouse_asn_row(res)
+ local fields = {
'ASN',
'Country',
'IPNet',
}
- local elt = string.format('INSERT INTO %s (%s) ',
- tname, table.concat(asn_fields, ','))
- return elt
+ for _,v in ipairs(fields) do table.insert(res, v) end
end
local function today(ts)
@@ -304,34 +262,19 @@ local function clickhouse_send_data(task)
end
end
- send_data('generic data', main_rows,
- clickhouse_main_row(settings['table']))
-
+ local fields = {}
+ clickhouse_main_row(fields)
+ clickhouse_attachments_row(fields)
+ clickhouse_urls_row(fields)
+ clickhouse_emails_row(fields)
+ clickhouse_asn_row(fields)
- if #attachment_rows > 1 then
- send_data('attachments data', attachment_rows,
- clickhouse_attachments_row(settings.attachments_table))
+ if settings.enable_symbols then
+ clickhouse_symbols_row(fields)
end
- if #urls_rows > 1 then
- send_data('urls data', urls_rows,
- clickhouse_urls_row(settings.urls_table))
- end
-
- if #emails_rows > 1 then
- send_data('emails data', emails_rows,
- clickhouse_emails_row(settings.emails_table))
- end
-
- if #asn_rows > 1 then
- send_data('asn data', asn_rows,
- clickhouse_asn_row(settings.asn_table))
- end
-
- if #symbols_rows > 1 then
- send_data('symbols data', symbols_rows,
- clickhouse_symbols_row(settings.symbols_table))
- end
+ send_data('generic data', data_rows,
+ string.format('INSERT INTO rspamd (%s)', table.concat(fields, ',')))
for k,crows in pairs(custom_rows) do
if #crows > 1 then
@@ -490,7 +433,7 @@ local function clickhouse_collect(task)
local action = task:get_metric_action('default')
local digest = task:get_digest()
- table.insert(main_rows, {
+ local row = {
today(timestamp),
timestamp,
from_domain,
@@ -512,25 +455,8 @@ local function clickhouse_collect(task)
rcpt_user,
rcpt_domain,
list_id,
- task:get_digest()
- })
-
---[[ TODO: has been broken
- if settings['from_map'] and dkim == 'allow' then
- -- Use dkim
- local das = task:get_symbol(settings['dkim_allow_symbols'][1])
- if ((das or E)[1] or E).options then
- for _,dkim_domain in ipairs(das[1]['options']) do
- local specific = settings.from_map:get_key(dkim_domain)
- if specific then
- specific_rows[specific] = {}
- table.insert(specific_rows[specific], elt)
- end
- end
- end
-
- end
---]]
+ digest
+ }
-- Attachments step
local attachments_fnames = {}
@@ -551,14 +477,15 @@ local function clickhouse_collect(task)
end
if #attachments_fnames > 0 then
- table.insert(attachment_rows, {
- today(timestamp),
- digest,
- attachments_fnames,
- attachments_ctypes,
- attachments_lengths,
- attachments_digests,
- })
+ table.insert(row, attachments_fnames)
+ table.insert(row, attachments_ctypes)
+ table.insert(row, attachments_lengths)
+ table.insert(row, attachments_digests)
+ else
+ table.insert(row, {})
+ table.insert(row, {})
+ table.insert(row, {})
+ table.insert(row, {})
end
-- Urls step
@@ -576,58 +503,43 @@ local function clickhouse_collect(task)
end
if #urls_tlds > 0 then
- table.insert(urls_rows, {
- today(timestamp),
- digest,
- urls_tlds,
- urls_urls
- })
+ table.insert(row, urls_tlds)
+ table.insert(row, urls_urls)
+ else
+ table.insert(row, {})
+ table.insert(row, {})
end
-- Emails step
- local emails = {}
if task:has_urls(true) then
- for _,u in ipairs(task:get_emails()) do
- table.insert(emails,
- string.format('%s@%s', u:get_user(), u:get_host()))
- end
- end
-
- if #emails > 0 then
- table.insert(emails_rows, {
- today(timestamp),
- digest,
- emails,
- })
+ table.insert(row, fun.totable(fun.map(function(u)
+ return string.format('%s@%s', u:get_user(), u:get_host())
+ end, task:get_emails())))
+ else
+ table.insert(row, {})
end
-- ASN information
- if settings['asn_table'] then
- local asn, country, ipnet = '--', '--', '--'
- local pool = task:get_mempool()
- ret = pool:get_variable("asn")
- if ret then
- asn = ret
- end
- ret = pool:get_variable("country")
- if ret then
- country = ret:sub(1, 2)
- end
- ret = pool:get_variable("ipnet")
- if ret then
- ipnet = ret
- end
- table.insert(asn_rows, {
- today(timestamp),
- digest,
- asn,
- country,
- ipnet
- })
+ local asn, country, ipnet = '--', '--', '--'
+ local pool = task:get_mempool()
+ ret = pool:get_variable("asn")
+ if ret then
+ asn = ret
+ end
+ ret = pool:get_variable("country")
+ if ret then
+ country = ret:sub(1, 2)
+ end
+ ret = pool:get_variable("ipnet")
+ if ret then
+ ipnet = ret
end
+ table.insert(row, asn)
+ table.insert(row, country)
+ table.insert(row, ipnet)
-- Symbols info
- if settings.enable_symbols and settings['symbols_table'] then
+ if settings.enable_symbols then
local symbols = task:get_symbols_all()
local syms_tab = {}
local scores_tab = {}
@@ -643,14 +555,9 @@ local function clickhouse_collect(task)
table.insert(options_tab, '');
end
end
-
- table.insert(symbols_rows, {
- today(timestamp),
- digest,
- syms_tab,
- scores_tab,
- options_tab
- })
+ table.insert(row, syms_tab)
+ table.insert(row, scores_tab)
+ table.insert(row, options_tab)
end
-- Custom data
@@ -660,17 +567,13 @@ local function clickhouse_collect(task)
end
nrows = nrows + 1
+ table.insert(data_rows, row)
rspamd_logger.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit)
if nrows > settings['limit'] then
clickhouse_send_data(task)
nrows = 0
- main_rows = {}
- attachment_rows = {}
- urls_rows = {}
- emails_rows = {}
- asn_rows = {}
- symbols_rows = {}
+ data_rows = {}
custom_rows = {}
end
end
@@ -803,6 +706,163 @@ local function clickhouse_remove_old_partitions(cfg, ev_base)
return settings.retention.period
end
+local function upload_clickhouse_schema(upstream, ev_base, cfg)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ -- Apply schema sequentially
+ local function sql_recursor(i)
+ if clickhouse_schema[i] then
+ local sql = clickhouse_schema[i]
+ local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+ function(_, _)
+ rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s',
+ i, upstream:get_addr():to_string(true))
+ sql_recursor(i + 1)
+ end,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot upload schema '%s' on clickhouse server %s: %s",
+ sql, upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: cannot make request",
+ sql, upstream:get_addr():to_string(true))
+ end
+ end
+ end
+
+ sql_recursor(1)
+end
+
+local function maybe_apply_migrations(upstream, ev_base, cfg, version)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ -- Apply migrations sequentially
+ local function migration_recursor(i)
+ if i < schema_version then
+ if migrations[i] then
+ -- We also need to apply statements sequentially
+ local function sql_recursor(j)
+ if migrations[i][j] then
+ local sql = migrations[i][j]
+ local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+ function(_, _)
+ rspamd_logger.infox(rspamd_config,
+ 'applied migration to version %s from version %s: %s',
+ i + 1, version, sql:gsub('[\n%s]+', ' '))
+ if j == #migrations[i] then
+ -- Go to the next migration
+ migration_recursor(i + 1)
+ else
+ -- Apply the next statement
+ sql_recursor(j + 1)
+ end
+ end ,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot apply migration %s: '%s' on clickhouse server %s: %s",
+ i, sql, upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config,
+ "cannot apply migration %s: '%s' on clickhouse server %s: cannot make request",
+ i, sql, upstream:get_addr():to_string(true))
+ end
+ end
+ end
+
+ sql_recursor(1)
+ else
+ -- Try another migration
+ migration_recursor(i + 1)
+ end
+ end
+ end
+
+ migration_recursor(version)
+end
+
+local function check_rspamd_table(upstream, ev_base, cfg)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ local sql = [[EXISTS TABLE rspamd]]
+ local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+ function(_, rows)
+ if rows[1] and rows[1].result then
+ if tonumber(rows[1].result) == 1 then
+ -- Apply migration
+ rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration')
+ maybe_apply_migrations(upstream, ev_base, cfg, 1)
+ else
+ -- Upload schema
+ rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
+ upload_clickhouse_schema(upstream, ev_base, cfg)
+ end
+ else
+ rspamd_logger.errx(rspamd_config,
+ "unexpected reply on EXISTS command from server %s: %s",
+ upstream:get_addr():to_string(true), rows)
+ end
+ end ,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot check if rspamd table exists on clickhouse server %s: %s",
+ upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: cannot make request",
+ upstream:get_addr():to_string(true))
+ end
+end
+
+
+local function check_clickhouse_upstream(upstream, ev_base, cfg)
+ local ch_params = {
+ ev_base = ev_base,
+ config = cfg,
+ }
+ -- If we have some custom rules, we just send its schema to the upstream
+ for k,rule in pairs(settings.custom_rules) do
+ if rule.schema then
+ local sql = rspamd_lua_utils.template(rule.schema, settings)
+ local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
+ nil,
+ function(_, err)
+ rspamd_logger.errx(rspamd_config,
+ "cannot send custom schema %s to clickhouse server %s: %s",
+ k, upstream:get_addr():to_string(true), err)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request",
+ k, upstream:get_addr():to_string(true))
+ end
+ end
+ end
+
+ -- Now check the main schema and apply migrations if needed
+ local sql = [[SELECT MAX(Version) as v FROM rspamd_version]]
+ local ret = lua_clickhouse.select(upstream, settings, ch_params, sql,
+ function(_, rows)
+ local version = tonumber(rows[1].v)
+ maybe_apply_migrations(upstream, ev_base, cfg, version)
+ end,
+ function(_, err)
+ -- It might be either no rspamd table or version 1
+ rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table')
+ check_rspamd_table(upstream, ev_base, cfg)
+ end)
+ if not ret then
+ rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: cannot make request",
+ upstream:get_addr():to_string(true))
+ end
+end
+
local opts = rspamd_config:get_all_opt('clickhouse')
if opts then
for k,v in pairs(opts) do
@@ -856,9 +916,6 @@ if opts then
else
settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables',
'regexp', 'clickhouse specific domains')
- if settings.use_https then
- connect_prefix = 'https://'
- end
settings.upstream = upstream_list.create(rspamd_config,
settings['server'] or settings['servers'], 8123)
@@ -888,46 +945,7 @@ if opts then
local upstreams = settings.upstream:all_upstreams()
for _,up in ipairs(upstreams) do
- local ip_addr = up:get_addr():to_string(true)
-
- local function send_req(elt, sql)
- local function http_cb(err_message, code, data, _)
- if code ~= 200 or err_message then
- if not err_message then err_message = data end
- rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: %s",
- elt, ip_addr, err_message)
- up:fail()
- else
- up:ok()
- end
- end
-
- if not rspamd_http.request({
- ev_base = ev_base,
- config = cfg,
- url = connect_prefix .. ip_addr,
- body = sql,
- callback = http_cb,
- mime_type = 'text/plain',
- timeout = settings['timeout'],
- no_ssl_verify = settings.no_ssl_verify,
- user = settings.user,
- password = settings.password,
- }) then
- rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request",
- elt, ip_addr)
- end
- end
-
- for tab,sql in pairs(clickhouse_schema) do
- send_req(tab, rspamd_lua_utils.template(sql, settings))
- end
-
- for k,rule in pairs(settings.custom_rules) do
- if rule.schema then
- send_req(k, rspamd_lua_utils.template(rule.schema, settings))
- end
- end
+ check_clickhouse_upstream(up, ev_base, cfg)
end
if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then
diff --git a/test/functional/configs/plugins.conf b/test/functional/configs/plugins.conf
index 65141d5c0..8de4c3c02 100644
--- a/test/functional/configs/plugins.conf
+++ b/test/functional/configs/plugins.conf
@@ -21,6 +21,236 @@ options = {
name = "dkim._domainkey.invalid.za.org",
type = "txt";
replies = ["v=DKIM1; k=rsa; p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDEEXmNGQq7PUrr9Mg4UakTFHgXBCy2DOztkrZm+0OrVWtiRzGluxBkbOWTBwuU3/Yw97yTphBMQxzWFN603/f/KPAQcF/Lc1l+6kmIBBxNXjjGuOK/3PYKZVntUdKmqcQBYfnHdzH2Tohbuyx1a7xqnv6VSChqQrZU4CwkeT3+eQIDAQAB"];
+ },
+ {
+ name = "_dmarc.cacophony.za.org",
+ type = "txt";
+ replies = ["v=DMARC1; p=none; sp=reject"];
+ },
+ {
+ name = "_dmarc.my.mom.za.org",
+ type = "txt";
+ replies = ["v=DMARC1; p=reject"];
+ },
+ {
+ name = "example.net",
+ type = "txt";
+ replies = ["v=spf1 -all"];
+ },
+ {
+ name = "fail4.org.org.za",
+ type = "txt";
+ replies = ["v=spf1 redirect=asdfsfewewrredfs"];
+ },
+ {
+ name = "_dmarc.reject.cacophony.za.org",
+ type = "txt";
+ replies = ["v=DMARC1; p=reject"];
+ },
+ {
+ name = "spf.cacophony.za.org",
+ type = "txt";
+ replies = ["v=spf1 ip4:8.8.4.4 -all"];
+ },
+ {
+ name = "fail7.org.org.za",
+ type = "a";
+ rcode = 'norec';
+ },
+ {
+ name = "fail6.org.org.za",
+ type = "txt";
+ replies = ["v=spf1 ip4:8.8.8.8 mx -all"];
+ },
+ {
+ name = "fail6.org.org.za",
+ type = "mx";
+ rcode = 'norec';
+ },
+ {
+ name = "fail7.org.org.za",
+ type = "aaaa";
+ rcode = 'norec';
+ },
+ {
+ name = "_dmarc.quarantine.cacophony.za.org",
+ type = "txt";
+ replies = ["v=DMARC1; p=quarantine"];
+ },
+ {
+ name = "_dmarc.yo.mom.za.org",
+ type = "txt";
+ replies = ["v=DMARC1; p=reject; aspf=s; adkim=s;"];
+ },
+ {
+ name = "yo.mom.za.org",
+ type = "txt";
+ replies = ["v=spf1 ip4:37.48.67.26 -all"];
+ },
+ {
+ name = "testdkim._domainkey.mom.za.org",
+ type = "txt";
+ replies = ["v=DKIM1; k=rsa; p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC3v4VPE1QMHUzsMRbC8VzXNq82mDjiv9Gi1NB/YYC+vIYZT+sE/Uxnr0Clk8C2jgzEr3jcxgQEWZfMtEEg/EfEJvh4SrXWv9c0gw1EEfxKxX9i+r8yBQtc/EWospWVDkhF2lAvQAK1lV1ZiU7psJ6fh1CI39uZyWdAktZzWLf0zQIDAQAB"];
+ },
+ {
+ name = "_dmarc.rspamd.tk",
+ type = "txt";
+ replies = ["bio=a263adeab8acdcdb8b89e127b67d696061fdfbee"];
+ },
+ {
+ name = "fail2.org.org.za",
+ type = "txt";
+ replies = ["v=spf1 ip4:8.8.4.4 include:www.dnssec-failed.org -all"];
+ },
+ {
+ name = "fail3.org.org.za",
+ type = "txt";
+ replies = ["v=spf1 ip4:8.8.8.8 include:total.barf -all"];
+ },
+ {
+ name = "mom.za.org",
+ type = "txt";
+ replies = ["v=spf1 ip4:37.48.67.26 -all"];
+ },
+ {
+ name = "testdkim._domainkey.asdf.rspamd.tk", # testdkim._domainkey.asdf.rspamd.tk is an alias for rspamd.tk
+ type = "txt";
+ replies = ["bio=a263adeab8acdcdb8b89e127b67d696061fdfbee"];
+ },
+ {
+ name = "testdkim._domainkey.rspamd.tk", # testdkim._domainkey.rspamd.tk is an alias for rspamd.tk
+ type = "txt";
+ replies = ["bio=a263adeab8acdcdb8b89e127b67d696061fdfbee"];
+ },
+ {
+ name = "pass1.org.org.za",
+ type = "txt";
+ replies = ["v=spf1 include:pass2.org.org.za -all"];
+ },
+ {
+ name = "95.142.99.88.in-addr.arpa",
+ type = "ptr";
+ replies = ["mail.highsecure.ru"];
+ },
+ {
+ name = "mail.highsecure.ru",
+ type = "a";
+ replies = ["88.99.142.95"];
+ },
+ {
+ name = "mail.highsecure.ru",
+ type = "aaaa";
+ rcode = 'norec';
+ },
+ {
+ name = "1.0.66.128.in-addr.arpa",
+ type = "ptr";
+ rcode = 'nxdomain';
+ },
+ {
+ name = "182.216.85.209.in-addr.arpa",
+ type = "ptr";
+ replies = ["mail-qt0-f182.google.com"];
+ },
+ {
+ name = "crazyspf.cacophony.za.org",
+ type = "txt";
+ replies = ["v=spf1 ptr:cacophony.za.org ptr:rspamd.com ptr:yahoo.com ptr:yahoo.net ptr:highsecure.ru -all"];
+ },
+ {
+ name = "pass2.org.org.za",
+ type = "txt";
+ replies = ["v=spf1 ip4:8.8.8.8 -all"];
+ },
+ {
+ name = "_dmarc.yoni.za.org",
+ type = "txt";
+ replies = ["v=DMARC1; p=reject; sp=none;"];
+ },
+ {
+ name = "fail10.org.org.za",
+ type = "txt";
+ replies = ["v=spf1 redirect=fail5.org.org.za"];
+ },
+ {
+ name = "fail5.org.org.za",
+ type = "txt";
+ replies = ["v=spf1 OMGBARF"];
+ },
+ {
+ name = "fail7.org.org.za",
+ type = "txt";
+ replies = ["v=spf1 ip4:8.8.8.8 a -all"];
+ },
+ {
+ name = "co.za",
+ type = "txt";
+ rcode = 'norec';
+ },
+ {
+ name = "testdkim1._domainkey.yoni.za.org",
+ type = "txt";
+ replies = ["v=DKIM1; k=rsa; p=BARF"];
+ },
+ {
+ name = "_dmarc.yoni.za.net",
+ type = "txt";
+ replies = ["v=DMARC1; p=none; sp=quarantine"];
+ },
+ {
+ name = "za",
+ type = "txt";
+ replies = ["Top-level domain for South Africa"];
+ },
+ {
+ name = "_dmarc.foo.yoni.za.org",
+ type = "txt";
+ rcode = 'nxdomain';
+ },
+ {
+ name = "_dmarc.foo.cacophony.za.org",
+ type = "txt";
+ rcode = 'nxdomain';
+ },
+ {
+ name = "_dmarc.foo.yoni.za.net",
+ type = "txt";
+ rcode = 'nxdomain';
+ },
+ {
+ name = "_dmarc.dnssec-failed.org",
+ type = "txt";
+ rcode = 'timeout';
+ },
+ {
+ name = "_dmarc.example.com",
+ type = "txt";
+ rcode = 'nxdomain';
+ },
+ {
+ name = "testdkim1._domainkey.dnssec-failed.org",
+ type = "txt";
+ rcode = 'timeout';
+ },
+ {
+ name = "total.barf",
+ type = "txt";
+ rcode = 'nxdomain';
+ },
+ {
+ name = "_dmarc.foo.cacophony.za.org",
+ type = "txt";
+ rcode = 'nxdomain';
+ },
+ {
+ name = "zzzzaaaa",
+ type = "txt";
+ rcode = 'nxdomain';
+ },
+ {
+ name = "asdfsfewewrredfs",
+ type = "txt";
+ rcode = 'nxdomain';
}];
}
}