checker: only temp state should be added in waiting list (#5010) (#5058)
close #4920, ref #5010

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ryan Leung <[email protected]>
ti-chi-bot and rleungx authored Jun 14, 2022
1 parent 152e093 commit 204ed42
Showing 6 changed files with 53 additions and 33 deletions.
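In short, the checkers used to put a region into regionWaitingList whenever no target store could be found. With this commit, the ReplicaStrategy selectors also report whether the failure came from stores' temporary state (the strict StoreStateFilter), and a region is queued for retry only in that case; if no store satisfies the placement rules at all, retrying cannot help. The caller pattern, condensed from the replica_checker.go and rule_checker.go hunks below:

	target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores)
	if target == 0 {
		// No store is usable right now. Queue the region for a retry only if
		// a candidate was rejected for a temporary reason; otherwise a retry
		// cannot succeed, so the waiting list is skipped.
		if filterByTempState {
			r.regionWaitingList.Put(region.GetID(), nil)
		}
		return nil
	}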
server/cluster/coordinator.go (2 changes: 1 addition & 1 deletion)
@@ -40,7 +40,7 @@ import (
 
 const (
 	runSchedulerCheckInterval = 3 * time.Second
-	collectFactor             = 0.8
+	collectFactor             = 0.9
 	collectTimeout            = 5 * time.Minute
 	maxScheduleRetries        = 10
 	maxLoadConfigRetries      = 10
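collectFactor is the fraction of the cluster's regions that must be reported through heartbeats before the coordinator's prepareChecker treats the server as prepared to run schedulers. The bump from 0.8 to 0.9 lines up with the test change below: with 10 regions in the cluster, shouldRun now flips to true at the 9th reported region (0.9 × 10 = 9) instead of the 8th.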
server/cluster/coordinator_test.go (28 changes: 16 additions & 12 deletions)
@@ -606,7 +606,8 @@ func (s *testCoordinatorSuite) TestShouldRunWithNonLeaderRegions(c *C) {
 		{5, false},
 		{6, false},
 		{7, false},
-		{8, true},
+		{8, false},
+		{9, true},
 	}
 
 	for _, t := range tbl {
@@ -615,13 +616,12 @@ func (s *testCoordinatorSuite) TestShouldRunWithNonLeaderRegions(c *C) {
 		c.Assert(tc.processRegionHeartbeat(nr), IsNil)
 		c.Assert(co.shouldRun(), Equals, t.shouldRun)
 	}
-	nr := &metapb.Region{Id: 8, Peers: []*metapb.Peer{}}
+	nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}}
 	newRegion := core.NewRegionInfo(nr, nil)
 	c.Assert(tc.processRegionHeartbeat(newRegion), NotNil)
-	c.Assert(co.cluster.prepareChecker.sum, Equals, 8)
+	c.Assert(co.cluster.prepareChecker.sum, Equals, 9)
 
 	// Now, after the server is prepared, there exist some regions with no leader.
-	c.Assert(tc.GetRegion(9).GetLeader().GetStoreId(), Equals, uint64(0))
+	c.Assert(tc.GetRegion(10).GetLeader().GetStoreId(), Equals, uint64(0))
 }

@@ -972,7 +972,9 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
 	tc.putRegion(region)
 	start := time.Now()
 	{
-		op1 := lb.Schedule(tc)[0]
+		ops := lb.Schedule(tc)
+		c.Assert(ops, HasLen, 1)
+		op1 := ops[0]
 		c.Assert(op1, NotNil)
 		c.Assert(oc.AddOperator(op1), IsTrue)
 		c.Assert(oc.RemoveOperator(op1), IsTrue)
@@ -983,23 +985,25 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
 		if time.Since(start) > time.Second {
 			break
 		}
-		c.Assert(ops, IsNil)
+		c.Assert(ops, HasLen, 0)
 	}
 
 	// reset all stores' limit
 	// scheduling one time needs 1/10 seconds
 	opt.SetAllStoresLimit(storelimit.AddPeer, 600)
 	opt.SetAllStoresLimit(storelimit.RemovePeer, 600)
 	time.Sleep(time.Second)
 	for i := 0; i < 10; i++ {
-		op1 := lb.Schedule(tc)[0]
-		c.Assert(op1, NotNil)
-		c.Assert(oc.AddOperator(op1), IsTrue)
-		c.Assert(oc.RemoveOperator(op1), IsTrue)
+		ops := lb.Schedule(tc)
+		c.Assert(ops, HasLen, 1)
+		op := ops[0]
+		c.Assert(oc.AddOperator(op), IsTrue)
+		c.Assert(oc.RemoveOperator(op), IsTrue)
 	}
 	// sleep 1 second to make sure that the token is filled up
-	time.Sleep(1 * time.Second)
+	time.Sleep(time.Second)
 	for i := 0; i < 100; i++ {
-		c.Assert(lb.Schedule(tc), NotNil)
+		c.Assert(len(lb.Schedule(tc)), Greater, 0)
 	}
 }
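The assertions change from IsNil/NotNil to explicit length checks because a scheduler's Schedule may return an empty but non-nil slice of operators. A standalone illustration of the nil-versus-empty distinction in Go (plain language behavior, not PD code):

	package main

	import "fmt"

	func main() {
		var nilOps []string    // nil slice: no backing array
		emptyOps := []string{} // non-nil slice with zero elements

		fmt.Println(nilOps == nil, len(nilOps))     // true 0
		fmt.Println(emptyOps == nil, len(emptyOps)) // false 0
		// Asserting HasLen 0 accepts both; asserting IsNil accepts only the first.
	}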
server/schedule/checker/replica_checker.go (14 changes: 9 additions & 5 deletions)
@@ -151,11 +151,13 @@ func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
 	}
 	log.Debug("region has fewer than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
 	regionStores := r.cluster.GetRegionStores(region)
-	target := r.strategy(region).SelectStoreToAdd(regionStores)
+	target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores)
 	if target == 0 {
 		log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID()))
 		checkerCounter.WithLabelValues("replica_checker", "no-target-store").Inc()
-		r.regionWaitingList.Put(region.GetID(), nil)
+		if filterByTempState {
+			r.regionWaitingList.Put(region.GetID(), nil)
+		}
 		return nil
 	}
 	newPeer := &metapb.Peer{StoreId: target}
@@ -204,7 +206,7 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
 		checkerCounter.WithLabelValues("replica_checker", "all-right").Inc()
 		return nil
 	}
-	newStore := strategy.SelectStoreToImprove(regionStores, oldStore)
+	newStore, _ := strategy.SelectStoreToImprove(regionStores, oldStore)
 	if newStore == 0 {
 		log.Debug("no better peer", zap.Uint64("region-id", region.GetID()))
 		checkerCounter.WithLabelValues("replica_checker", "not-better").Inc()
@@ -234,12 +236,14 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
 	}
 
 	regionStores := r.cluster.GetRegionStores(region)
-	target := r.strategy(region).SelectStoreToFix(regionStores, storeID)
+	target, filterByTempState := r.strategy(region).SelectStoreToFix(regionStores, storeID)
 	if target == 0 {
 		reason := fmt.Sprintf("no-store-%s", status)
 		checkerCounter.WithLabelValues("replica_checker", reason).Inc()
-		r.regionWaitingList.Put(region.GetID(), nil)
 		log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID()))
+		if filterByTempState {
+			r.regionWaitingList.Put(region.GetID(), nil)
+		}
 		return nil
 	}
 	newPeer := &metapb.Peer{StoreId: target}
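Note that checkLocationReplacement deliberately discards the new bool (newStore, _ := ...): a failed SelectStoreToImprove did not feed the waiting list before this change, and it still does not.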
server/schedule/checker/replica_strategy.go (23 changes: 13 additions & 10 deletions)
@@ -42,7 +42,7 @@ type ReplicaStrategy struct {
 // the peer list with the peer removed as `coLocationStores`.
 // Meanwhile, we need to provide more constraints to ensure that the isolation
 // level cannot be reduced after replacement.
-func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, extraFilters ...filter.Filter) uint64 {
+func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, extraFilters ...filter.Filter) (uint64, bool) {
 	// The selection process uses a two-stage fashion. The first stage
 	// ignores the temporary state of the stores and selects the stores
 	// with the highest score according to the location label. The second
@@ -69,33 +69,36 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, extraFilters ...filter.Filter) (uint64, bool) {
 
 	isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
 	strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true}
-	target := filter.NewCandidates(s.cluster.GetStores()).
+	targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
 		FilterTarget(s.cluster.GetOpts(), filters...).
-		Sort(isolationComparer).Reverse().Top(isolationComparer).        // greater isolation score is better
-		Sort(filter.RegionScoreComparer(s.cluster.GetOpts())).           // less region score is better
-		FilterTarget(s.cluster.GetOpts(), strictStateFilter).PickFirst() // the filter does not ignore temp states
+		Sort(isolationComparer).Reverse().Top(isolationComparer). // greater isolation score is better
+		Sort(filter.RegionScoreComparer(s.cluster.GetOpts())) // less region score is better
+	if targetCandidate.Len() == 0 {
+		return 0, false
+	}
+	target := targetCandidate.FilterTarget(s.cluster.GetOpts(), strictStateFilter).PickFirst() // the filter does not ignore temp states
 	if target == nil {
-		return 0
+		return 0, true // filter by temporary states
 	}
-	return target.GetID()
+	return target.GetID(), false
 }
 
 // SelectStoreToFix returns a store to replace down/offline old peer. The location
 // placement after scheduling is allowed to be worse than original.
-func (s *ReplicaStrategy) SelectStoreToFix(coLocationStores []*core.StoreInfo, old uint64) uint64 {
+func (s *ReplicaStrategy) SelectStoreToFix(coLocationStores []*core.StoreInfo, old uint64) (uint64, bool) {
 	// trick to avoid creating a slice with `old` removed.
 	s.swapStoreToFirst(coLocationStores, old)
 	return s.SelectStoreToAdd(coLocationStores[1:])
 }
 
 // SelectStoreToImprove returns a store to replace oldStore. The location
 // placement after scheduling should be better than original.
-func (s *ReplicaStrategy) SelectStoreToImprove(coLocationStores []*core.StoreInfo, old uint64) uint64 {
+func (s *ReplicaStrategy) SelectStoreToImprove(coLocationStores []*core.StoreInfo, old uint64) (uint64, bool) {
 	// trick to avoid creating a slice with `old` removed.
 	s.swapStoreToFirst(coLocationStores, old)
 	oldStore := s.cluster.GetStore(old)
 	if oldStore == nil {
-		return 0
+		return 0, false
 	}
 	filters := []filter.Filter{
 		filter.NewLocationImprover(s.checkerName, s.locationLabels, coLocationStores, oldStore),
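Taken together, the new (uint64, bool) contract encodes three outcomes. A self-contained, hypothetical stand-in (selectStore and its arguments are invented for illustration, not PD code) showing how a caller distinguishes them:

	package main

	import "fmt"

	// selectStore mimics the SelectStoreToAdd contract: id != 0 means success;
	// (0, true) means candidates exist but were rejected by a temporary state;
	// (0, false) means no candidate can satisfy the placement rules at all.
	func selectStore(candidates int, tempBlocked bool) (uint64, bool) {
		if candidates == 0 {
			return 0, false
		}
		if tempBlocked {
			return 0, true
		}
		return 42, false
	}

	func main() {
		for _, tc := range []struct {
			candidates  int
			tempBlocked bool
		}{{0, false}, {3, true}, {3, false}} {
			id, filterByTempState := selectStore(tc.candidates, tc.tempBlocked)
			switch {
			case id != 0:
				fmt.Println("schedule to store", id)
			case filterByTempState:
				fmt.Println("retry later: candidates are temporarily unavailable")
			default:
				fmt.Println("give up: no store satisfies the placement rules")
			}
		}
	}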
server/schedule/checker/rule_checker.go (14 changes: 9 additions & 5 deletions)
@@ -133,10 +133,12 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.RegionFit, rf *placement.RuleFit) (*operator.Operator, error) {
 func (c *RuleChecker) addRulePeer(region *core.RegionInfo, rf *placement.RuleFit) (*operator.Operator, error) {
 	checkerCounter.WithLabelValues("rule_checker", "add-rule-peer").Inc()
 	ruleStores := c.getRuleFitStores(rf)
-	store := c.strategy(region, rf.Rule).SelectStoreToAdd(ruleStores)
+	store, filterByTempState := c.strategy(region, rf.Rule).SelectStoreToAdd(ruleStores)
 	if store == 0 {
 		checkerCounter.WithLabelValues("rule_checker", "no-store-add").Inc()
-		c.regionWaitingList.Put(region.GetID(), nil)
+		if filterByTempState {
+			c.regionWaitingList.Put(region.GetID(), nil)
+		}
 		return nil, errors.New("no store to add peer")
 	}
 	peer := &metapb.Peer{StoreId: store, Role: rf.Rule.Role.MetaPeerRole()}
@@ -151,10 +153,12 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, rf *placement.RuleFit) (*operator.Operator, error) {
 // The peer's store may be in Offline or Down, and needs to be replaced.
 func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) {
 	ruleStores := c.getRuleFitStores(rf)
-	store := c.strategy(region, rf.Rule).SelectStoreToFix(ruleStores, peer.GetStoreId())
+	store, filterByTempState := c.strategy(region, rf.Rule).SelectStoreToFix(ruleStores, peer.GetStoreId())
 	if store == 0 {
 		checkerCounter.WithLabelValues("rule_checker", "no-store-replace").Inc()
-		c.regionWaitingList.Put(region.GetID(), nil)
+		if filterByTempState {
+			c.regionWaitingList.Put(region.GetID(), nil)
+		}
 		return nil, errors.New("no store to replace peer")
 	}
 	newPeer := &metapb.Peer{StoreId: store, Role: rf.Rule.Role.MetaPeerRole()}
@@ -255,7 +259,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, rf *placement.RuleFit) (*operator.Operator, error) {
 	if oldStore == 0 {
 		return nil, nil
 	}
-	newStore := strategy.SelectStoreToImprove(ruleStores, oldStore)
+	newStore, _ := strategy.SelectStoreToImprove(ruleStores, oldStore)
 	if newStore == 0 {
 		log.Debug("no replacement store", zap.Uint64("region-id", region.GetID()))
 		return nil, nil
server/schedule/filter/candidates.go (5 changes: 5 additions & 0 deletions)
@@ -91,3 +91,8 @@ func (c *StoreCandidates) RandomPick() *core.StoreInfo {
 	}
 	return c.Stores[rand.Intn(len(c.Stores))]
 }
+
+// Len returns the length of the candidate list.
+func (c *StoreCandidates) Len() int {
+	return len(c.Stores)
+}
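The new Len accessor is what lets SelectStoreToAdd in replica_strategy.go above separate "no candidates at all" (return 0, false) from "candidates exist but are all temporarily filtered out" (return 0, true) before the strict state filter is applied.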
