diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-02-18 16:43:50 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-02-18 16:43:50 +0000 |
commit | 79f00df258e56deb7fe1957e6886a84e85c1cd6e (patch) | |
tree | 28a84d5891fee4e7e79a5291029a50adab071acf /src/plugins | |
parent | 9ef70a3754121b2161e440fa295963dbac4269bc (diff) | |
download | rspamd-79f00df258e56deb7fe1957e6886a84e85c1cd6e.tar.gz rspamd-79f00df258e56deb7fe1957e6886a84e85c1cd6e.zip |
[Fix] Rework elasticsearch plugin
Diffstat (limited to 'src/plugins')
-rw-r--r-- | src/plugins/lua/dynamic_conf.lua | 2 | ||||
-rw-r--r-- | src/plugins/lua/elastic.lua | 266 |
2 files changed, 152 insertions, 116 deletions
diff --git a/src/plugins/lua/dynamic_conf.lua b/src/plugins/lua/dynamic_conf.lua index 569d00cbd..a4b7527a5 100644 --- a/src/plugins/lua/dynamic_conf.lua +++ b/src/plugins/lua/dynamic_conf.lua @@ -232,7 +232,7 @@ end local section = rspamd_config:get_all_opt("dynamic_conf") if section then - redis_params = rspamd_parse_redis_server('dynamic_conf') + redis_params = rspamd_redis.parse_redis_server('dynamic_conf') if not redis_params then rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module') return diff --git a/src/plugins/lua/elastic.lua b/src/plugins/lua/elastic.lua index f3bf69ead..ae789f0c5 100644 --- a/src/plugins/lua/elastic.lua +++ b/src/plugins/lua/elastic.lua @@ -18,31 +18,34 @@ limitations under the License. local rspamd_logger = require 'rspamd_logger' local rspamd_http = require "rspamd_http" local rspamd_lua_utils = require "lua_util" +local util = require "rspamd_util" local ucl = require "ucl" local hash = require "rspamd_cryptobox_hash" +local rspamd_redis = require "lua_redis" +local upstream_list = require "rspamd_upstream_list" if confighelp then return end -local redis_params -redis_params = rspamd_parse_redis_server('elastic') - local rows = {} local nrows = 0 local elastic_template local redis_params +local N = "elastic" +local E = {} +local connect_prefix = 'http://' +local enabled = true local settings = { limit = 10, index_pattern = 'rspamd-%Y.%m.%d', - server = 'localhost:9200', template_file = '/etc/rspamd/rspamd_template.json', kibana_file = '/etc/rspamd/kibana.json', key_prefix = 'elastic-', expire = 3600, - debug = false, failover = false, import_kibana = false, + use_https = false, } local function read_file(path) @@ -52,30 +55,36 @@ local function read_file(path) file:close() return content end + local function elastic_send_data(task) local es_index = os.date(settings['index_pattern']) - local bulk_json = "" - for key,value in pairs(rows) do - bulk_json = bulk_json..'{ "index" : { "_index" : "'..es_index..'", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }'.."\n" - bulk_json = bulk_json..ucl.to_format(value, 'json-compact').."\n" + local tbl = {} + for _,value in pairs(rows) do + table.insert(tbl, '{ "index" : { "_index" : "'..es_index.. + '", "_type" : "logs" ,"pipeline": "rspamd-geoip"} }') + table.insert(tbl, ucl.to_format(value, 'json-compact')) end - local function http_index_data_callback(err, code, body, headers) - -- todo error handling we may store the rows it into redis and send it again later - if settings['debug'] then - rspamd_logger.infox(task, "After create data %1",body) - end - if code ~= 200 or err_message then + + local upstream = settings.upstream:get_upstream_round_robin() + local ip_addr = upstream:get_addr():to_string(true) + + local push_url = connect_prefix .. ip_addr .. '/'..es_index..'/_bulk' + local bulk_json = table.concat(tbl, "\n") + local function http_index_data_callback(_, code, body, _) + -- todo error handling we may store the rows it into redis and send it again late + rspamd_logger.debugm(N, task, "After create data %1",body) + if code ~= 200 then if settings['failover'] then local h = hash.create() h:update(bulk_json) local key = settings['key_prefix'] ..es_index..":".. h:base32():sub(1, 20) - local data = rspamd_util.zstd_compress(bulk_json) + local data = util.zstd_compress(bulk_json) local function redis_set_cb(err) if err ~=nil then rspamd_logger.errx(task, 'redis_set_cb received error: %1', err) end end - rspamd_redis_make_request(task, + rspamd_redis.make_request(task, redis_params, -- connect params key, -- hash key true, -- is write @@ -87,7 +96,7 @@ local function elastic_send_data(task) end end rspamd_http.request({ - url = 'http://'..settings['server']..'/'..es_index..'/_bulk', + url = push_url, headers = { ['Content-Type'] = 'application/x-ndjson', }, @@ -167,7 +176,8 @@ end local function elastic_collect(task) if not enabled then return end if rspamd_lua_utils.is_rspamc_or_controller(task) then return end - local row = {['rspam_meta'] = get_general_metadata(task), ['@timestamp'] = os.time(os.date("*t")).."000"} + local row = {['rspam_meta'] = get_general_metadata(task), + ['@timestamp'] = tostring(util.get_time()).."000"} table.insert(rows, row) nrows = nrows + 1 if nrows > settings['limit'] then @@ -176,131 +186,139 @@ local function elastic_collect(task) rows = {} end end + + local opts = rspamd_config:get_all_opt('elastic') -local enabled = true; -local function check_elastic_server(ev_base) - local function http_callback(err, code, body, headers) +local function check_elastic_server(cfg, ev_base, _) + local upstream = settings.upstream:get_upstream_round_robin() + local ip_addr = upstream:get_addr():to_string(true) + + local plugins_url = connect_prefix .. ip_addr .. '/_nodes/plugins' + local function http_callback(_, _, body, _) local parser = ucl.parser() local res,err = parser:parse_string(body) if not res then - rspamd_logger.infox(rspamd_config, 'failed to query elasticsearch server %1, disabling module',settings['server']) + rspamd_logger.infox(rspamd_config, 'failed to parse reply from %s: %s', + plugins_url, err) enabled = false; return end local obj = parser:get_object() for node,value in pairs(obj['nodes']) do local plugin_found = false - for i,plugin in pairs(value['plugins']) do + for _,plugin in pairs(value['plugins']) do if plugin['name'] == 'ingest-geoip' then plugin_found = true end end if not plugin_found then - rspamd_logger.infox(rspamd_config, 'Unable to find ingest-geoip on %1 node, disabling module',node) + rspamd_logger.infox(rspamd_config, + 'Unable to find ingest-geoip on %1 node, disabling module', node) enabled = false return end end end rspamd_http.request({ - url = 'http://'..settings['server']..'/_nodes/plugins', + url = plugins_url, ev_base = ev_base, + config = cfg, method = 'get', callback = http_callback }) - if enabled then - local function http_ingest_callback(err, code, body, headers) - if code ~= 200 or err_message then - -- pipeline not exist - end - end - rspamd_http.request({ - url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip', - ev_base = ev_base, - method = 'get', - callback = http_ingest_callback - }) - -- lets try to create ingest pipeline if not exist - rspamd_http.request({ - url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip', - task = task, - body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}', - method = 'put', - }) - end - local function http_template_create_callback(err, code, body, headers) - end - local function http_template_exist_callback(err, code, body, headers) - if code ~= 200 or err_message then - rspamd_http.request({ - url = 'http://'..settings['server']..'/_template/rspamd', - task = task, - body = elastic_template, - method = 'put', - callback = http_template_create_callback - }) - end - end - rspamd_http.request({ - url = 'http://'..settings['server']..'/_template/rspamd', - task = task, - method = 'head', - callback = http_template_exist_callback - }) end + -- import ingest pipeline and kibana dashboard/visualization -local function initial_setup(worker) - if not (worker:get_name() == 'normal' and worker:get_index() == 0) then return end +local function initial_setup(cfg, ev_base, worker) + if not (worker:get_name() == 'controller' and worker:get_index() == 0) then return end + + local upstream = settings.upstream:get_upstream_round_robin() + local ip_addr = upstream:get_addr():to_string(true) if enabled then -- create ingest pipeline + local geoip_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip' + local function geoip_cb(_, code, _, _) + if code ~= 200 then + rspamd_logger.errx('cannot get data from %s: %s', geoip_url, code) + enabled = false + end + end rspamd_http.request({ - url = 'http://'..settings['server']..'/_ingest/pipeline/rspamd-geoip', - task = task, + url = geoip_url, + ev_base = ev_base, + config = cfg, + callback = geoip_cb, body = '{"description" : "Add geoip info for rspamd","processors" : [{"geoip" : {"field" : "rspam_meta.ip","target_field": "rspam_meta.geoip"}}]}', method = 'put', }) -- create template mappings if not exist - local function http_template_exist_callback(err, code, body, headers) - if code ~= 200 or err_message then + local template_url = connect_prefix .. ip_addr ..'/_ingest/pipeline/rspamd-geoip' + local function http_template_put_callback(_, code, _, _) + if code ~= 200 then + rspamd_logger.errx('cannot put template to %s: %s', template_url, code) + enabled = false + end + end + local function http_template_exist_callback(_, code, _, _) + if code ~= 200 then rspamd_http.request({ - url = 'http://'..settings['server']..'/_template/rspamd', - task = task, + url = template_url, + ev_base = ev_base, + config = cfg, body = elastic_template, method = 'put', + callback = http_template_put_callback, }) end end + rspamd_http.request({ - url = 'http://'..settings['server']..'/_template/rspamd', - task = task, + url = template_url, + ev_base = ev_base, + config = cfg, method = 'head', callback = http_template_exist_callback }) -- add kibana dashboard and visualizations - local kibana_mappings = read_file(settings['kibana_file']); if enabled and settings['import_kibana'] then - local kibana_mappings = read_file(settings['kibana_file']); + local kibana_mappings = read_file(settings['kibana_file']) if kibana_mappings then local parser = ucl.parser() local res,err = parser:parse_string(kibana_mappings) if not res then + rspamd_logger.infox(rspamd_config, 'kibana template cannot be parsed: %s', + err) + enabled = false + return end local obj = parser:get_object() - local bulk_json = "" + local tbl = {} for _,item in ipairs(obj) do - bulk_json = bulk_json..'{ "index" : { "_index" : ".kibana", "_type" : "'..item["_type"]..'" ,"_id": "'..item["_id"]..'"} }'.."\n" - bulk_json = bulk_json..ucl.to_format(item['_source'], 'json-compact').."\n" + table.insert(tbl, '{ "index" : { "_index" : ".kibana", "_type" : "'.. + item["_type"]..'" ,"_id": "'.. + item["_id"]..'"} }') + table.insert(tbl, ucl.to_format(item['_source'], 'json-compact')) + end + + local kibana_url = connect_prefix .. ip_addr ..'/.kibana/_bulk' + local function kibana_template_callback(_, code, _, _) + if code ~= 200 then + rspamd_logger.errx('cannot put template to %s: %s', kibana_url, code) + enabled = false + end end rspamd_http.request({ - url = 'http://'..settings['server']..'/.kibana/_bulk', + url = kibana_url, + ev_base = ev_base, + config = cfg, headers = { ['Content-Type'] = 'application/x-ndjson', }, - body = bulk_json, - task = task, - method = 'post' + body = table.concat(tbl, "\n"), + method = 'post', + callback = kibana_template_callback }) else rspamd_logger.infox(rspamd_config, 'kibana templatefile not found') @@ -308,38 +326,56 @@ local function initial_setup(worker) end end end -rspamd_config:add_on_load(function(cfg, ev_base,worker) - if opts then - for k,v in pairs(opts) do - settings[k] = v - end - if not settings['server'] then - rspamd_logger.infox(rspamd_config, 'no elastic servers are specified, disabling module') - enabled = false - return - end - if not settings['template_file'] then - rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module') - enabled = false - return - end + +redis_params = rspamd_redis.parse_redis_server('elastic') + +if redis_params and opts then + for k,v in pairs(opts) do + settings[k] = v end - elastic_template = read_file(settings['template_file']); - if not elastic_template then - rspamd_logger.infox(rspamd_config, 'elastic unable to read mappings, disabling module') - enabled = false - return + + if not settings['server'] and not settings['servers'] then + rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module') + rspamd_lua_utils.disable_module(N, "config") + else + if settings.use_https then + connect_prefix = 'https://' + end + + settings.upstream = upstream_list.create(rspamd_config, + settings['server'] or settings['servers'], 9200) + + if not settings.upstream then + rspamd_logger.errx('cannot parse elastic address: %s', + settings['server'] or settings['servers']) + rspamd_lua_utils.disable_module(N, "config") + return + end + if not settings['template_file'] then + rspamd_logger.infox(rspamd_config, 'elastic template_file is required, disabling module') + rspamd_lua_utils.disable_module(N, "config") + return + end + + elastic_template = read_file(settings['template_file']); + if not elastic_template then + rspamd_logger.infox(rspamd_config, 'elastic unable to read %s, disabling module', + settings['template_file']) + rspamd_lua_utils.disable_module(N, "config") + return + end + + rspamd_config:register_symbol({ + name = 'ELASTIC_COLLECT', + type = 'idempotent', + callback = elastic_collect, + priority = 10 + }) + + rspamd_config:add_on_load(function(cfg, ev_base,worker) + check_elastic_server(cfg, ev_base, worker) -- check for elasticsearch requirements + initial_setup(cfg, ev_base, worker) -- import mappings pipeline and visualizations + end) end - redis_params = rspamd_parse_redis_server('elastic') - check_elastic_server(ev_base) -- check for elasticsearch requirements - initial_setup(worker) -- import mappings pipeline and visualizations -end) -if enabled == true then - rspamd_config:register_symbol({ - name = 'ELASTIC_COLLECT', - type = 'postfilter', - callback = elastic_collect, - priority = 10 - }) end |