Skip to content

Commit

Permalink
feat: 将 release 函数移动到 QuotaResponse 结构体中
Browse files Browse the repository at this point in the history
  • Loading branch information
WTIFS committed Nov 24, 2023
1 parent ca989d3 commit 9f1b7de
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 62 deletions.
26 changes: 15 additions & 11 deletions pkg/flow/quota/assist.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest
Code: model.QuotaResultOk,
Info: Disabled,
}
return model.QuotaFutureWithResponse(resp, nil), nil
return model.QuotaFutureWithResponse(resp), nil
}
windows, err := f.lookupRateLimitWindow(commonRequest)
if err != nil {
Expand All @@ -253,33 +253,37 @@ func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest
Code: model.QuotaResultOk,
Info: RuleNotExists,
}
return model.QuotaFutureWithResponse(resp, nil), nil
return model.QuotaFutureWithResponse(resp), nil
}
var maxWaitMs int64 = 0
var releaseFuncs = make([]func(), 0, len(windows))
var releaseFuncs = make([]model.ReleaseFunc, 0, len(windows))
for _, window := range windows {
window.Init()
quotaResult, releaseFunc := window.AllocateQuotaWithRelease(commonRequest)
if releaseFunc != nil {
releaseFuncs = append(releaseFuncs, releaseFunc)
quotaResult := window.AllocateQuota(commonRequest)
if quotaResult == nil {
continue
}
for i := range quotaResult.ReleaseFuncs {
releaseFuncs = append(releaseFuncs, quotaResult.ReleaseFuncs[i])
}
// 触发限流,提前返回
if quotaResult.Code == model.QuotaResultLimited {
// 先释放资源
for i := range releaseFuncs {
releaseFuncs[i]()
releaseFuncs[i](0)
}
return model.QuotaFutureWithResponse(quotaResult, nil), nil
return model.QuotaFutureWithResponse(quotaResult), nil
}
// 未触发限流,记录令牌桶的最大排队时间
if quotaResult.WaitMs > maxWaitMs {
maxWaitMs = quotaResult.WaitMs
}
}
return model.QuotaFutureWithResponse(&model.QuotaResponse{
Code: model.QuotaResultOk,
WaitMs: maxWaitMs,
}, releaseFuncs), nil
Code: model.QuotaResultOk,
WaitMs: maxWaitMs,
ReleaseFuncs: releaseFuncs,
}), nil
}

// lookupRateLimitWindow 计算限流窗口
Expand Down
9 changes: 0 additions & 9 deletions pkg/flow/quota/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,15 +608,6 @@ func (r *RateLimitWindow) AllocateQuota(commonRequest *data.CommonRateLimitReque
return r.trafficShapingBucket.GetQuota(curTimeMs, commonRequest.Token)
}

// AllocateQuotaWithRelease 分配配额,并返回释放资源函数
func (r *RateLimitWindow) AllocateQuotaWithRelease(commonRequest *data.CommonRateLimitRequest) (*model.QuotaResponse, func()) {
nowMilli := model.CurrentMillisecond()
atomic.StoreInt64(&r.lastAccessTimeMilli, nowMilli)
// 获取服务端时间
curTimeMs := r.toServerTimeMilli(nowMilli)
return r.trafficShapingBucket.GetQuotaWithRelease(curTimeMs, commonRequest.Token)
}

