123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540 |
- --[[
- Copyright (c) 2021, Vsevolod Stakhov <vsevolod@highsecure.ru>
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ]]--
-
- local lua_redis = require "lua_redis"
- local rspamd_logger = require "rspamd_logger"
- local argparse = require "argparse"
- local rspamd_zstd = require "rspamd_zstd"
- local rspamd_text = require "rspamd_text"
- local rspamd_util = require "rspamd_util"
- local rspamd_cdb = require "rspamd_cdb"
- local lua_util = require "lua_util"
- local rspamd_i64 = require "rspamd_int64"
- local ucl = require "ucl"
-
-- Name of this tool; also used as the key of its own section in the parsed
-- configuration (see check_redis_classifier).
local N = "statistics_dump"
-- Shared empty table used as a fallback when a config section is absent
local E = {}
-- Redis-backed classifiers discovered in the configuration; filled by
-- check_redis_classifier and consumed by the dump/restore handlers.
local classifiers = {}

-- Define command line options
local parser = argparse()
    :name "rspamadm statistics_dump"
    :description "Dump/restore Rspamd statistics"
    :help_description_margin(30)
    :command_target("command")
    :require_command(false)

parser:option "-c --config"
      :description "Path to config file"
      :argname("<cfg>")
      :default(rspamd_paths["CONFDIR"] .. "/" .. "rspamd.conf")

-- Dump subcommand
local dump = parser:command "dump d"
                   :description "Dump bayes statistics"
-- JSON and CDB outputs are mutually exclusive
dump:mutex(
    dump:flag "-j --json"
        :description "Json output",
    dump:flag "-C --cdb"
        :description "CDB output"
)
dump:flag "-c --compress"
    :description "Compress output"
dump:option "-b --batch-size"
    :description "Number of entries to process at once"
    :argname("<elts>")
    :convert(tonumber)
    :default(1000)


-- Restore subcommand
local restore = parser:command "restore r"
                      :description "Restore bayes statistics"
restore:argument "file"
       :description "Input file to process"
       :argname "<file>"
       :args "*"
restore:option "-b --batch-size"
       :description "Number of entries to process at once"
       :argname("<elts>")
       :convert(tonumber)
       :default(1000)
-- The mode selects how restored values are combined with existing data
restore:option "-m --mode"
       :description "How to apply restored values: append, subtract or replace"
       :argname("<append|subtract|replace>")
       :convert {
         ['append'] = 'append',
         ['subtract'] = 'subtract',
         ['replace'] = 'replace',
       }
       :default 'append'
restore:flag "-n --no-operation"
       :description "Only show redis commands to be issued"
-
-- Loads the configuration file named by opts.config into the global
-- rspamd_config object and then runs the RCL processing step.
-- Any failure is logged and terminates the process with exit status 1.
-- NOTE(review): the {'logging', 'worker'} list passed to parse_rcl is
-- presumably the set of sections to skip - confirm against the API docs.
local function load_config(opts)
  local cfg_path = opts['config']

  local loaded, load_err = rspamd_config:load_ucl(cfg_path)
  if not loaded then
    rspamd_logger.errx('cannot parse %s: %s', cfg_path, load_err)
    os.exit(1)
  end

  local processed, process_err = rspamd_config:parse_rcl({'logging', 'worker'})
  if not processed then
    rspamd_logger.errx('cannot process %s: %s', cfg_path, process_err)
    os.exit(1)
  end
