From f1fab1337b30525d327af8f7f44d0d89352ff68a Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Fri, 14 Apr 2023 14:41:01 +0800 Subject: [PATCH] operator: cache operator influence (#6234) close tikv/pd#6235 Signed-off-by: bufferflies <1045931706@qq.com> Co-authored-by: Ti Chi Robot --- pkg/core/storelimit/limit.go | 11 +++++++++ pkg/core/storelimit/limit_test.go | 8 +++--- pkg/core/storelimit/sliding_window.go | 7 +++++- pkg/core/storelimit/store_limit.go | 3 +++ pkg/schedule/operator/influence.go | 22 +++++++++++++++-- pkg/schedule/operator/operator.go | 13 ++++++++-- pkg/schedule/operator_controller.go | 19 ++++++++++++--- pkg/schedule/operator_controller_test.go | 31 ++++++++++++++++++++++++ 8 files changed, 102 insertions(+), 12 deletions(-) diff --git a/pkg/core/storelimit/limit.go b/pkg/core/storelimit/limit.go index b5a3312ff56..1ecd99102ad 100644 --- a/pkg/core/storelimit/limit.go +++ b/pkg/core/storelimit/limit.go @@ -33,6 +33,14 @@ const ( ) // StoreLimit is an interface to control the operator rate of store +// TODO: add a method to control the rate of store +// the normal control flow is: +// 1. check the store limit with Available in checker or scheduler. +// 2. check the store limit with Available in operator controller again. +// the different between 1 and 2 is that 1 maybe not use the operator level. +// 3. take the cost of operator with Take in operator controller. +// 4. ack will put back the cost into the limit for the next waiting operator after the operator is finished. +// the cost is the operator influence, so the influence should be same in the life of the operator. type StoreLimit interface { // Available returns true if the store can accept the operator Available(cost int64, typ Type, level constant.PriorityLevel) bool @@ -40,4 +48,7 @@ type StoreLimit interface { Take(count int64, typ Type, level constant.PriorityLevel) bool // Reset resets the store limit Reset(rate float64, typ Type) + // Ack put back the cost into the limit for the next waiting operator after the operator is finished. + // only snapshot type can use this method. + Ack(cost int64, typ Type) } diff --git a/pkg/core/storelimit/limit_test.go b/pkg/core/storelimit/limit_test.go index 7ff321776ed..03cc06d8a17 100644 --- a/pkg/core/storelimit/limit_test.go +++ b/pkg/core/storelimit/limit_test.go @@ -56,14 +56,14 @@ func TestSlidingWindow(t *testing.T) { re.True(s.Available(capacity, SendSnapshot, constant.Low)) re.True(s.Take(capacity, SendSnapshot, constant.Low)) re.False(s.Available(capacity, SendSnapshot, constant.Low)) - s.Ack(capacity) + s.Ack(capacity, SendSnapshot) re.True(s.Available(capacity, SendSnapshot, constant.Low)) // case 1: it will occupy the normal window size not the core.High window. re.True(s.Take(capacity, SendSnapshot, constant.High)) re.EqualValues(capacity, s.GetUsed()) re.EqualValues(0, s.windows[constant.High].getUsed()) - s.Ack(capacity) + s.Ack(capacity, SendSnapshot) re.EqualValues(s.GetUsed(), 0) // case 2: it will occupy the core.High window size if the normal window is full. @@ -75,8 +75,8 @@ func TestSlidingWindow(t *testing.T) { re.True(s.Take(capacity-minSnapSize, SendSnapshot, constant.Medium)) re.False(s.Take(capacity-minSnapSize, SendSnapshot, constant.Medium)) re.EqualValues(s.GetUsed(), capacity+capacity+capacity-minSnapSize*3) - s.Ack(capacity - minSnapSize) - s.Ack(capacity - minSnapSize) + s.Ack(capacity-minSnapSize, SendSnapshot) + s.Ack(capacity-minSnapSize, SendSnapshot) re.Equal(s.GetUsed(), capacity-minSnapSize) // case 3: skip the type is not the SendSnapshot diff --git a/pkg/core/storelimit/sliding_window.go b/pkg/core/storelimit/sliding_window.go index 34b39c36253..5fded60e124 100644 --- a/pkg/core/storelimit/sliding_window.go +++ b/pkg/core/storelimit/sliding_window.go @@ -24,6 +24,8 @@ const ( minSnapSize = 10 ) +var _ StoreLimit = &SlidingWindows{} + // SlidingWindows is a multi sliding windows type SlidingWindows struct { mu syncutil.RWMutex @@ -107,7 +109,10 @@ func (s *SlidingWindows) Take(token int64, typ Type, level constant.PriorityLeve // Ack indicates that some executing operator has been finished. // The order of refilling windows is from high to low. // It will refill the highest window first. -func (s *SlidingWindows) Ack(token int64) { +func (s *SlidingWindows) Ack(token int64, typ Type) { + if typ != SendSnapshot { + return + } s.mu.Lock() defer s.mu.Unlock() for i := constant.PriorityLevelLen - 1; i >= 0; i-- { diff --git a/pkg/core/storelimit/store_limit.go b/pkg/core/storelimit/store_limit.go index 824431bf54e..5de17aecb30 100644 --- a/pkg/core/storelimit/store_limit.go +++ b/pkg/core/storelimit/store_limit.go @@ -81,6 +81,9 @@ func NewStoreRateLimit(ratePerSec float64) StoreLimit { } } +// Ack does nothing. +func (l *StoreRateLimit) Ack(_ int64, _ Type) {} + // Available returns the number of available tokens. // notice that the priority level is not used. func (l *StoreRateLimit) Available(cost int64, typ Type, _ constant.PriorityLevel) bool { diff --git a/pkg/schedule/operator/influence.go b/pkg/schedule/operator/influence.go index 5cce6ffdf61..fed259d99f0 100644 --- a/pkg/schedule/operator/influence.go +++ b/pkg/schedule/operator/influence.go @@ -32,6 +32,13 @@ func NewOpInfluence() *OpInfluence { } } +// Add adds another influence. +func (m OpInfluence) Add(other *OpInfluence) { + for id, v := range other.StoresInfluence { + m.GetStoreInfluence(id).add(v) + } +} + // GetStoreInfluence get storeInfluence of specific store. func (m OpInfluence) GetStoreInfluence(id uint64) *StoreInfluence { storeInfluence, ok := m.StoresInfluence[id] @@ -52,8 +59,19 @@ type StoreInfluence struct { StepCost map[storelimit.Type]int64 } +func (s *StoreInfluence) add(other *StoreInfluence) { + s.RegionCount += other.RegionCount + s.RegionSize += other.RegionSize + s.LeaderSize += other.LeaderSize + s.LeaderCount += other.LeaderCount + s.WitnessCount += other.WitnessCount + for _, v := range storelimit.TypeNameValue { + s.AddStepCost(v, other.GetStepCost(v)) + } +} + // ResourceProperty returns delta size of leader/region by influence. -func (s StoreInfluence) ResourceProperty(kind constant.ScheduleKind) int64 { +func (s *StoreInfluence) ResourceProperty(kind constant.ScheduleKind) int64 { switch kind.Resource { case constant.LeaderKind: switch kind.Policy { @@ -74,7 +92,7 @@ func (s StoreInfluence) ResourceProperty(kind constant.ScheduleKind) int64 { } // GetStepCost returns the specific type step cost -func (s StoreInfluence) GetStepCost(limitType storelimit.Type) int64 { +func (s *StoreInfluence) GetStepCost(limitType storelimit.Type) int64 { if s.StepCost == nil { return 0 } diff --git a/pkg/schedule/operator/operator.go b/pkg/schedule/operator/operator.go index f4b1d819cf4..c69e266308e 100644 --- a/pkg/schedule/operator/operator.go +++ b/pkg/schedule/operator/operator.go @@ -52,6 +52,7 @@ type Operator struct { AdditionalInfos map[string]string ApproximateSize int64 timeout time.Duration + influence *OpInfluence } // NewOperator creates a new operator. @@ -342,9 +343,17 @@ func (o *Operator) UnfinishedInfluence(opInfluence OpInfluence, region *core.Reg // TotalInfluence calculates the store difference which whole operator steps make. func (o *Operator) TotalInfluence(opInfluence OpInfluence, region *core.RegionInfo) { - for step := 0; step < len(o.steps); step++ { - o.steps[step].Influence(opInfluence, region) + // skip if region is nil and not cache influence. + if region == nil && o.influence == nil { + return } + if o.influence == nil { + o.influence = NewOpInfluence() + for step := 0; step < len(o.steps); step++ { + o.steps[step].Influence(*o.influence, region) + } + } + opInfluence.Add(o.influence) } // OpHistory is used to log and visualize completed operators. diff --git a/pkg/schedule/operator_controller.go b/pkg/schedule/operator_controller.go index b64d88c1aa3..70346e5671f 100644 --- a/pkg/schedule/operator_controller.go +++ b/pkg/schedule/operator_controller.go @@ -511,6 +511,20 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool { return true } +func (oc *OperatorController) ack(op *operator.Operator) { + opInfluence := NewTotalOpInfluence([]*operator.Operator{op}, oc.cluster) + for storeID := range opInfluence.StoresInfluence { + for _, v := range storelimit.TypeNameValue { + limiter := oc.getOrCreateStoreLimit(storeID, v) + if limiter == nil { + return + } + cost := opInfluence.GetStoreInfluence(storeID).GetStepCost(v) + limiter.Ack(cost, v) + } + } +} + // RemoveOperator removes a operator from the running operators. func (oc *OperatorController) RemoveOperator(op *operator.Operator, extraFields ...zap.Field) bool { oc.Lock() @@ -540,6 +554,7 @@ func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) bool { delete(oc.operators, regionID) oc.updateCounts(oc.operators) operatorCounter.WithLabelValues(op.Desc(), "remove").Inc() + oc.ack(op) return true } return false @@ -745,9 +760,7 @@ func (oc *OperatorController) GetFastOpInfluence(cluster Cluster, influence oper // AddOpInfluence add operator influence for cluster func AddOpInfluence(op *operator.Operator, influence operator.OpInfluence, cluster Cluster) { region := cluster.GetRegion(op.RegionID()) - if region != nil { - op.TotalInfluence(influence, region) - } + op.TotalInfluence(influence, region) } // NewTotalOpInfluence creates a OpInfluence. diff --git a/pkg/schedule/operator_controller_test.go b/pkg/schedule/operator_controller_test.go index ca15de2cc68..a8e162aead0 100644 --- a/pkg/schedule/operator_controller_test.go +++ b/pkg/schedule/operator_controller_test.go @@ -56,6 +56,37 @@ func (suite *operatorControllerTestSuite) TearDownSuite() { suite.cancel() } +func (suite *operatorControllerTestSuite) TestCacheInfluence() { + opt := mockconfig.NewTestOptions() + tc := mockcluster.NewCluster(suite.ctx, opt) + oc := NewOperatorController(suite.ctx, tc, nil) + tc.AddLeaderStore(2, 1) + region := tc.AddLeaderRegion(1, 1, 2) + + steps := []operator.OpStep{ + operator.RemovePeer{FromStore: 2}, + } + op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, steps...) + oc.SetOperator(op) + suite.True(op.Start()) + influence := operator.NewOpInfluence() + AddOpInfluence(op, *influence, tc) + suite.Equal(int64(-96), influence.GetStoreInfluence(2).RegionSize) + + // case: influence is same even if the region size changed. + region = region.Clone(core.SetApproximateSize(100)) + tc.PutRegion(region) + influence1 := operator.NewOpInfluence() + AddOpInfluence(op, *influence1, tc) + suite.Equal(int64(-96), influence1.GetStoreInfluence(2).RegionSize) + + // case: influence is valid even if the region is removed. + tc.RemoveRegion(region) + influence2 := operator.NewOpInfluence() + AddOpInfluence(op, *influence2, tc) + suite.Equal(int64(-96), influence2.GetStoreInfluence(2).RegionSize) +} + // issue #1338 func (suite *operatorControllerTestSuite) TestGetOpInfluence() { opt := mockconfig.NewTestOptions()