// GetLastAccessTimeMilli 获取最近访问时间
func (r *RateLimitWindow) GetLastAccessTimeMilli() int64 {
return atomic.LoadInt64(&r.lastAccessTimeMilli)
Expand Down
28 changes: 16 additions & 12 deletions pkg/model/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ const (
QuotaResultLimited QuotaResultCode = -1
)

type ReleaseFunc func()

// QuotaResponse 配额查询应答.
type QuotaResponse struct {
// 配额分配的返回码
Expand All @@ -164,27 +166,27 @@ type QuotaResponse struct {
Info string
// 需要等待的时间段
WaitMs int64
// 释放资源函数
ReleaseFuncs []ReleaseFunc
}

// QuotaFutureImpl 异步获取配额的future.
type QuotaFutureImpl struct {
resp *QuotaResponse
deadlineCtx context.Context
cancel context.CancelFunc
releaseFuncs []func()
resp *QuotaResponse
deadlineCtx context.Context
cancel context.CancelFunc
}

func QuotaFutureWithResponse(resp *QuotaResponse, releaseFuncs []func()) *QuotaFutureImpl {
func QuotaFutureWithResponse(resp *QuotaResponse) *QuotaFutureImpl {
var deadlineCtx context.Context
var cancel context.CancelFunc
if resp.WaitMs > 0 {
deadlineCtx, cancel = context.WithTimeout(context.Background(), time.Duration(resp.WaitMs)*time.Millisecond)
}
return &QuotaFutureImpl{
resp: resp,
deadlineCtx: deadlineCtx,
cancel: cancel,
releaseFuncs: releaseFuncs,
resp: resp,
deadlineCtx: deadlineCtx,
cancel: cancel,
}
}

Expand All @@ -209,10 +211,12 @@ func (q *QuotaFutureImpl) Get() *QuotaResponse {
return q.resp
}

// Release 释放资源,仅用于并发数限流的场景.
// Release 释放资源,仅用于并发数限流/CPU限流场景
func (q *QuotaFutureImpl) Release() {
for i := range q.releaseFuncs {
q.releaseFuncs[i]()
if q.resp != nil {
for i := range q.resp.ReleaseFuncs {
q.resp.ReleaseFuncs[i]()
}
}
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/plugin/ratelimiter/ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ type ServiceRateLimiter interface {
type QuotaBucket interface {
// GetQuota 在令牌桶/漏桶中进行单个配额的划扣,并返回本次分配的结果
GetQuota(curTimeMs int64, token uint32) *model.QuotaResponse
// GetQuotaWithRelease 判断限流结果,并返回配额释放函数(对并发数限流、CPU自适应限流有用)
GetQuotaWithRelease(curTimeMs int64, token uint32) (*model.QuotaResponse, func())
// Release 释放配额(仅对于并发数限流有用)
Release()
// OnRemoteUpdate 远程配额更新
Expand Down
6 changes: 2 additions & 4 deletions plugin/ratelimiter/bbr/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ BBR 的源码实现可参考:


# 插件设计
本插件将 BBR 限流器适配成 `QuotaBucket` 接口(主要实现 `GetQuotaWithRelease` 判断限流方法),以及 `ServiceRateLimiter` 接口(实现 `InitQuota` 初始化方法)。

由于 BBR 限流需要记录请求通过数、当前并发数、请求耗时,因此没有复用原来 `QuotaBucket` 接口中的 `GetQuota` 方法,而是新增了一个方法 `GetQuotaWithRelease`,该方法相比于 `GetQuota` 方法,返回参数中多了一个 `func()`,供业务方在业务逻辑处理完成后调用。
本插件将 BBR 限流器适配成 `QuotaBucket` 接口(主要实现 `GetQuota` 判断限流方法),以及 `ServiceRateLimiter` 接口(实现 `InitQuota` 初始化方法)。

由于 CPU 使用率指标为实例单机指标,因此 CPU 限流只适用于单机限流,不适用于分布式限流,未实现分布式限流器需要实现的接口。

Expand All @@ -29,7 +27,7 @@ bucket: 桶数,BBR 会把 window 分成多个 bucket,沿时间轴向前滑
这三个入参,从 `apitraffic.Rule` 结构体中解析,直接使用了结构体中的 `MaxAmount``ValidDuration``Precision` 字段


## 判断限流 GetQuotaWithRelease
## 判断限流 GetQuota
调用了 BBR 的 `Allow()` 方法

其内部执行 `shouldDrop()` 方法,其执行流程如下:
Expand Down
39 changes: 25 additions & 14 deletions plugin/ratelimiter/bbr/bucket.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,32 @@
/**
* Tencent is pleased to support the open source community by making polaris-go available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package bbr

import (
"sort"

"github.com/polarismesh/polaris-go/pkg/model"
"github.com/polarismesh/polaris-go/pkg/plugin/ratelimiter"
"github.com/polarismesh/polaris-go/plugin/ratelimiter/bbr/core"
"sort"

apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
)

var (
allowResp = &model.QuotaResponse{
Code: model.QuotaResultOk,
}
denyResp = &model.QuotaResponse{
Code: model.QuotaResultLimited,
}
Expand All @@ -26,20 +39,18 @@ type BBRQuotaBucket struct {

// GetQuota 获取限额
func (b *BBRQuotaBucket) GetQuota(_ int64, _ uint32) *model.QuotaResponse {
return nil
}

// GetQuotaWithRelease 判断是否限流,并返回释放资源函数
func (b *BBRQuotaBucket) GetQuotaWithRelease(_ int64, _ uint32) (*model.QuotaResponse, func()) {
release, allow := b.BBR.Allow()
if allow {
return allowResp, release
return &model.QuotaResponse{
Code: model.QuotaResultOk,
ReleaseFuncs: []model.ReleaseFunc{release},
}
}
return denyResp, nil
return denyResp
}

// Release 释放资源
func (b *BBRQuotaBucket) Release() {
// Release 释放配额(仅对于并发数限流有用)
func (l *BBRQuotaBucket) Release() {

}

Expand Down
5 changes: 0 additions & 5 deletions plugin/ratelimiter/reject/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ func (q *QuotaBucketReject) GetQuota(curTimeMs int64, token uint32) *model.Quota
return q.bucket.Allocate(curTimeMs, token)
}

// GetQuotaWithRelease 在令牌桶/漏桶中进行单个配额的划扣,并返回本次分配的结果
func (q *QuotaBucketReject) GetQuotaWithRelease(curTimeMs int64, token uint32) (*model.QuotaResponse, func()) {
return q.bucket.Allocate(curTimeMs, token), nil
}

// Release 释放配额(仅对于并发数限流有用)
func (q *QuotaBucketReject) Release() {
q.bucket.Release()
Expand Down
5 changes: 0 additions & 5 deletions plugin/ratelimiter/unirate/bucket_leaky.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,6 @@ func (l *LeakyBucket) GetQuota(curTimeMs int64, token uint32) *model.QuotaRespon
return l.allocateQuota()
}

// GetQuotaWithRelease 在令牌桶/漏桶中进行单个配额的划扣,并返回本次分配的结果
func (l *LeakyBucket) GetQuotaWithRelease(_ int64, _ uint32) (*model.QuotaResponse, func()) {
return l.allocateQuota(), nil
}

// Release 释放配额(仅对于并发数限流有用)
func (l *LeakyBucket) Release() {

Expand Down

0 comments on commit 9f1b7de

Please sign in to comment.