Przeglądaj źródła

[Feature] Preliminary import of the elasticsearch module

tags/1.7.0
Vsevolod Stakhov 6 lat temu
rodzic
commit
9ef70a3754

+ 21
- 0
conf/modules.d/elastic.conf Wyświetl plik

@@ -0,0 +1,21 @@
elastic {
# Push update when 10 records are collected (10 if unset)
limit = 10;
# IP:port of Elasticsearch server
server = "localhost:9200";
# Timeout to wait for response (5 seconds if unset)
timeout = 5;
# Elasticsearch template file (json format)
template_file = "/etc/rspamd/rspamd_template.json";
# Kibana prebuild visualizations and dashboard template (json format)
kibana_file = '/etc/rspamd/kibana.json',
# Elasticsearch index name pattern
index_pattern = "rspamd-%Y.%m.%d";
# Dump debug information
debug = false;
# Import kibana template
import_kibana = false;
.include(try=true,priority=5) "${DBDIR}/dynamic/elastic.conf"
.include(try=true,priority=1,duplicate=merge) "$LOCAL_CONFDIR/local.d/elastic.conf"
.include(try=true,priority=10) "$LOCAL_CONFDIR/override.d/elastic.conf"
}

+ 103
- 0
contrib/elastic/kibana.json Wyświetl plik

