]> source.dussan.org Git - rspamd.git/commitdiff
[Rework] Breaking: Actualize elastic module, support Elastic 8 & OpenSearch 2, add...
author Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
Fri, 11 Oct 2024 16:49:58 +0000 (18:49 +0200)
committer Dmitriy Alekseev <1865999+dragoangel@users.noreply.github.com>
Sun, 13 Oct 2024 22:16:02 +0000 (00:16 +0200)
Signed-off-by: Dmytro Alieksieiev <1865999+dragoangel@users.noreply.github.com>
CMakeLists.txt
conf/modules.d/elastic.conf
contrib/elastic/kibana.json [deleted file]
contrib/elastic/rspamd_template.json [deleted file]
src/plugins/lua/elastic.lua

index 3d384fa1b156258eeeb3ddc8ebc6d0e5bd0232b5..e22c63992ca700c33b0e3ab4dfba3e29c87edcc9 100644 (file)
@@ -791,8 +791,6 @@ IF (INSTALL_WEBUI MATCHES "ON")
 ENDIF (INSTALL_WEBUI MATCHES "ON")
 
 
-INSTALL(DIRECTORY "contrib/elastic/" DESTINATION "${SHAREDIR}/elastic" PATTERN ".git" EXCLUDE)
-
 ADD_CUSTOM_TARGET(dist ${CMAKE_SOURCE_DIR}/dist.sh
         "${CMAKE_BINARY_DIR}/rspamd-${RSPAMD_VERSION}.tar.xz" "${TAR}"
         COMMENT "Create source distribution"
index 80560237ad76786417f3dee60ae58eb624eef59c..6255d528f9edd69e531d75b145a7b072f4749ce9 100644 (file)
 # Module documentation can be found at  https://rspamd.com/doc/modules/elastic.html
 
 elastic {
-  # Push update when 10 records are collected (10 if unset)
-  limit = 10;
-  # IP:port of Elasticsearch server
-  #server = "localhost:9200";
-  # Timeout to wait for response (5 seconds if unset)
-  timeout = 5;
-  # Elasticsearch template file (json format)
-  #template_file = "${SHAREDIR}/elastic/rspamd_template.json";
-  # Kibana prebuild visualizations and dashboard template (json format)
-  #kibana_file = "${SHAREDIR}/elastic/kibana.json";
-  # Elasticsearch index name pattern
-  index_pattern = "rspamd-%Y.%m.%d";
-  # Dump debug information
-  debug = false;
-  # Import kibana template
-  import_kibana = false;
+  enabled = false;
+  # server = "localhost:9200";
+  # user = "";
+  # password = "";
+  use_https = false;
+  periodic_interval = 5.0;
+  timeout = 5.0;
+  no_ssl_verify = false;
+  use_gzip = true;
+  use_keepalive = true;
+  version = {
+    autodetect_enabled = true;
+    autodetect_max_fail = 12;
+    # override works only if autodetect is disabled
+    override = {
+      name = "opensearch";
+      version = "2.17";
+    }
+  };
+  limits = {
+    max_rows = 500; # max logs in one bulk req to elastic and first reason to flush buffer to elastic
+    max_interval = 60; # seconds, if first log in buffer older than interval - flush buffer
+    max_size = 5000000; # max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker
+    max_fail = 3;
+  };
+  index_template = {
+    managed = true;
+    name = "rspamd";
+    priority = 0;
+    pattern = "%Y.%m.%d";
+    shards_count = 3;
+    replicas_count = 1;
+    refresh_interval = 5; # seconds
+    dynamic_keyword_ignore_above = 256;
+    headers_text_ignore_above = 2048; # strip headers value and add "..." to the end; set 0 to disable limit
+    symbols_nested = false;
+    empty_value = "unknown"; # empty numbers, ips and ipnets are not customizable: they will always be 0, :: and ::/128 respectively
+  };
+  index_policy = {
+    enabled = true;
+    managed = true;
+    name = "rspamd"; # if you want to use a custom lifecycle policy, change name and set managed = false
+    hot = {
+      index_priority = 100;
+    };
+    warm = {
+      enabled = true;
+      after = "2d";
+      index_priority = 50;
+      migrate = true; # only supported with elastic distro, will not have impact elsewhere
+      read_only = true;
+      change_replicas = false;
+      replicas_count = 1;
+      shrink = false;
+      shards_count = 1;
+      max_gb_per_shard = 0; # zero - disabled by default, if enabled - shards_count is ignored
+      force_merge = true;
+      segments_count = 1;
+    };
+    cold = {
+      enabled = true;
+      after = "14d";
+      index_priority = 0;
+      migrate = true; # only supported with elastic distro, will not have impact elsewhere
+      read_only = true;
+      change_replicas = false;
+      replicas_count = 1;
+    };
+    delete = {
+      enabled = true;
+      after = "30d";
+    };
+  };
+  collect_headers = [
+    "From";
+    "To";
+    "Subject";
+    "Date";
+    "User-Agent";
+  ];
+  # extra headers to collect, f.e.:
+  # "Precedence";
+  # "List-Id";
+  extra_collect_headers = [];
+  geoip = {
+    enabled = true;
+    managed = true;
+    pipeline_name = "rspamd-geoip";
+  };
+
   .include(try=true,priority=5) "${DBDIR}/dynamic/elastic.conf"
   .include(try=true,priority=1,duplicate=merge) "$LOCAL_CONFDIR/local.d/elastic.conf"
   .include(try=true,priority=10) "$LOCAL_CONFDIR/override.d/elastic.conf"
diff --git a/contrib/elastic/kibana.json b/contrib/elastic/kibana.json
deleted file mode 100644 (file)
index 17b68b6..0000000
+++ /dev/null
@@ -1,138 +0,0 @@
-[
-  {
-    "_type": "index-pattern",
-    "_id": "eb48a1c0-23a2-11e8-b222-e710267d9b66",
-    "_score": 1,
-    "_source": {
-      "type": "index-pattern",
-      "index-pattern": {
-        "title": "rspamd-*",
-        "timeFieldName": "@timestamp",
-        "fields": "[{\"name\":\"rspamd_meta.action\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.asn.asn\",\"type\":\"number\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.asn.country\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.asn.ipnet\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.direction\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.from\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.geoip.city_name\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.geoip.continent_name\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.geoip.country_iso_code\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.geoip.location.lat\",\"type\":\"number\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.geoip.location.lon\",\"type\":\"number\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.geoip.region_name\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.header_date\",\"type\":
\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.header_from\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.header_subject\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.header_to\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.ip\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.is_local\",\"type\":\"boolean\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.message_id\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.qid\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.rcpt\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.score\",\"type\":\"number\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.symbols.group\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.symbols.name\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.symbols.options\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"na
me\":\"rspamd_meta.symbols.score\",\"type\":\"number\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.user\",\"type\":\"string\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"name\":\"rspamd_meta.webmail\",\"type\":\"boolean\",\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true}]",
-        "sourceFilters": "[{\"value\":\"rspamd_meta*\"}]"
-      }
-    }
-  },
-  {
-    "_id": "6c6a2ed0-8660-11e7-85ae-fbc80f1b7844",
-    "_type": "dashboard",
-    "_source": {
-      "type": "dashboard",
-      "dashboard": {
-        "title": "Rspamd Dashboard",
-        "hits": 0,
-        "description": "",
-        "panelsJSON": "[{\"size_x\":6,\"size_y\":3,\"panelIndex\":1,\"type\":\"visualization\",\"id\":\"6413f870-80f6-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":1},{\"size_x\":6,\"size_y\":3,\"panelIndex\":2,\"type\":\"visualization\",\"id\":\"927debf0-8649-11e7-967f-798bfd7ac13a\",\"col\":7,\"row\":1},{\"size_x\":12,\"size_y\":3,\"panelIndex\":3,\"type\":\"visualization\",\"id\":\"efa3f7a0-80f6-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":7},{\"size_x\":12,\"size_y\":3,\"panelIndex\":4,\"type\":\"visualization\",\"id\":\"1f7d9210-80f7-11e7-91e6-0986b0b459e7\",\"col\":1,\"row\":10},{\"size_x\":6,\"size_y\":3,\"panelIndex\":5,\"type\":\"visualization\",\"id\":\"2be7b6f0-8649-11e7-967f-798bfd7ac13a\",\"col\":7,\"row\":4},{\"size_x\":6,\"size_y\":3,\"panelIndex\":6,\"type\":\"visualization\",\"id\":\"680b6480-826e-11e7-8a20-b7bc68c2e9e7\",\"col\":7,\"row\":13},{\"size_x\":6,\"size_y\":3,\"panelIndex\":7,\"type\":\"visualization\",\"id\":\"158dfc80-864d-11e7-bce7-4532b9d239a0\",\"col\":1,\"row\":4}]",
-        "optionsJSON": "{\"darkTheme\":false}",
-        "uiStateJSON": "{\"P-3\":{\"vis\":{\"defaultColors\":{\"0 - 100\":\"rgb(0,104,55)\"}}},\"P-4\":{\"vis\":{\"defaultColors\":{\"0 - 100\":\"rgb(0,104,55)\"}}},\"P-1\":{\"mapZoom\":2,\"mapCenter\":[40.58058466412761,1.7578125]},\"P-6\":{\"vis\":{\"defaultColors\":{\"0 - 0.25\":\"rgb(247,252,245)\",\"0.25 - 0.5\":\"rgb(199,233,192)\",\"0.5 - 0.75\":\"rgb(116,196,118)\",\"0.75 - 1\":\"rgb(35,139,69)\"}}}}",
-        "version": 1,
-        "timeRestore": false,
-        "kibanaSavedObjectMeta": {
-          "searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}}}],\"highlightAll\":true,\"version\":true}"
-        }
-      }
-    }
-  },
-  {
-    "_id": "927debf0-8649-11e7-967f-798bfd7ac13a",
-    "_type": "visualization",
-    "_source": {
-      "type": "visualization",
-      "visualization": {
-        "title": "Rspamd Actions",
-        "visState": "{\"title\":\"Rspamd Actions\",\"type\":\"pie\",\"params\":{\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"isDonut\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"rspamd_meta.action\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
-        "uiStateJSON": "{}",
-        "description": "",
-        "version": 1,
-        "kibanaSavedObjectMeta": {
-          "searchSourceJSON": "{\"index\":\"eb48a1c0-23a2-11e8-b222-e710267d9b66\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
-        }
-      }
-    }
-  },
-  {
-    "_id": "6413f870-80f6-11e7-91e6-0986b0b459e7",
-    "_type": "visualization",
-    "_source": {
-      "type": "visualization",
-      "visualization": {
-        "title": "Rspamd Geo Map",
-        "visState": "{\n  \"title\": \"Rspamd Geo Map\",\n  \"type\": \"tile_map\",\n  \"params\": {\n    \"mapType\": \"Scaled Circle Markers\",\n    \"isDesaturated\": true,\n    \"addTooltip\": true,\n    \"heatMaxZoom\": 0,\n    \"heatMinOpacity\": 0.1,\n    \"heatRadius\": 25,\n    \"heatBlur\": 15,\n    \"legendPosition\": \"bottomright\",\n    \"mapZoom\": 2,\n    \"mapCenter\": [\n      0,\n      0\n    ],\n    \"wms\": {\n      \"enabled\": false,\n      \"url\": \"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\",\n      \"options\": {\n        \"version\": \"1.3.0\",\n        \"layers\": \"0\",\n        \"format\": \"image/png\",\n        \"transparent\": true,\n        \"attribution\": \"Maps provided by USGS\",\n        \"styles\": \"\"\n      }\n    }\n  },\n  \"aggs\": [\n    {\n      \"id\": \"1\",\n      \"enabled\": true,\n      \"type\": \"count\",\n      \"schema\": \"metric\",\n      \"params\": {}\n    },\n    {\n      \"id\": \"2\",\n      \"enabled\": true,\n      \"type\": \"geohash_grid\",\n      \"schema\": \"segment\",\n      \"params\": {\n        \"field\": \"rspamd_meta.geoip.location\",\n        \"autoPrecision\": true,\n        \"useGeocentroid\": true,\n        \"precision\": 2\n      }\n    }\n  ],\n  \"listeners\": {}\n}",
-        "uiStateJSON": "{}",
-        "description": "",
-        "version": 1,
-        "kibanaSavedObjectMeta": {
-          "searchSourceJSON": "{\n  \"index\": \"eb48a1c0-23a2-11e8-b222-e710267d9b66\",\n  \"query\": {\n    \"query_string\": {\n      \"query\": \"*\",\n      \"analyze_wildcard\": true\n    }\n  },\n  \"filter\": []\n}"
-        }
-      }
-    }
-  },
-  {
-    "_id": "92a92c00-80f6-11e7-91e6-0986b0b459e7",
-    "_type": "visualization",
-    "_source": {
-      "type": "visualization",
-      "visualization": {
-        "title": "Rspamd Spam Map",
-        "visState": "{\n  \"title\": \"Rspamd Spam Map\",\n  \"type\": \"tile_map\",\n  \"params\": {\n    \"mapType\": \"Scaled Circle Markers\",\n    \"isDesaturated\": true,\n    \"addTooltip\": true,\n    \"heatMaxZoom\": 0,\n    \"heatMinOpacity\": 0.1,\n    \"heatRadius\": 25,\n    \"heatBlur\": 15,\n    \"legendPosition\": \"bottomright\",\n    \"mapZoom\": 2,\n    \"mapCenter\": [\n      0,\n      0\n    ],\n    \"wms\": {\n      \"enabled\": false,\n      \"url\": \"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\",\n      \"options\": {\n        \"version\": \"1.3.0\",\n        \"layers\": \"0\",\n        \"format\": \"image/png\",\n        \"transparent\": true,\n        \"attribution\": \"Maps provided by USGS\",\n        \"styles\": \"\"\n      }\n    }\n  },\n  \"aggs\": [\n    {\n      \"id\": \"1\",\n      \"enabled\": true,\n      \"type\": \"count\",\n      \"schema\": \"metric\",\n      \"params\": {}\n    },\n    {\n      \"id\": \"2\",\n      \"enabled\": true,\n      \"type\": \"geohash_grid\",\n      \"schema\": \"segment\",\n      \"params\": {\n        \"field\": \"rspamd_meta.geoip.location\",\n        \"autoPrecision\": true,\n        \"useGeocentroid\": true,\n        \"precision\": 2\n      }\n    }\n  ],\n  \"listeners\": {}\n}",
-        "uiStateJSON": "{}",
-        "description": "",
-        "version": 1,
-        "kibanaSavedObjectMeta": {
-          "searchSourceJSON": "{\n  \"index\": \"eb48a1c0-23a2-11e8-b222-e710267d9b66\",\n  \"query\": {\n    \"query_string\": {\n      \"query\": \"*\",\n      \"analyze_wildcard\": true\n    }\n  },\n  \"filter\": [\n    {\n      \"meta\": {\n        \"index\": \"rspamd_beat-*\",\n        \"negate\": true,\n        \"disabled\": false,\n        \"alias\": null,\n        \"type\": \"phrase\",\n        \"key\": \"rspamd_meta.action\",\n        \"value\": \"no action\"\n      },\n      \"query\": {\n        \"match\": {\n          \"rspamd_meta.action\": {\n            \"query\": \"no action\",\n            \"type\": \"phrase\"\n          }\n        }\n      },\n      \"$state\": {\n        \"store\": \"appState\"\n      }\n    }\n  ]\n}"
-        }
-      }
-    }
-  },
-  {
-    "_id": "2be7b6f0-8649-11e7-967f-798bfd7ac13a",
-    "_type": "visualization",
-    "_source": {
-      "type": "visualization",
-      "visualization": {
-        "title": "Rspamd Symbols Cloud",
-        "visState": "{\"title\":\"Rspamd Symbols Cloud\",\"type\":\"tagcloud\",\"params\":{\"scale\":\"linear\",\"orientation\":\"single\",\"minFontSize\":18,\"maxFontSize\":72},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"rspamd_meta.symbols.name\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
-        "uiStateJSON": "{}",
-        "description": "",
-        "version": 1,
-        "kibanaSavedObjectMeta": {
-          "searchSourceJSON": "{\"index\":\"eb48a1c0-23a2-11e8-b222-e710267d9b66\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
-        }
-      }
-    }
-  },
-  {
-    "_id": "1f7d9210-80f7-11e7-91e6-0986b0b459e7",
-    "_type": "visualization",
-    "_source": {
-      "type": "visualization",
-      "visualization": {
-        "title": "Rspamd Top recipients",
-        "visState": "{\n  \"title\": \"Rspamd Top recipients\",\n  \"type\": \"metric\",\n  \"params\": {\n    \"addTooltip\": true,\n    \"addLegend\": false,\n    \"type\": \"gauge\",\n    \"gauge\": {\n      \"verticalSplit\": false,\n      \"autoExtend\": false,\n      \"percentageMode\": false,\n      \"gaugeType\": \"Metric\",\n      \"gaugeStyle\": \"Full\",\n      \"backStyle\": \"Full\",\n      \"orientation\": \"vertical\",\n      \"colorSchema\": \"Green to Red\",\n      \"gaugeColorMode\": \"None\",\n      \"useRange\": false,\n      \"colorsRange\": [\n        {\n          \"from\": 0,\n          \"to\": 100\n        }\n      ],\n      \"invertColors\": false,\n      \"labels\": {\n        \"show\": true,\n        \"color\": \"black\"\n      },\n      \"scale\": {\n        \"show\": false,\n        \"labels\": false,\n        \"color\": \"#333\",\n        \"width\": 2\n      },\n      \"type\": \"simple\",\n      \"style\": {\n        \"fontSize\": 60,\n        \"bgFill\": \"#000\",\n        \"bgColor\": false,\n        \"labelColor\": false,\n        \"subText\": \"\"\n      }\n    }\n  },\n  \"aggs\": [\n    {\n      \"id\": \"1\",\n      \"enabled\": true,\n      \"type\": \"count\",\n      \"schema\": \"metric\",\n      \"params\": {}\n    },\n    {\n      \"id\": \"2\",\n      \"enabled\": true,\n      \"type\": \"terms\",\n      \"schema\": \"group\",\n      \"params\": {\n        \"field\": \"rspamd_meta.rcpt\",\n        \"size\": 5,\n        \"order\": \"desc\",\n        \"orderBy\": \"1\"\n      }\n    }\n  ],\n  \"listeners\": {}\n}",
-        "uiStateJSON": "{\n  \"vis\": {\n    \"defaultColors\": {\n      \"0 - 100\": \"rgb(0,104,55)\"\n    }\n  }\n}",
-        "description": "",
-        "version": 1,
-        "kibanaSavedObjectMeta": {
-          "searchSourceJSON": "{\n  \"index\": \"eb48a1c0-23a2-11e8-b222-e710267d9b66\",\n  \"query\": {\n    \"query_string\": {\n      \"query\": \"*\",\n      \"analyze_wildcard\": true\n    }\n  },\n  \"filter\": []\n}"
-        }
-      }
-    }
-  },
-  {
-    "_id": "efa3f7a0-80f6-11e7-91e6-0986b0b459e7",
-    "_type": "visualization",
-    "_source": {
-      "type": "visualization",
-      "visualization": {
-        "title": "Rspamd Top Senders",
-        "visState": "{\n  \"title\": \"Rspamd Top Senders\",\n  \"type\": \"metric\",\n  \"params\": {\n    \"addTooltip\": true,\n    \"addLegend\": false,\n    \"type\": \"gauge\",\n    \"gauge\": {\n      \"verticalSplit\": false,\n      \"autoExtend\": false,\n      \"percentageMode\": false,\n      \"gaugeType\": \"Metric\",\n      \"gaugeStyle\": \"Full\",\n      \"backStyle\": \"Full\",\n      \"orientation\": \"vertical\",\n      \"colorSchema\": \"Green to Red\",\n      \"gaugeColorMode\": \"None\",\n      \"useRange\": false,\n      \"colorsRange\": [\n        {\n          \"from\": 0,\n          \"to\": 100\n        }\n      ],\n      \"invertColors\": false,\n      \"labels\": {\n        \"show\": true,\n        \"color\": \"black\"\n      },\n      \"scale\": {\n        \"show\": false,\n        \"labels\": false,\n        \"color\": \"#333\",\n        \"width\": 2\n      },\n      \"type\": \"simple\",\n      \"style\": {\n        \"fontSize\": 60,\n        \"bgFill\": \"#000\",\n        \"bgColor\": false,\n        \"labelColor\": false,\n        \"subText\": \"\"\n      }\n    }\n  },\n  \"aggs\": [\n    {\n      \"id\": \"1\",\n      \"enabled\": true,\n      \"type\": \"count\",\n      \"schema\": \"metric\",\n      \"params\": {}\n    },\n    {\n      \"id\": \"2\",\n      \"enabled\": true,\n      \"type\": \"terms\",\n      \"schema\": \"group\",\n      \"params\": {\n        \"field\": \"rspamd_meta.user\",\n        \"size\": 5,\n        \"order\": \"desc\",\n        \"orderBy\": \"1\"\n      }\n    }\n  ],\n  \"listeners\": {}\n}",
-        "uiStateJSON": "{\n  \"vis\": {\n    \"defaultColors\": {\n      \"0 - 100\": \"rgb(0,104,55)\"\n    }\n  }\n}",
-        "description": "",
-        "version": 1,
-        "kibanaSavedObjectMeta": {
-          "searchSourceJSON": "{\n  \"index\": \"eb48a1c0-23a2-11e8-b222-e710267d9b66\",\n  \"query\": {\n    \"query_string\": {\n      \"query\": \"*\",\n      \"analyze_wildcard\": true\n    }\n  },\n  \"filter\": [\n    {\n      \"meta\": {\n        \"index\": \"rspamd_beat-*\",\n        \"negate\": true,\n        \"disabled\": false,\n        \"alias\": null,\n        \"type\": \"phrase\",\n        \"key\": \"rspamd_meta.user\",\n        \"value\": \"unknown\"\n      },\n      \"query\": {\n        \"match\": {\n          \"rspamd_meta.user\": {\n            \"query\": \"unknown\",\n            \"type\": \"phrase\"\n          }\n        }\n      },\n      \"$state\": {\n        \"store\": \"appState\"\n      }\n    }\n  ]\n}"
-        }
-      }
-    }
-  }
-]
diff --git a/contrib/elastic/rspamd_template.json b/contrib/elastic/rspamd_template.json
deleted file mode 100644 (file)
index ebd87fa..0000000
+++ /dev/null
@@ -1,149 +0,0 @@
-{
-  "mappings": {
-    "_meta": {
-      "version": "5.5.3"
-    },
-    "date_detection": false,
-    "dynamic_templates": [
-      {
-        "strings_as_keyword": {
-          "mapping": {
-            "ignore_above": 1024,
-            "type": "keyword"
-          },
-          "match_mapping_type": "string"
-        }
-      }
-    ],
-    "properties": {
-      "@timestamp": {
-        "type": "date"
-      },
-      "meta": {
-        "properties": {
-          "cloud": {
-            "properties": {
-              "availability_zone": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              },
-              "instance_id": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              },
-              "machine_type": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              },
-              "project_id": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              },
-              "provider": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              },
-              "region": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              }
-            }
-          }
-        }
-      },
-      "rspamd_meta": {
-        "properties": {
-          "action": {
-            "ignore_above": 1024,
-            "type": "keyword"
-          },
-          "direction": {
-            "ignore_above": 1024,
-            "type": "keyword"
-          },
-          "asn": {
-            "properties": {
-              "asn": {
-                "type": "long"
-              },
-              "country_code": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              },
-              "ipnet": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              },
-              "registrant": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              }
-            }
-          },
-          "from": {
-            "ignore_above": 1024,
-            "type": "keyword"
-          },
-          "is_local": {
-            "type": "boolean"
-          },
-          "webmail": {
-            "type": "boolean"
-          },
-          "sender_ip": {
-            "ignore_above": 1024,
-            "type": "keyword"
-          },
-          "geoip": {
-            "properties": {
-              "city_name": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              },
-              "continent_name": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              },
-              "country_iso_code": {
-                "ignore_above": 1024,
-                "type": "keyword"
-              },
-              "location": {
-                "type": "geo_point"
-              }
-            }
-          },
-          "ip": {
-            "ignore_above": 1024,
-            "type": "keyword"
-          },
-          "qid": {
-            "ignore_above": 1024,
-            "type": "keyword"
-          },
-          "hostname": {
-            "ignore_above": 1024,
-            "type": "keyword"
-          },
-          "score": {
-            "type": "float"
-          },
-          "user": {
-            "ignore_above": 1024,
-            "type": "keyword"
-          }
-        }
-      },
-      "tags": {
-        "ignore_above": 1024,
-        "type": "keyword"
-      }
-    }
-  },
-  "order": 0,
-  "settings": {
-    "index.mapping.total_fields.limit": 10000,
-    "index.refresh_interval": "5s"
-  },
-  "index_patterns" : ["rspamd-*", "*-rspamd-*"]
-}
index ccbb7c198b7d4b1b5843fb89e779fbe15be80c97..205a5ce8602aee4fddd765b75efd4a16afe3bea5 100644 (file)
@@ -18,219 +18,615 @@ limitations under the License.
 local rspamd_logger = require 'rspamd_logger'
 local rspamd_http = require "rspamd_http"
 local lua_util = require "lua_util"
