Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: Count resource penalty for resource control #6336

Merged
merged 18 commits into from
Apr 19, 2023
64 changes: 60 additions & 4 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
// ResourceGroupKVInterceptor is the quota-limit controller interface for resource groups that use the KV store.
type ResourceGroupKVInterceptor interface {
// OnRequestWait checks whether the resource group has enough tokens. It may need to wait for some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, error)
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, Delta, error)
// OnResponse consumes tokens after a response has been received.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
}
Expand Down Expand Up @@ -97,6 +97,9 @@ type ResourceGroupsController struct {

calculators []ResourceCalculator

mutex sync.Mutex // used for the `resourceCounter`
resourceCounter map[string]Counter // resourceGroupName -> counter

// When a signal is received, it means the number of available tokens is low.
lowTokenNotifyChan chan struct{}
// When a token bucket response is received from the server, it will be sent to the channel.
Expand All @@ -114,6 +117,17 @@ type ResourceGroupsController struct {
}
}

// Counter tracks accumulated resource usage for one resource group: a running
// total (globalCounter) plus, per store, a snapshot of that total (storeCounter).
//
// NOTE(review): the controller keeps Counter by value (map[string]Counter), so a
// change made to globalCounter on a looked-up copy is lost unless the copy is
// stored back into the map. storeCounter is itself a map and is therefore shared
// between copies. Confirm that every writer of globalCounter writes the value back.
type Counter struct {
// storeCounter holds, for each store ID, the value globalCounter had when
// OnRequestWait last handled a request for that store.
storeCounter map[uint64]Delta // storeID -> delta
// globalCounter is the running total across all stores.
globalCounter Delta
}

// Delta records the resource usage on all stores between two adjacent requests on the same store.
// The same type is also used for the running totals kept in Counter.
type Delta struct {
// WriteBytes is the number of written bytes; OnResponse adds req.WriteBytes()
// for successful write requests.
WriteBytes uint64
// CpuTime is the KV CPU time; OnResponse adds resp.KVCPU() for successful responses.
CpuTime time.Duration
}

// NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor
func NewResourceGroupController(
ctx context.Context,
Expand All @@ -137,6 +151,7 @@ func NewResourceGroupController(
lowTokenNotifyChan: make(chan struct{}, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
resourceCounter: make(map[string]Counter),
}
for _, opt := range opts {
opt(controller)
Expand Down Expand Up @@ -429,19 +444,60 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
// OnRequestWait checks whether the resource group has enough tokens, which may require
// waiting for some time. On success it also returns the Delta that the group's global
// counter has accumulated since the previous request sent to the same store
// (info.StoreID()). On failure the returned Delta is the zero value.
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, error) {
) (*rmpb.Consumption, Delta, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
// The resource group could not be obtained: count the request as failed.
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return nil, err
return nil, Delta{}, err
}
consumption, err := gc.onRequestWait(ctx, info)
if err != nil {
return nil, Delta{}, err
}
return gc.onRequestWait(ctx, info)

// c.mutex guards resourceCounter; it stays held until the function returns.
c.mutex.Lock()
defer c.mutex.Unlock()
m, ok := c.resourceCounter[resourceGroupName]
if !ok {
// First request seen for this group: register an empty counter.
m = Counter{
storeCounter: make(map[uint64]Delta),
globalCounter: Delta{},
}
c.resourceCounter[resourceGroupName] = m
}
// delta is the global total minus this store's last snapshot. A store without a
// snapshot reads as the zero Delta, so its first delta equals the whole global total.
delta := Delta{
Connor1996 marked this conversation as resolved.
Show resolved Hide resolved
WriteBytes: m.globalCounter.WriteBytes - m.storeCounter[info.StoreID()].WriteBytes,
CpuTime: m.globalCounter.CpuTime - m.storeCounter[info.StoreID()].CpuTime,
}
// More accurately, the snapshot should be reset when the request succeeds. But that would make all
// concurrent requests piggyback a large delta, which inflates the penalty.
// So it is reset here directly, as failure is rare.
// m is a copy of the map value, but storeCounter is a map, so this write is visible
// through the Counter stored in c.resourceCounter.
m.storeCounter[info.StoreID()] = m.globalCounter

return consumption, delta, nil
}

// OnResponse is used to consume tokens after receiving response
func (c *ResourceGroupsController) OnResponse(
resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
// record resource delta
c.mutex.Lock()
if resp.Succeed() {
m, ok := c.resourceCounter[resourceGroupName]
if !ok {
m = Counter{
storeCounter: make(map[uint64]Delta),
globalCounter: Delta{},
}
c.resourceCounter[resourceGroupName] = m
}
if req.IsWrite() {
m.globalCounter.WriteBytes += req.WriteBytes()
}
m.globalCounter.CpuTime += resp.KVCPU()
Connor1996 marked this conversation as resolved.
Show resolved Hide resolved
}
c.mutex.Unlock()

tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
log.Warn("[resource group controller] resource group name does not exist", zap.String("resourceGroupName", resourceGroupName))
Expand Down
1 change: 1 addition & 0 deletions client/resource_group/controller/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type RequestUnit float64
type RequestInfo interface {
// IsWrite reports whether the request is a write; OnResponse only counts
// WriteBytes for requests where this is true.
IsWrite() bool
// WriteBytes returns the number of bytes the request writes.
WriteBytes() uint64
// StoreID returns the ID of the store the request targets; it keys the
// per-store resource counters.
StoreID() uint64
}

// ResponseInfo is the interface of the response information provider. A response should be
Expand Down
9 changes: 8 additions & 1 deletion client/resource_group/controller/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import "time"
type TestRequestInfo struct {
// isWrite is the value returned by IsWrite.
isWrite bool
// writeBytes is the value returned by WriteBytes.
writeBytes uint64
// storeID is the value returned by StoreID.
storeID uint64
}

// NewTestRequestInfo creates a new TestRequestInfo whose fields are set from the
// given arguments.
func NewTestRequestInfo(isWrite bool, writeBytes uint64) *TestRequestInfo {
func NewTestRequestInfo(isWrite bool, writeBytes uint64, storeID uint64) *TestRequestInfo {
return &TestRequestInfo{
isWrite: isWrite,
writeBytes: writeBytes,
storeID: storeID,
}
}

Expand All @@ -44,6 +46,11 @@ func (tri *TestRequestInfo) WriteBytes() uint64 {
return tri.writeBytes
}

// StoreID implements the RequestInfo interface by returning the store ID the
// TestRequestInfo was created with.
func (tri *TestRequestInfo) StoreID() uint64 {
return tri.storeID
}

// TestResponseInfo is used to test the response info interface.
type TestResponseInfo struct {
readBytes uint64
Expand Down
101 changes: 90 additions & 11 deletions tests/integrations/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,11 @@ type tokenConsumptionPerSecond struct {
}

// makeReadRequest builds a read (non-write) test request that carries no write bytes.
func (t tokenConsumptionPerSecond) makeReadRequest() *controller.TestRequestInfo {
return controller.NewTestRequestInfo(false, 0)
return controller.NewTestRequestInfo(false, 0, 0)
}

// makeWriteRequest builds a write test request of t.wruTokensAtATime-1 bytes.
// NOTE(review): the -1 presumably leaves room for the per-request write base cost — confirm.
func (t tokenConsumptionPerSecond) makeWriteRequest() *controller.TestRequestInfo {
return controller.NewTestRequestInfo(true, uint64(t.wruTokensAtATime-1))
return controller.NewTestRequestInfo(true, uint64(t.wruTokensAtATime-1), 0)
}

func (t tokenConsumptionPerSecond) makeReadResponse() *controller.TestResponseInfo {
Expand Down Expand Up @@ -340,9 +340,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
_, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
_, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
re.NoError(err)
_, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(cas.resourceGroupName, rreq, rres)
Expand Down Expand Up @@ -408,9 +408,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
wreq := tcs.makeWriteRequest()
rres := tcs.makeReadResponse()
wres := tcs.makeWriteResponse()
_, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
re.NoError(err)
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
re.NoError(err)
controller.OnResponse(resourceGroupName, rreq, rres)
controller.OnResponse(resourceGroupName, wreq, wres)
Expand Down Expand Up @@ -447,9 +447,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
_, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
re.NoError(err)
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(resourceGroupName, rreq, rres)
Expand All @@ -467,22 +467,22 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
resourceGroupName2 := suite.initGroups[2].Name
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 100000, times: 1, waitDuration: 0}
wreq := tcs.makeWriteRequest()
_, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq)
_, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq)
re.NoError(err)

re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend", "return(true)"))
resourceGroupName3 := suite.initGroups[3].Name
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 1000, times: 1, waitDuration: 0}
wreq = tcs.makeWriteRequest()
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
re.NoError(err)
time.Sleep(110 * time.Millisecond)
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 10, times: 1010, waitDuration: 0}
duration := time.Duration(0)
for i := 0; i < tcs.times; i++ {
wreq = tcs.makeWriteRequest()
startTime := time.Now()
_, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
_, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
duration += time.Since(startTime)
re.NoError(err)
}
Expand All @@ -493,6 +493,85 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
controller.Stop()
}

