Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: simplify pending influence #3658

Merged
merged 7 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 14 additions & 30 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ type hotScheduler struct {

// states across multiple `Schedule` calls
pendings map[*pendingInfluence]struct{}
// regionPendings stores regionID -> [opType]Operator
// regionPendings stores regionID -> Operator
// This records the IDs of regions that have a pending Operator, by operation type. During filterHotPeers, a hot peer
// won't be selected if its owner region is tracked in this attribute.
regionPendings map[uint64][2]*operator.Operator
regionPendings map[uint64]*operator.Operator

// temporary states but exported to API or metrics
stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail
Expand All @@ -114,7 +114,7 @@ func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionS
peerLimit: 1,
types: []rwType{write, read},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
regionPendings: make(map[uint64][2]*operator.Operator),
regionPendings: make(map[uint64]*operator.Operator),
conf: conf,
}
ret.pendings = map[*pendingInfluence]struct{}{}
Expand Down Expand Up @@ -243,25 +243,14 @@ func (h *hotScheduler) summaryPendingInfluence() {
// gcRegionPendings checks whether each region needs to be deleted from regionPendings, depending on whether it has
// an ended operator.
func (h *hotScheduler) gcRegionPendings() {
for regionID, pendings := range h.regionPendings {
empty := true
for ty, op := range pendings {
if op != nil && op.IsEnd() {
if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) {
log.Debug("gc pending influence in hot region scheduler", zap.Uint64("region-id", regionID), zap.Time("create", op.GetCreateTime()), zap.Time("now", time.Now()), zap.Duration("zombie", h.conf.GetMaxZombieDuration()))
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
pendings[ty] = nil
}
}
if pendings[ty] != nil {
empty = false
for regionID, op := range h.regionPendings {
if op != nil && op.IsEnd() {
if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) {
log.Debug("gc pending influence in hot region scheduler", zap.Uint64("region-id", regionID), zap.Time("create", op.GetCreateTime()), zap.Time("now", time.Now()), zap.Duration("zombie", h.conf.GetMaxZombieDuration()))
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
delete(h.regionPendings, regionID)
}
}
if empty {
delete(h.regionPendings, regionID)
} else {
h.regionPendings[regionID] = pendings
}
}
}

Expand Down Expand Up @@ -388,12 +377,7 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS

influence := newPendingInfluence(op, srcStore, dstStore, infl)
h.pendings[influence] = struct{}{}
h.regionPendings[regionID] = [2]*operator.Operator{nil, nil}
{
tmp := h.regionPendings[regionID]
tmp[opTy] = op
h.regionPendings[regionID] = tmp
}
h.regionPendings[regionID] = op

schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
return true
Expand Down Expand Up @@ -684,12 +668,12 @@ func (bs *balanceSolver) isRegionAvailable(region *core.RegionInfo) bool {
return false
}

if pendings, ok := bs.sche.regionPendings[region.GetID()]; ok {
if op, ok := bs.sche.regionPendings[region.GetID()]; ok {
if bs.opTy == transferLeader {
return false
}
if pendings[movePeer] != nil ||
(pendings[transferLeader] != nil && !pendings[transferLeader].IsEnd()) {
if op.Kind()&operator.OpRegion != 0 ||
(op.Kind()&operator.OpLeader != 0 && !op.IsEnd()) {
return false
}
}
Expand Down Expand Up @@ -1128,7 +1112,7 @@ func (h *hotScheduler) calcPendingWeight(op *operator.Operator) float64 {
func (h *hotScheduler) clearPendingInfluence() {
h.pendings = map[*pendingInfluence]struct{}{}
h.pendingSums = nil
h.regionPendings = make(map[uint64][2]*operator.Operator)
h.regionPendings = make(map[uint64]*operator.Operator)
}

// rwType : the perspective of balance
Expand Down
39 changes: 17 additions & 22 deletions server/schedulers/hot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
c.Assert(err, IsNil)
hb := sche.(*hotScheduler)

nilOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
return nil
}
notDoneOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
var op *operator.Operator
var err error
Expand All @@ -88,38 +85,36 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second))
return op
}
opCreaters := [4]func(region *core.RegionInfo, ty opType) *operator.Operator{nilOp, shouldRemoveOp, notDoneOp, doneOp}
opCreaters := [3]func(region *core.RegionInfo, ty opType) *operator.Operator{shouldRemoveOp, notDoneOp, doneOp}

typs := []opType{movePeer, transferLeader}

for i := 0; i < len(opCreaters); i++ {
for j := 0; j < len(opCreaters); j++ {
for j, typ := range typs {
regionID := uint64(i*len(opCreaters) + j + 1)
region := newTestRegion(regionID)
hb.regionPendings[regionID] = [2]*operator.Operator{
movePeer: opCreaters[i](region, movePeer),
transferLeader: opCreaters[j](region, transferLeader),
}
hb.regionPendings[regionID] = opCreaters[i](region, typ)
}
}

hb.gcRegionPendings()

for i := 0; i < len(opCreaters); i++ {
for j := 0; j < len(opCreaters); j++ {
for j, typ := range typs {
regionID := uint64(i*len(opCreaters) + j + 1)
if i < 2 && j < 2 {
if i < 1 { // shouldRemoveOp
c.Assert(hb.regionPendings, Not(HasKey), regionID)
} else if i < 2 {
c.Assert(hb.regionPendings, HasKey, regionID)
c.Assert(hb.regionPendings[regionID][movePeer], IsNil)
c.Assert(hb.regionPendings[regionID][transferLeader], NotNil)
} else if j < 2 {
} else { // notDoneOp, doneOp
c.Assert(hb.regionPendings, HasKey, regionID)
c.Assert(hb.regionPendings[regionID][movePeer], NotNil)
c.Assert(hb.regionPendings[regionID][transferLeader], IsNil)
} else {
c.Assert(hb.regionPendings, HasKey, regionID)
c.Assert(hb.regionPendings[regionID][movePeer], NotNil)
c.Assert(hb.regionPendings[regionID][transferLeader], NotNil)
kind := hb.regionPendings[regionID].Kind()
switch typ {
case transferLeader:
c.Assert(kind&operator.OpLeader != 0, IsTrue)
c.Assert(kind&operator.OpRegion == 0, IsTrue)
case movePeer:
c.Assert(kind&operator.OpLeader == 0, IsTrue)
c.Assert(kind&operator.OpRegion != 0, IsTrue)
}
}
}
}
Expand Down