黄先森

西二旗民工

分享一些与编程、分布式系统、区块链技术相关的内容


欢迎访问个人github

基于Redis的分布式限流方案介绍与实践

分布式限流方案介绍与实践

Note. 限流算法在分布式系统设计中有广泛的应用,特别是在系统的处理能力有限的时候,通过一种有效的手段阻止限制范围外的请求继续对系统造成压力,避免系统被压垮,值得开发工程师们去思考。

限流还有一种常见的作用是对用户行为进行约束,防止大量无用请求占用系统资源。比如在一个商品秒杀系统,用户的下单行为需要进行严格控制,通常都是限定某些行为在一定时间内允许的次数。这样能够避免一些黑产用户通过脚本等违规方式去抢占秒杀商品。对这些违规行为,需要执行封禁等惩罚措施。

1. 简单计数器限流

这一节我们先介绍一种简单的限流策略,系统要限制用在指定时间里的某个行为最多只能发生N次。由于现在的系统一般都是分布式部署的,所以我们需要有一个中心化的计数器,使用redis能为我们解决这个问题。但是由于中心化的设计,如何能保证高并发时候更新这个计数器的原子性也是一个要处理的问题。

下面我们先定义限流接口:

// 指定用户uid的某个行为action在特定时间内period能够最多发生maxCount次,其中period单位为秒
func isAllowed(uid string, action string, period, maxCount int) bool {
// 基于redis的简单counter算法
// ...
}

// 下单接口
func CreateOrder() {
	canCreateOrder := isAllowed("berryjam", "createOrder", 5, 50) // 调用限流接口,5秒内最多只能下单50次
	if canCreateOrder {
		// 处理下单逻辑
		// ...
		fmt.Println("下单成功")
	} else { // 返回请求或者抛出异常、panic
		panic("下单次数超限")
	}
}

在看下面的解决方案前,可以先思考下如果我们来实现isAllowed接口,会用什么方法?

解决方案

我们需要维护一个滑动时间窗口,结合redis的zset数据结构,就很自然地可以通过scroe来找出指定的时间窗口。另外只需要保留时间窗口内的数据,窗口之外的数据都可以删除掉。至于zset的value值只要保证集合元素的唯一性即可,我们可以选择纳秒级的时间戳作为value。

如图1所示:

图 1 滑动时间窗口示意图

为了减少算法的空间复杂度,只需要保留时间窗口内的行为记录,另外如果用户是不活跃用户,滑动时间窗口内的行为记录也是空的,那么这个zset就可以直接删除,进一步减少占用的空间。

接着通过统计滑动时间窗口内的元素数量与阀值maxCount进行比较,便可知道当前行为是否允许。golang实现代码如下:

package main

import (
	"github.com/go-redis/redis"
	"fmt"
	"time"
)

var redisdb *redis.Client

var counterLuaScript = `
	-- 记录行为
	redis.pcall("zadd", KEYS[1], ARGV[1], ARGV[1]); -- value 和 score 都使用纳秒时间戳,即ARGV[1]
	redis.pcall("zremrangebyscore", KEYS[1], 0, ARGV[2]); -- 移除时间窗口之前的行为记录,剩下的都是时间窗口内的
	local count = redis.pcall("zcard", KEYS[1]); -- 获取窗口内的行为数量
	redis.pcall("expire", KEYS[1], ARGV[3]); -- 设置 zset 过期时间,避免冷用户持续占用内存
	return count -- 返回窗口内行为数量`

var evalSha string

func init() {
	initRedisClient()
}

func initRedisClient() {
	redisdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})

	var err error
	evalSha, err = redisdb.ScriptLoad(counterLuaScript).Result()
	if err != nil {
		panic(err)
	}
}

// period单位为秒
func isAllowed(uid string, action string, period, maxCount int) bool {
	key := fmt.Sprintf("%v_%v", uid, action)
	now := time.Now().UnixNano()
	beforeTime := now - int64(period*1000000000)
	res, err := redisdb.EvalSha(evalSha, []string{key}, now, beforeTime, period).Result()
	if err != nil {
		panic(err)
	}
	if res.(int64) > int64(maxCount) {
		return false
	}
	return true
}

