From c06414f582ff5319b1f273cb81e7edab1adf0e29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B7=B7=E6=B2=8CDM?= Date: Tue, 3 Aug 2021 17:01:07 +0800 Subject: [PATCH] schedulers: unify the use and GC of hot-region's pendings and regionPendings (#3921) * schedulers: unify the use and GC of hot-region's pendings and regionPendings Signed-off-by: HunDunDM * address comment Signed-off-by: HunDunDM * address comment Signed-off-by: HunDunDM * address comment Signed-off-by: HunDunDM * address comment Signed-off-by: HunDunDM * address comment Signed-off-by: HunDunDM * address comment Signed-off-by: HunDunDM * address comment Signed-off-by: HunDunDM --- server/schedule/operator/operator.go | 12 ++++ server/schedulers/hot_region.go | 94 +++++++++++++++------------- server/schedulers/hot_region_test.go | 43 +++++++------ server/schedulers/utils.go | 21 ------- 4 files changed, 85 insertions(+), 85 deletions(-) diff --git a/server/schedule/operator/operator.go b/server/schedule/operator/operator.go index 06f5b4b8f51..5c63e86650b 100644 --- a/server/schedule/operator/operator.go +++ b/server/schedule/operator/operator.go @@ -140,6 +140,18 @@ func (o *Operator) Status() OpStatus { return o.status.Status() } +// CheckAndGetStatus returns operator status after `CheckExpired` and `CheckTimeout`. +func (o *Operator) CheckAndGetStatus() OpStatus { + switch { + case o.CheckExpired(): + return EXPIRED + case o.CheckTimeout(): + return TIMEOUT + default: + return o.Status() + } +} + // GetReachTimeOf returns the time when operator reaches the given status. func (o *Operator) GetReachTimeOf(st OpStatus) time.Time { return o.status.ReachTimeOf(st) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index a96d98f85be..e11c67f6406 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -76,12 +76,10 @@ type hotScheduler struct { types []rwType r *rand.Rand - // states across multiple `Schedule` calls - pendings map[*pendingInfluence]struct{} - // regionPendings stores regionID -> Operator + // regionPendings stores regionID -> pendingInfluence // this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't // be selected if its owner region is tracked in this attribute. - regionPendings map[uint64]*operator.Operator + regionPendings map[uint64]*pendingInfluence // temporary states but exported to API or metrics stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail @@ -98,8 +96,7 @@ func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionS BaseScheduler: base, types: []rwType{write, read}, r: rand.New(rand.NewSource(time.Now().UnixNano())), - pendings: map[*pendingInfluence]struct{}{}, - regionPendings: make(map[uint64]*operator.Operator), + regionPendings: make(map[uint64]*pendingInfluence), conf: conf, } for ty := resourceType(0); ty < resourceTypeLen; ty++ { @@ -199,23 +196,32 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) { // summaryPendingInfluence calculate the summary of pending Influence for each store // and clean the region from regionInfluence if they have ended operator. +// It makes each dim rate or count become `weight` times to the origin value. func (h *hotScheduler) summaryPendingInfluence() { - h.pendingSums = summaryPendingInfluence(h.pendings, h.calcPendingWeight) - h.gcRegionPendings() -} - -// gcRegionPendings check the region whether it need to be deleted from regionPendings depended on whether it have -// ended operator -func (h *hotScheduler) gcRegionPendings() { - for regionID, op := range h.regionPendings { - if op != nil && op.IsEnd() { - if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) { - log.Debug("gc pending influence in hot region scheduler", zap.Uint64("region-id", regionID), zap.Time("create", op.GetCreateTime()), zap.Time("now", time.Now()), zap.Duration("zombie", h.conf.GetMaxZombieDuration())) - schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec() - delete(h.regionPendings, regionID) - } + maxZombieDur := h.conf.GetMaxZombieDuration() + ret := make(map[uint64]*Influence) + for id, p := range h.regionPendings { + weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur) + if needGC { + delete(h.regionPendings, id) + schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec() + log.Debug("gc pending influence in hot region scheduler", + zap.Uint64("region-id", id), + zap.Time("create", p.op.GetCreateTime()), + zap.Time("now", time.Now()), + zap.Duration("zombie", maxZombieDur)) + continue } + if _, ok := ret[p.to]; !ok { + ret[p.to] = &Influence{Loads: make([]float64, len(p.origin.Loads))} + } + ret[p.to] = ret[p.to].add(&p.origin, weight) + if _, ok := ret[p.from]; !ok { + ret[p.from] = &Influence{Loads: make([]float64, len(p.origin.Loads))} + } + ret[p.from] = ret[p.from].add(&p.origin, -weight) } + h.pendingSums = ret } // summaryStoresLoad Load information of all available stores. @@ -356,8 +362,7 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS } influence := newPendingInfluence(op, srcStore, dstStore, infl) - h.pendings[influence] = struct{}{} - h.regionPendings[regionID] = op + h.regionPendings[regionID] = influence schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc() return true @@ -672,10 +677,11 @@ func (bs *balanceSolver) isRegionAvailable(region *core.RegionInfo) bool { return false } - if op, ok := bs.sche.regionPendings[region.GetID()]; ok { + if influence, ok := bs.sche.regionPendings[region.GetID()]; ok { if bs.opTy == transferLeader { return false } + op := influence.op if op.Kind()&operator.OpRegion != 0 || (op.Kind()&operator.OpLeader != 0 && !op.IsEnd()) { return false @@ -1153,33 +1159,33 @@ func (h *hotScheduler) GetPendingInfluence() map[uint64]*Influence { return ret } -// calcPendingWeight return the calculate weight of one Operator, the value will between [0,1] -func (h *hotScheduler) calcPendingWeight(op *operator.Operator) float64 { - if op.CheckExpired() || op.CheckTimeout() { - return 0 - } - status := op.Status() +// calcPendingInfluence return the calculate weight of one Operator, the value will between [0,1] +func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur time.Duration) (weight float64, needGC bool) { + status := op.CheckAndGetStatus() if !operator.IsEndStatus(status) { - return 1 - } - switch status { - case operator.SUCCESS: - zombieDur := time.Since(op.GetReachTimeOf(status)) - maxZombieDur := h.conf.GetMaxZombieDuration() - if zombieDur >= maxZombieDur { - return 0 - } - // TODO: use store statistics update time to make a more accurate estimation - return float64(maxZombieDur-zombieDur) / float64(maxZombieDur) - default: - return 0 + return 1, false + } + + // TODO: use store statistics update time to make a more accurate estimation + zombieDur := time.Since(op.GetReachTimeOf(status)) + if zombieDur >= maxZombieDur { + weight = 0 + } else { + weight = 1 + } + + needGC = weight == 0 + if status != operator.SUCCESS { + // CANCELED, REPLACED, TIMEOUT, EXPIRED, etc. + // The actual weight is 0, but there is still a delay in GC. + weight = 0 } + return } func (h *hotScheduler) clearPendingInfluence() { - h.pendings = map[*pendingInfluence]struct{}{} h.pendingSums = nil - h.regionPendings = make(map[uint64]*operator.Operator) + h.regionPendings = make(map[uint64]*pendingInfluence) } // rwType : the perspective of balance diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index 8a8ae0a0e54..12e23bca054 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -84,7 +84,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) { c.Assert(err, IsNil) hb := sche.(*hotScheduler) - notDoneOp := func(region *core.RegionInfo, ty opType) *operator.Operator { + notDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence { var op *operator.Operator var err error switch ty { @@ -95,40 +95,43 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) { } c.Assert(err, IsNil) c.Assert(op, NotNil) - return op + op.Start() + operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second)) + operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second)) + return newPendingInfluence(op, 2, 4, Influence{}) } - doneOp := func(region *core.RegionInfo, ty opType) *operator.Operator { - op := notDoneOp(region, ty) - op.Cancel() - return op + justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence { + infl := notDoneOpInfluence(region, ty) + infl.op.Cancel() + return infl } - shouldRemoveOp := func(region *core.RegionInfo, ty opType) *operator.Operator { - op := doneOp(region, ty) - operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second)) - return op + shouldRemoveOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence { + infl := justDoneOpInfluence(region, ty) + operator.SetOperatorStatusReachTime(infl.op, operator.CANCELED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second)) + return infl } - opCreaters := [3]func(region *core.RegionInfo, ty opType) *operator.Operator{shouldRemoveOp, notDoneOp, doneOp} + opInfluenceCreators := [3]func(region *core.RegionInfo, ty opType) *pendingInfluence{shouldRemoveOpInfluence, notDoneOpInfluence, justDoneOpInfluence} typs := []opType{movePeer, transferLeader} - for i := 0; i < len(opCreaters); i++ { + for i, creator := range opInfluenceCreators { for j, typ := range typs { - regionID := uint64(i*len(opCreaters) + j + 1) + regionID := uint64(i*len(typs) + j + 1) region := newTestRegion(regionID) - hb.regionPendings[regionID] = opCreaters[i](region, typ) + hb.regionPendings[regionID] = creator(region, typ) } } - hb.gcRegionPendings() + hb.summaryPendingInfluence() // Calling this function will GC. - for i := 0; i < len(opCreaters); i++ { + for i := range opInfluenceCreators { for j, typ := range typs { - regionID := uint64(i*len(opCreaters) + j + 1) - if i < 1 { // shouldRemoveOp + regionID := uint64(i*len(typs) + j + 1) + if i < 1 { // shouldRemoveOpInfluence c.Assert(hb.regionPendings, Not(HasKey), regionID) - } else { // notDoneOp, doneOp + } else { // notDoneOpInfluence, justDoneOpInfluence c.Assert(hb.regionPendings, HasKey, regionID) - kind := hb.regionPendings[regionID].Kind() + kind := hb.regionPendings[regionID].op.Kind() switch typ { case transferLeader: c.Assert(kind&operator.OpLeader != 0, IsTrue) diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 422e85aa537..6a34d315409 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -234,27 +234,6 @@ func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) } } -// summaryPendingInfluence calculate the summary pending Influence for each store and return storeID -> Influence -// It makes each dim rate or count become (1+w) times to the origin value while f is the function to provide w(weight) -func summaryPendingInfluence(pendings map[*pendingInfluence]struct{}, f func(*operator.Operator) float64) map[uint64]*Influence { - ret := make(map[uint64]*Influence) - for p := range pendings { - w := f(p.op) - if w == 0 { - delete(pendings, p) - } - if _, ok := ret[p.to]; !ok { - ret[p.to] = &Influence{Loads: make([]float64, len(p.origin.Loads))} - } - ret[p.to] = ret[p.to].add(&p.origin, w) - if _, ok := ret[p.from]; !ok { - ret[p.from] = &Influence{Loads: make([]float64, len(p.origin.Loads))} - } - ret[p.from] = ret[p.from].add(&p.origin, -w) - } - return ret -} - type storeLoad struct { Loads []float64 Count float64