-local util = require "rspamd_util"
+local rspamd_util = require "rspamd_util"
 local ucl = require "ucl"
-local rspamd_redis = require "lua_redis"
 local upstream_list = require "rspamd_upstream_list"
-local lua_settings = require "lua_settings"
 
 if confighelp then
   return
 end
 
-local rows = {}
-local nrows = 0
-local failed_sends = 0
-local elastic_template
-local redis_params
 local N = "elastic"
-local E = {}
-local HOSTNAME = util.get_hostname()
 local connect_prefix = 'http://'
-local enabled = true
-local ingest_geoip_type = 'plugins'
+local rspamd_hostname = rspamd_util.get_hostname()
+-- supported_distro:
+-- from - minimal compatible version supported, including
+-- till & till_unknown = true - yet unreleased version, so unknown compatibility, excluding
+-- till & till_unknown = false - version is known to be not yet compatible with this module
+local supported_distro = {
+  elastic = {
+    from = '7.8',
+    till = '9',
+    till_unknown = true,
+  },
+  opensearch = {
+    from = '1',
+    till = '3',
+    till_unknown = true,
+  },
+}
+local detected_distro = {
+  name = nil,
+  version = nil,
+  supported = false,
+}
+local states = {
+  distro = {
+    configured = false,
+    errors = 0,
+  },
+  index_template = {
+    configured = false,
+    errors = 0,
+  },
+  index_policy = {
+    configured = false,
+    errors = 0,
+  },
+  geoip_pipeline = {
+    configured = false,
+    errors = 0,
+  },
+}
 local settings = {
-  limit = 500,
-  index_pattern = 'rspamd-%Y.%m.%d',
-  template_file = rspamd_paths['SHAREDIR'] .. '/elastic/rspamd_template.json',
-  kibana_file = rspamd_paths['SHAREDIR'] .. '/elastic/kibana.json',
-  key_prefix = 'elastic-',
-  expire = 3600,
+  enabled = true,
+  version = {
+    autodetect_enabled = true,
+    autodetect_max_fail  = 12,
+    -- override works only if autodetect is disabled
+    override = {
+      name = 'opensearch',
+      version = '2.17',
+    }
+  },
+  limits = {
+    max_rows = 1000, -- max logs in one bulk req to elastic and first reason to flush buffer
+    max_interval = 60, -- seconds, if first log in buffer older than interval - flush buffer
+    max_size = 5000000, -- max symbols count in buffer, if reached - flush buffer, f.e: 5000000 ~= 10MB/normal-worker
+    max_fail = 3,
+  },
+  index_template = {
+    managed = true,
+    name = 'rspamd',
+    pattern = '{service}-%Y.%m.%d',
+    priority = 0,
+    shards_count = 3,
+    replicas_count = 1,
+    refresh_interval = 5, -- seconds
+    dynamic_keyword_ignore_above = 256,
+    headers_text_ignore_above = 2048, -- strip headers value and add '...' to the end, set 0 to disable limit
+    symbols_nested = false,
+    empty_value = 'unknown', -- empty numbers, ips and ipnets are not customizable: they will always be 0, :: and ::/128 respectively
+  },
+  index_policy = {
+    enabled = true,
+    managed = true,
+    name = 'rspamd', -- if you want to use a custom lifecycle policy, change name and set managed = false
+    hot = {
+      index_priority = 100,
+    },
+    warm = {
+      enabled = true,
+      after = '2d',
+      index_priority = 50,
+      migrate = true, -- only supported with elastic distro, will not have impact elsewhere
+      read_only = true,
+      change_replicas = false,
+      replicas_count = 1,
+      shrink = false,
+      shards_count = 1,
+      max_gb_per_shard = 0, -- zero - disabled by default, if enabled - shards_count is ignored
+      force_merge = true,
+      segments_count = 1,
+    },
+    cold = {
+      enabled = true,
+      after = '14d',
+      index_priority = 0,
+      migrate = true, -- only supported with elastic distro, will not have impact elsewhere
+      read_only = true,
+      change_replicas = false,
+      replicas_count = 1,
+    },
+    delete = {
+      enabled = true,
+      after = '30d',
+    },
+  },
+  collect_headers = {
+    'From',
+    'To',
+    'Subject',
+    'Date',
+    'User-Agent',
+  },
+  extra_collect_headers = {
+    -- 'List-Id',
+    -- 'X-Mailer',
+  },
+  geoip = {
+    enabled = true,
+    managed = true,
+    pipeline_name = 'rspamd-geoip',
+  },
+  periodic_interval = 5.0,
   timeout = 5.0,
-  failover = false,
-  import_kibana = false,
   use_https = false,
+  no_ssl_verify = false,
   use_gzip = true,
+  use_keepalive = true,
   allow_local = false,
   user = nil,
   password = nil,
-  no_ssl_verify = false,
-  max_fail = 3,
-  ingest_module = false,
-  elasticsearch_version = 6,
 }
 
