local rspamd_logger = require 'rspamd_logger'
local rspamd_http = require "rspamd_http"
local rspamd_lua_utils = require "lua_util"
+local upstream_list = require "rspamd_upstream_list"
if confighelp then
return
end
local function clickhouse_send_data(task)
+ local upstream = settings.upstream:get_upstream_round_robin()
+ local ip_addr = tostring(upstream:get_addr())
+
local function http_cb(err_message, code, _, _)
if code ~= 200 or err_message then
rspamd_logger.errx(task, "cannot send data to clickhouse server %s: %s:%s",
- settings['server'], code, err_message)
+ ip_addr, code, err_message)
+ upstream:fail()
else
rspamd_logger.infox(task, "sent %s rows to clickhouse server %s",
- settings['limit'], settings['server'])
+ settings['limit'], ip_addr)
+ upstream:ok()
end
end
local body = table.concat(rows, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
gzip = settings.use_gzip,
body = table.concat(attachment_rows, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
mime_type = 'text/plain',
body = table.concat(urls_rows, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
mime_type = 'text/plain',
body = table.concat(asn_rows, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
mime_type = 'text/plain',
body = table.concat(symbols_rows, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
mime_type = 'text/plain',
body = table.concat(specific, ' ')
if not rspamd_http.request({
task = task,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = body,
callback = http_cb,
mime_type = 'text/plain',
settings[k] = v
end
- if not settings['server'] then
+ if not settings['server'] and not settings['servers'] then
rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
else
settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables',
connect_prefix = 'https://'
end
+ settings.upstream = upstream_list.create(rspamd_config,
+ settings['server'] or settings['servers'], 8123)
+
+ if not settings.upstream then
+ rspamd_logger.errx('cannot parse clickhouse address: %s',
+ settings['server'] or settings['servers'])
+ return
+ end
+
clickhouse_first_row()
rspamd_config:register_symbol({
name = 'CLICKHOUSE_COLLECT',
end)
-- Create tables on load
rspamd_config:add_on_load(function(cfg, ev_base, worker)
+ -- XXX: need to call this script for all upstreams
+ local upstream = settings.upstream:get_upstream_round_robin()
+ local ip_addr = tostring(upstream:get_addr())
+
local function http_cb(err_message, code, _, _)
if code ~= 200 or err_message then
rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s:%s",
- settings['server'], code, err_message)
+ ip_addr, code, err_message)
+ upstream:fail()
+ else
+ upstream:ok()
end
end
if not rspamd_http.request({
ev_base = ev_base,
config = cfg,
- url = connect_prefix .. settings['server'],
+ url = connect_prefix .. ip_addr,
body = sql,
callback = http_cb,
mime_type = 'text/plain',