]> source.dussan.org Git - rspamd.git/commitdiff
[Feature] Preliminary import of the elasticsearch module
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 18 Feb 2018 15:46:58 +0000 (15:46 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sun, 18 Feb 2018 15:46:58 +0000 (15:46 +0000)
conf/modules.d/elastic.conf [new file with mode: 0644]
contrib/elastic/kibana.json [new file with mode: 0644]
contrib/elastic/rspamd_template.json [new file with mode: 0644]
src/plugins/lua/elastic.lua [new file with mode: 0644]

diff --git a/conf/modules.d/elastic.conf b/conf/modules.d/elastic.conf
new file mode 100644 (file)
index 0000000..b0a9680
--- /dev/null
@@ -0,0 +1,21 @@
+elastic {
+  # Push update when 10 records are collected (10 if unset)
+  limit = 10;
+  # IP:port of Elasticsearch server
+  server = "localhost:9200";
+  # Timeout to wait for response (5 seconds if unset)
+  timeout = 5;
+  # Elasticsearch template file (json format)
+  template_file = "/etc/rspamd/rspamd_template.json";
+  # Kibana prebuild visualizations and dashboard template (json format)
+  kibana_file = '/etc/rspamd/kibana.json',
+  # Elasticsearch index name pattern
+  index_pattern = "rspamd-%Y.%m.%d";
+  # Dump debug information
+  debug = false;
+  # Import kibana template
+  import_kibana = false;
+  .include(try=true,priority=5) "${DBDIR}/dynamic/elastic.conf"
+  .include(try=true,priority=1,duplicate=merge) "$LOCAL_CONFDIR/local.d/elastic.conf"
+  .include(try=true,priority=10) "$LOCAL_CONFDIR/override.d/elastic.conf"
+}
diff --git a/contrib/elastic/kibana.json b/contrib/elastic/kibana.json
new file mode 100644 (file)
index 0000000..16ef1b1
--- /dev/null
@@ -0,0 +1,103 @@
+[
+  {
+    "_id": "6c6a2ed0-8660-11e7-85ae-fbc80f1b7844",
+    "_type": "dashboard",
+    "_source": {
+      "title": "Rspamd Dashboard",
+      "hits": 0,
+      "description": "",
+      "panelsJSON": "[{\"size_x\":6,\"size_y\":3,\"panelIndex\":1,\"type\":\"visualization\",\"id\":\"6413f870-80f6-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":1},{\"size_x\":6,\"size_y\":3,\"panelIndex\":2,\"type\":\"visualization\",\"id\":\"927debf0-8649-11e7-967f-798bfd7ac13a\",\"col\":7,\"row\":1},{\"size_x\":12,\"size_y\":3,\"panelIndex\":3,\"type\":\"visualization\",\"id\":\"efa3f7a0-80f6-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":7},{\"size_x\":12,\"size_y\":3,\"panelIndex\":4,\"type\":\"visualization\",\"id\":\"1f7d9210-80f7-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":10},{\"size_x\":6,\"size_y\":3,\"panelIndex\":5,\"type\":\"visualization\",\"id\":\"2be7b6f0-8649-11e7-967f-798bfd7ac13a\",\"col\":7,\"row\":4},{\"size_x\":6,\"size_y\":3,\"panelIndex\":6,\"type\":\"visualization\",\"id\":\"680b6480-826e-11e7-8a20-b7bc68c2e9e7\",\"col\":7,\"row\":13},{\"size_x\":6,\"size_y\":3,\"panelIndex\":7,\"type\":\"visualization\",\"id\":\"158dfc80-864d-11e7-bce7-4532b9d239a0\",\"col\":1,\"row\":4}]",
+      "optionsJSON": "{\"darkTheme\":false}",
+      "uiStateJSON": "{\"P-3\":{\"vis\":{\"defaultColors\":{\"0 - 100\":\"rgb(0,104,55)\"}}},\"P-4\":{\"vis\":{\"defaultColors\":{\"0 - 100\":\"rgb(0,104,55)\"}}},\"P-1\":{\"mapZoom\":2,\"mapCenter\":[40.58058466412761,1.7578125]},\"P-6\":{\"vis\":{\"defaultColors\":{\"0 - 0.25\":\"rgb(247,252,245)\",\"0.25 - 0.5\":\"rgb(199,233,192)\",\"0.5 - 0.75\":\"rgb(116,196,118)\",\"0.75 - 1\":\"rgb(35,139,69)\"}}}}",
+      "version": 1,
+      "timeRestore": false,
+      "kibanaSavedObjectMeta": {
+        "searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}}}],\"highlightAll\":true,\"version\":true}"
+      }
+    }
+  },
+  {
+    "_id": "927debf0-8649-11e7-967f-798bfd7ac13a",
+    "_type": "visualization",
+    "_source": {
+      "title": "Rspamd Actions",
+      "visState": "{\"title\":\"Rspamd Actions\",\"type\":\"pie\",\"params\":{\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"isDonut\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"rspam_meta.action\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
+      "uiStateJSON": "{}",
+      "description": "",
+      "version": 1,
+      "kibanaSavedObjectMeta": {
+        "searchSourceJSON": "{\"index\":\"rspamd-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
+      }
+    }
+  },
+  {
+    "_id": "6413f870-80f6-11e7-91e6-0986b0b459e7",
+    "_type": "visualization",
+    "_source": {
+      "title": "Rspamd Geo Map",
+      "visState": "{\n  \"title\": \"Rspamd Geo Map\",\n  \"type\": \"tile_map\",\n  \"params\": {\n    \"mapType\": \"Scaled Circle Markers\",\n    \"isDesaturated\": true,\n    \"addTooltip\": true,\n    \"heatMaxZoom\": 0,\n    \"heatMinOpacity\": 0.1,\n    \"heatRadius\": 25,\n    \"heatBlur\": 15,\n    \"legendPosition\": \"bottomright\",\n    \"mapZoom\": 2,\n    \"mapCenter\": [\n      0,\n      0\n    ],\n    \"wms\": {\n      \"enabled\": false,\n      \"url\": \"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\",\n      \"options\": {\n        \"version\": \"1.3.0\",\n        \"layers\": \"0\",\n        \"format\": \"image/png\",\n        \"transparent\": true,\n        \"attribution\": \"Maps provided by USGS\",\n        \"styles\": \"\"\n      }\n    }\n  },\n  \"aggs\": [\n    {\n      \"id\": \"1\",\n      \"enabled\": true,\n      \"type\": \"count\",\n      \"schema\": \"metric\",\n      \"params\": {}\n    },\n    {\n      \"id\": \"2\",\n      \"enabled\": true,\n      \"type\": \"geohash_grid\",\n      \"schema\": \"segment\",\n      \"params\": {\n        \"field\": \"rspam_meta.geoip.location\",\n        \"autoPrecision\": true,\n        \"useGeocentroid\": true,\n        \"precision\": 2\n      }\n    }\n  ],\n  \"listeners\": {}\n}",
+      "uiStateJSON": "{}",
+      "description": "",
+      "version": 1,
+      "kibanaSavedObjectMeta": {
+        "searchSourceJSON": "{\n  \"index\": \"rspamd-*\",\n  \"query\": {\n    \"query_string\": {\n      \"query\": \"*\",\n      \"analyze_wildcard\": true\n    }\n  },\n  \"filter\": []\n}"
+      }
+    }
+  },
+  {
+    "_id": "92a92c00-80f6-11e7-91e6-0986b0b459e7",
+    "_type": "visualization",
+    "_source": {
+      "title": "Rspamd Spam Map",
+      "visState": "{\n  \"title\": \"Rspamd Spam Map\",\n  \"type\": \"tile_map\",\n  \"params\": {\n    \"mapType\": \"Scaled Circle Markers\",\n    \"isDesaturated\": true,\n    \"addTooltip\": true,\n    \"heatMaxZoom\": 0,\n    \"heatMinOpacity\": 0.1,\n    \"heatRadius\": 25,\n    \"heatBlur\": 15,\n    \"legendPosition\": \"bottomright\",\n    \"mapZoom\": 2,\n    \"mapCenter\": [\n      0,\n      0\n    ],\n    \"wms\": {\n      \"enabled\": false,\n      \"url\": \"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\",\n      \"options\": {\n        \"version\": \"1.3.0\",\n        \"layers\": \"0\",\n        \"format\": \"image/png\",\n        \"transparent\": true,\n        \"attribution\": \"Maps provided by USGS\",\n        \"styles\": \"\"\n      }\n    }\n  },\n  \"aggs\": [\n    {\n      \"id\": \"1\",\n      \"enabled\": true,\n      \"type\": \"count\",\n      \"schema\": \"metric\",\n      \"params\": {}\n    },\n    {\n      \"id\": \"2\",\n      \"enabled\": true,\n      \"type\": \"geohash_grid\",\n      \"schema\": \"segment\",\n      \"params\": {\n        \"field\": \"rspam_meta.geoip.location\",\n        \"autoPrecision\": true,\n        \"useGeocentroid\": true,\n        \"precision\": 2\n      }\n    }\n  ],\n  \"listeners\": {}\n}",
+      "uiStateJSON": "{}",
+      "description": "",
+      "version": 1,
+      "kibanaSavedObjectMeta": {
+        "searchSourceJSON": "{\n  \"index\": \"rspamd-*\",\n  \"query\": {\n    \"query_string\": {\n      \"query\": \"*\",\n      \"analyze_wildcard\": true\n    }\n  },\n  \"filter\": [\n    {\n      \"meta\": {\n        \"index\": \"rspamd_beat-*\",\n        \"negate\": true,\n        \"disabled\": false,\n        \"alias\": null,\n        \"type\": \"phrase\",\n        \"key\": \"rspam_meta.action\",\n        \"value\": \"no action\"\n      },\n      \"query\": {\n        \"match\": {\n          \"rspam_meta.action\": {\n            \"query\": \"no action\",\n            \"type\": \"phrase\"\n          }\n        }\n      },\n      \"$state\": {\n        \"store\": \"appState\"\n      }\n    }\n  ]\n}"
+      }
+    }
+  },
+  {
+    "_id": "2be7b6f0-8649-11e7-967f-798bfd7ac13a",
+    "_type": "visualization",
+    "_source": {
+      "title": "Rspamd Symbols Cloud",
+      "visState": "{\"title\":\"Rspamd Symbols Cloud\",\"type\":\"tagcloud\",\"params\":{\"scale\":\"linear\",\"orientation\":\"single\",\"minFontSize\":18,\"maxFontSize\":72},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"rspam_meta.symbols.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
+      "uiStateJSON": "{}",
+      "description": "",
+      "version": 1,
+      "kibanaSavedObjectMeta": {
+        "searchSourceJSON": "{\"index\":\"rspamd-*\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
+      }
+    }
+  },
+  {
+    "_id": "1f7d9210-80f7-11e7-91e6-0986b0b459e7",
+    "_type": "visualization",
+    "_source": {
+      "title": "Rspamd Top recipients",
+      "visState": "{\n  \"title\": \"Rspamd Top recipients\",\n  \"type\": \"metric\",\n  \"params\": {\n    \"addTooltip\": true,\n    \"addLegend\": false,\n    \"type\": \"gauge\",\n    \"gauge\": {\n      \"verticalSplit\": false,\n      \"autoExtend\": false,\n      \"percentageMode\": false,\n      \"gaugeType\": \"Metric\",\n      \"gaugeStyle\": \"Full\",\n      \"backStyle\": \"Full\",\n      \"orientation\": \"vertical\",\n      \"colorSchema\": \"Green to Red\",\n      \"gaugeColorMode\": \"None\",\n      \"useRange\": false,\n      \"colorsRange\": [\n        {\n          \"from\": 0,\n          \"to\": 100\n        }\n      ],\n      \"invertColors\": false,\n      \"labels\": {\n        \"show\": true,\n        \"color\": \"black\"\n      },\n      \"scale\": {\n        \"show\": false,\n        \"labels\": false,\n        \"color\": \"#333\",\n        \"width\": 2\n      },\n      \"type\": \"simple\",\n      \"style\": {\n        \"fontSize\": 60,\n        \"bgFill\": \"#000\",\n        \"bgColor\": false,\n        \"labelColor\": false,\n        \"subText\": \"\"\n      }\n    }\n  },\n  \"aggs\": [\n    {\n      \"id\": \"1\",\n      \"enabled\": true,\n      \"type\": \"count\",\n      \"schema\": \"metric\",\n      \"params\": {}\n    },\n    {\n      \"id\": \"2\",\n      \"enabled\": true,\n      \"type\": \"terms\",\n      \"schema\": \"group\",\n      \"params\": {\n        \"field\": \"rspam_meta.rcpt\",\n        \"size\": 5,\n        \"order\": \"desc\",\n        \"orderBy\": \"1\"\n      }\n    }\n  ],\n  \"listeners\": {}\n}",
+      "uiStateJSON": "{\n  \"vis\": {\n    \"defaultColors\": {\n      \"0 - 100\": \"rgb(0,104,55)\"\n    }\n  }\n}",
+      "description": "",
+      "version": 1,
+      "kibanaSavedObjectMeta": {
+        "searchSourceJSON": "{\n  \"index\": \"rspamd-*\",\n  \"query\": {\n    \"query_string\": {\n      \"query\": \"*\",\n      \"analyze_wildcard\": true\n    }\n  },\n  \"filter\": []\n}"
+      }
+    }
+  },
+  {
+    "_id": "efa3f7a0-80f6-11e7-91e6-0986b0b459e7",
+    "_type": "visualization",
+    "_source": {
+      "title": "Rspamd Top Senders",
+      "visState": "{\n  \"title\": \"Rspamd Top Senders\",\n  \"type\": \"metric\",\n  \"params\": {\n    \"addTooltip\": true,\n    \"addLegend\": false,\n    \"type\": \"gauge\",\n    \"gauge\": {\n      \"verticalSplit\": false,\n      \"autoExtend\": false,\n      \"percentageMode\": false,\n      \"gaugeType\": \"Metric\",\n      \"gaugeStyle\": \"Full\",\n      \"backStyle\": \"Full\",\n      \"orientation\": \"vertical\",\n      \"colorSchema\": \"Green to Red\",\n      \"gaugeColorMode\": \"None\",\n      \"useRange\": false,\n      \"colorsRange\": [\n        {\n          \"from\": 0,\n          \"to\": 100\n        }\n      ],\n      \"invertColors\": false,\n      \"labels\": {\n        \"show\": true,\n        \"color\": \"black\"\n      },\n      \"scale\": {\n        \"show\": false,\n        \"labels\": false,\n        \"color\": \"#333\",\n        \"width\": 2\n      },\n      \"type\": \"simple\",\n      \"style\": {\n        \"fontSize\": 60,\n        \"bgFill\": \"#000\",\n        \"bgColor\": false,\n        \"labelColor\": false,\n        \"subText\": \"\"\n      }\n    }\n  },\n  \"aggs\": [\n    {\n      \"id\": \"1\",\n      \"enabled\": true,\n      \"type\": \"count\",\n      \"schema\": \"metric\",\n      \"params\": {}\n    },\n    {\n      \"id\": \"2\",\n      \"enabled\": true,\n      \"type\": \"terms\",\n      \"schema\": \"group\",\n      \"params\": {\n        \"field\": \"rspam_meta.user\",\n        \"size\": 5,\n        \"order\": \"desc\",\n        \"orderBy\": \"1\"\n      }\n    }\n  ],\n  \"listeners\": {}\n}",
+      "uiStateJSON": "{\n  \"vis\": {\n    \"defaultColors\": {\n      \"0 - 100\": \"rgb(0,104,55)\"\n    }\n  }\n}",
+      "description": "",
+      "version": 1,
+      "kibanaSavedObjectMeta": {
+        "searchSourceJSON": "{\n  \"index\": \"rspamd-*\",\n  \"query\": {\n    \"query_string\": {\n      \"query\": \"*\",\n      \"analyze_wildcard\": true\n    }\n  },\n  \"filter\": [\n    {\n      \"meta\": {\n        \"index\": \"rspamd_beat-*\",\n        \"negate\": true,\n        \"disabled\": false,\n        \"alias\": null,\n        \"type\": \"phrase\",\n        \"key\": \"rspam_meta.user\",\n        \"value\": \"unknown\"\n      },\n      \"query\": {\n        \"match\": {\n          \"rspam_meta.user\": {\n            \"query\": \"unknown\",\n            \"type\": \"phrase\"\n          }\n        }\n      },\n      \"$state\": {\n        \"store\": \"appState\"\n      }\n    }\n  ]\n}"
+      }
+    }
+  }
+]
diff --git a/contrib/elastic/rspamd_template.json b/contrib/elastic/rspamd_template.json
new file mode 100644 (file)
index 0000000..96e011e
--- /dev/null
@@ -0,0 +1,146 @@
+{
+  "mappings": {
+    "_default_": {
+      "_all": {
+        "norms": false
+      },
+      "_meta": {
+        "version": "5.5.2"
+      },
+      "date_detection": false,
+      "dynamic_templates": [
+        {
+          "strings_as_keyword": {
+            "mapping": {
+              "ignore_above": 1024,
+              "type": "keyword"
+            },
+            "match_mapping_type": "string"
+          }
+        }
+      ],
+      "properties": {
+        "@timestamp": {
+          "type": "date"
+        },
+        "meta": {
+          "properties": {
+            "cloud": {
+              "properties": {
+                "availability_zone": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                },
+                "instance_id": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                },
+                "machine_type": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                },
+                "project_id": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                },
+                "provider": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                },
+                "region": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                }
+              }
+            }
+          }
+        },
+        "rspam_meta": {
+          "properties": {
+            "action": {
+              "ignore_above": 1024,
+              "type": "keyword"
+            },
+            "direction": {
+              "ignore_above": 1024,
+              "type": "keyword"
+            },
+            "asn": {
+              "properties": {
+                "asn": {
+                  "type": "long"
+                },
+                "country_code": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                },
+                "ipnet": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                },
+                "registrant": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                }
+              }
+            },
+            "from": {
+              "ignore_above": 1024,
+              "type": "keyword"
+            },
+            "is_local": {
+              "type": "boolean"
+            },
+            "webmail": {
+              "type": "boolean"
+            },
+            "geoip": {
+              "properties": {
+                "city_name": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                },
+                "continent_name": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                },
+                "country_iso_code": {
+                  "ignore_above": 1024,
+                  "type": "keyword"
+                },
+                "location": {
+                  "type": "geo_point"
+                }
+              }
+            },
+            "ip": {
+              "ignore_above": 1024,
+              "type": "keyword"
+            },
+            "qid": {
+              "ignore_above": 1024,
+              "type": "keyword"
+            },
+            "score": {
+              "type": "float"
+            },
+            "user": {
+              "ignore_above": 1024,
+              "type": "keyword"
+            }
+          }
+        },
+        "tags": {
+          "ignore_above": 1024,
+          "type": "keyword"
+        }
+      }
+    }
+  },
+  "order": 0,
+  "settings": {
+    "index.mapping.total_fields.limit": 10000,
+    "index.refresh_interval": "5s"
+  },
+  "template": "rspamd-*"
+}
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua
new file mode 100644 (file)
index 0000000..f3bf69e
--- /dev/null
@@ -0,0 +1,345 @@
+--[[
+Copyright (c) 2017, Veselin Iordanov
+Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru>
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+]]--
+
+local rspamd_logger = require 'rspamd_logger'
+local rspamd_http = require "rspamd_http"
+local rspamd_lua_utils = require "lua_util"
+local ucl = require "ucl"
+local hash = require "rspamd_cryptobox_hash"
+
+if confighelp then
+  return
+end
+
+local redis_params
+redis_params = rspamd_parse_redis_server('elastic')
+
+local rows = {}
+local nrows = 0
+local elastic_template
+local redis_params
+local settings = {
+  limit = 10,
+  index_pattern = 'rspamd-%Y.%m.%d',
+  server = 'localhost:9200',
+  template_file = '/etc/rspamd/rspamd_template.json',
+  kibana_file = '/etc/rspamd/kibana.json',
+  key_prefix = 'elastic-',
+  expire = 3600,
+  debug = false,
+  failover = false,
+  import_kibana = false,
+}
+
+local function read_file(path)
+    local file = io.open(path, "rb")
+    if not file then return nil end
+    local content = file:read "*a"
+    file:close()
+    return content
+end
+local function elastic_send_data(task)
+  local es_index = os.date(settings['index_pattern'])
+  local bulk_json = ""
+  for key,value in pairs(rows) do
+    bulk_json = bulk_json..'{ "index" : { "_index" : "'..es_index..'", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }'.."\n"
+    bulk_json = bulk_json..ucl.to_format(value, 'json-compact').."\n"
+  end
+  local function http_index_data_callback(err, code, body, headers)
+    -- todo error handling we may store the rows it into redis and send it again later
+    if settings['debug'] then
+      rspamd_logger.infox(task, "After create data %1",body)
+    end
+    if code ~= 200 or err_message then
+      if settings['failover'] then
+        local h = hash.create()
+        h:update(bulk_json)
+        local key = settings['key_prefix'] ..es_index..":".. h:base32():sub(1, 20)
+        local data = rspamd_util.zstd_compress(bulk_json)
+        local function redis_set_cb(err)
+          if err ~=nil then
+            rspamd_logger.errx(task, 'redis_set_cb received error: %1', err)
+          end
+        end
+        rspamd_redis_make_request(task,
+          redis_params, -- connect params
+          key, -- hash key
+          true, -- is write
+          redis_set_cb, --callback
+          'SETEX', -- command
+          {key, tostring(settings['expire']), data} -- arguments
+        )
+      end
+    end
+  end
+  rspamd_http.request({
+    url = 'http://'..settings['server']..'/'..es_index..'/_bulk',
+    headers = {
+      ['Content-Type'] = 'application/x-ndjson',
+    },
+    body = bulk_json,
+    task = task,
+    method = 'post',
+    callback = http_index_data_callback
+  })
+
+end
+local function get_general_metadata(task)
+  local r = {}
+  local ip_addr = task:get_ip()
+  r.ip = tostring(ip_addr) or 'unknown'
+  r.webmail = false
+  if ip_addr  then
+    r.is_local = ip_addr:is_local()
+    local origin = task:get_header('X-Originating-IP')
+    if origin then
+        r.webmail = true
+        r.ip = origin
+    end
+  end
+  r.direction = "Inbound"
+  r.user = task:get_user() or 'unknown'
+  r.qid = task:get_queue_id() or 'unknown'
+  r.action = task:get_metric_action('default')
+  if r.user ~= 'unknown' then
+      r.direction = "Outbound"
+  end
+  local s = task:get_metric_score('default')[1]
+  r.score =  s
+
+  local rcpt = task:get_recipients('smtp')
+  if rcpt then
+    local l = {}
+    for _, a in ipairs(rcpt) do
+      table.insert(l, a['addr'])
+    end
+      r.rcpt = l
+  else
+    r.rcpt = 'unknown'
+  end
+  local from = task:get_from('smtp')
+  if ((from or E)[1] or E).addr then
+    r.from = from[1].addr
+  else
+    r.from = 'unknown'
+  end
+  local syminf = task:get_symbols_all()
+  r.symbols = syminf
+  r.asn = {}
+  local pool = task:get_mempool()
+  r.asn.country = pool:get_variable("country") or 'unknown'
+  r.asn.asn   = pool:get_variable("asn") or 0
+  r.asn.ipnet = pool:get_variable("ipnet") or 'unknown'
+  local function process_header(name)
+    local hdr = task:get_header_full(name)
+    if hdr then
+      local l = {}
+      for _, h in ipairs(hdr) do
+        table.insert(l, h.decoded)
+      end
+      return l
+    else
+      return 'unknown'
+    end
+  end
+  r.header_from = process_header('from')
+  r.header_to = process_header('to')
+  r.header_subject = process_header('subject')
+  r.header_date = process_header('date')
+  r.message_id = task:get_message_id()
+  return r
+end
+
+local function elastic_collect(task)
+  if not enabled then return end
+  if rspamd_lua_utils.is_rspamc_or_controller(task) then return end
+  local row = {['rspam_meta'] = get_general_metadata(task), ['@timestamp'] = os.time(os.date("*t")).."000"}
+  table.insert(rows, row)
+  nrows = nrows + 1
+  if nrows > settings['limit'] then
+    elastic_send_data(task)
+    nrows = 0
+    rows = {}
+  end
+end
+local opts = rspamd_config:get_all_opt('elastic')
+local enabled = true;
+
+local function check_elastic_server(ev_base)
+  local function http_callback(err, code, body, headers)
+    local parser = ucl.parser()
+    local res,err = parser:parse_string(body)
+    if not res then
+        rspamd_logger.infox(rspamd_config, 'failed to query elasticsearch server %1, disabling module',settings['server'])
+        enabled = false;
+        return
+    end
+    local obj = parser:get_object()
+    for node,value in pairs(obj['nodes']) do
+      local plugin_found = false
+      for i,plugin in pairs(value['plugins']) do
+        if plugin['name'] == 'ingest-geoip' then
+          plugin_found = true
+        end
+      end
+      if not plugin_found then
+        rspamd_logger.infox(rspamd_config, 'Unable to find ingest-geoip on %1 node, disabling module',node)
+        enabled = false
+        return
+      end
+    end
+  end
+  rspamd_http.request({
+    url = 'http://'..settings['server']..'/_nodes/plugins',
+    ev_base = ev_base,
+    method = 'get',
+    callback = http_callback
+  })
+  if enabled then
+    local function http_ingest_callback(err, code, body, headers)
+      if code ~= 200 or err_message then
+        -- pipeline not exist
+      end
+    end
+    rspamd_http.request({
+      url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
+      ev_base = ev_base,
+      method = 'get',
+      callback = http_ingest_callback
+    })
+    -- lets try to create ingest pipeline if not exist
+    rspamd_http.request({
+      url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
+      task = task,
+      body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
+      method = 'put',
+    })
+  end
+  local function http_template_create_callback(err, code, body, headers)
+  end
+  local function http_template_exist_callback(err, code, body, headers)
+    if code ~= 200 or err_message then
+      rspamd_http.request({
+        url = 'http://'..settings['server']..'/_template/rspamd',
+        task = task,
+        body = elastic_template,
+        method = 'put',
+        callback = http_template_create_callback
+      })
+    end
+  end
+  rspamd_http.request({
+    url = 'http://'..settings['server']..'/_template/rspamd',
+    task = task,
+    method = 'head',
+    callback = http_template_exist_callback
+  })
+end
+-- import ingest pipeline and kibana dashboard/visualization
+local function initial_setup(worker)
+  if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end
+  if enabled then
+    -- create ingest pipeline
+    rspamd_http.request({
+      url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip',
+      task = task,
+      body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}',
+      method = 'put',
+    })
+    -- create template mappings if not exist
+    local function http_template_exist_callback(err, code, body, headers)
+      if code ~= 200 or err_message then
+        rspamd_http.request({
+          url = 'http://'..settings['server']..'/_template/rspamd',
+          task = task,
+          body = elastic_template,
+          method = 'put',
+        })
+      end
+    end
+    rspamd_http.request({
+      url = 'http://'..settings['server']..'/_template/rspamd',
+      task = task,
+      method = 'head',
+      callback = http_template_exist_callback
+    })
+    -- add kibana dashboard and visualizations
+    local kibana_mappings = read_file(settings['kibana_file']);
+    if enabled and settings['import_kibana'] then
+        local kibana_mappings = read_file(settings['kibana_file']);
+        if kibana_mappings then
+          local parser = ucl.parser()
+          local res,err = parser:parse_string(kibana_mappings)
+          if not res then
+            return
+          end
+          local obj = parser:get_object()
+          local bulk_json = ""
+          for _,item in ipairs(obj) do
+            bulk_json = bulk_json..'{ "index" : { "_index" : ".kibana", "_type" : "'..item["_type"]..'" ,"_id": "'..item["_id"]..'"} }'.."\n"
+            bulk_json = bulk_json..ucl.to_format(item['_source'], 'json-compact').."\n"
+          end
+          rspamd_http.request({
+            url = 'http://'..settings['server']..'/.kibana/_bulk',
+            headers = {
+              ['Content-Type'] = 'application/x-ndjson',
+            },
+            body = bulk_json,
+            task = task,
+            method = 'post'
+          })
+        else
+          rspamd_logger.infox(rspamd_config, 'kibana templatefile not found')
+        end
+    end
+  end
+end
+rspamd_config:add_on_load(function(cfg, ev_base,worker)
+  if opts then
+      for k,v in pairs(opts) do
+        settings[k] = v
+      end
+      if not settings['server'] then
+        rspamd_logger.infox(rspamd_config, 'no elastic servers are specified, disabling module')
+        enabled = false
+        return
+      end
+      if not settings['template_file'] then
+          rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
+          enabled = false
+          return
+      end
+  end
+  elastic_template = read_file(settings['template_file']);
+  if not elastic_template then
+        rspamd_logger.infox(rspamd_config, 'elastic unable to read mappings, disabling module')
+        enabled = false
+        return
+  end
+  redis_params = rspamd_parse_redis_server('elastic')
+  check_elastic_server(ev_base) -- check for elasticsearch requirements
+  initial_setup(worker) -- import mappings pipeline and visualizations
+end)
+
+if enabled == true then
+  rspamd_config:register_symbol({
+    name = 'ELASTIC_COLLECT',
+    type = 'postfilter',
+    callback = elastic_collect,
+    priority = 10
+  })
+end