summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-02-18 16:43:50 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-02-18 16:43:50 +0000
commit79f00df258e56deb7fe1957e6886a84e85c1cd6e (patch)
tree28a84d5891fee4e7e79a5291029a50adab071acf
parent9ef70a3754121b2161e440fa295963dbac4269bc (diff)
downloadrspamd-79f00df258e56deb7fe1957e6886a84e85c1cd6e.tar.gz
rspamd-79f00df258e56deb7fe1957e6886a84e85c1cd6e.zip
[Fix] Rework elasticsearch plugin
-rw-r--r--CMakeLists.txt3
-rw-r--r--src/plugins/lua/dynamic_conf.lua2
-rw-r--r--src/plugins/lua/elastic.lua266
3 files changed, 155 insertions, 116 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index dcfc35cbb..1ba6e7672 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1364,6 +1364,9 @@ IF(INSTALL_WEBUI MATCHES "ON")
INSTALL(DIRECTORY "interface/" DESTINATION ${WWWDIR} PATTERN ".git" EXCLUDE)
ENDIF(INSTALL_WEBUI MATCHES "ON")
+
+INSTALL(DIRECTORY "contrib/elastic/" DESTINATION "${PLUGINSDIR}/elastic" PATTERN ".git" EXCLUDE)
+
ADD_CUSTOM_TARGET(dist ${CMAKE_SOURCE_DIR}/dist.sh
"${CMAKE_BINARY_DIR}/rspamd-${RSPAMD_VERSION}.tar.xz" "${TAR}"
COMMENT "Create source distribution"
diff --git a/src/plugins/lua/dynamic_conf.lua b/src/plugins/lua/dynamic_conf.lua
index 569d00cbd..a4b7527a5 100644
--- a/src/plugins/lua/dynamic_conf.lua
+++ b/src/plugins/lua/dynamic_conf.lua
@@ -232,7 +232,7 @@ end
local section = rspamd_config:get_all_opt("dynamic_conf")
if section then
- redis_params = rspamd_parse_redis_server('dynamic_conf')
+ redis_params = rspamd_redis.parse_redis_server('dynamic_conf')
if not redis_params then
rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
return
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
index f3bf69ead..ae789f0c5 100644
--- a/src/plugins/lua/elastic.lua
+++ b/src/plugins/lua/elastic.lua
@@ -18,31 +18,34 @@ limitations under the License.
local rspamd_logger = require 'rspamd_logger'
local rspamd_http = require "rspamd_http"
local rspamd_lua_utils = require "lua_util"
+local util = require "rspamd_util"
local ucl = require "ucl"
local hash = require "rspamd_cryptobox_hash"
+local rspamd_redis = require "lua_redis"
+local upstream_list = require "rspamd_upstream_list"
if confighelp then
return
end
-local redis_params
-redis_params = rspamd_parse_redis_server('elastic')
-
local rows = {}
local nrows = 0
local elastic_template
local redis_params
+local N = "elastic"
+local E = {}
+local connect_prefix = 'http://'
+local enabled = true
local settings = {
limit = 10,
index_pattern = 'rspamd-%Y.%m.%d',
- server = 'localhost:9200',
template_file = '/etc/rspamd/rspamd_template.json',
kibana_file = '/etc/rspamd/kibana.json',
key_prefix = 'elastic-',
expire = 3600,
- debug = false,
failover = false,
import_kibana = false,
+ use_https = false,
}
local function read_file(path)
@@ -52,30 +55,36 @@ local function read_file(path)
file:close()
return content
end
+
local function elastic_send_data(task)
local es_index = os.date(settings['index_pattern'])
- local bulk_json = ""
- for key,value in pairs(rows) do
- bulk_json = bulk_json..'{ "index" : { "_index" : "'..es_index..'", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }'.."\n"
- bulk_json = bulk_json..ucl.to_format(value, 'json-compact').."\n"
+ local tbl = {}
+ for _,value in pairs(rows) do
+ table.insert(tbl, '{ "index" : { "_index" : "'..es_index..
+ '", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }')
+ table.insert(tbl, ucl.to_format(value, 'json-compact'))
end
- local function http_index_data_callback(err, code, body, headers)
- -- todo error handling we may store the rows it into redis and send it again later
- if settings['debug'] then
- rspamd_logger.infox(task, "After create data %1",body)
- end
- if code ~= 200 or err_message then
+
+ local upstream = settings.upstream:get_upstream_round_robin()
+ local ip_addr = upstream:get_addr():to_string(true)
+
+ local push_url = connect_prefix .. ip_addr .. '/'..es_index..'/_bulk'
+ local bulk_json = table.concat(tbl, "\n")
+ local function http_index_data_callback(_, code, body, _)
+ -- todo error handling we may store the rows it into redis and send it again late
+ rspamd_logger.debugm(N, task, "After create data %1",body)
+ if code ~= 200 then
if settings['failover'] then
local h = hash.create()
h:update(bulk_json)
local key = settings['key_prefix'] ..es_index..":".. h:base32():sub(1, 20)
- local data = rspamd_util.zstd_compress(bulk_json)
+ local data = util.zstd_compress(bulk_json)
local function redis_set_cb(err)
if err ~=nil then
rspamd_logger.errx(task, 'redis_set_cb received error: %1', err)
end
end
- rspamd_redis_make_request(task,
+ rspamd_redis.make_request(task,
redis_params, -- connect params
key, -- hash key
true, -- is write
@@ -87,7 +96,7 @@ local function elastic_send_data(task)
end
end
rspamd_http.request({
- url = 'http://'..settings['server']..'/'..es_index..'/_bulk',
+ url = push_url,
headers = {
['Content-Type'] = 'application/x-ndjson',
},
@@ -167,7 +176,8 @@ end
local function elastic_collect(task)
if not enabled then return end
if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
- local row = {['rspam_meta'] = get_general_metadata(task), ['@timestamp'] = os.time(os.date("*t")).."000"}
+ local row = {['rspam_meta'] = get_general_metadata(task),
+ ['@timestamp'] = tostring(util.get_time()).."000"}
table.insert(rows, row)
nrows = nrows + 1
if nrows > settings['limit'] then
@@ -176,131 +186,139 @@ local function elastic_collect(task)
rows = {}
end
end
+
+
local opts = rspamd_config:get_all_opt('elastic')
-local enabled = true;
-local function check_elastic_server(ev_base)
- local function http_callback(err, code, body, headers)
+local function check_elastic_server(cfg, ev_base, _)
+ local upstream = settings.upstream:get_upstream_round_robin()
+ local ip_addr = upstream:get_addr():to_string(true)
+
+ local plugins_url = connect_prefix .. ip_addr .. '/_nodes/plugins'
+ local function http_callback(_, _, body, _)
local parser = ucl.parser()
local res,err = parser:parse_string(body)
if not res then
- rspamd_logger.infox(rspamd_config, 'failed to query elasticsearch server %1, disabling module',settings['server'])
+ rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
+ plugins_url, err)
enabled = false;
return
end
local obj = parser:get_object()
for node,value in pairs(obj['nodes']) do
local plugin_found = false
- for i,plugin in pairs(value['plugins']) do
+ for _,plugin in pairs(value['plugins']) do
if plugin['name'] == 'ingest-geoip' then
plugin_found = true
end
end
if not plugin_found then
- rspamd_logger.infox(rspamd_config, 'Unable to find ingest-geoip on %1 node, disabling module',node)
+ rspamd_logger.infox(rspamd_config,
+ 'Unable to find ingest-geoip on %1 node, disabling module', node)
enabled = false
return
end
end
end
rspamd_http.request({
- url = 'http://'..settings['server']..'/_nodes/plugins',
+ url = plugins_url,
ev_base = ev_base,
+ config = cfg,
method = 'get',
callback = http_callback
})
- if enabled then
- local function http_ingest_callback(err, code, body, headers)
- if code ~= 200 or err_message then
- -- pipeline not exist
- end
- end
- rspamd_http.request({
- url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
- ev_base = ev_base,
- method = 'get',
- callback = http_ingest_callback
- })
- -- lets try to create ingest pipeline if not exist
- rspamd_http.request({
- url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
- task = task,
- body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
- method = 'put',
- })
- end
- local function http_template_create_callback(err, code, body, headers)
- end
- local function http_template_exist_callback(err, code, body, headers)
- if code ~= 200 or err_message then
- rspamd_http.request({
- url = 'http://'..settings['server']..'/_template/rspamd',
- task = task,
- body = elastic_template,
- method = 'put',
- callback = http_template_create_callback
- })
- end
- end
- rspamd_http.request({
- url = 'http://'..settings['server']..'/_template/rspamd',
- task = task,
- method = 'head',
- callback = http_template_exist_callback
- })
end
+
-- import ingest pipeline and kibana dashboard/visualization
-local function initial_setup(worker)
- if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end
+local function initial_setup(cfg, ev_base, worker)
+ if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end
+
+ local upstream = settings.upstream:get_upstream_round_robin()
+ local ip_addr = upstream:get_addr():to_string(true)
if enabled then
-- create ingest pipeline
+ local geoip_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip'
+ local function geoip_cb(_, code, _, _)
+ if code ~= 200 then
+ rspamd_logger.errx('cannot get data from %s: %s', geoip_url, code)
+ enabled = false
+ end
+ end
rspamd_http.request({
- url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
- task = task,
+ url = geoip_url,
+ ev_base = ev_base,
+ config = cfg,
+ callback = geoip_cb,
body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
method = 'put',
})
-- create template mappings if not exist
- local function http_template_exist_callback(err, code, body, headers)
- if code ~= 200 or err_message then
+ local template_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip'
+ local function http_template_put_callback(_, code, _, _)
+ if code ~= 200 then
+ rspamd_logger.errx('cannot put template to %s: %s', template_url, code)
+ enabled = false
+ end
+ end
+ local function http_template_exist_callback(_, code, _, _)
+ if code ~= 200 then
rspamd_http.request({
- url = 'http://'..settings['server']..'/_template/rspamd',
- task = task,
+ url = template_url,
+ ev_base = ev_base,
+ config = cfg,
body = elastic_template,
method = 'put',
+ callback = http_template_put_callback,
})
end
end
+
rspamd_http.request({
- url = 'http://'..settings['server']..'/_template/rspamd',
- task = task,
+ url = template_url,
+ ev_base = ev_base,
+ config = cfg,
method = 'head',
callback = http_template_exist_callback
})
-- add kibana dashboard and visualizations
- local kibana_mappings = read_file(settings['kibana_file']);
if enabled and settings['import_kibana'] then
- local kibana_mappings = read_file(settings['kibana_file']);
+ local kibana_mappings = read_file(settings['kibana_file'])
if kibana_mappings then
local parser = ucl.parser()
local res,err = parser:parse_string(kibana_mappings)
if not res then
+ rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s',
+ err)
+ enabled = false
+
return
end
local obj = parser:get_object()
- local bulk_json = ""
+ local tbl = {}
for _,item in ipairs(obj) do
- bulk_json = bulk_json..'{ "index" : { "_index" : ".kibana", "_type" : "'..item["_type"]..'" ,"_id": "'..item["_id"]..'"} }'.."\n"
- bulk_json = bulk_json..ucl.to_format(item['_source'], 'json-compact').."\n"
+ table.insert(tbl, '{ "index" : { "_index" : ".kibana", "_type" : "'..
+ item["_type"]..'" ,"_id": "'..
+ item["_id"]..'"} }')
+ table.insert(tbl, ucl.to_format(item['_source'], 'json-compact'))
+ end
+
+ local kibana_url = connect_prefix .. ip_addr ..'/.kibana/_bulk'
+ local function kibana_template_callback(_, code, _, _)
+ if code ~= 200 then
+ rspamd_logger.errx('cannot put template to %s: %s', kibana_url, code)
+ enabled = false
+ end
end
rspamd_http.request({
- url = 'http://'..settings['server']..'/.kibana/_bulk',
+ url = kibana_url,
+ ev_base = ev_base,
+ config = cfg,
headers = {
['Content-Type'] = 'application/x-ndjson',
},
- body = bulk_json,
- task = task,
- method = 'post'
+ body = table.concat(tbl, "\n"),
+ method = 'post',
+ callback = kibana_template_callback
})
else
rspamd_logger.infox(rspamd_config, 'kibana templatefile not found')
@@ -308,38 +326,56 @@ local function initial_setup(worker)
end
end
end
-rspamd_config:add_on_load(function(cfg, ev_base,worker)
- if opts then
- for k,v in pairs(opts) do
- settings[k] = v
- end
- if not settings['server'] then
- rspamd_logger.infox(rspamd_config, 'no elastic servers are specified, disabling module')
- enabled = false
- return
- end
- if not settings['template_file'] then
- rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
- enabled = false
- return
- end
+
+redis_params = rspamd_redis.parse_redis_server('elastic')
+
+if redis_params and opts then
+ for k,v in pairs(opts) do
+ settings[k] = v
end
- elastic_template = read_file(settings['template_file']);
- if not elastic_template then
- rspamd_logger.infox(rspamd_config, 'elastic unable to read mappings, disabling module')
- enabled = false
- return
+
+ if not settings['server'] and not settings['servers'] then
+ rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
+ rspamd_lua_utils.disable_module(N, "config")
+ else
+ if settings.use_https then
+ connect_prefix = 'https://'
+ end
+
+ settings.upstream = upstream_list.create(rspamd_config,
+ settings['server'] or settings['servers'], 9200)
+
+ if not settings.upstream then
+ rspamd_logger.errx('cannot parse elastic address: %s',
+ settings['server'] or settings['servers'])
+ rspamd_lua_utils.disable_module(N, "config")
+ return
+ end
+ if not settings['template_file'] then
+ rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
+ rspamd_lua_utils.disable_module(N, "config")
+ return
+ end
+
+ elastic_template = read_file(settings['template_file']);
+ if not elastic_template then
+ rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module',
+ settings['template_file'])
+ rspamd_lua_utils.disable_module(N, "config")
+ return
+ end
+
+ rspamd_config:register_symbol({
+ name = 'ELASTIC_COLLECT',
+ type = 'idempotent',
+ callback = elastic_collect,
+ priority = 10
+ })
+
+ rspamd_config:add_on_load(function(cfg, ev_base,worker)
+ check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
+ initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
+ end)
end
- redis_params = rspamd_parse_redis_server('elastic')
- check_elastic_server(ev_base) -- check for elasticsearch requirements
- initial_setup(worker) -- import mappings pipeline and visualizations
-end)
-if enabled == true then
- rspamd_config:register_symbol({
- name = 'ELASTIC_COLLECT',
- type = 'postfilter',
- callback = elastic_collect,
- priority = 10
- })
end