schedulers: unify the use and GC of hot-region's pendings and regionPendings #3921

Merged · 12 commits · Aug 3, 2021
12 changes: 12 additions & 0 deletions server/schedule/operator/operator.go
@@ -140,6 +140,18 @@ func (o *Operator) Status() OpStatus {
return o.status.Status()
}

// CheckAndGetStatus returns operator status after `CheckExpired` and `CheckTimeout`.
func (o *Operator) CheckAndGetStatus() OpStatus {
switch {
case o.CheckExpired():
return EXPIRED
case o.CheckTimeout():
return TIMEOUT
default:
return o.Status()
}
}
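
The helper folds the expiry and timeout probes into the status read, so call sites such as `calcPendingInfluence` below no longer repeat them. A minimal call-site sketch (hypothetical caller, not part of this diff):

```go
// Before: each caller probed expiry and timeout separately.
if op.CheckExpired() || op.CheckTimeout() {
	// treat as an ended (zombie) operator
}
status := op.Status()

// After: one call returns a status that already reflects both probes
// (EXPIRED takes precedence over TIMEOUT, per the switch order above),
// and IsEndStatus (used by calcPendingInfluence) classifies the result.
status = op.CheckAndGetStatus()
if operator.IsEndStatus(status) {
	// SUCCESS, CANCELED, REPLACED, TIMEOUT, EXPIRED, ...
}
```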

// GetReachTimeOf returns the time when operator reaches the given status.
func (o *Operator) GetReachTimeOf(st OpStatus) time.Time {
return o.status.ReachTimeOf(st)
94 changes: 50 additions & 44 deletions server/schedulers/hot_region.go
@@ -76,12 +76,10 @@ type hotScheduler struct {
types []rwType
r *rand.Rand

// states across multiple `Schedule` calls
pendings map[*pendingInfluence]struct{}
// regionPendings stores regionID -> Operator
// regionPendings stores regionID -> pendingInfluence
// this records the regionIDs that have a pending Operator, by operation type. During filterHotPeers, a hot peer won't
// be selected if its owner region is tracked in this attribute.
regionPendings map[uint64]*operator.Operator
regionPendings map[uint64]*pendingInfluence

// temporary states but exported to API or metrics
stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail
@@ -98,8 +96,7 @@ func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionS
BaseScheduler: base,
types: []rwType{write, read},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
pendings: map[*pendingInfluence]struct{}{},
regionPendings: make(map[uint64]*operator.Operator),
regionPendings: make(map[uint64]*pendingInfluence),
conf: conf,
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
@@ -199,23 +196,32 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {

// summaryPendingInfluence calculates the summary of pending Influence for each store
// and removes a region from regionPendings if its operator has ended.
// It makes each dim rate or count become `weight` times the original value.
func (h *hotScheduler) summaryPendingInfluence() {
h.pendingSums = summaryPendingInfluence(h.pendings, h.calcPendingWeight)
h.gcRegionPendings()
}

// gcRegionPendings check the region whether it need to be deleted from regionPendings depended on whether it have
// ended operator
func (h *hotScheduler) gcRegionPendings() {
for regionID, op := range h.regionPendings {
if op != nil && op.IsEnd() {
if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) {
log.Debug("gc pending influence in hot region scheduler", zap.Uint64("region-id", regionID), zap.Time("create", op.GetCreateTime()), zap.Time("now", time.Now()), zap.Duration("zombie", h.conf.GetMaxZombieDuration()))
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
delete(h.regionPendings, regionID)
}
maxZombieDur := h.conf.GetMaxZombieDuration()
ret := make(map[uint64]*Influence)
for id, p := range h.regionPendings {
weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)
if needGC {
delete(h.regionPendings, id)
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
log.Debug("gc pending influence in hot region scheduler",
zap.Uint64("region-id", id),
zap.Time("create", p.op.GetCreateTime()),
zap.Time("now", time.Now()),
zap.Duration("zombie", maxZombieDur))
continue
}
if _, ok := ret[p.to]; !ok {
ret[p.to] = &Influence{Loads: make([]float64, len(p.origin.Loads))}
}
ret[p.to] = ret[p.to].add(&p.origin, weight)
if _, ok := ret[p.from]; !ok {
ret[p.from] = &Influence{Loads: make([]float64, len(p.origin.Loads))}
}
ret[p.from] = ret[p.from].add(&p.origin, -weight)
}
h.pendingSums = ret
}
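
The per-store aggregation previously done by the free `summaryPendingInfluence` helper in utils.go (deleted at the bottom of this diff) is now inlined here: the destination store gains `weight` times the operator's origin influence and the source store loses the same amount. A toy, self-contained model of that bookkeeping — simplified stand-in types and made-up numbers, for illustration only:

```go
package main

import "fmt"

// influence is a simplified stand-in for the scheduler's Influence type.
type influence struct{ loads []float64 }

// add accumulates other's loads scaled by w, mirroring Influence.add.
func (i *influence) add(other *influence, w float64) *influence {
	for k := range i.loads {
		i.loads[k] += other.loads[k] * w
	}
	return i
}

func main() {
	// One pending operator expected to move {byteRate: 100, keyRate: 10}
	// from store 1 to store 2, with weight 1.
	origin := &influence{loads: []float64{100, 10}}
	sums := map[uint64]*influence{
		1: {loads: make([]float64, 2)},
		2: {loads: make([]float64, 2)},
	}
	const weight = 1.0
	sums[2].add(origin, weight)  // destination will gain this load
	sums[1].add(origin, -weight) // source will shed it
	fmt.Println(sums[1].loads, sums[2].loads) // [-100 -10] [100 10]
}
```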

// summaryStoresLoad summarizes the load information of all available stores.
@@ -356,8 +362,7 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS
}

influence := newPendingInfluence(op, srcStore, dstStore, infl)
h.pendings[influence] = struct{}{}
h.regionPendings[regionID] = op
h.regionPendings[regionID] = influence

schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
return true
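
`regionPendings` now keeps the whole `pendingInfluence` record rather than a bare `*operator.Operator`, so a single map entry serves both the availability check in `isRegionAvailable` and the summary/GC pass above. Its shape, inferred from `newPendingInfluence`'s signature and the field accesses in this diff (`p.op`, `p.from`, `p.to`, `p.origin`):

```go
// Inferred layout of pendingInfluence (defined in utils.go):
type pendingInfluence struct {
	op       *operator.Operator // drives the status and zombie-GC decisions
	from, to uint64             // source and destination store IDs
	origin   Influence          // the load this operator is expected to move
}
```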
@@ -672,10 +677,11 @@ func (bs *balanceSolver) isRegionAvailable(region *core.RegionInfo) bool {
return false
}

if op, ok := bs.sche.regionPendings[region.GetID()]; ok {
if influence, ok := bs.sche.regionPendings[region.GetID()]; ok {
if bs.opTy == transferLeader {
return false
}
op := influence.op
if op.Kind()&operator.OpRegion != 0 ||
(op.Kind()&operator.OpLeader != 0 && !op.IsEnd()) {
return false
@@ -1153,33 +1159,33 @@ func (h *hotScheduler) GetPendingInfluence() map[uint64]*Influence {
return ret
}

// calcPendingWeight return the calculate weight of one Operator, the value will between [0,1]
func (h *hotScheduler) calcPendingWeight(op *operator.Operator) float64 {
if op.CheckExpired() || op.CheckTimeout() {
return 0
}
status := op.Status()
// calcPendingInfluence returns the calculated weight of one Operator and whether it needs to be GCed; the weight is in [0,1]
func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur time.Duration) (weight float64, needGC bool) {
status := op.CheckAndGetStatus()
if !operator.IsEndStatus(status) {
return 1
}
switch status {
case operator.SUCCESS:
zombieDur := time.Since(op.GetReachTimeOf(status))
maxZombieDur := h.conf.GetMaxZombieDuration()
if zombieDur >= maxZombieDur {
return 0
}
// TODO: use store statistics update time to make a more accurate estimation
return float64(maxZombieDur-zombieDur) / float64(maxZombieDur)
default:
return 0
return 1, false
}

// TODO: use store statistics update time to make a more accurate estimation
zombieDur := time.Since(op.GetReachTimeOf(status))
if zombieDur >= maxZombieDur {
weight = 0
} else {
weight = 1
}

needGC = weight == 0
if status != operator.SUCCESS {
// CANCELED, REPLACED, TIMEOUT, EXPIRED, etc.
// The actual weight is 0, but there is still a delay in GC.
weight = 0
}
return
}
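
The interplay of `weight` and `needGC` is subtle: a non-SUCCESS end status zeroes the weight immediately, yet the entry stays in `regionPendings` until `maxZombieDur` elapses, while a SUCCESS operator keeps full weight for the whole zombie window. A standalone sketch of these decision rules, with simplified inputs in place of a real Operator (hypothetical names, illustration only):

```go
package main

import (
	"fmt"
	"time"
)

type opStatus int

const (
	running  opStatus = iota // any non-end status
	success                  // operator.SUCCESS
	canceled                 // stands for CANCELED, REPLACED, TIMEOUT, EXPIRED, ...
)

// calc mirrors calcPendingInfluence's decision rules.
func calc(st opStatus, zombie, maxZombie time.Duration) (weight float64, needGC bool) {
	if st == running {
		return 1, false // still in progress: full influence, never GCed
	}
	if zombie >= maxZombie {
		weight = 0
	} else {
		weight = 1
	}
	needGC = weight == 0
	if st != success {
		// Influence drops to 0 at once, but GC still waits for maxZombie.
		weight = 0
	}
	return
}

func main() {
	max := 10 * time.Second
	fmt.Println(calc(running, 0, max))               // 1 false
	fmt.Println(calc(success, 3*time.Second, max))   // 1 false
	fmt.Println(calc(success, 12*time.Second, max))  // 0 true
	fmt.Println(calc(canceled, 3*time.Second, max))  // 0 false (delayed GC)
	fmt.Println(calc(canceled, 12*time.Second, max)) // 0 true
}
```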

func (h *hotScheduler) clearPendingInfluence() {
h.pendings = map[*pendingInfluence]struct{}{}
h.pendingSums = nil
h.regionPendings = make(map[uint64]*operator.Operator)
h.regionPendings = make(map[uint64]*pendingInfluence)
}

// rwType : the perspective of balance
43 changes: 23 additions & 20 deletions server/schedulers/hot_region_test.go
@@ -84,7 +84,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
c.Assert(err, IsNil)
hb := sche.(*hotScheduler)

notDoneOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
notDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
var op *operator.Operator
var err error
switch ty {
@@ -95,40 +95,43 @@
}
c.Assert(err, IsNil)
c.Assert(op, NotNil)
return op
op.Start()
operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
return newPendingInfluence(op, 2, 4, Influence{})
}
doneOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
op := notDoneOp(region, ty)
op.Cancel()
return op
justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
infl := notDoneOpInfluence(region, ty)
infl.op.Cancel()
return infl
}
shouldRemoveOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
op := doneOp(region, ty)
operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second))
return op
shouldRemoveOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
infl := justDoneOpInfluence(region, ty)
operator.SetOperatorStatusReachTime(infl.op, operator.CANCELED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second))
return infl
}
opCreaters := [3]func(region *core.RegionInfo, ty opType) *operator.Operator{shouldRemoveOp, notDoneOp, doneOp}
opInfluenceCreators := [3]func(region *core.RegionInfo, ty opType) *pendingInfluence{shouldRemoveOpInfluence, notDoneOpInfluence, justDoneOpInfluence}

typs := []opType{movePeer, transferLeader}

for i := 0; i < len(opCreaters); i++ {
for i, creator := range opInfluenceCreators {
for j, typ := range typs {
regionID := uint64(i*len(opCreaters) + j + 1)
regionID := uint64(i*len(typs) + j + 1)
region := newTestRegion(regionID)
hb.regionPendings[regionID] = opCreaters[i](region, typ)
hb.regionPendings[regionID] = creator(region, typ)
}
}

hb.gcRegionPendings()
hb.summaryPendingInfluence() // Calling this function will GC.

for i := 0; i < len(opCreaters); i++ {
for i := range opInfluenceCreators {
for j, typ := range typs {
regionID := uint64(i*len(opCreaters) + j + 1)
if i < 1 { // shouldRemoveOp
regionID := uint64(i*len(typs) + j + 1)
if i < 1 { // shouldRemoveOpInfluence
c.Assert(hb.regionPendings, Not(HasKey), regionID)
} else { // notDoneOp, doneOp
} else { // notDoneOpInfluence, justDoneOpInfluence
c.Assert(hb.regionPendings, HasKey, regionID)
kind := hb.regionPendings[regionID].Kind()
kind := hb.regionPendings[regionID].op.Kind()
switch typ {
case transferLeader:
c.Assert(kind&operator.OpLeader != 0, IsTrue)
21 changes: 0 additions & 21 deletions server/schedulers/utils.go
@@ -234,27 +234,6 @@ func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence)
}
}

// summaryPendingInfluence calculate the summary pending Influence for each store and return storeID -> Influence
// It makes each dim rate or count become (1+w) times to the origin value while f is the function to provide w(weight)
func summaryPendingInfluence(pendings map[*pendingInfluence]struct{}, f func(*operator.Operator) float64) map[uint64]*Influence {
ret := make(map[uint64]*Influence)
for p := range pendings {
w := f(p.op)
if w == 0 {
delete(pendings, p)
}
if _, ok := ret[p.to]; !ok {
ret[p.to] = &Influence{Loads: make([]float64, len(p.origin.Loads))}
}
ret[p.to] = ret[p.to].add(&p.origin, w)
if _, ok := ret[p.from]; !ok {
ret[p.from] = &Influence{Loads: make([]float64, len(p.origin.Loads))}
}
ret[p.from] = ret[p.from].add(&p.origin, -w)
}
return ret
}

type storeLoad struct {
Loads []float64
Count float64