aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/plugins/lua/clickhouse.lua44
1 files changed, 33 insertions, 11 deletions
diff --git a/src/plugins/lua/clickhouse.lua b/src/plugins/lua/clickhouse.lua
index 87738c8af..cfa066728 100644
--- a/src/plugins/lua/clickhouse.lua
+++ b/src/plugins/lua/clickhouse.lua
@@ -17,6 +17,7 @@ limitations under the License.
local rspamd_logger = require 'rspamd_logger'
local rspamd_http = require "rspamd_http"
local rspamd_lua_utils = require "lua_util"
+local upstream_list = require "rspamd_upstream_list"
if confighelp then
return
@@ -247,20 +248,25 @@ local function clickhouse_check_symbol(task, symbols, need_score)
end
local function clickhouse_send_data(task)
+ local upstream = settings.upstream:get_upstream_round_robin()
+ local ip_addr = tostring(upstream:get_addr())
+
local function http_cb(err_message, code, _, _)
if code ~= 200 or err_message then
rspamd_logger.errx(task, "cannot send data to clickhouse server %s: %s:%s",
- settings['server'], code, err_message)
+ ip_addr, code, err_message)
+ upstream:fail()
else
rspamd_logger.infox(task, "sent %s rows to clickhouse server %s",
- settings['limit'], settings['server'])
+ settings['limit'], ip_addr)
+ upstream:ok()
end
end
local body = table.concat(rows, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
gzip = settings.use_gzip,
@@ -275,7 +281,7 @@ local function clickhouse_send_data(task)
body = table.concat(attachment_rows, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
mime_type = 'text/plain',
@@ -289,7 +295,7 @@ local function clickhouse_send_data(task)
body = table.concat(urls_rows, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
mime_type = 'text/plain',
@@ -303,7 +309,7 @@ local function clickhouse_send_data(task)
body = table.concat(asn_rows, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
mime_type = 'text/plain',
@@ -318,7 +324,7 @@ local function clickhouse_send_data(task)
body = table.concat(symbols_rows, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
mime_type = 'text/plain',
@@ -334,7 +340,7 @@ local function clickhouse_send_data(task)
body = table.concat(specific, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
mime_type = 'text/plain',
@@ -645,7 +651,7 @@ if opts then
settings[k] = v
end
- if not settings['server'] then
+ if not settings['server'] and not settings['servers'] then
rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
else
settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables',
@@ -654,6 +660,15 @@ if opts then
connect_prefix = 'https://'
end
+ settings.upstream = upstream_list.create(rspamd_config,
+ settings['server'] or settings['servers'], 8123)
+
+ if not settings.upstream then
+ rspamd_logger.errx('cannot parse clickhouse address: %s',
+ settings['server'] or settings['servers'])
+ return
+ end
+
clickhouse_first_row()
rspamd_config:register_symbol({
name = 'CLICKHOUSE_COLLECT',
@@ -668,10 +683,17 @@ if opts then
end)
-- Create tables on load
rspamd_config:add_on_load(function(cfg, ev_base, worker)
+ -- XXX: need to call this script for all upstreams
+ local upstream = settings.upstream:get_upstream_round_robin()
+ local ip_addr = tostring(upstream:get_addr())
+
local function http_cb(err_message, code, _, _)
if code ~= 200 or err_message then
rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s:%s",
- settings['server'], code, err_message)
+ ip_addr, code, err_message)
+ upstream:fail()
+ else
+ upstream:ok()
end
end
@@ -679,7 +701,7 @@ if opts then
if not rspamd_http.request({
ev_base = ev_base,
config = cfg,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = sql,
callback = http_cb,
mime_type = 'text/plain',