Skip to content

Commit

Permalink
schedulers: unify the use and GC of hot-region's pendings and regionP…
Browse files Browse the repository at this point in the history
…endings (#3921)

* schedulers: unify the use and GC of hot-region's pendings and regionPendings

Signed-off-by: HunDunDM <[email protected]>

* address comment

Signed-off-by: HunDunDM <[email protected]>

* address comment

Signed-off-by: HunDunDM <[email protected]>

* address comment

Signed-off-by: HunDunDM <[email protected]>

* address comment

Signed-off-by: HunDunDM <[email protected]>

* address comment

Signed-off-by: HunDunDM <[email protected]>

* address comment

Signed-off-by: HunDunDM <[email protected]>

* address comment

Signed-off-by: HunDunDM <[email protected]>
  • Loading branch information
HunDunDM authored Aug 3, 2021
1 parent 386b044 commit c06414f
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 85 deletions.
12 changes: 12 additions & 0 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ func (o *Operator) Status() OpStatus {
return o.status.Status()
}

// CheckAndGetStatus returns operator status after `CheckExpired` and `CheckTimeout`.
func (o *Operator) CheckAndGetStatus() OpStatus {
switch {
case o.CheckExpired():
return EXPIRED
case o.CheckTimeout():
return TIMEOUT
default:
return o.Status()
}
}

// GetReachTimeOf returns the time when operator reaches the given status.
func (o *Operator) GetReachTimeOf(st OpStatus) time.Time {
return o.status.ReachTimeOf(st)
Expand Down
94 changes: 50 additions & 44 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,10 @@ type hotScheduler struct {
types []rwType
r *rand.Rand

// states across multiple `Schedule` calls
pendings map[*pendingInfluence]struct{}
// regionPendings stores regionID -> Operator
// regionPendings stores regionID -> pendingInfluence
// this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't
// be selected if its owner region is tracked in this attribute.
regionPendings map[uint64]*operator.Operator
regionPendings map[uint64]*pendingInfluence

// temporary states but exported to API or metrics
stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail
Expand All @@ -98,8 +96,7 @@ func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionS
BaseScheduler: base,
types: []rwType{write, read},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
pendings: map[*pendingInfluence]struct{}{},
regionPendings: make(map[uint64]*operator.Operator),
regionPendings: make(map[uint64]*pendingInfluence),
conf: conf,
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
Expand Down Expand Up @@ -199,23 +196,32 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {

// summaryPendingInfluence calculate the summary of pending Influence for each store
// and clean the region from regionInfluence if they have ended operator.
// It makes each dim rate or count become `weight` times to the origin value.
func (h *hotScheduler) summaryPendingInfluence() {
h.pendingSums = summaryPendingInfluence(h.pendings, h.calcPendingWeight)
h.gcRegionPendings()
}

// gcRegionPendings check the region whether it need to be deleted from regionPendings depended on whether it have
// ended operator
func (h *hotScheduler) gcRegionPendings() {
for regionID, op := range h.regionPendings {
if op != nil && op.IsEnd() {
if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) {
log.Debug("gc pending influence in hot region scheduler", zap.Uint64("region-id", regionID), zap.Time("create", op.GetCreateTime()), zap.Time("now", time.Now()), zap.Duration("zombie", h.conf.GetMaxZombieDuration()))
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
delete(h.regionPendings, regionID)
}
maxZombieDur := h.conf.GetMaxZombieDuration()
ret := make(map[uint64]*Influence)
for id, p := range h.regionPendings {
weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)
if needGC {
delete(h.regionPendings, id)
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
log.Debug("gc pending influence in hot region scheduler",
zap.Uint64("region-id", id),
zap.Time("create", p.op.GetCreateTime()),
zap.Time("now", time.Now()),
zap.Duration("zombie", maxZombieDur))
continue
}
if _, ok := ret[p.to]; !ok {
ret[p.to] = &Influence{Loads: make([]float64, len(p.origin.Loads))}
}
ret[p.to] = ret[p.to].add(&p.origin, weight)
if _, ok := ret[p.from]; !ok {
ret[p.from] = &Influence{Loads: make([]float64, len(p.origin.Loads))}
}
ret[p.from] = ret[p.from].add(&p.origin, -weight)
}
h.pendingSums = ret
}

// summaryStoresLoad Load information of all available stores.
Expand Down Expand Up @@ -356,8 +362,7 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS
}

influence := newPendingInfluence(op, srcStore, dstStore, infl)
h.pendings[influence] = struct{}{}
h.regionPendings[regionID] = op
h.regionPendings[regionID] = influence

schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
return true
Expand Down Expand Up @@ -672,10 +677,11 @@ func (bs *balanceSolver) isRegionAvailable(region *core.RegionInfo) bool {
return false
}

if op, ok := bs.sche.regionPendings[region.GetID()]; ok {
if influence, ok := bs.sche.regionPendings[region.GetID()]; ok {
if bs.opTy == transferLeader {
return false
}
op := influence.op
if op.Kind()&operator.OpRegion != 0 ||
(op.Kind()&operator.OpLeader != 0 && !op.IsEnd()) {
return false
Expand Down Expand Up @@ -1153,33 +1159,33 @@ func (h *hotScheduler) GetPendingInfluence() map[uint64]*Influence {
return ret
}

// calcPendingWeight return the calculate weight of one Operator, the value will between [0,1]
func (h *hotScheduler) calcPendingWeight(op *operator.Operator) float64 {
if op.CheckExpired() || op.CheckTimeout() {
return 0
}
status := op.Status()
// calcPendingInfluence return the calculate weight of one Operator, the value will between [0,1]
func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur time.Duration) (weight float64, needGC bool) {
status := op.CheckAndGetStatus()
if !operator.IsEndStatus(status) {
return 1
}
switch status {
case operator.SUCCESS:
zombieDur := time.Since(op.GetReachTimeOf(status))
maxZombieDur := h.conf.GetMaxZombieDuration()
if zombieDur >= maxZombieDur {
return 0
}
// TODO: use store statistics update time to make a more accurate estimation
return float64(maxZombieDur-zombieDur) / float64(maxZombieDur)
default:
return 0
return 1, false
}

// TODO: use store statistics update time to make a more accurate estimation
zombieDur := time.Since(op.GetReachTimeOf(status))
if zombieDur >= maxZombieDur {
weight = 0
} else {
weight = 1
}

needGC = weight == 0
if status != operator.SUCCESS {
// CANCELED, REPLACED, TIMEOUT, EXPIRED, etc.
// The actual weight is 0, but there is still a delay in GC.
weight = 0
}
return
}

func (h *hotScheduler) clearPendingInfluence() {
h.pendings = map[*pendingInfluence]struct{}{}
h.pendingSums = nil
h.regionPendings = make(map[uint64]*operator.Operator)
h.regionPendings = make(map[uint64]*pendingInfluence)
}

// rwType : the perspective of balance
Expand Down
43 changes: 23 additions & 20 deletions server/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
c.Assert(err, IsNil)
hb := sche.(*hotScheduler)

notDoneOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
notDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
var op *operator.Operator
var err error
switch ty {
Expand All @@ -95,40 +95,43 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
}
c.Assert(err, IsNil)
c.Assert(op, NotNil)
return op
op.Start()
operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
return newPendingInfluence(op, 2, 4, Influence{})
}
doneOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
op := notDoneOp(region, ty)
op.Cancel()
return op
justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
infl := notDoneOpInfluence(region, ty)
infl.op.Cancel()
return infl
}
shouldRemoveOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
op := doneOp(region, ty)
operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second))
return op
shouldRemoveOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
infl := justDoneOpInfluence(region, ty)
operator.SetOperatorStatusReachTime(infl.op, operator.CANCELED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second))
return infl
}
opCreaters := [3]func(region *core.RegionInfo, ty opType) *operator.Operator{shouldRemoveOp, notDoneOp, doneOp}
opInfluenceCreators := [3]func(region *core.RegionInfo, ty opType) *pendingInfluence{shouldRemoveOpInfluence, notDoneOpInfluence, justDoneOpInfluence}

