diff --git a/server/schedule/operator/operator.go b/server/schedule/operator/operator.go
index 06f5b4b8f51..5c63e86650b 100644
--- a/server/schedule/operator/operator.go
+++ b/server/schedule/operator/operator.go
@@ -140,6 +140,18 @@ func (o *Operator) Status() OpStatus {
 	return o.status.Status()
 }
 
+// CheckAndGetStatus returns the operator status after checking `CheckExpired` and `CheckTimeout`.
+func (o *Operator) CheckAndGetStatus() OpStatus {
+	switch {
+	case o.CheckExpired():
+		return EXPIRED
+	case o.CheckTimeout():
+		return TIMEOUT
+	default:
+		return o.Status()
+	}
+}
+
 // GetReachTimeOf returns the time when operator reaches the given status.
 func (o *Operator) GetReachTimeOf(st OpStatus) time.Time {
 	return o.status.ReachTimeOf(st)
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index edd9f8244e9..3f5772c5ba0 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -84,17 +84,15 @@ type hotScheduler struct {
 	name string
 	*BaseScheduler
 	sync.RWMutex
-	leaderLimit uint64
-	peerLimit   uint64
-	types       []rwType
-	r           *rand.Rand
+	types []rwType
+	r     *rand.Rand
 
 	// states across multiple `Schedule` calls
 	pendings [resourceTypeLen]map[*pendingInfluence]struct{}
-	// regionPendings stores regionID -> [opType]Operator
-	// this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't
-	// be selected if its owner region is tracked in this attribute.
+	// regionPendings stores regionID -> Operator
+	// this records the regionID which has a pending Operator. During filterHotPeers, the hot peers won't
+	// be selected if its owner region is tracked in this attribute.
 	regionPendings map[uint64]*operator.Operator
 
 	// temporary states but exported to API or metrics
 	stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail
@@ -110,11 +108,9 @@ func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionS
 	ret := &hotScheduler{
 		name:           HotRegionName,
 		BaseScheduler:  base,
-		leaderLimit:    1,
-		peerLimit:      1,
 		types:          []rwType{write, read},
 		r:              rand.New(rand.NewSource(time.Now().UnixNano())),
-		regionPendings: make(map[uint64][2]*operator.Operator),
+		regionPendings: make(map[uint64]*operator.Operator),
 		conf:           conf,
 	}
 	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
@@ -174,7 +170,7 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
 	h.Lock()
 	defer h.Unlock()
 
-	h.prepareForBalance(cluster)
+	h.prepareForBalance(typ, cluster)
 
 	switch typ {
 	case read:
@@ -187,13 +183,15 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
 
-// prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
-// each store
-func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
+// prepareForBalance calculates the summary of pending Influence for each store and prepares the load detail for
+// each store.
+func (h *hotScheduler) prepareForBalance(typ rwType, cluster opt.Cluster) {
 	h.summaryPendingInfluence()
 	stores := cluster.GetStores()
 	storesLoads := cluster.GetStoresLoads()
 
-	{ // update read statistics
+	switch typ {
+	case read:
+		// update read statistics
 		regionRead := cluster.RegionReadStats()
 		h.stLoadInfos[readLeader] = summaryStoresLoad(
 			stores,
@@ -201,9 +199,8 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 			h.pendingSums[readLeader],
 			regionRead,
 			read, core.LeaderKind)
-	}
-
-	{ // update write statistics
+	case write:
+		// update write statistics
 		regionWrite := cluster.RegionWriteStats()
 		h.stLoadInfos[writeLeader] = summaryStoresLoad(
 			stores,
@@ -223,35 +220,30 @@
-// summaryPendingInfluence calculate the summary of pending Influence for each store
-// and clean the region from regionInfluence if they have ended operator.
+// summaryPendingInfluence calculates the summary of pending Influence for each store
+// and cleans regions whose operators have ended out of regionPendings.
+// It makes each dim's rate or count become `weight` times the original value.
 func (h *hotScheduler) summaryPendingInfluence() {
 	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
-		h.pendingSums[ty] = summaryPendingInfluence(h.pendings[ty], h.calcPendingWeight)
-	}
-	h.gcRegionPendings()
-}
-
-// gcRegionPendings check the region whether it need to be deleted from regionPendings depended on whether it have
-// ended operator
-func (h *hotScheduler) gcRegionPendings() {
-	for regionID, pendings := range h.regionPendings {
-		empty := true
-		for ty, op := range pendings {
-			if op != nil && op.IsEnd() {
-				if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) {
-					log.Debug("gc pending influence in hot region scheduler", zap.Uint64("region-id", regionID), zap.Time("create", op.GetCreateTime()), zap.Time("now", time.Now()), zap.Duration("zombie", h.conf.GetMaxZombieDuration()))
-					schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
-					pendings[ty] = nil
-				}
-			}
-			if pendings[ty] != nil {
-				empty = false
+		ret := make(map[uint64]Influence)
+		pendings := h.pendings[ty]
+		for p := range pendings {
+			maxZombieDur := p.maxZombieDuration
+			weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)
+			if needGC {
+				id := p.op.RegionID()
+				delete(h.regionPendings, id)
+				delete(pendings, p)
+				schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
+				log.Debug("gc pending influence in hot region scheduler",
+					zap.Uint64("region-id", id),
+					zap.Time("create", p.op.GetCreateTime()),
+					zap.Time("now", time.Now()),
+					zap.Duration("zombie", maxZombieDur))
+				continue
 			}
+			ret[p.to] = ret[p.to].add(&p.origin, weight)
+			ret[p.from] = ret[p.from].add(&p.origin, -weight)
 		}
-		if empty {
-			delete(h.regionPendings, regionID)
-		} else {
-			h.regionPendings[regionID] = pendings
-		}
+		h.pendingSums[ty] = ret
 	}
 }
@@ -267,9 +259,10 @@ func summaryStoresLoad(
 ) map[uint64]*storeLoadDetail {
 	// loadDetail stores the storeID -> hotPeers stat and its current and future stat(key/byte rate,count)
 	loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads))
-	allByteSum := 0.0
-	allKeySum := 0.0
-	allCount := 0.0
+	allTiKVByteSum := 0.0
+	allTiKVKeySum := 0.0
+	allTiKVCount := 0
+	allTiKVHotPeersCount := 0
 
 	for _, store := range stores {
 		id := store.GetID()
@@ -280,6 +273,7 @@ func summaryStoresLoad(
 		if kind == core.LeaderKind && !store.AllowLeaderTransfer() {
 			continue
 		}
+		isTiFlash := core.IsTiFlashStore(store.GetMeta())
 
 		var byteRate, keyRate float64
 		switch rwTy {
@@ -317,9 +311,13 @@ func summaryStoresLoad(
 				hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keySum)
 			}
 		}
-		allByteSum += byteRate
-		allKeySum += keyRate
-		allCount += float64(len(hotPeers))
+
+		if !isTiFlash {
+			allTiKVByteSum += byteRate
+			allTiKVKeySum += keyRate
+			allTiKVCount++
+			allTiKVHotPeersCount += len(hotPeers)
+		}
 
 		// Build store load prediction from current load and pending influence.
 		stLoadPred := (&storeLoad{
@@ -336,27 +334,27 @@ func summaryStoresLoad(
 		}
 	}
 
-	storeLen := float64(len(storesLoads))
+	expect := storeLoad{
+		ByteRate: allTiKVByteSum / float64(allTiKVCount),
+		KeyRate:  allTiKVKeySum / float64(allTiKVCount),
+		Count:    float64(allTiKVHotPeersCount) / float64(allTiKVCount),
+	}
+
+	// store the expected byte/key rate and count in each store-load detail.
 	for id, detail := range loadDetail {
-		byteExp := allByteSum / storeLen
-		keyExp := allKeySum / storeLen
-		countExp := allCount / storeLen
-		detail.LoadPred.Expect.ByteRate = byteExp
-		detail.LoadPred.Expect.KeyRate = keyExp
-		detail.LoadPred.Expect.Count = countExp
+		detail.LoadPred.Expect = expect
 		// Debug
 		{
 			ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String()
-			hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(byteExp)
+			hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expect.ByteRate)
 		}
 		{
 			ty := "exp-key-rate-" + rwTy.String() + "-" + kind.String()
-			hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keyExp)
+			hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expect.KeyRate)
 		}
 		{
 			ty := "exp-count-rate-" + rwTy.String() + "-" + kind.String()
-			hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(countExp)
+			hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expect.Count)
 		}
 	}
 	return loadDetail
@@ -377,7 +375,7 @@ func filterHotPeers(
 	return ret
 }
 
-func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType) bool {
+func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType, maxZombieDur time.Duration) bool {
 	regionID := op.RegionID()
 	_, ok := h.regionPendings[regionID]
 	if ok {
@@ -385,16 +383,10 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS
 		return false
 	}
 
-	influence := newPendingInfluence(op, srcStore, dstStore, infl)
+	influence := newPendingInfluence(op, srcStore, dstStore, infl, maxZombieDur)
 	rcTy := toResourceType(rwTy, opTy)
 	h.pendings[rcTy][influence] = struct{}{}
-
-	h.regionPendings[regionID] = [2]*operator.Operator{nil, nil}
-	{ // h.pendingOpInfos[regionID][ty] = influence
-		tmp := h.regionPendings[regionID]
-		tmp[opTy] = op
-		h.regionPendings[regionID] = tmp
-	}
+	h.regionPendings[regionID] = op
 
 	schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
 	return true
@@ -535,9 +527,9 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 	}
 	bs.cur = &solution{}
 	var (
-		best  *solution
-		ops   []*operator.Operator
-		infls []Influence
+		best *solution
+		op   *operator.Operator
+		infl Influence
 	)
 
 	for srcStoreID := range bs.filterSrcStores() {
@@ -553,9 +545,9 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 				bs.cur.dstStoreID = dstStoreID
 				bs.calcProgressiveRank()
 				if bs.cur.progressiveRank < 0 && bs.betterThan(best) {
-					if newOps, newInfls := bs.buildOperators(); len(newOps) > 0 {
-						ops = newOps
-						infls = newInfls
+					if newOp, newInfl := bs.buildOperator(); newOp != nil {
+						op = newOp
+						infl = newInfl
 						clone := *bs.cur
 						best = &clone
 					}
@@ -564,13 +556,25 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 		}
 	}
 
-	for i := 0; i < len(ops); i++ {
-		// TODO: multiple operators need to be atomic.
-		if !bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i], bs.rwTy, bs.opTy) {
-			return nil
-		}
+	if best == nil {
+		return nil
+	}
+
+	// Depending on the source of the statistics used, a different zombie duration is used.
+	// If the statistics are from the sum of Regions, a longer zombie duration is used.
+	var maxZombieDur time.Duration
+	switch {
+	case bs.rwTy == write && bs.opTy == transferLeader:
+		maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
+	default:
+		maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
+	}
+
+	if !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl, bs.rwTy, bs.opTy, maxZombieDur) {
+		return nil
 	}
-	return ops
+
+	return []*operator.Operator{op}
 }
 
 // filterSrcStores compare the min rate and the ratio * expectation rate, if both key and byte rate is greater than
