diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-02-18 15:46:58 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-02-18 15:46:58 +0000 |
commit | 9ef70a3754121b2161e440fa295963dbac4269bc (patch) | |
tree | 882d37a7d03f7a836fc3064e4de6f31add6453a6 /src | |
parent | f1becea682b563d366add102cd88431f15182b26 (diff) | |
download | rspamd-9ef70a3754121b2161e440fa295963dbac4269bc.tar.gz rspamd-9ef70a3754121b2161e440fa295963dbac4269bc.zip |
[Feature] Preliminary import of the elasticsearch module
Diffstat (limited to 'src')
-rw-r--r-- | src/plugins/lua/elastic.lua | 345 |
1 files changed, 345 insertions, 0 deletions
diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua new file mode 100644 index 000000000..f3bf69ead --- /dev/null +++ b/src/plugins/lua/elastic.lua @@ -0,0 +1,345 @@ +--[[ +Copyright (c) 2017, Veselin Iordanov +Copyright (c) 2018, Vsevolod Stakhov <vsevolod@highsecure.ru> + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +]]-- + +local rspamd_logger = require 'rspamd_logger' +local rspamd_http = require "rspamd_http" +local rspamd_lua_utils = require "lua_util" +local ucl = require "ucl" +local hash = require "rspamd_cryptobox_hash" + +if confighelp then + return +end + +local redis_params +redis_params = rspamd_parse_redis_server('elastic') + +local rows = {} +local nrows = 0 +local elastic_template +local redis_params +local settings = { + limit = 10, + index_pattern = 'rspamd-%Y.%m.%d', + server = 'localhost:9200', + template_file = '/etc/rspamd/rspamd_template.json', + kibana_file = '/etc/rspamd/kibana.json', + key_prefix = 'elastic-', + expire = 3600, + debug = false, + failover = false, + import_kibana = false, +} + +local function read_file(path) + local file = io.open(path, "rb") + if not file then return nil end + local content = file:read "*a" + file:close() + return content +end +local function elastic_send_data(task) + local es_index = os.date(settings['index_pattern']) + local bulk_json = "" + for key,value in pairs(rows) do + bulk_json = bulk_json..'{ "index" : { "_index" : "'..es_index..'", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }'.."\n" + bulk_json = bulk_json..ucl.to_format(value, 'json-compact').."\n" + end + local function http_index_data_callback(err, code, body, headers) + -- todo error handling we may store the rows it into redis and send it again later + if settings['debug'] then + rspamd_logger.infox(task, "After create data %1",body) + end + if code ~= 200 or err_message then + if settings['failover'] then + local h = hash.create() + h:update(bulk_json) + local key = settings['key_prefix'] ..es_index..":".. h:base32():sub(1, 20) + local data = rspamd_util.zstd_compress(bulk_json) + local function redis_set_cb(err) + if err ~=nil then + rspamd_logger.errx(task, 'redis_set_cb received error: %1', err) + end + end + rspamd_redis_make_request(task, + redis_params, -- connect params + key, -- hash key + true, -- is write + redis_set_cb, --callback + 'SETEX', -- command + {key, tostring(settings['expire']), data} -- arguments + ) + end + end + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/'..es_index..'/_bulk', + headers = { + ['Content-Type'] = 'application/x-ndjson', + }, + body = bulk_json, + task = task, + method = 'post', + callback = http_index_data_callback + }) + +end +local function get_general_metadata(task) + local r = {} + local ip_addr = task:get_ip() + r.ip = tostring(ip_addr) or 'unknown' + r.webmail = false + if ip_addr then + r.is_local = ip_addr:is_local() + local origin = task:get_header('X-Originating-IP') + if origin then + r.webmail = true + r.ip = origin + end + end + r.direction = "Inbound" + r.user = task:get_user() or 'unknown' + r.qid = task:get_queue_id() or 'unknown' + r.action = task:get_metric_action('default') + if r.user ~= 'unknown' then + r.direction = "Outbound" + end + local s = task:get_metric_score('default')[1] + r.score = s + + local rcpt = task:get_recipients('smtp') + if rcpt then + local l = {} + for _, a in ipairs(rcpt) do + table.insert(l, a['addr']) + end + r.rcpt = l + else + r.rcpt = 'unknown' + end + local from = task:get_from('smtp') + if ((from or E)[1] or E).addr then + r.from = from[1].addr + else + r.from = 'unknown' + end + local syminf = task:get_symbols_all() + r.symbols = syminf + r.asn = {} + local pool = task:get_mempool() + r.asn.country = pool:get_variable("country") or 'unknown' + r.asn.asn = pool:get_variable("asn") or 0 + r.asn.ipnet = pool:get_variable("ipnet") or 'unknown' + local function process_header(name) + local hdr = task:get_header_full(name) + if hdr then + local l = {} + for _, h in ipairs(hdr) do + table.insert(l, h.decoded) + end + return l + else + return 'unknown' + end + end + r.header_from = process_header('from') + r.header_to = process_header('to') + r.header_subject = process_header('subject') + r.header_date = process_header('date') + r.message_id = task:get_message_id() + return r +end + +local function elastic_collect(task) + if not enabled then return end + if rspamd_lua_utils.is_rspamc_or_controller(task) then return end + local row = {['rspam_meta'] = get_general_metadata(task), ['@timestamp'] = os.time(os.date("*t")).."000"} + table.insert(rows, row) + nrows = nrows + 1 + if nrows > settings['limit'] then + elastic_send_data(task) + nrows = 0 + rows = {} + end +end +local opts = rspamd_config:get_all_opt('elastic') +local enabled = true; + +local function check_elastic_server(ev_base) + local function http_callback(err, code, body, headers) + local parser = ucl.parser() + local res,err = parser:parse_string(body) + if not res then + rspamd_logger.infox(rspamd_config, 'failed to query elasticsearch server %1, disabling module',settings['server']) + enabled = false; + return + end + local obj = parser:get_object() + for node,value in pairs(obj['nodes']) do + local plugin_found = false + for i,plugin in pairs(value['plugins']) do + if plugin['name'] == 'ingest-geoip' then + plugin_found = true + end + end + if not plugin_found then + rspamd_logger.infox(rspamd_config, 'Unable to find ingest-geoip on %1 node, disabling module',node) + enabled = false + return + end + end + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/_nodes/plugins', + ev_base = ev_base, + method = 'get', + callback = http_callback + }) + if enabled then + local function http_ingest_callback(err, code, body, headers) + if code ~= 200 or err_message then + -- pipeline not exist + end + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip', + ev_base = ev_base, + method = 'get', + callback = http_ingest_callback + }) + -- lets try to create ingest pipeline if not exist + rspamd_http.request({ + url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip', + task = task, + body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}', + method = 'put', + }) + end + local function http_template_create_callback(err, code, body, headers) + end + local function http_template_exist_callback(err, code, body, headers) + if code ~= 200 or err_message then + rspamd_http.request({ + url = 'http://'..settings['server']..'/_template/rspamd', + task = task, + body = elastic_template, + method = 'put', + callback = http_template_create_callback + }) + end + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/_template/rspamd', + task = task, + method = 'head', + callback = http_template_exist_callback + }) +end +-- import ingest pipeline and kibana dashboard/visualization +local function initial_setup(worker) + if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end + if enabled then + -- create ingest pipeline + rspamd_http.request({ + url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip', + task = task, + body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}', + method = 'put', + }) + -- create template mappings if not exist + local function http_template_exist_callback(err, code, body, headers) + if code ~= 200 or err_message then + rspamd_http.request({ + url = 'http://'..settings['server']..'/_template/rspamd', + task = task, + body = elastic_template, + method = 'put', + }) + end + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/_template/rspamd', + task = task, + method = 'head', + callback = http_template_exist_callback + }) + -- add kibana dashboard and visualizations + local kibana_mappings = read_file(settings['kibana_file']); + if enabled and settings['import_kibana'] then + local kibana_mappings = read_file(settings['kibana_file']); + if kibana_mappings then + local parser = ucl.parser() + local res,err = parser:parse_string(kibana_mappings) + if not res then + return + end + local obj = parser:get_object() + local bulk_json = "" + for _,item in ipairs(obj) do + bulk_json = bulk_json..'{ "index" : { "_index" : ".kibana", "_type" : "'..item["_type"]..'" ,"_id": "'..item["_id"]..'"} }'.."\n" + bulk_json = bulk_json..ucl.to_format(item['_source'], 'json-compact').."\n" + end + rspamd_http.request({ + url = 'http://'..settings['server']..'/.kibana/_bulk', + headers = { + ['Content-Type'] = 'application/x-ndjson', + }, + body = bulk_json, + task = task, + method = 'post' + }) + else + rspamd_logger.infox(rspamd_config, 'kibana templatefile not found') + end + end + end +end +rspamd_config:add_on_load(function(cfg, ev_base,worker) + if opts then + for k,v in pairs(opts) do + settings[k] = v + end + if not settings['server'] then + rspamd_logger.infox(rspamd_config, 'no elastic servers are specified, disabling module') + enabled = false + return + end + if not settings['template_file'] then + rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module') + enabled = false + return + end + end + elastic_template = read_file(settings['template_file']); + if not elastic_template then + rspamd_logger.infox(rspamd_config, 'elastic unable to read mappings, disabling module') + enabled = false + return + end + redis_params = rspamd_parse_redis_server('elastic') + check_elastic_server(ev_base) -- check for elasticsearch requirements + initial_setup(worker) -- import mappings pipeline and visualizations +end) + +if enabled == true then + rspamd_config:register_symbol({ + name = 'ELASTIC_COLLECT', + type = 'postfilter', + callback = elastic_collect, + priority = 10 + }) +end |