分布式限流方案介绍与实践
Note. 限流算法在分布式系统设计中有广泛的应用,特别是在系统的处理能力有限的时候,通过一种有效的手段阻止限制范围外的请求继续对系统造成压力,避免系统被压垮,值得开发工程师们去思考。
限流还有一种常见的作用是对用户行为进行约束,防止大量无用请求占用系统资源。比如在一个商品秒杀系统,用户的下单行为需要进行严格控制,通常都是限定某些行为在一定时间内允许的次数。这样能够避免一些黑产用户通过脚本等违规方式去抢占秒杀商品。对这些违规行为,需要执行封禁等惩罚措施。
1. 简单计数器限流
这一节我们先介绍一种简单的限流策略,系统要限制用在指定时间里的某个行为最多只能发生N次。由于现在的系统一般都是分布式部署的,所以我们需要有一个中心化的计数器,使用redis能为我们解决这个问题。但是由于中心化的设计,如何能保证高并发时候更新这个计数器的原子性也是一个要处理的问题。
下面我们先定义限流接口:
// 指定用户uid的某个行为action在特定时间内period能够最多发生maxCount次,其中period单位为秒
func isAllowed(uid string, action string, period, maxCount int) bool {
// 基于redis的简单counter算法
// ...
}
// 下单接口
func CreateOrder() {
canCreateOrder := isAllowed("berryjam", "createOrder", 5, 50) // 调用限流接口,5秒内最多只能下单50次
if canCreateOrder {
// 处理下单逻辑
// ...
fmt.Println("下单成功")
} else { // 返回请求或者抛出异常、panic
panic("下单次数超限")
}
}
在看下面的解决方案前,可以先思考下如果我们来实现isAllowed接口,会用什么方法?
解决方案
我们需要维护一个滑动时间窗口,结合redis的zset数据结构,就很自然地可以通过scroe来找出指定的时间窗口。另外只需要保留时间窗口内的数据,窗口之外的数据都可以删除掉。至于zset的value值只要保证集合元素的唯一性即可,我们可以选择纳秒级的时间戳作为value。
如图1所示:
图 1 滑动时间窗口示意图
为了减少算法的空间复杂度,只需要保留时间窗口内的行为记录,另外如果用户是不活跃用户,滑动时间窗口内的行为记录也是空的,那么这个zset就可以直接删除,进一步减少占用的空间。
接着通过统计滑动时间窗口内的元素数量与阀值maxCount进行比较,便可知道当前行为是否允许。golang实现代码如下:
package main
import (
"github.com/go-redis/redis"
"fmt"
"time"
)
var redisdb *redis.Client
var counterLuaScript = `
-- 记录行为
redis.pcall("zadd", KEYS[1], ARGV[1], ARGV[1]); -- value 和 score 都使用纳秒时间戳,即ARGV[1]
redis.pcall("zremrangebyscore", KEYS[1], 0, ARGV[2]); -- 移除时间窗口之前的行为记录,剩下的都是时间窗口内的
local count = redis.pcall("zcard", KEYS[1]); -- 获取窗口内的行为数量
redis.pcall("expire", KEYS[1], ARGV[3]); -- 设置 zset 过期时间,避免冷用户持续占用内存
return count -- 返回窗口内行为数量`
var evalSha string
func init() {
initRedisClient()
}
func initRedisClient() {
redisdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
var err error
evalSha, err = redisdb.ScriptLoad(counterLuaScript).Result()
if err != nil {
panic(err)
}
}
// period单位为秒
func isAllowed(uid string, action string, period, maxCount int) bool {
key := fmt.Sprintf("%v_%v", uid, action)
now := time.Now().UnixNano()
beforeTime := now - int64(period*1000000000)
res, err := redisdb.EvalSha(evalSha, []string{key}, now, beforeTime, period).Result()
if err != nil {
panic(err)
}
if res.(int64) > int64(maxCount) {
return false
}
return true
}
func CreateOrder() {
canCreateOrder := isAllowed("berryjam", "createOrder", 5, 10)
if canCreateOrder {
// 处理下单逻辑
// ...
fmt.Println("下单成功")
} else { // 返回请求或者抛出异常、panic
panic("下单次数超限")
}
}
func main() {
for i := 0; i < 100; i++ {
CreateOrder()
}
}
isAllowed接口主要是调用了一个redis lua[1]脚本(变量counterLuaScript),需要读者花点时间好好理解一下。整体思路是:每个一行为到来时,都维护一次时间窗口。将时间窗口外的记录全部清理掉,只保留窗口内的记录。zset集合中实际起作用的是score的值,value没有特别意义,只用于保证元素的唯一性。
至于为什么不直接在程序里,分别依次执行redis的zadd、zremrangebyscore、zcard、expire这4个命令呢?是因为在高并发的情况下,需要用redis lua脚本来保证这4条命令的原子执行,从而保证zset的数据正确性。
至此,一个基于redis计数器限流方案已经实现。但是这种方案存在缺点,因为它要记录时间窗口内的所有行为记录,如果这个量特别大的时候,内存消耗会变得非常严重。比方说限定120秒内操作不得超过1000万这样的情况,会消耗大量的存储空间去维护计数器。接下来会介绍另外一种限流算法-漏斗限流,去解决这个缺点。
2. 漏斗限流
漏斗限流是另外一种常见的限流方法,这个算法灵感源自漏斗(funnel)的结构。
如图2所示:
图 2 漏斗算法
漏斗的容量有限,并且漏嘴的流率固定。如果漏斗停止灌水,那么若干时间后漏斗会变空。如果把漏嘴堵住,一直往里灌水,漏斗会变满直到溢出再也装不进去。如果漏嘴放开,等流走一部分水后,又可以往里面灌水。另外如果漏嘴流水速率大于灌水速率,那么漏斗永远不会满。如果漏嘴流水速率小于灌水速率,那么一旦漏斗满了,就需要暂停灌水等漏斗流出足够的空间后才能继续往里灌水。
因此,漏斗的剩余空间可以表示当前可以进行的行为数量,漏嘴流速表示系统能够处理该行为的频率。下面是用golang实现的一个完整示例代码:
package main
import (
"time"
"fmt"
"github.com/go-redis/redis"
)
const (
FAILED = iota
SUCC
)
var redisdb *redis.Client
var initFunnelScript = `
-- 分别初始化漏斗结构的4个字段capacity、left_quota、leaking_rate、leaking_time
-- capacity:漏斗容量
-- left_quota:漏斗剩余空间
-- leaking_rate:漏嘴流水速率
-- leaking_time:上一次漏水时间
local key
for i,j in ipairs(ARGV)
do if i%2 == 0
then
redis.pcall('hsetnx', KEYS[1], key, j)
else
key = j
end
end`
var initFunnelSha string
var makeSpaceScript = `
local leaking_time = tonumber(redis.pcall('hget', KEYS[1], 'leaking_time'))
local leaking_rate = tonumber(redis.pcall('hget', KEYS[1], 'leaking_rate'))
local left_quota = tonumber(redis.pcall('hget', KEYS[1], 'left_quota'))
local capacity = tonumber(redis.pcall('hget', KEYS[1], 'capacity'))
local now = tonumber(ARGV[1])
local delta_time = now - leaking_time -- 距离上一次漏水过去了多久
local delta_quota = leaking_rate * delta_time -- 又可以腾出不少空间了
redis.pcall('hset', KEYS[1], 'leaking_time', now) -- 记录漏水时间
if delta_quota + left_quota >= capacity then -- 剩余空间不得高于容量
redis.pcall('hset', KEYS[1], 'left_quota', capacity)
else
redis.pcall('hset', KEYS[1], 'left_quota', delta_quota + left_quota) -- 增加剩余空间
end
`
var makeSpaceSha string
var wateringScript = `
local left_quota = tonumber(redis.pcall('hget', KEYS[1], 'left_quota'))
local quota = tonumber(ARGV[1])
if left_quota >= quota then -- 判断剩余空间是否足够
redis.pcall('hset', KEYS[1], 'left_quota', left_quota-quota)
return 1
else
return 0
end
`
var wateringSha string
func init() {
initRedisClient()
}
func initRedisClient() {
redisdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
var err error
initFunnelSha, err = redisdb.ScriptLoad(initFunnelScript).Result()
if err != nil {
panic(err)
}
makeSpaceSha, err = redisdb.ScriptLoad(makeSpaceScript).Result()
if err != nil {
panic(err)
}
wateringSha, err = redisdb.ScriptLoad(wateringScript).Result()
if err != nil {
panic(err)
}
}
func MakeSpace(key string) {
now := time.Now().Unix()
redisdb.EvalSha(makeSpaceSha, []string{key}, now).Result()
}
// quota为每次处理请求所需要的资源配额
func Watering(key string, quota float64) bool {
MakeSpace(key)
res, err := redisdb.EvalSha(wateringSha, []string{key}, quota).Result()
if err != nil {
panic(err)
}
return res.(int64) == SUCC
}
func IsActionAllowed(uid, action string, capacity float64, leakingRate float64) bool {
key := fmt.Sprintf("%v_%v", uid, action)
redisdb.EvalSha(initFunnelSha, []string{key}, "capacity", capacity, "left_quota", capacity, "leaking_rate", leakingRate, "leaking_time", time.Now().Unix())
return Watering(key, 1)
}
func main() {
for i := 0; i < 20; i++ {
fmt.Printf("%+v\n", IsActionAllowed("berryjam", "reply", 15, 0.5))
}
}
MakeSpace方法是漏斗算法的核心,它在每次灌水前都会被调用以触发漏水,给漏斗腾出空间。所能够的腾出的空间为流速*漏斗上次漏水至今的时间。因此漏斗算法只用4个字段就可以完成流量控制,算法的空间复杂度为O(1),不再与行为的频率成正比。
同样地,上面的代码,没有直接用redis的hset存储这4个字段,然后hget出各个字段,接着在内存运算后再hset回去。还是因为在高并发的情况下,这样无法保证整个过程的原子性,所以还是使用3段lua脚本[2]来保证整个过程的原子执行。
最后,Redis4.0本身提供了一个限流redis模块,叫redis-cell[3]。该模块也使用了漏斗算法,并提供了原子的限流指令。限流问题会变得更加简单,感兴趣的读者可以点击文末的参考资料进一步了解。
3. 参考资料
[1] Redis Lua 脚本
[2] 使用lua脚本实现redis的hmsetnx命令,操作hash表时不覆盖原有数据
[3] redis-cell:在Redis中,作为单个命令,提供速率限制的Redis模块