end
-
-- Inspects one classifier definition and, when it uses the new schema and
-- Redis servers can be resolved for it, appends an entry
-- {symbol_spam, symbol_ham, redis_params} to the module-level `classifiers`.
--
-- cls: classifier section of the configuration
-- cfg: the whole configuration object (used to look up the `statistics_dump`
--      section as a secondary source of Redis settings)
--
-- Returns false when the classifier has no statfiles or no Redis servers can
-- be found; returns nothing otherwise (including for old-schema classifiers,
-- which are skipped).
local function check_redis_classifier(cls, cfg)
  -- Skip old classifiers
  if not cls.new_schema then
    return
  end

  local symbol_spam, symbol_ham

  -- Records the symbol of a single statfile as either the spam or ham symbol
  local function check_statfile_table(tbl, def_sym)
    local symbol = tbl.symbol or def_sym

    local spam
    if tbl.spam ~= nil then
      -- An explicit flag always wins, including an explicit `spam = false`
      spam = tbl.spam
    else
      -- No flag given: guess from the symbol name
      spam = string.match(symbol:upper(), 'SPAM') ~= nil
    end

    if spam then
      symbol_spam = symbol
    else
      symbol_ham = symbol
    end
  end

  -- Load symbols from statfiles
  local statfiles = cls.statfile
  if type(statfiles) ~= 'table' then
    rspamd_logger.errx('classifier has no statfile definitions, skipping it')
    return false
  end

  if statfiles[1] then
    -- Array form: each element is either a statfile or a map name -> statfile
    for _,stf in ipairs(statfiles) do
      if not stf.symbol then
        for k,v in pairs(stf) do
          check_statfile_table(v, k)
        end
      else
        check_statfile_table(stf, 'undefined')
      end
    end
  else
    -- Map form: name -> statfile
    for stn,stf in pairs(statfiles) do
      check_statfile_table(stf, stn)
    end
  end

  -- Resolve Redis servers: the classifier itself first, then this tool's own
  -- config section with the 'bayes' hint, then the same section with the
  -- third argument set to true and no hint.
  local own_section = cfg[N] or E
  local redis_params =
      lua_redis.try_load_redis_servers(cls, rspamd_config, false, 'bayes')
      or lua_redis.try_load_redis_servers(own_section, rspamd_config, false, 'bayes')
      or lua_redis.try_load_redis_servers(own_section, rspamd_config, true)

  if not redis_params then
    return false
  end

  table.insert(classifiers, {
    symbol_spam = symbol_spam,
    symbol_ham = symbol_ham,
    redis_params = redis_params,
  })
end
-
-- Converts a flat list {k1, v1, k2, v2, ...} into a map {k1 = v1, k2 = v2, ...}.
-- A trailing key without a value is dropped, because its value is nil.
local function redis_map_zip(ar)
  local map = {}
  local idx, total = 1, #ar

  while idx <= total do
    map[ar[idx]] = ar[idx + 1]
    idx = idx + 2
  end

  return map
end
-
-- Used to clear tables: prefers a native table.clear when the runtime
-- provides one, otherwise removes every key found by lua_util.keys.
local clear_fcn = table.clear
if not clear_fcn then
  clear_fcn = function(tbl)
    for _, key in ipairs(lua_util.keys(tbl)) do
      tbl[key] = nil
    end
  end
end
-
-- Lazily created zstd streaming context, shared by all dump_out calls
local compress_ctx

-- Writes the array part of `out` to the output.
-- When opts.compress is set (or a compression context already exists) the
-- chunks are joined and pushed through the zstd stream, using the 'end'
-- directive for the last chunk and 'flush' otherwise; without compression
-- every chunk is written verbatim with io.write.
local function dump_out(out, opts, last)
  if opts.compress and not compress_ctx then
    compress_ctx = rspamd_zstd.compress_ctx()
  end

  if not compress_ctx then
    for _, chunk in ipairs(out) do
      io.write(chunk)
    end
    return
  end

  local directive = last and 'end' or 'flush'
  compress_ctx:stream(rspamd_text.fromtable(out), directive):write()
end
-
-- Appends the elements collected in out[pattern].elts to a CDB file named
-- '<pattern>.cdb'.
-- The builder is created on first use and kept in out.cdb_builder; on
-- creation the learn counters from out[pattern] are stored under the
-- '_lrnspam' and '_lrnham_' keys. When `last` is set the builder is
-- finalized and dropped. `opts` is accepted but not used here.
local function dump_cdb(out, opts, last, pattern)
  local results = out[pattern]
  local builder = out.cdb_builder

  if not builder then
    -- First invocation
    builder = rspamd_cdb.build(string.format('%s.cdb', pattern))
    out.cdb_builder = builder
    builder:add('_lrnspam', rspamd_i64.fromstring(results.learns_spam or '0'))
    builder:add('_lrnham_', rspamd_i64.fromstring(results.learns_ham or '0'))
  end

  local elts = results.elts
  for idx = 1, #elts do
    local elt = elts[idx]
    builder:add(elt.key, elt.value)
  end

  if last then
    builder:finalize()
    out.cdb_builder = nil
  end
