*: fix the wrong pending status (tikv#5080)
close tikv#5095

Signed-off-by: Ryan Leung <[email protected]>
rleungx authored Jun 7, 2022
1 parent d02a98a commit e19dc71
Showing 5 changed files with 53 additions and 13 deletions.
9 changes: 3 additions & 6 deletions server/api/region.go
@@ -280,13 +280,10 @@ func (h *regionsHandler) CheckRegionsReplicated(w http.ResponseWriter, r *http.R
 	for _, region := range regions {
 		if !schedule.IsRegionReplicated(rc, region) {
 			state = "INPROGRESS"
-			for _, item := range rc.GetCoordinator().GetWaitingRegions() {
-				if item.Key == region.GetID() {
-					state = "PENDING"
-					break
-				}
+			if rc.GetCoordinator().IsPendingRegion(region.GetID()) {
+				state = "PENDING"
+				break
 			}
-			break
 		}
 	}
 	failpoint.Inject("mockPending", func(val failpoint.Value) {
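The net effect in the handler: previously, PENDING was derived from the coordinator's waiting list and the loop broke unconditionally after the first under-replicated region, so a later region's pending status could be missed; now each under-replicated region is checked against the rule checker's dedicated pending list, and only a pending hit ends the scan. A minimal, self-contained sketch of the new decision logic (the `pendingChecker` interface and `fakeChecker` below are stand-ins invented for this sketch, not pd types):

package main

import "fmt"

// pendingChecker stands in for rc.GetCoordinator() in this sketch; the real
// handler calls the coordinator method added in server/cluster/coordinator.go.
type pendingChecker interface {
	IsPendingRegion(regionID uint64) bool
}

// replicationState mirrors the handler's loop: any under-replicated region
// downgrades the state to INPROGRESS, and a region that is also in the
// checker's pending list upgrades it to PENDING and ends the scan.
func replicationState(regionIDs []uint64, replicated func(uint64) bool, pc pendingChecker) string {
	state := "REPLICATED"
	for _, id := range regionIDs {
		if !replicated(id) {
			state = "INPROGRESS"
			if pc.IsPendingRegion(id) {
				state = "PENDING"
				break
			}
		}
	}
	return state
}

// fakeChecker is a hypothetical map-backed checker for the demo below.
type fakeChecker map[uint64]bool

func (f fakeChecker) IsPendingRegion(id uint64) bool { return f[id] }

func main() {
	pending := fakeChecker{2: true}
	replicated := func(id uint64) bool { return id != 2 }
	fmt.Println(replicationState([]uint64{1, 2, 3}, replicated, pending)) // PENDING
}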
4 changes: 4 additions & 0 deletions server/cluster/coordinator.go
@@ -97,6 +97,10 @@ func (c *coordinator) GetWaitingRegions() []*cache.Item {
 	return c.checkers.GetWaitingRegions()
 }
 
+func (c *coordinator) IsPendingRegion(region uint64) bool {
+	return c.checkers.IsPendingRegion(region)
+}
+
 // patrolRegions is used to scan regions.
 // The checkers will check these regions to decide if they need to do some operations.
 func (c *coordinator) patrolRegions() {
6 changes: 6 additions & 0 deletions server/schedule/checker/checker_controller.go
@@ -204,6 +204,12 @@ func (c *Controller) ClearSuspectKeyRanges() {
 	c.suspectKeyRanges.Clear()
 }
 
+// IsPendingRegion returns true if the given region is in the pending list.
+func (c *Controller) IsPendingRegion(regionID uint64) bool {
+	_, exist := c.ruleChecker.pendingList.Get(regionID)
+	return exist
+}
+
 // GetPauseController returns pause controller of the checker
 func (c *Controller) GetPauseController(name string) (*PauseController, error) {
 	switch name {
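Note the encapsulation choice: unlike `GetWaitingRegions`, which hands back the cache items themselves, the pending list is exposed only through a membership test, so layers above the checker can query but never mutate it. A stub-level sketch of the two-hop delegation (all types here are simplified stand-ins for the pd originals, not the real implementations):

package main

import "fmt"

// cacheStub is a simplified stand-in for the pd cache behind pendingList.
type cacheStub struct{ items map[uint64]struct{} }

func (c *cacheStub) Put(id uint64)      { c.items[id] = struct{}{} }
func (c *cacheStub) Get(id uint64) bool { _, ok := c.items[id]; return ok }

// ruleChecker owns the pending list; nothing outside the checker mutates it.
type ruleChecker struct{ pendingList *cacheStub }

// controller mirrors checker.Controller: it answers membership queries only.
type controller struct{ ruleChecker *ruleChecker }

func (c *controller) IsPendingRegion(id uint64) bool {
	return c.ruleChecker.pendingList.Get(id)
}

// coordinator mirrors cluster.coordinator's one-line delegation.
type coordinator struct{ checkers *controller }

func (c *coordinator) IsPendingRegion(id uint64) bool {
	return c.checkers.IsPendingRegion(id)
}

func main() {
	rc := &ruleChecker{pendingList: &cacheStub{items: map[uint64]struct{}{}}}
	co := &coordinator{checkers: &controller{ruleChecker: rc}}
	rc.pendingList.Put(42)
	fmt.Println(co.IsPendingRegion(42), co.IsPendingRegion(7)) // true false
}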
26 changes: 19 additions & 7 deletions server/schedule/checker/rule_checker.go
@@ -39,13 +39,16 @@ var (
 	errNoNewLeader = errors.New("no new leader")
 )
 
+const maxPendingListLen = 100000
+
 // RuleChecker fix/improve region by placement rules.
 type RuleChecker struct {
 	PauseController
 	cluster           schedule.Cluster
 	ruleManager       *placement.RuleManager
 	name              string
 	regionWaitingList cache.Cache
+	pendingList       cache.Cache
 	record            *recorder
 }
 
@@ -56,6 +59,7 @@ func NewRuleChecker(cluster schedule.Cluster, ruleManager *placement.RuleManager
 		ruleManager:       ruleManager,
 		name:              "rule-checker",
 		regionWaitingList: regionWaitingList,
+		pendingList:       cache.NewDefaultCache(maxPendingListLen),
 		record:            newRecord(),
 	}
 }
@@ -107,6 +111,7 @@ func (c *RuleChecker) CheckWithFit(region *core.RegionInfo, fit *placement.Regio
 	if err != nil {
 		log.Debug("fail to fix orphan peer", errs.ZapError(err))
 	} else if op != nil {
+		c.pendingList.Remove(region.GetID())
 		return op
 	}
 	for _, rf := range fit.RuleFits {
@@ -116,6 +121,7 @@ func (c *RuleChecker) CheckWithFit(region *core.RegionInfo, fit *placement.Regio
 			continue
 		}
 		if op != nil {
+			c.pendingList.Remove(region.GetID())
 			return op
 		}
 	}
@@ -164,9 +170,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, rf *placement.RuleFit
 	store, filterByTempState := c.strategy(region, rf.Rule).SelectStoreToAdd(ruleStores)
 	if store == 0 {
 		checkerCounter.WithLabelValues("rule_checker", "no-store-add").Inc()
-		if filterByTempState {
-			c.regionWaitingList.Put(region.GetID(), nil)
-		}
+		c.handleFilterState(region, filterByTempState)
 		return nil, errNoStoreToAdd
 	}
 	peer := &metapb.Peer{StoreId: store, Role: rf.Rule.Role.MetaPeerRole()}
@@ -184,9 +188,7 @@ func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *place
 	store, filterByTempState := c.strategy(region, rf.Rule).SelectStoreToFix(ruleStores, peer.GetStoreId())
 	if store == 0 {
 		checkerCounter.WithLabelValues("rule_checker", "no-store-replace").Inc()
-		if filterByTempState {
-			c.regionWaitingList.Put(region.GetID(), nil)
-		}
+		c.handleFilterState(region, filterByTempState)
 		return nil, errNoStoreToReplace
 	}
 	newPeer := &metapb.Peer{StoreId: store, Role: rf.Rule.Role.MetaPeerRole()}
@@ -291,9 +293,10 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, rf *placement.R
 	if oldStore == 0 {
 		return nil, nil
 	}
-	newStore, _ := strategy.SelectStoreToImprove(ruleStores, oldStore)
+	newStore, filterByTempState := strategy.SelectStoreToImprove(ruleStores, oldStore)
 	if newStore == 0 {
 		log.Debug("no replacement store", zap.Uint64("region-id", region.GetID()))
+		c.handleFilterState(region, filterByTempState)
 		return nil, nil
 	}
 	checkerCounter.WithLabelValues("rule_checker", "move-to-better-location").Inc()
@@ -382,6 +385,15 @@ func (c *RuleChecker) getRuleFitStores(rf *placement.RuleFit) []*core.StoreInfo
 	return stores
 }
 
+func (c *RuleChecker) handleFilterState(region *core.RegionInfo, filterByTempState bool) {
+	if filterByTempState {
+		c.regionWaitingList.Put(region.GetID(), nil)
+		c.pendingList.Remove(region.GetID())
+	} else {
+		c.pendingList.Put(region.GetID(), nil)
+	}
+}
+
 type recorder struct {
 	offlineLeaderCounter map[uint64]uint64
 	lastUpdateTime       time.Time
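Taken together, these hunks give each region's pending mark a clear lifecycle: issuing an operator clears it, a temporary filter rejection parks the region on the waiting list and clears it, and any other failure to find a store sets it. The new `maxPendingListLen` of 100,000 presumably bounds the list so that stuck regions cannot grow it without limit. A self-contained sketch of that lifecycle, with `cache.Cache` replaced by a hypothetical map-backed stub:

package main

import "fmt"

// listStub is a map-backed stand-in for pd's cache.Cache in this sketch.
type listStub map[uint64]struct{}

func (l listStub) Put(id uint64)      { l[id] = struct{}{} }
func (l listStub) Remove(id uint64)   { delete(l, id) }
func (l listStub) Has(id uint64) bool { _, ok := l[id]; return ok }

type checker struct {
	regionWaitingList listStub // retry soon: blocked by a temporary filter
	pendingList       listStub // stuck: no store qualifies at all
}

// handleFilterState mirrors the new helper: temporary rejections go to the
// waiting list and clear the pending mark; persistent ones mark pending.
func (c *checker) handleFilterState(regionID uint64, filterByTempState bool) {
	if filterByTempState {
		c.regionWaitingList.Put(regionID)
		c.pendingList.Remove(regionID)
	} else {
		c.pendingList.Put(regionID)
	}
}

func main() {
	c := &checker{regionWaitingList: listStub{}, pendingList: listStub{}}
	c.handleFilterState(1, false)     // persistent: e.g. no store to add
	fmt.Println(c.pendingList.Has(1)) // true
	c.handleFilterState(1, true)      // now only temporarily filtered
	fmt.Println(c.pendingList.Has(1), c.regionWaitingList.Has(1)) // false true
	c.pendingList.Remove(1) // CheckWithFit does this once an operator is issued
}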
21 changes: 21 additions & 0 deletions server/schedule/checker/rule_checker_test.go
@@ -820,3 +820,24 @@ func (s *testRuleCheckerSuite) TestOfflineAndDownStore(c *C) {
 	c.Assert(op, NotNil)
 	c.Assert(op.Desc(), Equals, "replace-rule-down-peer")
 }
+
+func (s *testRuleCheckerSuite) TestPendingList(c *C) {
+	// no enough store
+	s.cluster.AddLeaderStore(1, 1)
+	s.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2)
+	op := s.rc.Check(s.cluster.GetRegion(1))
+	c.Assert(op, IsNil)
+	_, exist := s.rc.pendingList.Get(1)
+	c.Assert(exist, IsTrue)
+
+	// add more stores
+	s.cluster.AddLeaderStore(2, 1)
+	s.cluster.AddLeaderStore(3, 1)
+	op = s.rc.Check(s.cluster.GetRegion(1))
+	c.Assert(op, NotNil)
+	c.Assert(op.Desc(), Equals, "add-rule-peer")
+	c.Assert(op.GetPriorityLevel(), Equals, core.HighPriority)
+	c.Assert(op.Step(0).(operator.AddLearner).ToStore, Equals, uint64(3))
+	_, exist = s.rc.pendingList.Get(1)
+	c.Assert(exist, IsFalse)
+}
