1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
-- This Lua script is a rate limiter for Redis using the token bucket algorithm.
-- The script checks if a message should be rate-limited and updates the bucket status accordingly.
-- Input keys:
-- KEYS[1]: A prefix for the Redis keys, e.g., RL_<triplet>_<seconds>
-- KEYS[2]: The current time in milliseconds
-- KEYS[3]: The bucket leak rate (messages per millisecond)
-- KEYS[4]: The maximum allowed burst
-- KEYS[5]: The expiration time for a bucket
-- KEYS[6]: The number of recipients for the message
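-- KEYS[7]: Enable dynamic ratelimits (the literal string 'true' enables them)
-- KEYS[8]: The key of the sorted set that caches rate-limited buckets (scored by timestamp)
-- KEYS[9]: The maximum size of that sorted-set cache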
-- Fields of the Redis hash stored at KEYS[1]:
-- l: Last hit (time in milliseconds)
-- b: Current burst (number of tokens in the bucket)
-- p: Pending messages (number of messages in processing)
-- dr: Current dynamic rate multiplier (*10000)
-- db: Current dynamic burst multiplier (*10000)
-- Returns:
-- An array containing:
-- 1. 1 if the message should be rate-limited or 0 if not
-- 2. The current burst value after processing the message
-- 3. The dynamic rate multiplier
-- 4. The dynamic burst multiplier
-- 5. The number of tokens leaked during processing
-- Token bucket check, executed as a flat Redis script.
-- Reads the bucket hash at KEYS[1], leaks tokens for the time elapsed since the
-- last hit, and decides whether this message (KEYS[6] recipients) still fits
-- into the allowed burst.
-- Reply: { limited (0/1), burst, dynamic rate, dynamic burst, leaked }, where
-- everything except the first element is a string.
local last = redis.call('HGET', KEYS[1], 'l')
local now = tonumber(KEYS[2])
local nrcpt = tonumber(KEYS[6])
local leak_rate = tonumber(KEYS[3])
local max_burst = tonumber(KEYS[4])
local prefix = KEYS[1]
local enable_dynamic = KEYS[7] == 'true'
local lfb_cache_prefix = KEYS[8]
local lfb_max_cache_size = tonumber(KEYS[9])
-- The multipliers stay 0 in the reply on paths that never compute them.
local dynr, dynb, leaked = 0, 0, 0

-- Returns the dynamic multiplier stored in hash field `field` ('dr' or 'db').
-- The stored value is the multiplier times 10000. When dynamic limits are
-- disabled the multiplier is 1.0. A missing or non-numeric field falls back to
-- the neutral 10000 instead of aborting the script with an arithmetic error on
-- nil (Redis converts a nil reply to Lua `false`). A zero multiplier is raised
-- to 0.0001 so that the rate/burst never collapses to exactly zero.
local function dynamic_multiplier(field)
  if not enable_dynamic then
    return 1.0
  end
  local raw = redis.call('HGET', prefix, field)
  local scaled = (raw and tonumber(raw)) or 10000
  local multiplier = scaled / 10000.0
  if multiplier == 0 then
    multiplier = 0.0001
  end
  return multiplier
end

if not last then
  -- New bucket
  redis.call('HMSET', prefix, 'l', tostring(now), 'b', '0', 'dr', '10000', 'db', '10000', 'p', tostring(nrcpt))
  redis.call('EXPIRE', prefix, KEYS[5])
  return { 0, '0', '1', '1', '0' }
end

last = tonumber(last)
local burst, pending = unpack(redis.call('HMGET', prefix, 'b', 'p'))
burst, pending = tonumber(burst or '0'), tonumber(pending or '0')

-- Sanity to avoid races
if burst < 0 then
  burst = 0
end
if pending < 0 then
  pending = 0
end

pending = pending + nrcpt -- this message

-- Perform leak
if burst + pending > 0 then
  -- If we have any time passed
  if burst > 0 and last < now then
    dynr = dynamic_multiplier('dr')
    leak_rate = leak_rate * dynr
    leaked = ((now - last) * leak_rate)
    -- Never leak more than the bucket currently holds
    if leaked > burst then
      leaked = burst
    end
    burst = burst - leaked
    redis.call('HINCRBYFLOAT', prefix, 'b', -(leaked))
    redis.call('HSET', prefix, 'l', tostring(now))
  end

  dynb = dynamic_multiplier('db')

  burst = burst + pending
  if burst > 0 and burst > max_burst * dynb then
    redis.call('ZREMRANGEBYRANK', lfb_cache_prefix, 0, -(lfb_max_cache_size + 1)) -- Keeping size of lfb cache
    redis.call('ZADD', lfb_cache_prefix, now, prefix) -- LRU cache is based on timestamps of buckets
    -- Report the burst without the pending messages (this one included)
    return { 1, tostring(burst - pending), tostring(dynr), tostring(dynb), tostring(leaked) }
  end

  -- Increase pending if we allow ratelimit
  redis.call('HINCRBY', prefix, 'p', nrcpt)
else
  -- Nothing in the bucket and nothing pending: reset the counters
  burst = 0
  redis.call('HMSET', prefix, 'b', '0', 'p', tostring(nrcpt))
end

return { 0, tostring(burst), tostring(dynr), tostring(dynb), tostring(leaked) }
|