From: Vsevolod Stakhov Date: Mon, 13 Mar 2017 18:08:44 +0000 (+0000) Subject: [Feature] Preliminary implementation of redis history plugin X-Git-Tag: 1.5.3~36 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=cf66179eeced546e12f1c28b9d032504055d778e;p=rspamd.git [Feature] Preliminary implementation of redis history plugin --- diff --git a/src/plugins/lua/history_redis.lua b/src/plugins/lua/history_redis.lua new file mode 100644 index 000000000..a8184fca9 --- /dev/null +++ b/src/plugins/lua/history_redis.lua @@ -0,0 +1,131 @@ +--[[ +Copyright (c) 2017, Vsevolod Stakhov + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +]]-- + +local redis_params + +local settings = { + key_prefix = 'rs_history', -- default key name + nrows = 2000, -- default rows limit + compress = true, -- use zstd compression when storing data in redis +} + +local rspamd_logger = require "rspamd_logger" +local rspamd_util = require "rspamd_util" +local fun = require "fun" +local ucl = require("ucl") +local E = {} + +local function process_addr(addr) + if addr then + return addr.addr + end + + return 'unknown' +end + +local function normalise_results(tbl, task) + local metric = tbl.default + + -- Convert stupid metric object + if metric then + tbl.symbols = {} + local symbols, others = fun.partition(function(k, v) + return type(v) == 'table' and v.score + end, metric) + + fun.each(function(k, v) v.name = nil; tbl.symbols[k] = v; end, symbols) + fun.each(function(k, v) tbl[k] = v end, others) + + -- Reset the original metric + tbl.default = nil + end + + -- Now, add recipients and senders + tbl.sender_smtp = process_addr((task:get_from('smtp') or E)[1]) + tbl.sender_mime = process_addr((task:get_from('mime') or E)[1]) + tbl.rcpt_smtp = fun.totable(fun.map(process_addr, task:get_recipients('smtp') or {})) + tbl.rcpt_mime = fun.totable(fun.map(process_addr, task:get_recipients('mime') or {})) + tbl.user = task:get_user() or 'unknown' + tbl.rmilter = nil + tbl.messages = nil + tbl.urls = nil + + tbl.subject = task:get_header('subject') or 'unknown' +end + +local function history_save(task) + local function redis_llen_cb(err, _) + if err then + rspamd_logger.errx(task, 'got error %s when writing history row', + err) + end + end + + -- We skip saving it to the history + if task:has_flag('no_log') then + return + end + + local data = task:get_protocol_reply{'metrics', 'basic'} + local prefix = settings.key_prefix + + if data then + normalise_results(data, task) + else + rspamd_logger.errx('cannot get protocol reply, skip saving in history') + return + end + -- 1 is 'json-compact' but faster + local json = ucl.to_format(data, 1) + + if settings.compress then + json = tostring(rspamd_util.zstd_compress(json)) + -- Distinguish between compressed and non-compressed options + prefix = prefix .. '_zst' + end + + local ret, conn, _ = rspamd_redis_make_request(task, + redis_params, -- connect params + nil, -- hash key + false, -- is write + redis_llen_cb, --callback + 'LPUSH', -- command + {prefix, json} -- arguments + ) + + if ret then + conn:add_cmd('LTRIM', {settings.key_prefix, '0', tostring(settings.nrows)}) + end +end + +local opts = rspamd_config:get_all_opt('history_redis') +if opts then + for k,v in pairs(opts) do + settings[k] = v + end + + redis_params = rspamd_parse_redis_server('history_redis') + if not redis_params then + rspamd_logger.infox(rspamd_config, 'no servers are specified, disabling module') + else + rspamd_config:register_symbol({ + name = 'HISTORY_SAVE', + type = 'postfilter', + callback = history_save, + priority = 150 + }) + end +end \ No newline at end of file