func CreateOrder() {
	canCreateOrder := isAllowed("berryjam", "createOrder", 5, 10)
	if canCreateOrder {
		// 处理下单逻辑
		// ...
		fmt.Println("下单成功")
	} else { // 返回请求或者抛出异常、panic
		panic("下单次数超限")
	}
}

func main() {
	for i := 0; i < 100; i++ {
		CreateOrder()
	}
}

isAllowed接口主要是调用了一个redis lua[1]脚本(变量counterLuaScript),需要读者花点时间好好理解一下。整体思路是:每个一行为到来时,都维护一次时间窗口。将时间窗口外的记录全部清理掉,只保留窗口内的记录。zset集合中实际起作用的是score的值,value没有特别意义,只用于保证元素的唯一性。

至于为什么不直接在程序里,分别依次执行redis的zadd、zremrangebyscore、zcard、expire这4个命令呢?是因为在高并发的情况下,需要用redis lua脚本来保证这4条命令的原子执行,从而保证zset的数据正确性。

至此,一个基于redis计数器限流方案已经实现。但是这种方案存在缺点,因为它要记录时间窗口内的所有行为记录,如果这个量特别大的时候,内存消耗会变得非常严重。比方说限定120秒内操作不得超过1000万这样的情况,会消耗大量的存储空间去维护计数器。接下来会介绍另外一种限流算法-漏斗限流,去解决这个缺点。

2. 漏斗限流

漏斗限流是另外一种常见的限流方法,这个算法灵感源自漏斗(funnel)的结构。

如图2所示:

图 2 漏斗算法

漏斗的容量有限,并且漏嘴的流率固定。如果漏斗停止灌水,那么若干时间后漏斗会变空。如果把漏嘴堵住,一直往里灌水,漏斗会变满直到溢出再也装不进去。如果漏嘴放开,等流走一部分水后,又可以往里面灌水。另外如果漏嘴流水速率大于灌水速率,那么漏斗永远不会满。如果漏嘴流水速率小于灌水速率,那么一旦漏斗满了,就需要暂停灌水等漏斗流出足够的空间后才能继续往里灌水。

因此,漏斗的剩余空间可以表示当前可以进行的行为数量,漏嘴流速表示系统能够处理该行为的频率。下面是用golang实现的一个完整示例代码:

package main

import (
	"time"
	"fmt"
	"github.com/go-redis/redis"
)

const (
	FAILED = iota
	SUCC
)

var redisdb *redis.Client

var initFunnelScript = `
	-- 分别初始化漏斗结构的4个字段capacity、left_quota、leaking_rate、leaking_time
	-- capacity:漏斗容量
	-- left_quota:漏斗剩余空间
	-- leaking_rate:漏嘴流水速率
	-- leaking_time:上一次漏水时间
	local key
	for i,j in ipairs(ARGV) 
	do if i%2 == 0
		then
			redis.pcall('hsetnx', KEYS[1], key, j)
		else
			key = j
		end
	end`

var initFunnelSha string

var makeSpaceScript = `
	local leaking_time = tonumber(redis.pcall('hget', KEYS[1], 'leaking_time'))
	local leaking_rate = tonumber(redis.pcall('hget', KEYS[1], 'leaking_rate'))
	local left_quota = tonumber(redis.pcall('hget', KEYS[1], 'left_quota'))
	local capacity = tonumber(redis.pcall('hget', KEYS[1], 'capacity'))
	local now = tonumber(ARGV[1])
	local delta_time = now - leaking_time -- 距离上一次漏水过去了多久
	local delta_quota = leaking_rate * delta_time -- 又可以腾出不少空间了
	
	redis.pcall('hset', KEYS[1], 'leaking_time', now) -- 记录漏水时间
	if delta_quota + left_quota >= capacity then -- 剩余空间不得高于容量
		redis.pcall('hset', KEYS[1], 'left_quota', capacity) 
	else 
		redis.pcall('hset', KEYS[1], 'left_quota', delta_quota + left_quota) -- 增加剩余空间
	end
`
var makeSpaceSha string

