diff --git a/pkg/flow/quota/assist.go b/pkg/flow/quota/assist.go index b14c7950..5fb446c9 100644 --- a/pkg/flow/quota/assist.go +++ b/pkg/flow/quota/assist.go @@ -241,7 +241,7 @@ func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest Code: model.QuotaResultOk, Info: Disabled, } - return model.QuotaFutureWithResponse(resp, nil), nil + return model.QuotaFutureWithResponse(resp), nil } windows, err := f.lookupRateLimitWindow(commonRequest) if err != nil { @@ -253,23 +253,26 @@ func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest Code: model.QuotaResultOk, Info: RuleNotExists, } - return model.QuotaFutureWithResponse(resp, nil), nil + return model.QuotaFutureWithResponse(resp), nil } var maxWaitMs int64 = 0 - var releaseFuncs = make([]func(), 0, len(windows)) + var releaseFuncs = make([]model.ReleaseFunc, 0, len(windows)) for _, window := range windows { window.Init() - quotaResult, releaseFunc := window.AllocateQuotaWithRelease(commonRequest) - if releaseFunc != nil { - releaseFuncs = append(releaseFuncs, releaseFunc) + quotaResult := window.AllocateQuota(commonRequest) + if quotaResult == nil { + continue + } + for i := range quotaResult.ReleaseFuncs { + releaseFuncs = append(releaseFuncs, quotaResult.ReleaseFuncs[i]) } // 触发限流,提前返回 if quotaResult.Code == model.QuotaResultLimited { // 先释放资源 for i := range releaseFuncs { - releaseFuncs[i]() + releaseFuncs[i](0) } - return model.QuotaFutureWithResponse(quotaResult, nil), nil + return model.QuotaFutureWithResponse(quotaResult), nil } // 未触发限流,记录令牌桶的最大排队时间 if quotaResult.WaitMs > maxWaitMs { @@ -277,9 +280,10 @@ func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest } } return model.QuotaFutureWithResponse(&model.QuotaResponse{ - Code: model.QuotaResultOk, - WaitMs: maxWaitMs, - }, releaseFuncs), nil + Code: model.QuotaResultOk, + WaitMs: maxWaitMs, + ReleaseFuncs: releaseFuncs, + }), nil } // lookupRateLimitWindow 计算限流窗口 diff --git a/pkg/flow/quota/window.go b/pkg/flow/quota/window.go index bfcca2d3..3a63c26d 100644 --- a/pkg/flow/quota/window.go +++ b/pkg/flow/quota/window.go @@ -608,15 +608,6 @@ func (r *RateLimitWindow) AllocateQuota(commonRequest *data.CommonRateLimitReque return r.trafficShapingBucket.GetQuota(curTimeMs, commonRequest.Token) } -// AllocateQuotaWithRelease 分配配额,并返回释放资源函数 -func (r *RateLimitWindow) AllocateQuotaWithRelease(commonRequest *data.CommonRateLimitRequest) (*model.QuotaResponse, func()) { - nowMilli := model.CurrentMillisecond() - atomic.StoreInt64(&r.lastAccessTimeMilli, nowMilli) - // 获取服务端时间 - curTimeMs := r.toServerTimeMilli(nowMilli) - return r.trafficShapingBucket.GetQuotaWithRelease(curTimeMs, commonRequest.Token) -} - // GetLastAccessTimeMilli 获取最近访问时间 func (r *RateLimitWindow) GetLastAccessTimeMilli() int64 { return atomic.LoadInt64(&r.lastAccessTimeMilli) diff --git a/pkg/model/quota.go b/pkg/model/quota.go index 7a210557..b0289885 100644 --- a/pkg/model/quota.go +++ b/pkg/model/quota.go @@ -156,6 +156,8 @@ const ( QuotaResultLimited QuotaResultCode = -1 ) +type ReleaseFunc func() + // QuotaResponse 配额查询应答. type QuotaResponse struct { // 配额分配的返回码 @@ -164,27 +166,27 @@ type QuotaResponse struct { Info string // 需要等待的时间段 WaitMs int64 + // 释放资源函数 + ReleaseFuncs []ReleaseFunc } // QuotaFutureImpl 异步获取配额的future. type QuotaFutureImpl struct { - resp *QuotaResponse - deadlineCtx context.Context - cancel context.CancelFunc - releaseFuncs []func() + resp *QuotaResponse + deadlineCtx context.Context + cancel context.CancelFunc } -func QuotaFutureWithResponse(resp *QuotaResponse, releaseFuncs []func()) *QuotaFutureImpl { +func QuotaFutureWithResponse(resp *QuotaResponse) *QuotaFutureImpl { var deadlineCtx context.Context var cancel context.CancelFunc if resp.WaitMs > 0 { deadlineCtx, cancel = context.WithTimeout(context.Background(), time.Duration(resp.WaitMs)*time.Millisecond) } return &QuotaFutureImpl{ - resp: resp, - deadlineCtx: deadlineCtx, - cancel: cancel, - releaseFuncs: releaseFuncs, + resp: resp, + deadlineCtx: deadlineCtx, + cancel: cancel, } } @@ -209,10 +211,12 @@ func (q *QuotaFutureImpl) Get() *QuotaResponse { return q.resp } -// Release 释放资源,仅用于并发数限流的场景. +// Release 释放资源,仅用于并发数限流/CPU限流场景 func (q *QuotaFutureImpl) Release() { - for i := range q.releaseFuncs { - q.releaseFuncs[i]() + if q.resp != nil { + for i := range q.resp.ReleaseFuncs { + q.resp.ReleaseFuncs[i]() + } } } diff --git a/pkg/plugin/ratelimiter/ratelimiter.go b/pkg/plugin/ratelimiter/ratelimiter.go index f2cdf5c6..be8e2f8b 100644 --- a/pkg/plugin/ratelimiter/ratelimiter.go +++ b/pkg/plugin/ratelimiter/ratelimiter.go @@ -35,8 +35,6 @@ type ServiceRateLimiter interface { type QuotaBucket interface { // GetQuota 在令牌桶/漏桶中进行单个配额的划扣,并返回本次分配的结果 GetQuota(curTimeMs int64, token uint32) *model.QuotaResponse - // GetQuotaWithRelease 判断限流结果,并返回配额释放函数(对并发数限流、CPU自适应限流有用) - GetQuotaWithRelease(curTimeMs int64, token uint32) (*model.QuotaResponse, func()) // Release 释放配额(仅对于并发数限流有用) Release() // OnRemoteUpdate 远程配额更新 diff --git a/plugin/ratelimiter/bbr/README.md b/plugin/ratelimiter/bbr/README.md index 7b34f2c0..c9ca9d07 100644 --- a/plugin/ratelimiter/bbr/README.md +++ b/plugin/ratelimiter/bbr/README.md @@ -12,9 +12,7 @@ BBR 的源码实现可参考: # 插件设计 -本插件将 BBR 限流器适配成 `QuotaBucket` 接口(主要实现 `GetQuotaWithRelease` 判断限流方法),以及 `ServiceRateLimiter` 接口(实现 `InitQuota` 初始化方法)。 - -由于 BBR 限流需要记录请求通过数、当前并发数、请求耗时,因此没有复用原来 `QuotaBucket` 接口中的 `GetQuota` 方法,而是新增了一个方法 `GetQuotaWithRelease`,该方法相比于 `GetQuota` 方法,返回参数中多了一个 `func()`,供业务方在业务逻辑处理完成后调用。 +本插件将 BBR 限流器适配成 `QuotaBucket` 接口(主要实现 `GetQuota` 判断限流方法),以及 `ServiceRateLimiter` 接口(实现 `InitQuota` 初始化方法)。 由于 CPU 使用率指标为实例单机指标,因此 CPU 限流只适用于单机限流,不适用于分布式限流,未实现分布式限流器需要实现的接口。 @@ -29,7 +27,7 @@ bucket: 桶数,BBR 会把 window 分成多个 bucket,沿时间轴向前滑 这三个入参,从 `apitraffic.Rule` 结构体中解析,直接使用了结构体中的 `MaxAmount`、`ValidDuration`、`Precision` 字段 -## 判断限流 GetQuotaWithRelease +## 判断限流 GetQuota 调用了 BBR 的 `Allow()` 方法 其内部执行 `shouldDrop()` 方法,其执行流程如下: diff --git a/plugin/ratelimiter/bbr/bucket.go b/plugin/ratelimiter/bbr/bucket.go index 03a6910a..5676b5c2 100644 --- a/plugin/ratelimiter/bbr/bucket.go +++ b/plugin/ratelimiter/bbr/bucket.go @@ -1,19 +1,32 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package bbr import ( - "sort" - "github.com/polarismesh/polaris-go/pkg/model" "github.com/polarismesh/polaris-go/pkg/plugin/ratelimiter" "github.com/polarismesh/polaris-go/plugin/ratelimiter/bbr/core" + "sort" apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage" ) var ( - allowResp = &model.QuotaResponse{ - Code: model.QuotaResultOk, - } denyResp = &model.QuotaResponse{ Code: model.QuotaResultLimited, } @@ -26,20 +39,18 @@ type BBRQuotaBucket struct { // GetQuota 获取限额 func (b *BBRQuotaBucket) GetQuota(_ int64, _ uint32) *model.QuotaResponse { - return nil -} - -// GetQuotaWithRelease 判断是否限流,并返回释放资源函数 -func (b *BBRQuotaBucket) GetQuotaWithRelease(_ int64, _ uint32) (*model.QuotaResponse, func()) { release, allow := b.BBR.Allow() if allow { - return allowResp, release + return &model.QuotaResponse{ + Code: model.QuotaResultOk, + ReleaseFuncs: []model.ReleaseFunc{release}, + } } - return denyResp, nil + return denyResp } -// Release 释放资源 -func (b *BBRQuotaBucket) Release() { +// Release 释放配额(仅对于并发数限流有用) +func (l *BBRQuotaBucket) Release() { } diff --git a/plugin/ratelimiter/reject/bucket.go b/plugin/ratelimiter/reject/bucket.go index 95920466..f058d254 100644 --- a/plugin/ratelimiter/reject/bucket.go +++ b/plugin/ratelimiter/reject/bucket.go @@ -31,11 +31,6 @@ func (q *QuotaBucketReject) GetQuota(curTimeMs int64, token uint32) *model.Quota return q.bucket.Allocate(curTimeMs, token) } -// GetQuotaWithRelease 在令牌桶/漏桶中进行单个配额的划扣,并返回本次分配的结果 -func (q *QuotaBucketReject) GetQuotaWithRelease(curTimeMs int64, token uint32) (*model.QuotaResponse, func()) { - return q.bucket.Allocate(curTimeMs, token), nil -} - // Release 释放配额(仅对于并发数限流有用) func (q *QuotaBucketReject) Release() { q.bucket.Release() diff --git a/plugin/ratelimiter/unirate/bucket_leaky.go b/plugin/ratelimiter/unirate/bucket_leaky.go index 8b1edfca..1c1076c8 100644 --- a/plugin/ratelimiter/unirate/bucket_leaky.go +++ b/plugin/ratelimiter/unirate/bucket_leaky.go @@ -156,11 +156,6 @@ func (l *LeakyBucket) GetQuota(curTimeMs int64, token uint32) *model.QuotaRespon return l.allocateQuota() } -// GetQuotaWithRelease 在令牌桶/漏桶中进行单个配额的划扣,并返回本次分配的结果 -func (l *LeakyBucket) GetQuotaWithRelease(_ int64, _ uint32) (*model.QuotaResponse, func()) { - return l.allocateQuota(), nil -} - // Release 释放配额(仅对于并发数限流有用) func (l *LeakyBucket) Release() {