Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Connor1996 <[email protected]>
  • Loading branch information
Connor1996 committed Apr 19, 2023
1 parent dfcac87 commit 3c74095
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 92 deletions.
120 changes: 47 additions & 73 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.uber.org/zap"
)

Expand All @@ -50,7 +51,7 @@ const (
// ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether the resource group has enough tokens. It may need to wait for some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, Delta, error)
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
// OnResponse is used to consume tokens after receiving response
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
}
Expand Down Expand Up @@ -97,9 +98,6 @@ type ResourceGroupsController struct {

calculators []ResourceCalculator

mutex sync.Mutex // used for the `resourceCounter`
resourceCounter map[string]*Counter // resourceGroupName -> counter

// When a signal is received, it means the number of available tokens is low.
lowTokenNotifyChan chan struct{}
// When a token bucket response is received from the server, it will be sent to the channel.
Expand All @@ -117,17 +115,6 @@ type ResourceGroupsController struct {
}
}

// Counter holds the resource usage tracked for a single resource group:
// one Delta per store plus a running total shared by all stores.
type Counter struct {
storeCounter map[uint64]Delta // storeID -> delta
globalCounter Delta // usage accumulated from successful responses, regardless of store
}

// Delta records the resource usage on all stores between two adjacent requests on the same store.
type Delta struct {
// WriteBytes is the accumulated number of bytes written by write requests.
WriteBytes uint64
// CPUTime is the accumulated KV CPU time reported by responses.
CPUTime time.Duration
}

// NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor
func NewResourceGroupController(
ctx context.Context,
Expand All @@ -151,7 +138,6 @@ func NewResourceGroupController(
lowTokenNotifyChan: make(chan struct{}, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
resourceCounter: make(map[string]*Counter),
}
for _, opt := range opts {
opt(controller)
Expand Down Expand Up @@ -444,66 +430,19 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
// OnRequestWait is used to check whether the resource group has enough tokens. It may need to wait for some time.
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, Delta, error) {
) (*rmpb.Consumption, *rmpb.Consumption, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return nil, Delta{}, err
}
consumption, err := gc.onRequestWait(ctx, info)
if err != nil {
return nil, Delta{}, err
}

c.mutex.Lock()
defer c.mutex.Unlock()
m, ok := c.resourceCounter[resourceGroupName]
if !ok {
m = &Counter{
storeCounter: make(map[uint64]Delta),
globalCounter: Delta{},
}
c.resourceCounter[resourceGroupName] = m
return nil, nil, err
}

delta := Delta{}
if counter, exist := m.storeCounter[info.StoreID()]; exist {
delta.WriteBytes = m.globalCounter.WriteBytes - counter.WriteBytes
delta.CPUTime = m.globalCounter.CPUTime - counter.CPUTime
}
// More accurately, it should be reset when the request succeeds. But that would cause all concurrent requests to piggyback a large delta, which inflates the penalty.
// So it is reset here directly, as failures are rare.
m.storeCounter[info.StoreID()] = m.globalCounter

return consumption, delta, nil
return gc.onRequestWait(ctx, info)
}

// OnResponse is used to consume tokens after receiving response
func (c *ResourceGroupsController) OnResponse(
resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
// record resource delta
c.mutex.Lock()
if resp.Succeed() {
m, ok := c.resourceCounter[resourceGroupName]
if !ok {
m = &Counter{
storeCounter: make(map[uint64]Delta),
globalCounter: Delta{},
}
c.resourceCounter[resourceGroupName] = m
}
storeCounter := m.storeCounter[req.StoreID()]
if req.IsWrite() {
m.globalCounter.WriteBytes += req.WriteBytes()
storeCounter.WriteBytes += req.WriteBytes()
}
m.globalCounter.CPUTime += resp.KVCPU()
storeCounter.CPUTime += resp.KVCPU()
m.storeCounter[req.StoreID()] = storeCounter
}
c.mutex.Unlock()

tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
log.Warn("[resource group controller] resource group name does not exist", zap.String("resourceGroupName", resourceGroupName))
Expand All @@ -527,7 +466,9 @@ type groupCostController struct {

mu struct {
sync.Mutex
consumption *rmpb.Consumption
consumption *rmpb.Consumption
storeCounter map[uint64]*rmpb.Consumption
globalCounter *rmpb.Consumption
}

// fast path to make once token limit with un-limit burst.
Expand Down Expand Up @@ -635,6 +576,8 @@ func newGroupCostController(
}

gc.mu.consumption = &rmpb.Consumption{}
gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption)
gc.mu.globalCounter = &rmpb.Consumption{}
return gc, nil
}

Expand Down Expand Up @@ -1078,14 +1021,16 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {

func (gc *groupCostController) onRequestWait(
ctx context.Context, info RequestInfo,
) (*rmpb.Consumption, error) {
) (*rmpb.Consumption, *rmpb.Consumption, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.BeforeKVRequest(delta, info)
}

gc.mu.Lock()
add(gc.mu.consumption, delta)
gc.mu.Unlock()

if !gc.burstable.Load() {
var err error
now := time.Now()
Expand Down Expand Up @@ -1123,12 +1068,27 @@ func (gc *groupCostController) onRequestWait(
gc.mu.Lock()
sub(gc.mu.consumption, delta)
gc.mu.Unlock()
return nil, err
return nil, nil, err
} else {
gc.successfulRequestDuration.Observe(d.Seconds())
}
}
return delta, nil

gc.mu.Lock()
defer gc.mu.Unlock()
// calculate the penalty of the store
var penalty *rmpb.Consumption
if storeCounter, exist := gc.mu.storeCounter[info.StoreID()]; exist {
penalty = typeutil.DeepClone(gc.mu.globalCounter, func() *rmpb.Consumption { return &rmpb.Consumption{} })
sub(penalty, storeCounter)
} else {
penalty = &rmpb.Consumption{}
}
// More accurately, it should be reset when the request succeeds. But that would cause all concurrent requests to piggyback a large delta, which inflates the penalty.
// So it is reset here directly, as failures are rare.
gc.mu.storeCounter[info.StoreID()] = typeutil.DeepClone(gc.mu.globalCounter, func() *rmpb.Consumption { return &rmpb.Consumption{} })

return delta, penalty, nil
}

func (gc *groupCostController) onResponse(
Expand All @@ -1154,9 +1114,23 @@ func (gc *groupCostController) onResponse(
}
}
}
gc.mu.Lock()
add(gc.mu.consumption, delta)
gc.mu.Unlock()

