From 3028e8571abd3ca3112d36710776b66c1009051c Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 6 Aug 2018 17:56:17 +0100 Subject: [PATCH] [Rework] Rework Clickhouse plugin to use the new API --- src/plugins/lua/clickhouse.lua | 351 +++++++++++---------------------- 1 file changed, 117 insertions(+), 234 deletions(-) diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index a4fd07034..3eebd5b5d 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -28,13 +28,11 @@ if confighelp then return end -local E = {} - local main_rows = {} local attachment_rows = {} local urls_rows = {} local emails_rows = {} -local specific_rows = {} +--local specific_rows = {} local asn_rows = {} local symbols_rows = {} local custom_rows = {} @@ -252,34 +250,8 @@ local function clickhouse_asn_row(tname) return elt end -local function clickhouse_first_row() - table.insert(main_rows, clickhouse_main_row(settings['table'])) - if settings['attachments_table'] then - table.insert(attachment_rows, - clickhouse_attachments_row(settings['attachments_table'])) - end - if settings['urls_table'] then - table.insert(urls_rows, - clickhouse_urls_row(settings['urls_table'])) - end - if settings['emails_table'] then - table.insert(emails_rows, - clickhouse_emails_row(settings['emails_table'])) - end - if settings['asn_table'] then - table.insert(asn_rows, - clickhouse_asn_row(settings['asn_table'])) - end - if settings.enable_symbols and settings['symbols_table'] then - table.insert(symbols_rows, - clickhouse_symbols_row(settings['symbols_table'])) - end - - for k,rule in pairs(settings.custom_rules) do - if not custom_rows[k] then custom_rows[k] = {} end - table.insert(custom_rows[k], - rule.first_row()) - end +local function today(ts) + return os.date('%Y-%m-%d', ts) end local function clickhouse_check_symbol(task, symbols, need_score) @@ -301,174 +273,73 @@ local function clickhouse_send_data(task) local upstream = settings.upstream:get_upstream_round_robin() local ip_addr = upstream:get_addr():to_string(true) - local function gen_http_cb(what, how_many) - return function (err_message, code, data, _) - if code ~= 200 or err_message then - if not err_message then err_message = data end - rspamd_logger.errx(task, "cannot send %s data to clickhouse server %s: %s", - what, ip_addr, err_message) - upstream:fail() - else - rspamd_logger.infox(task, "sent %s rows of %s to clickhouse server %s", - how_many - 1, what, ip_addr) - upstream:ok() - end + local function gen_success_cb(what, how_many) + return function (_, _) + rspamd_logger.infox(task, "sent %s rows of %s to clickhouse server %s", + how_many, what, ip_addr) + upstream:ok() end end - local body = table.concat(main_rows, ' ') - if not rspamd_http.request({ - task = task, - url = connect_prefix .. ip_addr, - body = body, - callback = gen_http_cb('generic data', #main_rows), - gzip = settings.use_gzip, - mime_type = 'text/plain', - timeout = settings['timeout'], - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - }) then - rspamd_logger.errx(task, "cannot send data to clickhouse server %s: cannot make request", - settings['server']) + local function gen_fail_cb(what, how_many) + return function (_, err) + rspamd_logger.errx(task, "cannot send %s rows of %s data to clickhouse server %s: %s", + how_many, what, ip_addr, err) + upstream:fail() + end end - if #attachment_rows > 1 then - body = table.concat(attachment_rows, ' ') - if not rspamd_http.request({ + local function send_data(what, tbl, query) + local ch_params = { task = task, - url = connect_prefix .. ip_addr, - body = body, - callback = gen_http_cb('attachments data', #attachment_rows), - mime_type = 'text/plain', - timeout = settings['timeout'], - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - }) then - rspamd_logger.errx(task, "cannot send attachments to clickhouse server %s: cannot make request", - settings['server']) + } + local ret = lua_clickhouse.insert(upstream, settings, ch_params, + query, tbl, + gen_success_cb(what, #tbl), + gen_fail_cb(what, #tbl)) + if not ret then + rspamd_logger.errx(task, "cannot send %s rows of %s data to clickhouse server %s: %s", + #tbl, what, ip_addr, 'cannot make HTTP request') end end + + send_data('generic data', main_rows, + clickhouse_main_row(settings['table'])) + + + if #attachment_rows > 1 then + send_data('attachments data', attachment_rows, + clickhouse_attachments_row(settings.attachments_table)) + end + if #urls_rows > 1 then - body = table.concat(urls_rows, ' ') - if not rspamd_http.request({ - task = task, - url = connect_prefix .. ip_addr, - body = body, - callback = gen_http_cb('urls data', #urls_rows), - mime_type = 'text/plain', - timeout = settings['timeout'], - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - }) then - rspamd_logger.errx(task, "cannot send urls to clickhouse server %s: cannot make request", - settings['server']) - end + send_data('urls data', urls_rows, + clickhouse_urls_row(settings.urls_table)) end + if #emails_rows > 1 then - body = table.concat(emails_rows, ' ') - if not rspamd_http.request({ - task = task, - url = connect_prefix .. ip_addr, - body = body, - callback = gen_http_cb('emails data', #emails_rows), - mime_type = 'text/plain', - timeout = settings['timeout'], - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - }) then - rspamd_logger.errx(task, "cannot send emails to clickhouse server %s: cannot make request", - settings['server']) - end + send_data('emails data', emails_rows, + clickhouse_emails_row(settings.emails_table)) end + if #asn_rows > 1 then - body = table.concat(asn_rows, ' ') - if not rspamd_http.request({ - task = task, - url = connect_prefix .. ip_addr, - body = body, - callback = gen_http_cb('asn data', #asn_rows), - mime_type = 'text/plain', - timeout = settings['timeout'], - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - }) then - rspamd_logger.errx(task, "cannot send asn info to clickhouse server %s: cannot make request", - settings['server']) - end + send_data('asn data', asn_rows, + clickhouse_asn_row(settings.asn_table)) end if #symbols_rows > 1 then - body = table.concat(symbols_rows, ' ') - if not rspamd_http.request({ - task = task, - url = connect_prefix .. ip_addr, - body = body, - callback = gen_http_cb('symbols data', #symbols_rows), - mime_type = 'text/plain', - timeout = settings['timeout'], - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - }) then - rspamd_logger.errx(task, "cannot send symbols info to clickhouse server %s: cannot make request", - settings['server']) - end - end - - for k,specific in pairs(specific_rows) do - if #specific > 1 then - body = table.concat(specific, ' ') - if not rspamd_http.request({ - task = task, - url = connect_prefix .. ip_addr, - body = body, - callback = gen_http_cb('domain specific data ('..k..')', #specific), - mime_type = 'text/plain', - timeout = settings['timeout'], - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - }) then - rspamd_logger.errx(task, "cannot send data for domain %s to clickhouse server %s: cannot make request", - k, settings['server']) - end - end + send_data('symbols data', symbols_rows, + clickhouse_symbols_row(settings.symbols_table)) end for k,crows in pairs(custom_rows) do if #crows > 1 then - body = table.concat(crows, ' ') - if not rspamd_http.request({ - task = task, - url = connect_prefix .. ip_addr, - body = body, - callback = gen_http_cb('custom data ('..k..')', #crows), - mime_type = 'text/plain', - timeout = settings['timeout'], - no_ssl_verify = settings.no_ssl_verify, - user = settings.user, - password = settings.password, - }) then - rspamd_logger.errx(task, "cannot send custom data %s to clickhouse server %s: cannot make request", - k, settings['server']) - end + send_data('custom data ('..k..')', settings.custom_rules[k].first_row(), + crows) end end end -local function clickhouse_quote(str) - if str then - return str:gsub('[\'\\]', '\\%1'):lower() - else - return '' - end -end - local function clickhouse_collect(task) if not settings.allow_local and rspamd_lua_utils.is_rspamc_or_controller(task) then return end @@ -615,16 +486,34 @@ local function clickhouse_collect(task) gmt = false }) - local elt = string.format("(today(),%d,'%s','%s','%s',%.2f,%d,%d,'%s','%s','%s','%s','%s','%s',%d,'%s','%s','%s','%s','%s','%s','%s')", - timestamp, - clickhouse_quote(from_domain), clickhouse_quote(mime_domain), ip_str, score, - nrcpts, task:get_size(), whitelist, bayes, fuzzy, fann, - dkim, dmarc, nurls, task:get_metric_action('default'), - clickhouse_quote(from_user), clickhouse_quote(mime_user), - clickhouse_quote(rcpt_user), clickhouse_quote(rcpt_domain), - clickhouse_quote(list_id), task:get_digest()) - table.insert(main_rows, elt) + local action = task:get_metric_action('default') + + table.insert(main_rows, { + today(timestamp), + timestamp, + from_domain, + mime_domain, + ip_str, + score, + nrcpts, + task:get_size(), + whitelist, + bayes, + fuzzy, + fann, + dkim, + dmarc, + nurls, + action, + from_user, + mime_user, + rcpt_user, + rcpt_domain, + list_id, + task:get_digest() + }) +--[[ TODO: has been broken if settings['from_map'] and dkim == 'allow' then -- Use dkim local das = task:get_symbol(settings['dkim_allow_symbols'][1]) @@ -632,16 +521,14 @@ local function clickhouse_collect(task) for _,dkim_domain in ipairs(das[1]['options']) do local specific = settings.from_map:get_key(dkim_domain) if specific then - if not specific_rows[specific] then - local first = clickhouse_main_row(specific) - specific_rows[specific] = {first} - end + specific_rows[specific] = {} table.insert(specific_rows[specific], elt) end end end end +--]] -- Attachments step local attachments_fnames = {} @@ -652,23 +539,24 @@ local function clickhouse_collect(task) local fname = part:get_filename() if fname then - table.insert(attachments_fnames, string.format("'%s'", clickhouse_quote(fname))) + table.insert(attachments_fnames, fname) local type, subtype = part:get_type() - table.insert(attachments_ctypes, string.format("'%s/%s'", - clickhouse_quote(type), clickhouse_quote(subtype))) - table.insert(attachments_lengths, string.format("%s", tostring(part:get_length()))) - table.insert(attachments_digests, string.format("'%s'", string.sub(part:get_digest(), 1, 16))) + table.insert(attachments_ctypes, string.format("%s/%s", + type, subtype)) + table.insert(attachments_lengths, part:get_length()) + table.insert(attachments_digests, string.sub(part:get_digest(), 1, 16)) end end if #attachments_fnames > 0 then - elt = string.format("(today(),'%s',[%s],[%s],[%s],[%s])", + table.insert(attachment_rows, { + today(timestamp), task:get_digest(), - table.concat(attachments_fnames, ','), - table.concat(attachments_ctypes, ','), - table.concat(attachments_lengths, ','), - table.concat(attachments_digests, ',')) - table.insert(attachment_rows, elt) + attachments_fnames, + attachments_ctypes, + attachments_lengths, + attachments_digests, + }) end -- Urls step @@ -676,40 +564,39 @@ local function clickhouse_collect(task) local urls_urls = {} if task:has_urls(false) then for _,u in ipairs(task:get_urls(false)) do - table.insert(urls_tlds, string.format("'%s'", clickhouse_quote(u:get_tld()))) + table.insert(urls_tlds, u:get_tld()) if settings['full_urls'] then - table.insert(urls_urls, string.format("'%s'", - clickhouse_quote(u:get_text()))) + table.insert(urls_urls, u:get_text()) else - table.insert(urls_urls, string.format("'%s'", - clickhouse_quote(u:get_host()))) + table.insert(urls_urls, u:get_host()) end end end if #urls_tlds > 0 then - elt = string.format("(today(),'%s',[%s],[%s])", + table.insert(urls_rows, { + today(timestamp), task:get_digest(), - table.concat(urls_tlds, ','), - table.concat(urls_urls, ',')) - table.insert(urls_rows, elt) + urls_tlds, + urls_urls + }) end -- Emails step local emails = {} if task:has_urls(true) then for _,u in ipairs(task:get_emails()) do - table.insert(emails, string.format("'%s'", clickhouse_quote( - string.format('%s@%s', u:get_user(), u:get_host()) - ))) + table.insert(emails, + string.format('%s@%s', u:get_user(), u:get_host())) end end if #emails > 0 then - elt = string.format("(today(),'%s',[%s])", - task:get_digest(), - table.concat(emails, ',')) - table.insert(emails_rows, elt) + table.insert(emails_rows, { + today(timestamp), + task:get_digest(), + emails, + }) end -- ASN information @@ -728,10 +615,13 @@ local function clickhouse_collect(task) if ret then ipnet = ret end - elt = string.format("(today(),'%s','%s','%s','%s')", + table.insert(asn_rows, { + today(timestamp), task:get_digest(), - clickhouse_quote(asn), clickhouse_quote(country), clickhouse_quote(ipnet)) - table.insert(asn_rows, elt) + asn, + country, + ipnet + }) end -- Symbols info @@ -742,25 +632,22 @@ local function clickhouse_collect(task) local options_tab = {} for _,s in ipairs(symbols) do - table.insert(syms_tab, string.format("'%s'", - clickhouse_quote(s.name or ''))) - table.insert(scores_tab, string.format('%.3f', s.score)) + table.insert(syms_tab, s.name or '') + table.insert(scores_tab, s.score) if s.options then - table.insert(options_tab, string.format("'%s'", - clickhouse_quote(table.concat(s.options, ',')))) + table.insert(options_tab, table.concat(s.options, ',')) else table.insert(options_tab, "''"); end end - elt = string.format("(today(),'%s',[%s],[%s],[%s])", - task:get_digest(), - table.concat(syms_tab, ','), - table.concat(scores_tab, ','), - table.concat(options_tab, ',')) - - table.insert(symbols_rows, elt) + table.insert(symbols_rows, { + today(timestamp), + syms_tab, + scores_tab, + options_tab + }) end -- Custom data @@ -778,11 +665,9 @@ local function clickhouse_collect(task) attachment_rows = {} urls_rows = {} emails_rows = {} - specific_rows = {} asn_rows = {} symbols_rows = {} custom_rows = {} - clickhouse_first_row() end end @@ -981,8 +866,6 @@ if opts then return end - - clickhouse_first_row() rspamd_config:register_symbol({ name = 'CLICKHOUSE_COLLECT', type = 'idempotent', -- 2.39.5