Skip to content

Commit

Permalink
operator: cache operator influence (#6234)
Browse files Browse the repository at this point in the history
close #6235

Signed-off-by: bufferflies <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
bufferflies and ti-chi-bot authored Apr 14, 2023
1 parent 0e16783 commit f1fab13
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 12 deletions.
11 changes: 11 additions & 0 deletions pkg/core/storelimit/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,22 @@ const (
)

// StoreLimit is an interface to control the operator rate of a store.
// TODO: add a method to control the rate of store
// The normal control flow is:
//  1. Check the store limit with Available in the checker or scheduler.
//  2. Check the store limit with Available again in the operator controller.
//     The difference between 1 and 2 is that 1 may not use the operator level.
//  3. Take the cost of the operator with Take in the operator controller.
//  4. After the operator is finished, Ack puts the cost back into the limit
//     for the next waiting operator.
//
// The cost is the operator influence, so the influence should stay the same
// for the whole life of the operator.
type StoreLimit interface {
// Available returns true if the store can accept the operator.
Available(cost int64, typ Type, level constant.PriorityLevel) bool
// Take takes the cost (count) of the operator. It returns false if the store can't accept any operators.
Take(count int64, typ Type, level constant.PriorityLevel) bool
// Reset resets the store limit of the given type to the given rate.
Reset(rate float64, typ Type)
// Ack puts the cost back into the limit for the next waiting operator after the operator is finished.
// Only the snapshot type makes use of this method.
Ack(cost int64, typ Type)
}
8 changes: 4 additions & 4 deletions pkg/core/storelimit/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ func TestSlidingWindow(t *testing.T) {
re.True(s.Available(capacity, SendSnapshot, constant.Low))
re.True(s.Take(capacity, SendSnapshot, constant.Low))
re.False(s.Available(capacity, SendSnapshot, constant.Low))
s.Ack(capacity)
s.Ack(capacity, SendSnapshot)
re.True(s.Available(capacity, SendSnapshot, constant.Low))

// case 1: it will occupy the normal window size not the core.High window.
re.True(s.Take(capacity, SendSnapshot, constant.High))
re.EqualValues(capacity, s.GetUsed())
re.EqualValues(0, s.windows[constant.High].getUsed())
s.Ack(capacity)
s.Ack(capacity, SendSnapshot)
re.EqualValues(s.GetUsed(), 0)

// case 2: it will occupy the core.High window size if the normal window is full.
Expand All @@ -75,8 +75,8 @@ func TestSlidingWindow(t *testing.T) {
re.True(s.Take(capacity-minSnapSize, SendSnapshot, constant.Medium))
re.False(s.Take(capacity-minSnapSize, SendSnapshot, constant.Medium))
re.EqualValues(s.GetUsed(), capacity+capacity+capacity-minSnapSize*3)
s.Ack(capacity - minSnapSize)
s.Ack(capacity - minSnapSize)
s.Ack(capacity-minSnapSize, SendSnapshot)
s.Ack(capacity-minSnapSize, SendSnapshot)
re.Equal(s.GetUsed(), capacity-minSnapSize)

// case 3: skip if the type is not SendSnapshot
Expand Down
7 changes: 6 additions & 1 deletion pkg/core/storelimit/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
minSnapSize = 10
)

// Compile-time check that *SlidingWindows implements StoreLimit.
var _ StoreLimit = (*SlidingWindows)(nil)

// SlidingWindows is a multi sliding windows
type SlidingWindows struct {
mu syncutil.RWMutex
Expand Down Expand Up @@ -107,7 +109,10 @@ func (s *SlidingWindows) Take(token int64, typ Type, level constant.PriorityLeve
// Ack indicates that some executing operator has been finished.
// The order of refilling windows is from high to low.
// It will refill the highest window first.
func (s *SlidingWindows) Ack(token int64) {
func (s *SlidingWindows) Ack(token int64, typ Type) {
if typ != SendSnapshot {
return
}
s.mu.Lock()
defer s.mu.Unlock()
for i := constant.PriorityLevelLen - 1; i >= 0; i-- {
Expand Down
3 changes: 3 additions & 0 deletions pkg/core/storelimit/store_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func NewStoreRateLimit(ratePerSec float64) StoreLimit {
}
}

// Ack is a no-op for StoreRateLimit: both arguments are ignored.
// It is present so that StoreRateLimit provides the Ack method of StoreLimit.
func (l *StoreRateLimit) Ack(_ int64, _ Type) {}

// Available returns the number of available tokens.
// notice that the priority level is not used.
func (l *StoreRateLimit) Available(cost int64, typ Type, _ constant.PriorityLevel) bool {
Expand Down
22 changes: 20 additions & 2 deletions pkg/schedule/operator/influence.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ func NewOpInfluence() *OpInfluence {
}
}

// Add merges the per-store influence recorded in other into m.
// For every store present in other, the matching entry of m is obtained
// through GetStoreInfluence and other's values are accumulated into it.
// A nil other, or a nil entry inside other, is ignored instead of panicking.
func (m OpInfluence) Add(other *OpInfluence) {
	if other == nil {
		return
	}
	for storeID, delta := range other.StoresInfluence {
		if delta == nil {
			// Nothing recorded for this store; add would dereference nil.
			continue
		}
		m.GetStoreInfluence(storeID).add(delta)
	}
}

// GetStoreInfluence get storeInfluence of specific store.
func (m OpInfluence) GetStoreInfluence(id uint64) *StoreInfluence {
storeInfluence, ok := m.StoresInfluence[id]
Expand All @@ -52,8 +59,19 @@ type StoreInfluence struct {
StepCost map[storelimit.Type]int64
}

// add accumulates every counter, size and per-type step cost of other into s.
func (s *StoreInfluence) add(other *StoreInfluence) {
	// Region-related deltas.
	s.RegionSize += other.RegionSize
	s.RegionCount += other.RegionCount

	// Leader-related deltas.
	s.LeaderSize += other.LeaderSize
	s.LeaderCount += other.LeaderCount

	s.WitnessCount += other.WitnessCount

	// Step costs are merged once per known store-limit type.
	for _, limitType := range storelimit.TypeNameValue {
		extra := other.GetStepCost(limitType)
		s.AddStepCost(limitType, extra)
	}
}

// ResourceProperty returns delta size of leader/region by influence.
func (s StoreInfluence) ResourceProperty(kind constant.ScheduleKind) int64 {
func (s *StoreInfluence) ResourceProperty(kind constant.ScheduleKind) int64 {
switch kind.Resource {
case constant.LeaderKind:
switch kind.Policy {
Expand All @@ -74,7 +92,7 @@ func (s StoreInfluence) ResourceProperty(kind constant.ScheduleKind) int64 {
}

// GetStepCost returns the specific type step cost
func (s StoreInfluence) GetStepCost(limitType storelimit.Type) int64 {
func (s *StoreInfluence) GetStepCost(limitType storelimit.Type) int64 {
if s.StepCost == nil {
return 0
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Operator struct {
AdditionalInfos map[string]string
ApproximateSize int64
timeout time.Duration
influence *OpInfluence
}

// NewOperator creates a new operator.
Expand Down Expand Up @@ -342,9 +343,17 @@ func (o *Operator) UnfinishedInfluence(opInfluence OpInfluence, region *core.Reg

// TotalInfluence adds the influence of all the operator's steps to opInfluence.
// The per-step influence is computed from region the first time it is needed
// and stored in o.influence; later calls add that stored value again, whatever
// region is passed in. If region is nil and nothing has been stored yet, the
// call leaves opInfluence untouched.
// NOTE(review): o.influence is assigned here without any visible locking —
// confirm that callers serialize access.
func (o *Operator) TotalInfluence(opInfluence OpInfluence, region *core.RegionInfo) {
// NOTE(review): the next two lines look like the removed side of the diff hunk
// (the direct, uncached loop) — confirm they are absent from the final file.
for step := 0; step < len(o.steps); step++ {
o.steps[step].Influence(opInfluence, region)
// Nothing cached and no region to compute from: nothing to add.
if region == nil && o.influence == nil {
return
}
// First call with a region: compute the influence of every step once and keep it.
if o.influence == nil {
o.influence = NewOpInfluence()
for step := 0; step < len(o.steps); step++ {
o.steps[step].Influence(*o.influence, region)
}
}
// Merge the stored influence into the caller's accumulator.
opInfluence.Add(o.influence)
}

// OpHistory is used to log and visualize completed operators.
Expand Down
19 changes: 16 additions & 3 deletions pkg/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,20 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {
return true
}

// ack hands the step cost of op back to the store limiters.
// It computes the total influence of op, then, for every store touched by that
// influence and every store-limit type, calls Ack on the store's limiter with
// the step cost recorded for that type.
//
// A store/type pair for which no limiter is available is skipped; the remaining
// stores and types are still acknowledged. Returning early here would leave the
// cost of an arbitrary subset of stores un-acked, because map iteration order
// is not specified.
func (oc *OperatorController) ack(op *operator.Operator) {
	opInfluence := NewTotalOpInfluence([]*operator.Operator{op}, oc.cluster)
	for storeID := range opInfluence.StoresInfluence {
		// The store's influence does not depend on the limit type; look it up once.
		storeInfluence := opInfluence.GetStoreInfluence(storeID)
		for _, limitType := range storelimit.TypeNameValue {
			limiter := oc.getOrCreateStoreLimit(storeID, limitType)
			if limiter == nil {
				continue
			}
			limiter.Ack(storeInfluence.GetStepCost(limitType), limitType)
		}
	}
}

// RemoveOperator removes a operator from the running operators.
func (oc *OperatorController) RemoveOperator(op *operator.Operator, extraFields ...zap.Field) bool {
oc.Lock()
Expand Down Expand Up @@ -540,6 +554,7 @@ func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) bool {
delete(oc.operators, regionID)
oc.updateCounts(oc.operators)
operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
oc.ack(op)
return true
}
return false
Expand Down Expand Up @@ -745,9 +760,7 @@ func (oc *OperatorController) GetFastOpInfluence(cluster Cluster, influence oper
// AddOpInfluence adds the total influence of op to influence.
// The operator's region is looked up in cluster by op.RegionID() and handed to
// op.TotalInfluence.
func AddOpInfluence(op *operator.Operator, influence operator.OpInfluence, cluster Cluster) {
region := cluster.GetRegion(op.RegionID())
// NOTE(review): the guarded call and the unguarded call below look like the
// removed and added sides of the same diff hunk — confirm only one remains.
if region != nil {
op.TotalInfluence(influence, region)
}
op.TotalInfluence(influence, region)
}

// NewTotalOpInfluence creates a OpInfluence.
Expand Down
31 changes: 31 additions & 0 deletions pkg/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,37 @@ func (suite *operatorControllerTestSuite) TearDownSuite() {
suite.cancel()
}

// TestCacheInfluence checks that the influence reported for an operator stays
// the same after the region's size changes and after the region is removed
// from the cluster.
func (suite *operatorControllerTestSuite) TestCacheInfluence() {
	opt := mockconfig.NewTestOptions()
	tc := mockcluster.NewCluster(suite.ctx, opt)
	oc := NewOperatorController(suite.ctx, tc, nil)
	tc.AddLeaderStore(2, 1)
	region := tc.AddLeaderRegion(1, 1, 2)

	steps := []operator.OpStep{
		operator.RemovePeer{FromStore: 2},
	}
	op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, steps...)
	oc.SetOperator(op)
	suite.True(op.Start())

	// regionSizeOnStore2 collects the operator's influence into a fresh
	// OpInfluence and reports the region size delta recorded for store 2.
	regionSizeOnStore2 := func() int64 {
		collected := operator.NewOpInfluence()
		AddOpInfluence(op, *collected, tc)
		return collected.GetStoreInfluence(2).RegionSize
	}

	// baseline: influence computed from the region as first added.
	suite.Equal(int64(-96), regionSizeOnStore2())

	// case: influence is same even if the region size changed.
	region = region.Clone(core.SetApproximateSize(100))
	tc.PutRegion(region)
	suite.Equal(int64(-96), regionSizeOnStore2())

	// case: influence is valid even if the region is removed.
	tc.RemoveRegion(region)
	suite.Equal(int64(-96), regionSizeOnStore2())
}

// issue #1338
func (suite *operatorControllerTestSuite) TestGetOpInfluence() {
opt := mockconfig.NewTestOptions()
Expand Down

0 comments on commit f1fab13

Please sign in to comment.