diff options
-rw-r--r-- | src/plugins/lua/clickhouse.lua | 44 |
1 files changed, 33 insertions, 11 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua index 87738c8af..cfa066728 100644 --- a/src/plugins/lua/clickhouse.lua +++ b/src/plugins/lua/clickhouse.lua @@ -17,6 +17,7 @@ limitations under the License. local rspamd_logger = require 'rspamd_logger' local rspamd_http = require "rspamd_http" local rspamd_lua_utils = require "lua_util" +local upstream_list = require "rspamd_upstream_list" if confighelp then return @@ -247,20 +248,25 @@ local function clickhouse_check_symbol(task, symbols, need_score) end local function clickhouse_send_data(task) + local upstream = settings.upstream:get_upstream_round_robin() + local ip_addr = tostring(upstream:get_addr()) + local function http_cb(err_message, code, _, _) if code ~= 200 or err_message then rspamd_logger.errx(task, "cannot send data to clickhouse server %s: %s:%s", - settings['server'], code, err_message) + ip_addr, code, err_message) + upstream:fail() else rspamd_logger.infox(task, "sent %s rows to clickhouse server %s", - settings['limit'], settings['server']) + settings['limit'], ip_addr) + upstream:ok() end end local body = table.concat(rows, ' ') if not rspamd_http.request({ task = task, - url = connect_prefix .. settings['server'], + url = connect_prefix .. ip_addr, body = body, callback = http_cb, gzip = settings.use_gzip, @@ -275,7 +281,7 @@ local function clickhouse_send_data(task) body = table.concat(attachment_rows, ' ') if not rspamd_http.request({ task = task, - url = connect_prefix .. settings['server'], + url = connect_prefix .. ip_addr, body = body, callback = http_cb, mime_type = 'text/plain', @@ -289,7 +295,7 @@ local function clickhouse_send_data(task) body = table.concat(urls_rows, ' ') if not rspamd_http.request({ task = task, - url = connect_prefix .. settings['server'], + url = connect_prefix .. ip_addr, body = body, callback = http_cb, mime_type = 'text/plain', @@ -303,7 +309,7 @@ local function clickhouse_send_data(task) body = table.concat(asn_rows, ' ') if not rspamd_http.request({ task = task, - url = connect_prefix .. settings['server'], + url = connect_prefix .. ip_addr, body = body, callback = http_cb, mime_type = 'text/plain', @@ -318,7 +324,7 @@ local function clickhouse_send_data(task) body = table.concat(symbols_rows, ' ') if not rspamd_http.request({ task = task, - url = connect_prefix .. settings['server'], + url = connect_prefix .. ip_addr, body = body, callback = http_cb, mime_type = 'text/plain', @@ -334,7 +340,7 @@ local function clickhouse_send_data(task) body = table.concat(specific, ' ') if not rspamd_http.request({ task = task, - url = connect_prefix .. settings['server'], + url = connect_prefix .. ip_addr, body = body, callback = http_cb, mime_type = 'text/plain', @@ -645,7 +651,7 @@ if opts then settings[k] = v end - if not settings['server'] then + if not settings['server'] and not settings['servers'] then rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module') else settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables', @@ -654,6 +660,15 @@ if opts then connect_prefix = 'https://' end + settings.upstream = upstream_list.create(rspamd_config, + settings['server'] or settings['servers'], 8123) + + if not settings.upstream then + rspamd_logger.errx('cannot parse clickhouse address: %s', + settings['server'] or settings['servers']) + return + end + clickhouse_first_row() rspamd_config:register_symbol({ name = 'CLICKHOUSE_COLLECT', @@ -668,10 +683,17 @@ if opts then end) -- Create tables on load rspamd_config:add_on_load(function(cfg, ev_base, worker) + -- XXX: need to call this script for all upstreams + local upstream = settings.upstream:get_upstream_round_robin() + local ip_addr = tostring(upstream:get_addr()) + local function http_cb(err_message, code, _, _) if code ~= 200 or err_message then rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s:%s", - settings['server'], code, err_message) + ip_addr, code, err_message) + upstream:fail() + else + upstream:ok() end end @@ -679,7 +701,7 @@ if opts then if not rspamd_http.request({ ev_base = ev_base, config = cfg, - url = connect_prefix .. settings['server'], + url = connect_prefix .. ip_addr, body = sql, callback = http_cb, mime_type = 'text/plain', |