aboutsummaryrefslogtreecommitdiffstats
path: root/src/rspamadm/stat_convert.lua
blob: 7b6de9836da009ff5ef22177102c3b5e427e4327 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
local sqlite3 = require "rspamd_sqlite3"
local redis = require "rspamd_redis"
local util = require "rspamd_util"

local function send_redis(server, symbol, tokens, password, db, cmd)
  local ret = true
  local conn,err = redis.connect_sync({
    host = server,
  })

  local err_str

  if not conn then
    print('Cannot connect to ' .. server .. ' error: ' .. err)
    return false, err
  end

  if password then
    conn:add_cmd('AUTH', {password})
  end
  if db then
    conn:add_cmd('SELECT', {db})
  end

  for _,t in ipairs(tokens) do
    if not conn:add_cmd(cmd, {symbol .. t[3], t[1], t[2]}) then
      ret = false
      err_str = 'add command failure' .. string.format('%s %s',
        cmd, table.concat({symbol .. t[3], t[1], t[2]}, ' '))
    end
  end

  if ret then
    ret,err_str = conn:exec()
  end

  return ret,err_str
end

local function convert_learned(cache, server, password, redis_db)
  local converted = 0
  local db = sqlite3.open(cache)
  local ret = true
  local err_str

  if not db then
    print('Cannot open cache database: ' .. cache)
    return false
  end

  db:sql('BEGIN;')

  local conn,err = redis.connect_sync({
    host = server,
  })

  if not conn then
    print('Cannot connect to ' .. server .. ' error: ' .. err)
    return false
  end

  if password then
    conn:add_cmd('AUTH', {password})
  end
  if redis_db then
    conn:add_cmd('SELECT', {redis_db})
  end

  for row in db:rows('SELECT * FROM learns;') do
    local is_spam
    local digest = tostring(util.encode_base32(row.digest))

    if row.flag == '0' then
      is_spam = '-1'
    else
      is_spam = '1'
    end

    if not conn:add_cmd('HSET', {'learned_ids', digest, is_spam}) then
      print('Cannot add hash: ' .. digest)
      ret = false
    else
      converted = converted + 1
    end
  end
  db:sql('COMMIT;')

  if ret then
    ret,err_str = conn:exec()
  end

  if ret then
    print(string.format('Converted %d cached items from sqlite3 learned cache to redis',
      converted))
  else
    print('Error occurred during sending data to redis: ' .. err_str)
  end

  return ret
end

return function (_, res)
  local db = sqlite3.open(res['source_db'])
  local tokens = {}
  local num = 0
  local total = 0
  local nusers = 0
  local lim = 1000 -- Update each 1000 tokens
  local users_map = {}
  local learns = {}
  local redis_password = res['redis_password']
  local redis_db = nil
  local cmd = 'HINCRBY'
  local ret, err_str

  if res['redis_db'] then
    redis_db = tostring(res['redis_db'])
  end
  if res['reset_previous'] then
    cmd = 'HSET'
  end

  if res['cache_db'] then
    if not convert_learned(res['cache_db'], res['redis_host'],
      redis_password, redis_db) then
        print('Cannot convert learned cache to redis')
        return
    end
  end

  if not db then
    print('Cannot open source db: ' .. res['source_db'])
    return
  end

  db:sql('BEGIN;')
  -- Fill users mapping
  for row in db:rows('SELECT * FROM users;') do
    if row.id == '0' then
      users_map[row.id] = ''
    else
      users_map[row.id] = row.name
    end
    learns[row.id] = row.learns
    nusers = nusers + 1
  end

  -- Workaround for old databases
  for row in db:rows('SELECT * FROM languages') do
    if learns['0'] then
      learns['0'] = learns['0'] + row.learns
    else
      learns['0'] = row.learns
    end
  end

  -- Fill tokens, sending data to redis each `lim` records
  for row in db:rows('SELECT token,value,user FROM tokens;') do
    local user = ''
    if row.user ~= 0 and users_map[row.user] then
      user = users_map[row.user]
    end

    table.insert(tokens, {row.token, row.value, user})

    num = num + 1
    total = total + 1
    if num > lim then
      ret,err_str = send_redis(res['redis_host'], res['symbol'],
        tokens, redis_password, redis_db, cmd)
      if not ret then
        print('Cannot send tokens to the redis server: ' .. err_str)
        return
      end

      num = 0
      tokens = {}
    end
  end
  if #tokens > 0 then
    ret, err_str = send_redis(res['redis_host'], res['symbol'], tokens,
      redis_password, redis_db, cmd)

    if not ret then
      print('Cannot send tokens to the redis server: ' .. err_str)
      return
    end
  end
  -- Now update all users
  local conn,err = redis.connect_sync({
    host = res['redis_host'],
  })

  if not conn then
    print('Cannot connect to ' .. res['redis_host'] .. ' error: ' .. err)
    return false
  end

  if redis_password then
    conn:add_cmd('AUTH', {redis_password})
  end
  if redis_db then
    conn:add_cmd('SELECT', {redis_db})
  end

  for id,learned in pairs(learns) do
    local user = users_map[id]
    if not conn:add_cmd(cmd, {res['symbol'] .. user, 'learns', learned}) then
      print('Cannot update learns for user: ' .. user)
    end
    if not conn:add_cmd('SADD', {res['symbol'] .. '_keys', res['symbol'] .. user}) then
      print('Cannot update learns for user: ' .. user)
    end
  end
  db:sql('COMMIT;')

  ret = conn:exec()

  if ret then
    print(string.format('Migrated %d tokens for %d users for symbol %s',
     total, nusers, res['symbol']))
  else
    print('Error occurred during sending data to redis')
  end
end