@@ -659,12 +663,12 @@ func (bs *balanceSolver) isRegionAvailable(region *core.RegionInfo) bool {
 		return false
 	}
 
-	if pendings, ok := bs.sche.regionPendings[region.GetID()]; ok {
+	if op, ok := bs.sche.regionPendings[region.GetID()]; ok {
 		if bs.opTy == transferLeader {
 			return false
 		}
-		if pendings[movePeer] != nil ||
-			(pendings[transferLeader] != nil && !pendings[transferLeader].IsEnd()) {
+		if op.Kind()&operator.OpRegion != 0 ||
+			(op.Kind()&operator.OpLeader != 0 && !op.IsEnd()) {
 			return false
 		}
 	}
@@ -775,7 +779,7 @@ func (bs *balanceSolver) calcProgressiveRank() {
 	rank := int64(0)
 	if bs.rwTy == write && bs.opTy == transferLeader {
 		// In this condition, CPU usage is the matter.
-		// Only consider about key rate.
+		// Only consider key rate.
 		if srcLd.KeyRate-peer.GetKeyRate() >= dstLd.KeyRate+peer.GetKeyRate() {
 			rank = -1
 		}
@@ -962,12 +966,11 @@ func (bs *balanceSolver) isReadyToBuild() bool {
 	return true
 }
 
-func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
+func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl Influence) {
 	if !bs.isReadyToBuild() {
-		return nil, nil
+		return nil, Influence{}
 	}
 	var (
-		op       *operator.Operator
 		counters []prometheus.Counter
 		err      error
 	)
@@ -990,7 +993,7 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
 			hotDirectionCounter.WithLabelValues("move-peer", bs.rwTy.String(), strconv.FormatUint(dstPeer.GetStoreId(), 10), "in"))
 	case transferLeader:
 		if bs.cur.region.GetStoreVoter(bs.cur.dstStoreID) == nil {
-			return nil, nil
+			return nil, Influence{}
 		}
 		desc := "transfer-hot-" + bs.rwTy.String() + "-leader"
 		op, err = operator.CreateTransferLeaderOperator(
@@ -1008,7 +1011,7 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
 	if err != nil {
 		log.Debug("fail to create operator", zap.Stringer("rw-type", bs.rwTy), zap.Stringer("op-type", bs.opTy), errs.ZapError(err))
 		schedulerCounter.WithLabelValues(bs.sche.GetName(), "create-operator-fail").Inc()
-		return nil, nil
+		return nil, Influence{}
 	}
 
 	op.SetPriorityLevel(core.HighPriority)
@@ -1017,13 +1020,13 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
 		schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"),
 		schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String()))
 
-	infl := Influence{
+	infl = Influence{
 		ByteRate: bs.cur.srcPeerStat.GetByteRate(),
 		KeyRate:  bs.cur.srcPeerStat.GetKeyRate(),
 		Count:    1,
 	}
 
-	return []*operator.Operator{op}, []Influence{infl}
+	return op, infl
 }
 
 func (h *hotScheduler) GetHotReadStatus() *statistics.StoreHotPeersInfos {
@@ -1074,27 +1077,28 @@ func (h *hotScheduler) copyPendingInfluence(ty resourceType) map[uint64]Influenc
 	return ret
 }
 
-// calcPendingWeight return the calculate weight of one Operator, the value will between [0,1]
-func (h *hotScheduler) calcPendingWeight(op *operator.Operator) float64 {
-	if op.CheckExpired() || op.CheckTimeout() {
-		return 0
-	}
-	status := op.Status()
+// calcPendingInfluence returns the calculated weight of one Operator; the value is within [0, 1].
+func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur time.Duration) (weight float64, needGC bool) {
+	status := op.CheckAndGetStatus()
 	if !operator.IsEndStatus(status) {
-		return 1
-	}
-	switch status {
-	case operator.SUCCESS:
-		zombieDur := time.Since(op.GetReachTimeOf(status))
-		maxZombieDur := h.conf.GetMaxZombieDuration()
-		if zombieDur >= maxZombieDur {
-			return 0
-		}
-		// TODO: use store statistics update time to make a more accurate estimation
-		return float64(maxZombieDur-zombieDur) / float64(maxZombieDur)
-	default:
-		return 0
+		return 1, false
+	}
+
+	// TODO: use store statistics update time to make a more accurate estimation
+	zombieDur := time.Since(op.GetReachTimeOf(status))
+	if zombieDur >= maxZombieDur {
+		weight = 0
+	} else {
+		weight = 1
+	}
+
+	needGC = weight == 0
+	if status != operator.SUCCESS {
+		// CANCELED, REPLACED, TIMEOUT, EXPIRED, etc.
+		// The actual weight is 0, but there is still a delay in GC.
+		weight = 0
 	}
+	return
 }
 
 func (h *hotScheduler) clearPendingInfluence() {
@@ -1102,7 +1106,7 @@ func (h *hotScheduler) clearPendingInfluence() {
 		h.pendings[ty] = map[*pendingInfluence]struct{}{}
 		h.pendingSums[ty] = nil
 	}
-	h.regionPendings = make(map[uint64][2]*operator.Operator)
+	h.regionPendings = make(map[uint64]*operator.Operator)
 }
 
 // rwType : the perspective of balance
diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go
index 42f4b14b57d..595f90333aa 100644
--- a/server/schedulers/hot_region_config.go
+++ b/server/schedulers/hot_region_config.go
@@ -36,12 +36,12 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
 		MinHotByteRate:        100,
 		MinHotKeyRate:         10,
 		MaxZombieRounds:       3,
+		MaxPeerNum:            1000,
 		ByteRateRankStepRatio: 0.05,
 		KeyRateRankStepRatio:  0.05,
 		CountRankStepRatio:    0.01,
 		GreatDecRatio:         0.95,
 		MinorDecRatio:         0.99,
-		MaxPeerNum:            1000,
 		SrcToleranceRatio:     1.05, // Tolerate 5% difference
 		DstToleranceRatio:     1.05, // Tolerate 5% difference
 	}
@@ -73,12 +73,18 @@ func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
 	return schedule.EncodeConfig(conf)
 }
 