-local function read_file(path)
-  local file = io.open(path, "rb")
-  if not file then
+local Queue = {}
+Queue.__index = Queue
+
+function Queue:new()
+  local obj = {first = 1, last = 0, data = {}}
+  setmetatable(obj, self)
+  return obj
+end
+
+-- Appends `value` to the tail of the queue.
+function Queue:push(value)
+  local tail = self.last + 1
+  self.data[tail] = value
+  self.last = tail
+end
+
+-- Returns the number of slots between the head and the tail of the queue
+-- (0 for an empty queue).
+function Queue:length()
+  local head, tail = self.first, self.last
+  return tail - head + 1
+end
+
+-- Returns the approximate payload size of all queued rows in bytes.
+-- String rows are measured directly. Table rows are measured by the length of
+-- their compact JSON form: the rows pushed by elastic_collect are tables with
+-- string keys only, for which the `#` operator always yields 0, so the
+-- previous `#row` made the max_size limit impossible to reach.
+function Queue:size()
+  local ucl = require "ucl"
+  local size = 0
+  for i = self.first, self.last do
+    local row = self.data[i]
+    if type(row) == 'string' then
+      size = size + #row
+    elseif type(row) == 'table' then
+      size = size + #ucl.to_format(row, 'json-compact')
+    end
+  end
+  return size
+end
+
+-- Returns the item at position `index` (1 is the head of the queue) without
+-- removing it, or nil when `index` points past the tail.
+function Queue:get(index)
+  -- translate the queue-relative position into an absolute slot of self.data
+  local real_index = self.first + index - 1
+  if real_index <= self.last then
+    return self.data[real_index]
+  else
     return nil
   end
-  local content = file:read "*a"
-  file:close()
-  return content
 end
 