end
-
-- Iterates over all Redis keys matching `pattern` with SCAN, fetches every
-- key with HGETALL and appends the result to `out`.
--
-- conn:    synchronous redis connection (add_cmd/exec pipeline)
-- pattern: MATCH pattern passed to SCAN
-- opts:    parsed command line options (batch_size, json, cdb)
-- out:     output accumulator; its array part holds text chunks, and in cdb
--          mode out[key].elts collects the packed token records
-- key:     name under which cdb data is accumulated in `out`
--
-- Intermediate chunks are flushed as soon as a SCAN batch is processed; the
-- final text chunk is left in `out` for the caller, while in cdb mode the
-- database is finalized here. Exits the process if SCAN fails.
local function dump_pattern(conn, pattern, opts, out, key)
  local cursor = 0

  repeat
    conn:add_cmd('SCAN', {tostring(cursor),
                          'MATCH', pattern,
                          'COUNT', tostring(opts.batch_size)})
    local ret, results = conn:exec()

    if not ret then
      rspamd_logger.errx("cannot connect execute scan command: %s", results)
      os.exit(1)
    end

    cursor = tonumber(results[1])

    local elts = results[2]
    local tokens = {}

    -- A SCAN batch may contain no keys at all; in that case nothing is queued
    -- and exec() must not be called on an empty pipeline.
    if #elts > 0 then
      for _,e in ipairs(elts) do
        conn:add_cmd('HGETALL', {e})
      end
      -- This function returns many results, each for each command
      -- So if we have batch 1000, then we would have 1000 tables in form
      -- [result, {hash_content}]
      local all_results = {conn:exec()}

      for i=1,#all_results,2 do
        local r, hash_content = all_results[i], all_results[i + 1]

        if r then
          -- List to a hash map
          local data = redis_map_zip(hash_content)
          tokens[#tokens + 1] = {key = elts[(i + 1)/2], data = data}
        end
      end
    end

    -- Output keeping track of the commas
    for i,d in ipairs(tokens) do
      if opts.cdb then
        -- NOTE(review): the token id is taken from the first run of digits in
        -- the key name - confirm this holds for keys with a per-user prefix.
        table.insert(out[key].elts, {
          key = rspamd_i64.fromstring(string.match(d.key, '%d+')),
          value = rspamd_util.pack('<n<n',
              tonumber(d.data["S"] or '0') or 0,
              tonumber(d.data["H"] or '0') or 0)
        })
      else
        -- In json mode every element but the very last one is followed by a
        -- comma; in plain mode each line stands on its own.
        local is_last = cursor == 0 and i == #tokens
        local fmt = '"%s": %s\n'
        if opts.json and not is_last then
          fmt = '"%s": %s,\n'
        end
        out[#out + 1] = rspamd_logger.slog(fmt, d.key,
            ucl.to_format(d.data, "json-compact"))
      end
    end

    if opts.json and cursor == 0 then
      out[#out + 1] = '}}\n'
    end

    -- Do not write the last chunk of out as it will be processed afterwards
    if cursor ~= 0 then
      if opts.cdb then
        dump_cdb(out, opts, false, key)
        out[key].elts = {}
      else
        dump_out(out, opts, false)
        clear_fcn(out)
      end
    elseif opts.cdb then
      dump_cdb(out, opts, true, key)
    end

  until cursor == 0
end
-
-- Implements the `dump` command: for every discovered classifier connects to
-- Redis, reads the '<symbol>_keys' sets of the spam and ham symbols and dumps
-- every key prefix found there (each prefix only once across classifiers).
-- Exits the process when Redis cannot be reached or a command fails.
local function dump_handler(opts)
  local patterns_seen = {}
  for _,cls in ipairs(classifiers) do
    local res,conn = lua_redis.redis_connect_sync(cls.redis_params, false)

    if not res then
      rspamd_logger.errx("cannot connect to redis server: %s", cls.redis_params)
      os.exit(1)
    end

    local out = {}

    -- Dumps everything registered under the given statfile symbol
    local function check_keys(sym)
      if not sym then
        -- The classifier defines only one of the spam/ham statfiles
        return
      end

      local sym_keys_pattern = string.format("%s_keys", sym)
      conn:add_cmd('SMEMBERS', { sym_keys_pattern })
      local ret,keys = conn:exec()

      if not ret then
        rspamd_logger.errx("cannot execute command to get keys: %s", keys)
        os.exit(1)
      end

      if not opts.json then
        out[#out + 1] = string.format('"%s": %s\n', sym_keys_pattern,
            ucl.to_format(keys, 'json-compact'))
      end
      for _,k in ipairs(keys) do
        local pat = string.format('%s*_*', k)
        if not patterns_seen[pat] then
          -- The prefix key itself is a hash with metadata about the tokens
          conn:add_cmd('HGETALL', {k})
          local _ret,additional_keys = conn:exec()

          if _ret then
            if opts.json then
              out[#out + 1] = string.format('{"pattern": "%s", "meta": %s, "elts": {\n',
                  k, ucl.to_format(redis_map_zip(additional_keys), 'json-compact'))
            elseif opts.cdb then
              out[k] = redis_map_zip(additional_keys)
              out[k].elts = {}
            else
              out[#out + 1] = string.format('"%s": %s\n', k,
                  ucl.to_format(redis_map_zip(additional_keys), 'json-compact'))
            end
            dump_pattern(conn, pat, opts, out, k)
            patterns_seen[pat] = true
          end
        end
      end
    end

    check_keys(cls.symbol_spam)
    check_keys(cls.symbol_ham)

    -- Write the remaining chunk. When a compressed stream has already been
    -- started it must be closed with the final directive even if nothing is
    -- left to write, otherwise the stream is never terminated.
    if #out > 0 or compress_ctx then
      dump_out(out, opts, true)
    end
  end
end
-
-- Translates one parsed dump line into Redis commands appended to `cmd_pipe`.
--
-- obj:      table whose first (key, value) pair is processed; the key must be
--           a string and the value a table, anything else is ignored
-- opts:     options; opts.mode is 'append', 'subtract' or 'replace'
-- cmd_pipe: array receiving {command, {arguments...}} entries
--
-- * array value  -> one SADD per element (the key is a Redis set)
-- * map value, mode 'replace' -> a single HMSET with all fields
-- * map value, other modes    -> HINCRBYFLOAT per numeric field (negated for
--   'subtract') and HSET per non-numeric field
--
-- Returns cmd_pipe.
local function obj_to_redis_arguments(obj, opts, cmd_pipe)
  local key,value = next(obj)

  if type(key) ~= 'string' or type(value) ~= 'table' then
    return cmd_pipe
  end

  if value[1] then
    -- Numeric table of elements (e.g. _keys) - it is actually a set in Redis
    for _,elt in ipairs(value) do
      table.insert(cmd_pipe, {'SADD', {key, elt}})
    end
    return cmd_pipe
  end

  if opts.mode == 'replace' then
    local params = {key}
    for k,v in pairs(value) do
      params[#params + 1] = k
      params[#params + 1] = v
    end
    -- HMSET needs at least one field/value pair; an empty object yields no
    -- command instead of an invalid one.
    if #params > 1 then
      table.insert(cmd_pipe, {'HMSET', params})
    end
    return cmd_pipe
  end

  local mult = 1.0
  if opts.mode == 'subtract' then
    mult = -mult
  end

  for k,v in pairs(value) do
    local num = tonumber(v)
    if num then
      table.insert(cmd_pipe, {'HINCRBYFLOAT', {key, k, tostring(num * mult)}})
    else
      table.insert(cmd_pipe, {'HSET', {key, k, v}})
    end
  end

  return cmd_pipe
end
-
-- Packs varargs into a table together with their real count, so that nil
-- replies in the middle do not truncate the list.
local function collect_replies(...)
  return select('#', ...), {...}
end

-- Converts a batch of parsed dump lines into Redis commands and either
-- prints them (opts.no_operation) or executes them on every connection in
-- `conns`. Failures to queue or execute a command are logged; processing
-- continues with the remaining commands and connections.
local function execute_batch(batch, conns, opts)
  local cmd_pipe = {}

  for _,cmd in ipairs(batch) do
    obj_to_redis_arguments(cmd, opts, cmd_pipe)
  end

  if opts.no_operation then
    for _,cmd in ipairs(cmd_pipe) do
      rspamd_logger.messagex('%s %s', cmd[1], table.concat(cmd[2], ' '))
    end
    return
  end

  for _, conn in ipairs(conns) do
    local queued = 0

    for _,cmd in ipairs(cmd_pipe) do
      local is_ok, err = conn:add_cmd(cmd[1], cmd[2])

      if is_ok then
        queued = queued + 1
      else
        rspamd_logger.errx("cannot add command: %s with args: %s: %s", cmd[1], cmd[2], err)
      end
    end

    -- Only run the pipeline when something was queued.
    -- NOTE(review): exec() on an empty pipeline is presumably an error -
    -- confirm against the redis client API.
    if queued > 0 then
      -- exec() returns a (status, data) pair per queued command
      local nreplies, replies = collect_replies(conn:exec())
      for i = 1, nreplies, 2 do
        if not replies[i] then
          rspamd_logger.errx("cannot execute redis command: %s", replies[i + 1])
        end
      end
    end
  end
end
-
-- Implements the `restore` command: reads dump lines from the given files
-- (or from stdin when no file is given or a file is named '-'), parses each
-- line with UCL and sends the resulting commands to all discovered
-- classifiers in batches of opts.batch_size lines.
-- Exits the process when Redis cannot be reached, a file cannot be opened or
-- a line cannot be parsed.
local function restore_handler(opts)
  local files = opts.file
  if type(files) == 'string' then
    files = {files}
  end
  -- No input files at all means "read from stdin"
  if type(files) ~= 'table' or #files == 0 then
    files = {'-'}
  end

  local conns = {}

  for _,cls in ipairs(classifiers) do
    local res,conn = lua_redis.redis_connect_sync(cls.redis_params, true)

    if not res then
      rspamd_logger.errx("cannot connect to redis server: %s", cls.redis_params)
      os.exit(1)
    end

    table.insert(conns, conn)
  end

  local batch = {}

  for _,f in ipairs(files) do
    local fd
    if f ~= '-' then
      local open_err
      fd, open_err = io.open(f, 'r')
      if not fd then
        rspamd_logger.errx("cannot open %s: %s", f, open_err)
        os.exit(1)
      end
    else
      fd = io.stdin
    end

    -- Line number within the current file, used for error reporting only
    local cur_line = 1
    -- Read from the handle itself, so that the process-wide default input is
    -- never left pointing at a closed file
    for line in fd:lines() do
      local ucl_parser = ucl.parser()
      local res, err = ucl_parser:parse_string(line)

      if not res then
        rspamd_logger.errx("%s: cannot read line %s: %s", f, cur_line, err)
        os.exit(1)
      end

      table.insert(batch, ucl_parser:get_object())
      cur_line = cur_line + 1

      -- Flush on the real batch length: the batch may span several files
      if #batch >= opts.batch_size then
        execute_batch(batch, conns, opts)
        batch = {}
      end
    end

    if fd ~= io.stdin then
      fd:close()
    end
  end

  if #batch > 0 then
    execute_batch(batch, conns, opts)
  end
end
-
-- Entry point of the command: parses the command line, loads the
-- configuration, collects all Redis-backed classifiers and dispatches to the
-- dump or restore handler. `dump` is assumed when no command is given.
local function handler(args)
  local opts = parser:parse(args)

  local command = opts.command or 'dump'

  load_config(opts)
  rspamd_config:init_subsystem('stat')

  local obj = rspamd_config:get_ucl()

  -- Registers a classifier definition if it is backed by Redis
  local function register_if_redis(cls)
    if cls.backend and cls.backend == 'redis' then
      check_redis_classifier(cls, obj)
    end
  end

  local classifier = obj.classifier

  if classifier then
    if classifier[1] then
      -- Array of classifiers, each optionally wrapped in a `bayes` section
      for _,cls in ipairs(classifier) do
        if cls.bayes then cls = cls.bayes end
        register_if_redis(cls)
      end
    elseif classifier.bayes then
      -- Single `bayes` section holding either an array or one classifier
      local bayes = classifier.bayes
      if bayes[1] then
        for _,cls in ipairs(bayes) do
          register_if_redis(cls)
        end
      else
        register_if_redis(bayes)
      end
    end
  end

  -- A single file name is normalized to a list; a missing or empty list is
  -- left as is, so that the restore handler can fall back to stdin.
  if type(opts.file) == 'string' then
    opts.file = {opts.file}
  end

  if command == 'dump' then
    dump_handler(opts)
  elseif command == 'restore' then
    restore_handler(opts)
  else
    -- parser:error takes a ready message, so format it first
    parser:error(string.format('command %s is not implemented', command))
  end
end
-
-- Module export: command name, alternative names, entry point and the
-- description text taken from the argument parser.
local command_info = {}
command_info.name = 'statistics_dump'
command_info.aliases = {'stat_dump', 'bayes_dump'}
command_info.handler = handler
command_info.description = parser._description

return command_info
|