1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537 |
- --[[
- Copyright (c) 2022, Vsevolod Stakhov <vsevolod@rspamd.com>
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ]]--
-
- local rspamd_logger = require 'rspamd_logger'
- local upstream_list = require "rspamd_upstream_list"
- local lua_util = require "lua_util"
- local lua_clickhouse = require "lua_clickhouse"
- local lua_settings = require "lua_settings"
- local fun = require "fun"
-
- local N = "clickhouse"
-
- if confighelp then
- return
- end
-
- local data_rows = {}
- local custom_rows = {}
- local nrows = 0
- local used_memory = 0
- local last_collection = 0
- local final_call = false -- If the final collection has been started
- local schema_version = 9 -- Current schema version
-
- local settings = {
- limits = { -- Collection limits
- max_rows = 1000, -- How many rows are allowed (0 for disable this)
- max_memory = 50 * 1024 * 1024, -- How many memory should be occupied before sending collection
- max_interval = 60, -- Maximum collection interval
- },
- collect_garbage = false, -- Perform GC collection after sending the data
- check_timeout = 10.0, -- Periodic timeout
- timeout = 5.0,
- bayes_spam_symbols = {'BAYES_SPAM'},
- bayes_ham_symbols = {'BAYES_HAM'},
- ann_symbols_spam = {'NEURAL_SPAM'},
- ann_symbols_ham = {'NEURAL_HAM'},
- fuzzy_symbols = {'FUZZY_DENIED'},
- whitelist_symbols = {'WHITELIST_DKIM', 'WHITELIST_SPF_DKIM', 'WHITELIST_DMARC'},
- dkim_allow_symbols = {'R_DKIM_ALLOW'},
- dkim_reject_symbols = {'R_DKIM_REJECT'},
- dkim_dnsfail_symbols = {'R_DKIM_TEMPFAIL', 'R_DKIM_PERMFAIL'},
- dkim_na_symbols = {'R_DKIM_NA'},
- dmarc_allow_symbols = {'DMARC_POLICY_ALLOW'},
- dmarc_reject_symbols = {'DMARC_POLICY_REJECT'},
- dmarc_quarantine_symbols = {'DMARC_POLICY_QUARANTINE'},
- dmarc_softfail_symbols = {'DMARC_POLICY_SOFTFAIL'},
- dmarc_na_symbols = {'DMARC_NA'},
- spf_allow_symbols = {'R_SPF_ALLOW'},
- spf_reject_symbols = {'R_SPF_FAIL'},
- spf_dnsfail_symbols = {'R_SPF_DNSFAIL', 'R_SPF_PERMFAIL'},
- spf_neutral_symbols = {'R_DKIM_TEMPFAIL', 'R_DKIM_PERMFAIL'},
- spf_na_symbols = {'R_SPF_NA'},
- stop_symbols = {},
- ipmask = 19,
- ipmask6 = 48,
- full_urls = false,
- from_tables = nil,
- enable_symbols = false,
- database = 'default',
- use_https = false,
- use_gzip = true,
- allow_local = false,
- insert_subject = false,
- subject_privacy = false, -- subject privacy is off
- subject_privacy_alg = 'blake2', -- default hash-algorithm to obfuscate subject
- subject_privacy_prefix = 'obf', -- prefix to show it's obfuscated
- subject_privacy_length = 16, -- cut the length of the hash
- schema_additions = {}, -- additional SQL statements to be executed when schema is uploaded
- user = nil,
- password = nil,
- no_ssl_verify = false,
- custom_rules = {},
- enable_digest = false,
- exceptions = nil,
- retention = {
- enable = false,
- method = 'detach',
- period_months = 3,
- run_every = '7d',
- },
- extra_columns = {},
- }
-
- --- @language SQL
- local clickhouse_schema = {[[
- CREATE TABLE IF NOT EXISTS rspamd
- (
- Date Date COMMENT 'Date (used for partitioning)',
- TS DateTime COMMENT 'Date and time of the request start (UTC)',
- From String COMMENT 'Domain part of the return address (RFC5321.MailFrom)',
- MimeFrom String COMMENT 'Domain part of the address in From: header (RFC5322.From)',
- IP String COMMENT 'SMTP client IP as provided by MTA or from Received: header',
- Helo String COMMENT 'Full hostname as sent by the SMTP client (RFC5321.HELO/.EHLO)',
- Score Float32 COMMENT 'Message score',
- NRcpt UInt8 COMMENT 'Number of envelope recipients (RFC5321.RcptTo)',
- Size UInt32 COMMENT 'Message size in bytes',
- IsWhitelist Enum8('blacklist' = 0, 'whitelist' = 1, 'unknown' = 2) DEFAULT 'unknown' COMMENT 'Based on symbols configured in `whitelist_symbols` module option',
- IsBayes Enum8('ham' = 0, 'spam' = 1, 'unknown' = 2) DEFAULT 'unknown' COMMENT 'Based on symbols configured in `bayes_spam_symbols` and `bayes_ham_symbols` module options',
- IsFuzzy Enum8('whitelist' = 0, 'deny' = 1, 'unknown' = 2) DEFAULT 'unknown' COMMENT 'Based on symbols configured in `fuzzy_symbols` module option',
- IsFann Enum8('ham' = 0, 'spam' = 1, 'unknown' = 2) DEFAULT 'unknown' COMMENT 'Based on symbols configured in `ann_symbols_spam` and `ann_symbols_ham` module options',
- IsDkim Enum8('reject' = 0, 'allow' = 1, 'unknown' = 2, 'dnsfail' = 3, 'na' = 4) DEFAULT 'unknown' COMMENT 'Based on symbols configured in dkim_* module options',
- IsDmarc Enum8('reject' = 0, 'allow' = 1, 'unknown' = 2, 'softfail' = 3, 'na' = 4, 'quarantine' = 5) DEFAULT 'unknown' COMMENT 'Based on symbols configured in dmarc_* module options',
- IsSpf Enum8('reject' = 0, 'allow' = 1, 'neutral' = 2, 'dnsfail' = 3, 'na' = 4, 'unknown' = 5) DEFAULT 'unknown' COMMENT 'Based on symbols configured in spf_* module options',
- NUrls Int32 COMMENT 'Number of URLs and email extracted from the message',
- Action Enum8('reject' = 0, 'rewrite subject' = 1, 'add header' = 2, 'greylist' = 3, 'no action' = 4, 'soft reject' = 5, 'custom' = 6) DEFAULT 'no action' COMMENT 'Action returned for the message; if action is not predefined actual action will be in `CustomAction` field',
- CustomAction LowCardinality(String) COMMENT 'Action string for custom action',
- FromUser String COMMENT 'Local part of the return address (RFC5321.MailFrom)',
- MimeUser String COMMENT 'Local part of the address in From: header (RFC5322.From)',
- RcptUser String COMMENT '[Deprecated] Local part of the first envelope recipient (RFC5321.RcptTo)',
- RcptDomain String COMMENT '[Deprecated] Domain part of the first envelope recipient (RFC5321.RcptTo)',
- SMTPRecipients Array(String) COMMENT 'List of envelope recipients (RFC5321.RcptTo)',
- MimeRecipients Array(String) COMMENT 'List of recipients from headers (RFC5322.To/.CC/.BCC)',
- MessageId String COMMENT 'Message-ID header',
- ListId String COMMENT 'List-Id header',
- Subject String COMMENT 'Subject header (or hash if `subject_privacy` module option enabled)',
- `Attachments.FileName` Array(String) COMMENT 'Attachment name',
- `Attachments.ContentType` Array(String) COMMENT 'Attachment Content-Type',
- `Attachments.Length` Array(UInt32) COMMENT 'Attachment size in bytes',
- `Attachments.Digest` Array(FixedString(16)) COMMENT 'First 16 characters of hash returned by mime_part:get_digest()',
- `Urls.Tld` Array(String) COMMENT 'Effective second level domain part of the URL host',
- `Urls.Url` Array(String) COMMENT 'Full URL if `full_urls` module option enabled, host part of URL otherwise',
- `Urls.Flags` Array(UInt32) COMMENT 'Corresponding url flags, see `enum rspamd_url_flags` in libserver/url.h for details',
- Emails Array(String) COMMENT 'List of emails extracted from the message',
- ASN UInt32 COMMENT 'BGP AS number for SMTP client IP (returned by asn.rspamd.com or asn6.rspamd.com)',
- Country FixedString(2) COMMENT 'Country for SMTP client IP (returned by asn.rspamd.com or asn6.rspamd.com)',
- IPNet String,
- `Symbols.Names` Array(LowCardinality(String)) COMMENT 'Symbol name',
- `Symbols.Scores` Array(Float32) COMMENT 'Symbol score',
- `Symbols.Options` Array(String) COMMENT 'Symbol options (comma separated list)',
- `Groups.Names` Array(LowCardinality(String)) COMMENT 'Group name',
- `Groups.Scores` Array(Float32) COMMENT 'Group score',
- ScanTimeReal UInt32 COMMENT 'Request time in milliseconds',
- ScanTimeVirtual UInt32 COMMENT 'Deprecated do not use',
- AuthUser String COMMENT 'Username for authenticated SMTP client',
- SettingsId LowCardinality(String) COMMENT 'ID for the settings profile',
- Digest FixedString(32) COMMENT '[Deprecated]',
- SMTPFrom ALIAS if(From = '', '', concat(FromUser, '@', From)) COMMENT 'Return address (RFC5321.MailFrom)',
- SMTPRcpt ALIAS SMTPRecipients[1] COMMENT 'The first envelope recipient (RFC5321.RcptTo)',
- MIMEFrom ALIAS if(MimeFrom = '', '', concat(MimeUser, '@', MimeFrom)) COMMENT 'Address in From: header (RFC5322.From)',
- MIMERcpt ALIAS MimeRecipients[1] COMMENT 'The first recipient from headers (RFC5322.To/.CC/.BCC)'
- ) ENGINE = MergeTree()
- PARTITION BY toMonday(Date)
- ORDER BY TS
- ]],
- [[CREATE TABLE IF NOT EXISTS rspamd_version ( Version UInt32) ENGINE = TinyLog]],
- {[[INSERT INTO rspamd_version (Version) Values (${SCHEMA_VERSION})]], true},
- }
-
- -- This describes SQL queries to migrate between versions
- local migrations = {
- [1] = {
- -- Move to a wide fat table
- [[ALTER TABLE rspamd
- ADD COLUMN IF NOT EXISTS `Attachments.FileName` Array(String) AFTER ListId,
- ADD COLUMN IF NOT EXISTS `Attachments.ContentType` Array(String) AFTER `Attachments.FileName`,
- ADD COLUMN IF NOT EXISTS `Attachments.Length` Array(UInt32) AFTER `Attachments.ContentType`,
- ADD COLUMN IF NOT EXISTS `Attachments.Digest` Array(FixedString(16)) AFTER `Attachments.Length`,
- ADD COLUMN IF NOT EXISTS `Urls.Tld` Array(String) AFTER `Attachments.Digest`,
- ADD COLUMN IF NOT EXISTS `Urls.Url` Array(String) AFTER `Urls.Tld`,
- ADD COLUMN IF NOT EXISTS Emails Array(String) AFTER `Urls.Url`,
- ADD COLUMN IF NOT EXISTS ASN UInt32 AFTER Emails,
- ADD COLUMN IF NOT EXISTS Country FixedString(2) AFTER ASN,
- ADD COLUMN IF NOT EXISTS IPNet String AFTER Country,
- ADD COLUMN IF NOT EXISTS `Symbols.Names` Array(String) AFTER IPNet,
- ADD COLUMN IF NOT EXISTS `Symbols.Scores` Array(Float64) AFTER `Symbols.Names`,
- ADD COLUMN IF NOT EXISTS `Symbols.Options` Array(String) AFTER `Symbols.Scores`]],
- -- Add explicit version
- [[CREATE TABLE rspamd_version ( Version UInt32) ENGINE = TinyLog]],
- [[INSERT INTO rspamd_version (Version) Values (2)]],
- },
- [2] = {
- -- Add `Subject` column
- [[ALTER TABLE rspamd
- ADD COLUMN IF NOT EXISTS Subject String AFTER ListId]],
- -- New version
- [[INSERT INTO rspamd_version (Version) Values (3)]],
- },
- [3] = {
- [[ALTER TABLE rspamd
- ADD COLUMN IF NOT EXISTS IsSpf Enum8('reject' = 0, 'allow' = 1, 'neutral' = 2, 'dnsfail' = 3, 'na' = 4, 'unknown' = 5) DEFAULT 'unknown' AFTER IsDmarc,
- MODIFY COLUMN IsDkim Enum8('reject' = 0, 'allow' = 1, 'unknown' = 2, 'dnsfail' = 3, 'na' = 4) DEFAULT 'unknown',
- MODIFY COLUMN IsDmarc Enum8('reject' = 0, 'allow' = 1, 'unknown' = 2, 'softfail' = 3, 'na' = 4, 'quarantine' = 5) DEFAULT 'unknown',
- ADD COLUMN IF NOT EXISTS MimeRecipients Array(String) AFTER RcptDomain,
- ADD COLUMN IF NOT EXISTS MessageId String AFTER MimeRecipients,
- ADD COLUMN IF NOT EXISTS ScanTimeReal UInt32 AFTER `Symbols.Options`,
- ADD COLUMN IF NOT EXISTS ScanTimeVirtual UInt32 AFTER ScanTimeReal]],
- -- Add aliases
- [[ALTER TABLE rspamd
- ADD COLUMN IF NOT EXISTS SMTPFrom ALIAS if(From = '', '', concat(FromUser, '@', From)),
- ADD COLUMN IF NOT EXISTS SMTPRcpt ALIAS if(RcptDomain = '', '', concat(RcptUser, '@', RcptDomain)),
- ADD COLUMN IF NOT EXISTS MIMEFrom ALIAS if(MimeFrom = '', '', concat(MimeUser, '@', MimeFrom)),
- ADD COLUMN IF NOT EXISTS MIMERcpt ALIAS MimeRecipients[1]
- ]],
- -- New version
- [[INSERT INTO rspamd_version (Version) Values (4)]],
- },
- [4] = {
- [[ALTER TABLE rspamd
- MODIFY COLUMN Action Enum8('reject' = 0, 'rewrite subject' = 1, 'add header' = 2, 'greylist' = 3, 'no action' = 4, 'soft reject' = 5, 'custom' = 6) DEFAULT 'no action',
- ADD COLUMN IF NOT EXISTS CustomAction String AFTER Action
- ]],
- -- New version
- [[INSERT INTO rspamd_version (Version) Values (5)]],
- },
- [5] = {
- [[ALTER TABLE rspamd
- ADD COLUMN IF NOT EXISTS AuthUser String AFTER ScanTimeVirtual,
- ADD COLUMN IF NOT EXISTS SettingsId LowCardinality(String) AFTER AuthUser
- ]],
- -- New version
- [[INSERT INTO rspamd_version (Version) Values (6)]],
- },
- [6] = {
- -- Add new columns
- [[ALTER TABLE rspamd
- ADD COLUMN IF NOT EXISTS Helo String AFTER IP,
- ADD COLUMN IF NOT EXISTS SMTPRecipients Array(String) AFTER RcptDomain
- ]],
- -- Modify SMTPRcpt alias
- [[
- ALTER TABLE rspamd
- MODIFY COLUMN SMTPRcpt ALIAS SMTPRecipients[1]
- ]],
- -- New version
- [[INSERT INTO rspamd_version (Version) Values (7)]],
- },
- [7] = {
- -- Add new columns
- [[ALTER TABLE rspamd
- ADD COLUMN IF NOT EXISTS `Groups.Names` Array(LowCardinality(String)) AFTER `Symbols.Options`,
- ADD COLUMN IF NOT EXISTS `Groups.Scores` Array(Float32) AFTER `Groups.Names`
- ]],
- -- New version
- [[INSERT INTO rspamd_version (Version) Values (8)]],
- },
- [8] = {
- -- Add new columns
- [[ALTER TABLE rspamd
- ADD COLUMN IF NOT EXISTS `Urls.Flags` Array(UInt32) AFTER `Urls.Url`
- ]],
- -- New version
- [[INSERT INTO rspamd_version (Version) Values (9)]],
- },
- }
-
- local predefined_actions = {
- ['reject'] = true,
- ['rewrite subject'] = true,
- ['add header'] = true,
- ['greylist'] = true,
- ['no action'] = true,
- ['soft reject'] = true
- }
-
- local function clickhouse_main_row(res)
- local fields = {
- 'Date',
- 'TS',
- 'From',
- 'MimeFrom',
- 'IP',
- 'Helo',
- 'Score',
- 'NRcpt',
- 'Size',
- 'IsWhitelist',
- 'IsBayes',
- 'IsFuzzy',
- 'IsFann',
- 'IsDkim',
- 'IsDmarc',
- 'NUrls',
- 'Action',
- 'FromUser',
- 'MimeUser',
- 'RcptUser',
- 'RcptDomain',
- 'SMTPRecipients',
- 'ListId',
- 'Subject',
- 'Digest',
- -- 1.9.2 +
- 'IsSpf',
- 'MimeRecipients',
- 'MessageId',
- 'ScanTimeReal',
- -- 1.9.3 +
- 'CustomAction',
- -- 2.0 +
- 'AuthUser',
- 'SettingsId',
- }
-
- for _,v in ipairs(fields) do table.insert(res, v) end
- end
-
- local function clickhouse_attachments_row(res)
- local fields = {
- 'Attachments.FileName',
- 'Attachments.ContentType',
- 'Attachments.Length',
- 'Attachments.Digest',
- }
-
- for _,v in ipairs(fields) do table.insert(res, v) end
- end
-
- local function clickhouse_urls_row(res)
- local fields = {
- 'Urls.Tld',
- 'Urls.Url',
- 'Urls.Flags',
- }
- for _,v in ipairs(fields) do table.insert(res, v) end
- end
-
- local function clickhouse_emails_row(res)
- local fields = {
- 'Emails',
- }
- for _,v in ipairs(fields) do table.insert(res, v) end
- end
-
- local function clickhouse_symbols_row(res)
- local fields = {
- 'Symbols.Names',
- 'Symbols.Scores',
- 'Symbols.Options',
- }
- for _,v in ipairs(fields) do table.insert(res, v) end
- end
-
- local function clickhouse_groups_row(res)
- local fields = {
- 'Groups.Names',
- 'Groups.Scores',
- }
- for _,v in ipairs(fields) do table.insert(res, v) end
- end
-
- local function clickhouse_asn_row(res)
- local fields = {
- 'ASN',
- 'Country',
- 'IPNet',
- }
- for _,v in ipairs(fields) do table.insert(res, v) end
- end
-
- local function clickhouse_extra_columns(res)
- for _,v in ipairs(settings.extra_columns) do table.insert(res, v.name) end
- end
-
- local function today(ts)
- return os.date('!%Y-%m-%d', ts)
- end
-
- local function clickhouse_check_symbol(task, settings_field_name, fields_table,
- field_name, value, value_negative)
- for _,s in ipairs(settings[settings_field_name] or {}) do
- if task:has_symbol(s) then
- if value_negative then
- local sym = task:get_symbol(s)[1]
- if sym['score'] > 0 then
- fields_table[field_name] = value
- else
- fields_table[field_name] = value_negative
- end
- else
- fields_table[field_name] = value
- end
-
- return true
- end
- end
-
- return false
- end
-
- local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows)
- local log_object = task or rspamd_config
- local upstream = settings.upstream:get_upstream_round_robin()
- local ip_addr = upstream:get_addr():to_string(true)
- rspamd_logger.infox(log_object, "trying to send %s rows to clickhouse server %s; started as %s",
- #gen_rows + #cust_rows, ip_addr, why)
-
- local function gen_success_cb(what, how_many)
- return function (_, _)
- rspamd_logger.messagex(log_object, "sent %s rows of %s to clickhouse server %s; started as %s",
- how_many, what, ip_addr, why)
- upstream:ok()
- end
- end
-
- local function gen_fail_cb(what, how_many)
- return function (_, err)
- rspamd_logger.errx(log_object, "cannot send %s rows of %s data to clickhouse server %s: %s; started as %s",
- how_many, what, ip_addr, err, why)
- upstream:fail()
- end
- end
-
- local function send_data(what, tbl, query)
- local ch_params = {}
- if task then
- ch_params.task = task
- else
- ch_params.config = rspamd_config
- ch_params.ev_base = ev_base
- end
-
- local ret = lua_clickhouse.insert(upstream, settings, ch_params,
- query, tbl,
- gen_success_cb(what, #tbl),
- gen_fail_cb(what, #tbl))
- if not ret then
- rspamd_logger.errx(log_object, "cannot send %s rows of %s data to clickhouse server %s: %s",
- #tbl, what, ip_addr, 'cannot make HTTP request')
- end
- end
-
- local fields = {}
- clickhouse_main_row(fields)
- clickhouse_attachments_row(fields)
- clickhouse_urls_row(fields)
- clickhouse_emails_row(fields)
- clickhouse_asn_row(fields)
-
- if settings.enable_symbols then
- clickhouse_symbols_row(fields)
- clickhouse_groups_row(fields)
- end
-
- if #settings.extra_columns > 0 then
- clickhouse_extra_columns(fields)
- end
-
- send_data('generic data', gen_rows,
- string.format('INSERT INTO rspamd (%s)',
- table.concat(fields, ',')))
-
- for k,crows in pairs(cust_rows) do
- if #crows > 1 then
- send_data('custom data ('..k..')', crows,
- settings.custom_rules[k].first_row())
- end
- end
- end
-
- local function clickhouse_collect(task)
- if task:has_flag('skip') then
- return
- end
-
- if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then
- return
- end
-
- for _,sym in ipairs(settings.stop_symbols) do
- if task:has_symbol(sym) then
- rspamd_logger.infox(task, 'skip Clickhouse storage for message: symbol %s has fired', sym)
- return
- end
- end
-
- if settings.exceptions then
- local excepted,trace = settings.exceptions:process(task)
- if excepted then
- rspamd_logger.infox(task, 'skipped Clickhouse storage for message: excepted (%s)',
- trace)
- -- Excepted
- return
- end
- end
-
- local from_domain = ''
- local from_user = ''
- if task:has_from('smtp') then
- local from = task:get_from({'smtp','orig'})[1]
-
- if from then
- from_domain = from['domain']:lower()
- from_user = from['user']
- end
- end
-
- local mime_domain = ''
- local mime_user = ''
- if task:has_from('mime') then
- local from = task:get_from({'mime','orig'})[1]
- if from then
- mime_domain = from['domain']:lower()
- mime_user = from['user']
- end
- end
-
- local mime_recipients = {}
- if task:has_recipients('mime') then
- local recipients = task:get_recipients({'mime','orig'})
- for _, rcpt in ipairs(recipients) do
- table.insert(mime_recipients, rcpt['user'] .. '@' .. rcpt['domain']:lower())
- end
- end
-
- local ip_str = 'undefined'
- local ip = task:get_from_ip()
- if ip and ip:is_valid() then
- local ipnet
- if ip:get_version() == 4 then
- ipnet = ip:apply_mask(settings['ipmask'])
- else
- ipnet = ip:apply_mask(settings['ipmask6'])
- end
- ip_str = ipnet:to_string()
- end
-
- local helo = task:get_helo() or ''
-
- local rcpt_user = ''
- local rcpt_domain = ''
- local smtp_recipients = {}
- if task:has_recipients('smtp') then
- local recipients = task:get_recipients('smtp')
- -- for compatibility with an old table structure
- rcpt_user = recipients[1]['user']
- rcpt_domain = recipients[1]['domain']:lower()
-
- for _, rcpt in ipairs(recipients) do
- table.insert(smtp_recipients, rcpt['user'] .. '@' .. rcpt['domain']:lower())
- end
- end
-
- local list_id = task:get_header('List-Id') or ''
- local message_id = lua_util.maybe_obfuscate_string(task:get_message_id() or '',
- settings, 'mid')
-
- local score = task:get_metric_score()[1];
- local fields = {
- bayes = 'unknown',
- fuzzy = 'unknown',
- ann = 'unknown',
- whitelist = 'unknown',
- dkim = 'unknown',
- dmarc = 'unknown',
- spf = 'unknown',
- }
-
- local ret
-
- ret = clickhouse_check_symbol(task,'bayes_spam_symbols', fields,
- 'bayes', 'spam')
- if not ret then
- clickhouse_check_symbol(task,'bayes_ham_symbols', fields,
- 'bayes', 'ham')
- end
-
- clickhouse_check_symbol(task,'ann_symbols_spam', fields,
- 'ann', 'spam')
- if not ret then
- clickhouse_check_symbol(task,'ann_symbols_ham', fields,
- 'ann', 'ham')
- end
-
- clickhouse_check_symbol(task,'whitelist_symbols', fields,
- 'whitelist', 'blacklist', 'whitelist')
-
- clickhouse_check_symbol(task,'fuzzy_symbols', fields,
- 'fuzzy', 'deny')
-
-
- ret = clickhouse_check_symbol(task,'dkim_allow_symbols', fields,
- 'dkim', 'allow')
- if not ret then
- ret = clickhouse_check_symbol(task,'dkim_reject_symbols', fields,
- 'dkim', 'reject')
- end
- if not ret then
- ret = clickhouse_check_symbol(task,'dkim_dnsfail_symbols', fields,
- 'dkim', 'dnsfail')
- end
- if not ret then
- clickhouse_check_symbol(task,'dkim_na_symbols', fields,
- 'dkim', 'na')
- end
-
-
- ret = clickhouse_check_symbol(task,'dmarc_allow_symbols', fields,
- 'dmarc', 'allow')
- if not ret then
- ret = clickhouse_check_symbol(task,'dmarc_reject_symbols', fields,
- 'dmarc', 'reject')
- end
- if not ret then
- ret = clickhouse_check_symbol(task,'dmarc_quarantine_symbols', fields,
- 'dmarc', 'quarantine')
- end
- if not ret then
- ret = clickhouse_check_symbol(task,'dmarc_softfail_symbols', fields,
- 'dmarc', 'softfail')
- end
- if not ret then
- clickhouse_check_symbol(task,'dmarc_na_symbols', fields,
- 'dmarc', 'na')
- end
-
-
- ret = clickhouse_check_symbol(task,'spf_allow_symbols', fields,
- 'spf', 'allow')
- if not ret then
- ret = clickhouse_check_symbol(task,'spf_reject_symbols', fields,
- 'spf', 'reject')
- end
- if not ret then
- ret = clickhouse_check_symbol(task,'spf_neutral_symbols', fields,
- 'spf', 'neutral')
- end
- if not ret then
- ret = clickhouse_check_symbol(task,'spf_dnsfail_symbols', fields,
- 'spf', 'dnsfail')
- end
- if not ret then
- clickhouse_check_symbol(task,'spf_na_symbols', fields,
- 'spf', 'na')
- end
-
- local nrcpts = 0
- if task:has_recipients('smtp') then
- nrcpts = #task:get_recipients('smtp')
- end
-
- local nurls = 0
- local task_urls = task:get_urls({
- content = true,
- images = true,
- emails = false,
- sort = true,
- }) or {}
-
- nurls = #task_urls
-
- local timestamp = math.floor(task:get_date({
- format = 'connect',
- gmt = true, -- The only sane way to sync stuff with different timezones
- }))
-
- local action = task:get_metric_action()
- local custom_action = ''
-
- if not predefined_actions[action] then
- custom_action = action
- action = 'custom'
- end
-
- local digest = ''
-
- if settings.enable_digest then
- digest = task:get_digest()
- end
-
- local subject = ''
- if settings.insert_subject then
- subject = lua_util.maybe_obfuscate_string(task:get_subject() or '', settings, 'subject')
- end
-
- local scan_real = task:get_scan_time()
- scan_real = math.floor(scan_real * 1000)
- if scan_real < 0 then
- rspamd_logger.messagex(task,
- 'clock skew detected for message: %s ms real scan time (reset to 0)',
- scan_real)
- scan_real = 0
- end
-
- local auth_user = task:get_user() or ''
- local settings_id = task:get_settings_id()
-
- if settings_id then
- -- Convert to string
- settings_id = lua_settings.settings_by_id(settings_id)
-
- if settings_id then
- settings_id = settings_id.name
- end
- end
-
- if not settings_id then
- settings_id = ''
- end
-
- local row = {
- today(timestamp),
- timestamp,
- from_domain,
- mime_domain,
- ip_str,
- helo,
- score,
- nrcpts,
- task:get_size(),
- fields.whitelist,
- fields.bayes,
- fields.fuzzy,
- fields.ann,
- fields.dkim,
- fields.dmarc,
- nurls,
- action,
- from_user,
- mime_user,
- rcpt_user,
- rcpt_domain,
- smtp_recipients,
- list_id,
- subject,
- digest,
- fields.spf,
- mime_recipients,
- message_id,
- scan_real,
- custom_action,
- auth_user,
- settings_id
- }
-
- -- Attachments step
- local attachments_fnames = {}
- local attachments_ctypes = {}
- local attachments_lengths = {}
- local attachments_digests = {}
- for _, part in ipairs(task:get_parts()) do
- if part:is_attachment() then
- table.insert(attachments_fnames, part:get_filename() or '')
- local mime_type, mime_subtype = part:get_type()
- table.insert(attachments_ctypes, string.format("%s/%s", mime_type, mime_subtype))
- table.insert(attachments_lengths, part:get_length())
- table.insert(attachments_digests, string.sub(part:get_digest(), 1, 16))
- end
- end
-
- if #attachments_fnames > 0 then
- table.insert(row, attachments_fnames)
- table.insert(row, attachments_ctypes)
- table.insert(row, attachments_lengths)
- table.insert(row, attachments_digests)
- else
- table.insert(row, {})
- table.insert(row, {})
- table.insert(row, {})
- table.insert(row, {})
- end
-
- -- Urls step
- local urls_urls = {}
- local urls_tlds = {}
- local urls_flags = {}
-
- if settings.full_urls then
- for i,u in ipairs(task_urls) do
- urls_urls[i] = u:get_text()
- urls_tlds[i] = u:get_tld() or u:get_host()
- urls_flags[i] = u:get_flags_num()
- end
- else
- -- We need to store unique
- local mt = {
- ord_tbl = {}, -- ordered list of urls
- idx_tbl = {}, -- indexed by host + flags, reference to an index in ord_tbl
- __newindex = function(t, k, v)
- local idx = getmetatable(t).idx_tbl
- local ord = getmetatable(t).ord_tbl
- local key = k:get_host() .. tostring(k:get_flags_num())
- if idx[key] then
- ord[idx[key]] = v -- replace
- else
- ord[#ord + 1] = v
- idx[key] = #ord
- end
- end,
- __index = function(t, k)
- local ord = getmetatable(t).ord_tbl
- if type(k) == 'number' then
- return ord[k]
- else
- local idx = getmetatable(t).idx_tbl
- local key = k:get_host() .. tostring(k:get_flags_num())
- if idx[key] then
- return ord[idx[key]]
- end
- end
- end,
- }
- -- Extra index needed for making this unique
- local urls_idx = {}
- setmetatable(urls_idx, mt)
- for _,u in ipairs(task_urls) do
- if not urls_idx[u] then
- urls_idx[u] = u
- urls_urls[#urls_urls + 1] = u:get_host()
- urls_tlds[#urls_tlds + 1] = u:get_tld() or u:get_host()
- urls_flags[#urls_flags + 1] = u:get_flags_num()
- end
- end
- end
-
-
- -- Get tlds
- table.insert(row, urls_tlds)
- -- Get hosts/full urls
- table.insert(row, urls_urls)
- -- Numeric flags
- table.insert(row, urls_flags)
-
- -- Emails step
- if task:has_urls(true) then
- local emails = task:get_emails() or {}
- local emails_formatted = {}
- for i,u in ipairs(emails) do
- emails_formatted[i] = string.format('%s@%s', u:get_user(), u:get_host())
- end
- table.insert(row, emails_formatted)
- else
- table.insert(row, {})
- end
-
- -- ASN information
- local asn, country, ipnet = 0, '--', '--'
- local pool = task:get_mempool()
- ret = pool:get_variable("asn")
- if ret then
- asn = ret
- end
- ret = pool:get_variable("country")
- if ret then
- country = ret:sub(1, 2)
- end
- ret = pool:get_variable("ipnet")
- if ret then
- ipnet = ret
- end
- table.insert(row, asn)
- table.insert(row, country)
- table.insert(row, ipnet)
-
- -- Symbols info
- if settings.enable_symbols then
- local symbols = task:get_symbols_all()
- local syms_tab = {}
- local scores_tab = {}
- local options_tab = {}
-
- for _,s in ipairs(symbols) do
- table.insert(syms_tab, s.name or '')
- table.insert(scores_tab, s.score)
-
- if s.options then
- table.insert(options_tab, table.concat(s.options, ','))
- else
- table.insert(options_tab, '');
- end
- end
- table.insert(row, syms_tab)
- table.insert(row, scores_tab)
- table.insert(row, options_tab)
-
- -- Groups data
- local groups = task:get_groups()
- local groups_tab = {}
- local gr_scores_tab = {}
- for gr,sc in pairs(groups) do
- table.insert(groups_tab, gr)
- table.insert(gr_scores_tab, sc)
- end
- table.insert(row, groups_tab)
- table.insert(row, gr_scores_tab)
- end
-
- -- Extra columns
- if #settings.extra_columns > 0 then
- for _,col in ipairs(settings.extra_columns) do
- local elts = col.real_selector(task)
-
- if elts then
- table.insert(row, elts)
- else
- table.insert(row, col.default_value)
- end
- end
- end
-
- -- Custom data
- for k,rule in pairs(settings.custom_rules) do
- if not custom_rows[k] then custom_rows[k] = {} end
- table.insert(custom_rows[k], lua_clickhouse.row_to_tsv(rule.get_row(task)))
- end
-
- local tsv_row = lua_clickhouse.row_to_tsv(row)
- used_memory = used_memory + #tsv_row
- data_rows[#data_rows + 1] = tsv_row
- nrows = nrows + 1
- lua_util.debugm(N, task,
- "add clickhouse row %s / %s; used memory: %s / %s",
- nrows, settings.limits.max_rows,
- used_memory, settings.limits.max_memory)
- end
-
- local function do_remove_partition(ev_base, cfg, table_name, partition)
- lua_util.debugm(N, rspamd_config, "removing partition %s.%s", table_name, partition)
- local upstream = settings.upstream:get_upstream_round_robin()
- local remove_partition_sql = "ALTER TABLE ${table_name} ${remove_method} PARTITION '${partition}'"
- local remove_method = (settings.retention.method == 'drop') and 'DROP' or 'DETACH'
- local sql_params = {
- ['table_name'] = table_name,
- ['remove_method'] = remove_method,
- ['partition'] = partition
- }
-
- local sql = lua_util.template(remove_partition_sql, sql_params)
-
- local ch_params = {
- body = sql,
- ev_base = ev_base,
- config = cfg,
- }
-
- local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
- if err then
- rspamd_logger.errx(rspamd_config,
- "cannot detach partition %s:%s from server %s: %s",
- table_name, partition,
- settings['server'], err)
- return
- end
-
- rspamd_logger.infox(rspamd_config,
- 'detached partition %s:%s on server %s', table_name, partition,
- settings['server'])
-
- end
-
- --[[
- nil - file is not writable, do not perform removal
- 0 - it's time to perform removal
- <int> - how many seconds wait until next run
- ]]
- local function get_last_removal_ago()
- local ts_file = string.format('%s/%s', rspamd_paths['DBDIR'], 'clickhouse_retention_run')
- local last_ts
- local current_ts = os.time()
-
- local function write_ts_to_file()
- local write_file, err = io.open(ts_file, 'w')
- if err then
- rspamd_logger.errx(rspamd_config, 'Failed to open %s, will not perform retention: %s', ts_file, err)
- return nil
- end
-
- local res
- res, err = write_file:write(tostring(current_ts))
- if err or res == nil then
- write_file:close()
- rspamd_logger.errx(rspamd_config, 'Failed to write %s, will not perform retention: %s', ts_file, err)
- return nil
- end
- write_file:close()
-
- return true
- end
-
- local f, err = io.open(ts_file, 'r')
- if err then
- lua_util.debugm(N, rspamd_config, 'Failed to open %s: %s', ts_file, err)
- else
- last_ts = tonumber(f:read('*number'))
- f:close()
- end
-
- if last_ts == nil or (last_ts + settings.retention.period) <= current_ts then
- return write_ts_to_file() and 0
- end
-
- if last_ts > current_ts then
- -- Clock skew detected, overwrite last_ts with current_ts and wait for the next
- -- retention period
- rspamd_logger.errx(rspamd_config, 'Last collection time is in future: %s; overwrite it with %s in %s',
- last_ts, current_ts, ts_file)
- return write_ts_to_file() and -1
- end
-
- return (last_ts + settings.retention.period) - current_ts
- end
-
- local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
- local need_collect = false
- local reason
-
- if nrows == 0 then
- lua_util.debugm(N, cfg, "no need to send data, as there are no rows to collect")
- return settings.check_timeout
- end
-
- if final_call then
- lua_util.debugm(N, cfg, "no need to send data, final call has been issued")
- return 0
- end
-
- if settings.limits.max_rows > 0 then
- if nrows > settings.limits.max_rows then
- need_collect = true
- reason = string.format('limit of rows has been reached: %d', nrows)
- end
- end
-
- if last_collection > 0 and settings.limits.max_interval > 0 then
- if now - last_collection > settings.limits.max_interval then
- need_collect = true
- reason = string.format('limit of time since last collection has been reached: %d seconds passed ' ..
- '(%d seconds trigger)',
- (now - last_collection), settings.limits.max_interval)
- end
- end
-
- if settings.limits.max_memory > 0 then
- if used_memory >= settings.limits.max_memory then
- need_collect = true
- reason = string.format('limit of memory has been reached: %d bytes used',
- used_memory)
- end
- end
-
- if last_collection == 0 then
- last_collection = now
- end
-
- if need_collect then
- -- Do it atomic
- local saved_rows = data_rows
- local saved_custom = custom_rows
- nrows = 0
- last_collection = now
- used_memory = 0
- data_rows = {}
- custom_rows = {}
-
- clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom)
-
- if settings.collect_garbage then
- collectgarbage()
- end
- end
-
- return settings.check_timeout
- end
-
- local function clickhouse_remove_old_partitions(cfg, ev_base)
- local last_time_ago = get_last_removal_ago()
- if last_time_ago == nil then
- rspamd_logger.errx(rspamd_config, "Failed to get last run time. Disabling retention")
- return false
- elseif last_time_ago ~= 0 then
- return last_time_ago
- end
-
- local upstream = settings.upstream:get_upstream_round_robin()
- local partition_to_remove_sql = "SELECT partition, table " ..
- "FROM system.parts WHERE table IN ('${tables}') " ..
- "GROUP BY partition, table " ..
- "HAVING max(max_date) < toDate(now() - interval ${month} month)"
-
- local table_names = {'rspamd'}
- local tables = table.concat(table_names, "', '")
- local sql_params = {
- tables = tables,
- month = settings.retention.period_months,
- }
- local sql = lua_util.template(partition_to_remove_sql, sql_params)
-
-
- local ch_params = {
- ev_base = ev_base,
- config = cfg,
- }
- local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql)
- if err then
- rspamd_logger.errx(rspamd_config,
- "cannot send data to clickhouse server %s: %s",
- settings['server'], err)
- else
- fun.each(function(row)
- do_remove_partition(ev_base, cfg, row.table, row.partition)
- end, rows)
- end
-
- -- settings.retention.period is added on initialisation, see below
- return settings.retention.period
- end
-
- local function upload_clickhouse_schema(upstream, ev_base, cfg, initial)
- local ch_params = {
- ev_base = ev_base,
- config = cfg,
- }
-
- local errored = false
-
- -- Upload a single element of the schema
- local function upload_schema_elt(v)
- if errored then
- rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: due to previous errors",
- v, upstream:get_addr():to_string(true))
- return
- end
- local sql = v
- local err, reply = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
-
- if err then
- rspamd_logger.errx(rspamd_config, "cannot upload schema '%s' on clickhouse server %s: %s",
- sql, upstream:get_addr():to_string(true), err)
- errored = true
- return
- end
- rspamd_logger.debugm(N, rspamd_config, 'uploaded clickhouse schema element %s to %s: %s',
- v, upstream:get_addr():to_string(true), reply)
- end
-
- -- Process element and return nil if statement should be skipped
- local function preprocess_schema_elt(v)
- if type(v) == 'string' then
- return lua_util.template(v, {SCHEMA_VERSION = tostring(schema_version)})
- elseif type(v) == 'table' then
- -- Pair of statement + boolean
- if initial == v[2] then
- return lua_util.template(v[1], {SCHEMA_VERSION = tostring(schema_version)})
- else
- rspamd_logger.debugm(N, rspamd_config, 'skip clickhouse schema element %s: schema already exists',
- v)
- end
- end
-
- return nil
- end
-
- -- Apply schema elements sequentially, users additions are concatenated to the tail
- fun.each(upload_schema_elt,
- -- Also template schema version
- fun.filter(function(v) return v ~= nil end,
- fun.map(preprocess_schema_elt,
- fun.chain(clickhouse_schema, settings.schema_additions)
- )
- )
- )
- end
-
- local function maybe_apply_migrations(upstream, ev_base, cfg, version)
- local ch_params = {
- ev_base = ev_base,
- config = cfg,
- }
- -- Apply migrations sequentially
- local function migration_recursor(i)
- if i < schema_version then
- if migrations[i] then
- -- We also need to apply statements sequentially
- local function sql_recursor(j)
- if migrations[i][j] then
- local sql = migrations[i][j]
- local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
- function(_, _)
- rspamd_logger.infox(rspamd_config,
- 'applied migration to version %s from version %s: %s',
- i + 1, version, sql:gsub('[\n%s]+', ' '))
- if j == #migrations[i] then
- -- Go to the next migration
- migration_recursor(i + 1)
- else
- -- Apply the next statement
- sql_recursor(j + 1)
- end
- end ,
- function(_, err)
- rspamd_logger.errx(rspamd_config,
- "cannot apply migration %s: '%s' on clickhouse server %s: %s",
- i, sql, upstream:get_addr():to_string(true), err)
- end)
- if not ret then
- rspamd_logger.errx(rspamd_config,
- "cannot apply migration %s: '%s' on clickhouse server %s: cannot make request",
- i, sql, upstream:get_addr():to_string(true))
- end
- end
- end
-
- sql_recursor(1)
- else
- -- Try another migration
- migration_recursor(i + 1)
- end
- end
- end
-
- migration_recursor(version)
- end
-
- local function add_extra_columns(upstream, ev_base, cfg)
- local ch_params = {
- ev_base = ev_base,
- config = cfg,
- }
- -- Apply migrations sequentially
- local function columns_recursor(i)
- if i <= #settings.extra_columns then
- local col = settings.extra_columns[i]
- local prev_column
- if i == 1 then
- prev_column = 'MIMERcpt'
- else
- prev_column = settings.extra_columns[i - 1].name
- end
- local sql = string.format('ALTER TABLE rspamd ADD COLUMN IF NOT EXISTS `%s` %s AFTER `%s`',
- col.name, col.type, prev_column)
- if col.comment then
- sql = sql .. string.format(", COMMENT COLUMN IF EXISTS `%s` '%s'", col.name, col.comment)
- end
-
- local ret = lua_clickhouse.generic(upstream, settings, ch_params, sql,
- function(_, _)
- rspamd_logger.infox(rspamd_config,
- 'added extra column %s (%s) after %s',
- col.name, col.type, prev_column)
- -- Apply the next statement
- columns_recursor(i + 1)
- end ,
- function(_, err)
- rspamd_logger.errx(rspamd_config,
- "cannot apply add column alter %s: '%s' on clickhouse server %s: %s",
- i, sql, upstream:get_addr():to_string(true), err)
- end)
- if not ret then
- rspamd_logger.errx(rspamd_config,
- "cannot apply add column alter %s: '%s' on clickhouse server %s: cannot make request",
- i, sql, upstream:get_addr():to_string(true))
- end
- end
- end
-
- columns_recursor(1)
- end
-
- local function check_rspamd_table(upstream, ev_base, cfg)
- local ch_params = {
- ev_base = ev_base,
- config = cfg,
- }
- local sql = [[EXISTS TABLE rspamd]]
- local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql)
- if err then
- rspamd_logger.errx(rspamd_config, "cannot check rspamd table in clickhouse server %s: %s",
- upstream:get_addr():to_string(true), err)
- return
- end
-
- if rows[1] and rows[1].result then
- if tonumber(rows[1].result) == 1 then
- -- Apply migration
- upload_clickhouse_schema(upstream, ev_base, cfg, false)
- rspamd_logger.infox(rspamd_config, 'table rspamd exists, check if we need to apply migrations')
- maybe_apply_migrations(upstream, ev_base, cfg, 1)
- else
- -- Upload schema
- rspamd_logger.infox(rspamd_config, 'table rspamd does not exists, upload full schema')
- upload_clickhouse_schema(upstream, ev_base, cfg, true)
- end
- else
- rspamd_logger.errx(rspamd_config,
- "unexpected reply on EXISTS command from server %s: %s",
- upstream:get_addr():to_string(true), rows)
- end
- end
-
-
- local function check_clickhouse_upstream(upstream, ev_base, cfg)
- local ch_params = {
- ev_base = ev_base,
- config = cfg,
- }
- -- If we have some custom rules, we just send its schema to the upstream
- for k,rule in pairs(settings.custom_rules) do
- if rule.schema then
- local sql = lua_util.template(rule.schema, settings)
- local err, _ = lua_clickhouse.generic_sync(upstream, settings, ch_params, sql)
- if err then
- rspamd_logger.errx(rspamd_config, 'cannot send custom schema %s to clickhouse server %s: ' ..
- 'cannot make request (%s)',
- k, upstream:get_addr():to_string(true), err)
- end
- end
- end
-
- -- Now check the main schema and apply migrations if needed
- local sql = [[SELECT MAX(Version) as v FROM rspamd_version]]
- local err, rows = lua_clickhouse.select_sync(upstream, settings, ch_params, sql)
- if err then
- if rows and rows.code == 404 then
- rspamd_logger.infox(rspamd_config,
- 'table rspamd_version does not exist, check rspamd table')
- check_rspamd_table(upstream, ev_base, cfg)
- else
- rspamd_logger.errx(rspamd_config,
- "cannot get version on clickhouse server %s: %s",
- upstream:get_addr():to_string(true), err)
- end
- else
- upload_clickhouse_schema(upstream, ev_base, cfg, false)
- local version = tonumber(rows[1].v)
- maybe_apply_migrations(upstream, ev_base, cfg, version)
- end
-
- if #settings.extra_columns > 0 then
- add_extra_columns(upstream, ev_base, cfg)
- end
- end
-
- local opts = rspamd_config:get_all_opt('clickhouse')
- if opts then
- -- Legacy `limit` options
- if opts.limit and not opts.limits then
- settings.limits.max_rows = opts.limit
- end
- for k,v in pairs(opts) do
- if k == 'custom_rules' then
- if not v[1] then
- v = {v}
- end
-
- for i,rule in ipairs(v) do
- if rule.schema and rule.first_row and rule.get_row then
- local first_row, get_row
- local loadstring = loadstring or load
- local ret, res_or_err = pcall(loadstring(rule.first_row))
-
- if not ret or type(res_or_err) ~= 'function' then
- rspamd_logger.errx(rspamd_config, 'invalid first_row (%s) - must be a function',
- res_or_err)
- else
- first_row = res_or_err
- end
-
- ret, res_or_err = pcall(loadstring(rule.get_row))
-
- if not ret or type(res_or_err) ~= 'function' then
- rspamd_logger.errx(rspamd_config,
- 'invalid get_row (%s) - must be a function',
- res_or_err)
- else
- get_row = res_or_err
- end
-
- if first_row and get_row then
- local name = rule.name or tostring(i)
- settings.custom_rules[name] = {
- schema = rule.schema,
- first_row = first_row,
- get_row = get_row,
- }
- end
- else
- rspamd_logger.errx(rspamd_config, 'custom rule has no required attributes: schema, first_row and get_row')
- end
- end
- else
- settings[k] = lua_util.deepcopy(v)
- end
- end
-
- if not settings['server'] and not settings['servers'] then
- rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
- lua_util.disable_module(N, "config")
- else
- local lua_maps = require "lua_maps"
- settings['from_map'] = lua_maps.map_add('clickhouse', 'from_tables',
- 'regexp', 'clickhouse specific domains')
-
- settings.upstream = upstream_list.create(rspamd_config,
- settings['server'] or settings['servers'], 8123)
-
- if not settings.upstream then
- rspamd_logger.errx(rspamd_config, 'cannot parse clickhouse address: %s',
- settings['server'] or settings['servers'])
- lua_util.disable_module(N, "config")
- return
- end
-
- if settings.exceptions then
- local maps_expressions = require "lua_maps_expressions"
-
- settings.exceptions = maps_expressions.create(rspamd_config,
- settings.exceptions, N)
- end
-
- if settings.extra_columns then
- -- Check sanity and create selector closures
- local lua_selectors = require "lua_selectors"
- local columns_transformed = {}
- local need_sort = false
- -- Select traverse function depending on what we have
- local iter_func = settings.extra_columns[1] and ipairs or pairs
-
- for col_name,col_data in iter_func(settings.extra_columns) do
- -- Array based extra columns
- if col_data.name then col_name = col_data.name end
- if not col_data.selector or not col_data.type then
- rspamd_logger.errx(rspamd_config, 'cannot add clickhouse extra row %s: no type or no selector',
- col_name)
- else
- local is_array = false
-
- if col_data.type:lower():match('^array') then
- is_array = true
- end
-
- local selector = lua_selectors.create_selector_closure(rspamd_config,
- col_data.selector, col_data.delimiter or '', is_array)
-
- if not selector then
- rspamd_logger.errx(rspamd_config, 'cannot add clickhouse extra row %s: bad selector: %s',
- col_name, col_data.selector)
- else
- if not col_data.default_value then
- if is_array then
- col_data.default_value = {}
- else
- col_data.default_value = ''
- end
- end
- col_data.real_selector = selector
- if not col_data.name then
- col_data.name = col_name
- need_sort = true
- end
- table.insert(columns_transformed, col_data)
- end
- end
- end
-
- -- Convert extra columns from a map to an array sorted by column name to
- -- preserve strict order when doing altering
- if need_sort then
- rspamd_logger.infox(rspamd_config, 'sort extra columns as they are not configured as an array')
- table.sort(columns_transformed, function(c1, c2) return c1.name < c2.name end)
- end
- settings.extra_columns = columns_transformed
- end
-
- rspamd_config:register_symbol({
- name = 'CLICKHOUSE_COLLECT',
- type = 'idempotent',
- callback = clickhouse_collect,
- flags = 'empty,explicit_disable,ignore_passthrough',
- augmentations = {string.format("timeout=%f", settings.timeout)},
- })
- rspamd_config:register_finish_script(function(task)
- if nrows > 0 then
- final_call = true
- local saved_rows = data_rows
- local saved_custom = custom_rows
-
- nrows = 0
- data_rows = {}
- used_memory = 0
- custom_rows = {}
-
- clickhouse_send_data(task, nil, 'final collection',
- saved_rows, saved_custom)
-
- if settings.collect_garbage then
- collectgarbage()
- end
- end
- end)
- -- Create tables on load
- rspamd_config:add_on_load(function(cfg, ev_base, worker)
- if worker:is_scanner() then
- rspamd_config:add_periodic(ev_base, 0,
- clickhouse_maybe_send_data_periodic, true)
- end
- if worker:is_primary_controller() then
- local upstreams = settings.upstream:all_upstreams()
-
- for _,up in ipairs(upstreams) do
- check_clickhouse_upstream(up, ev_base, cfg)
- end
-
- if settings.retention.enable and settings.retention.method ~= 'drop' and
- settings.retention.method ~= 'detach' then
- rspamd_logger.errx(rspamd_config,
- "retention.method should be either 'drop' or 'detach' (now: %s). Disabling retention",
- settings.retention.method)
- settings.retention.enable = false
- end
- if settings.retention.enable and settings.retention.period_months < 1 or
- settings.retention.period_months > 1000 then
- rspamd_logger.errx(rspamd_config,
- "please, set retention.period_months between 1 and 1000 (now: %s). Disabling retention",
- settings.retention.period_months)
- settings.retention.enable = false
- end
- local period = lua_util.parse_time_interval(settings.retention.run_every)
- if settings.retention.enable and period == nil then
- rspamd_logger.errx(rspamd_config, "invalid value for retention.run_every (%s). Disabling retention",
- settings.retention.run_every)
- settings.retention.enable = false
- end
-
- if settings.retention.enable then
- settings.retention.period = period
- rspamd_logger.infox(rspamd_config,
- "retention will be performed each %s seconds for %s month with method %s",
- period, settings.retention.period_months, settings.retention.method)
- rspamd_config:add_periodic(ev_base, 0, clickhouse_remove_old_partitions, false)
- end
- end
- end)
- end
- end
|