-local function elastic_send_data(task)
-  local es_index = os.date(settings['index_pattern'])
+-- Returns a new array holding every queued item, oldest first; the queue
+-- itself is not modified.
+function Queue:get_all()
+  local snapshot = {}
+  local slot = self.first
+  while slot <= self.last do
+    snapshot[#snapshot + 1] = self.data[slot]
+    slot = slot + 1
+  end
+  return snapshot
+end
+
+-- Removes the oldest item from the queue and returns it; returns nil when the
+-- queue is empty.
+function Queue:pop()
+  local head = self.first
+  if head <= self.last then
+    local value = self.data[head]
+    -- release the slot so the stored value is no longer referenced
+    self.data[head] = nil
+    self.first = head + 1
+    return value
+  end
+  return nil
+end
+
+-- Returns a new array with up to `count` items taken from the head of the
+-- queue (all items when `count` is omitted) without removing them.
+function Queue:get_first(count)
+  local limit = count or self:length()
+  -- never read past the tail, even when more items were requested
+  local stop = math.min(self.first + limit - 1, self.last)
+  local head_items = {}
+  for slot = self.first, stop do
+    head_items[#head_items + 1] = self.data[slot]
+  end
+  return head_items
+end
+
+-- Removes up to `count` items from the head of the queue (all items when
+-- `count` is omitted) and returns them as an array, oldest first.
+function Queue:pop_first(count)
+  local available = self:length()
+  local wanted = count or available
+  local removed = {}
+  for _ = 1, math.min(wanted, available) do
+    removed[#removed + 1] = self:pop()
+  end
+  return removed
+end
+
+-- Module-wide send buffer: `logs` queues the collected rows waiting to be
+-- pushed to elastic, `errors` counts consecutive failed push attempts (it is
+-- reset to 0 after a successful push or after the failed rows are dropped).
+local buffer = {
+  logs = Queue:new(),
+  errors = 0,
+}
+
+-- Case-insensitive membership test: returns true when the array `tbl` holds a
+-- string equal to `val` ignoring letter case, false otherwise.
+local function contains(tbl, val)
+  local idx = 1
+  while idx <= #tbl do
+    local candidate = tbl[idx]:lower()
+    if candidate == val:lower() then
+      return true
+    end
+    idx = idx + 1
+  end
+  return false
+end
+
+-- Safely walks nested tables: safe_get(t, 'a', 'b') returns t.a.b.
+-- Returns nil as soon as a key is missing or the value reached so far is not a
+-- table (indexing e.g. a number or nil would otherwise raise an error, which
+-- matters because the input usually comes from a parsed remote reply).
+-- The first parameter is named `tbl` so it does not shadow the `table` library.
+local function safe_get(tbl, ...)
+  local value = tbl
+  for _, key in ipairs({...}) do
+    if type(value) ~= 'table' then
+      return nil
+    end
+    value = value[key]
+    if value == nil then
+      return nil
+    end
+  end
+  return value
+end
+
+-- Structural equality of two values. Non-table values are compared with `==`;
+-- tables are equal when they have the same number of keys and every key of
+-- `t1` maps to a deeply equal value in `t2`. `visited` is internal bookkeeping
+-- for the recursion and is normally omitted by callers.
+local function deep_compare(t1, t2, visited)
+  -- equal primitives or the very same table
+  if t1 == t2 then
+    return true
+  end
+
+  -- different values of which at least one is not a table can not match
+  if not (type(t1) == "table" and type(t2) == "table") then
+    return false
+  end
+
+  -- a pair that is already being compared is treated as equal, so reference
+  -- cycles terminate instead of recursing forever
+  visited = visited or {}
+  local seen = visited[t1]
+  if seen and seen[t2] then
+    return true
+  end
+  if not seen then
+    seen = {}
+    visited[t1] = seen
+  end
+  seen[t2] = true
+
+  local function count_keys(tbl)
+    local total = 0
+    for _ in pairs(tbl) do
+      total = total + 1
+    end
+    return total
+  end
+
+  -- different key counts mean one side has a key the other one lacks
+  if count_keys(t1) ~= count_keys(t2) then
+    return false
+  end
+
+  for key, left in pairs(t1) do
+    local right = t2[key]
+    if right == nil then
+      return false
+    end
+    if not deep_compare(left, right, visited) then
+      return false
+    end
+  end
+  return true
+end
+
+-- Compares two dotted version strings numerically, component by component.
+-- Only the leading run of digits and dots is considered (a suffix such as
+-- "-SNAPSHOT" is ignored) and missing components count as 0.
+-- Returns 1 when v1 > v2, -1 when v1 < v2 and 0 when they are equal.
+local function compare_versions(v1, v2)
+  -- splits the numeric prefix of a version string into a list of numbers
+  local function to_parts(version)
+    local numeric = version:match("^([%.%d]+)")
+    local parts = {}
+    if numeric then
+      for piece in numeric:gmatch('([^.]+)') do
+        parts[#parts + 1] = tonumber(piece) or 0
+      end
+    end
+    return parts
+  end
+
+  local left, right = to_parts(v1), to_parts(v2)
+
+  for i = 1, math.max(#left, #right) do
+    local a, b = left[i] or 0, right[i] or 0
+    if a ~= b then
+      -- the first differing component decides the result
+      return a > b and 1 or -1
+    end
+  end
+  return 0 -- all components are equal
+end
+
+-- Accounts one failed attempt of `action` for `component` (a key of `states`).
+-- While fewer than `limit` errors are recorded the counter is incremented;
+-- once the limit is reached the failure is logged and the component is marked
+-- as configured so it is not retried. Always returns true.
+local function handle_error(action,component,limit)
+  local state = states[component]
+  if state['errors'] < limit then
+    state['errors'] = state['errors'] + 1
+    return true
+  end
+  rspamd_logger.errx(rspamd_config, 'cannot %s elastic %s, failed attempts: %s/%s, stop trying',
+    action, component:gsub('_', ' '), state['errors'], limit)
+  state['configured'] = true
+  return true
+end
+
+-- Builds the newline-delimited JSON body of a bulk request: for every row an
+-- `index` action line targeting `es_index` followed by the row serialized as
+-- compact JSON. The ingest pipeline is referenced in the action line only when
+-- settings['geoip']['enabled'] is set. The result ends with a newline.
+local function create_bulk_json(es_index, logs_to_send)
   local tbl = {}
-  for _, value in pairs(rows) do
-    if settings.elasticsearch_version >= 7 then
-      table.insert(tbl, '{ "index" : { "_index" : "' .. es_index ..
-          '","pipeline": "rspamd-geoip"} }')
-    else
-      table.insert(tbl, '{ "index" : { "_index" : "' .. es_index ..
-          '", "_type" : "_doc" ,"pipeline": "rspamd-geoip"} }')
+  for _, row in pairs(logs_to_send) do
+    -- optional fragment appended inside the action object
+    local pipeline = ''
+    if settings['geoip']['enabled']then
+      pipeline = ',"pipeline":"'.. settings['geoip']['pipeline_name'] .. '"'
     end
-    table.insert(tbl, ucl.to_format(value, 'json-compact'))
+    table.insert(tbl, '{"index":{"_index":"' .. es_index .. '"' .. pipeline .. '}}')
+    table.insert(tbl, ucl.to_format(row, 'json-compact'))
   end
+  table.insert(tbl, '') -- for last \n
+  return table.concat(tbl, "\n")
+end
 
-  table.insert(tbl, '') -- For last \n
+-- Pushes buffered log rows to elastic through the bulk API.
+--   flush_all: when true every buffered row is sent, otherwise at most
+--              settings['limits']['max_rows'] rows from the head of the buffer
+--   task:      optional task; when present the HTTP request is bound to it and
+--              it is used as the logging object (rspamd_config otherwise)
+--   cfg, ev_base: config and event base used for the request without a task
+-- Returns the result of rspamd_http.request, or nothing when there is nothing
+-- to send. Rows are removed from the buffer only inside the HTTP callback:
+-- after a successful push, or once the failure counter has reached
+-- settings['limits']['max_fail'].
+local function elastic_send_data(flush_all, task, cfg, ev_base)
+  local log_object = task or rspamd_config
+  local nlogs_to_send = 0
+  local es_index
+  local upstream
+  local host
+  local push_url
+  local bulk_json
+  local logs_to_send
+  if flush_all then
+    logs_to_send = buffer['logs']:get_all()
+  else
+    logs_to_send = buffer['logs']:get_first(settings['limits']['max_rows'])
+  end
+  nlogs_to_send = #logs_to_send -- actual size can be lower than max_rows
+  if nlogs_to_send > 0 then
+    es_index = settings['index_template']['name'] .. '-' .. os.date(settings['index_template']['pattern'])
 
-  local upstream = settings.upstream:get_upstream_round_robin()
-  local ip_addr = upstream:get_addr():to_string(true)
+    upstream = settings.upstream:get_upstream_round_robin()
+    -- the request goes to the resolved address, so the upstream name without
+    -- its port suffix is passed separately as the Host header
+    host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
+    local ip_addr = upstream:get_addr():to_string(true)
+    push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk'
 
-  local push_url = connect_prefix .. ip_addr .. '/' .. es_index .. '/_bulk'
-  local bulk_json = table.concat(tbl, "\n")
+    bulk_json = create_bulk_json(es_index, logs_to_send)
+    rspamd_logger.debugm(N, log_object, 'successfully composed payload with %s log lines', nlogs_to_send)
+  end
 
-  local function http_callback(err, code, _, _)
+  local function http_callback(err, code, body, _)
+    -- push_done: the whole payload was accepted
+    -- push_errors: the server answered 200 but reported per-item errors
+    local push_done = false
+    local push_errors = false
     if err then
-      rspamd_logger.infox(task, "cannot push data to elastic backend (%s): %s; failed attempts: %s/%s",
-          push_url, err, failed_sends, settings.max_fail)
+      rspamd_logger.errx(log_object, 'cannot send logs to elastic (%s): %s; failed attempts: %s/%s',
+        push_url, err, buffer['errors'], settings['limits']['max_fail'])
+    elseif code == 200 then
+      local parser = ucl.parser()
+      local res, ucl_err = parser:parse_string(body)
+      if not ucl_err and res then
+        local obj = parser:get_object()
+        if not obj['errors'] then
+          push_done = true
+          rspamd_logger.debugm(N, log_object, 'successfully sent payload with %s logs', nlogs_to_send)
+        else
+          push_errors = true
+          for _, value in pairs(obj['items']) do
+            if value['index']['status'] >= 400 then
+              if value['index']['error'] then
+                if value['index']['error']['type'] and value['index']['error']['reason'] then
+                  rspamd_logger.errx(log_object,
+                    'cannot send logs to elastic (%s) due to error: %s status, %s type, due to: %s; failed attempts: %s/%s',
+                    push_url, value['index']['status'], value['index']['error']['type'], value['index']['error']['reason'],
+                    buffer['errors'], settings['limits']['max_fail'])
+                end
+              end
+            end
+          end
+        end
+      else
+        rspamd_logger.errx(log_object,
+          'cannot parse response from elastic (%s): %s; failed attempts: %s/%s',
+          push_url, ucl_err, buffer['errors'], settings['limits']['max_fail'])
+      end
     else
-      if code ~= 200 then
-        rspamd_logger.infox(task,
-            "cannot push data to elastic backend (%s): wrong http code %s (%s); failed attempts: %s/%s",
-            push_url, err, code, failed_sends, settings.max_fail)
+      rspamd_logger.errx(log_object,
+        'cannot send logs to elastic (%s) due to bad http status code: %s, response: %s; failed attempts: %s/%s',
+        push_url, code, body, buffer['errors'], settings['limits']['max_fail'])
+    end
+    -- process results
+    if push_done then
+      buffer['logs']:pop_first(nlogs_to_send)
+      buffer['errors'] = 0
+      upstream:ok()
+    else
+      if buffer['errors'] >= settings['limits']['max_fail'] then
+        rspamd_logger.errx(log_object, 'failed to send %s log lines, failed attempts: %s/%s, removing failed logs from buffer',
+          nlogs_to_send, buffer['errors'], settings['limits']['max_fail'])
+        buffer['logs']:pop_first(nlogs_to_send)
+        buffer['errors'] = 0
+      else
+        buffer['errors'] = buffer['errors'] + 1
+      end
+      if push_errors then
+        upstream:ok() -- we not assume upstream is failed if it return errors in response body
       else
-        lua_util.debugm(N, task, "successfully sent %s (%s bytes) rows to ES",
-            nrows, #bulk_json)
+        upstream:fail()
       end
     end
   end
 
-  return rspamd_http.request({
-    url = push_url,
-    headers = {
-      ['Content-Type'] = 'application/x-ndjson',
-    },
-    body = bulk_json,
-    callback = http_callback,
-    task = task,
-    method = 'post',
-    gzip = settings.use_gzip,
-    no_ssl_verify = settings.no_ssl_verify,
-    user = settings.user,
-    password = settings.password,
-    timeout = settings.timeout,
-  })
+  if nlogs_to_send > 0 then
+    -- with a task the request is bound to it, otherwise to cfg/ev_base
+    if task then
+      return rspamd_http.request({
+        url = push_url,
+        headers = {
+          ['Host'] = host,
+          ['Content-Type'] = 'application/x-ndjson',
+        },
+        body = bulk_json,
+        task = task,
+        method = 'post',
+        callback=http_callback,
+        gzip = settings.use_gzip,
+        keepalive = settings.use_keepalive,
+        no_ssl_verify = settings.no_ssl_verify,
+        user = settings.user,
+        password = settings.password,
+        timeout = settings.timeout,
+      })
+    else
+      return rspamd_http.request({
+        url = push_url,
+        headers = {
+          ['Host'] = host,
+          ['Content-Type'] = 'application/x-ndjson',
+        },
+        body = bulk_json,
+        ev_base = ev_base,
+        config = cfg,
+        method = 'post',
+        callback=http_callback,
+        gzip = settings.use_gzip,
+        keepalive = settings.use_keepalive,
+        no_ssl_verify = settings.no_ssl_verify,
+        user = settings.user,
+        password = settings.password,
+        timeout = settings.timeout,
+      })
+    end
+  end
+end
+
+-- Maps a header name to the field name used in the log row: lower-cased, with
+-- whitespace and dashes replaced by underscores and prefixed with "header_"
+-- (e.g. "User-Agent" becomes "header_user_agent").
+local function get_header_name(name)
+  local normalized = name:lower():gsub('[%s%-]', '_')
+  return 'header_' .. normalized
 end
 
 local function get_general_metadata(task)
   local r = {}
-  local ip_addr = task:get_ip()
+  local empty = settings['index_template']['empty_value']
+  local user = task:get_user()
+  r.rspamd_server = rspamd_hostname or empty
 
+  r.action = task:get_metric_action() or empty
+  r.score = task:get_metric_score()[1] or 0
+  r.symbols = task:get_symbols_all()
+  for _, symbol in ipairs(r.symbols) do
+    symbol.groups = nil -- we don't need groups array in elastic
+    if type(symbol.options) == "table" then
+      symbol.options = table.concat(symbol.options, "; ")
+    end
+  end
+  r.user = user or empty
+  if user then
+    r.direction = "Inbound"
+  else
+    r.direction = "Outbound"
+  end
+  r.qid = task:get_queue_id() or empty
+  r.helo = task:get_helo() or empty
+  r.hostname = task:get_hostname() or empty
+
+  r.ip = '::'
+  r.is_local = false
+  local ip_addr = task:get_ip()
   if ip_addr and ip_addr:is_valid() then
     r.is_local = ip_addr:is_local()
     r.ip = tostring(ip_addr)
-  else
-    r.ip = '127.0.0.1'
   end
 
-  r.webmail = false
-  r.sender_ip = 'unknown'
+  r.sender_ip = '::'
   local origin = task:get_header('X-Originating-IP')
   if origin then
     origin = origin:gsub('%[', ''):gsub('%]', '')
     local rspamd_ip = require "rspamd_ip"
     local origin_ip = rspamd_ip.from_string(origin)
     if origin_ip and origin_ip:is_valid() then
-      r.webmail = true
       r.sender_ip = origin -- use string here
     end
   end
 
-  r.direction = "Inbound"
-  r.user = task:get_user() or 'unknown'
-  r.qid = task:get_queue_id() or 'unknown'
-  r.action = task:get_metric_action()
-  r.rspamd_server = HOSTNAME
-  if r.user ~= 'unknown' then
-    r.direction = "Outbound"
+  local message_id = task:get_message_id()
+  if message_id == 'undef' then
+    r.message_id = empty
+  else
+    r.message_id = message_id
   end
-  local s = task:get_metric_score()[1]
-  r.score = s
-
-  local rcpt = task:get_recipients('smtp')
-  if rcpt then
+  if task:has_recipients('smtp') then
+    local rcpt = task:get_recipients('smtp')
     local l = {}
     for _, a in ipairs(rcpt) do
       table.insert(l, a['addr'])
     end
     r.rcpt = l
   else
-    r.rcpt = 'unknown'
+    r.rcpt = empty
   end
 
-  local from = task:get_from { 'smtp', 'orig' }
-  if ((from or E)[1] or E).addr then
-    r.from = from[1].addr
-  else
-    r.from = 'unknown'
+  r.from_domain = empty
+  r.from_user = empty
+  if task:has_from('smtp') then
+    local from = task:get_from({ 'smtp', 'orig' })[1]
+    if from then
+      r.from_domain = from['domain']:lower()
+      r.from_user = from['user']:lower()
+    end
   end
 
-  local mime_from = task:get_from { 'mime', 'orig' }
-  if ((mime_from or E)[1] or E).addr then
-    r.mime_from = mime_from[1].addr
-  else
-    r.mime_from = 'unknown'
+  r.mime_from_domain = empty
+  r.mime_from_user = empty
+  if task:has_from('mime') then
+    local mime_from = task:get_from({ 'mime', 'orig' })[1]
+    if mime_from then
+      r.mime_from_domain = mime_from['domain']:lower()
+      r.mime_from_user = mime_from['user']:lower()
+    end
   end
 
-  local syminf = task:get_symbols_all()
-  r.symbols = syminf
+  local settings_id = task:get_settings_id()
+  if settings_id then
+    -- Convert to string
+    local lua_settings = require "lua_settings"
+    settings_id = lua_settings.settings_by_id(settings_id)
+    if settings_id then
+      settings_id = settings_id.name
+    end
+  end
+  if not settings_id then
+    settings_id = empty
+  end
+  r.settings_id = settings_id
+
   r.asn = {}
   local pool = task:get_mempool()
-  r.asn.country = pool:get_variable("country") or 'unknown'
+  r.asn.country = pool:get_variable("country") or empty
   r.asn.asn = pool:get_variable("asn") or 0
-  r.asn.ipnet = pool:get_variable("ipnet") or 'unknown'
+  r.asn.ipnet = pool:get_variable("ipnet") or '::/128'
 
+  -- Returns the decoded values of every occurrence of header `name` as an
+  -- array, or the configured empty placeholder when the header is absent.
+  -- Each value longer than the limit derived from
+  -- settings['index_template']['headers_text_ignore_above'] is cut and suffixed
+  -- with '...' (3 characters are reserved for that suffix). The limit is
+  -- applied per value: the previous code compared the number of values with
+  -- the limit and called :sub() on the array itself, which raises an error.
   local function process_header(name)
     local hdr = task:get_header_full(name)
+    local headers_text_ignore_above = settings['index_template']['headers_text_ignore_above'] - 3
     if hdr then
       local l = {}
       for _, h in ipairs(hdr) do
-        table.insert(l, h.decoded)
+        local value = h.decoded
+        -- a non-positive limit (setting of 0..3) means truncation is disabled
+        if headers_text_ignore_above > 0 and type(value) == 'string'
+            and #value > headers_text_ignore_above then
+          value = value:sub(1, headers_text_ignore_above) .. '...'
+        end
+        table.insert(l, value)
       end
       return l
     else
-      return 'unknown'
+      return empty
     end
   end
 
-  r.header_from = process_header('from')
-  r.header_to = process_header('to')
-  r.header_subject = process_header('subject')
-  r.header_date = process_header('date')
-  r.message_id = task:get_message_id()
-  local hname = task:get_hostname() or 'unknown'
-  r.hostname = hname
-
-  local settings_id = task:get_settings_id()
-
-  if settings_id then
-    -- Convert to string
-    settings_id = lua_settings.settings_by_id(settings_id)
-
-    if settings_id then
-      settings_id = settings_id.name
+  for _, header in ipairs(settings['collect_headers']) do
+    local header_name = get_header_name(header)
+    if not r[header_name] then
+      r[header_name] = process_header(header)
     end
   end
 
-  if not settings_id then
-    settings_id = ''
+  for _, header in ipairs(settings['extra_collect_headers']) do
+    local header_name = get_header_name(header)
+    if not r[header_name] then
+      r[header_name] = process_header(header)
+    end
   end
 
-  r.settings_id = settings_id
-
   local scan_real = task:get_scan_time()
   scan_real = math.floor(scan_real * 1000)
   if scan_real < 0 then
@@ -239,91 +635,295 @@ local function get_general_metadata(task)
         scan_real)
     scan_real = 0
   end
-
   r.scan_time = scan_real
 
+  local parts = task:get_text_parts()
+  local lang_t = {}
+  if parts then
+    for _, part in ipairs(parts) do
+        local l = part:get_language()
+        if l and not contains(lang_t, l) then
+          table.insert(lang_t, l)
+        end
+    end
+    if table.getn(lang_t) > 0 then
+      r.language = lang_t
+    else
+      r.language = empty
+    end
+    if table.getn(lang_t) == 1 and lang_t[1] == 'en' then
+      r.non_en = false
+    else
+      r.non_en = true
+    end
+  end
+
+  local fuzzy_hashes = task:get_mempool():get_variable('fuzzy_hashes', 'fstrings')
+  if fuzzy_hashes and #fuzzy_hashes > 0 then
+    local l = {}
+    for _, h in ipairs(fuzzy_hashes) do
+      table.insert(l, h)
+    end
+    r.fuzzy_hashes = l
+  else
+    r.fuzzy_hashes = empty
+  end
+
+  r.received_delay = 0
+  if user then -- calculate received_delay only for incoming traffic
+    local recieved_hop = 2
+    local received_headers = task:get_received_headers()
+    if received_headers[recieved_hop] then
+      if received_headers[recieved_hop]['timestamp'] then
+        r.received_delay = math.floor(rspamd_util.get_time()) - received_headers[recieved_hop]['timestamp']
+        if r.received_delay < 0 then
+          r.received_delay = 0
+        end
+      end
+    end
+  end
+
   return r
 end
 
+-- Per-task callback: builds a log row for `task` and appends it to the send
+-- buffer. Nothing is sent from here. Tasks flagged 'skip' are ignored, as are
+-- tasks for which lua_util.is_rspamc_or_controller(task) is true unless
+-- settings.allow_local is set.
 local function elastic_collect(task)
-  if not enabled then
-    return
-  end
   if task:has_flag('skip') then
     return
   end
+
   if not settings.allow_local and lua_util.is_rspamc_or_controller(task) then
     return
   end
 
-  local row = { ['rspamd_meta'] = get_general_metadata(task),
-                ['@timestamp'] = tostring(util.get_time() * 1000) }
-  table.insert(rows, row)
-  nrows = nrows + 1
-  if nrows > settings['limit'] then
-    lua_util.debugm(N, task, 'send elastic search rows: %s', nrows)
-    if elastic_send_data(task) then
-      nrows = 0
-      rows = {}
-      failed_sends = 0;
-    else
-      failed_sends = failed_sends + 1
-
-      if failed_sends > settings.max_fail then
-        rspamd_logger.errx(task, 'cannot send %s rows to ES %s times, stop trying',
-            nrows, failed_sends)
-        nrows = 0
-        rows = {}
-        failed_sends = 0;
-      end
+  -- while the distro is not marked as supported, the oldest max_rows rows are
+  -- dropped whenever the buffer reaches max_rows, which keeps it bounded
+  if not detected_distro['supported'] then
+    if buffer['logs']:length() >= settings['limits']['max_rows'] then
+      buffer['logs']:pop_first(settings['limits']['max_rows'])
+      rspamd_logger.errx(task,
+        'elastic distro not supported, deleting %s logs from buffer due to reaching max rows limit',
+        settings['limits']['max_rows'])
     end
   end
+
+  -- the timestamp is the current time multiplied by 1000, stored as a string
+  local now = tostring(rspamd_util.get_time() * 1000)
+  local row = { ['rspamd_meta'] = get_general_metadata(task), ['@timestamp'] = now }
+  buffer['logs']:push(row)
+  rspamd_logger.debugm(N, task, 'saved log to buffer')
 end
 
-local opts = rspamd_config:get_all_opt('elastic')
+-- Decides whether the buffer has to be flushed and, if so, sends one batch
+-- with elastic_send_data (without a task, using cfg/ev_base). A flush is
+-- triggered by the first matching condition: the number of buffered rows
+-- reached limits.max_rows, the oldest row is older than limits.max_interval
+-- seconds, or the buffer size reached limits.max_size.
+local function periodic_send_data(cfg, ev_base)
+  local now = tostring(rspamd_util.get_time() * 1000)
+  local flush_needed = false
 
-local function check_elastic_server(cfg, ev_base, _)
+
+  local nlogs_total = buffer['logs']:length()
+  if nlogs_total >= settings['limits']['max_rows'] then
+    rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max rows: %s/%s', nlogs_total, settings['limits']['max_rows'])
+    flush_needed = true
+  else
+    local first_row = buffer['logs']:get(1)
+    if first_row then
+      -- both operands are strings; the subtraction relies on Lua converting
+      -- numeric strings to numbers in arithmetic
+      local time_diff = now - first_row['@timestamp']
+      if time_diff > settings.limits.max_interval * 1000 then
+        rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max interval, diff: %s, current time: %s, log timestamp: %s',
+          time_diff, now, first_row['@timestamp'])
+        flush_needed = true
+      else
+        local size = buffer['logs']:size()
+        if size >= settings['limits']['max_size'] then
+          rspamd_logger.infox(rspamd_config, 'flushing buffer by reaching max size: %s/%s', size, settings['limits']['max_size'])
+          flush_needed = true
+        end
+      end
+    end
+  end
+
+  if flush_needed then
+    -- flush_all = false: at most limits.max_rows rows are sent per call
+    elastic_send_data(false, nil, cfg, ev_base)
+  end
+end
+
+-- Creates or overwrites the geoip ingest pipeline named by
+-- settings['geoip']['pipeline_name'] with a PUT request. The pipeline runs the
+-- geoip processor on rspamd_meta.ip and rspamd_meta.sender_ip. On a 200 reply
+-- states['geoip_pipeline']['configured'] is set; any other status code is
+-- accounted through handle_error, while a connection error only marks the
+-- upstream as failed.
+local function configure_geoip_pipeline(cfg, ev_base)
   local upstream = settings.upstream:get_upstream_round_robin()
+  -- upstream name without the port suffix, sent as the Host header
+  local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
   local ip_addr = upstream:get_addr():to_string(true)
-  local plugins_url = connect_prefix .. ip_addr .. '/_nodes/' .. ingest_geoip_type
+  local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/' .. settings['geoip']['pipeline_name']
+  local geoip_pipeline = {
+    description = "Add geoip info for rspamd",
+    processors = {
+      {
+        geoip = {
+          field = "rspamd_meta.ip",
+          target_field = "rspamd_meta.geoip"
+        }
+      },
+      {
+        geoip = {
+          field = "rspamd_meta.sender_ip",
+          target_field = "rspamd_meta.sender_geoip"
+        }
+      }
+    }
+  }
+
+  local function geoip_cb(err, code, body, _)
+    if err then
+      rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', geoip_url, err)
+      upstream:fail()
+    elseif code == 200 then
+      states['geoip_pipeline']['configured'] = true
+      upstream:ok()
+    else
+      rspamd_logger.errx(rspamd_config,
+        'cannot configure elastic geoip pipeline (%s), status code: %s, response: %s',
+        geoip_url, code, body)
+      upstream:fail()
+      handle_error('configure', 'geoip_pipeline', settings['limits']['max_fail'])
+    end
+  end
+
+  rspamd_http.request({
+    url = geoip_url,
+    ev_base = ev_base,
+    config = cfg,
+    callback = geoip_cb,
+    headers = {
+      ['Host'] = host,
+      ['Content-Type'] = 'application/json',
+    },
+    body = ucl.to_format(geoip_pipeline, 'json-compact'),
+    method = 'put',
+    gzip = settings.use_gzip,
+    keepalive = settings.use_keepalive,
+    no_ssl_verify = settings.no_ssl_verify,
+    user = settings.user,
+    password = settings.password,
+    timeout = settings.timeout,
+  })
+end
+
+-- Uploads `index_policy_json` to `policy_url` with a PUT request through the
+-- given upstream (`host` is sent as the Host header). A 200 or 201 reply sets
+-- states['index_policy']['configured']; any other status code is accounted
+-- through handle_error, while a connection error only marks the upstream as
+-- failed.
+local function put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
   local function http_callback(err, code, body, _)
-    if code == 200 then
-      local parser = ucl.parser()
-      local res, ucl_err = parser:parse_string(body)
-      if not res then
-        rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s',
-            plugins_url, ucl_err)
-        enabled = false;
-        return
-      end
-      local obj = parser:get_object()
-      for node, value in pairs(obj['nodes']) do
-        local plugin_found = false
-        for _, plugin in pairs(value['plugins']) do
-          if plugin['name'] == 'ingest-geoip' then
-            plugin_found = true
-            lua_util.debugm(N, "ingest-geoip plugin has been found")
+    if err then
+      rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', policy_url, err)
+      upstream:fail()
+    elseif code == 200 or code == 201 then
+      rspamd_logger.infox(rspamd_config, 'successfully updated elastic index policy: %s', body)
+      states['index_policy']['configured'] = true
+      upstream:ok()
+    else
+      rspamd_logger.errx(rspamd_config, 'cannot configure elastic index policy (%s), status code: %s, response: %s', policy_url, code, body)
+      upstream:fail()
+      handle_error('configure', 'index_policy', settings['limits']['max_fail'])
+    end
+  end
+
+  rspamd_http.request({
+    url = policy_url,
+    ev_base = ev_base,
+    config = cfg,
+    body = index_policy_json,
+    headers = {
+      ['Host'] = host,
+      ['Content-Type'] = 'application/json',
+    },
+    method = 'put',
+    callback = http_callback,
+    gzip = settings.use_gzip,
+    keepalive = settings.use_keepalive,
+    no_ssl_verify = settings.no_ssl_verify,
+    user = settings.user,
+    password = settings.password,
+    timeout = settings.timeout,
+  })
+end
+
+local function get_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
+  local function http_callback(err, code, body, _)
+    if err then
+      rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', policy_url, err)
+      upstream:fail()
+    elseif code == 404 then
+      put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
+    elseif code == 200 then
+      local remote_policy_parser = ucl.parser()
+      local our_policy_parser = ucl.parser()
+      local res, ucl_err = remote_policy_parser:parse_string(body)
+      if not ucl_err and res then
+        local res, ucl_err = our_policy_parser:parse_string(index_policy_json)
+        if not ucl_err and res then
+          local remote_policy = remote_policy_parser:get_object()
+          local our_policy = our_policy_parser:get_object()
+          local update_needed = false
+          if detected_distro['name'] == 'elastic' then
+            local index_policy_name = settings['index_policy']['name']
+            local current_phases = safe_get(remote_policy, index_policy_name, 'policy', 'phases')
+            if not deep_compare(our_policy['policy']['phases'], current_phases) then
+              update_needed = true
+            end
+          elseif detected_distro['name'] == 'opensearch' then
+            local current_default_state = safe_get(remote_policy, 'policy', 'default_state')
+            local current_ism_index_patterns = safe_get(remote_policy, 'policy', 'ism_template', 1, 'index_patterns')
+            local current_states = safe_get(remote_policy, 'policy', 'states')
+            if not deep_compare(our_policy['policy']['default_state'], current_default_state) then
+              update_needed = true
+            elseif not deep_compare(our_policy['policy']['ism_template'][1]['index_patterns'], current_ism_index_patterns) then
+              update_needed = true
+            elseif not deep_compare(our_policy['policy']['states'], current_states) then
+              update_needed = true
+            end
           end
+          if update_needed then
+            if detected_distro['name'] == 'elastic' then
+              put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
+            elseif detected_distro['name'] == 'opensearch' then
+              local seq_no = remote_policy['_seq_no']
+              local primary_term = remote_policy['_primary_term']
+              if type(seq_no) == 'number' and type(primary_term) == 'number' then
+                upstream:ok()
+                -- adjust policy url to include seq_no with primary_term
+                -- https://opensearch.org/docs/2.17/im-plugin/ism/api/#update-policy
+                policy_url = policy_url .. '?if_seq_no=' .. seq_no .. '&if_primary_term=' .. primary_term
+                put_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
+              else
+                rspamd_logger.errx(rspamd_config,
+                  'current elastic index policy (%s) not returned correct seq_no/primary_term, policy will not be updated, response: %s',
+                  policy_url, body)
+                upstream:fail()
+                handle_error('validate current', 'index_policy', settings['limits']['max_fail'])
+              end
+            end
+          end
+        else
+          rspamd_logger.errx(rspamd_config, 'failed to parse our index policy for elastic: %s', ucl_err)
+          upstream:fail()
+          handle_error('parse our', 'index_policy', settings['limits']['max_fail'])
         end
-        if not plugin_found then
-          rspamd_logger.infox(rspamd_config,
-              'Unable to find ingest-geoip on %1 node, disabling module', node)
-          enabled = false
-          return
-        end
+      else
+        rspamd_logger.errx(rspamd_config, 'failed to parse remote index policy from elastic: %s', ucl_err)
+        upstream:fail()
+        handle_error('parse remote', 'index_policy', settings['limits']['max_fail'])
       end
     else
-      rspamd_logger.errx('cannot get plugins from %s: %s(%s) (%s)', plugins_url,
-          err, code, body)
-      enabled = false
+      rspamd_logger.errx(rspamd_config,
+        'cannot get current elastic index policy (%s), status code: %s, response: %s',
+        policy_url, code, body)
+      handle_error('get current', 'index_policy', settings['limits']['max_fail'])
+      upstream:fail()
     end
   end
+
   rspamd_http.request({
-    url = plugins_url,
+    url = policy_url,
     ev_base = ev_base,
     config = cfg,
+    headers = {
+      ['Host'] = host,
+      ['Content-Type'] = 'application/json',
+    },
     method = 'get',
     callback = http_callback,
+    gzip = settings.use_gzip,
+    keepalive = settings.use_keepalive,
     no_ssl_verify = settings.no_ssl_verify,
     user = settings.user,
     password = settings.password,
@@ -331,165 +931,622 @@ local function check_elastic_server(cfg, ev_base, _)
   })
 end
 
--- import ingest pipeline and kibana dashboard/visualization
-local function initial_setup(cfg, ev_base, worker)
-  if not worker:is_primary_controller() then
-    return
-  end
-
+local function configure_index_policy(cfg, ev_base)
   local upstream = settings.upstream:get_upstream_round_robin()
+  local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
   local ip_addr = upstream:get_addr():to_string(true)
+  local index_policy_path = nil
+  local index_policy = {}
+  if detected_distro['name'] == 'elastic' then
+    index_policy_path = '/_ilm/policy/'
+  elseif detected_distro['name'] == 'opensearch' then
+    index_policy_path = '/_plugins/_ism/policies/'
+  end
+  local policy_url = connect_prefix .. ip_addr .. index_policy_path .. settings['index_policy']['name']
 
-  local function push_kibana_template()
-    -- add kibana dashboard and visualizations
-    if settings['import_kibana'] then
-      local kibana_mappings = read_file(settings['kibana_file'])
-      if kibana_mappings then
-        local parser = ucl.parser()
-        local res, parser_err = parser:parse_string(kibana_mappings)
-        if not res then
-          rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s',
-              parser_err)
-          enabled = false
-
-          return
-        end
-        local obj = parser:get_object()
-        local tbl = {}
-        for _, item in ipairs(obj) do
-          table.insert(tbl, '{ "index" : { "_index" : ".kibana", "_type" : "doc" ,"_id": "' ..
-              item['_type'] .. ':' .. item["_id"] .. '"} }')
-          table.insert(tbl, ucl.to_format(item['_source'], 'json-compact'))
-        end
-        table.insert(tbl, '') -- For last \n
-
-        local kibana_url = connect_prefix .. ip_addr .. '/.kibana/_bulk'
-        local function kibana_template_callback(err, code, body, _)
-          if code ~= 200 then
-            rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)', kibana_url,
-                err, code, body)
-            enabled = false
-          else
-            lua_util.debugm(N, 'pushed kibana template: %s', body)
-          end
-        end
+  -- ucl.to_format(obj, 'json') can't handle empty {} objects: it renders them as [] in the resulting json,
+  -- so we write {} as '{empty_object}' and, after conversion to json, replace the '"{empty_object}"' string with '{}'
+  local index_policy_json = ''
 
