From 106b5cd993bdb6b9c2b106b0a29f33c966224a72 Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Wed, 12 Apr 2023 18:06:37 +0800 Subject: [PATCH 01/17] record delta Signed-off-by: Connor1996 --- .../resource_group/controller/controller.go | 31 ++++++++++++++++--- client/resource_group/controller/model.go | 1 + 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index a43685b9f23..bac937cedd5 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -50,7 +50,7 @@ const ( // ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store. type ResourceGroupKVInterceptor interface { // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. - OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, error) + OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, uint64, error) // OnResponse is used to consume tokens after receiving response OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error) } @@ -97,6 +97,8 @@ type ResourceGroupsController struct { calculators []ResourceCalculator + requestDelta [string][uint64]uint64 // resourceGroupName -> storeID -> delta + // When a signal is received, it means the number of available token is low. lowTokenNotifyChan chan struct{} // When a token bucket response received from server, it will be sent to the channel. @@ -429,13 +431,34 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. func (c *ResourceGroupsController) OnRequestWait( ctx context.Context, resourceGroupName string, info RequestInfo, -) (*rmpb.Consumption, error) { +) (*rmpb.Consumption, uint64, error) { gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) if err != nil { failedRequestCounter.WithLabelValues(resourceGroupName).Inc() - return nil, err + return nil, 0, err } - return gc.onRequestWait(ctx, info) + con, err := gc.onRequestWait(ctx, info) + if err != nil { + return nil, 0, err + } + + v, ok := c.requestDelta[resourceGroupName]; + if !ok { + v = make(map[uint64]uint64) + c.requestDelta[resourceGroupName] = v + } + + // iter all stores + for id, delta := range v { + if info.StoreID() == id { + continue + } + delta += 1 + } + delta := v[info.StoreID()] + v[info.StoreID()] = 0 + + return con, delta, nil } // OnResponse is used to consume tokens after receiving response diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index 462022b719e..82e7fb2dd5f 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -35,6 +35,7 @@ type RequestUnit float64 type RequestInfo interface { IsWrite() bool WriteBytes() uint64 + StoreID() uint64 } // ResponseInfo is the interface of the response information provider. A response should be From 839665187346a5e02010fa72aa1c94fbc1a6bc2e Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Wed, 12 Apr 2023 18:29:26 +0800 Subject: [PATCH 02/17] fix build Signed-off-by: Connor1996 --- client/resource_group/controller/controller.go | 2 +- client/resource_group/controller/testutil.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index bac937cedd5..b4e8da0cffe 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -97,7 +97,7 @@ type ResourceGroupsController struct { calculators []ResourceCalculator - requestDelta [string][uint64]uint64 // resourceGroupName -> storeID -> delta + requestDelta map[string]map[uint64]uint64 // resourceGroupName -> storeID -> delta // When a signal is received, it means the number of available token is low. lowTokenNotifyChan chan struct{} diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index 8b510621a52..f2642987876 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -24,6 +24,7 @@ import "time" type TestRequestInfo struct { isWrite bool writeBytes uint64 + storeID uint64 } // NewTestRequestInfo creates a new TestRequestInfo. @@ -31,6 +32,7 @@ func NewTestRequestInfo(isWrite bool, writeBytes uint64) *TestRequestInfo { return &TestRequestInfo{ isWrite: isWrite, writeBytes: writeBytes, + storeID: 0, } } @@ -44,6 +46,10 @@ func (tri *TestRequestInfo) WriteBytes() uint64 { return tri.writeBytes } +func (tri *TestRequestInfo) StoreID() uint64 { + return tri.storeID +} + // TestResponseInfo is used to test the response info interface. type TestResponseInfo struct { readBytes uint64 From 59be299440848a2abe16307b28654dece8ed7a9d Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Thu, 13 Apr 2023 14:46:39 +0800 Subject: [PATCH 03/17] fix panic Signed-off-by: Connor1996 --- client/resource_group/controller/controller.go | 7 ++++--- client/resource_group/controller/testutil.go | 4 ++-- .../resource_manager/resource_manager_test.go | 18 +++++++++--------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index b4e8da0cffe..e00cdfe294f 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -97,7 +97,7 @@ type ResourceGroupsController struct { calculators []ResourceCalculator - requestDelta map[string]map[uint64]uint64 // resourceGroupName -> storeID -> delta + requestDelta map[string]map[uint64]uint64 // resourceGroupName -> storeID -> delta // When a signal is received, it means the number of available token is low. lowTokenNotifyChan chan struct{} @@ -139,6 +139,7 @@ func NewResourceGroupController( lowTokenNotifyChan: make(chan struct{}, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), + requestDelta: make(map[string]map[uint64]uint64), } for _, opt := range opts { opt(controller) @@ -442,14 +443,14 @@ func (c *ResourceGroupsController) OnRequestWait( return nil, 0, err } - v, ok := c.requestDelta[resourceGroupName]; + v, ok := c.requestDelta[resourceGroupName] if !ok { v = make(map[uint64]uint64) c.requestDelta[resourceGroupName] = v } // iter all stores - for id, delta := range v { + for id, delta := range v { if info.StoreID() == id { continue } diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index f2642987876..832c3c3e743 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -24,7 +24,7 @@ import "time" type TestRequestInfo struct { isWrite bool writeBytes uint64 - storeID uint64 + storeID uint64 } // NewTestRequestInfo creates a new TestRequestInfo. @@ -32,7 +32,7 @@ func NewTestRequestInfo(isWrite bool, writeBytes uint64) *TestRequestInfo { return &TestRequestInfo{ isWrite: isWrite, writeBytes: writeBytes, - storeID: 0, + storeID: 0, } } diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index fb83211e596..cf9d0b4f2d3 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -340,9 +340,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { rres := cas.tcs[i].makeReadResponse() wres := cas.tcs[i].makeWriteResponse() startTime := time.Now() - _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) + _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) re.NoError(err) - _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) + _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) re.NoError(err) sum += time.Since(startTime) controller.OnResponse(cas.resourceGroupName, rreq, rres) @@ -408,9 +408,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { wreq := tcs.makeWriteRequest() rres := tcs.makeReadResponse() wres := tcs.makeWriteResponse() - _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) + _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) re.NoError(err) - _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) + _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) re.NoError(err) controller.OnResponse(resourceGroupName, rreq, rres) controller.OnResponse(resourceGroupName, wreq, wres) @@ -447,9 +447,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { rres := cas.tcs[i].makeReadResponse() wres := cas.tcs[i].makeWriteResponse() startTime := time.Now() - _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) + _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) re.NoError(err) - _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) + _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) re.NoError(err) sum += time.Since(startTime) controller.OnResponse(resourceGroupName, rreq, rres) @@ -467,14 +467,14 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { resourceGroupName2 := suite.initGroups[2].Name tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 100000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq) + _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq) re.NoError(err) re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend", "return(true)")) resourceGroupName3 := suite.initGroups[3].Name tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 1000, times: 1, waitDuration: 0} wreq = tcs.makeWriteRequest() - _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) + _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) re.NoError(err) time.Sleep(110 * time.Millisecond) tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 10, times: 1010, waitDuration: 0} @@ -482,7 +482,7 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { for i := 0; i < tcs.times; i++ { wreq = tcs.makeWriteRequest() startTime := time.Now() - _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) + _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) duration += time.Since(startTime) re.NoError(err) } From 5ca0ce9ff8dcdf5a25c4b7bfa362c16e855f4c8a Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Thu, 13 Apr 2023 16:03:45 +0800 Subject: [PATCH 04/17] add lock Signed-off-by: Connor1996 --- client/resource_group/controller/controller.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index e00cdfe294f..b0b43d6ab7e 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -97,6 +97,7 @@ type ResourceGroupsController struct { calculators []ResourceCalculator + mutex sync.Mutex requestDelta map[string]map[uint64]uint64 // resourceGroupName -> storeID -> delta // When a signal is received, it means the number of available token is low. @@ -443,6 +444,7 @@ func (c *ResourceGroupsController) OnRequestWait( return nil, 0, err } + c.mutex.Lock() v, ok := c.requestDelta[resourceGroupName] if !ok { v = make(map[uint64]uint64) @@ -458,6 +460,7 @@ func (c *ResourceGroupsController) OnRequestWait( } delta := v[info.StoreID()] v[info.StoreID()] = 0 + c.mutex.Unlock() return con, delta, nil } From 17de08f4de124787b0cd8a52720659abf70ba119 Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Fri, 14 Apr 2023 14:59:49 +0800 Subject: [PATCH 05/17] fix update Signed-off-by: Connor1996 --- client/resource_group/controller/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index b0b43d6ab7e..68d8e7da865 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -456,7 +456,7 @@ func (c *ResourceGroupsController) OnRequestWait( if info.StoreID() == id { continue } - delta += 1 + v[id] = delta + 1 } delta := v[info.StoreID()] v[info.StoreID()] = 0 From c530690e2987936bc5af9064c993882d784bda99 Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Tue, 18 Apr 2023 16:31:52 +0800 Subject: [PATCH 06/17] fine grained Signed-off-by: Connor1996 --- .../resource_group/controller/controller.go | 67 ++++++++++----- client/resource_group/controller/testutil.go | 5 +- .../resource_manager/resource_manager_test.go | 83 ++++++++++++++++++- 3 files changed, 128 insertions(+), 27 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 68d8e7da865..518e11f5814 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -50,7 +50,7 @@ const ( // ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store. type ResourceGroupKVInterceptor interface { // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. - OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, uint64, error) + OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, Delta, error) // OnResponse is used to consume tokens after receiving response OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error) } @@ -97,8 +97,8 @@ type ResourceGroupsController struct { calculators []ResourceCalculator - mutex sync.Mutex - requestDelta map[string]map[uint64]uint64 // resourceGroupName -> storeID -> delta + mutex sync.Mutex // used for the `resourceDelta` + resourceDelta map[string]map[uint64]Delta // resourceGroupName -> storeID -> delta // When a signal is received, it means the number of available token is low. lowTokenNotifyChan chan struct{} @@ -117,6 +117,12 @@ type ResourceGroupsController struct { } } +// Delta records resource usage on all stores between two adjacent requests on same store. +type Delta struct { + WriteBytes uint64 + CpuTime time.Duration +} + // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor func NewResourceGroupController( ctx context.Context, @@ -140,7 +146,7 @@ func NewResourceGroupController( lowTokenNotifyChan: make(chan struct{}, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), - requestDelta: make(map[string]map[uint64]uint64), + resourceDelta: make(map[string]map[uint64]Delta), } for _, opt := range opts { opt(controller) @@ -433,42 +439,57 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. func (c *ResourceGroupsController) OnRequestWait( ctx context.Context, resourceGroupName string, info RequestInfo, -) (*rmpb.Consumption, uint64, error) { +) (*rmpb.Consumption, Delta, error) { gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) if err != nil { failedRequestCounter.WithLabelValues(resourceGroupName).Inc() - return nil, 0, err + return nil, Delta{}, err } - con, err := gc.onRequestWait(ctx, info) + consumption, err := gc.onRequestWait(ctx, info) if err != nil { - return nil, 0, err + return nil, Delta{}, err } c.mutex.Lock() - v, ok := c.requestDelta[resourceGroupName] + defer c.mutex.Unlock() + m, ok := c.resourceDelta[resourceGroupName] if !ok { - v = make(map[uint64]uint64) - c.requestDelta[resourceGroupName] = v + m = make(map[uint64]Delta) + c.resourceDelta[resourceGroupName] = m } + delta := m[info.StoreID()] + // More accurately, it should be reset when the request succeed. But it would cause all concurrent requests piggyback large delta which inflates penalty. + // So here resets it directly as failure is rare. + m[info.StoreID()] = Delta{} - // iter all stores - for id, delta := range v { - if info.StoreID() == id { - continue - } - v[id] = delta + 1 - } - delta := v[info.StoreID()] - v[info.StoreID()] = 0 - c.mutex.Unlock() - - return con, delta, nil + return consumption, delta, nil } // OnResponse is used to consume tokens after receiving response func (c *ResourceGroupsController) OnResponse( resourceGroupName string, req RequestInfo, resp ResponseInfo, ) (*rmpb.Consumption, error) { + // record resource delta + c.mutex.Lock() + if resp.Succeed() { + m, ok := c.resourceDelta[resourceGroupName] + if !ok { + m = make(map[uint64]Delta) + c.resourceDelta[resourceGroupName] = m + } + for id, delta := range m { + if req.StoreID() == id { + continue + } + if req.IsWrite() { + delta.WriteBytes += req.WriteBytes() + } + delta.CpuTime += resp.KVCPU() + m[id] = delta + } + } + c.mutex.Unlock() + tmp, ok := c.groupsController.Load(resourceGroupName) if !ok { log.Warn("[resource group controller] resource group name does not exist", zap.String("resourceGroupName", resourceGroupName)) diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index 832c3c3e743..a6a5dd040d6 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -28,11 +28,11 @@ type TestRequestInfo struct { } // NewTestRequestInfo creates a new TestRequestInfo. -func NewTestRequestInfo(isWrite bool, writeBytes uint64) *TestRequestInfo { +func NewTestRequestInfo(isWrite bool, writeBytes uint64, storeID uint64) *TestRequestInfo { return &TestRequestInfo{ isWrite: isWrite, writeBytes: writeBytes, - storeID: 0, + storeID: storeID, } } @@ -46,6 +46,7 @@ func (tri *TestRequestInfo) WriteBytes() uint64 { return tri.writeBytes } +// StoreID implements the RequestInfo interface. func (tri *TestRequestInfo) StoreID() uint64 { return tri.storeID } diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index cf9d0b4f2d3..141670f6fbb 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -258,11 +258,11 @@ type tokenConsumptionPerSecond struct { } func (t tokenConsumptionPerSecond) makeReadRequest() *controller.TestRequestInfo { - return controller.NewTestRequestInfo(false, 0) + return controller.NewTestRequestInfo(false, 0, 0) } func (t tokenConsumptionPerSecond) makeWriteRequest() *controller.TestRequestInfo { - return controller.NewTestRequestInfo(true, uint64(t.wruTokensAtATime-1)) + return controller.NewTestRequestInfo(true, uint64(t.wruTokensAtATime-1), 0) } func (t tokenConsumptionPerSecond) makeReadResponse() *controller.TestResponseInfo { @@ -493,6 +493,85 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { controller.Stop() } +func (suite *resourceManagerClientTestSuite) TestResourceDelta() { + re := suite.Require() + cli := suite.client + + for _, group := range suite.initGroups { + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + } + + cfg := &controller.RequestUnitConfig{ + ReadBaseCost: 1, + ReadCostPerByte: 1, + WriteBaseCost: 1, + WriteCostPerByte: 1, + CPUMsCost: 1, + } + c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) + c.Start(suite.ctx) + + resourceGroupName := suite.initGroups[1].Name + // init + req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */) + resp := controller.NewTestResponseInfo(0, time.Duration(30), true) + _, delta, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) + re.NoError(err) + re.Equal(delta.WriteBytes, uint64(0)) + re.Equal(delta.CpuTime, time.Duration(0)) + _, err = c.OnResponse(resourceGroupName, req, resp) + re.NoError(err) + + req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */) + resp = controller.NewTestResponseInfo(0, time.Duration(10), true) + _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + re.NoError(err) + re.Equal(delta.WriteBytes, uint64(0)) + re.Equal(delta.CpuTime, time.Duration(0)) + _, err = c.OnResponse(resourceGroupName, req, resp) + re.NoError(err) + + // failed request, shouldn't be counted in delta + req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */) + resp = controller.NewTestResponseInfo(0, time.Duration(0), false) + _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + re.NoError(err) + re.Equal(delta.WriteBytes, uint64(0)) + re.Equal(delta.CpuTime, time.Duration(0)) + _, err = c.OnResponse(resourceGroupName, req, resp) + re.NoError(err) + + // from same store, should be zero + req1 := controller.NewTestRequestInfo(true, 70, 1 /* store1 */) + resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true) + _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1) + re.NoError(err) + re.Equal(delta.WriteBytes, uint64(0)) + _, err = c.OnResponse(resourceGroupName, req1, resp1) + re.NoError(err) + + // from different store, should be non-zero + req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */) + resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true) + _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) + re.NoError(err) + re.Equal(delta.WriteBytes, uint64(130)) + _, err = c.OnResponse(resourceGroupName, req2, resp2) + re.NoError(err) + + // from different group, should be zero + resourceGroupName = suite.initGroups[2].Name + req3 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) + resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true) + _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) + re.NoError(err) + re.Equal(delta.WriteBytes, uint64(0)) + _, err = c.OnResponse(resourceGroupName, req3, resp3) + re.NoError(err) +} + func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { re := suite.Require() cli := suite.client From 1858403cd0c4cc0e047351f9d789082feb7fa5e2 Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Tue, 18 Apr 2023 19:44:42 +0800 Subject: [PATCH 07/17] address comment Signed-off-by: Connor1996 --- .../resource_group/controller/controller.go | 48 +++++++++++-------- .../resource_manager/resource_manager_test.go | 20 ++++---- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 518e11f5814..10d6482a683 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -97,8 +97,8 @@ type ResourceGroupsController struct { calculators []ResourceCalculator - mutex sync.Mutex // used for the `resourceDelta` - resourceDelta map[string]map[uint64]Delta // resourceGroupName -> storeID -> delta + mutex sync.Mutex // used for the `resourceCounter` + resourceCounter map[string]Counter // resourceGroupName -> counter // When a signal is received, it means the number of available token is low. lowTokenNotifyChan chan struct{} @@ -117,6 +117,11 @@ type ResourceGroupsController struct { } } +type Counter struct { + storeCounter map[uint64]Delta // storeID -> delta + globalCounter Delta +} + // Delta records resource usage on all stores between two adjacent requests on same store. type Delta struct { WriteBytes uint64 @@ -146,7 +151,7 @@ func NewResourceGroupController( lowTokenNotifyChan: make(chan struct{}, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), - resourceDelta: make(map[string]map[uint64]Delta), + resourceCounter: make(map[string]Counter), } for _, opt := range opts { opt(controller) @@ -452,15 +457,21 @@ func (c *ResourceGroupsController) OnRequestWait( c.mutex.Lock() defer c.mutex.Unlock() - m, ok := c.resourceDelta[resourceGroupName] + m, ok := c.resourceCounter[resourceGroupName] if !ok { - m = make(map[uint64]Delta) - c.resourceDelta[resourceGroupName] = m + m = Counter{ + storeCounter: make(map[uint64]Delta), + globalCounter: Delta{}, + } + c.resourceCounter[resourceGroupName] = m + } + delta := Delta{ + WriteBytes: m.globalCounter.WriteBytes - m.storeCounter[info.StoreID()].WriteBytes, + CpuTime: m.globalCounter.CpuTime - m.storeCounter[info.StoreID()].CpuTime, } - delta := m[info.StoreID()] // More accurately, it should be reset when the request succeed. But it would cause all concurrent requests piggyback large delta which inflates penalty. // So here resets it directly as failure is rare. - m[info.StoreID()] = Delta{} + m.storeCounter[info.StoreID()] = m.globalCounter return consumption, delta, nil } @@ -472,21 +483,18 @@ func (c *ResourceGroupsController) OnResponse( // record resource delta c.mutex.Lock() if resp.Succeed() { - m, ok := c.resourceDelta[resourceGroupName] + m, ok := c.resourceCounter[resourceGroupName] if !ok { - m = make(map[uint64]Delta) - c.resourceDelta[resourceGroupName] = m - } - for id, delta := range m { - if req.StoreID() == id { - continue + m = Counter{ + storeCounter: make(map[uint64]Delta), + globalCounter: Delta{}, } - if req.IsWrite() { - delta.WriteBytes += req.WriteBytes() - } - delta.CpuTime += resp.KVCPU() - m[id] = delta + c.resourceCounter[resourceGroupName] = m + } + if req.IsWrite() { + m.globalCounter.WriteBytes += req.WriteBytes() } + m.globalCounter.CpuTime += resp.KVCPU() } c.mutex.Unlock() diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 141670f6fbb..4e06c234b6c 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -514,8 +514,8 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() { c.Start(suite.ctx) resourceGroupName := suite.initGroups[1].Name - // init - req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */) + // init + req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */) resp := controller.NewTestResponseInfo(0, time.Duration(30), true) _, delta, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) @@ -524,7 +524,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() { _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) - req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */) + req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */) resp = controller.NewTestResponseInfo(0, time.Duration(10), true) _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) @@ -534,7 +534,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() { re.NoError(err) // failed request, shouldn't be counted in delta - req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */) + req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */) resp = controller.NewTestResponseInfo(0, time.Duration(0), false) _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) @@ -542,28 +542,28 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() { re.Equal(delta.CpuTime, time.Duration(0)) _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) - + // from same store, should be zero - req1 := controller.NewTestRequestInfo(true, 70, 1 /* store1 */) + req1 := controller.NewTestRequestInfo(true, 70, 1 /* store1 */) resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1) re.NoError(err) re.Equal(delta.WriteBytes, uint64(0)) _, err = c.OnResponse(resourceGroupName, req1, resp1) re.NoError(err) - + // from different store, should be non-zero - req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */) + req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */) resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) re.NoError(err) re.Equal(delta.WriteBytes, uint64(130)) _, err = c.OnResponse(resourceGroupName, req2, resp2) re.NoError(err) - + // from different group, should be zero resourceGroupName = suite.initGroups[2].Name - req3 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) + req3 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) re.NoError(err) From ee162b6a6049a5e437de0b898297b07d80d5f0d7 Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Tue, 18 Apr 2023 20:39:17 +0800 Subject: [PATCH 08/17] fix and add test Signed-off-by: Connor1996 --- .../resource_group/controller/controller.go | 20 ++++++++++++------- .../resource_manager/resource_manager_test.go | 15 +++++++++++--- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 10d6482a683..40b610c029c 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -98,7 +98,7 @@ type ResourceGroupsController struct { calculators []ResourceCalculator mutex sync.Mutex // used for the `resourceCounter` - resourceCounter map[string]Counter // resourceGroupName -> counter + resourceCounter map[string]*Counter // resourceGroupName -> counter // When a signal is received, it means the number of available token is low. lowTokenNotifyChan chan struct{} @@ -151,7 +151,7 @@ func NewResourceGroupController( lowTokenNotifyChan: make(chan struct{}, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), - resourceCounter: make(map[string]Counter), + resourceCounter: make(map[string]*Counter), } for _, opt := range opts { opt(controller) @@ -459,15 +459,17 @@ func (c *ResourceGroupsController) OnRequestWait( defer c.mutex.Unlock() m, ok := c.resourceCounter[resourceGroupName] if !ok { - m = Counter{ + m = &Counter{ storeCounter: make(map[uint64]Delta), globalCounter: Delta{}, } c.resourceCounter[resourceGroupName] = m } - delta := Delta{ - WriteBytes: m.globalCounter.WriteBytes - m.storeCounter[info.StoreID()].WriteBytes, - CpuTime: m.globalCounter.CpuTime - m.storeCounter[info.StoreID()].CpuTime, + + delta := Delta{} + if counter, exist := m.storeCounter[info.StoreID()]; exist { + delta.WriteBytes = m.globalCounter.WriteBytes - counter.WriteBytes + delta.CpuTime= m.globalCounter.CpuTime - counter.CpuTime } // More accurately, it should be reset when the request succeed. But it would cause all concurrent requests piggyback large delta which inflates penalty. // So here resets it directly as failure is rare. @@ -485,16 +487,20 @@ func (c *ResourceGroupsController) OnResponse( if resp.Succeed() { m, ok := c.resourceCounter[resourceGroupName] if !ok { - m = Counter{ + m = &Counter{ storeCounter: make(map[uint64]Delta), globalCounter: Delta{}, } c.resourceCounter[resourceGroupName] = m } + storeCounter :=m.storeCounter[req.StoreID()] if req.IsWrite() { m.globalCounter.WriteBytes += req.WriteBytes() + storeCounter.WriteBytes += req.WriteBytes() } m.globalCounter.CpuTime += resp.KVCPU() + storeCounter.CpuTime += resp.KVCPU() + m.storeCounter[req.StoreID()] = storeCounter } c.mutex.Unlock() diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 4e06c234b6c..349747f30ee 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -561,15 +561,24 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() { _, err = c.OnResponse(resourceGroupName, req2, resp2) re.NoError(err) - // from different group, should be zero - resourceGroupName = suite.initGroups[2].Name - req3 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) + // from new store, should be zero + req3 := controller.NewTestRequestInfo(true, 0, 3 /* store3 */) resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) re.NoError(err) re.Equal(delta.WriteBytes, uint64(0)) _, err = c.OnResponse(resourceGroupName, req3, resp3) re.NoError(err) + + // from different group, should be zero + resourceGroupName = suite.initGroups[2].Name + req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) + resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true) + _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) + re.NoError(err) + re.Equal(delta.WriteBytes, uint64(0)) + _, err = c.OnResponse(resourceGroupName, req4, resp4) + re.NoError(err) } func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { From 9ff587bda8f4c2dbc0189cc877bbb9445b0d08a5 Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Tue, 18 Apr 2023 20:54:30 +0800 Subject: [PATCH 09/17] make format Signed-off-by: Connor1996 --- client/resource_group/controller/controller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 40b610c029c..d480b5b838b 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -97,7 +97,7 @@ type ResourceGroupsController struct { calculators []ResourceCalculator - mutex sync.Mutex // used for the `resourceCounter` + mutex sync.Mutex // used for the `resourceCounter` resourceCounter map[string]*Counter // resourceGroupName -> counter // When a signal is received, it means the number of available token is low. @@ -469,7 +469,7 @@ func (c *ResourceGroupsController) OnRequestWait( delta := Delta{} if counter, exist := m.storeCounter[info.StoreID()]; exist { delta.WriteBytes = m.globalCounter.WriteBytes - counter.WriteBytes - delta.CpuTime= m.globalCounter.CpuTime - counter.CpuTime + delta.CpuTime = m.globalCounter.CpuTime - counter.CpuTime } // More accurately, it should be reset when the request succeed. But it would cause all concurrent requests piggyback large delta which inflates penalty. // So here resets it directly as failure is rare. @@ -493,7 +493,7 @@ func (c *ResourceGroupsController) OnResponse( } c.resourceCounter[resourceGroupName] = m } - storeCounter :=m.storeCounter[req.StoreID()] + storeCounter := m.storeCounter[req.StoreID()] if req.IsWrite() { m.globalCounter.WriteBytes += req.WriteBytes() storeCounter.WriteBytes += req.WriteBytes() From dfcac87de1fdfe944f0238007de8b04792863ad6 Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Wed, 19 Apr 2023 11:12:52 +0800 Subject: [PATCH 10/17] fix style Signed-off-by: Connor1996 --- client/resource_group/controller/controller.go | 8 ++++---- .../mcs/resource_manager/resource_manager_test.go | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index d480b5b838b..3cbbed1b00f 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -125,7 +125,7 @@ type Counter struct { // Delta records resource usage on all stores between two adjacent requests on same store. type Delta struct { WriteBytes uint64 - CpuTime time.Duration + CPUTime time.Duration } // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor @@ -469,7 +469,7 @@ func (c *ResourceGroupsController) OnRequestWait( delta := Delta{} if counter, exist := m.storeCounter[info.StoreID()]; exist { delta.WriteBytes = m.globalCounter.WriteBytes - counter.WriteBytes - delta.CpuTime = m.globalCounter.CpuTime - counter.CpuTime + delta.CPUTime = m.globalCounter.CPUTime - counter.CPUTime } // More accurately, it should be reset when the request succeed. But it would cause all concurrent requests piggyback large delta which inflates penalty. // So here resets it directly as failure is rare. @@ -498,8 +498,8 @@ func (c *ResourceGroupsController) OnResponse( m.globalCounter.WriteBytes += req.WriteBytes() storeCounter.WriteBytes += req.WriteBytes() } - m.globalCounter.CpuTime += resp.KVCPU() - storeCounter.CpuTime += resp.KVCPU() + m.globalCounter.CPUTime += resp.KVCPU() + storeCounter.CPUTime += resp.KVCPU() m.storeCounter[req.StoreID()] = storeCounter } c.mutex.Unlock() diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 349747f30ee..3e985d9f649 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -520,7 +520,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() { _, delta, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(delta.WriteBytes, uint64(0)) - re.Equal(delta.CpuTime, time.Duration(0)) + re.Equal(delta.CPUTime, time.Duration(0)) _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) @@ -529,7 +529,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() { _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(delta.WriteBytes, uint64(0)) - re.Equal(delta.CpuTime, time.Duration(0)) + re.Equal(delta.CPUTime, time.Duration(0)) _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) @@ -539,7 +539,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() { _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(delta.WriteBytes, uint64(0)) - re.Equal(delta.CpuTime, time.Duration(0)) + re.Equal(delta.CPUTime, time.Duration(0)) _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) From 3c74095709c57b6a67ad3581934b1828c049cec7 Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Wed, 19 Apr 2023 14:35:20 +0800 Subject: [PATCH 11/17] refactor Signed-off-by: Connor1996 --- .../resource_group/controller/controller.go | 120 +++++++----------- .../resource_manager/resource_manager_test.go | 39 +++--- 2 files changed, 67 insertions(+), 92 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 3cbbed1b00f..6270ff28971 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/client_golang/prometheus" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" ) @@ -50,7 +51,7 @@ const ( // ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store. type ResourceGroupKVInterceptor interface { // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. - OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, Delta, error) + OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error) // OnResponse is used to consume tokens after receiving response OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error) } @@ -97,9 +98,6 @@ type ResourceGroupsController struct { calculators []ResourceCalculator - mutex sync.Mutex // used for the `resourceCounter` - resourceCounter map[string]*Counter // resourceGroupName -> counter - // When a signal is received, it means the number of available token is low. lowTokenNotifyChan chan struct{} // When a token bucket response received from server, it will be sent to the channel. @@ -117,17 +115,6 @@ type ResourceGroupsController struct { } } -type Counter struct { - storeCounter map[uint64]Delta // storeID -> delta - globalCounter Delta -} - -// Delta records resource usage on all stores between two adjacent requests on same store. -type Delta struct { - WriteBytes uint64 - CPUTime time.Duration -} - // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor func NewResourceGroupController( ctx context.Context, @@ -151,7 +138,6 @@ func NewResourceGroupController( lowTokenNotifyChan: make(chan struct{}, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), - resourceCounter: make(map[string]*Counter), } for _, opt := range opts { opt(controller) @@ -444,66 +430,19 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. func (c *ResourceGroupsController) OnRequestWait( ctx context.Context, resourceGroupName string, info RequestInfo, -) (*rmpb.Consumption, Delta, error) { +) (*rmpb.Consumption, *rmpb.Consumption, error) { gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) if err != nil { failedRequestCounter.WithLabelValues(resourceGroupName).Inc() - return nil, Delta{}, err - } - consumption, err := gc.onRequestWait(ctx, info) - if err != nil { - return nil, Delta{}, err - } - - c.mutex.Lock() - defer c.mutex.Unlock() - m, ok := c.resourceCounter[resourceGroupName] - if !ok { - m = &Counter{ - storeCounter: make(map[uint64]Delta), - globalCounter: Delta{}, - } - c.resourceCounter[resourceGroupName] = m + return nil, nil, err } - - delta := Delta{} - if counter, exist := m.storeCounter[info.StoreID()]; exist { - delta.WriteBytes = m.globalCounter.WriteBytes - counter.WriteBytes - delta.CPUTime = m.globalCounter.CPUTime - counter.CPUTime - } - // More accurately, it should be reset when the request succeed. But it would cause all concurrent requests piggyback large delta which inflates penalty. - // So here resets it directly as failure is rare. - m.storeCounter[info.StoreID()] = m.globalCounter - - return consumption, delta, nil + return gc.onRequestWait(ctx, info) } // OnResponse is used to consume tokens after receiving response func (c *ResourceGroupsController) OnResponse( resourceGroupName string, req RequestInfo, resp ResponseInfo, ) (*rmpb.Consumption, error) { - // record resource delta - c.mutex.Lock() - if resp.Succeed() { - m, ok := c.resourceCounter[resourceGroupName] - if !ok { - m = &Counter{ - storeCounter: make(map[uint64]Delta), - globalCounter: Delta{}, - } - c.resourceCounter[resourceGroupName] = m - } - storeCounter := m.storeCounter[req.StoreID()] - if req.IsWrite() { - m.globalCounter.WriteBytes += req.WriteBytes() - storeCounter.WriteBytes += req.WriteBytes() - } - m.globalCounter.CPUTime += resp.KVCPU() - storeCounter.CPUTime += resp.KVCPU() - m.storeCounter[req.StoreID()] = storeCounter - } - c.mutex.Unlock() - tmp, ok := c.groupsController.Load(resourceGroupName) if !ok { log.Warn("[resource group controller] resource group name does not exist", zap.String("resourceGroupName", resourceGroupName)) @@ -527,7 +466,9 @@ type groupCostController struct { mu struct { sync.Mutex - consumption *rmpb.Consumption + consumption *rmpb.Consumption + storeCounter map[uint64]*rmpb.Consumption + globalCounter *rmpb.Consumption } // fast path to make once token limit with un-limit burst. @@ -635,6 +576,8 @@ func newGroupCostController( } gc.mu.consumption = &rmpb.Consumption{} + gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption) + gc.mu.globalCounter = &rmpb.Consumption{} return gc, nil } @@ -1078,14 +1021,16 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 { func (gc *groupCostController) onRequestWait( ctx context.Context, info RequestInfo, -) (*rmpb.Consumption, error) { +) (*rmpb.Consumption, *rmpb.Consumption, error) { delta := &rmpb.Consumption{} for _, calc := range gc.calculators { calc.BeforeKVRequest(delta, info) } + gc.mu.Lock() add(gc.mu.consumption, delta) gc.mu.Unlock() + if !gc.burstable.Load() { var err error now := time.Now() @@ -1123,12 +1068,27 @@ func (gc *groupCostController) onRequestWait( gc.mu.Lock() sub(gc.mu.consumption, delta) gc.mu.Unlock() - return nil, err + return nil, nil, err } else { gc.successfulRequestDuration.Observe(d.Seconds()) } } - return delta, nil + + gc.mu.Lock() + defer gc.mu.Unlock() + // calculate the penalty of the store + var penalty *rmpb.Consumption + if storeCounter, exist := gc.mu.storeCounter[info.StoreID()]; exist { + penalty = typeutil.DeepClone(gc.mu.globalCounter, func() *rmpb.Consumption { return &rmpb.Consumption{} }) + sub(penalty, storeCounter) + } else { + penalty = &rmpb.Consumption{} + } + // More accurately, it should be reset when the request succeed. But it would cause all concurrent requests piggyback large delta which inflates penalty. + // So here resets it directly as failure is rare. + gc.mu.storeCounter[info.StoreID()] = typeutil.DeepClone(gc.mu.globalCounter, func() *rmpb.Consumption { return &rmpb.Consumption{} }) + + return delta, penalty, nil } func (gc *groupCostController) onResponse( @@ -1154,9 +1114,23 @@ func (gc *groupCostController) onResponse( } } } - gc.mu.Lock() - add(gc.mu.consumption, delta) - gc.mu.Unlock() + + { + gc.mu.Lock() + defer gc.mu.Unlock() + + // record the consumption of the request + add(gc.mu.consumption, delta) + + // record the consumption of the request by store + // As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest` + for _, calc := range gc.calculators { + calc.BeforeKVRequest(delta, req) + } + add(gc.mu.storeCounter[req.StoreID()], delta) + add(gc.mu.globalCounter, delta) + } + return delta, nil } diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 3e985d9f649..989ca7f143a 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -517,56 +517,57 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() { // init req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */) resp := controller.NewTestResponseInfo(0, time.Duration(30), true) - _, delta, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) - re.Equal(delta.WriteBytes, uint64(0)) - re.Equal(delta.CPUTime, time.Duration(0)) + re.Equal(penalty.WriteBytes, float64(0)) + re.Equal(penalty.TotalCpuTimeMs, 0.0) _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */) resp = controller.NewTestResponseInfo(0, time.Duration(10), true) - _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) - re.Equal(delta.WriteBytes, uint64(0)) - re.Equal(delta.CPUTime, time.Duration(0)) + re.Equal(penalty.WriteBytes, float64(0)) + re.Equal(penalty.TotalCpuTimeMs, 0.0) _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) - // failed request, shouldn't be counted in delta + // failed request, shouldn't be counted in penalty req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */) resp = controller.NewTestResponseInfo(0, time.Duration(0), false) - _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) - re.Equal(delta.WriteBytes, uint64(0)) - re.Equal(delta.CPUTime, time.Duration(0)) + re.Equal(penalty.WriteBytes, float64(0)) + re.Equal(penalty.TotalCpuTimeMs, 0.0) _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) // from same store, should be zero - req1 := controller.NewTestRequestInfo(true, 70, 1 /* store1 */) + req1 := controller.NewTestRequestInfo(false, 0, 1 /* store1 */) resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1) + _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1) re.NoError(err) - re.Equal(delta.WriteBytes, uint64(0)) + re.Equal(penalty.WriteBytes, float64(0)) _, err = c.OnResponse(resourceGroupName, req1, resp1) re.NoError(err) // from different store, should be non-zero req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */) resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) + _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) re.NoError(err) - re.Equal(delta.WriteBytes, uint64(130)) + re.Equal(penalty.WriteBytes, float64(60)) + re.InEpsilon(penalty.TotalCpuTimeMs, 10.0 / 1000.0 / 1000.0, 1e-6) _, err = c.OnResponse(resourceGroupName, req2, resp2) re.NoError(err) // from new store, should be zero req3 := controller.NewTestRequestInfo(true, 0, 3 /* store3 */) resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) + _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) re.NoError(err) - re.Equal(delta.WriteBytes, uint64(0)) + re.Equal(penalty.WriteBytes, float64(0)) _, err = c.OnResponse(resourceGroupName, req3, resp3) re.NoError(err) @@ -574,9 +575,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() { resourceGroupName = suite.initGroups[2].Name req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) + _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) re.NoError(err) - re.Equal(delta.WriteBytes, uint64(0)) + re.Equal(penalty.WriteBytes, float64(0)) _, err = c.OnResponse(resourceGroupName, req4, resp4) re.NoError(err) } From d36e37ca0a625394fe11d0979b15d7daf52d7bbc Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Wed, 19 Apr 2023 15:04:48 +0800 Subject: [PATCH 12/17] address comment Signed-off-by: Connor1996 --- client/resource_group/controller/controller.go | 9 ++++----- .../mcs/resource_manager/resource_manager_test.go | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 6270ff28971..f56c58513e1 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -29,7 +29,6 @@ import ( "github.com/prometheus/client_golang/prometheus" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" - "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" ) @@ -1077,16 +1076,16 @@ func (gc *groupCostController) onRequestWait( gc.mu.Lock() defer gc.mu.Unlock() // calculate the penalty of the store - var penalty *rmpb.Consumption + penalty := &rmpb.Consumption{} if storeCounter, exist := gc.mu.storeCounter[info.StoreID()]; exist { - penalty = typeutil.DeepClone(gc.mu.globalCounter, func() *rmpb.Consumption { return &rmpb.Consumption{} }) + *penalty = *gc.mu.globalCounter sub(penalty, storeCounter) } else { - penalty = &rmpb.Consumption{} + gc.mu.storeCounter[info.StoreID()] = &rmpb.Consumption{} } // More accurately, it should be reset when the request succeed. But it would cause all concurrent requests piggyback large delta which inflates penalty. // So here resets it directly as failure is rare. - gc.mu.storeCounter[info.StoreID()] = typeutil.DeepClone(gc.mu.globalCounter, func() *rmpb.Consumption { return &rmpb.Consumption{} }) + *gc.mu.storeCounter[info.StoreID()] = *gc.mu.globalCounter return delta, penalty, nil } diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 989ca7f143a..95699adea2b 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -493,7 +493,7 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { controller.Stop() } -func (suite *resourceManagerClientTestSuite) TestResourceDelta() { +func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { re := suite.Require() cli := suite.client From 33961e592840c9dc4aa7ac0738a117114bbf7dca Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Wed, 19 Apr 2023 15:09:07 +0800 Subject: [PATCH 13/17] clean Signed-off-by: Connor1996 --- .../resource_group/controller/controller.go | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index f56c58513e1..6c911c78362 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -1074,7 +1074,6 @@ func (gc *groupCostController) onRequestWait( } gc.mu.Lock() - defer gc.mu.Unlock() // calculate the penalty of the store penalty := &rmpb.Consumption{} if storeCounter, exist := gc.mu.storeCounter[info.StoreID()]; exist { @@ -1086,6 +1085,7 @@ func (gc *groupCostController) onRequestWait( // More accurately, it should be reset when the request succeed. But it would cause all concurrent requests piggyback large delta which inflates penalty. // So here resets it directly as failure is rare. *gc.mu.storeCounter[info.StoreID()] = *gc.mu.globalCounter + gc.mu.Unlock() return delta, penalty, nil } @@ -1114,21 +1114,17 @@ func (gc *groupCostController) onResponse( } } - { - gc.mu.Lock() - defer gc.mu.Unlock() - - // record the consumption of the request - add(gc.mu.consumption, delta) - - // record the consumption of the request by store - // As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest` - for _, calc := range gc.calculators { - calc.BeforeKVRequest(delta, req) - } - add(gc.mu.storeCounter[req.StoreID()], delta) - add(gc.mu.globalCounter, delta) + gc.mu.Lock() + // record the consumption of the request + add(gc.mu.consumption, delta) + // record the consumption of the request by store + // As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest` + for _, calc := range gc.calculators { + calc.BeforeKVRequest(delta, req) } + add(gc.mu.storeCounter[req.StoreID()], delta) + add(gc.mu.globalCounter, delta) + gc.mu.Unlock() return delta, nil } From 7e7b15f8e6a0300070ebc60f8ab4026ab68ca9f2 Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Wed, 19 Apr 2023 15:14:45 +0800 Subject: [PATCH 14/17] make format Signed-off-by: Connor1996 --- .../mcs/resource_manager/resource_manager_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 95699adea2b..9dc36cc68b9 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -533,7 +533,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) - // failed request, shouldn't be counted in penalty + // failed request, shouldn't be counted in penalty req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */) resp = controller.NewTestResponseInfo(0, time.Duration(0), false) _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) @@ -558,7 +558,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) re.NoError(err) re.Equal(penalty.WriteBytes, float64(60)) - re.InEpsilon(penalty.TotalCpuTimeMs, 10.0 / 1000.0 / 1000.0, 1e-6) + re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6) _, err = c.OnResponse(resourceGroupName, req2, resp2) re.NoError(err) From 4ee343e9b3979ea7c08833319e36991ce359edff Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Wed, 19 Apr 2023 15:24:06 +0800 Subject: [PATCH 15/17] fix test build Signed-off-by: Connor1996 --- client/resource_group/controller/controller_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 5bde4bca2c1..d3b2a29c211 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -97,7 +97,7 @@ func TestRequestAndResponseConsumption(t *testing.T) { kvCalculator := gc.getKVCalculator() for idx, testCase := range testCases { caseNum := fmt.Sprintf("case %d", idx) - consumption, err := gc.onRequestWait(context.TODO(), testCase.req) + consumption, _, err := gc.onRequestWait(context.TODO(), testCase.req) re.NoError(err, caseNum) expectedConsumption := &rmpb.Consumption{} if testCase.req.IsWrite() { From 69e76bd0ab4ce5da597af8933d46382b0297529f Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Wed, 19 Apr 2023 16:14:35 +0800 Subject: [PATCH 16/17] tear down test Signed-off-by: Connor1996 --- .../mcs/resource_manager/resource_manager_test.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 9dc36cc68b9..634c919e437 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -149,6 +149,10 @@ func (suite *resourceManagerClientTestSuite) TearDownSuite() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resource_manager/server/enableDegradedMode")) } +func (suite *resourceManagerClientTestSuite) TearDownTest() { + suite.cleanupResourceGroups() +} + func (suite *resourceManagerClientTestSuite) cleanupResourceGroups() { cli := suite.client groups, err := cli.ListResourceGroups(suite.ctx) @@ -356,7 +360,6 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { break } } - suite.cleanupResourceGroups() controller.Stop() } @@ -489,7 +492,6 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { re.Less(duration, 100*time.Millisecond) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/acceleratedReportingPeriod")) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend")) - suite.cleanupResourceGroups() controller.Stop() } @@ -646,7 +648,6 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { checkFunc(gresp, groups[0]) } re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resource_manager/server/fastPersist")) - suite.cleanupResourceGroups() } func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { @@ -902,9 +903,6 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover() re.NotNil(getResp) re.Equal(group.RUSettings.RU.Settings.FillRate, getResp.RUSettings.RU.Settings.FillRate) } - - // Cleanup the resource group. - suite.cleanupResourceGroups() } func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMode() { @@ -963,7 +961,6 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo controller.Stop() re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resource_manager/server/acquireFailed")) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/degradedModeRU")) - suite.cleanupResourceGroups() } func (suite *resourceManagerClientTestSuite) TestLoadRequestUnitConfig() { @@ -1045,5 +1042,4 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup")) controller.Stop() - suite.cleanupResourceGroups() } From 161cc6f49fc0dcff83b9000a97000982e99e8732 Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Wed, 19 Apr 2023 17:16:07 +0800 Subject: [PATCH 17/17] fix delta Signed-off-by: Connor1996 --- client/resource_group/controller/controller.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 6c911c78362..1b4bfd7ecf5 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -1074,7 +1074,7 @@ func (gc *groupCostController) onRequestWait( } gc.mu.Lock() - // calculate the penalty of the store + // Calculate the penalty of the store penalty := &rmpb.Consumption{} if storeCounter, exist := gc.mu.storeCounter[info.StoreID()]; exist { *penalty = *gc.mu.globalCounter @@ -1115,15 +1115,17 @@ func (gc *groupCostController) onResponse( } gc.mu.Lock() - // record the consumption of the request + // Record the consumption of the request add(gc.mu.consumption, delta) - // record the consumption of the request by store + // Record the consumption of the request by store + count := &rmpb.Consumption{} + *count = *delta // As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest` for _, calc := range gc.calculators { - calc.BeforeKVRequest(delta, req) + calc.BeforeKVRequest(count, req) } - add(gc.mu.storeCounter[req.StoreID()], delta) - add(gc.mu.globalCounter, delta) + add(gc.mu.storeCounter[req.StoreID()], count) + add(gc.mu.globalCounter, count) gc.mu.Unlock() return delta, nil