2016-01-25 21:13:48 +01:00
|
|
|
local sqlite3 = require "rspamd_sqlite3"
|
|
|
|
local redis = require "rspamd_redis"
|
|
|
|
local _ = require "fun"
|
|
|
|
|
|
|
|
--- Push one batch of tokens to redis as a pipeline of HINCRBY commands.
-- @param server redis host, passed as `host` to `redis.connect_sync`
-- @param symbol prefix of the redis hash key
-- @param tokens sequence of `{token, value, user}` triples
-- @return false when the connection cannot be made or any command could not
--   be queued; otherwise the first result of `conn:exec()`
local function send_redis(server, symbol, tokens)
  local conn = redis.connect_sync({
    host = server,
  })

  if not conn then
    print('Cannot connect to ' .. server)
    return false
  end

  local queued = true

  for i = 1, #tokens do
    local tok = tokens[i]
    -- Hash key is symbol .. user, field is the token, increment is its value
    local key = symbol .. tok[3]

    -- Keep queueing the remaining commands even after a failure
    if not conn:add_cmd('HINCRBY', {key, tok[1], tok[2]}) then
      queued = false
    end
  end

  if not queued then
    -- Nothing is executed when at least one command was rejected
    return false
  end

  -- Parentheses truncate to a single return value
  return (conn:exec())
end
|
|
|
|
|
|
|
|
--- Migrate statistics tokens and per-user learn counters from an sqlite3
-- database into redis.
-- @param args not used by this function
-- @param res options table; the fields read here are `source_db` (path given
--   to `sqlite3.open`), `redis_host` and `symbol` (redis key prefix)
-- @return nothing; progress and errors are reported with `print`
return function (args, res)
  local db = sqlite3.open(res['source_db'])
  local redis_host = res['redis_host']
  local symbol = res['symbol']
  local tokens = {}
  local num = 0
  local total = 0
  local nusers = 0
  local lim = 1000 -- Update each 1000 tokens
  local users_map = {}
  local learns = {}

  if not db then
    print('Cannot open source db: ' .. res['source_db'])
    return
  end

  db:sql('BEGIN;')
  -- Fill users mapping
  for row in db:rows('SELECT * FROM users;') do
    -- User id '0' is mapped to an empty key suffix
    if row.id == '0' then
      users_map[row.id] = ''
    else
      users_map[row.id] = row.name
    end
    learns[row.id] = row.learned
    nusers = nusers + 1
  end

  -- Fill tokens, sending data to redis each `lim` records
  for row in db:rows('SELECT token,value,user FROM tokens;') do
    local user = ''
    -- NOTE(review): ids are compared as strings above ('0'), so this numeric
    -- comparison presumably never filters anything; kept as in the original.
    if row.user ~= 0 and users_map[row.user] then
      user = users_map[row.user]
    end

    table.insert(tokens, {row.token, row.value, user})

    num = num + 1
    total = total + 1
    if num > lim then
      if not send_redis(redis_host, symbol, tokens) then
        print('Cannot send tokens to the redis server')
        return
      end

      num = 0
      tokens = {}
    end
  end

  -- Flush the final, partially filled batch
  if #tokens > 0 and not send_redis(redis_host, symbol, tokens) then
    print('Cannot send tokens to the redis server')
    return
  end

  -- Now update all users
  -- The host and key prefix come from `res`: the names `server` and `symbol`
  -- used here previously were undefined globals in this scope.
  for id, learned in pairs(learns) do
    local user = users_map[id]
    if not redis.make_request_sync({
      host = redis_host,
      cmd = 'HSET',
      args = {symbol .. user, 'learns', learned}
    }) then
      -- A failed counter update is reported but does not abort the migration
      print('Cannot update learns for user: ' .. user)
    end
  end

  db:sql('COMMIT;')

  print(string.format('Migrated %d tokens for %d users for symbol %s',
    total, nusers, res['symbol']))
end
|