var wateringScript = `
	local left_quota = tonumber(redis.pcall('hget', KEYS[1], 'left_quota'))
	local quota = tonumber(ARGV[1])
	if left_quota >= quota then -- 判断剩余空间是否足够
		redis.pcall('hset', KEYS[1], 'left_quota', left_quota-quota) 
		return 1
	else
		return 0
	end
`

var wateringSha string

func init() {
	initRedisClient()
}

func initRedisClient() {
	redisdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})

	var err error
	initFunnelSha, err = redisdb.ScriptLoad(initFunnelScript).Result()
	if err != nil {
		panic(err)
	}

	makeSpaceSha, err = redisdb.ScriptLoad(makeSpaceScript).Result()
	if err != nil {
		panic(err)
	}

	wateringSha, err = redisdb.ScriptLoad(wateringScript).Result()
	if err != nil {
		panic(err)
	}
}

func MakeSpace(key string) {
	now := time.Now().Unix()
	redisdb.EvalSha(makeSpaceSha, []string{key}, now).Result()
}

// quota为每次处理请求所需要的资源配额
func Watering(key string, quota float64) bool {
	MakeSpace(key)
	res, err := redisdb.EvalSha(wateringSha, []string{key}, quota).Result()
	if err != nil {
		panic(err)
	}
	return res.(int64) == SUCC
}

func IsActionAllowed(uid, action string, capacity float64, leakingRate float64) bool {
	key := fmt.Sprintf("%v_%v", uid, action)
	redisdb.EvalSha(initFunnelSha, []string{key}, "capacity", capacity, "left_quota", capacity, "leaking_rate", leakingRate, "leaking_time", time.Now().Unix())
	return Watering(key, 1)
}

func main() {
	for i := 0; i < 20; i++ {
		fmt.Printf("%+v\n", IsActionAllowed("berryjam", "reply", 15, 0.5))
	}
}

MakeSpace方法是漏斗算法的核心,它在每次灌水前都会被调用以触发漏水,给漏斗腾出空间。所能够的腾出的空间为流速*漏斗上次漏水至今的时间。因此漏斗算法只用4个字段就可以完成流量控制,算法的空间复杂度为O(1),不再与行为的频率成正比。

同样地,上面的代码,没有直接用redis的hset存储这4个字段,然后hget出各个字段,接着在内存运算后再hset回去。还是因为在高并发的情况下,这样无法保证整个过程的原子性,所以还是使用3段lua脚本[2]来保证整个过程的原子执行。

最后,Redis4.0本身提供了一个限流redis模块,叫redis-cell[3]。该模块也使用了漏斗算法,并提供了原子的限流指令。限流问题会变得更加简单,感兴趣的读者可以点击文末的参考资料进一步了解。

3. 参考资料

[1] Redis Lua 脚本

[2] 使用lua脚本实现redis的hmsetnx命令,操作hash表时不覆盖原有数据

[3] redis-cell:在Redis中,作为单个命令,提供速率限制的Redis模块

最近的文章

fabric联盟链高并发场景下如何提高TPS

fabric高并发场景下如何提高TPSNote. 本文主要描述在高并发场景下fabric的TPS(每秒钟交易数量,这里指的是有效的交易,不包括无效交易)为什么变得很低,如何提高TPS以及不同的提高TPS方式的优缺点。提高TPS的方式根本方式是避免交易冲突,只是避免交易冲突的思路不同。避免冲突的方式可以分为2种类型,一种是使用高效的chaincode数据模型,完全避免交易发生冲突,但局限性比较大;一种是在依赖分布式锁或者MVCC等机制避免交易发生冲突,如果在chaincode执行过程中就能检...…

fabric hyperledger 区块链 高并发 TPS继续阅读
更早的文章

数据结构之Log Structured Merge Trees

Log Structured Merge Trees介绍Note. 本文主要翻译自Log Structured Merge Trees一文,在那篇文章里作者详细介绍了LSM这种写密集型高效存储数据结构,本文在此基础上补充了一些性能分析如时间复杂度、空间复杂度等内容和提出一些疑惑。希望能给各位读者抛砖引玉,本人水平有限请多指教。 1. B+树和Append Logs 2. LSM树 3. 具有层级压缩的LSM 4. 一些实现细节 ...…

数据结构 LSM 数据存储 BloomFilter继续阅读