diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go
index 3cbbed1b00f..6270ff28971 100755
--- a/client/resource_group/controller/controller.go
+++ b/client/resource_group/controller/controller.go
@@ -29,6 +29,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/errs"
+	"github.com/tikv/pd/pkg/utils/typeutil"
 	"go.uber.org/zap"
 )
 
@@ -50,7 +51,7 @@ const (
 // ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.
 type ResourceGroupKVInterceptor interface {
 	// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
-	OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, Delta, error)
+	OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
 	// OnResponse is used to consume tokens after receiving response
 	OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
 }
@@ -97,9 +98,6 @@ type ResourceGroupsController struct {
 	calculators []ResourceCalculator
 
-	mutex           sync.Mutex          // used for the `resourceCounter`
-	resourceCounter map[string]*Counter // resourceGroupName -> counter
-
 	// When a signal is received, it means the number of available token is low.
 	lowTokenNotifyChan chan struct{}
 	// When a token bucket response received from server, it will be sent to the channel.
@@ -117,17 +115,6 @@ type ResourceGroupsController struct {
 	}
 }
 
-type Counter struct {
-	storeCounter  map[uint64]Delta // storeID -> delta
-	globalCounter Delta
-}
-
-// Delta records resource usage on all stores between two adjacent requests on same store.
-type Delta struct {
-	WriteBytes uint64
-	CPUTime    time.Duration
-}
-
 // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor
 func NewResourceGroupController(
 	ctx context.Context,
@@ -151,7 +138,6 @@ func NewResourceGroupController(
 		lowTokenNotifyChan:    make(chan struct{}, 1),
 		tokenResponseChan:     make(chan []*rmpb.TokenBucketResponse, 1),
 		tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
-		resourceCounter:       make(map[string]*Counter),
 	}
 	for _, opt := range opts {
 		opt(controller)
@@ -444,66 +430,19 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
 
 // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
 func (c *ResourceGroupsController) OnRequestWait(
 	ctx context.Context, resourceGroupName string, info RequestInfo,
-) (*rmpb.Consumption, Delta, error) {
+) (*rmpb.Consumption, *rmpb.Consumption, error) {
 	gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
 	if err != nil {
 		failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
-		return nil, Delta{}, err
-	}
-	consumption, err := gc.onRequestWait(ctx, info)
-	if err != nil {
-		return nil, Delta{}, err
-	}
-
-	c.mutex.Lock()
-	defer c.mutex.Unlock()
-	m, ok := c.resourceCounter[resourceGroupName]
-	if !ok {
-		m = &Counter{
-			storeCounter:  make(map[uint64]Delta),
-			globalCounter: Delta{},
-		}
-		c.resourceCounter[resourceGroupName] = m
+		return nil, nil, err
 	}
-
-	delta := Delta{}
-	if counter, exist := m.storeCounter[info.StoreID()]; exist {
-		delta.WriteBytes = m.globalCounter.WriteBytes - counter.WriteBytes
-		delta.CPUTime = m.globalCounter.CPUTime - counter.CPUTime
-	}
-	// More accurately, it should be reset when the request succeed. But it would cause all concurrent requests piggyback large delta which inflates penalty.
-	// So here resets it directly as failure is rare.
-	m.storeCounter[info.StoreID()] = m.globalCounter
-
-	return consumption, delta, nil
+	return gc.onRequestWait(ctx, info)
 }
 
 // OnResponse is used to consume tokens after receiving response
 func (c *ResourceGroupsController) OnResponse(
 	resourceGroupName string, req RequestInfo, resp ResponseInfo,
 ) (*rmpb.Consumption, error) {
-	// record resource delta
-	c.mutex.Lock()
-	if resp.Succeed() {
-		m, ok := c.resourceCounter[resourceGroupName]
-		if !ok {
-			m = &Counter{
-				storeCounter:  make(map[uint64]Delta),
-				globalCounter: Delta{},
-			}
-			c.resourceCounter[resourceGroupName] = m
-		}
-		storeCounter := m.storeCounter[req.StoreID()]
-		if req.IsWrite() {
-			m.globalCounter.WriteBytes += req.WriteBytes()
-			storeCounter.WriteBytes += req.WriteBytes()
-		}
-		m.globalCounter.CPUTime += resp.KVCPU()
-		storeCounter.CPUTime += resp.KVCPU()
-		m.storeCounter[req.StoreID()] = storeCounter
-	}
-	c.mutex.Unlock()
-
 	tmp, ok := c.groupsController.Load(resourceGroupName)
 	if !ok {
 		log.Warn("[resource group controller] resource group name does not exist", zap.String("resourceGroupName", resourceGroupName))
@@ -527,7 +466,9 @@ type groupCostController struct {
 
 	mu struct {
 		sync.Mutex
-		consumption *rmpb.Consumption
+		consumption   *rmpb.Consumption
+		storeCounter  map[uint64]*rmpb.Consumption
+		globalCounter *rmpb.Consumption
 	}
 
 	// fast path to make once token limit with un-limit burst.
@@ -635,6 +576,8 @@ func newGroupCostController(
 	}
 
 	gc.mu.consumption = &rmpb.Consumption{}
+	gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption)
+	gc.mu.globalCounter = &rmpb.Consumption{}
 
 	return gc, nil
 }
@@ -1078,14 +1021,16 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {
 
 func (gc *groupCostController) onRequestWait(
 	ctx context.Context, info RequestInfo,
-) (*rmpb.Consumption, error) {
+) (*rmpb.Consumption, *rmpb.Consumption, error) {
 	delta := &rmpb.Consumption{}
 	for _, calc := range gc.calculators {
 		calc.BeforeKVRequest(delta, info)
 	}
+
 	gc.mu.Lock()
 	add(gc.mu.consumption, delta)
 	gc.mu.Unlock()
+
 	if !gc.burstable.Load() {
 		var err error
 		now := time.Now()
@@ -1123,12 +1068,27 @@ func (gc *groupCostController) onRequestWait(
 			gc.mu.Lock()
 			sub(gc.mu.consumption, delta)
 			gc.mu.Unlock()
-			return nil, err
+			return nil, nil, err
 		} else {
 			gc.successfulRequestDuration.Observe(d.Seconds())
 		}
 	}
-	return delta, nil
+
+	gc.mu.Lock()
+	defer gc.mu.Unlock()
+	// Calculate the penalty: the resource usage accumulated on other stores since the last request to this store.
+	var penalty *rmpb.Consumption
+	if storeCounter, exist := gc.mu.storeCounter[info.StoreID()]; exist {
+		penalty = typeutil.DeepClone(gc.mu.globalCounter, func() *rmpb.Consumption { return &rmpb.Consumption{} })
+		sub(penalty, storeCounter)
+	} else {
+		penalty = &rmpb.Consumption{}
+	}
+	// More accurately, the store counter should only be reset when the request succeeds. But that would let all
+	// concurrent requests piggyback a large delta, which inflates the penalty. So reset it here directly, as failures are rare.
+	gc.mu.storeCounter[info.StoreID()] = typeutil.DeepClone(gc.mu.globalCounter, func() *rmpb.Consumption { return &rmpb.Consumption{} })
+
+	return delta, penalty, nil
 }
 
 func (gc *groupCostController) onResponse(
@@ -1154,9 +1114,23 @@ func (gc *groupCostController) onResponse(
 			}
 		}
 	}
-	gc.mu.Lock()
-	add(gc.mu.consumption, delta)
-	gc.mu.Unlock()
+
+	{
+		gc.mu.Lock()
+		defer gc.mu.Unlock()
+
+		// Record the consumption of the request.
+		add(gc.mu.consumption, delta)
+
+		// Record the consumption of the request by store. Since the penalty is only counted once the request
+		// completes, the write cost charged in `BeforeKVRequest` has to be added back into the counters here.
+		for _, calc := range gc.calculators {
+			calc.BeforeKVRequest(delta, req)
+		}
+		add(gc.mu.storeCounter[req.StoreID()], delta)
+		add(gc.mu.globalCounter, delta)
+	}
+
 	return delta, nil
 }
diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go
index 3e985d9f649..989ca7f143a 100644
--- a/tests/integrations/mcs/resource_manager/resource_manager_test.go
+++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go
@@ -517,56 +517,57 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() {
 	// init
 	req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */)
 	resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
-	_, delta, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
+	_, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
 	re.NoError(err)
-	re.Equal(delta.WriteBytes, uint64(0))
-	re.Equal(delta.CPUTime, time.Duration(0))
+	re.Equal(penalty.WriteBytes, float64(0))
+	re.Equal(penalty.TotalCpuTimeMs, 0.0)
 	_, err = c.OnResponse(resourceGroupName, req, resp)
 	re.NoError(err)
 
 	req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */)
 	resp = controller.NewTestResponseInfo(0, time.Duration(10), true)
-	_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
+	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
 	re.NoError(err)
-	re.Equal(delta.WriteBytes, uint64(0))
-	re.Equal(delta.CPUTime, time.Duration(0))
+	re.Equal(penalty.WriteBytes, float64(0))
+	re.Equal(penalty.TotalCpuTimeMs, 0.0)
 	_, err = c.OnResponse(resourceGroupName, req, resp)
 	re.NoError(err)
 
-	// failed request, shouldn't be counted in delta
+	// failed request, shouldn't be counted in penalty
 	req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */)
 	resp = controller.NewTestResponseInfo(0, time.Duration(0), false)
-	_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
+	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
 	re.NoError(err)
-	re.Equal(delta.WriteBytes, uint64(0))
-	re.Equal(delta.CPUTime, time.Duration(0))
+	re.Equal(penalty.WriteBytes, float64(0))
+	re.Equal(penalty.TotalCpuTimeMs, 0.0)
 	_, err = c.OnResponse(resourceGroupName, req, resp)
 	re.NoError(err)
 
 	// from same store, should be zero
-	req1 := controller.NewTestRequestInfo(true, 70, 1 /* store1 */)
+	req1 := controller.NewTestRequestInfo(false, 0, 1 /* store1 */)
 	resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true)
-	_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
+	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
 	re.NoError(err)
-	re.Equal(delta.WriteBytes, uint64(0))
+	re.Equal(penalty.WriteBytes, float64(0))
 	_, err = c.OnResponse(resourceGroupName, req1, resp1)
 	re.NoError(err)
 
 	// from different store, should be non-zero
 	req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */)
 	resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true)
-	_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
+	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
 	re.NoError(err)
-	re.Equal(delta.WriteBytes, uint64(130))
+	re.Equal(penalty.WriteBytes, float64(60))
+	re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6)
 	_, err = c.OnResponse(resourceGroupName, req2, resp2)
 	re.NoError(err)
 
 	// from new store, should be zero
 	req3 := controller.NewTestRequestInfo(true, 0, 3 /* store3 */)
 	resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true)
-	_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
+	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
 	re.NoError(err)
-	re.Equal(delta.WriteBytes, uint64(0))
+	re.Equal(penalty.WriteBytes, float64(0))
 	_, err = c.OnResponse(resourceGroupName, req3, resp3)
 	re.NoError(err)
 
@@ -574,9 +575,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() {
 	resourceGroupName = suite.initGroups[2].Name
 	req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */)
 	resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true)
-	_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
+	_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
 	re.NoError(err)
-	re.Equal(delta.WriteBytes, uint64(0))
+	re.Equal(penalty.WriteBytes, float64(0))
 	_, err = c.OnResponse(resourceGroupName, req4, resp4)
 	re.NoError(err)
 }
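
For reviewers, here is a minimal, self-contained sketch of the per-store penalty bookkeeping this change introduces, mirroring the "from different store, should be non-zero" case in `TestResourceDelta`. It is an illustration only: `consumption`, `add`, and `penaltyFor` are simplified stand-ins for `rmpb.Consumption`, the controller's `add`/`sub` helpers, and the locked `storeCounter`/`globalCounter` logic in `onRequestWait`/`onResponse`; the real code clones protobuf messages with `typeutil.DeepClone` and holds `gc.mu` while updating the counters.

```go
package main

import "fmt"

// consumption is a stand-in for rmpb.Consumption; only the two fields
// exercised by the test above are kept.
type consumption struct {
	WriteBytes     float64
	TotalCpuTimeMs float64
}

// add mirrors the controller's add() helper.
func add(dst *consumption, src consumption) {
	dst.WriteBytes += src.WriteBytes
	dst.TotalCpuTimeMs += src.TotalCpuTimeMs
}

// penaltyFor mirrors the bookkeeping in onRequestWait: the penalty is the
// global usage accumulated since the last request seen on storeID, and the
// store's snapshot is then reset to the current global counter.
func penaltyFor(storeCounter map[uint64]consumption, global consumption, storeID uint64) consumption {
	var p consumption
	if last, ok := storeCounter[storeID]; ok {
		p.WriteBytes = global.WriteBytes - last.WriteBytes
		p.TotalCpuTimeMs = global.TotalCpuTimeMs - last.TotalCpuTimeMs
	}
	storeCounter[storeID] = global
	return p
}

func main() {
	storeCounter := make(map[uint64]consumption)
	var global consumption

	// First request on store 2: no snapshot yet, so the penalty is zero.
	fmt.Printf("%+v\n", penaltyFor(storeCounter, global, 2)) // {WriteBytes:0 TotalCpuTimeMs:0}

	// A 60-byte write is then issued and completed on store 1: onResponse-style
	// accounting adds its cost to store 1's counter and to the global counter.
	_ = penaltyFor(storeCounter, global, 1)
	cost := consumption{WriteBytes: 60, TotalCpuTimeMs: 0.01}
	s1 := storeCounter[1]
	add(&s1, cost)
	storeCounter[1] = s1
	add(&global, cost)

	// The next request on store 2 sees store 1's write as its penalty.
	fmt.Printf("%+v\n", penaltyFor(storeCounter, global, 2)) // {WriteBytes:60 TotalCpuTimeMs:0.01}
}
```

Running it prints a zero penalty for the first request on each store, and `{WriteBytes:60 TotalCpuTimeMs:0.01}` once store 2 observes the write that completed on store 1 in the meantime.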