// TestResourceDelta checks the Delta returned by OnRequestWait: it should reflect
// only the usage of successful requests accumulated since the previous request to
// the same store, and it should be tracked separately per resource group.
//
// NOTE(review): testify's Equal takes (expected, actual); the calls below pass
// (actual, expected). This does not change pass/fail, only the failure message.
func (suite *resourceManagerClientTestSuite) TestResourceDelta() {
re := suite.Require()
cli := suite.client

for _, group := range suite.initGroups {
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
}

cfg := &controller.RequestUnitConfig{
ReadBaseCost: 1,
ReadCostPerByte: 1,
WriteBaseCost: 1,
WriteCostPerByte: 1,
CPUMsCost: 1,
}
// NOTE(review): the error returned by NewResourceGroupController is discarded here.
c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace())
c.Start(suite.ctx)

resourceGroupName := suite.initGroups[1].Name
// init: the first request to store2; nothing has been accumulated yet, so the delta is zero.
req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */)
resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
_, delta, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(0))
re.Equal(delta.CpuTime, time.Duration(0))
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)

// first request to store1: a successful 60-byte write; the previous request wrote no bytes.
req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */)
resp = controller.NewTestResponseInfo(0, time.Duration(10), true)
_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(0))
re.Equal(delta.CpuTime, time.Duration(0))
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)

// failed request, shouldn't be counted in delta
req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */)
resp = controller.NewTestResponseInfo(0, time.Duration(0), false)
_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(0))
re.Equal(delta.CpuTime, time.Duration(0))
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)

// from same store, should be zero
req1 := controller.NewTestRequestInfo(true, 70, 1 /* store1 */)
resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(0))
_, err = c.OnResponse(resourceGroupName, req1, resp1)
re.NoError(err)

// from different store, should be non-zero: 130 = 60 + 70, the two successful
// store1 writes issued since the last store2 request (the failed 20-byte write is excluded).
req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */)
resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(130))
_, err = c.OnResponse(resourceGroupName, req2, resp2)
re.NoError(err)

// from different group, should be zero
resourceGroupName = suite.initGroups[2].Name
req3 := controller.NewTestRequestInfo(true, 50, 1 /* store1 */)
resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(0))
_, err = c.OnResponse(resourceGroupName, req3, resp3)
re.NoError(err)
Connor1996 marked this conversation as resolved.
Show resolved Hide resolved
}

func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
re := suite.Require()
cli := suite.client
Expand Down