aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2018-02-18 15:46:58 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2018-02-18 15:46:58 +0000
commit9ef70a3754121b2161e440fa295963dbac4269bc (patch)
tree882d37a7d03f7a836fc3064e4de6f31add6453a6
parentf1becea682b563d366add102cd88431f15182b26 (diff)
downloadrspamd-9ef70a3754121b2161e440fa295963dbac4269bc.tar.gz
rspamd-9ef70a3754121b2161e440fa295963dbac4269bc.zip
[Feature] Preliminary import of the elasticsearch module
-rw-r--r--conf/modules.d/elastic.conf21
-rw-r--r--contrib/elastic/kibana.json103
-rw-r--r--contrib/elastic/rspamd_template.json146
-rw-r--r--src/plugins/lua/elastic.lua345
4 files changed, 615 insertions, 0 deletions
diff --git a/conf/modules.d/elastic.conf b/conf/modules.d/elastic.conf
new file mode 100644
index 000000000..b0a96809f
--- /dev/null
+++ b/conf/modules.d/elastic.conf
@@ -0,0 +1,21 @@
+elastic {
+ # Push update when 10 records are collected (10 if unset)
+ limit = 10;
+ # IP:port of Elasticsearch server
+ server = "localhost:9200";
+ # Timeout to wait for response (5 seconds if unset)
+ timeout = 5;
+ # Elasticsearch template file (json format)
+ template_file = "/etc/rspamd/rspamd_template.json";
+ # Kibana prebuild visualizations and dashboard template (json format)
+ kibana_file = '/etc/rspamd/kibana.json',
+ # Elasticsearch index name pattern
+ index_pattern = "rspamd-%Y.%m.%d";
+ # Dump debug information
+ debug = false;
+ # Import kibana template
+ import_kibana = false;
+ .include(try=true,priority=5) "${DBDIR}/dynamic/elastic.conf"
+ .include(try=true,priority=1,duplicate=merge) "$LOCAL_CONFDIR/local.d/elastic.conf"
+ .include(try=true,priority=10) "$LOCAL_CONFDIR/override.d/elastic.conf"
+}
diff --git a/contrib/elastic/kibana.json b/contrib/elastic/kibana.json
new file mode 100644
index 000000000..16ef1b134
--- /dev/null
+++ b/contrib/elastic/kibana.json
@@ -0,0 +1,103 @@
+[
+ {
+ "_id": "6c6a2ed0-8660-11e7-85ae-fbc80f1b7844",
+ "_type": "dashboard",
+ "_source": {
+ "title": "Rspamd Dashboard",
+ "hits": 0,
+ "description": "",
+ "panelsJSON": "[{\"size_x\":6,\"size_y\":3,\"panelIndex\":1,\"type\":\"visualization\",\"id\":\"6413f870-80f6-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":1},{\"size_x\":6,\"size_y\":3,\"panelIndex\":2,\"type\":\"visualization\",\"id\":\"927debf0-8649-11e7-967f-798bfd7ac13a\",\"col\":7,\"row\":1},{\"size_x\":12,\"size_y\":3,\"panelIndex\":3,\"type\":\"visualization\",\"id\":\"efa3f7a0-80f6-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":7},{\"size_x\":12,\"size_y\":3,\"panelIndex\":4,\"type\":\"visualization\",\"id\":\"1f7d9210-80f7-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":10},{\"size_x\":6,\"size_y\":3,\"panelIndex\":5,\"type\":\"visualization\",\"id\":\"2be7b6f0-8649-11e7-967f-798bfd7ac13a\",\"col\":7,\"row\":4},{\"size_x\":6,\"size_y\":3,\"panelIndex\":6,\"type\":\"visualization\",\"id\":\"680b6480-826e-11e7-8a20-b7bc68c2e9e7\",\"col\":7,\"row\":13},{\"size_x\":6,\"size_y\":3,\"panelIndex\":7,\"type\":\"visualization\",\"id\":\"158dfc80-864d-11e7-bce7-4532b9d239a0\",\"col\":1,\"row\":4}]",
+ "optionsJSON": "{\"darkTheme\":false}",
+ "uiStateJSON": "{\"P-3\":{\"vis\":{\"defaultColors\":{\"0 - 100\":\"rgb(0,104,55)\"}}},\"P-4\":{\"vis\":{\"defaultColors\":{\"0 - 100\":\"rgb(0,104,55)\"}}},\"P-1\":{\"mapZoom\":2,\"mapCenter\":[40.58058466412761,1.7578125]},\"P-6\":{\"vis\":{\"defaultColors\":{\"0 - 0.25\":\"rgb(247,252,245)\",\"0.25 - 0.5\":\"rgb(199,233,192)\",\"0.5 - 0.75\":\"rgb(116,196,118)\",\"0.75 - 1\":\"rgb(35,139,69)\"}}}}",
+ "version": 1,
+ "timeRestore": false,
+ "kibanaSavedObjectMeta": {
+ "searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}}}],\"highlightAll\":true,\"version\":true}"
+ }
+ }
+ },
+ {
+ "_id": "927debf0-8649-11e7-967f-798bfd7ac13a",
+ "_type": "visualization",
+ "_source": {
+ "title": "Rspamd Actions",
+ "visState": "{\"title\":\"Rspamd Actions\",\"type\":\"pie\",\"params\":{\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"isDonut\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"rspam_meta.action\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
+ "uiStateJSON": "{}",
+ "description": "",
+ "version": 1,
+ "kibanaSavedObjectMeta": {
+ "searchSourceJSON": "{\"index\":\"rspamd-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
+ }
+ }
+ },
+ {
+ "_id": "6413f870-80f6-11e7-91e6-0986b0b459e7",
+ "_type": "visualization",
+ "_source": {
+ "title": "Rspamd Geo Map",
+ "visState": "{\n \"title\": \"Rspamd Geo Map\",\n \"type\": \"tile_map\",\n \"params\": {\n \"mapType\": \"Scaled Circle Markers\",\n \"isDesaturated\": true,\n \"addTooltip\": true,\n \"heatMaxZoom\": 0,\n \"heatMinOpacity\": 0.1,\n \"heatRadius\": 25,\n \"heatBlur\": 15,\n \"legendPosition\": \"bottomright\",\n \"mapZoom\": 2,\n \"mapCenter\": [\n 0,\n 0\n ],\n \"wms\": {\n \"enabled\": false,\n \"url\": \"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\",\n \"options\": {\n \"version\": \"1.3.0\",\n \"layers\": \"0\",\n \"format\": \"image/png\",\n \"transparent\": true,\n \"attribution\": \"Maps provided by USGS\",\n \"styles\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"geohash_grid\",\n \"schema\": \"segment\",\n \"params\": {\n \"field\": \"rspam_meta.geoip.location\",\n \"autoPrecision\": true,\n \"useGeocentroid\": true,\n \"precision\": 2\n }\n }\n ],\n \"listeners\": {}\n}",
+ "uiStateJSON": "{}",
+ "description": "",
+ "version": 1,
+ "kibanaSavedObjectMeta": {
+ "searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": []\n}"
+ }
+ }
+ },
+ {
+ "_id": "92a92c00-80f6-11e7-91e6-0986b0b459e7",
+ "_type": "visualization",
+ "_source": {
+ "title": "Rspamd Spam Map",
+ "visState": "{\n \"title\": \"Rspamd Spam Map\",\n \"type\": \"tile_map\",\n \"params\": {\n \"mapType\": \"Scaled Circle Markers\",\n \"isDesaturated\": true,\n \"addTooltip\": true,\n \"heatMaxZoom\": 0,\n \"heatMinOpacity\": 0.1,\n \"heatRadius\": 25,\n \"heatBlur\": 15,\n \"legendPosition\": \"bottomright\",\n \"mapZoom\": 2,\n \"mapCenter\": [\n 0,\n 0\n ],\n \"wms\": {\n \"enabled\": false,\n \"url\": \"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\",\n \"options\": {\n \"version\": \"1.3.0\",\n \"layers\": \"0\",\n \"format\": \"image/png\",\n \"transparent\": true,\n \"attribution\": \"Maps provided by USGS\",\n \"styles\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"geohash_grid\",\n \"schema\": \"segment\",\n \"params\": {\n \"field\": \"rspam_meta.geoip.location\",\n \"autoPrecision\": true,\n \"useGeocentroid\": true,\n \"precision\": 2\n }\n }\n ],\n \"listeners\": {}\n}",
+ "uiStateJSON": "{}",
+ "description": "",
+ "version": 1,
+ "kibanaSavedObjectMeta": {
+ "searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": [\n {\n \"meta\": {\n \"index\": \"rspamd_beat-*\",\n \"negate\": true,\n \"disabled\": false,\n \"alias\": null,\n \"type\": \"phrase\",\n \"key\": \"rspam_meta.action\",\n \"value\": \"no action\"\n },\n \"query\": {\n \"match\": {\n \"rspam_meta.action\": {\n \"query\": \"no action\",\n \"type\": \"phrase\"\n }\n }\n },\n \"$state\": {\n \"store\": \"appState\"\n }\n }\n ]\n}"
+ }
+ }
+ },
+ {
+ "_id": "2be7b6f0-8649-11e7-967f-798bfd7ac13a",
+ "_type": "visualization",
+ "_source": {
+ "title": "Rspamd Symbols Cloud",
+ "visState": "{\"title\":\"Rspamd Symbols Cloud\",\"type\":\"tagcloud\",\"params\":{\"scale\":\"linear\",\"orientation\":\"single\",\"minFontSize\":18,\"maxFontSize\":72},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"rspam_meta.symbols.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
+ "uiStateJSON": "{}",
+ "description": "",
+ "version": 1,
+ "kibanaSavedObjectMeta": {
+ "searchSourceJSON": "{\"index\":\"rspamd-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
+ }
+ }
+ },
+ {
+ "_id": "1f7d9210-80f7-11e7-91e6-0986b0b459e7",
+ "_type": "visualization",
+ "_source": {
+ "title": "Rspamd Top recipients",
+ "visState": "{\n \"title\": \"Rspamd Top recipients\",\n \"type\": \"metric\",\n \"params\": {\n \"addTooltip\": true,\n \"addLegend\": false,\n \"type\": \"gauge\",\n \"gauge\": {\n \"verticalSplit\": false,\n \"autoExtend\": false,\n \"percentageMode\": false,\n \"gaugeType\": \"Metric\",\n \"gaugeStyle\": \"Full\",\n \"backStyle\": \"Full\",\n \"orientation\": \"vertical\",\n \"colorSchema\": \"Green to Red\",\n \"gaugeColorMode\": \"None\",\n \"useRange\": false,\n \"colorsRange\": [\n {\n \"from\": 0,\n \"to\": 100\n }\n ],\n \"invertColors\": false,\n \"labels\": {\n \"show\": true,\n \"color\": \"black\"\n },\n \"scale\": {\n \"show\": false,\n \"labels\": false,\n \"color\": \"#333\",\n \"width\": 2\n },\n \"type\": \"simple\",\n \"style\": {\n \"fontSize\": 60,\n \"bgFill\": \"#000\",\n \"bgColor\": false,\n \"labelColor\": false,\n \"subText\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"terms\",\n \"schema\": \"group\",\n \"params\": {\n \"field\": \"rspam_meta.rcpt\",\n \"size\": 5,\n \"order\": \"desc\",\n \"orderBy\": \"1\"\n }\n }\n ],\n \"listeners\": {}\n}",
+ "uiStateJSON": "{\n \"vis\": {\n \"defaultColors\": {\n \"0 - 100\": \"rgb(0,104,55)\"\n }\n }\n}",
+ "description": "",
+ "version": 1,
+ "kibanaSavedObjectMeta": {
+ "searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": []\n}"
+ }
+ }
+ },
+ {
+ "_id": "efa3f7a0-80f6-11e7-91e6-0986b0b459e7",
+ "_type": "visualization",
+ "_source": {
+ "title": "Rspamd Top Senders",
+ "visState": "{\n \"title\": \"Rspamd Top Senders\",\n \"type\": \"metric\",\n \"params\": {\n \"addTooltip\": true,\n \"addLegend\": false,\n \"type\": \"gauge\",\n \"gauge\": {\n \"verticalSplit\": false,\n \"autoExtend\": false,\n \"percentageMode\": false,\n \"gaugeType\": \"Metric\",\n \"gaugeStyle\": \"Full\",\n \"backStyle\": \"Full\",\n \"orientation\": \"vertical\",\n \"colorSchema\": \"Green to Red\",\n \"gaugeColorMode\": \"None\",\n \"useRange\": false,\n \"colorsRange\": [\n {\n \"from\": 0,\n \"to\": 100\n }\n ],\n \"invertColors\": false,\n \"labels\": {\n \"show\": true,\n \"color\": \"black\"\n },\n \"scale\": {\n \"show\": false,\n \"labels\": false,\n \"color\": \"#333\",\n \"width\": 2\n },\n \"type\": \"simple\",\n \"style\": {\n \"fontSize\": 60,\n \"bgFill\": \"#000\",\n \"bgColor\": false,\n \"labelColor\": false,\n \"subText\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"terms\",\n \"schema\": \"group\",\n \"params\": {\n \"field\": \"rspam_meta.user\",\n \"size\": 5,\n \"order\": \"desc\",\n \"orderBy\": \"1\"\n }\n }\n ],\n \"listeners\": {}\n}",
+ "uiStateJSON": "{\n \"vis\": {\n \"defaultColors\": {\n \"0 - 100\": \"rgb(0,104,55)\"\n }\n }\n}",
+ "description": "",
+ "version": 1,
+ "kibanaSavedObjectMeta": {
+ "searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": [\n {\n \"meta\": {\n \"index\": \"rspamd_beat-*\",\n \"negate\": true,\n \"disabled\": false,\n \"alias\": null,\n \"type\": \"phrase\",\n \"key\": \"rspam_meta.user\",\n \"value\": \"unknown\"\n },\n \"query\": {\n \"match\": {\n \"rspam_meta.user\": {\n \"query\": \"unknown\",\n \"type\": \"phrase\"\n }\n }\n },\n \"$state\": {\n \"store\": \"appState\"\n }\n }\n ]\n}"
+ }
+ }
+ }
+]
diff --git a/contrib/elastic/rspamd_template.json b/contrib/elastic/rspamd_template.json
new file mode 100644
index 000000000..96e011e5d
--- /dev/null
+++ b/contrib/elastic/rspamd_template.json
@@ -0,0 +1,146 @@
+{
+ "mappings": {
+ "_default_": {
+ "_all": {
+ "norms": false
+ },
+ "_meta": {
+ "version": "5.5.2"
+ },
+ "date_detection": false,
+ "dynamic_templates": [
+ {
+ "strings_as_keyword": {
+ "mapping": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "match_mapping_type": "string"
+ }
+ }
+ ],
+ "properties": {
+ "@timestamp": {
+ "type": "date"
+ },
+ "meta": {
+ "properties": {
+ "cloud": {
+ "properties": {
+ "availability_zone": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "instance_id": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "machine_type": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "project_id": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "provider": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "region": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ }
+ }
+ }
+ }
+ },
+ "rspam_meta": {
+ "properties": {
+ "action": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "direction": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "asn": {
+ "properties": {
+ "asn": {
+ "type": "long"
+ },
+ "country_code": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "ipnet": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "registrant": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ }
+ }
+ },
+ "from": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "is_local": {
+ "type": "boolean"
+ },
+ "webmail": {
+ "type": "boolean"
+ },
+ "geoip": {
+ "properties": {
+ "city_name": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "continent_name": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "country_iso_code": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "location": {
+ "type": "geo_point"
+ }
+ }
+ },
+ "ip": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "qid": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ },
+ "score": {
+ "type": "float"
+ },
+ "user": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ }
+ }
+ },
+ "tags": {
+ "ignore_above": 1024,
+ "type": "keyword"
+ }
+ }
+ }
+ },
+ "order": 0,
+ "settings": {
+ "index.mapping.total_fields.limit": 10000,
+ "index.refresh_interval": "5s"
+ },
+ "template": "rspamd-*"
+}
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
new file mode 100644
index 000000000..f3bf69ead
--- /dev/null
+++ b/src/plugins/lua/elastic.lua
@@ -0,0 +1,345 @@
+--[[
+Copyright (c) 2017, Veselin Iordanov
+Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+]]--
+
+local rspamd_logger = require 'rspamd_logger'
+local rspamd_http = require "rspamd_http"
+local rspamd_lua_utils = require "lua_util"
+local ucl = require "ucl"
+local hash = require "rspamd_cryptobox_hash"
+
+if confighelp then
+ return
+end
+
+local redis_params
+redis_params = rspamd_parse_redis_server('elastic')
+
+local rows = {}
+local nrows = 0
+local elastic_template
+local redis_params
+local settings = {
+ limit = 10,
+ index_pattern = 'rspamd-%Y.%m.%d',
+ server = 'localhost:9200',
+ template_file = '/etc/rspamd/rspamd_template.json',
+ kibana_file = '/etc/rspamd/kibana.json',
+ key_prefix = 'elastic-',
+ expire = 3600,
+ debug = false,
+ failover = false,
+ import_kibana = false,
+}
+
+local function read_file(path)
+ local file = io.open(path, "rb")
+ if not file then return nil end
+ local content = file:read "*a"
+ file:close()
+ return content
+end
+local function elastic_send_data(task)
+ local es_index = os.date(settings['index_pattern'])
+ local bulk_json = ""
+ for key,value in pairs(rows) do
+ bulk_json = bulk_json..'{ "index" : { "_index" : "'..es_index..'", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }'.."\n"
+ bulk_json = bulk_json..ucl.to_format(value, 'json-compact').."\n"
+ end
+ local function http_index_data_callback(err, code, body, headers)
+ -- todo error handling we may store the rows it into redis and send it again later
+ if settings['debug'] then
+ rspamd_logger.infox(task, "After create data %1",body)
+ end
+ if code ~= 200 or err_message then
+ if settings['failover'] then
+ local h = hash.create()
+ h:update(bulk_json)
+ local key = settings['key_prefix'] ..es_index..":".. h:base32():sub(1, 20)
+ local data = rspamd_util.zstd_compress(bulk_json)
+ local function redis_set_cb(err)
+ if err ~=nil then
+ rspamd_logger.errx(task, 'redis_set_cb received error: %1', err)
+ end
+ end
+ rspamd_redis_make_request(task,
+ redis_params, -- connect params
+ key, -- hash key
+ true, -- is write
+ redis_set_cb, --callback
+ 'SETEX', -- command
+ {key, tostring(settings['expire']), data} -- arguments
+ )
+ end
+ end
+ end
+ rspamd_http.request({
+ url = 'http://'..settings['server']..'/'..es_index..'/_bulk',
+ headers = {
+ ['Content-Type'] = 'application/x-ndjson',
+ },
+ body = bulk_json,
+ task = task,
+ method = 'post',
+ callback = http_index_data_callback
+ })
+
+end
+local function get_general_metadata(task)
+ local r = {}
+ local ip_addr = task:get_ip()
+ r.ip = tostring(ip_addr) or 'unknown'
+ r.webmail = false
+ if ip_addr then
+ r.is_local = ip_addr:is_local()
+ local origin = task:get_header('X-Originating-IP')
+ if origin then
+ r.webmail = true
+ r.ip = origin
+ end
+ end
+ r.direction = "Inbound"
+ r.user = task:get_user() or 'unknown'
+ r.qid = task:get_queue_id() or 'unknown'
+ r.action = task:get_metric_action('default')
+ if r.user ~= 'unknown' then
+ r.direction = "Outbound"
+ end
+ local s = task:get_metric_score('default')[1]
+ r.score = s
+
+ local rcpt = task:get_recipients('smtp')
+ if rcpt then
+ local l = {}
+ for _, a in ipairs(rcpt) do
+ table.insert(l, a['addr'])
+ end
+ r.rcpt = l
+ else
+ r.rcpt = 'unknown'
+ end
+ local from = task:get_from('smtp')
+ if ((from or E)[1] or E).addr then
+ r.from = from[1].addr
+ else
+ r.from = 'unknown'
+ end
+ local syminf = task:get_symbols_all()
+ r.symbols = syminf
+ r.asn = {}
+ local pool = task:get_mempool()
+ r.asn.country = pool:get_variable("country") or 'unknown'
+ r.asn.asn = pool:get_variable("asn") or 0
+ r.asn.ipnet = pool:get_variable("ipnet") or 'unknown'
+ local function process_header(name)
+ local hdr = task:get_header_full(name)
+ if hdr then
+ local l = {}
+ for _, h in ipairs(hdr) do
+ table.insert(l, h.decoded)
+ end
+ return l
+ else
+ return 'unknown'
+ end
+ end
+ r.header_from = process_header('from')
+ r.header_to = process_header('to')
+ r.header_subject = process_header('subject')
+ r.header_date = process_header('date')
+ r.message_id = task:get_message_id()
+ return r
+end
+
+local function elastic_collect(task)
+ if not enabled then return end
+ if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
+ local row = {['rspam_meta'] = get_general_metadata(task), ['@timestamp'] = os.time(os.date("*t")).."000"}
+ table.insert(rows, row)
+ nrows = nrows + 1
+ if nrows > settings['limit'] then
+ elastic_send_data(task)
+ nrows = 0
+ rows = {}
+ end
+end
+local opts = rspamd_config:get_all_opt('elastic')
+local enabled = true;
+
+local function check_elastic_server(ev_base)
+ local function http_callback(err, code, body, headers)
+ local parser = ucl.parser()
+ local res,err = parser:parse_string(body)
+ if not res then
+ rspamd_logger.infox(rspamd_config, 'failed to query elasticsearch server %1, disabling module',settings['server'])
+ enabled = false;
+ return
+ end
+ local obj = parser:get_object()
+ for node,value in pairs(obj['nodes']) do
+ local plugin_found = false
+ for i,plugin in pairs(value['plugins']) do
+ if plugin['name'] == 'ingest-geoip' then
+ plugin_found = true
+ end
+ end
+ if not plugin_found then
+ rspamd_logger.infox(rspamd_config, 'Unable to find ingest-geoip on %1 node, disabling module',node)
+ enabled = false
+ return
+ end
+ end
+ end
+ rspamd_http.request({
+ url = 'http://'..settings['server']..'/_nodes/plugins',
+ ev_base = ev_base,
+ method = 'get',
+ callback = http_callback
+ })
+ if enabled then
+ local function http_ingest_callback(err, code, body, headers)
+ if code ~= 200 or err_message then
+ -- pipeline not exist
+ end
+ end
+ rspamd_http.request({
+ url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
+ ev_base = ev_base,
+ method = 'get',
+ callback = http_ingest_callback
+ })
+ -- lets try to create ingest pipeline if not exist
+ rspamd_http.request({
+ url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
+ task = task,
+ body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
+ method = 'put',
+ })
+ end
+ local function http_template_create_callback(err, code, body, headers)
+ end
+ local function http_template_exist_callback(err, code, body, headers)
+ if code ~= 200 or err_message then
+ rspamd_http.request({
+ url = 'http://'..settings['server']..'/_template/rspamd',
+ task = task,
+ body = elastic_template,
+ method = 'put',
+ callback = http_template_create_callback
+ })
+ end
+ end
+ rspamd_http.request({
+ url = 'http://'..settings['server']..'/_template/rspamd',
+ task = task,
+ method = 'head',
+ callback = http_template_exist_callback
+ })
+end
+-- import ingest pipeline and kibana dashboard/visualization
+local function initial_setup(worker)
+ if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end
+ if enabled then
+ -- create ingest pipeline
+ rspamd_http.request({
+ url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
+ task = task,
+ body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
+ method = 'put',
+ })
+ -- create template mappings if not exist
+ local function http_template_exist_callback(err, code, body, headers)
+ if code ~= 200 or err_message then
+ rspamd_http.request({
+ url = 'http://'..settings['server']..'/_template/rspamd',
+ task = task,
+ body = elastic_template,
+ method = 'put',
+ })
+ end
+ end
+ rspamd_http.request({
+ url = 'http://'..settings['server']..'/_template/rspamd',
+ task = task,
+ method = 'head',
+ callback = http_template_exist_callback
+ })
+ -- add kibana dashboard and visualizations
+ local kibana_mappings = read_file(settings['kibana_file']);
+ if enabled and settings['import_kibana'] then
+ local kibana_mappings = read_file(settings['kibana_file']);
+ if kibana_mappings then
+ local parser = ucl.parser()
+ local res,err = parser:parse_string(kibana_mappings)
+ if not res then
+ return
+ end
+ local obj = parser:get_object()
+ local bulk_json = ""
+ for _,item in ipairs(obj) do
+ bulk_json = bulk_json..'{ "index" : { "_index" : ".kibana", "_type" : "'..item["_type"]..'" ,"_id": "'..item["_id"]..'"} }'.."\n"
+ bulk_json = bulk_json..ucl.to_format(item['_source'], 'json-compact').."\n"
+ end
+ rspamd_http.request({
+ url = 'http://'..settings['server']..'/.kibana/_bulk',
+ headers = {
+ ['Content-Type'] = 'application/x-ndjson',
+ },
+ body = bulk_json,
+ task = task,
+ method = 'post'
+ })
+ else
+ rspamd_logger.infox(rspamd_config, 'kibana templatefile not found')
+ end
+ end
+ end
+end
+rspamd_config:add_on_load(function(cfg, ev_base,worker)
+ if opts then
+ for k,v in pairs(opts) do
+ settings[k] = v
+ end
+ if not settings['server'] then
+ rspamd_logger.infox(rspamd_config, 'no elastic servers are specified, disabling module')
+ enabled = false
+ return
+ end
+ if not settings['template_file'] then
+ rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
+ enabled = false
+ return
+ end
+ end
+ elastic_template = read_file(settings['template_file']);
+ if not elastic_template then
+ rspamd_logger.infox(rspamd_config, 'elastic unable to read mappings, disabling module')
+ enabled = false
+ return
+ end
+ redis_params = rspamd_parse_redis_server('elastic')
+ check_elastic_server(ev_base) -- check for elasticsearch requirements
+ initial_setup(worker) -- import mappings pipeline and visualizations
+end)
+
+if enabled == true then
+ rspamd_config:register_symbol({
+ name = 'ELASTIC_COLLECT',
+ type = 'postfilter',
+ callback = elastic_collect,
+ priority = 10
+ })
+end