一、Go time/rate 限流器
1.1 简介
Go 在 x 标准库,即 golang.org/x/time/rate 里自带了一个限流器,这个限流器是基于令牌桶算法(token bucket)实现的。
在上一篇文章讲了几种限流算法,里面就有令牌桶算法,具体可以看上篇文章介绍。
1.2 rate/time 限流构造器
这个限流构造器就是生成 token,供后面使用。
Limiter struct 结构:
// https://github.com/golang/time/blob/master/rate/rate.go#L55// The methods AllowN, ReserveN, and WaitN consume n tokens.type Limiter struct{
mu sync.Mutex
limit Limit // 放入 token 的速率
burst int// 令牌桶限制最大值
tokens float64// 桶中令牌数// last is the last time the limiter's tokens field was updated
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
lastEvent time.Time
}
限流器构造方法:func NewLimiter(r Limit, b int) *Limiter:
- r :产生 token 的速率。默认是每秒中可以向桶中生产多少 token。也可以设置这个值,用方法 Every 设置 token 速率时间粒度。
- b :桶的容量,桶容纳 token 的最大数量。 b == 0,允许声明容量为 0 的值,这时拒绝所有请求;与 b== 0 情况相反,如果 r 为 inf 时,将允许所有请求,即使是 b == 0。
// Inf is the infinite rate limit; it allows all events (even if burst is zero). const Inf =Limit(math.MaxFloat64)
It implements a “token bucket” of size b, initially full and refilled at rate r tokens per second.
构造器一开始会为桶注入 b 个 token,然后每秒补充 r 个 token。
- 每秒生成 20 个 token,桶的容量为 5,代码为:
limiter :=NewLimiter(20,5)
- 200ms 生成 1 个 token
这时候不是秒为单位生成 token ,就可以使用 Every 方法设置生成 token 的速率:
limit :=Every(200* time.Millisecond)
limiter :=NewLimiter(limit,5)
1秒 = 200ms * 5,也就是每秒生成 5 个 token。
生成了 token 之后,请求获取 token,然后使用 token。
1.3 time/rate 有3种限流用法
time/rate 源码里注释,消费 n 个 tokens 的方法
// The methods AllowN, ReserveN, and WaitN consume n tokens.
- AllowN
- ReserveN
- WaitN
A. WaitN、Wait
WaitN / Wait 方法:
// https://pkg.go.dev/golang.org/x/time/rate#Limiter.WaitN// WaitN blocks until lim permits n events to happen.// It returns an error if n exceeds the Limiter's burst size, the Context is// canceled, or the expected wait time exceeds the Context's Deadline.// The burst limit is ignored if the rate limit is Inf.func(lim *Limiter)WaitN(ctx context.Context, n int)(err error)func(lim *Limiter)Wait(ctx context.Context)(err error)
WaitN : 当桶中的 token 数量小于 N 时,WaitN 方法将阻塞一段时间直到 token 满足条件或超时或取消(如果设置了context),超时或取消将返回error。如果 N 充足则直接返回。
Wait : 就是 WaitN 方法中参数 n 为 1 时,即:WaitN(ctx, 1)
。
方法里还有 Contex 参数,所以也可以设置 Deadline 或 Timeout,来决定 Wait 最长时间。比如下面代码片段:
ctx, cancel := context.WithTimeout(context.Background(), time.Second *5)defercancel()
err := limiter.WaitN(ctx,2)
例子1:
package main
import("context""fmt""time""golang.org/x/time/rate")funcmain(){
limit := rate.NewLimiter(3,5)// 每秒产生 3 个token,桶容量 5
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)defercancel()// 超时取消for i :=0;; i++{// 有多少令牌直接消耗掉
fmt.Printf("%03d %s\n", i, time.Now().Format("2006-01-02 15:04:05.000"))
err := limit.Wait(ctx)if err !=nil{// 超时取消 err != nil
fmt.Println("err: ", err.Error())return// 超时取消,退出 for}}}
分析:这里指定令牌桶大小为 5,每秒生成 3 个令牌。for 循环消耗令牌,产生多少令牌都会消耗掉。
从开始一直到 5 秒超时,计算令牌数,一开始初始化 NewLimiter 的 5 个 + 每秒 3 个令牌 * 5秒 ,总计 20 个令牌。运行程序输出看看:
$ go run .\waitdemo.go
0002022-05-1721:35:38.4000012022-05-1721:35:38.4250022022-05-1721:35:38.4250032022-05-1721:35:38.4250042022-05-1721:35:38.4250052022-05-1721:35:38.4250062022-05-1721:35:38.7730072022-05-1721:35:39.0960082022-05-1721:35:39.4360092022-05-1721:35:39.7640102022-05-1721:35:40.1060112022-05-1721:35:40.4340122022-05-1721:35:40.7620132022-05-1721:35:41.1040142022-05-1721:35:41.4300152022-05-1721:35:41.7590162022-05-1721:35:42.1040172022-05-1721:35:42.4290182022-05-1721:35:42.7730192022-05-1721:35:43.101
err: rate:Wait(n=1) would exceed context deadline
B: AllowN、Allow
AllowN / Allow 方法
// https://pkg.go.dev/golang.org/x/time/rate#Limiter.AllowN// AllowN reports whether n events may happen at time now.// Use this method if you intend to drop / skip events that exceed the rate limit.// Otherwise use Reserve or Wait.func(lim *Limiter)AllowN(now time.Time, n int)bool// Allow is shorthand for AllowN(time.Now(), 1).func(lim *Limiter)Allow()bool
AllowN :截止到某一时刻,桶中的 token 数量至少为 N 个,满足就返回 true,同时从桶中消费 n 个 token;反之返回 false,不消费 token。这个实际就是丢弃某些请求。
Allow :就是 AllowN 方法中参数 now 为现在时间,n 为 1,即AllowN(time.Now(), 1)
例子:
package main
import("fmt""net/http""time""golang.org/x/time/rate")funcmain(){
r := rate.Every(1* time.Millisecond)
limit := rate.NewLimiter(r,10)
http.HandleFunc("/",func(w http.ResponseWriter, r *http.Request){if limit.Allow(){
fmt.Printf("success,当前时间:%s\n", time.Now().Format("2006-01-02 15:04:05"))}else{
fmt.Printf("success,但是被限流了。。。\n")}})
fmt.Println("http start ... ")_= http.ListenAndServe(":8080",nil)}
然后你可以找一个 http 测试工具模拟用户压测下,比如 https://github.com/rakyll/hey 这个工具。测试命令:
hey -n 100 http://localhost:8080/
就可以看到输出的内容
… …
success,当前时间:2022-05-17 21:41:44
success,当前时间:2022-05-17 21:41:44
success,当前时间:2022-05-17 21:41:44
success,但是被限流了。。。
success,但是被限流了。。。
… …
例子2:
package main
import("fmt""time""golang.org/x/time/rate")funcmain(){
limit := rate.NewLimiter(1,3)for{if limit.AllowN(time.Now(),2){
fmt.Println(time.Now().Format("2006-01-02 15:04:05"))}else{
time.Sleep(time.Second *3)}}}
C:ReserveN、Reserve
ReserveN / Reserve 方法
// https://pkg.go.dev/golang.org/x/time/rate#Limiter.ReserveN// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. The Limiter takes this Reservation into account when allowing future events. The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.func(lim *Limiter)ReserveN(now time.Time, n int)*Reservation
func(lim *Limiter)Reserve()*Reservation
func(r *Reservation)DelayFrom(now time.Time) time.Duration
func(r *Reservation)Delay() time.Duration
func(r *Reservation)OK()bool
其实上面的 WaitN 和 AllowN 都是基于 ReserveN 方法。具体可以去看看这 3 个方法的源码。
ReserveN :此方法返回 *Reservation 对象。你可以调用该对象的 Dealy 方法,获取延迟等待的时间。如果为 0,则不用等待。必须等到等待时间结束后才能进行下面的工作。
或者,如果不想等待,可以调用 Cancel 方法,该方法会将 Token 归还。
Reserve :就是 ReserveN 方法中参数 now 为现在时间,n 为 1,即AllowN(time.Now(), 1)
usage example:
// https://pkg.go.dev/golang.org/x/time/rate#Limiter.ReserveN
r := lim.ReserveN(time.Now(),1)if!r.OK(){// Not allowed to act! Did you remember to set lim.burst to be > 0 ?return}
time.Sleep(r.Delay())Act()// 执行相关逻辑
1.4 动态设置桶token容量和速率
SetBurstAt / SetBurst:
func(lim *Limiter)SetBurstAt(now time.Time, newBurst int)func(lim *Limiter)SetBurst(newBurst int)
SetBurstAt :设置到某时刻桶中 token 的容量
SetBurst:SetBurstAt(time.Now())
SetLimitAt / SetLimit
func(lim *Limiter)SetLimitAt(now time.Time, newLimit Limit)func(lim *Limiter)SetLimit(newLimit Limit)
SetLimitAt :设置某刻 token 的速率
SetLimit :设置 token 的速率
二、uber 的 rate limiter
2.1 简介
uber 的这个限流算法是
漏桶算法(leaky bucket)
- github.com/uber-go/ratelimit。
与令牌桶算法的区别:
- 漏桶算法流出的速率可以控制,流进桶中请求不能控制
- 令牌桶算法对于流入和流出的速度都是可以控制的,因为令牌可以自己生成。所以它还可以应对突发流量。突发流量生成 token 就快些。
- 令牌桶算法只要桶中有 token 就可以一直消费,漏桶是按照预定的间隔顺序进行消费的。
2.2 使用
官方的例子:
limit := ratelimit.New(100)// 每秒钟允许100个请求
prev := time.Now()for i :=0; i <10; i++{
now := limit.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}
限流器每秒可以通过 100 个请求,平均每个间隔 10ms。
2.3 uber 对漏桶算法的改进
在传统的漏桶算法,每个请求间隔是固定的,然而在实际应用中,流量不是这么平均的,时而小时而大,对于这种情况,uber 对 leaky bucket 做了一点改进,引入 maxSlack 最大松弛量的概念。
举例子:比如 3 个请求,请求 1 完成,15ms后,请求 2 才到来,可以对 2 立即处理。请求 2 完成后,5ms后,请求 3 到来,这个请求距离上次请求不足 10ms,因此要等 5ms。
但是,对于这种情况,实际三个请求一共耗时 25ms 才完成,并不是预期的 20ms。
uber 的改进是:可以把之情请求间隔比较长的时间,匀给后面的请求使用,只要保证每秒请求数即可。
uber ratelimit 改进代码实现:
t.sleepFor += t.perRequest - now.Sub(t.last)if t.sleepFor >0{
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor =0}else{
t.last = now
}
把每个请求多余出来的等待时间累加起来,以给后面的抵消使用。
其他参数用法:
- WithoutSlack:
ratelimit 中引入最大松弛量,默认的最大松弛量为 10 个请求的间隔时间。
但是我不想用这个最大松弛量呢,就要限制请求的固定间隔时间,用 WithoutSlack 这个参数限制:
limit := ratelimit.New(100, ratelimit.WithoutSlack)
- WithClock(clock Clock):
ratelimit 中时间相关计算是用 go 的标准时间库 time,如果想要更高进度或特殊需求计算,可以用 WithClock 参数替换,实现 Clock 的 interface 就可以了
type Clock interface{Now() time.Time
Sleep(time.Duration)}
clock &= MyClock{}
limiter := ratelimit.New(100, ratelimit.WithClock(clock))
更多 ratelimit
三、其他限流器,算法库包和软件
- 滴滴的 tollbooth,http 限流中间件,有很多特性: - 1.基于IP,路径,方法,header,授权用户等限流- 2.通过使用
LimitByKeys()
组合你自己的中间件- 3.对于head项和基本auth能够设置TTL-过期时间- 4.拒绝后,可以使用以下 HTTP 头响应,比如X-Rate-Limit-Limit
The maximum request limit- 5.当限流达到上限,可以自定义消息和方法,返回信息- 6.它是基于 golang.org/x/time/rate 开发 - java 的 guava 限流 - ratelimiter
- 基于信号量限流 - https://github.com/golang/net/blob/master/netutil/listen.go
- sentinel-go 服务治理软件以及sentinel - https://github.com/alibaba/sentinel-golang- github.com/alibaba/Sentinel ,java编写的流控组件,服务治理
- 还有各种基于 nginx 的限流器,限流软件-服务网关,api gateway等
- java的服务治理软件 - github.com/alibaba/Sentinel Sentinel- Hystrix Hystrix- resilience4j resilience4j
四、参考
- https://github.com/golang/time
- pkg.go.dev/golang.org/x/time/rate
- https://zhuanlan.zhihu.com/p/100594314
- https://segmentfault.com/a/1190000023033365
- https://www.cyhone.com/articles/usage-of-golang-rate/
- https://www.cyhone.com/articles/analysis-of-uber-go-ratelimit
- uber ratelimit vs go time/rate demo
- uber-go ratelimit
版权归原作者 九卷技术录 所有, 如有侵权,请联系我们删除。