-func (conf *hotRegionSchedulerConfig) GetMaxZombieDuration() time.Duration {
+func (conf *hotRegionSchedulerConfig) GetStoreStatZombieDuration() time.Duration {
 	conf.RLock()
 	defer conf.RUnlock()
 	return time.Duration(conf.MaxZombieRounds) * statistics.StoreHeartBeatReportInterval * time.Second
 }
 
+func (conf *hotRegionSchedulerConfig) GetRegionsStatZombieDuration() time.Duration {
+	conf.RLock()
+	defer conf.RUnlock()
+	return time.Duration(conf.MaxZombieRounds) * statistics.RegionHeartBeatReportInterval * time.Second
+}
+
 func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int {
 	conf.RLock()
 	defer conf.RUnlock()
@@ -214,5 +220,4 @@ func (conf *hotRegionSchedulerConfig) persist() error {
 	}
 	return conf.storage.SaveScheduleConfig(HotRegionName, data)
-
 }
diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go
index 4214c9d177d..d997b6fcb22 100644
--- a/server/schedulers/hot_test.go
+++ b/server/schedulers/hot_test.go
@@ -56,9 +56,6 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
 	c.Assert(err, IsNil)
 	hb := sche.(*hotScheduler)
 
-	nilOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
-		return nil
-	}
 	notDoneOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
 		var op *operator.Operator
 		var err error