@@ -0,0 +1,103 @@
[
{
"_id": "6c6a2ed0-8660-11e7-85ae-fbc80f1b7844",
"_type": "dashboard",
"_source": {
"title": "Rspamd Dashboard",
"hits": 0,
"description": "",
"panelsJSON": "[{\"size_x\":6,\"size_y\":3,\"panelIndex\":1,\"type\":\"visualization\",\"id\":\"6413f870-80f6-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":1},{\"size_x\":6,\"size_y\":3,\"panelIndex\":2,\"type\":\"visualization\",\"id\":\"927debf0-8649-11e7-967f-798bfd7ac13a\",\"col\":7,\"row\":1},{\"size_x\":12,\"size_y\":3,\"panelIndex\":3,\"type\":\"visualization\",\"id\":\"efa3f7a0-80f6-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":7},{\"size_x\":12,\"size_y\":3,\"panelIndex\":4,\"type\":\"visualization\",\"id\":\"1f7d9210-80f7-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":10},{\"size_x\":6,\"size_y\":3,\"panelIndex\":5,\"type\":\"visualization\",\"id\":\"2be7b6f0-8649-11e7-967f-798bfd7ac13a\",\"col\":7,\"row\":4},{\"size_x\":6,\"size_y\":3,\"panelIndex\":6,\"type\":\"visualization\",\"id\":\"680b6480-826e-11e7-8a20-b7bc68c2e9e7\",\"col\":7,\"row\":13},{\"size_x\":6,\"size_y\":3,\"panelIndex\":7,\"type\":\"visualization\",\"id\":\"158dfc80-864d-11e7-bce7-4532b9d239a0\",\"col\":1,\"row\":4}]",
"optionsJSON": "{\"darkTheme\":false}",
"uiStateJSON": "{\"P-3\":{\"vis\":{\"defaultColors\":{\"0 - 100\":\"rgb(0,104,55)\"}}},\"P-4\":{\"vis\":{\"defaultColors\":{\"0 - 100\":\"rgb(0,104,55)\"}}},\"P-1\":{\"mapZoom\":2,\"mapCenter\":[40.58058466412761,1.7578125]},\"P-6\":{\"vis\":{\"defaultColors\":{\"0 - 0.25\":\"rgb(247,252,245)\",\"0.25 - 0.5\":\"rgb(199,233,192)\",\"0.5 - 0.75\":\"rgb(116,196,118)\",\"0.75 - 1\":\"rgb(35,139,69)\"}}}}",
"version": 1,
"timeRestore": false,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}}}],\"highlightAll\":true,\"version\":true}"
}
}
},
{
"_id": "927debf0-8649-11e7-967f-798bfd7ac13a",
"_type": "visualization",
"_source": {
"title": "Rspamd Actions",
"visState": "{\"title\":\"Rspamd Actions\",\"type\":\"pie\",\"params\":{\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"isDonut\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"rspam_meta.action\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"rspamd-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
},
{
"_id": "6413f870-80f6-11e7-91e6-0986b0b459e7",
"_type": "visualization",
"_source": {
"title": "Rspamd Geo Map",
"visState": "{\n \"title\": \"Rspamd Geo Map\",\n \"type\": \"tile_map\",\n \"params\": {\n \"mapType\": \"Scaled Circle Markers\",\n \"isDesaturated\": true,\n \"addTooltip\": true,\n \"heatMaxZoom\": 0,\n \"heatMinOpacity\": 0.1,\n \"heatRadius\": 25,\n \"heatBlur\": 15,\n \"legendPosition\": \"bottomright\",\n \"mapZoom\": 2,\n \"mapCenter\": [\n 0,\n 0\n ],\n \"wms\": {\n \"enabled\": false,\n \"url\": \"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\",\n \"options\": {\n \"version\": \"1.3.0\",\n \"layers\": \"0\",\n \"format\": \"image/png\",\n \"transparent\": true,\n \"attribution\": \"Maps provided by USGS\",\n \"styles\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"geohash_grid\",\n \"schema\": \"segment\",\n \"params\": {\n \"field\": \"rspam_meta.geoip.location\",\n \"autoPrecision\": true,\n \"useGeocentroid\": true,\n \"precision\": 2\n }\n }\n ],\n \"listeners\": {}\n}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": []\n}"
}
}
},
{
"_id": "92a92c00-80f6-11e7-91e6-0986b0b459e7",
"_type": "visualization",
"_source": {
"title": "Rspamd Spam Map",
"visState": "{\n \"title\": \"Rspamd Spam Map\",\n \"type\": \"tile_map\",\n \"params\": {\n \"mapType\": \"Scaled Circle Markers\",\n \"isDesaturated\": true,\n \"addTooltip\": true,\n \"heatMaxZoom\": 0,\n \"heatMinOpacity\": 0.1,\n \"heatRadius\": 25,\n \"heatBlur\": 15,\n \"legendPosition\": \"bottomright\",\n \"mapZoom\": 2,\n \"mapCenter\": [\n 0,\n 0\n ],\n \"wms\": {\n \"enabled\": false,\n \"url\": \"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\",\n \"options\": {\n \"version\": \"1.3.0\",\n \"layers\": \"0\",\n \"format\": \"image/png\",\n \"transparent\": true,\n \"attribution\": \"Maps provided by USGS\",\n \"styles\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"geohash_grid\",\n \"schema\": \"segment\",\n \"params\": {\n \"field\": \"rspam_meta.geoip.location\",\n \"autoPrecision\": true,\n \"useGeocentroid\": true,\n \"precision\": 2\n }\n }\n ],\n \"listeners\": {}\n}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": [\n {\n \"meta\": {\n \"index\": \"rspamd_beat-*\",\n \"negate\": true,\n \"disabled\": false,\n \"alias\": null,\n \"type\": \"phrase\",\n \"key\": \"rspam_meta.action\",\n \"value\": \"no action\"\n },\n \"query\": {\n \"match\": {\n \"rspam_meta.action\": {\n \"query\": \"no action\",\n \"type\": \"phrase\"\n }\n }\n },\n \"$state\": {\n \"store\": \"appState\"\n }\n }\n ]\n}"
}
}
},
{
"_id": "2be7b6f0-8649-11e7-967f-798bfd7ac13a",
"_type": "visualization",
"_source": {
"title": "Rspamd Symbols Cloud",
"visState": "{\"title\":\"Rspamd Symbols Cloud\",\"type\":\"tagcloud\",\"params\":{\"scale\":\"linear\",\"orientation\":\"single\",\"minFontSize\":18,\"maxFontSize\":72},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"rspam_meta.symbols.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"rspamd-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
},
{
"_id": "1f7d9210-80f7-11e7-91e6-0986b0b459e7",
"_type": "visualization",
"_source": {
"title": "Rspamd Top recipients",
"visState": "{\n \"title\": \"Rspamd Top recipients\",\n \"type\": \"metric\",\n \"params\": {\n \"addTooltip\": true,\n \"addLegend\": false,\n \"type\": \"gauge\",\n \"gauge\": {\n \"verticalSplit\": false,\n \"autoExtend\": false,\n \"percentageMode\": false,\n \"gaugeType\": \"Metric\",\n \"gaugeStyle\": \"Full\",\n \"backStyle\": \"Full\",\n \"orientation\": \"vertical\",\n \"colorSchema\": \"Green to Red\",\n \"gaugeColorMode\": \"None\",\n \"useRange\": false,\n \"colorsRange\": [\n {\n \"from\": 0,\n \"to\": 100\n }\n ],\n \"invertColors\": false,\n \"labels\": {\n \"show\": true,\n \"color\": \"black\"\n },\n \"scale\": {\n \"show\": false,\n \"labels\": false,\n \"color\": \"#333\",\n \"width\": 2\n },\n \"type\": \"simple\",\n \"style\": {\n \"fontSize\": 60,\n \"bgFill\": \"#000\",\n \"bgColor\": false,\n \"labelColor\": false,\n \"subText\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"terms\",\n \"schema\": \"group\",\n \"params\": {\n \"field\": \"rspam_meta.rcpt\",\n \"size\": 5,\n \"order\": \"desc\",\n \"orderBy\": \"1\"\n }\n }\n ],\n \"listeners\": {}\n}",
"uiStateJSON": "{\n \"vis\": {\n \"defaultColors\": {\n \"0 - 100\": \"rgb(0,104,55)\"\n }\n }\n}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": []\n}"
}
}
},
{
"_id": "efa3f7a0-80f6-11e7-91e6-0986b0b459e7",
"_type": "visualization",
"_source": {
"title": "Rspamd Top Senders",
"visState": "{\n \"title\": \"Rspamd Top Senders\",\n \"type\": \"metric\",\n \"params\": {\n \"addTooltip\": true,\n \"addLegend\": false,\n \"type\": \"gauge\",\n \"gauge\": {\n \"verticalSplit\": false,\n \"autoExtend\": false,\n \"percentageMode\": false,\n \"gaugeType\": \"Metric\",\n \"gaugeStyle\": \"Full\",\n \"backStyle\": \"Full\",\n \"orientation\": \"vertical\",\n \"colorSchema\": \"Green to Red\",\n \"gaugeColorMode\": \"None\",\n \"useRange\": false,\n \"colorsRange\": [\n {\n \"from\": 0,\n \"to\": 100\n }\n ],\n \"invertColors\": false,\n \"labels\": {\n \"show\": true,\n \"color\": \"black\"\n },\n \"scale\": {\n \"show\": false,\n \"labels\": false,\n \"color\": \"#333\",\n \"width\": 2\n },\n \"type\": \"simple\",\n \"style\": {\n \"fontSize\": 60,\n \"bgFill\": \"#000\",\n \"bgColor\": false,\n \"labelColor\": false,\n \"subText\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"terms\",\n \"schema\": \"group\",\n \"params\": {\n \"field\": \"rspam_meta.user\",\n \"size\": 5,\n \"order\": \"desc\",\n \"orderBy\": \"1\"\n }\n }\n ],\n \"listeners\": {}\n}",
"uiStateJSON": "{\n \"vis\": {\n \"defaultColors\": {\n \"0 - 100\": \"rgb(0,104,55)\"\n }\n }\n}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": [\n {\n \"meta\": {\n \"index\": \"rspamd_beat-*\",\n \"negate\": true,\n \"disabled\": false,\n \"alias\": null,\n \"type\": \"phrase\",\n \"key\": \"rspam_meta.user\",\n \"value\": \"unknown\"\n },\n \"query\": {\n \"match\": {\n \"rspam_meta.user\": {\n \"query\": \"unknown\",\n \"type\": \"phrase\"\n }\n }\n },\n \"$state\": {\n \"store\": \"appState\"\n }\n }\n ]\n}"
}
}
}
]