-        rspamd_http.request({
-          url = kibana_url,
-          ev_base = ev_base,
-          config = cfg,
-          headers = {
-            ['Content-Type'] = 'application/x-ndjson',
+  -- elastic lifecycle policy with hot state
+  if detected_distro['name'] == 'elastic' then
+    index_policy = {
+      policy = {
+        phases = {
+          hot = {
+            min_age = '0ms',
+            actions = {
+              set_priority = {
+                priority = settings['index_policy']['hot']['index_priority'],
+              },
+            },
           },
-          body = table.concat(tbl, "\n"),
-          method = 'post',
-          gzip = settings.use_gzip,
-          callback = kibana_template_callback,
-          no_ssl_verify = settings.no_ssl_verify,
-          user = settings.user,
-          password = settings.password,
-          timeout = settings.timeout,
-        })
-      else
-        rspamd_logger.infox(rspamd_config, 'kibana template file %s not found', settings['kibana_file'])
+        },
+      },
+    }
+    -- elastic lifecycle warm
+    if settings['index_policy']['warm']['enabled'] then
+      local warm_obj = {}
+      warm_obj['min_age'] = settings['index_policy']['warm']['after']
+      warm_obj['actions'] = {
+        set_priority = {
+          priority = settings['index_policy']['warm']['index_priority'],
+        },
+      }
+      if not settings['index_policy']['warm']['migrate'] then
+        warm_obj['actions']['migrate'] = { enabled = false }
+      end
+      if settings['index_policy']['warm']['read_only'] then
+        warm_obj['actions']['readonly'] = '{empty_object}'
       end
+      if settings['index_policy']['warm']['change_replicas'] then
+        warm_obj['actions']['allocate'] = {
+          number_of_replicas = settings['index_policy']['warm']['replicas_count'],
+        }
+      end
+      if settings['index_policy']['warm']['shrink'] then
+        if settings['index_policy']['warm']['max_gb_per_shard'] then
+          warm_obj['actions']['shrink'] = {
+            max_primary_shard_size = settings['index_policy']['warm']['max_gb_per_shard'] .. 'gb',
+          }
+        else
+          warm_obj['actions']['shrink'] = {
+            number_of_shards = settings['index_policy']['warm']['shards_count'],
+          }
+        end
+      end
+      if settings['index_policy']['warm']['force_merge'] then
+        warm_obj['actions']['forcemerge'] = {
+          max_num_segments = settings['index_policy']['warm']['segments_count'],
+        }
+      end
+      index_policy['policy']['phases']['warm'] = warm_obj
     end
-  end
-
-  if enabled then
-    -- create ingest pipeline
-    local geoip_url = connect_prefix .. ip_addr .. '/_ingest/pipeline/rspamd-geoip'
-    local function geoip_cb(err, code, body, _)
-      if code ~= 200 then
-        rspamd_logger.errx('cannot get data from %s: %s(%s) (%s)',
-            geoip_url, err, code, body)
-        enabled = false
+    -- elastic lifecycle cold
+    if settings['index_policy']['cold']['enabled'] then
+      local cold_obj = {}
+      cold_obj['min_age'] = settings['index_policy']['cold']['after']
+      cold_obj['actions'] = {
+        set_priority = {
+          priority = settings['index_policy']['cold']['index_priority'],
+        },
+      }
+      if not settings['index_policy']['cold']['migrate'] then
+        cold_obj['actions']['migrate'] = { enabled = false }
+      end
+      if settings['index_policy']['cold']['read_only'] then
+        cold_obj['actions']['readonly'] = '{empty_object}'
+      end
+      if settings['index_policy']['cold']['change_replicas'] then
+        cold_obj['actions']['allocate'] = {
+          number_of_replicas = settings['index_policy']['cold']['replicas_count'],
+        }
+      end
+      index_policy['policy']['phases']['cold'] = cold_obj
+    end
+    -- elastic lifecycle delete
+    if settings['index_policy']['delete']['enabled'] then
+      local delete_obj = {}
+      delete_obj['min_age'] = settings['index_policy']['delete']['after']
+      delete_obj['actions'] = {
+        delete = { delete_searchable_snapshot = true },
+      }
+      index_policy['policy']['phases']['delete'] = delete_obj
+    end
+  -- opensearch state policy with hot state
+  elseif detected_distro['name'] == 'opensearch' then
+    local retry = {
+      count = 3,
+      backoff = 'exponential',
+      delay = '1m',
+    }
+    index_policy = {
+      policy = {
+        description = 'Rspamd index state policy',
+        ism_template = {
+          {
+            index_patterns = { settings['index_template']['name'] .. '-*' },
+            priority = 100,
+          },
+        },
+        default_state = 'hot',
+        states = {
+          {
+            name = 'hot',
+            actions = {
+              {
+                index_priority = {
+                  priority = settings['index_policy']['hot']['index_priority'],
+                },
+                retry = retry,
+              },
+            },
+            transitions = {},
+          },
+        },
+      },
+    }
+    local state_id = 1 -- includes hot state
+    -- opensearch state warm
+    if settings['index_policy']['warm']['enabled'] then
+      local prev_state_id = state_id
+      state_id = state_id + 1
+      index_policy['policy']['states'][prev_state_id]['transitions'] = {
+        {
+          state_name = 'warm',
+          conditions = {
+            min_index_age = settings['index_policy']['warm']['after']
+          },
+        },
+      }
+      local warm_obj = {
+        name = 'warm',
+        actions = {
+          {
+            index_priority = {
+              priority = settings['index_policy']['warm']['index_priority'],
+            },
+            retry = retry,
+          },
+        },
+        transitions = {},
+      }
+      table.insert(index_policy['policy']['states'], warm_obj)
+      if settings['index_policy']['warm']['read_only'] then
+        local read_only = {
+          read_only = '{empty_object}',
+          retry = retry,
+        }
+        table.insert(index_policy['policy']['states'][state_id]['actions'], read_only)
+      end
+      if settings['index_policy']['warm']['change_replicas'] then
+        local change_replicas = {
+          replica_count = {
+            number_of_replicas = settings['index_policy']['warm']['replicas_count'],
+          },
+          retry = retry,
+        }
+        table.insert(index_policy['policy']['states'][state_id]['actions'], change_replicas)
+      end
+      if settings['index_policy']['warm']['shrink'] then
+        local shrink = {
+          shrink = {},
+          retry = retry,
+        }
+        if settings['index_policy']['warm']['max_gb_per_shard'] then
+          shrink['shrink']['max_shard_size'] = settings['index_policy']['warm']['max_gb_per_shard'] .. 'gb'
+        else
+          shrink['shrink']['num_new_shards'] = settings['index_policy']['warm']['shards_count']
+        end
+        shrink['shrink']['switch_aliases'] = false
+        table.insert(index_policy['policy']['states'][state_id]['actions'], shrink)
+      end
+      if settings['index_policy']['warm']['force_merge'] then
+        local force_merge = {
+          force_merge = {
+            max_num_segments = settings['index_policy']['warm']['segments_count'],
+          },
+          retry = retry,
+        }
+        table.insert(index_policy['policy']['states'][state_id]['actions'], force_merge)
       end
     end
-    local template = {
-      description = "Add geoip info for rspamd",
-      processors = {
+    -- opensearch state cold
+    if settings['index_policy']['cold']['enabled'] then
+      local prev_state_id = state_id
+      state_id = state_id + 1
+      index_policy['policy']['states'][prev_state_id]['transitions'] = {
         {
-          geoip = {
-            field = "rspamd_meta.ip",
-            target_field = "rspamd_meta.geoip"
-          }
+          state_name = 'cold',
+          conditions = {
+            min_index_age = settings['index_policy']['cold']['after']
+          },
+        },
+      }
+      local cold_obj = {
+        name = 'cold',
+        actions = {
+          {
+            index_priority = {
+              priority = settings['index_policy']['cold']['index_priority'],
+            },
+            retry = retry,
+          },
+        },
+        transitions = {},
+      }
+      table.insert(index_policy['policy']['states'], cold_obj)
+      if settings['index_policy']['cold']['read_only'] then
+        local read_only = {
+          read_only = '{empty_object}',
+          retry = retry,
         }
+        table.insert(index_policy['policy']['states'][state_id]['actions'], read_only)
+      end
+      if settings['index_policy']['cold']['change_replicas'] then
+        local change_replicas = {
+          replica_count = {
+            number_of_replicas = settings['index_policy']['cold']['replicas_count'],
+          },
+          retry = retry,
+        }
+        table.insert(index_policy['policy']['states'][state_id]['actions'], change_replicas)
+      end
+    end
+    -- opensearch state delete
+    if settings['index_policy']['delete']['enabled'] then
+      local prev_state_id = state_id
+      state_id = state_id + 1
+      index_policy['policy']['states'][prev_state_id]['transitions'] = {
+        {
+          state_name = 'delete',
+          conditions = {
+            min_index_age = settings['index_policy']['delete']['after']
+          },
+        },
       }
-    }
-    rspamd_http.request({
-      url = geoip_url,
-      ev_base = ev_base,
-      config = cfg,
-      callback = geoip_cb,
-      headers = {
-        ['Content-Type'] = 'application/json',
+      local delete_obj = {
+        name = 'delete',
+        actions = {
+          {
+            delete = '{empty_object}',
+            retry = retry,
+          },
+        },
+        transitions = {},
+      }
+      table.insert(index_policy['policy']['states'], delete_obj)
+    end
+  end
+
+  -- finish rendering index policy, will now get current version and update it if needed
+  index_policy_json = ucl.to_format(index_policy, 'json-compact'):gsub('"{empty_object}"', '{}')
+  get_index_policy(cfg, ev_base, upstream, host, policy_url, index_policy_json)
+end
+
+local function configure_index_template(cfg, ev_base)
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
+  local ip_addr = upstream:get_addr():to_string(true)
+  local template_url = connect_prefix .. ip_addr .. '/_index_template/' .. settings['index_template']['name']
+
+  -- common data types
+  local t_boolean_nil_true = { type = 'boolean', null_value = true }
+  local t_boolean_nil_false = { type = 'boolean', null_value = false }
+  local t_date = { type = 'date' }
+  local t_long = { type = 'long', null_value = 0 }
+  local t_float = { type = 'float', null_value = 0 }
+  local t_double = { type = 'double', null_value = 0 }
+  local t_ip = { type = 'ip', null_value = '::' }
+  local t_geo_point = { type = 'geo_point' }
+  local t_keyword = { type = 'keyword', null_value = settings['index_template']['empty_value'] }
+  local t_text = { type = 'text' }
+  local t_text_with_keyword = {
+    type = 'text',
+    fields = {
+      keyword = {
+        type = 'keyword',
+        ignore_above = settings['index_template']['dynamic_keyword_ignore_above'],
       },
-      gzip = settings.use_gzip,
-      body = ucl.to_format(template, 'json-compact'),
-      method = 'put',
-      no_ssl_verify = settings.no_ssl_verify,
-      user = settings.user,
-      password = settings.password,
-      timeout = settings.timeout,
-    })
-    -- create template mappings if not exist
-    local template_url = connect_prefix .. ip_addr .. '/_template/rspamd'
-    local function http_template_put_callback(err, code, body, _)
-      if code ~= 200 then
-        rspamd_logger.errx('cannot put template to %s: %s(%s) (%s)',
-            template_url, err, code, body)
-        enabled = false
+    },
+  }
+
+  -- common objects types
+  local geoip_obj = {
+    dynamic = false,
+    type = 'object',
+    properties = {
+      continent_name = t_text,
+      region_iso_code = t_keyword,
+      city_name = t_text,
+      country_iso_code = t_keyword,
+      country_name = t_text,
+      location = t_geo_point,
+      region_name = t_text,
+    },
+  }
+  local asn_obj = {
+    dynamic = false,
+    type = 'object',
+    properties = {
+      country = t_keyword,
+      asn = t_long,
+      ipnet = t_keyword, -- do not use ip_range type, it's not usable for search
+    },
+  }
+  local symbols_obj = {
+    dynamic = false,
+    type = 'object',
+    properties = {
+      name = t_keyword,
+      group = t_keyword,
+      options = t_text_with_keyword,
+      score = t_double,
+      weight = t_double,
+    },
+  }
+  if settings['index_template']['symbols_nested'] then
+    symbols_obj['type'] = 'nested'
+  end
+
+  -- dynamic templates
+  local dynamic_templates_obj = {}
+  local dynamic_strings = {
+    strings = {
+      match_mapping_type = 'string',
+      mapping = {
+        type = 'text',
+        fields = {
+          keyword = {
+            type = 'keyword',
+            ignore_above = settings['index_template']['dynamic_keyword_ignore_above'],
+          },
+        },
+      },
+    },
+  }
+  table.insert(dynamic_templates_obj, dynamic_strings)
+
+  -- index template rendering
+  local index_template = {
+    index_patterns = { settings['index_template']['name'] .. '-*', },
+    priority = settings['index_template']['priority'],
+    template = {
+      settings = {
+        index = {
+          number_of_shards = settings['index_template']['shards_count'],
+          number_of_replicas = settings['index_template']['replicas_count'],
+          refresh_interval = settings['index_template']['refresh_interval'] .. 's',
+        },
+      },
+      mappings = {
+        dynamic = false,
+        dynamic_templates = dynamic_templates_obj,
+        properties = {
+          ['@timestamp'] = t_date,
+          rspamd_meta = {
+            dynamic = true,
+            type = 'object',
+            properties = {
+              rspamd_server = t_keyword,
+              action = t_keyword,
+              score = t_double,
+              symbols = symbols_obj,
+              user = t_keyword,
+              direction = t_keyword,
+              qid = t_keyword,
+              helo = t_text_with_keyword,
+              hostname = t_text_with_keyword,
+              ip = t_ip,
+              is_local = t_boolean_nil_false,
+              sender_ip = t_ip,
+              message_id = t_text_with_keyword,
+              rcpt = t_text_with_keyword,
+              from_domain = t_keyword,
+              from_user = t_keyword,
+              mime_from_domain = t_keyword,
+              mime_from_user = t_keyword,
+              settings_id = t_keyword,
+              asn = asn_obj,
+              scan_time = t_float,
+              language = t_text,
+              non_en = t_boolean_nil_true,
+              fuzzy_hashes = t_text,
+              received_delay = t_long,
+            },
+          },
+        },
+      },
+    },
+  }
+
+  -- render index lifecycle policy
+  if detected_distro['name'] == 'elastic' and settings['index_policy']['enabled'] then
+    index_template['template']['settings']['index']['lifecycle'] = {
+      name = settings['index_policy']['name']
+    }
+  end
+
+  -- render geoip mappings
+  if settings['geoip']['enabled'] then
+    index_template['template']['mappings']['properties']['rspamd_meta']['properties']['geoip'] = geoip_obj
+    index_template['template']['mappings']['properties']['rspamd_meta']['properties']['sender_geoip'] = geoip_obj
+  end
+
+  -- render collect_headers and extra_collect_headers mappings
+  for _, header in ipairs(settings['collect_headers']) do
+    local header_name = get_header_name(header)
+    if not index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] then
+      index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] = t_text_with_keyword
+    end
+  end
+  for _, header in ipairs(settings['extra_collect_headers']) do
+    local header_name = get_header_name(header)
+    if not index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] then
+      index_template['template']['mappings']['properties']['rspamd_meta']['properties'][header_name] = t_text_with_keyword
+    end
+  end
+
+  local function http_callback(err, code, body, _)
+    if err then
+      rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', template_url, err)
+      upstream:fail()
+    elseif code == 200 then
+      rspamd_logger.infox(rspamd_config, 'successfully updated elastic index template: %s', body)
+      states['index_template']['configured'] = true
+      upstream:ok()
+    else
+      rspamd_logger.errx(rspamd_config, 'cannot configure elastic index template (%s), status code: %s, response: %s',
+        template_url, code, body)
+      upstream:fail()
+      handle_error('configure', 'index_template', settings['limits']['max_fail'])
+    end
+  end
+
+  rspamd_http.request({
+    url = template_url,
+    ev_base = ev_base,
+    config = cfg,
+    body = ucl.to_format(index_template, 'json-compact'),
+    headers = {
+      ['Host'] = host,
+      ['Content-Type'] = 'application/json',
+    },
+    method = 'put',
+    callback = http_callback,
+    gzip = settings.use_gzip,
+    keepalive = settings.use_keepalive,
+    no_ssl_verify = settings.no_ssl_verify,
+    user = settings.user,
+    password = settings.password,
+    timeout = settings.timeout,
+  })
+end
+
+local function verify_distro(manual)
+  local detected_distro_name = detected_distro['name']
+  local detected_distro_version = detected_distro['version']
+  local valid = true
+  local valid_unknown = false
+
+  -- check that detected_distro_name is valid
+  if not detected_distro_name then
+    rspamd_logger.errx(rspamd_config, 'failed to detect elastic distribution')
+    valid = false
+  elseif not supported_distro[detected_distro_name] then
+    rspamd_logger.errx(rspamd_config, 'unsupported elastic distribution: %s', detected_distro_name)
+    valid = false
+  else
+    local supported_distro_info = supported_distro[detected_distro_name]
+    -- check that detected_distro_version is valid
+    if not detected_distro_version or type(detected_distro_version) ~= 'string' then
+      rspamd_logger.errx(rspamd_config, 'elastic version should be a string, but we received: %s', type(detected_distro_version))
+      valid = false
+    elseif detected_distro_version == '' then
+      rspamd_logger.errx(rspamd_config, 'unsupported elastic version: empty string')
+      valid = false
+    else
+      -- compare versions using compare_versions
+      local cmp_from = compare_versions(detected_distro_version, supported_distro_info['from'])
+      if cmp_from == -1 then
+        rspamd_logger.errx(rspamd_config, 'unsupported elastic version: %s, minimal supported version of %s is %s',
+          detected_distro_version, detected_distro_name, supported_distro_info['from'])
+        valid = false
       else
