diff options
author | Mikhail Galanin <mgalanin@mimecast.com> | 2018-08-08 16:05:11 +0100 |
---|---|---|
committer | Mikhail Galanin <mgalanin@mimecast.com> | 2018-08-08 16:05:11 +0100 |
commit | 99ebeb7c1d3e9f32d46664ea2e91fa731d430e31 (patch) | |
tree | 0b976ba58af3ef0a8469b74ca01721560c5a4616 /src | |
parent | 2c501018c04ca0a972c600b1324be21545e9f133 (diff) | |
parent | a756a6ef6a58c3062def2a8a0795498ab8568f94 (diff) | |
download | rspamd-99ebeb7c1d3e9f32d46664ea2e91fa731d430e31.tar.gz rspamd-99ebeb7c1d3e9f32d46664ea2e91fa731d430e31.zip |
Merge branch 'master' into lua-coroutine-model
Diffstat (limited to 'src')
-rw-r--r-- | src/libutil/ssl_util.c | 12 | ||||
-rw-r--r-- | src/plugins/lua/clickhouse.lua | 522 |
2 files changed, 282 insertions, 252 deletions
diff --git a/src/libutil/ssl_util.c b/src/libutil/ssl_util.c index a90bd5e36..1eab5821f 100644 --- a/src/libutil/ssl_util.c +++ b/src/libutil/ssl_util.c @@ -746,6 +746,18 @@ void rspamd_ssl_connection_free (struct rspamd_ssl_connection *conn) { if (conn) { + /* + * SSL_RECEIVED_SHUTDOWN tells SSL_shutdown to act as if we had already + * received a close notify from the other end. SSL_shutdown will then + * send the final close notify in reply. The other end will receive the + * close notify and send theirs. By this time, we will have already + * closed the socket and the other end's real close notify will never be + * received. In effect, both sides will think that they have completed a + * clean shutdown and keep their sessions valid. This strategy will fail + * if the socket is not ready for writing, in which case this hack will + * lead to an unclean shutdown and lost session on the other end. + */ + SSL_set_shutdown (conn->ssl, SSL_RECEIVED_SHUTDOWN); SSL_shutdown (conn->ssl); SSL_free (conn->ssl); diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 444465814..c2375d765 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -15,7 +15,6 @@ limitations under the License. ]]-- local rspamd_logger = require 'rspamd_logger' -local rspamd_http = require "rspamd_http" local rspamd_lua_utils = require "lua_util" local upstream_list = require "rspamd_upstream_list" local lua_util = require "lua_util" @@ -28,16 +27,10 @@ if confighelp then return end -local main_rows = {} -local attachment_rows = {} -local urls_rows = {} -local emails_rows = {} ---local specific_rows = {} -local asn_rows = {} -local symbols_rows = {} +local data_rows = {} local custom_rows = {} local nrows = 0 -local connect_prefix = 'http://' +local schema_version = 2 -- Current schema version local settings = { limit = 1000, @@ -52,12 +45,6 @@ local settings = { dmarc_allow_symbols = {'DMARC_POLICY_ALLOW'}, dmarc_reject_symbols = {'DMARC_POLICY_REJECT', 'DMARC_POLICY_QUARANTINE'}, stop_symbols = {}, - table = 'rspamd', - attachments_table = 'rspamd_attachments', - urls_table = 'rspamd_urls', - emails_table = 'rspamd_emails', - symbols_table = 'rspamd_symbols', - asn_table = 'rspamd_asn', ipmask = 19, ipmask6 = 48, full_urls = false, @@ -78,9 +65,9 @@ local settings = { } } -local clickhouse_schema = { -table = [[ -CREATE TABLE IF NOT EXISTS ${table} +--- @language SQL +local clickhouse_schema = {[[ +CREATE TABLE rspamd ( Date Date, TS DateTime, @@ -103,60 +90,52 @@ CREATE TABLE IF NOT EXISTS ${table} RcptUser String, RcptDomain String, ListId String, - Digest FixedString(32) -) ENGINE = MergeTree(Date, (TS, From), 8192) -]], - -attachments_table = [[ -CREATE TABLE IF NOT EXISTS ${attachments_table} ( - Date Date, - Digest FixedString(32), `Attachments.FileName` Array(String), `Attachments.ContentType` Array(String), `Attachments.Length` Array(UInt32), - `Attachments.Digest` Array(FixedString(16)) -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -urls_table = [[ -CREATE TABLE IF NOT EXISTS ${urls_table} ( - Date Date, - Digest FixedString(32), + `Attachments.Digest` Array(FixedString(16)), `Urls.Tld` Array(String), - `Urls.Url` Array(String) -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -emails_table = [[ -CREATE TABLE IF NOT EXISTS ${emails_table} ( - Date Date, - Digest FixedString(32), - Emails Array(String) -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -asn_table = [[ -CREATE TABLE IF NOT EXISTS ${asn_table} ( - Date Date, - Digest FixedString(32), + `Urls.Url` Array(String), + Emails Array(String), ASN String, Country FixedString(2), - IPNet String -) ENGINE = MergeTree(Date, Digest, 8192) -]], - -symbols_table = [[ -CREATE TABLE IF NOT EXISTS ${symbols_table} ( - Date Date, - Digest FixedString(32), + IPNet String, `Symbols.Names` Array(String), `Symbols.Scores` Array(Float64), - `Symbols.Options` Array(String) -) ENGINE = MergeTree(Date, Digest, 8192) -]] + `Symbols.Options` Array(String), + Digest FixedString(32) +) ENGINE = MergeTree(Date, (TS, From), 8192) +]], +[[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]], +[[INSERT INTO rspamd_version (Version) Values (2)]], } -local function clickhouse_main_row(tname) +-- This describes SQL queries to migrate between versions +local migrations = { + [1] = { + -- Move to a wide fat table + [[ALTER TABLE rspamd + ADD COLUMN `Attachments.FileName` Array(String) AFTER ListId, + ADD COLUMN `Attachments.ContentType` Array(String) AFTER `Attachments.FileName`, + ADD COLUMN `Attachments.Length` Array(UInt32) AFTER `Attachments.ContentType`, + ADD COLUMN `Attachments.Digest` Array(FixedString(16)) AFTER `Attachments.Length`, + ADD COLUMN `Urls.Tld` Array(String) AFTER `Attachments.Digest`, + ADD COLUMN `Urls.Url` Array(String) AFTER `Urls.Tld`, + ADD COLUMN Emails Array(String) AFTER `Urls.Url`, + ADD COLUMN ASN String AFTER Emails, + ADD COLUMN Country FixedString(2) AFTER ASN, + ADD COLUMN IPNet String AFTER Country, + ADD COLUMN `Symbols.Names` Array(String) AFTER IPNet, + ADD COLUMN `Symbols.Scores` Array(Float64) AFTER `Symbols.Names`, + ADD COLUMN `Symbols.Options` Array(String) AFTER `Symbols.Scores`]], + -- Add explicit version + [[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]], + [[INSERT INTO rspamd_version (Version) Values (2)]], + } +} + + +local function clickhouse_main_row(res) local fields = { 'Date', 'TS', @@ -181,73 +160,52 @@ local function clickhouse_main_row(tname) 'ListId', 'Digest' } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_attachments_row(tname) - local attachement_fields = { - 'Date', - 'Digest', +local function clickhouse_attachments_row(res) + local fields = { 'Attachments.FileName', 'Attachments.ContentType', 'Attachments.Length', 'Attachments.Digest', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(attachement_fields, ',')) - return elt + + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_urls_row(tname) - local urls_fields = { - 'Date', - 'Digest', +local function clickhouse_urls_row(res) + local fields = { 'Urls.Tld', 'Urls.Url', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(urls_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_emails_row(tname) - local emails_fields = { - 'Date', - 'Digest', +local function clickhouse_emails_row(res) + local fields = { 'Emails', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(emails_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_symbols_row(tname) - local symbols_fields = { - 'Date', - 'Digest', +local function clickhouse_symbols_row(res) + local fields = { 'Symbols.Names', 'Symbols.Scores', 'Symbols.Options', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(symbols_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end -local function clickhouse_asn_row(tname) - local asn_fields = { - 'Date', - 'Digest', +local function clickhouse_asn_row(res) + local fields = { 'ASN', 'Country', 'IPNet', } - local elt = string.format('INSERT INTO %s (%s) ', - tname, table.concat(asn_fields, ',')) - return elt + for _,v in ipairs(fields) do table.insert(res, v) end end local function today(ts) @@ -304,34 +262,19 @@ local function clickhouse_send_data(task) end end - send_data('generic data', main_rows, - clickhouse_main_row(settings['table'])) - + local fields = {} + clickhouse_main_row(fields) + clickhouse_attachments_row(fields) + clickhouse_urls_row(fields) + clickhouse_emails_row(fields) + clickhouse_asn_row(fields) - if #attachment_rows > 1 then - send_data('attachments data', attachment_rows, - clickhouse_attachments_row(settings.attachments_table)) + if settings.enable_symbols then + clickhouse_symbols_row(fields) end - if #urls_rows > 1 then - send_data('urls data', urls_rows, - clickhouse_urls_row(settings.urls_table)) - end - - if #emails_rows > 1 then - send_data('emails data', emails_rows, - clickhouse_emails_row(settings.emails_table)) - end - - if #asn_rows > 1 then - send_data('asn data', asn_rows, - clickhouse_asn_row(settings.asn_table)) - end - - if #symbols_rows > 1 then - send_data('symbols data', symbols_rows, - clickhouse_symbols_row(settings.symbols_table)) - end + send_data('generic data', data_rows, + string.format('INSERT INTO rspamd (%s)', table.concat(fields, ','))) for k,crows in pairs(custom_rows) do if #crows > 1 then @@ -490,7 +433,7 @@ local function clickhouse_collect(task) local action = task:get_metric_action('default') local digest = task:get_digest() - table.insert(main_rows, { + local row = { today(timestamp), timestamp, from_domain, @@ -512,25 +455,8 @@ local function clickhouse_collect(task) rcpt_user, rcpt_domain, list_id, - task:get_digest() - }) - ---[[ TODO: has been broken - if settings['from_map'] and dkim == 'allow' then - -- Use dkim - local das = task:get_symbol(settings['dkim_allow_symbols'][1]) - if ((das or E)[1] or E).options then - for _,dkim_domain in ipairs(das[1]['options']) do - local specific = settings.from_map:get_key(dkim_domain) - if specific then - specific_rows[specific] = {} - table.insert(specific_rows[specific], elt) - end - end - end - - end ---]] + digest + } -- Attachments step local attachments_fnames = {} @@ -551,14 +477,15 @@ local function clickhouse_collect(task) end if #attachments_fnames > 0 then - table.insert(attachment_rows, { - today(timestamp), - digest, - attachments_fnames, - attachments_ctypes, - attachments_lengths, - attachments_digests, - }) + table.insert(row, attachments_fnames) + table.insert(row, attachments_ctypes) + table.insert(row, attachments_lengths) + table.insert(row, attachments_digests) + else + table.insert(row, {}) + table.insert(row, {}) + table.insert(row, {}) + table.insert(row, {}) end -- Urls step @@ -576,58 +503,43 @@ local function clickhouse_collect(task) end if #urls_tlds > 0 then - table.insert(urls_rows, { - today(timestamp), - digest, - urls_tlds, - urls_urls - }) + table.insert(row, urls_tlds) + table.insert(row, urls_urls) + else + table.insert(row, {}) + table.insert(row, {}) end -- Emails step - local emails = {} if task:has_urls(true) then - for _,u in ipairs(task:get_emails()) do - table.insert(emails, - string.format('%s@%s', u:get_user(), u:get_host())) - end - end - - if #emails > 0 then - table.insert(emails_rows, { - today(timestamp), - digest, - emails, - }) + table.insert(row, fun.totable(fun.map(function(u) + return string.format('%s@%s', u:get_user(), u:get_host()) + end, task:get_emails()))) + else + table.insert(row, {}) end -- ASN information - if settings['asn_table'] then - local asn, country, ipnet = '--', '--', '--' - local pool = task:get_mempool() - ret = pool:get_variable("asn") - if ret then - asn = ret - end - ret = pool:get_variable("country") - if ret then - country = ret:sub(1, 2) - end - ret = pool:get_variable("ipnet") - if ret then - ipnet = ret - end - table.insert(asn_rows, { - today(timestamp), - digest, - asn, - country, - ipnet - }) + local asn, country, ipnet = '--', '--', '--' + local pool = task:get_mempool() + ret = pool:get_variable("asn") + if ret then + asn = ret + end + ret = pool:get_variable("country") + if ret then + country = ret:sub(1, 2) + end + ret = pool:get_variable("ipnet") + if ret then + ipnet = ret end + table.insert(row, asn) + table.insert(row, country) + table.insert(row, ipnet) -- Symbols info - if settings.enable_symbols and settings['symbols_table'] then + if settings.enable_symbols then local symbols = task:get_symbols_all() local syms_tab = {} local scores_tab = {} @@ -643,14 +555,9 @@ local function clickhouse_collect(task) table.insert(options_tab, ''); end end - - table.insert(symbols_rows, { - today(timestamp), - digest, - syms_tab, - scores_tab, - options_tab - }) + table.insert(row, syms_tab) + table.insert(row, scores_tab) + table.insert(row, options_tab) end -- Custom data @@ -660,17 +567,13 @@ local function clickhouse_collect(task) end nrows = nrows + 1 + table.insert(data_rows, row) rspamd_logger.debugm(N, task, "add clickhouse row %s / %s", nrows, settings.limit) if nrows > settings['limit'] then clickhouse_send_data(task) nrows = 0 - main_rows = {} - attachment_rows = {} - urls_rows = {} - emails_rows = {} - asn_rows = {} - symbols_rows = {} + data_rows = {} custom_rows = {} end end @@ -803,6 +706,163 @@ local function clickhouse_remove_old_partitions(cfg, ev_base) return settings.retention.period end +local function upload_clickhouse_schema(upstream, ev_base, cfg) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + -- Apply schema sequentially + local function sql_recursor(i) + if clickhouse_schema[i] then + local sql = clickhouse_schema[i] + local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, + function(_, _) + rspamd_logger.infox(rspamd_config, 'uploaded clickhouse schema element %s to %s', + i, upstream:get_addr():to_string(true)) + sql_recursor(i + 1) + end, + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot upload schema '%s' on clickhouse server %s: %s", + sql, upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: cannot make request", + sql, upstream:get_addr():to_string(true)) + end + end + end + + sql_recursor(1) +end + +local function maybe_apply_migrations(upstream, ev_base, cfg, version) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + -- Apply migrations sequentially + local function migration_recursor(i) + if i < schema_version then + if migrations[i] then + -- We also need to apply statements sequentially + local function sql_recursor(j) + if migrations[i][j] then + local sql = migrations[i][j] + local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, + function(_, _) + rspamd_logger.infox(rspamd_config, + 'applied migration to version %s from version %s: %s', + i + 1, version, sql:gsub('[\n%s]+', ' ')) + if j == #migrations[i] then + -- Go to the next migration + migration_recursor(i + 1) + else + -- Apply the next statement + sql_recursor(j + 1) + end + end , + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot apply migration %s: '%s' on clickhouse server %s: %s", + i, sql, upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, + "cannot apply migration %s: '%s' on clickhouse server %s: cannot make request", + i, sql, upstream:get_addr():to_string(true)) + end + end + end + + sql_recursor(1) + else + -- Try another migration + migration_recursor(i + 1) + end + end + end + + migration_recursor(version) +end + +local function check_rspamd_table(upstream, ev_base, cfg) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + local sql = [[EXISTS TABLE rspamd]] + local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, + function(_, rows) + if rows[1] and rows[1].result then + if tonumber(rows[1].result) == 1 then + -- Apply migration + rspamd_logger.infox(rspamd_config, 'table rspamd exists, apply migration') + maybe_apply_migrations(upstream, ev_base, cfg, 1) + else + -- Upload schema + rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema') + upload_clickhouse_schema(upstream, ev_base, cfg) + end + else + rspamd_logger.errx(rspamd_config, + "unexpected reply on EXISTS command from server %s: %s", + upstream:get_addr():to_string(true), rows) + end + end , + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot check if rspamd table exists on clickhouse server %s: %s", + upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: cannot make request", + upstream:get_addr():to_string(true)) + end +end + + +local function check_clickhouse_upstream(upstream, ev_base, cfg) + local ch_params = { + ev_base = ev_base, + config = cfg, + } + -- If we have some custom rules, we just send its schema to the upstream + for k,rule in pairs(settings.custom_rules) do + if rule.schema then + local sql = rspamd_lua_utils.template(rule.schema, settings) + local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql, + nil, + function(_, err) + rspamd_logger.errx(rspamd_config, + "cannot send custom schema %s to clickhouse server %s: %s", + k, upstream:get_addr():to_string(true), err) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot send custom schema %s to clickhouse server %s: cannot make request", + k, upstream:get_addr():to_string(true)) + end + end + end + + -- Now check the main schema and apply migrations if needed + local sql = [[SELECT MAX(Version) as v FROM rspamd_version]] + local ret = lua_clickhouse.select(upstream, settings, ch_params, sql, + function(_, rows) + local version = tonumber(rows[1].v) + maybe_apply_migrations(upstream, ev_base, cfg, version) + end, + function(_, err) + -- It might be either no rspamd table or version 1 + rspamd_logger.infox(rspamd_config, 'table rspamd_version does not exist, check rspamd table') + check_rspamd_table(upstream, ev_base, cfg) + end) + if not ret then + rspamd_logger.errx(rspamd_config, "cannot get version on clickhouse server %s: cannot make request", + upstream:get_addr():to_string(true)) + end +end + local opts = rspamd_config:get_all_opt('clickhouse') if opts then for k,v in pairs(opts) do @@ -856,9 +916,6 @@ if opts then else settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables', 'regexp', 'clickhouse specific domains') - if settings.use_https then - connect_prefix = 'https://' - end settings.upstream = upstream_list.create(rspamd_config, settings['server'] or settings['servers'], 8123) @@ -888,46 +945,7 @@ if opts then local upstreams = settings.upstream:all_upstreams() for _,up in ipairs(upstreams) do - local ip_addr = up:get_addr():to_string(true) - - local function send_req(elt, sql) - local function http_cb(err_message, code, data, _) - if code ~= 200 or err_message then - if not err_message then err_message = data end - rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: %s", - elt, ip_addr, err_message) - up:fail() - else - up:ok() - end - end - - if not rspamd_http.request({ - ev_base = ev_base, - config = cfg, - url = connect_prefix .. ip_addr, - body = sql, - callback = http_cb, - mime_type = 'text/plain', - timeout = settings['timeout'], - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - }) then - rspamd_logger.errx(rspamd_config, "cannot create table %s in clickhouse server %s: cannot make request", - elt, ip_addr) - end - end - - for tab,sql in pairs(clickhouse_schema) do - send_req(tab, rspamd_lua_utils.template(sql, settings)) - end - - for k,rule in pairs(settings.custom_rules) do - if rule.schema then - send_req(k, rspamd_lua_utils.template(rule.schema, settings)) - end - end + check_clickhouse_upstream(up, ev_base, cfg) end if settings.retention.enable and settings.retention.method ~= 'drop' and settings.retention.method ~= 'detach' then |