typs := []opType{movePeer, transferLeader}

for i := 0; i < len(opCreaters); i++ {
for i, creator := range opInfluenceCreators {
for j, typ := range typs {
regionID := uint64(i*len(opCreaters) + j + 1)
regionID := uint64(i*len(typs) + j + 1)
region := newTestRegion(regionID)
hb.regionPendings[regionID] = opCreaters[i](region, typ)
hb.regionPendings[regionID] = creator(region, typ)
}
}

hb.gcRegionPendings()
hb.summaryPendingInfluence() // Calling this function will GC.

for i := 0; i < len(opCreaters); i++ {
for i := range opInfluenceCreators {
for j, typ := range typs {
regionID := uint64(i*len(opCreaters) + j + 1)
if i < 1 { // shouldRemoveOp
regionID := uint64(i*len(typs) + j + 1)
if i < 1 { // shouldRemoveOpInfluence
c.Assert(hb.regionPendings, Not(HasKey), regionID)
} else { // notDoneOp, doneOp
} else { // notDoneOpInfluence, justDoneOpInfluence
c.Assert(hb.regionPendings, HasKey, regionID)
kind := hb.regionPendings[regionID].Kind()
kind := hb.regionPendings[regionID].op.Kind()
switch typ {
case transferLeader:
c.Assert(kind&operator.OpLeader != 0, IsTrue)
Expand Down
21 changes: 0 additions & 21 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,27 +234,6 @@ func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence)
}
}

// summaryPendingInfluence calculate the summary pending Influence for each store and return storeID -> Influence
// It makes each dim rate or count become (1+w) times to the origin value while f is the function to provide w(weight)
func summaryPendingInfluence(pendings map[*pendingInfluence]struct{}, f func(*operator.Operator) float64) map[uint64]*Influence {
ret := make(map[uint64]*Influence)
for p := range pendings {
w := f(p.op)
if w == 0 {
delete(pendings, p)
}
if _, ok := ret[p.to]; !ok {
ret[p.to] = &Influence{Loads: make([]float64, len(p.origin.Loads))}
}
ret[p.to] = ret[p.to].add(&p.origin, w)
if _, ok := ret[p.from]; !ok {
ret[p.from] = &Influence{Loads: make([]float64, len(p.origin.Loads))}
}
ret[p.from] = ret[p.from].add(&p.origin, -w)
}
return ret
}

type storeLoad struct {
Loads []float64
Count float64
Expand Down

0 comments on commit c06414f

Please sign in to comment.