-        lua_util.debugm(N, 'pushed rspamd template: %s', body)
-        push_kibana_template()
+        local cmp_till = compare_versions(detected_distro_version, supported_distro_info['till'])
+        if (cmp_till >= 0) and not supported_distro_info['till_unknown'] then
+          rspamd_logger.errx(rspamd_config, 'unsupported elastic version: %s, maximum supported version of %s is less than %s',
+            detected_distro_version, detected_distro_name, supported_distro_info['till'])
+          valid = false
+        elseif (cmp_till >= 0) and supported_distro_info['till_unknown'] then
+          rspamd_logger.warnx(rspamd_config,
+            'compatibility of elastic version: %s is unknown, maximum known supported version of %s is less than %s, use at your own risk',
+            detected_distro_version, detected_distro_name, supported_distro_info['till'])
+          valid_unknown = true
+        end
       end
     end
-    local function http_template_exist_callback(_, code, _, _)
-      if code ~= 200 then
-        rspamd_http.request({
-          url = template_url,
-          ev_base = ev_base,
-          config = cfg,
-          body = elastic_template,
-          method = 'put',
-          headers = {
-            ['Content-Type'] = 'application/json',
-          },
-          gzip = settings.use_gzip,
-          callback = http_template_put_callback,
-          no_ssl_verify = settings.no_ssl_verify,
-          user = settings.user,
-          password = settings.password,
-          timeout = settings.timeout,
-        })
+  end
+
+  if valid_unknown then
+    detected_distro['supported'] = true
+  else
+    if valid and manual then
+      rspamd_logger.infox(
+        rspamd_config, 'assuming elastic distro: %s, version: %s', detected_distro_name, detected_distro_version)
+      detected_distro['supported'] = true
+    elseif valid and not manual then
+      rspamd_logger.infox(rspamd_config, 'successfully connected to elastic distro: %s, version: %s',
+        detected_distro_name, detected_distro_version)
+      detected_distro['supported'] = true
+    else
+      handle_error('configure','distro',settings['version']['autodetect_max_fail'])
+    end
+  end
+end
+
+local function configure_distro(cfg, ev_base)
+  if not settings['version']['autodetect_enabled'] then
+    detected_distro['name'] = settings['version']['override']['name']
+    detected_distro['version'] = settings['version']['override']['version']
+    rspamd_logger.infox(rspamd_config, 'automatic detection of elastic distro and version is disabled, taking configuration from settings')
+    verify_distro(true)
+  end
+
+  local upstream = settings.upstream:get_upstream_round_robin()
+  local host = upstream:get_name():gsub(":[1-9][0-9]*$", "")
+  local ip_addr = upstream:get_addr():to_string(true)
+  local root_url = connect_prefix .. ip_addr .. '/'
+  local function http_callback(err, code, body, _)
+    if err then
+      rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s): %s', root_url, err)
+      upstream:fail()
+    elseif code ~= 200 then
+      rspamd_logger.errx(rspamd_config, 'cannot connect to elastic (%s), status code: %s, response: %s', root_url, code, body)
+      upstream:fail()
+    else
+      local parser = ucl.parser()
+      local res, ucl_err = parser:parse_string(body)
+      if not res then
+        rspamd_logger.errx(rspamd_config, 'failed to parse reply from elastic (%s): %s', root_url, ucl_err)
+        upstream:fail()
       else
