diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-02-18 15:46:58 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-02-18 15:46:58 +0000 |
commit | 9ef70a3754121b2161e440fa295963dbac4269bc (patch) | |
tree | 882d37a7d03f7a836fc3064e4de6f31add6453a6 | |
parent | f1becea682b563d366add102cd88431f15182b26 (diff) | |
download | rspamd-9ef70a3754121b2161e440fa295963dbac4269bc.tar.gz rspamd-9ef70a3754121b2161e440fa295963dbac4269bc.zip |
[Feature] Preliminary import of the elasticsearch module
-rw-r--r-- | conf/modules.d/elastic.conf | 21 | ||||
-rw-r--r-- | contrib/elastic/kibana.json | 103 | ||||
-rw-r--r-- | contrib/elastic/rspamd_template.json | 146 | ||||
-rw-r--r-- | src/plugins/lua/elastic.lua | 345 |
4 files changed, 615 insertions, 0 deletions
diff --git a/conf/modules.d/elastic.conf b/conf/modules.d/elastic.conf new file mode 100644 index 000000000..b0a96809f --- /dev/null +++ b/conf/modules.d/elastic.conf @@ -0,0 +1,21 @@ +elastic { + # Push update when 10 records are collected (10 if unset) + limit = 10; + # IP:port of Elasticsearch server + server = "localhost:9200"; + # Timeout to wait for response (5 seconds if unset) + timeout = 5; + # Elasticsearch template file (json format) + template_file = "/etc/rspamd/rspamd_template.json"; + # Kibana prebuild visualizations and dashboard template (json format) + kibana_file = '/etc/rspamd/kibana.json', + # Elasticsearch index name pattern + index_pattern = "rspamd-%Y.%m.%d"; + # Dump debug information + debug = false; + # Import kibana template + import_kibana = false; + .include(try=true,priority=5) "${DBDIR}/dynamic/elastic.conf" + .include(try=true,priority=1,duplicate=merge) "$LOCAL_CONFDIR/local.d/elastic.conf" + .include(try=true,priority=10) "$LOCAL_CONFDIR/override.d/elastic.conf" +} diff --git a/contrib/elastic/kibana.json b/contrib/elastic/kibana.json new file mode 100644 index 000000000..16ef1b134 --- /dev/null +++ b/contrib/elastic/kibana.json @@ -0,0 +1,103 @@ +[ + { + "_id": "6c6a2ed0-8660-11e7-85ae-fbc80f1b7844", + "_type": "dashboard", + "_source": { + "title": "Rspamd Dashboard", + "hits": 0, + "description": "", + "panelsJSON": "[{\"size_x\":6,\"size_y\":3,\"panelIndex\":1,\"type\":\"visualization\",\"id\":\"6413f870-80f6-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":1},{\"size_x\":6,\"size_y\":3,\"panelIndex\":2,\"type\":\"visualization\",\"id\":\"927debf0-8649-11e7-967f-798bfd7ac13a\",\"col\":7,\"row\":1},{\"size_x\":12,\"size_y\":3,\"panelIndex\":3,\"type\":\"visualization\",\"id\":\"efa3f7a0-80f6-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":7},{\"size_x\":12,\"size_y\":3,\"panelIndex\":4,\"type\":\"visualization\",\"id\":\"1f7d9210-80f7-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":10},{\"size_x\":6,\"size_y\":3,\"panelIndex\":5,\"type\":\"visualization\",\"id\":\"2be7b6f0-8649-11e7-967f-798bfd7ac13a\",\"col\":7,\"row\":4},{\"size_x\":6,\"size_y\":3,\"panelIndex\":6,\"type\":\"visualization\",\"id\":\"680b6480-826e-11e7-8a20-b7bc68c2e9e7\",\"col\":7,\"row\":13},{\"size_x\":6,\"size_y\":3,\"panelIndex\":7,\"type\":\"visualization\",\"id\":\"158dfc80-864d-11e7-bce7-4532b9d239a0\",\"col\":1,\"row\":4}]", + "optionsJSON": "{\"darkTheme\":false}", + "uiStateJSON": "{\"P-3\":{\"vis\":{\"defaultColors\":{\"0 - 100\":\"rgb(0,104,55)\"}}},\"P-4\":{\"vis\":{\"defaultColors\":{\"0 - 100\":\"rgb(0,104,55)\"}}},\"P-1\":{\"mapZoom\":2,\"mapCenter\":[40.58058466412761,1.7578125]},\"P-6\":{\"vis\":{\"defaultColors\":{\"0 - 0.25\":\"rgb(247,252,245)\",\"0.25 - 0.5\":\"rgb(199,233,192)\",\"0.5 - 0.75\":\"rgb(116,196,118)\",\"0.75 - 1\":\"rgb(35,139,69)\"}}}}", + "version": 1, + "timeRestore": false, + "kibanaSavedObjectMeta": { + "searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}}}],\"highlightAll\":true,\"version\":true}" + } + } + }, + { + "_id": "927debf0-8649-11e7-967f-798bfd7ac13a", + "_type": "visualization", + "_source": { + "title": "Rspamd Actions", + "visState": "{\"title\":\"Rspamd Actions\",\"type\":\"pie\",\"params\":{\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"isDonut\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"rspam_meta.action\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}", + "uiStateJSON": "{}", + "description": "", + "version": 1, + "kibanaSavedObjectMeta": { + "searchSourceJSON": "{\"index\":\"rspamd-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}" + } + } + }, + { + "_id": "6413f870-80f6-11e7-91e6-0986b0b459e7", + "_type": "visualization", + "_source": { + "title": "Rspamd Geo Map", + "visState": "{\n \"title\": \"Rspamd Geo Map\",\n \"type\": \"tile_map\",\n \"params\": {\n \"mapType\": \"Scaled Circle Markers\",\n \"isDesaturated\": true,\n \"addTooltip\": true,\n \"heatMaxZoom\": 0,\n \"heatMinOpacity\": 0.1,\n \"heatRadius\": 25,\n \"heatBlur\": 15,\n \"legendPosition\": \"bottomright\",\n \"mapZoom\": 2,\n \"mapCenter\": [\n 0,\n 0\n ],\n \"wms\": {\n \"enabled\": false,\n \"url\": \"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\",\n \"options\": {\n \"version\": \"1.3.0\",\n \"layers\": \"0\",\n \"format\": \"image/png\",\n \"transparent\": true,\n \"attribution\": \"Maps provided by USGS\",\n \"styles\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"geohash_grid\",\n \"schema\": \"segment\",\n \"params\": {\n \"field\": \"rspam_meta.geoip.location\",\n \"autoPrecision\": true,\n \"useGeocentroid\": true,\n \"precision\": 2\n }\n }\n ],\n \"listeners\": {}\n}", + "uiStateJSON": "{}", + "description": "", + "version": 1, + "kibanaSavedObjectMeta": { + "searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": []\n}" + } + } + }, + { + "_id": "92a92c00-80f6-11e7-91e6-0986b0b459e7", + "_type": "visualization", + "_source": { + "title": "Rspamd Spam Map", + "visState": "{\n \"title\": \"Rspamd Spam Map\",\n \"type\": \"tile_map\",\n \"params\": {\n \"mapType\": \"Scaled Circle Markers\",\n \"isDesaturated\": true,\n \"addTooltip\": true,\n \"heatMaxZoom\": 0,\n \"heatMinOpacity\": 0.1,\n \"heatRadius\": 25,\n \"heatBlur\": 15,\n \"legendPosition\": \"bottomright\",\n \"mapZoom\": 2,\n \"mapCenter\": [\n 0,\n 0\n ],\n \"wms\": {\n \"enabled\": false,\n \"url\": \"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\",\n \"options\": {\n \"version\": \"1.3.0\",\n \"layers\": \"0\",\n \"format\": \"image/png\",\n \"transparent\": true,\n \"attribution\": \"Maps provided by USGS\",\n \"styles\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"geohash_grid\",\n \"schema\": \"segment\",\n \"params\": {\n \"field\": \"rspam_meta.geoip.location\",\n \"autoPrecision\": true,\n \"useGeocentroid\": true,\n \"precision\": 2\n }\n }\n ],\n \"listeners\": {}\n}", + "uiStateJSON": "{}", + "description": "", + "version": 1, + "kibanaSavedObjectMeta": { + "searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": [\n {\n \"meta\": {\n \"index\": \"rspamd_beat-*\",\n \"negate\": true,\n \"disabled\": false,\n \"alias\": null,\n \"type\": \"phrase\",\n \"key\": \"rspam_meta.action\",\n \"value\": \"no action\"\n },\n \"query\": {\n \"match\": {\n \"rspam_meta.action\": {\n \"query\": \"no action\",\n \"type\": \"phrase\"\n }\n }\n },\n \"$state\": {\n \"store\": \"appState\"\n }\n }\n ]\n}" + } + } + }, + { + "_id": "2be7b6f0-8649-11e7-967f-798bfd7ac13a", + "_type": "visualization", + "_source": { + "title": "Rspamd Symbols Cloud", + "visState": "{\"title\":\"Rspamd Symbols Cloud\",\"type\":\"tagcloud\",\"params\":{\"scale\":\"linear\",\"orientation\":\"single\",\"minFontSize\":18,\"maxFontSize\":72},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"rspam_meta.symbols.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}", + "uiStateJSON": "{}", + "description": "", + "version": 1, + "kibanaSavedObjectMeta": { + "searchSourceJSON": "{\"index\":\"rspamd-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}" + } + } + }, + { + "_id": "1f7d9210-80f7-11e7-91e6-0986b0b459e7", + "_type": "visualization", + "_source": { + "title": "Rspamd Top recipients", + "visState": "{\n \"title\": \"Rspamd Top recipients\",\n \"type\": \"metric\",\n \"params\": {\n \"addTooltip\": true,\n \"addLegend\": false,\n \"type\": \"gauge\",\n \"gauge\": {\n \"verticalSplit\": false,\n \"autoExtend\": false,\n \"percentageMode\": false,\n \"gaugeType\": \"Metric\",\n \"gaugeStyle\": \"Full\",\n \"backStyle\": \"Full\",\n \"orientation\": \"vertical\",\n \"colorSchema\": \"Green to Red\",\n \"gaugeColorMode\": \"None\",\n \"useRange\": false,\n \"colorsRange\": [\n {\n \"from\": 0,\n \"to\": 100\n }\n ],\n \"invertColors\": false,\n \"labels\": {\n \"show\": true,\n \"color\": \"black\"\n },\n \"scale\": {\n \"show\": false,\n \"labels\": false,\n \"color\": \"#333\",\n \"width\": 2\n },\n \"type\": \"simple\",\n \"style\": {\n \"fontSize\": 60,\n \"bgFill\": \"#000\",\n \"bgColor\": false,\n \"labelColor\": false,\n \"subText\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"terms\",\n \"schema\": \"group\",\n \"params\": {\n \"field\": \"rspam_meta.rcpt\",\n \"size\": 5,\n \"order\": \"desc\",\n \"orderBy\": \"1\"\n }\n }\n ],\n \"listeners\": {}\n}", + "uiStateJSON": "{\n \"vis\": {\n \"defaultColors\": {\n \"0 - 100\": \"rgb(0,104,55)\"\n }\n }\n}", + "description": "", + "version": 1, + "kibanaSavedObjectMeta": { + "searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": []\n}" + } + } + }, + { + "_id": "efa3f7a0-80f6-11e7-91e6-0986b0b459e7", + "_type": "visualization", + "_source": { + "title": "Rspamd Top Senders", + "visState": "{\n \"title\": \"Rspamd Top Senders\",\n \"type\": \"metric\",\n \"params\": {\n \"addTooltip\": true,\n \"addLegend\": false,\n \"type\": \"gauge\",\n \"gauge\": {\n \"verticalSplit\": false,\n \"autoExtend\": false,\n \"percentageMode\": false,\n \"gaugeType\": \"Metric\",\n \"gaugeStyle\": \"Full\",\n \"backStyle\": \"Full\",\n \"orientation\": \"vertical\",\n \"colorSchema\": \"Green to Red\",\n \"gaugeColorMode\": \"None\",\n \"useRange\": false,\n \"colorsRange\": [\n {\n \"from\": 0,\n \"to\": 100\n }\n ],\n \"invertColors\": false,\n \"labels\": {\n \"show\": true,\n \"color\": \"black\"\n },\n \"scale\": {\n \"show\": false,\n \"labels\": false,\n \"color\": \"#333\",\n \"width\": 2\n },\n \"type\": \"simple\",\n \"style\": {\n \"fontSize\": 60,\n \"bgFill\": \"#000\",\n \"bgColor\": false,\n \"labelColor\": false,\n \"subText\": \"\"\n }\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"terms\",\n \"schema\": \"group\",\n \"params\": {\n \"field\": \"rspam_meta.user\",\n \"size\": 5,\n \"order\": \"desc\",\n \"orderBy\": \"1\"\n }\n }\n ],\n \"listeners\": {}\n}", + "uiStateJSON": "{\n \"vis\": {\n \"defaultColors\": {\n \"0 - 100\": \"rgb(0,104,55)\"\n }\n }\n}", + "description": "", + "version": 1, + "kibanaSavedObjectMeta": { + "searchSourceJSON": "{\n \"index\": \"rspamd-*\",\n \"query\": {\n \"query_string\": {\n \"query\": \"*\",\n \"analyze_wildcard\": true\n }\n },\n \"filter\": [\n {\n \"meta\": {\n \"index\": \"rspamd_beat-*\",\n \"negate\": true,\n \"disabled\": false,\n \"alias\": null,\n \"type\": \"phrase\",\n \"key\": \"rspam_meta.user\",\n \"value\": \"unknown\"\n },\n \"query\": {\n \"match\": {\n \"rspam_meta.user\": {\n \"query\": \"unknown\",\n \"type\": \"phrase\"\n }\n }\n },\n \"$state\": {\n \"store\": \"appState\"\n }\n }\n ]\n}" + } + } + } +] diff --git a/contrib/elastic/rspamd_template.json b/contrib/elastic/rspamd_template.json new file mode 100644 index 000000000..96e011e5d --- /dev/null +++ b/contrib/elastic/rspamd_template.json @@ -0,0 +1,146 @@ +{ + "mappings": { + "_default_": { + "_all": { + "norms": false + }, + "_meta": { + "version": "5.5.2" + }, + "date_detection": false, + "dynamic_templates": [ + { + "strings_as_keyword": { + "mapping": { + "ignore_above": 1024, + "type": "keyword" + }, + "match_mapping_type": "string" + } + } + ], + "properties": { + "@timestamp": { + "type": "date" + }, + "meta": { + "properties": { + "cloud": { + "properties": { + "availability_zone": { + "ignore_above": 1024, + "type": "keyword" + }, + "instance_id": { + "ignore_above": 1024, + "type": "keyword" + }, + "machine_type": { + "ignore_above": 1024, + "type": "keyword" + }, + "project_id": { + "ignore_above": 1024, + "type": "keyword" + }, + "provider": { + "ignore_above": 1024, + "type": "keyword" + }, + "region": { + "ignore_above": 1024, + "type": "keyword" + } + } + } + } + }, + "rspam_meta": { + "properties": { + "action": { + "ignore_above": 1024, + "type": "keyword" + }, + "direction": { + "ignore_above": 1024, + "type": "keyword" + }, + "asn": { + "properties": { + "asn": { + "type": "long" + }, + "country_code": { + "ignore_above": 1024, + "type": "keyword" + }, + "ipnet": { + "ignore_above": 1024, + "type": "keyword" + }, + "registrant": { + "ignore_above": 1024, + "type": "keyword" + } + } + }, + "from": { + "ignore_above": 1024, + "type": "keyword" + }, + "is_local": { + "type": "boolean" + }, + "webmail": { + "type": "boolean" + }, + "geoip": { + "properties": { + "city_name": { + "ignore_above": 1024, + "type": "keyword" + }, + "continent_name": { + "ignore_above": 1024, + "type": "keyword" + }, + "country_iso_code": { + "ignore_above": 1024, + "type": "keyword" + }, + "location": { + "type": "geo_point" + } + } + }, + "ip": { + "ignore_above": 1024, + "type": "keyword" + }, + "qid": { + "ignore_above": 1024, + "type": "keyword" + }, + "score": { + "type": "float" + }, + "user": { + "ignore_above": 1024, + "type": "keyword" + } + } + }, + "tags": { + "ignore_above": 1024, + "type": "keyword" + } + } + } + }, + "order": 0, + "settings": { + "index.mapping.total_fields.limit": 10000, + "index.refresh_interval": "5s" + }, + "template": "rspamd-*" +} diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua new file mode 100644 index 000000000..f3bf69ead --- /dev/null +++ b/src/plugins/lua/elastic.lua @@ -0,0 +1,345 @@ +--[[ +Copyright (c) 2017, Veselin Iordanov +Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru> + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +]]-- + +local rspamd_logger = require 'rspamd_logger' +local rspamd_http = require "rspamd_http" +local rspamd_lua_utils = require "lua_util" +local ucl = require "ucl" +local hash = require "rspamd_cryptobox_hash" + +if confighelp then + return +end + +local redis_params +redis_params = rspamd_parse_redis_server('elastic') + +local rows = {} +local nrows = 0 +local elastic_template +local redis_params +local settings = { + limit = 10, + index_pattern = 'rspamd-%Y.%m.%d', + server = 'localhost:9200', + template_file = '/etc/rspamd/rspamd_template.json', + kibana_file = '/etc/rspamd/kibana.json', + key_prefix = 'elastic-', + expire = 3600, + debug = false, + failover = false, + import_kibana = false, +} + +local function read_file(path) + local file = io.open(path, "rb") + if not file then return nil end + local content = file:read "*a" + file:close() + return content +end +local function elastic_send_data(task) + local es_index = os.date(settings['index_pattern']) + local bulk_json = "" + for key,value in pairs(rows) do + bulk_json = bulk_json..'{ "index" : { "_index" : "'..es_index..'", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }'.."\n" + bulk_json = bulk_json..ucl.to_format(value, 'json-compact').."\n" + end + local function http_index_data_callback(err, code, body, headers) + -- todo error handling we may store the rows it into redis and send it again later + if settings['debug'] then + rspamd_logger.infox(task, "After create data %1",body) + end + if code ~= 200 or err_message then + if settings['failover'] then + local h = hash.create() + h:update(bulk_json) + local key = settings['key_prefix'] ..es_index..":".. h:base32():sub(1, 20) + local data = rspamd_util.zstd_compress(bulk_json) + local function redis_set_cb(err) + if err ~=nil then + rspamd_logger.errx(task, 'redis_set_cb received error: %1', err) + end + end + rspamd_redis_make_request(task, + redis_params, -- connect params + key, -- hash key + true, -- is write + redis_set_cb, --callback + 'SETEX', -- command + {key, tostring(settings['expire']), data} -- arguments + ) + end + end + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/'..es_index..'/_bulk', + headers = { + ['Content-Type'] = 'application/x-ndjson', + }, + body = bulk_json, + task = task, + method = 'post', + callback = http_index_data_callback + }) + +end +local function get_general_metadata(task) + local r = {} + local ip_addr = task:get_ip() + r.ip = tostring(ip_addr) or 'unknown' + r.webmail = false + if ip_addr then + r.is_local = ip_addr:is_local() + local origin = task:get_header('X-Originating-IP') + if origin then + r.webmail = true + r.ip = origin + end + end + r.direction = "Inbound" + r.user = task:get_user() or 'unknown' + r.qid = task:get_queue_id() or 'unknown' + r.action = task:get_metric_action('default') + if r.user ~= 'unknown' then + r.direction = "Outbound" + end + local s = task:get_metric_score('default')[1] + r.score = s + + local rcpt = task:get_recipients('smtp') + if rcpt then + local l = {} + for _, a in ipairs(rcpt) do + table.insert(l, a['addr']) + end + r.rcpt = l + else + r.rcpt = 'unknown' + end + local from = task:get_from('smtp') + if ((from or E)[1] or E).addr then + r.from = from[1].addr + else + r.from = 'unknown' + end + local syminf = task:get_symbols_all() + r.symbols = syminf + r.asn = {} + local pool = task:get_mempool() + r.asn.country = pool:get_variable("country") or 'unknown' + r.asn.asn = pool:get_variable("asn") or 0 + r.asn.ipnet = pool:get_variable("ipnet") or 'unknown' + local function process_header(name) + local hdr = task:get_header_full(name) + if hdr then + local l = {} + for _, h in ipairs(hdr) do + table.insert(l, h.decoded) + end + return l + else + return 'unknown' + end + end + r.header_from = process_header('from') + r.header_to = process_header('to') + r.header_subject = process_header('subject') + r.header_date = process_header('date') + r.message_id = task:get_message_id() + return r +end + +local function elastic_collect(task) + if not enabled then return end + if rspamd_lua_utils.is_rspamc_or_controller(task) then return end + local row = {['rspam_meta'] = get_general_metadata(task), ['@timestamp'] = os.time(os.date("*t")).."000"} + table.insert(rows, row) + nrows = nrows + 1 + if nrows > settings['limit'] then + elastic_send_data(task) + nrows = 0 + rows = {} + end +end +local opts = rspamd_config:get_all_opt('elastic') +local enabled = true; + +local function check_elastic_server(ev_base) + local function http_callback(err, code, body, headers) + local parser = ucl.parser() + local res,err = parser:parse_string(body) + if not res then + rspamd_logger.infox(rspamd_config, 'failed to query elasticsearch server %1, disabling module',settings['server']) + enabled = false; + return + end + local obj = parser:get_object() + for node,value in pairs(obj['nodes']) do + local plugin_found = false + for i,plugin in pairs(value['plugins']) do + if plugin['name'] == 'ingest-geoip' then + plugin_found = true + end + end + if not plugin_found then + rspamd_logger.infox(rspamd_config, 'Unable to find ingest-geoip on %1 node, disabling module',node) + enabled = false + return + end + end + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/_nodes/plugins', + ev_base = ev_base, + method = 'get', + callback = http_callback + }) + if enabled then + local function http_ingest_callback(err, code, body, headers) + if code ~= 200 or err_message then + -- pipeline not exist + end + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip', + ev_base = ev_base, + method = 'get', + callback = http_ingest_callback + }) + -- lets try to create ingest pipeline if not exist + rspamd_http.request({ + url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip', + task = task, + body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}', + method = 'put', + }) + end + local function http_template_create_callback(err, code, body, headers) + end + local function http_template_exist_callback(err, code, body, headers) + if code ~= 200 or err_message then + rspamd_http.request({ + url = 'http://'..settings['server']..'/_template/rspamd', + task = task, + body = elastic_template, + method = 'put', + callback = http_template_create_callback + }) + end + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/_template/rspamd', + task = task, + method = 'head', + callback = http_template_exist_callback + }) +end +-- import ingest pipeline and kibana dashboard/visualization +local function initial_setup(worker) + if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end + if enabled then + -- create ingest pipeline + rspamd_http.request({ + url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip', + task = task, + body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}', + method = 'put', + }) + -- create template mappings if not exist + local function http_template_exist_callback(err, code, body, headers) + if code ~= 200 or err_message then + rspamd_http.request({ + url = 'http://'..settings['server']..'/_template/rspamd', + task = task, + body = elastic_template, + method = 'put', + }) + end + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/_template/rspamd', + task = task, + method = 'head', + callback = http_template_exist_callback + }) + -- add kibana dashboard and visualizations + local kibana_mappings = read_file(settings['kibana_file']); + if enabled and settings['import_kibana'] then + local kibana_mappings = read_file(settings['kibana_file']); + if kibana_mappings then + local parser = ucl.parser() + local res,err = parser:parse_string(kibana_mappings) + if not res then + return + end + local obj = parser:get_object() + local bulk_json = "" + for _,item in ipairs(obj) do + bulk_json = bulk_json..'{ "index" : { "_index" : ".kibana", "_type" : "'..item["_type"]..'" ,"_id": "'..item["_id"]..'"} }'.."\n" + bulk_json = bulk_json..ucl.to_format(item['_source'], 'json-compact').."\n" + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/.kibana/_bulk', + headers = { + ['Content-Type'] = 'application/x-ndjson', + }, + body = bulk_json, + task = task, + method = 'post' + }) + else + rspamd_logger.infox(rspamd_config, 'kibana templatefile not found') + end + end + end +end +rspamd_config:add_on_load(function(cfg, ev_base,worker) + if opts then + for k,v in pairs(opts) do + settings[k] = v + end + if not settings['server'] then + rspamd_logger.infox(rspamd_config, 'no elastic servers are specified, disabling module') + enabled = false + return + end + if not settings['template_file'] then + rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module') + enabled = false + return + end + end + elastic_template = read_file(settings['template_file']); + if not elastic_template then + rspamd_logger.infox(rspamd_config, 'elastic unable to read mappings, disabling module') + enabled = false + return + end + redis_params = rspamd_parse_redis_server('elastic') + check_elastic_server(ev_base) -- check for elasticsearch requirements + initial_setup(worker) -- import mappings pipeline and visualizations +end) + +if enabled == true then + rspamd_config:register_symbol({ + name = 'ELASTIC_COLLECT', + type = 'postfilter', + callback = elastic_collect, + priority = 10 + }) +end |