@@ -70,6 +67,9 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
 		}
 		c.Assert(err, IsNil)
 		c.Assert(op, NotNil)
+		op.Start()
+		operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
+		operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
 		return op
 	}
 	doneOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
@@ -79,41 +79,42 @@
 	}
 	shouldRemoveOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
 		op := doneOp(region, ty)
-		operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second))
+		operator.SetOperatorStatusReachTime(op, operator.CANCELED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second))
 		return op
 	}
-	opCreaters := [4]func(region *core.RegionInfo, ty opType) *operator.Operator{nilOp, shouldRemoveOp, notDoneOp, doneOp}
+	opCreators := [3]func(region *core.RegionInfo, ty opType) *operator.Operator{shouldRemoveOp, notDoneOp, doneOp}
+
+	typs := []opType{movePeer, transferLeader}
 
-	for i := 0; i < len(opCreaters); i++ {
-		for j := 0; j < len(opCreaters); j++ {
-			regionID := uint64(i*len(opCreaters) + j + 1)
+	for i, creator := range opCreators {
+		for j, typ := range typs {
+			regionID := uint64(i*len(typs) + j + 1)
 			region := newTestRegion(regionID)
-			hb.regionPendings[regionID] = [2]*operator.Operator{
-				movePeer:       opCreaters[i](region, movePeer),
-				transferLeader: opCreaters[j](region, transferLeader),
-			}
+			op := creator(region, typ)
+			influence := newPendingInfluence(op, 2, 4, Influence{}, hb.conf.GetStoreStatZombieDuration())
+			hb.pendings[writePeer][influence] = struct{}{}
+			hb.regionPendings[regionID] = op
 		}
 	}
 
-	hb.gcRegionPendings()
+	hb.summaryPendingInfluence() // Calling this function will GC.
 
-	for i := 0; i < len(opCreaters); i++ {
-		for j := 0; j < len(opCreaters); j++ {
-			regionID := uint64(i*len(opCreaters) + j + 1)
-			if i < 2 && j < 2 {
+	for i := range opCreators {
+		for j, typ := range typs {
+			regionID := uint64(i*len(typs) + j + 1)
+			if i < 1 { // shouldRemoveOp
 				c.Assert(hb.regionPendings, Not(HasKey), regionID)
-			} else if i < 2 {
-				c.Assert(hb.regionPendings, HasKey, regionID)
-				c.Assert(hb.regionPendings[regionID][movePeer], IsNil)
-				c.Assert(hb.regionPendings[regionID][transferLeader], NotNil)
-			} else if j < 2 {
+			} else { // notDoneOp, doneOp
 				c.Assert(hb.regionPendings, HasKey, regionID)
-				c.Assert(hb.regionPendings[regionID][movePeer], NotNil)
-				c.Assert(hb.regionPendings[regionID][transferLeader], IsNil)
-			} else {
-				c.Assert(hb.regionPendings, HasKey, regionID)
-				c.Assert(hb.regionPendings[regionID][movePeer], NotNil)
-				c.Assert(hb.regionPendings[regionID][transferLeader], NotNil)
+				kind := hb.regionPendings[regionID].Kind()
+				switch typ {
+				case transferLeader:
+					c.Assert(kind&operator.OpLeader != 0, IsTrue)
+					c.Assert(kind&operator.OpRegion == 0, IsTrue)
+				case movePeer:
+					c.Assert(kind&operator.OpLeader == 0, IsTrue)
+					c.Assert(kind&operator.OpRegion != 0, IsTrue)
+				}
 			}
 		}
 	}
@@ -1223,7 +1224,7 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h
 	}
 
 	// try schedule
-	hb.prepareForBalance(tc)
+	hb.prepareForBalance(kind, tc)
 	leaderSolver := newBalanceSolver(hb, tc, kind, transferLeader)
 	leaderSolver.cur = &solution{srcStoreID: 2}
 	c.Check(leaderSolver.filterHotPeers(), HasLen, 0) // skip schedule
diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go
index ef20f6179d0..c2d6409df75 100644
--- a/server/schedulers/utils.go
+++ b/server/schedulers/utils.go
@@ -17,6 +17,7 @@ import (
 	"math"
 	"net/url"
 	"strconv"
+	"time"
 
 	"github.com/montanaflynn/stats"
 	"github.com/pingcap/log"
@@ -163,35 +164,22 @@ func (infl Influence) add(rhs *Influence, w float64) Influence {
 
 // TODO: merge it into OperatorInfluence.
 type pendingInfluence struct {
-	op       *operator.Operator
-	from, to uint64
-	origin   Influence
+	op                *operator.Operator
+	from, to          uint64
+	origin            Influence
+	maxZombieDuration time.Duration
 }
 
-func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) *pendingInfluence {
+func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence, maxZombieDur time.Duration) *pendingInfluence {
 	return &pendingInfluence{
-		op:     op,
-		from:   from,
-		to:     to,
-		origin: infl,
+		op:                op,
+		from:              from,
+		to:                to,
+		origin:            infl,
+		maxZombieDuration: maxZombieDur,
 	}
 }
 
-// summaryPendingInfluence calculate the summary pending Influence for each store and return storeID -> Influence
-// It makes each key/byte rate or count become (1+w) times to the origin value while f is the function to provide w(weight)
-func summaryPendingInfluence(pendings map[*pendingInfluence]struct{}, f func(*operator.Operator) float64) map[uint64]Influence {
-	ret := map[uint64]Influence{}
-	for p := range pendings {
-		w := f(p.op)
-		if w == 0 {
-			delete(pendings, p)
-		}
-		ret[p.to] = ret[p.to].add(&p.origin, w)
-		ret[p.from] = ret[p.from].add(&p.origin, -w)
-	}
-	return ret
-}
-
 type storeLoad struct {
 	ByteRate float64
 	KeyRate  float64