-        push_kibana_template()
+        local obj = parser:get_object()
+        if obj['tagline'] == "The OpenSearch Project: https://opensearch.org/" then
+            detected_distro['name'] = 'opensearch'
+        end
+        if obj['tagline'] == "You Know, for Search" then
+            detected_distro['name'] = 'elastic'
+        end
+        if obj['version'] then
+          if obj['version']['number'] then
+            detected_distro['version'] = obj['version']['number']
+          end
+          if not detected_distro['name'] and obj['version']['distribution'] then
+            detected_distro['name'] = obj['version']['distribution']
+          end
+        end
+        verify_distro()
+        if detected_distro['supported'] then
+          upstream:ok()
+        end
       end
     end
+  end
 
+  if settings['version']['autodetect_enabled'] then
     rspamd_http.request({
-      url = template_url,
+      url = root_url,
       ev_base = ev_base,
       config = cfg,
-      method = 'head',
-      callback = http_template_exist_callback,
+      headers = {
+        ['Host'] = host,
+        ['Content-Type'] = 'application/json',
+      },
+      method = 'get',
+      callback = http_callback,
+      gzip = settings.use_gzip,
+      keepalive = settings.use_keepalive,
       no_ssl_verify = settings.no_ssl_verify,
       user = settings.user,
       password = settings.password,
       timeout = settings.timeout,
     })
