local sqlite3 = require "rspamd_sqlite3"
local redis = require "rspamd_redis"
local util = require "rspamd_util"

--- Send one batch of tokens to redis over a synchronous connection.
-- Each token is a triple `{field, increment, key_suffix}` and is queued as
-- `HINCRBY <symbol .. key_suffix> <field> <increment>`.
-- @param server redis host handed to `redis.connect_sync`
-- @param symbol prefix for the redis hash keys
-- @param tokens array of token triples
-- @param password optional; when set, AUTH is queued first
-- @param db optional; when set, SELECT is queued before the tokens
-- @return false if connecting or queueing failed, otherwise the first value
--   returned by `conn:exec()`
local function send_redis(server, symbol, tokens, password, db)
  local conn, err = redis.connect_sync({
    host = server,
  })

  if not conn then
    print('Cannot connect to ' .. server .. ' error: ' .. err)
    return false
  end

  if password then
    conn:add_cmd('AUTH', {password})
  end
  if db then
    conn:add_cmd('SELECT', {db})
  end

  -- Keep queueing after a failure so every token is attempted, but remember
  -- that the batch is incomplete.
  local all_queued = true
  for _, tok in ipairs(tokens) do
    local key = symbol .. tok[3]
    local queued = conn:add_cmd('HINCRBY', {key, tok[1], tok[2]})
    if not queued then
      all_queued = false
    end
  end

  if not all_queued then
    return false
  end

  -- Parentheses keep only the first result of exec()
  return (conn:exec())
end

--- Copy the sqlite3 learn cache into the redis hash `learned_ids`.
-- Every row of the `learns` table becomes one hash field: the field name is
-- the base32-encoded digest, the value is '-1' when the row flag is '0' and
-- '1' otherwise.
-- @param cache path of the sqlite3 learn cache
-- @param server redis host handed to `redis.connect_sync`
-- @param password optional; when set, AUTH is queued first
-- @param db optional redis database; when set, SELECT is queued
-- @return false when the cache cannot be opened, redis cannot be reached or a
--   command cannot be queued; otherwise the first value of `conn:exec()`
local function convert_learned(cache, server, password, db)
  local converted = 0
  -- The sqlite handle gets its own name: calling it `db` would shadow the
  -- redis database parameter and make SELECT receive the sqlite handle.
  local cache_db = sqlite3.open(cache)
  local ret = true

  if not cache_db then
    print('Cannot open cache database: ' .. cache)
    return false
  end

  -- Connect before opening the sqlite transaction so that a connection
  -- failure does not return with a transaction left open.
  local conn,err = redis.connect_sync({
    host = server,
  })

  if not conn then
    print('Cannot connect to ' .. server .. ' error: ' .. err)
    return false
  end

  if password then
    conn:add_cmd('AUTH', {password})
  end
  if db then
    conn:add_cmd('SELECT', {db})
  end

  cache_db:sql('BEGIN;')

  for row in cache_db:rows('SELECT * FROM learns;') do
    local is_spam
    local digest = tostring(util.encode_base32(row.digest))

    if row.flag == '0' then
      is_spam = '-1'
    else
      is_spam = '1'
    end

    if not conn:add_cmd('HSET', {'learned_ids', digest, is_spam}) then
      print('Cannot add hash: ' .. digest)
      ret = false
    else
      converted = converted + 1
    end
  end
  cache_db:sql('COMMIT;')

  -- Only flush the queued commands when all of them were accepted
  if ret then
    ret = conn:exec()
  end

  if ret then
    print(string.format('Converted %d cached items from sqlite3 learned cache to redis',
      converted))
  else
    print('Error occurred during sending data to redis')
  end

  return ret
end

--- Entry point: migrate statistics from a sqlite3 database to redis.
-- Steps, in order:
--   1. when `res.cache_db` is set, convert the learn cache;
--   2. read the `users` table (and `languages` row 0) to collect user names
--      and learn counters;
--   3. stream the `tokens` table to redis in batches;
--   4. add each learn counter to the `learns` field of the matching hash.
-- @param args not used by this function
-- @param res options table; the keys read here are `source_db`, `cache_db`,
--   `redis_host`, `redis_password`, `redis_db` and `symbol`
-- @return nothing, except `false` when the final redis connection fails
return function (args, res)
  local db = sqlite3.open(res['source_db'])
  local tokens = {}
  local num = 0
  local total = 0
  local nusers = 0
  local lim = 1000 -- Flush the token buffer once it grows past this size
  local users_map = {}
  local learns = {}
  local redis_password = res['redis_password']
  local redis_db = res['redis_db']

  if res['cache_db'] then
    -- Forward the redis credentials so the learn cache is written with the
    -- same password and database as the tokens.
    if not convert_learned(res['cache_db'], res['redis_host'],
      redis_password, redis_db) then
      print('Cannot convert learned cache to redis')
      return
    end
  end

  if not db then
    print('Cannot open source db: ' .. res['source_db'])
    return
  end

  db:sql('BEGIN;')
  -- Fill users mapping
  for row in db:rows('SELECT * FROM users;') do
    if row.id == '0' then
      -- User 0 maps to an empty suffix, i.e. the bare symbol key
      users_map[row.id] = ''
    else
      users_map[row.id] = row.name
    end
    learns[row.id] = row.learns
    nusers = nusers + 1
  end

  -- Workaround for old databases
  for row in db:rows('SELECT * FROM languages WHERE id=0;') do
    if learns[row.id] then
      learns[row.id] = learns[row.id] + row.learns
    else
      learns[row.id] = row.learns
    end
  end

  -- Fill tokens, sending data to redis each time the buffer exceeds `lim`
  for row in db:rows('SELECT token,value,user FROM tokens;') do
    local user = ''
    if row.user ~= 0 and users_map[row.user] then
      user = users_map[row.user]
    end

    table.insert(tokens, {row.token, row.value, user})

    num = num + 1
    total = total + 1
    if num > lim then
      if not send_redis(res['redis_host'], res['symbol'],
        tokens, redis_password, redis_db) then

        print('Cannot send tokens to the redis server')
        return
      end

      num = 0
      tokens = {}
    end
  end
  -- Flush whatever is left in the buffer
  if #tokens > 0 and
    not send_redis(res['redis_host'], res['symbol'], tokens,
      redis_password, redis_db) then

    print('Cannot send tokens to the redis server')
    return
  end
  -- Now update all users
  local conn,err = redis.connect_sync({
    host = res['redis_host'],
  })

  if not conn then
    print('Cannot connect to ' .. res['redis_host'] .. ' error: ' .. err)
    return false
  end

  if redis_password then
    conn:add_cmd('AUTH', {redis_password})
  end
  if redis_db then
    conn:add_cmd('SELECT', {redis_db})
  end

  for id,learned in pairs(learns) do
    -- The `languages` workaround can add an id that has no row in `users`;
    -- use the empty suffix then instead of failing on a nil concatenation.
    local user = users_map[id] or ''
    if not conn:add_cmd('HINCRBY', {res['symbol'] .. user, 'learns', learned}) then
      print('Cannot update learns for user: ' .. user)
    end
  end
  db:sql('COMMIT;')

  local ret = conn:exec()

  if ret then
    print(string.format('Migrated %d tokens for %d users for symbol %s',
     total, nusers, res['symbol']))
  else
    print('Error occurred during sending data to redis')
  end
end