{
gc.mu.Lock()
defer gc.mu.Unlock()

// record the consumption of the request
add(gc.mu.consumption, delta)

// record the consumption of the request by store
// As the penalty is only counted when the request is completed, the write cost that is added in `BeforeKVRequest` needs to be calculated here.
for _, calc := range gc.calculators {
calc.BeforeKVRequest(delta, req)
}
add(gc.mu.storeCounter[req.StoreID()], delta)
add(gc.mu.globalCounter, delta)
}

return delta, nil
}

Expand Down
39 changes: 20 additions & 19 deletions tests/integrations/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,66 +517,67 @@ func (suite *resourceManagerClientTestSuite) TestResourceDelta() {
// init
req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */)
resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
_, delta, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
_, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(0))
re.Equal(delta.CPUTime, time.Duration(0))
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.TotalCpuTimeMs, 0.0)
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)

req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */)
resp = controller.NewTestResponseInfo(0, time.Duration(10), true)
_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(0))
re.Equal(delta.CPUTime, time.Duration(0))
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.TotalCpuTimeMs, 0.0)
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)

// failed request, shouldn't be counted in delta
// failed request, shouldn't be counted in penalty
req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */)
resp = controller.NewTestResponseInfo(0, time.Duration(0), false)
_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(0))
re.Equal(delta.CPUTime, time.Duration(0))
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.TotalCpuTimeMs, 0.0)
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)

// from same store, should be zero
req1 := controller.NewTestRequestInfo(true, 70, 1 /* store1 */)
req1 := controller.NewTestRequestInfo(false, 0, 1 /* store1 */)
resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(0))
re.Equal(penalty.WriteBytes, float64(0))
_, err = c.OnResponse(resourceGroupName, req1, resp1)
re.NoError(err)

// from different store, should be non-zero
req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */)
resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(130))
re.Equal(penalty.WriteBytes, float64(60))
re.InEpsilon(penalty.TotalCpuTimeMs, 10.0 / 1000.0 / 1000.0, 1e-6)
_, err = c.OnResponse(resourceGroupName, req2, resp2)
re.NoError(err)

// from new store, should be zero
req3 := controller.NewTestRequestInfo(true, 0, 3 /* store3 */)
resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(0))
re.Equal(penalty.WriteBytes, float64(0))
_, err = c.OnResponse(resourceGroupName, req3, resp3)
re.NoError(err)

// from different group, should be zero
resourceGroupName = suite.initGroups[2].Name
req4 := controller.NewTestRequestInfo(true, 50, 1 /* store1 */)
resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, delta, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
re.NoError(err)
re.Equal(delta.WriteBytes, uint64(0))
re.Equal(penalty.WriteBytes, float64(0))
_, err = c.OnResponse(resourceGroupName, req4, resp4)
re.NoError(err)
}
Expand Down

0 comments on commit 3c74095

Please sign in to comment.