+ 146
- 0
contrib/elastic/rspamd_template.json Wyświetl plik

@@ -0,0 +1,146 @@
{
"mappings": {
"_default_": {
"_all": {
"norms": false
},
"_meta": {
"version": "5.5.2"
},
"date_detection": false,
"dynamic_templates": [
{
"strings_as_keyword": {
"mapping": {
"ignore_above": 1024,
"type": "keyword"
},
"match_mapping_type": "string"
}
}
],
"properties": {
"@timestamp": {
"type": "date"
},
"meta": {
"properties": {
"cloud": {
"properties": {
"availability_zone": {
"ignore_above": 1024,
"type": "keyword"
},
"instance_id": {
"ignore_above": 1024,
"type": "keyword"
},
"machine_type": {
"ignore_above": 1024,
"type": "keyword"
},
"project_id": {
"ignore_above": 1024,
"type": "keyword"
},
"provider": {
"ignore_above": 1024,
"type": "keyword"
},
"region": {
"ignore_above": 1024,
"type": "keyword"
}
}
}
}
},
"rspam_meta": {
"properties": {
"action": {
"ignore_above": 1024,
"type": "keyword"
},
"direction": {
"ignore_above": 1024,
"type": "keyword"
},
"asn": {
"properties": {
"asn": {
"type": "long"
},
"country_code": {
"ignore_above": 1024,
"type": "keyword"
},
"ipnet": {
"ignore_above": 1024,
"type": "keyword"
},
"registrant": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"from": {
"ignore_above": 1024,
"type": "keyword"
},
"is_local": {
"type": "boolean"
},
"webmail": {
"type": "boolean"
},
"geoip": {
"properties": {
"city_name": {
"ignore_above": 1024,
"type": "keyword"
},
"continent_name": {
"ignore_above": 1024,
"type": "keyword"
},
"country_iso_code": {
"ignore_above": 1024,
"type": "keyword"
},
"location": {
"type": "geo_point"
}
}
},
"ip": {
"ignore_above": 1024,
"type": "keyword"
},
"qid": {
"ignore_above": 1024,
"type": "keyword"
},
"score": {
"type": "float"
},
"user": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"tags": {
"ignore_above": 1024,
"type": "keyword"
}
}
}
},
"order": 0,
"settings": {
"index.mapping.total_fields.limit": 10000,
"index.refresh_interval": "5s"
},
"template": "rspamd-*"
}

+ 345
- 0
src/plugins/lua/elastic.lua Wyświetl plik

@@ -0,0 +1,345 @@
--[[
Copyright (c) 2017, Veselin Iordanov
Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
]]--

local rspamd_logger = require 'rspamd_logger'
local rspamd_http = require "rspamd_http"
local rspamd_lua_utils = require "lua_util"
local ucl = require "ucl"
local hash = require "rspamd_cryptobox_hash"

if confighelp then
return
end

local redis_params
redis_params = rspamd_parse_redis_server('elastic')

local rows = {}
local nrows = 0
local elastic_template
local redis_params
local settings = {
limit = 10,
index_pattern = 'rspamd-%Y.%m.%d',
server = 'localhost:9200',
template_file = '/etc/rspamd/rspamd_template.json',
kibana_file = '/etc/rspamd/kibana.json',
key_prefix = 'elastic-',
expire = 3600,
debug = false,
failover = false,
import_kibana = false,
}

local function read_file(path)
local file = io.open(path, "rb")
if not file then return nil end
local content = file:read "*a"
file:close()
return content
end
local function elastic_send_data(task)
local es_index = os.date(settings['index_pattern'])
local bulk_json = ""
for key,value in pairs(rows) do
bulk_json = bulk_json..'{ "index" : { "_index" : "'..es_index..'", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }'.."\n"
bulk_json = bulk_json..ucl.to_format(value, 'json-compact').."\n"
end
local function http_index_data_callback(err, code, body, headers)
-- todo error handling we may store the rows it into redis and send it again later
if settings['debug'] then
rspamd_logger.infox(task, "After create data %1",body)
end
if code ~= 200 or err_message then
if settings['failover'] then
local h = hash.create()
h:update(bulk_json)
local key = settings['key_prefix'] ..es_index..":".. h:base32():sub(1, 20)
local data = rspamd_util.zstd_compress(bulk_json)
local function redis_set_cb(err)
if err ~=nil then
rspamd_logger.errx(task, 'redis_set_cb received error: %1', err)
end
end
rspamd_redis_make_request(task,
redis_params, -- connect params
key, -- hash key
true, -- is write
redis_set_cb, --callback
'SETEX', -- command
{key, tostring(settings['expire']), data} -- arguments
)
end
end
end
rspamd_http.request({
url = 'http://'..settings['server']..'/'..es_index..'/_bulk',
headers = {
['Content-Type'] = 'application/x-ndjson',
},
body = bulk_json,
task = task,
method = 'post',
callback = http_index_data_callback
})

end
local function get_general_metadata(task)
local r = {}
local ip_addr = task:get_ip()
r.ip = tostring(ip_addr) or 'unknown'
r.webmail = false
if ip_addr then
r.is_local = ip_addr:is_local()
local origin = task:get_header('X-Originating-IP')
if origin then
r.webmail = true
r.ip = origin
end
end
r.direction = "Inbound"
r.user = task:get_user() or 'unknown'
r.qid = task:get_queue_id() or 'unknown'
r.action = task:get_metric_action('default')
if r.user ~= 'unknown' then
r.direction = "Outbound"
end
local s = task:get_metric_score('default')[1]
r.score = s

local rcpt = task:get_recipients('smtp')
if rcpt then
local l = {}
for _, a in ipairs(rcpt) do
table.insert(l, a['addr'])
end
r.rcpt = l
else
r.rcpt = 'unknown'
end
local from = task:get_from('smtp')
if ((from or E)[1] or E).addr then
r.from = from[1].addr
else
r.from = 'unknown'
end
local syminf = task:get_symbols_all()
r.symbols = syminf
r.asn = {}
local pool = task:get_mempool()
r.asn.country = pool:get_variable("country") or 'unknown'
r.asn.asn = pool:get_variable("asn") or 0
r.asn.ipnet = pool:get_variable("ipnet") or 'unknown'
local function process_header(name)
local hdr = task:get_header_full(name)
if hdr then
local l = {}
for _, h in ipairs(hdr) do
table.insert(l, h.decoded)
end
return l
else
return 'unknown'
end
end
r.header_from = process_header('from')
r.header_to = process_header('to')
r.header_subject = process_header('subject')
r.header_date = process_header('date')
r.message_id = task:get_message_id()
return r
end

local function elastic_collect(task)
if not enabled then return end
if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
local row = {['rspam_meta'] = get_general_metadata(task), ['@timestamp'] = os.time(os.date("*t")).."000"}
table.insert(rows, row)
nrows = nrows + 1
if nrows > settings['limit'] then
elastic_send_data(task)
nrows = 0
rows = {}
end
end
local opts = rspamd_config:get_all_opt('elastic')
local enabled = true;

local function check_elastic_server(ev_base)
local function http_callback(err, code, body, headers)
local parser = ucl.parser()
local res,err = parser:parse_string(body)
if not res then
rspamd_logger.infox(rspamd_config, 'failed to query elasticsearch server %1, disabling module',settings['server'])
enabled = false;
return
end
local obj = parser:get_object()
for node,value in pairs(obj['nodes']) do
local plugin_found = false
for i,plugin in pairs(value['plugins']) do
if plugin['name'] == 'ingest-geoip' then
plugin_found = true
end
end
if not plugin_found then
rspamd_logger.infox(rspamd_config, 'Unable to find ingest-geoip on %1 node, disabling module',node)
enabled = false
return
end
end
end
rspamd_http.request({
url = 'http://'..settings['server']..'/_nodes/plugins',
ev_base = ev_base,
method = 'get',
callback = http_callback
})
if enabled then
local function http_ingest_callback(err, code, body, headers)
if code ~= 200 or err_message then
-- pipeline not exist
end
end
rspamd_http.request({
url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
ev_base = ev_base,
method = 'get',
callback = http_ingest_callback
})
-- lets try to create ingest pipeline if not exist
rspamd_http.request({
url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
task = task,
body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
method = 'put',
})
end
local function http_template_create_callback(err, code, body, headers)
end
local function http_template_exist_callback(err, code, body, headers)
if code ~= 200 or err_message then
rspamd_http.request({
url = 'http://'..settings['server']..'/_template/rspamd',
task = task,
body = elastic_template,
method = 'put',
callback = http_template_create_callback
})
end
end
rspamd_http.request({
url = 'http://'..settings['server']..'/_template/rspamd',
task = task,
method = 'head',
callback = http_template_exist_callback
})
end
-- import ingest pipeline and kibana dashboard/visualization
local function initial_setup(worker)
if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end
if enabled then
-- create ingest pipeline
rspamd_http.request({
url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
task = task,
body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
method = 'put',
})
-- create template mappings if not exist
local function http_template_exist_callback(err, code, body, headers)
if code ~= 200 or err_message then
rspamd_http.request({
url = 'http://'..settings['server']..'/_template/rspamd',
task = task,
body = elastic_template,
method = 'put',
})
end
end
rspamd_http.request({
url = 'http://'..settings['server']..'/_template/rspamd',
task = task,
method = 'head',
callback = http_template_exist_callback
})
-- add kibana dashboard and visualizations
local kibana_mappings = read_file(settings['kibana_file']);
if enabled and settings['import_kibana'] then
local kibana_mappings = read_file(settings['kibana_file']);
if kibana_mappings then
local parser = ucl.parser()
local res,err = parser:parse_string(kibana_mappings)
if not res then
return
end
local obj = parser:get_object()
local bulk_json = ""
for _,item in ipairs(obj) do
bulk_json = bulk_json..'{ "index" : { "_index" : ".kibana", "_type" : "'..item["_type"]..'" ,"_id": "'..item["_id"]..'"} }'.."\n"
bulk_json = bulk_json..ucl.to_format(item['_source'], 'json-compact').."\n"
end
rspamd_http.request({
url = 'http://'..settings['server']..'/.kibana/_bulk',
headers = {
['Content-Type'] = 'application/x-ndjson',
},
body = bulk_json,
task = task,
method = 'post'
})
else
rspamd_logger.infox(rspamd_config, 'kibana templatefile not found')
end
end
end
end
rspamd_config:add_on_load(function(cfg, ev_base,worker)
if opts then
for k,v in pairs(opts) do
settings[k] = v
end
if not settings['server'] then
rspamd_logger.infox(rspamd_config, 'no elastic servers are specified, disabling module')
enabled = false
return
end
if not settings['template_file'] then
rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
enabled = false
return
end
end
elastic_template = read_file(settings['template_file']);
if not elastic_template then
rspamd_logger.infox(rspamd_config, 'elastic unable to read mappings, disabling module')
enabled = false
return
end
redis_params = rspamd_parse_redis_server('elastic')
check_elastic_server(ev_base) -- check for elasticsearch requirements
initial_setup(worker) -- import mappings pipeline and visualizations
end)

if enabled == true then
rspamd_config:register_symbol({
name = 'ELASTIC_COLLECT',
type = 'postfilter',
callback = elastic_collect,
priority = 10
})
end

Ładowanie…
Anuluj
Zapisz