-
   end
 end
 
-redis_params = rspamd_redis.parse_redis_server('elastic')
+local opts = rspamd_config:get_all_opt('elastic')
 
-if redis_params and opts then
-  for k, v in pairs(opts) do
+if opts then
+  for k,v in pairs(opts) do
     settings[k] = v
   end
 
+  if not settings['enabled'] then
+    rspamd_logger.infox(rspamd_config, 'module disabled in config')
+    lua_util.disable_module(N, "config")
+  end
+
   if not settings['server'] and not settings['servers'] then
     rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module')
     lua_util.disable_module(N, "config")
@@ -498,29 +1555,10 @@ if redis_params and opts then
       connect_prefix = 'https://'
     end
 
-    if settings.ingest_module then
-      ingest_geoip_type = 'modules'
-    end
-
-    settings.upstream = upstream_list.create(rspamd_config,
-        settings['server'] or settings['servers'], 9200)
+    settings.upstream = upstream_list.create(rspamd_config, settings['server'] or settings['servers'], 9200)
 
     if not settings.upstream then
-      rspamd_logger.errx('cannot parse elastic address: %s',
-          settings['server'] or settings['servers'])
-      lua_util.disable_module(N, "config")
-      return
-    end
-    if not settings['template_file'] then
-      rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module')
-      lua_util.disable_module(N, "config")
-      return
-    end
-
-    elastic_template = read_file(settings['template_file']);
-    if not elastic_template then
-      rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module',
-          settings['template_file'])
+      rspamd_logger.errx(rspamd_config, 'cannot parse elastic address: %s', settings['server'] or settings['servers'])
       lua_util.disable_module(N, "config")
       return
     end
@@ -533,12 +1571,79 @@ if redis_params and opts then
       augmentations = { string.format("timeout=%f", settings.timeout) },
     })
 
+    -- send tail of data if the worker is going to stop
+    rspamd_config:register_finish_script(function(task)
+      local nlogs_total = buffer['logs']:length()
+      if nlogs_total > 0 then
+        rspamd_logger.debugm(N, task, 'flushing buffer on shutdown, buffer size: %s', nlogs_total)
+        elastic_send_data(true, task)
+      end
+    end)
+
     rspamd_config:add_on_load(function(cfg, ev_base, worker)
       if worker:is_scanner() then
-        check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements
-        initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations
+        rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base)
+          if not detected_distro['supported'] then
+            if states['distro']['configured'] then
+              return false -- stop running periodic job
+            else
+              configure_distro(cfg, ev_base)
+              return true -- continue running periodic job
+            end
+          end
+        end)
+        -- send data periodically if any of the limits is reached
+        rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base)
+          if detected_distro['supported'] then
+            periodic_send_data(cfg, ev_base)
+          end
+          return true
+        end)
+      end
+      if worker:is_primary_controller() then
+        rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base)
+          if not settings['index_template']['managed'] then
+            return false
+          elseif not detected_distro['supported'] then
+            return true
+          else
+            if states['index_template']['configured'] then
+              return false
+            else
+              configure_index_template(cfg, ev_base)
+              return true
+            end
+          end
+        end)
+        rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base)
+          if not settings['index_policy']['enabled'] or not settings['index_policy']['managed'] then
+            return false
+          elseif not detected_distro['supported'] then
+            return true
+          else
+            if states['index_policy']['configured'] then
+              return false
+            else
+              configure_index_policy(cfg, ev_base)
+              return true
+            end
+          end
+        end)
+        rspamd_config:add_periodic(ev_base, settings.periodic_interval, function(cfg, ev_base)
+          if not settings['geoip']['enabled'] or not settings['geoip']['managed'] then
+            return false
+          elseif not detected_distro['supported'] then
+            return true
+          else
+            if states['geoip_pipeline']['configured'] then
+              return false
+            else
+              configure_geoip_pipeline(cfg, ev_base)
+              return true
+            end
+          end
+        end)
       end
     end)
   end
-
 end