]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Implement upstreams logic for clickhouse exporter
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 28 Nov 2017 08:18:24 +0000 (08:18 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 28 Nov 2017 08:18:24 +0000 (08:18 +0000)
src/plugins/lua/clickhouse.lua

index 87738c8afe790c7f64d7cef8230bea71df382e2a..cfa06672888d8960ee3bfe2c37250b4f6b9fe0ed 100644 (file)
@@ -17,6 +17,7 @@ limitations under the License.
 local rspamd_logger = require 'rspamd_logger'
 local rspamd_http = require "rspamd_http"
 local rspamd_lua_utils = require "lua_util"
+local upstream_list = require "rspamd_upstream_list"
 
 if confighelp then
   return
@@ -247,20 +248,25 @@ local function clickhouse_check_symbol(task, symbols, need_score)
 end
 
 local function clickhouse_send_data(task)
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local ip_addr = tostring(upstream:get_addr())
+
   local function http_cb(err_message, code, _, _)
     if code ~= 200 or err_message then
       rspamd_logger.errx(task, "cannot send data to clickhouse server %s: %s:%s",
-        settings['server'], code, err_message)
+        ip_addr, code, err_message)
+      upstream:fail()
     else
       rspamd_logger.infox(task, "sent %s rows to clickhouse server %s",
-        settings['limit'], settings['server'])
+        settings['limit'], ip_addr)
+      upstream:ok()
     end
   end
 
   local body = table.concat(rows, ' ')
   if not rspamd_http.request({
       task = task,
-      url = connect_prefix .. settings['server'],
+      url = connect_prefix .. ip_addr,
       body = body,
       callback = http_cb,
       gzip = settings.use_gzip,
@@ -275,7 +281,7 @@ local function clickhouse_send_data(task)
     body = table.concat(attachment_rows, ' ')
     if not rspamd_http.request({
       task = task,
-      url = connect_prefix .. settings['server'],
+      url = connect_prefix .. ip_addr,
       body = body,
       callback = http_cb,
       mime_type = 'text/plain',
@@ -289,7 +295,7 @@ local function clickhouse_send_data(task)
     body = table.concat(urls_rows, ' ')
     if not rspamd_http.request({
       task = task,
-      url = connect_prefix .. settings['server'],
+      url = connect_prefix .. ip_addr,
       body = body,
       callback = http_cb,
       mime_type = 'text/plain',
@@ -303,7 +309,7 @@ local function clickhouse_send_data(task)
     body = table.concat(asn_rows, ' ')
     if not rspamd_http.request({
       task = task,
-      url = connect_prefix .. settings['server'],
+      url = connect_prefix .. ip_addr,
       body = body,
       callback = http_cb,
       mime_type = 'text/plain',
@@ -318,7 +324,7 @@ local function clickhouse_send_data(task)
     body = table.concat(symbols_rows, ' ')
     if not rspamd_http.request({
       task = task,
-      url = connect_prefix .. settings['server'],
+      url = connect_prefix .. ip_addr,
       body = body,
       callback = http_cb,
       mime_type = 'text/plain',
@@ -334,7 +340,7 @@ local function clickhouse_send_data(task)
       body = table.concat(specific, ' ')
       if not rspamd_http.request({
         task = task,
-        url = connect_prefix .. settings['server'],
+        url = connect_prefix .. ip_addr,
         body = body,
         callback = http_cb,
         mime_type = 'text/plain',
@@ -645,7 +651,7 @@ if opts then
       settings[k] = v
     end
 
-    if not settings['server'] then
+    if not settings['server'] and not settings['servers'] then
       rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
     else
       settings['from_map'] = rspamd_map_add('clickhouse', 'from_tables',
@@ -654,6 +660,15 @@ if opts then
         connect_prefix = 'https://'
       end
 
+      settings.upstream = upstream_list.create(rspamd_config,
+        settings['server'] or settings['servers'], 8123)
+
+      if not settings.upstream then
+        rspamd_logger.errx('cannot parse clickhouse address: %s',
+            settings['server'] or settings['servers'])
+        return
+      end
+
       clickhouse_first_row()
       rspamd_config:register_symbol({
         name = 'CLICKHOUSE_COLLECT',
@@ -668,10 +683,17 @@ if opts then
       end)
       -- Create tables on load
       rspamd_config:add_on_load(function(cfg, ev_base, worker)
+        -- XXX: need to call this script for all upstreams
+        local upstream = settings.upstream:get_upstream_round_robin()
+        local ip_addr = tostring(upstream:get_addr())
+
         local function http_cb(err_message, code, _, _)
           if code ~= 200 or err_message then
             rspamd_logger.errx(rspamd_config, "cannot create table in clickhouse server %s: %s:%s",
-              settings['server'], code, err_message)
+              ip_addr, code, err_message)
+            upstream:fail()
+          else
+            upstream:ok()
           end
         end
 
@@ -679,7 +701,7 @@ if opts then
           if not rspamd_http.request({
             ev_base = ev_base,
             config = cfg,
-            url = connect_prefix .. settings['server'],
+            url = connect_prefix .. ip_addr,
             body = sql,
             callback = http_cb,
             mime_type = 'text/plain',