aboutsummaryrefslogtreecommitdiffstats
path: root/lualib/redis_scripts/ratelimit_check.lua
blob: 019996c11dbc50e0b2e0a5ea8e280d668a893817 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
-- This Lua script is a rate limiter for Redis using the token bucket algorithm.
-- The script checks if a message should be rate-limited and updates the bucket status accordingly.
-- Input keys:
-- KEYS[1]: A prefix for the Redis keys, e.g., RL_<triplet>_<seconds>
-- KEYS[2]: The current time in milliseconds
-- KEYS[3]: The bucket leak rate (messages per millisecond)
-- KEYS[4]: The maximum allowed burst
-- KEYS[5]: The expiration time for a bucket
-- KEYS[6]: The number of recipients for the message
-- KEYS[7]: Enable dynamic ratelimits

-- Redis keys used:
-- l: Last hit (time in milliseconds)
-- b: Current burst (number of tokens in the bucket)
-- p: Pending messages (number of messages in processing)
-- dr: Current dynamic rate multiplier (*10000)
-- db: Current dynamic burst multiplier (*10000)

-- Returns:
-- An array containing:
-- 1. if the message should be rate-limited or 0 if not
-- 2. The current burst value after processing the message
-- 3. The dynamic rate multiplier
-- 4. The dynamic burst multiplier
-- 5. The number of tokens leaked during processing

local last = redis.call('HGET', KEYS[1], 'l')
local now = tonumber(KEYS[2])
local nrcpt = tonumber(KEYS[6])
local leak_rate = tonumber(KEYS[3])
local max_burst = tonumber(KEYS[4])
local prefix = KEYS[1]
local enable_dynamic = KEYS[7] == 'true'
local lfb_cache_prefix = KEYS[8]
local lfb_max_cache_size = tonumber(KEYS[9])
local dynr, dynb, leaked = 0, 0, 0
if not last then
  -- New bucket
  redis.call('HMSET', prefix, 'l', tostring(now), 'b', '0', 'dr', '10000', 'db', '10000', 'p', tostring(nrcpt))
  redis.call('EXPIRE', prefix, KEYS[5])
  return { 0, '0', '1', '1', '0' }
end
last = tonumber(last)

local burst, pending = unpack(redis.call('HMGET', prefix, 'b', 'p'))
burst, pending = tonumber(burst or '0'), tonumber(pending or '0')
-- Sanity to avoid races
if burst < 0 then
  burst = 0
end
if pending < 0 then
  pending = 0
end
pending = pending + nrcpt -- this message
-- Perform leak
if burst + pending > 0 then
  -- If we have any time passed
  if burst > 0 and last < now then
    if enable_dynamic then
      dynr = tonumber(redis.call('HGET', prefix, 'dr')) / 10000.0
      if dynr == 0 then
        dynr = 0.0001
      end
    else
      dynr = 1.0
    end
    leak_rate = leak_rate * dynr
    leaked = ((now - last) * leak_rate)
    if leaked > burst then
      leaked = burst
    end
    burst = burst - leaked
    redis.call('HINCRBYFLOAT', prefix, 'b', -(leaked))
    redis.call('HSET', prefix, 'l', tostring(now))
  end

  if enable_dynamic then
    dynb = tonumber(redis.call('HGET', prefix, 'db')) / 10000.0
    if dynb == 0 then
      dynb = 0.0001
    end
  else
    dynb = 1.0
  end

  burst = burst + pending
  if burst > 0 and burst > max_burst * dynb then
    redis.call('ZREMRANGEBYRANK', lfb_cache_prefix, 0, -(lfb_max_cache_size + 1)) -- Keeping size of lfb cache
    redis.call('ZADD', lfb_cache_prefix, now, prefix) -- LRU cache is based on timestamps of buckets

    return { 1, tostring(burst - pending), tostring(dynr), tostring(dynb), tostring(leaked) }
  end
  -- Increase pending if we allow ratelimit
  redis.call('HINCRBY', prefix, 'p', nrcpt)
else
  burst = 0
  redis.call('HMSET', prefix, 'b', '0', 'p', tostring(nrcpt))
end

return { 0, tostring(burst), tostring(dynr), tostring(dynb), tostring(leaked) }