client: Count resource penalty for resource control #6336

Merged (18 commits) on Apr 19, 2023
45 changes: 38 additions & 7 deletions client/resource_group/controller/controller.go
@@ -50,7 +50,7 @@ const (
// ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether the resource group has enough tokens. It may need to wait for some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, error)
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
// OnResponse is used to consume tokens after receiving a response.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
}
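
For context, a minimal sketch of how a caller might consume the new two-value return, where the second *rmpb.Consumption is the penalty. The function name, the group name, and the idea of forwarding the penalty with the KV request are illustrative assumptions, not part of this diff:

// Hypothetical caller inside the controller package.
func sendWithResourceControl(ctx context.Context, interceptor ResourceGroupKVInterceptor, req RequestInfo) error {
	// First value: this request's own consumption delta.
	// Second value: penalty accrued on other stores since this store's last request.
	_, penalty, err := interceptor.OnRequestWait(ctx, "rg1", req)
	if err != nil {
		return err
	}
	// Sketch: attach the penalty to the outgoing KV request so the target
	// store can throttle to account for writes done elsewhere.
	_ = penalty
	return nil
}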
@@ -429,11 +429,11 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
// OnRequestWait is used to check whether the resource group has enough tokens. It may need to wait for some time.
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, error) {
) (*rmpb.Consumption, *rmpb.Consumption, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return nil, err
return nil, nil, err
}
return gc.onRequestWait(ctx, info)
}
@@ -465,7 +465,9 @@ type groupCostController struct {

mu struct {
sync.Mutex
consumption *rmpb.Consumption
consumption *rmpb.Consumption
storeCounter map[uint64]*rmpb.Consumption
Member: How about adding a separate lock? The logic for storeCounter and globalCounter is independent of consumption.

Member Author: But they almost always take the lock at the same time.

Member: This is a hot path. I guess two locks would give better performance, but maybe it's not the bottleneck currently.

globalCounter *rmpb.Consumption
}
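
As a side note to the thread above, a minimal sketch of the separate-lock layout the reviewer suggests, assuming the penalty counters really are independent of consumption; this type is hypothetical, and the PR keeps the single mutex since both sets of fields are usually updated together:

// Hypothetical split: give the penalty counters their own lock so that
// penalty bookkeeping does not contend with the consumption hot path.
type penaltyCounters struct {
	mu            sync.Mutex
	storeCounter  map[uint64]*rmpb.Consumption
	globalCounter *rmpb.Consumption
}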

// fast path to make once token limit with un-limit burst.
@@ -573,6 +575,8 @@ func newGroupCostController(
}

gc.mu.consumption = &rmpb.Consumption{}
gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption)
gc.mu.globalCounter = &rmpb.Consumption{}
return gc, nil
}

@@ -1016,14 +1020,16 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {

func (gc *groupCostController) onRequestWait(
ctx context.Context, info RequestInfo,
) (*rmpb.Consumption, error) {
) (*rmpb.Consumption, *rmpb.Consumption, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.BeforeKVRequest(delta, info)
}

gc.mu.Lock()
add(gc.mu.consumption, delta)
gc.mu.Unlock()

if !gc.burstable.Load() {
var err error
now := time.Now()
@@ -1061,12 +1067,27 @@
gc.mu.Lock()
sub(gc.mu.consumption, delta)
gc.mu.Unlock()
return nil, err
return nil, nil, err
} else {
gc.successfulRequestDuration.Observe(d.Seconds())
}
}
return delta, nil

gc.mu.Lock()
// calculate the penalty for the store
penalty := &rmpb.Consumption{}
if storeCounter, exist := gc.mu.storeCounter[info.StoreID()]; exist {
*penalty = *gc.mu.globalCounter
sub(penalty, storeCounter)
} else {
gc.mu.storeCounter[info.StoreID()] = &rmpb.Consumption{}
}
// More accurately, it should be reset only when the request succeeds. But that would cause all concurrent requests to piggyback a large delta, which inflates the penalty.
// So reset it directly here, as failures are rare.
*gc.mu.storeCounter[info.StoreID()] = *gc.mu.globalCounter
gc.mu.Unlock()

return delta, penalty, nil
}
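
To make the penalty arithmetic concrete, a self-contained sketch that models only the write-byte component, using plain float64 values in place of rmpb.Consumption; the numbers mirror the TestResourcePenalty case further down:

package main

import "fmt"

func main() {
	global := 0.0                 // globalCounter, write bytes only
	store := map[uint64]float64{} // storeCounter, write bytes only

	onRequestWait := func(id uint64) float64 {
		penalty := 0.0
		if last, ok := store[id]; ok {
			penalty = global - last // writes landed on other stores since the last visit
		}
		store[id] = global // reset the snapshot unconditionally; failures are rare
		return penalty
	}
	onResponse := func(id uint64, writeBytes float64) {
		store[id] += writeBytes
		global += writeBytes
	}

	onRequestWait(2)              // store 2 first seen: penalty 0
	onResponse(2, 0)              // read request, no write bytes
	onRequestWait(1)              // store 1 first seen: penalty 0
	onResponse(1, 60)             // a 60-byte write lands on store 1
	fmt.Println(onRequestWait(2)) // 60: store 2 has to pay for store 1's writes
}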

func (gc *groupCostController) onResponse(
@@ -1092,9 +1113,19 @@ func (gc *groupCostController) onResponse(
}
}
}

gc.mu.Lock()
// record the consumption of the request
add(gc.mu.consumption, delta)
// record the consumption of the request by store
// Since the penalty is only counted when the request completes, the write cost added in `BeforeKVRequest` needs to be calculated here as well.
for _, calc := range gc.calculators {
calc.BeforeKVRequest(delta, req)
}
add(gc.mu.storeCounter[req.StoreID()], delta)
add(gc.mu.globalCounter, delta)
gc.mu.Unlock()

return delta, nil
}
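
Note the two-phase accounting this creates: the write cost is charged to `consumption` up front in onRequestWait, but it reaches storeCounter and globalCounter only here in onResponse, after the request has finished. The "failed request" case in the test below relies on this ordering, at the price of re-deriving the write cost through `BeforeKVRequest` a second time.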

2 changes: 1 addition & 1 deletion client/resource_group/controller/controller_test.go
@@ -97,7 +97,7 @@ func TestRequestAndResponseConsumption(t *testing.T) {
kvCalculator := gc.getKVCalculator()
for idx, testCase := range testCases {
caseNum := fmt.Sprintf("case %d", idx)
consumption, err := gc.onRequestWait(context.TODO(), testCase.req)
consumption, _, err := gc.onRequestWait(context.TODO(), testCase.req)
re.NoError(err, caseNum)
expectedConsumption := &rmpb.Consumption{}
if testCase.req.IsWrite() {
1 change: 1 addition & 0 deletions client/resource_group/controller/model.go
@@ -35,6 +35,7 @@ type RequestUnit float64
type RequestInfo interface {
IsWrite() bool
WriteBytes() uint64
StoreID() uint64
}

// ResponseInfo is the interface of the response information provider. A response should be
9 changes: 8 additions & 1 deletion client/resource_group/controller/testutil.go
@@ -24,13 +24,15 @@ import "time"
type TestRequestInfo struct {
isWrite bool
writeBytes uint64
storeID uint64
}

// NewTestRequestInfo creates a new TestRequestInfo.
func NewTestRequestInfo(isWrite bool, writeBytes uint64) *TestRequestInfo {
func NewTestRequestInfo(isWrite bool, writeBytes uint64, storeID uint64) *TestRequestInfo {
return &TestRequestInfo{
isWrite: isWrite,
writeBytes: writeBytes,
storeID: storeID,
}
}

@@ -44,6 +46,11 @@ func (tri *TestRequestInfo) WriteBytes() uint64 {
return tri.writeBytes
}

// StoreID implements the RequestInfo interface.
func (tri *TestRequestInfo) StoreID() uint64 {
return tri.storeID
}

// TestResponseInfo is used to test the response info interface.
type TestResponseInfo struct {
readBytes uint64
123 changes: 104 additions & 19 deletions tests/integrations/mcs/resource_manager/resource_manager_test.go
@@ -149,6 +149,10 @@ func (suite *resourceManagerClientTestSuite) TearDownSuite() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resource_manager/server/enableDegradedMode"))
}

func (suite *resourceManagerClientTestSuite) TearDownTest() {
suite.cleanupResourceGroups()
}

func (suite *resourceManagerClientTestSuite) cleanupResourceGroups() {
cli := suite.client
groups, err := cli.ListResourceGroups(suite.ctx)
@@ -258,11 +262,11 @@ type tokenConsumptionPerSecond struct {
}

func (t tokenConsumptionPerSecond) makeReadRequest() *controller.TestRequestInfo {
return controller.NewTestRequestInfo(false, 0)
return controller.NewTestRequestInfo(false, 0, 0)
}

func (t tokenConsumptionPerSecond) makeWriteRequest() *controller.TestRequestInfo {
return controller.NewTestRequestInfo(true, uint64(t.wruTokensAtATime-1))
return controller.NewTestRequestInfo(true, uint64(t.wruTokensAtATime-1), 0)
}

func (t tokenConsumptionPerSecond) makeReadResponse() *controller.TestResponseInfo {
@@ -340,9 +344,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
_, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
_, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
re.NoError(err)
_, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(cas.resourceGroupName, rreq, rres)
@@ -356,7 +360,6 @@
break
}
}
suite.cleanupResourceGroups()
controller.Stop()
}

@@ -408,9 +411,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
wreq := tcs.makeWriteRequest()
rres := tcs.makeReadResponse()
wres := tcs.makeWriteResponse()
_, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
re.NoError(err)
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
re.NoError(err)
controller.OnResponse(resourceGroupName, rreq, rres)
controller.OnResponse(resourceGroupName, wreq, wres)
@@ -447,9 +450,9 @@
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
_, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
re.NoError(err)
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(resourceGroupName, rreq, rres)
@@ -467,32 +470,120 @@
resourceGroupName2 := suite.initGroups[2].Name
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 100000, times: 1, waitDuration: 0}
wreq := tcs.makeWriteRequest()
_, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq)
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq)
re.NoError(err)

re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend", "return(true)"))
resourceGroupName3 := suite.initGroups[3].Name
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 1000, times: 1, waitDuration: 0}
wreq = tcs.makeWriteRequest()
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
re.NoError(err)
time.Sleep(110 * time.Millisecond)
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 10, times: 1010, waitDuration: 0}
duration := time.Duration(0)
for i := 0; i < tcs.times; i++ {
wreq = tcs.makeWriteRequest()
startTime := time.Now()
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
duration += time.Since(startTime)
re.NoError(err)
}
re.Less(duration, 100*time.Millisecond)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/acceleratedReportingPeriod"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend"))
suite.cleanupResourceGroups()
controller.Stop()
}

func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
re := suite.Require()
cli := suite.client

for _, group := range suite.initGroups {
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
}

cfg := &controller.RequestUnitConfig{
ReadBaseCost: 1,
ReadCostPerByte: 1,
WriteBaseCost: 1,
WriteCostPerByte: 1,
CPUMsCost: 1,
}
c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace())
c.Start(suite.ctx)

resourceGroupName := suite.initGroups[1].Name
// init
req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */)
resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
_, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.TotalCpuTimeMs, 0.0)
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)

req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */)
resp = controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.TotalCpuTimeMs, 0.0)
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)

// failed request, shouldn't be counted in penalty
req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */)
resp = controller.NewTestResponseInfo(0, time.Duration(0), false)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.TotalCpuTimeMs, 0.0)
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)

// from same store, should be zero
req1 := controller.NewTestRequestInfo(false, 0, 1 /* store1 */)
resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
_, err = c.OnResponse(resourceGroupName, req1, resp1)
re.NoError(err)

// from different store, should be non-zero
req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */)
resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(60))
re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6)
_, err = c.OnResponse(resourceGroupName, req2, resp2)
re.NoError(err)

// from new store, should be zero
req3 := controller.NewTestRequestInfo(true, 0, 3 /* store3 */)
resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
_, err = c.OnResponse(resourceGroupName, req3, resp3)
re.NoError(err)

// from different group, should be zero
resourceGroupName = suite.initGroups[2].Name
req4 := controller.NewTestRequestInfo(true, 50, 1 /* store1 */)
resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
_, err = c.OnResponse(resourceGroupName, req4, resp4)
re.NoError(err)
}
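
A note on the assertions above: the penalty seen by store 2 is exactly the 60 write bytes that landed on store 1 in the meantime (the failed 20-byte write is excluded), and since time.Duration(10) is 10ns while the penalty tracks CPU time in milliseconds, the expected CPU penalty is 10 / 1e6 = 1e-5 ms, which is what 10.0/1000.0/1000.0 encodes.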

func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
re := suite.Require()
cli := suite.client
@@ -557,7 +648,6 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
checkFunc(gresp, groups[0])
}
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resource_manager/server/fastPersist"))
suite.cleanupResourceGroups()
}

func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
@@ -813,9 +903,6 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover()
re.NotNil(getResp)
re.Equal(group.RUSettings.RU.Settings.FillRate, getResp.RUSettings.RU.Settings.FillRate)
}

// Cleanup the resource group.
suite.cleanupResourceGroups()
}

func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMode() {
@@ -874,7 +961,6 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMode() {
controller.Stop()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resource_manager/server/acquireFailed"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/degradedModeRU"))
suite.cleanupResourceGroups()
}

func (suite *resourceManagerClientTestSuite) TestLoadRequestUnitConfig() {
Expand Down Expand Up @@ -956,5 +1042,4 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() {

re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup"))
controller.Stop()
suite.cleanupResourceGroups()
}