diff options
-rw-r--r-- | conf/mime_types.inc | 4 | ||||
-rw-r--r-- | contrib/librdns/resolver.c | 2 | ||||
-rw-r--r-- | lualib/lua_clickhouse.lua | 35 | ||||
-rw-r--r-- | src/libutil/ssl_util.c | 12 | ||||
-rw-r--r-- | src/plugins/lua/clickhouse.lua | 522 | ||||
-rw-r--r-- | test/functional/configs/plugins.conf | 230 |
6 files changed, 539 insertions, 266 deletions
diff --git a/conf/mime_types.inc b/conf/mime_types.inc index e63dff09a..7c480d0fe 100644 --- a/conf/mime_types.inc +++ b/conf/mime_types.inc @@ -203,8 +203,8 @@ application/pidf+xml 0 application/pidf-diff+xml 0 application/pkcs10 0 application/pkcs12 0 -application/pkcs7-mime 0 -application/pkcs7-signature 0 +application/pkcs7-mime -1 +application/pkcs7-signature -1 application/pkcs8 0 application/pkix-attr-cert 0 application/pkix-cert 0 diff --git a/contrib/librdns/resolver.c b/contrib/librdns/resolver.c index 3abb17304..b9b156c5e 100644 --- a/contrib/librdns/resolver.c +++ b/contrib/librdns/resolver.c @@ -994,6 +994,8 @@ void rdns_resolver_set_fake_reply (struct rdns_resolver *resolver, abort (); } + fake_rep->rcode = rcode; + memcpy (&fake_rep->key, srch, sizeof (*srch) + len); if (reply) { diff --git a/lualib/lua_clickhouse.lua b/lualib/lua_clickhouse.lua index 3f3c4de40..01d1f0b2b 100644 --- a/lualib/lua_clickhouse.lua +++ b/lualib/lua_clickhouse.lua @@ -76,7 +76,6 @@ local function parse_clickhouse_response(params, data) local lua_util = require "lua_util" local ucl = require "ucl" - rspamd_logger.debugm(N, params.log_obj, "got clickhouse response: %s", data) if data == nil then -- clickhouse returned no data (i.e. empty result set): exiting return {} @@ -113,27 +112,35 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb) if code ~= 200 or err_message then if not err_message then err_message = data end local ip_addr = upstream:get_addr():to_string(true) - rspamd_logger.errx(params.log_obj, - "request failed on clickhouse server %s: %s", - ip_addr, err_message) if fail_cb then fail_cb(params, err_message, data) + else + rspamd_logger.errx(params.log_obj, + "request failed on clickhouse server %s: %s", + ip_addr, err_message) end upstream:fail() else upstream:ok() - rspamd_logger.debugm(N, params.log_obj, - "http_cb ok: %s, %s, %s, %s", err_message, code, data, _) local rows = parse_clickhouse_response(params, data) if rows then if ok_cb then ok_cb(params, rows) + else + rspamd_logger.debugm(N, params.log_obj, + "http_select_cb ok: %s, %s, %s, %s", err_message, code, + data:gsub('[\n%s]+', ' '), _) end else if fail_cb then fail_cb(params, 'failed to parse reply', data) + else + local ip_addr = upstream:get_addr():to_string(true) + rspamd_logger.errx(params.log_obj, + "request failed on clickhouse server %s: %s", + ip_addr, 'failed to parse reply') end end end @@ -148,21 +155,24 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb) if code ~= 200 or err_message then if not err_message then err_message = data end local ip_addr = upstream:get_addr():to_string(true) - rspamd_logger.errx(params.log_obj, - "request failed on clickhouse server %s: %s", - ip_addr, err_message) if fail_cb then fail_cb(params, err_message, data) + else + rspamd_logger.errx(params.log_obj, + "request failed on clickhouse server %s: %s", + ip_addr, err_message) end upstream:fail() else upstream:ok() - rspamd_logger.debugm(N, params.log_obj, - "http_cb ok: %s, %s, %s, %s", err_message, code, data, _) if ok_cb then ok_cb(params, data) + else + rspamd_logger.debugm(N, params.log_obj, + "http_insert_cb ok: %s, %s, %s, %s", err_message, code, + data:gsub('[\n%s]+', ' '), _) end end end @@ -204,7 +214,7 @@ exports.select = function (upstream, settings, params, query, ok_cb, fail_cb) http_params.body = query http_params.log_obj = params.task or params.config - rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", params.body) + rspamd_logger.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body) if not http_params.url then local connect_prefix = "http://" @@ -306,6 +316,7 @@ exports.generic = function (upstream, settings, params, query, http_params.user = settings.user http_params.password = settings.password http_params.log_obj = params.task or params.config + http_params.body = query if not http_params.url then local connect_prefix = "http://" diff --git a/src/libutil/ssl_util.c b/src/libutil/ssl_util.c index a90bd5e36..1eab5821f 100644 --- a/src/libutil/ssl_util.c +++ b/src/libutil/ssl_util.c @@ -746,6 +746,18 @@ void rspamd_ssl_connection_free (struct rspamd_ssl_connection *conn) { if (conn) { + /* + * SSL_RECEIVED_SHUTDOWN tells SSL_shutdown to act as if we had already + * received a close notify from the other end. SSL_shutdown will then + * send the final close notify in reply. The other end will receive the + * close notify and send theirs. By this time, we will have already + * closed the socket and the other end's real close notify will never be + * received. In effect, both sides will think that they have completed a + * clean shutdown and keep their sessions valid. This strategy will fail + * if the socket is not ready for writing, in which case this hack will + * lead to an unclean shutdown and lost session on the other end. + */ + SSL_set_shutdown (conn->ssl, SSL_RECEIVED_SHUTDOWN); SSL_shutdown (conn->ssl); SSL_free (conn->ssl); diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 444465814..c2375d765 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -15,7 +15,6 @@ limitations under the License. ]]-- local rspamd_logger = require 'rspamd_logger' -local rspamd_http = require "rspamd_http" local rspamd_lua_utils = require "lua_util" local upstream_list = require "rspamd_upstream_list" local lua_util = require "lua_util" @@ -28,16 +27,10 @@ if confighelp then return end -local main_rows = {} -local attachment_rows = {} -local urls_rows = {} -local emails_rows = {} ---local specific_rows = {} -local asn_rows = {} -local symbols_rows = {} +local data_rows = {} local custom_rows = {} local nrows = 0 -local connect_prefix = 'http://' +local schema_version = 2 -- Current schema version local settings = { limit = 1000, @@ -52,12 +45,6 @@ local settings = { dmarc_allow_symbols = {'DMARC_POLICY_ALLOW'}, dmarc_reject_symbols = {'DMARC_POLICY_REJECT', 'DMARC_POLICY_QUARANTINE'}, stop_symbols = {}, - table = 'rspamd', - attachments_table = 'rspamd_attachments', - urls_table = 'rspamd_urls', - emails_table = 'rspamd_emails', - symbols_table = 'rspamd_symbols', - asn_table = 'rspamd_asn', ipmask = 19, ipmask6 = 48, full_urls = false, @@ -78,9 +65,9 @@ local settings = { } } -local clickhouse_schema = { -table = [[ -CREATE TABLE IF NOT EXISTS ${table} +--- @language SQL +local clickhouse_schema = {[[ +CREATE TABLE rspamd ( Date Date, TS DateTime, @@ -103,60 +90,52 @@ CREATE TABLE IF NOT EXISTS ${table} RcptUser String, RcptDomain String, ListId String, - Digest FixedString(32) -) ENGINE = MergeTree(Date, (TS, From), 8192) -]], - -attachments_table = [[ -CREATE TABLE IF NOT EXISTS ${attachments_table} ( - Date Date, - Digest FixedString(32), `Attachments.FileName` Array(String), `Attachments.ContentType` Array(String), `Attachments.Length` Array(UInt32), - `Attachments.Digest` Array(FixedString(16)) -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -urls_table = [[ -CREATE TABLE IF NOT EXISTS ${urls_table} ( - Date Date, - Digest FixedString(32), + `Attachments.Digest` Array(FixedString(16)), `Urls.Tld` Array(String), - `Urls.Url` Array(String) -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -emails_table = [[ -CREATE TABLE IF NOT EXISTS ${emails_table} ( - Date Date, - Digest FixedString(32), - Emails Array(String) -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -asn_table = [[ -CREATE TABLE IF NOT EXISTS ${asn_table} ( - Date Date, - Digest FixedString(32), + `Urls.Url` Array(String), + Emails Array(String), ASN String, Country FixedString(2), - IPNet String -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -symbols_table = [[ -CREATE TABLE IF NOT EXISTS ${symbols_table} ( - Date Date, - Digest FixedString(32), + IPNet String, `Symbols.Names` Array(String), `Symbols.Scores` Array(Float64), - `Symbols.Options` Array(String) -) ENGINE = MergeTree(Date, Digest, 8192) -]] + `Symbols.Options` Array(String), + Digest FixedString(32) +) ENGINE = MergeTree(Date, (TS, From), 8192) +]], +[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]], +[[INSERT INTO rspamd_version (Version) Values (2)]], } -local function clickhouse_main_row(tname) +-- This describes SQL queries to migrate between versions +local migrations = { + [1] = { + -- Move to a wide fat table + [[ALTER TABLE rspamd + ADD COLUMN `Attachments.FileName` Array(String) AFTER ListId, + ADD COLUMN `Attachments.ContentType` Array(String) AFTER `Attachments.FileName`, + ADD COLUMN `Attachments.Length` Array(UInt32) AFTER `Attachments.ContentType`, + ADD COLUMN `Attachments.Digest` Array(FixedString(16)) AFTER `Attachments.Length`, + ADD COLUMN `Urls.Tld` Array(String) AFTER `Attachments.Digest`, + ADD COLUMN `Urls.Url` Array(String) AFTER `Urls.Tld`, + ADD COLUMN Emails Array(String) AFTER `Urls.Url`, + ADD COLUMN ASN String AFTER Emails, + ADD COLUMN Country FixedString(2) AFTER ASN, + ADD COLUMN IPNet String AFTER Country, + ADD COLUMN `Symbols.Names` Array(String) AFTER IPNet, + ADD COLUMN `Symbols.Scores` Array(Float64) AFTER `Symbols.Names`, + ADD COLUMN `Symbols.Options` Array(String) AFTER `Symbols.Scores`]], + -- Add explicit version + [[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]], + [[INSERT INTO rspamd_version (Version) Values (2)]], + } +} + + +local function clickhouse_main_row(res) local fields = { 'Date', 'TS', @@ -181,73 +160,52 @@ local function clickhouse_main_row(tname) 'ListId', 'Digest' } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_attachments_row(tname) - local attachement_fields = { - 'Date', - 'Digest', +local function clickhouse_attachments_row(res) + local fields = { 'Attachments.FileName', 'Attachments.ContentType', 'Attachments.Length', 'Attachments.Digest', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(attachement_fields, ',')) - return elt + + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_urls_row(tname) - local urls_fields = { - 'Date', - 'Digest', +local function clickhouse_urls_row(res) + local fields = { 'Urls.Tld', 'Urls.Url', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(urls_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_emails_row(tname) - local emails_fields = { - 'Date', - 'Digest', +local function clickhouse_emails_row(res) + local fields = { 'Emails', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(emails_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_symbols_row(tname) - local symbols_fields = { - 'Date', - 'Digest', +local function clickhouse_symbols_row(res) + local fields = { 'Symbols.Names', 'Symbols.Scores', 'Symbols.Options', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(symbols_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_asn_row(tname) - local asn_fields = { - 'Date', - 'Digest', +local function clickhouse_asn_row(res) + local fields = { 'ASN', 'Country', 'IPNet', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(asn_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end local function today(ts) @@ -304,34 +262,19 @@ local function clickhouse_send_data(task) end end - send_data('generic data', main_rows, - clickhouse_main_row(settings['table'])) - + local fields = {} + clickhouse_main_row(fields) + clickhouse_attachments_row(fields) + clickhouse_urls_row(fields) + clickhouse_emails_row(fields) + clickhouse_asn_row(fields) - if #attachment_rows > 1 then - send_data('attachments data', attachment_rows, - clickhouse_attachments_row(settings.attachments_table)) + if settings.enable_symbols then + clickhouse_symbols_row(fields) end - if #urls_rows > 1 then - send_data('urls data', urls_rows, - clickhouse_urls_row(settings.urls_table)) - end - - if #emails_rows > 1 then - send_data('emails data', emails_rows, - clickhouse_emails_row(settings.emails_table)) - end - - if #asn_rows > 1 then - send_data('asn data', asn_rows, - clickhouse_asn_row(settings.asn_table)) - end - - if #symbols_rows > 1 then - send_data('symbols data', symbols_rows, - clickhouse_symbols_row(settings.symbols_table)) - end + send_data('generic data', data_rows, + string.format('INSERT INTO rspamd (%s)', table.concat(fields, ','))) for k,crows in pairs(custom_rows) do if #crows > 1 then @@ -490,7 +433,7 @@ local function clickhouse_collect(task) local action = task:get_metric_action('default') local digest = task:get_digest() - table.insert(main_rows, { + local row = { today(timestamp), timestamp, from_domain, @@ -512,25 +455,8 @@ local function clickhouse_collect(task) rcpt_user, rcpt_domain, list_id, - task:get_digest() - }) - ---[[ TODO: has been broken - if settings['from_map'] and dkim == 'allow' then - -- Use dkim - local das = task:get_symbol(settings['dkim_allow_symbols'][1]) - if ((das or E)[1] or E).options then - for _,dkim_domain in ipairs(das[1]['options']) do - local specific = settings.from_map:get_key(dkim_domain) - if specific then - specific_rows[specific] = {} - table.insert(specific_rows[specific], elt) - end - end - end - - end ---]] + digest + } -- Attachments step local attachments_fnames = {} @@ -551,14 +477,15 @@ local function clickhouse_collect(task) end if #attachments_fnames > 0 then - table.insert(attachment_rows, { - today(timestamp), - digest, - attachments_fnames, - attachments_ctypes, - attachments_lengths, - attachments_digests, - }) + table.insert(row, attachments_fnames) + table.insert(row, attachments_ctypes) + table.insert(row, attachments_lengths) + table.insert(row, attachments_digests) + else + table.insert(row, {}) + table.insert(row, {}) + table.insert(row, {}) + table.insert(row, {}) end -- Urls step @@ -576,58 +503,43 @@ local function clickhouse_collect(task) end if #urls_tlds > 0 then - table.insert(urls_rows, { - today(timestamp), - digest, - urls_tlds, - urls_urls - }) + table.insert(row, urls_tlds) + table.insert(row, urls_urls) + else + table.insert(row, {}) + table.insert(row, {}) end -- Emails step - local emails = {} if task:has_urls(true) then - for _,u in ipairs(task:get_emails()) do - table.insert(emails, - string.format('%s@%s', u:get_user(), u:get_host())) - end - end - - if #emails > 0 then - table.insert(emails_rows, { - today(timestamp), - digest, - emails, - }) + table.insert(row, fun.totable(fun.map(function(u) + return string.format('%s@%s', u:get_user(), u:get_host()) + end, task:get_emails()))) + else + table.insert(row, {}) end -- ASN information - if settings['asn_table'] then - local asn, country, ipnet = '--', '--', '--' - local pool = task:get_mempool() - ret = pool:get_variable("asn") - if ret then - asn = ret - end - ret = pool:get_variable("country") - if ret then - country = ret:sub(1, 2) - end - ret = pool:get_variable("ipnet") - if ret then - ipnet = ret - end - table.insert(asn_rows, { - today(timestamp), - digest, - asn, - country, - ipnet - }) + local asn, country, ipnet = '--', '--', '--' + local pool = task:get_mempool() + ret = pool:get_variable("asn") + if ret then + asn = ret + end + ret = pool:get_variable("country") + if ret then + country = ret:sub(1, 2) + end + ret = pool:get_variable("ipnet") + if ret then + ipnet = ret end + table.insert(row, asn) + table.insert(row, country) + table.insert(row, ipnet) -- Symbols info - if settings.enable_symbols and settings['symbols_table'] then + if settings.enable_symbols then local symbols = task:get_symbols_all() local syms_tab = {} local scores_tab = {} @@ -643,14 +555,9 @@ local function clickhouse_collect(task) table.insert(options_tab, ''); end end - - table.insert(symbols_rows, { - today(timestamp), - digest, - syms_tab, - scores_tab, - options_tab - }) + table.insert(row, syms_tab) + table.insert(row, scores_tab) + table.insert(row, options_tab) end -- Custom data @@ -660,17 +567,13 @@ local function clickhouse_collect(task) end nrows = nrows + 1 + table.insert(data_rows, row) rspamd_logger.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit) if nrows > settings['limit'] then clickhouse_send_data(task) nrows = 0 - main_rows = {} - attachment_rows = {} - urls_rows = {} - emails_rows = {} - asn_rows = {} - symbols_rows = {} + data_rows = {} custom_rows = {} end end @@ -803,6 +706,163 @@ local function clickhouse_remove_old_partitions(cfg, ev_base) return settings.retention.period end +local function upload_clickhouse_schema(upstream, ev_base, cfg) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + -- Apply schema sequentially + local function sql_recursor(i) + if clickhouse_schema[i] then + local sql = clickhouse_schema[i] + local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, + function(_, _) + rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s', + i, upstream:get_addr():to_string(true)) + sql_recursor(i + 1) + end, + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot upload schema '%s' on clickhouse server %s: %s", + sql, upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: cannot make request", + sql, upstream:get_addr():to_string(true)) + end + end + end + + sql_recursor(1) +end + +local function maybe_apply_migrations(upstream, ev_base, cfg, version) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + -- Apply migrations sequentially + local function migration_recursor(i) + if i < schema_version then + if migrations[i] then + -- We also need to apply statements sequentially + local function sql_recursor(j) + if migrations[i][j] then + local sql = migrations[i][j] + local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, + function(_, _) + rspamd_logger.infox(rspamd_config, + 'applied migration to version %s from version %s: %s', + i + 1, version, sql:gsub('[\n%s]+', ' ')) + if j == #migrations[i] then + -- Go to the next migration + migration_recursor(i + 1) + else + -- Apply the next statement + sql_recursor(j + 1) + end + end , + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot apply migration %s: '%s' on clickhouse server %s: %s", + i, sql, upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, + "cannot apply migration %s: '%s' on clickhouse server %s: cannot make request", + i, sql, upstream:get_addr():to_string(true)) + end + end + end + + sql_recursor(1) + else + -- Try another migration + migration_recursor(i + 1) + end + end + end + + migration_recursor(version) +end + +local function check_rspamd_table(upstream, ev_base, cfg) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + local sql = [[EXISTS TABLE rspamd]] + local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, + function(_, rows) + if rows[1] and rows[1].result then + if tonumber(rows[1].result) == 1 then + -- Apply migration + rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration') + maybe_apply_migrations(upstream, ev_base, cfg, 1) + else + -- Upload schema + rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema') + upload_clickhouse_schema(upstream, ev_base, cfg) + end + else + rspamd_logger.errx(rspamd_config, + "unexpected reply on EXISTS command from server %s: %s", + upstream:get_addr():to_string(true), rows) + end + end , + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot check if rspamd table exists on clickhouse server %s: %s", + upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: cannot make request", + upstream:get_addr():to_string(true)) + end +end + + +local function check_clickhouse_upstream(upstream, ev_base, cfg) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + -- If we have some custom rules, we just send its schema to the upstream + for k,rule in pairs(settings.custom_rules) do + if rule.schema then + local sql = rspamd_lua_utils.template(rule.schema, settings) + local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, + nil, + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot send custom schema %s to clickhouse server %s: %s", + k, upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request", + k, upstream:get_addr():to_string(true)) + end + end + end + + -- Now check the main schema and apply migrations if needed + local sql = [[SELECT MAX(Version) as v FROM rspamd_version]] + local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, + function(_, rows) + local version = tonumber(rows[1].v) + maybe_apply_migrations(upstream, ev_base, cfg, version) + end, + function(_, err) + -- It might be either no rspamd table or version 1 + rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table') + check_rspamd_table(upstream, ev_base, cfg) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: cannot make request", + upstream:get_addr():to_string(true)) + end +end + local opts = rspamd_config:get_all_opt('clickhouse') if opts then for k,v in pairs(opts) do @@ -856,9 +916,6 @@ if opts then else settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables', 'regexp', 'clickhouse specific domains') - if settings.use_https then - connect_prefix = 'https://' - end settings.upstream = upstream_list.create(rspamd_config, settings['server'] or settings['servers'], 8123) @@ -888,46 +945,7 @@ if opts then local upstreams = settings.upstream:all_upstreams() for _,up in ipairs(upstreams) do - local ip_addr = up:get_addr():to_string(true) - - local function send_req(elt, sql) - local function http_cb(err_message, code, data, _) - if code ~= 200 or err_message then - if not err_message then err_message = data end - rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: %s", - elt, ip_addr, err_message) - up:fail() - else - up:ok() - end - end - - if not rspamd_http.request({ - ev_base = ev_base, - config = cfg, - url = connect_prefix .. ip_addr, - body = sql, - callback = http_cb, - mime_type = 'text/plain', - timeout = settings['timeout'], - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - }) then - rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request", - elt, ip_addr) - end - end - - for tab,sql in pairs(clickhouse_schema) do - send_req(tab, rspamd_lua_utils.template(sql, settings)) - end - - for k,rule in pairs(settings.custom_rules) do - if rule.schema then - send_req(k, rspamd_lua_utils.template(rule.schema, settings)) - end - end + check_clickhouse_upstream(up, ev_base, cfg) end if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then diff --git a/test/functional/configs/plugins.conf b/test/functional/configs/plugins.conf index 65141d5c0..8de4c3c02 100644 --- a/test/functional/configs/plugins.conf +++ b/test/functional/configs/plugins.conf @@ -21,6 +21,236 @@ options = { name = "dkim._domainkey.invalid.za.org", type = "txt"; replies = ["v=DKIM1; k=rsa; p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDEEXmNGQq7PUrr9Mg4UakTFHgXBCy2DOztkrZm+0OrVWtiRzGluxBkbOWTBwuU3/Yw97yTphBMQxzWFN603/f/KPAQcF/Lc1l+6kmIBBxNXjjGuOK/3PYKZVntUdKmqcQBYfnHdzH2Tohbuyx1a7xqnv6VSChqQrZU4CwkeT3+eQIDAQAB"]; + }, + { + name = "_dmarc.cacophony.za.org", + type = "txt"; + replies = ["v=DMARC1; p=none; sp=reject"]; + }, + { + name = "_dmarc.my.mom.za.org", + type = "txt"; + replies = ["v=DMARC1; p=reject"]; + }, + { + name = "example.net", + type = "txt"; + replies = ["v=spf1 -all"]; + }, + { + name = "fail4.org.org.za", + type = "txt"; + replies = ["v=spf1 redirect=asdfsfewewrredfs"]; + }, + { + name = "_dmarc.reject.cacophony.za.org", + type = "txt"; + replies = ["v=DMARC1; p=reject"]; + }, + { + name = "spf.cacophony.za.org", + type = "txt"; + replies = ["v=spf1 ip4:8.8.4.4 -all"]; + }, + { + name = "fail7.org.org.za", + type = "a"; + rcode = 'norec'; + }, + { + name = "fail6.org.org.za", + type = "txt"; + replies = ["v=spf1 ip4:8.8.8.8 mx -all"]; + }, + { + name = "fail6.org.org.za", + type = "mx"; + rcode = 'norec'; + }, + { + name = "fail7.org.org.za", + type = "aaaa"; + rcode = 'norec'; + }, + { + name = "_dmarc.quarantine.cacophony.za.org", + type = "txt"; + replies = ["v=DMARC1; p=quarantine"]; + }, + { + name = "_dmarc.yo.mom.za.org", + type = "txt"; + replies = ["v=DMARC1; p=reject; aspf=s; adkim=s;"]; + }, + { + name = "yo.mom.za.org", + type = "txt"; + replies = ["v=spf1 ip4:37.48.67.26 -all"]; + }, + { + name = "testdkim._domainkey.mom.za.org", + type = "txt"; + replies = ["v=DKIM1; k=rsa; p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC3v4VPE1QMHUzsMRbC8VzXNq82mDjiv9Gi1NB/YYC+vIYZT+sE/Uxnr0Clk8C2jgzEr3jcxgQEWZfMtEEg/EfEJvh4SrXWv9c0gw1EEfxKxX9i+r8yBQtc/EWospWVDkhF2lAvQAK1lV1ZiU7psJ6fh1CI39uZyWdAktZzWLf0zQIDAQAB"]; + }, + { + name = "_dmarc.rspamd.tk", + type = "txt"; + replies = ["bio=a263adeab8acdcdb8b89e127b67d696061fdfbee"]; + }, + { + name = "fail2.org.org.za", + type = "txt"; + replies = ["v=spf1 ip4:8.8.4.4 include:www.dnssec-failed.org -all"]; + }, + { + name = "fail3.org.org.za", + type = "txt"; + replies = ["v=spf1 ip4:8.8.8.8 include:total.barf -all"]; + }, + { + name = "mom.za.org", + type = "txt"; + replies = ["v=spf1 ip4:37.48.67.26 -all"]; + }, + { + name = "testdkim._domainkey.asdf.rspamd.tk", # testdkim._domainkey.asdf.rspamd.tk is an alias for rspamd.tk + type = "txt"; + replies = ["bio=a263adeab8acdcdb8b89e127b67d696061fdfbee"]; + }, + { + name = "testdkim._domainkey.rspamd.tk", # testdkim._domainkey.rspamd.tk is an alias for rspamd.tk + type = "txt"; + replies = ["bio=a263adeab8acdcdb8b89e127b67d696061fdfbee"]; + }, + { + name = "pass1.org.org.za", + type = "txt"; + replies = ["v=spf1 include:pass2.org.org.za -all"]; + }, + { + name = "95.142.99.88.in-addr.arpa", + type = "ptr"; + replies = ["mail.highsecure.ru"]; + }, + { + name = "mail.highsecure.ru", + type = "a"; + replies = ["88.99.142.95"]; + }, + { + name = "mail.highsecure.ru", + type = "aaaa"; + rcode = 'norec'; + }, + { + name = "1.0.66.128.in-addr.arpa", + type = "ptr"; + rcode = 'nxdomain'; + }, + { + name = "182.216.85.209.in-addr.arpa", + type = "ptr"; + replies = ["mail-qt0-f182.google.com"]; + }, + { + name = "crazyspf.cacophony.za.org", + type = "txt"; + replies = ["v=spf1 ptr:cacophony.za.org ptr:rspamd.com ptr:yahoo.com ptr:yahoo.net ptr:highsecure.ru -all"]; + }, + { + name = "pass2.org.org.za", + type = "txt"; + replies = ["v=spf1 ip4:8.8.8.8 -all"]; + }, + { + name = "_dmarc.yoni.za.org", + type = "txt"; + replies = ["v=DMARC1; p=reject; sp=none;"]; + }, + { + name = "fail10.org.org.za", + type = "txt"; + replies = ["v=spf1 redirect=fail5.org.org.za"]; + }, + { + name = "fail5.org.org.za", + type = "txt"; + replies = ["v=spf1 OMGBARF"]; + }, + { + name = "fail7.org.org.za", + type = "txt"; + replies = ["v=spf1 ip4:8.8.8.8 a -all"]; + }, + { + name = "co.za", + type = "txt"; + rcode = 'norec'; + }, + { + name = "testdkim1._domainkey.yoni.za.org", + type = "txt"; + replies = ["v=DKIM1; k=rsa; p=BARF"]; + }, + { + name = "_dmarc.yoni.za.net", + type = "txt"; + replies = ["v=DMARC1; p=none; sp=quarantine"]; + }, + { + name = "za", + type = "txt"; + replies = ["Top-level domain for South Africa"]; + }, + { + name = "_dmarc.foo.yoni.za.org", + type = "txt"; + rcode = 'nxdomain'; + }, + { + name = "_dmarc.foo.cacophony.za.org", + type = "txt"; + rcode = 'nxdomain'; + }, + { + name = "_dmarc.foo.yoni.za.net", + type = "txt"; + rcode = 'nxdomain'; + }, + { + name = "_dmarc.dnssec-failed.org", + type = "txt"; + rcode = 'timeout'; + }, + { + name = "_dmarc.example.com", + type = "txt"; + rcode = 'nxdomain'; + }, + { + name = "testdkim1._domainkey.dnssec-failed.org", + type = "txt"; + rcode = 'timeout'; + }, + { + name = "total.barf", + type = "txt"; + rcode = 'nxdomain'; + }, + { + name = "_dmarc.foo.cacophony.za.org", + type = "txt"; + rcode = 'nxdomain'; + }, + { + name = "zzzzaaaa", + type = "txt"; + rcode = 'nxdomain'; + }, + { + name = "asdfsfewewrredfs", + type = "txt"; + rcode = 'nxdomain'; }]; } } |