diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index f4da70b28de..06f4a59cd3b 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -85,10 +85,8 @@ type hotScheduler struct {
 	name string
 	*BaseScheduler
 	sync.RWMutex
-	leaderLimit uint64
-	peerLimit   uint64
-	types       []rwType
-	r           *rand.Rand
+	types []rwType
+	r     *rand.Rand
 
 	// regionPendings stores regionID -> pendingInfluence
 	// this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't
@@ -108,8 +106,6 @@ func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionS
 	ret := &hotScheduler{
 		name:           HotRegionName,
 		BaseScheduler:  base,
-		leaderLimit:    1,
-		peerLimit:      1,
 		types:          []rwType{write, read},
 		r:              rand.New(rand.NewSource(time.Now().UnixNano())),
 		regionPendings: make(map[uint64]*pendingInfluence),
@@ -171,7 +167,7 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
 	h.Lock()
 	defer h.Unlock()
 
-	h.prepareForBalance(cluster)
+	h.prepareForBalance(typ, cluster)
 
 	switch typ {
 	case read:
@@ -184,13 +180,15 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
 
 // prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
 // each store
-func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
+func (h *hotScheduler) prepareForBalance(typ rwType, cluster opt.Cluster) {
 	h.summaryPendingInfluence()
 
 	stores := cluster.GetStores()
 	storesLoads := cluster.GetStoresLoads()
 
-	{ // update read statistics
+	switch typ {
+	case read:
+		// update read statistics
 		regionRead := cluster.RegionReadStats()
 		h.stLoadInfos[readLeader] = summaryStoresLoad(
 			stores,
@@ -204,9 +202,8 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 			h.pendingSums,
 			regionRead,
 			read, core.RegionKind)
-	}
-
-	{ // update write statistics
+	case write:
+		// update write statistics
 		regionWrite := cluster.RegionWriteStats()
 		h.stLoadInfos[writeLeader] = summaryStoresLoad(
 			stores,
@@ -228,9 +225,9 @@
 // and clean the region from regionInfluence if they have ended operator.
 // It makes each key/byte rate or count become `weight` times to the origin value.
 func (h *hotScheduler) summaryPendingInfluence() {
-	maxZombieDur := h.conf.GetMaxZombieDuration()
 	ret := make(map[uint64]*Influence)
 	for id, p := range h.regionPendings {
+		maxZombieDur := p.maxZombieDuration
 		weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)
 		if needGC {
 			delete(h.regionPendings, id)
@@ -266,8 +263,9 @@ func summaryStoresLoad(
 ) map[uint64]*storeLoadDetail {
 	// loadDetail stores the storeID -> hotPeers stat and its current and future stat(key/byte rate,count)
 	loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads))
-	allLoadSum := make([]float64, statistics.DimLen)
-	allCount := 0.0
+	allTiKVLoadSum := make([]float64, statistics.DimLen)
+	allTiKVCount := 0
+	allTiKVHotPeersCount := 0
 
 	// Stores without byte rate statistics is not available to schedule.
 	for _, store := range stores {
@@ -279,6 +277,8 @@
 		if kind == core.LeaderKind && !store.AllowLeaderTransfer() {
 			continue
 		}
+		isTiFlash := core.IsTiFlashStore(store.GetMeta())
+
 		loads := make([]float64, statistics.DimLen)
 		switch rwTy {
 		case read:
@@ -317,10 +317,14 @@
 				hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.KeyDim])
 			}
 		}
-		for i := range allLoadSum {
-			allLoadSum[i] += loads[i]
+
+		if !isTiFlash {
+			for i := range allTiKVLoadSum {
+				allTiKVLoadSum[i] += loads[i]
+			}
+			allTiKVCount += 1
+			allTiKVHotPeersCount += len(hotPeers)
 		}
-		allCount += float64(len(hotPeers))
 
 		// Build store load prediction from current load and pending influence.
 		stLoadPred := (&storeLoad{
@@ -335,16 +339,19 @@
 			HotPeers: hotPeers,
 		}
 	}
-	storeLen := float64(len(storesLoads))
+
+	expectLoads := make([]float64, len(allTiKVLoadSum))
+	for i := range expectLoads {
+		expectLoads[i] = allTiKVLoadSum[i] / float64(allTiKVCount)
+	}
+	expect := storeLoad{
+		Loads: expectLoads,
+		Count: float64(allTiKVHotPeersCount) / float64(allTiKVCount),
+	}
+
 	// store expectation byte/key rate and count for each store-load detail.
 	for id, detail := range loadDetail {
-		expectLoads := make([]float64, len(allLoadSum))
-		for i := range expectLoads {
-			expectLoads[i] = allLoadSum[i] / storeLen
-		}
-		expectCount := allCount / storeLen
-		detail.LoadPred.Expect.Loads = expectLoads
-		detail.LoadPred.Expect.Count = expectCount
+		detail.LoadPred.Expect = expect
 		// Debug
 		{
 			ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String()
@@ -356,7 +363,7 @@
 		}
 		{
 			ty := "exp-count-rate-" + rwTy.String() + "-" + kind.String()
-			hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectCount)
+			hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expect.Count)
 		}
 	}
 	return loadDetail
@@ -377,7 +384,7 @@ func filterHotPeers(
 	return ret
 }
 
-func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence) bool {
+func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, maxZombieDur time.Duration) bool {
 	regionID := op.RegionID()
 	_, ok := h.regionPendings[regionID]
 	if ok {
@@ -385,7 +392,7 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS
 		return false
 	}
 
-	influence := newPendingInfluence(op, srcStore, dstStore, infl)
+	influence := newPendingInfluence(op, srcStore, dstStore, infl, maxZombieDur)
 	h.regionPendings[regionID] = influence
 
 	schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
@@ -535,9 +542,9 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 	}
 	bs.cur = &solution{}
 	var (
-		best  *solution
-		ops   []*operator.Operator
-		infls []Influence
+		best *solution
+		op   *operator.Operator
+		infl Influence
 	)
 
 	for srcStoreID := range bs.filterSrcStores() {
@@ -553,9 +560,9 @@
 				bs.cur.dstStoreID = dstStoreID
 				bs.calcProgressiveRank()
 				if bs.cur.progressiveRank < 0 && bs.betterThan(best) {
-					if newOps, newInfls := bs.buildOperators(); len(newOps) > 0 {
-						ops = newOps
-						infls = newInfls
+					if newOp, newInfl := bs.buildOperator(); newOp != nil {
+						op = newOp
+						infl = *newInfl
 						clone := *bs.cur
 						best = &clone
 					}
@@ -564,13 +571,25 @@
 		}
 	}
 
-	for i := 0; i < len(ops); i++ {
-		// TODO: multiple operators need to be atomic.
-		if !bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i]) {
-			return nil
-		}
+	if best == nil {
+		return nil
+	}
+
+	// Depending on the source of the statistics used, a different ZombieDuration will be used.
+	// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
+	var maxZombieDur time.Duration
+	switch {
+	case bs.rwTy == write && bs.opTy == transferLeader:
+		maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
+	default:
+		maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
+	}
+
+	if !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl, maxZombieDur) {
+		return nil
 	}
-	return ops
+
+	return []*operator.Operator{op}
 }
 
 // filterSrcStores compare the min rate and the ratio * expectation rate, if both key and byte rate is greater than
@@ -787,7 +806,7 @@ func (bs *balanceSolver) calcProgressiveRank() {
 	rank := int64(0)
 	if bs.rwTy == write && bs.opTy == transferLeader {
 		// In this condition, CPU usage is the matter.
-		// Only consider about key rate.
+		// Only consider key rate.
 		srcKeyRate := srcLd.Loads[statistics.KeyDim]
 		dstKeyRate := dstLd.Loads[statistics.KeyDim]
 		peerKeyRate := peer.GetLoad(getRegionStatKind(bs.rwTy, statistics.KeyDim))
@@ -998,12 +1017,11 @@ func (bs *balanceSolver) isReadyToBuild() bool {
 	return true
 }
 
-func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
+func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence) {
 	if !bs.isReadyToBuild() {
 		return nil, nil
 	}
 	var (
-		op       *operator.Operator
 		counters []prometheus.Counter
 		err      error
 	)
@@ -1064,11 +1082,11 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
 		schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"),
 		schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String()))
 
-	infl := Influence{
+	infl = &Influence{
 		Loads: append(bs.cur.srcPeerStat.Loads[:0:0], bs.cur.srcPeerStat.Loads...),
 		Count: 1,
 	}
-	return []*operator.Operator{op}, []Influence{infl}
+	return op, infl
 }
 
 func (h *hotScheduler) GetHotStatus(typ string) *statistics.StoreHotPeersInfos {
diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go
index afaa261cce2..75aa67dc0d9 100644
--- a/server/schedulers/hot_region_config.go
+++ b/server/schedulers/hot_region_config.go
@@ -36,12 +36,12 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
 		MinHotByteRate:        100,
 		MinHotKeyRate:         10,
 		MaxZombieRounds:       3,
+		MaxPeerNum:            1000,
 		ByteRateRankStepRatio: 0.05,
 		KeyRateRankStepRatio:  0.05,
 		CountRankStepRatio:    0.01,
 		GreatDecRatio:         0.95,
 		MinorDecRatio:         0.99,
-		MaxPeerNum:            1000,
 		SrcToleranceRatio:     1.05, // Tolerate 5% difference
 		DstToleranceRatio:     1.05, // Tolerate 5% difference
 	}
@@ -73,12 +73,18 @@ func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
 	return schedule.EncodeConfig(conf)
 }
 
-func (conf *hotRegionSchedulerConfig) GetMaxZombieDuration() time.Duration {
+func (conf *hotRegionSchedulerConfig) GetStoreStatZombieDuration() time.Duration {
 	conf.RLock()
 	defer conf.RUnlock()
 	return time.Duration(conf.MaxZombieRounds) * statistics.StoreHeartBeatReportInterval * time.Second
 }
 
+func (conf *hotRegionSchedulerConfig) GetRegionsStatZombieDuration() time.Duration {
+	conf.RLock()
+	defer conf.RUnlock()
+	return time.Duration(conf.MaxZombieRounds) * statistics.RegionHeartBeatReportInterval * time.Second
+}
+
 func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int {
 	conf.RLock()
 	defer conf.RUnlock()
@@ -214,5 +220,4 @@ func (conf *hotRegionSchedulerConfig) persist() error {
 	}
 	return conf.storage.SaveScheduleConfig(HotRegionName, data)
-
 }
diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go
index e859d665f13..7c79ebed98e 100644
--- a/server/schedulers/hot_test.go
+++ b/server/schedulers/hot_test.go
@@ -76,7 +76,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
 		op.Start()
 		operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
 		operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
-		return newPendingInfluence(op, 2, 4, Influence{})
+		return newPendingInfluence(op, 2, 4, Influence{}, hb.conf.GetStoreStatZombieDuration())
 	}
 	justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
 		infl := notDoneOpInfluence(region, ty)
@@ -1306,7 +1306,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) {
 
 		if testcase.DegreeAfterTransferLeader >= 3 {
 			// try schedule
-			hb.prepareForBalance(tc)
+			hb.prepareForBalance(testcase.kind, tc)
 			leaderSolver := newBalanceSolver(hb, tc, testcase.kind, transferLeader)
 			leaderSolver.cur = &solution{srcStoreID: 2}
 			c.Check(leaderSolver.filterHotPeers(), HasLen, 0) // skip schedule
diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go
index 7e1af60a815..9b53be8b9a6 100644
--- a/server/schedulers/utils.go
+++ b/server/schedulers/utils.go
@@ -17,6 +17,7 @@ import (
 	"math"
 	"net/url"
 	"strconv"
+	"time"
 
 	"github.com/montanaflynn/stats"
 	"github.com/pingcap/log"
@@ -215,17 +216,19 @@ func (lhs *Influence) add(rhs *Influence, w float64) *Influence {
 
 // TODO: merge it into OperatorInfluence.
 type pendingInfluence struct {
-	op       *operator.Operator
-	from, to uint64
-	origin   Influence
+	op                *operator.Operator
+	from, to          uint64
+	origin            Influence
+	maxZombieDuration time.Duration
 }
 
-func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) *pendingInfluence {
+func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence, maxZombieDur time.Duration) *pendingInfluence {
 	return &pendingInfluence{
-		op:     op,
-		from:   from,
-		to:     to,
-		origin: infl,
+		op:                op,
+		from:              from,
+		to:                to,
+		origin:            infl,
+		maxZombieDuration: maxZombieDur,
 	}
 }
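Note on the zombie-duration change in this patch: each pendingInfluence now carries the maxZombieDuration chosen when its operator is built, rather than all influences being aged out with the single GetMaxZombieDuration() value; write-leader transfers, which are driven by region statistics, get the longer region-heartbeat-based window. The Go sketch below is a minimal, standalone illustration of that rule only; it is not pd code, and names such as zombieDurationFor, needGC, and the 10s/60s interval constants are simplified stand-ins for the real statistics constants and scheduler methods.

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the pd statistics constants and config.
const (
	storeHeartBeatReportInterval  = 10 * time.Second // store stats cadence (assumed)
	regionHeartBeatReportInterval = 60 * time.Second // region stats cadence (assumed)
	maxZombieRounds               = 3
)

// pendingInfluence keeps only the fields needed for the GC decision.
type pendingInfluence struct {
	finishedAt        time.Time     // stand-in for the operator's end time
	maxZombieDuration time.Duration // picked per operator, as in the patch
}

// zombieDurationFor mirrors the switch added in balanceSolver.solve():
// scheduling that relies on region statistics keeps its pending influence
// alive for rounds of the slower region heartbeat; everything else uses
// the store heartbeat interval.
func zombieDurationFor(isWriteTransferLeader bool) time.Duration {
	if isWriteTransferLeader {
		return maxZombieRounds * regionHeartBeatReportInterval
	}
	return maxZombieRounds * storeHeartBeatReportInterval
}

// needGC mirrors the per-influence check in summaryPendingInfluence():
// an influence whose operator finished longer ago than its own zombie
// duration is dropped instead of still weighing on the store load.
func needGC(p pendingInfluence, now time.Time) bool {
	return now.Sub(p.finishedAt) > p.maxZombieDuration
}

func main() {
	now := time.Now()
	peerMove := pendingInfluence{
		finishedAt:        now.Add(-45 * time.Second),
		maxZombieDuration: zombieDurationFor(false),
	}
	leaderXfer := pendingInfluence{
		finishedAt:        now.Add(-45 * time.Second),
		maxZombieDuration: zombieDurationFor(true),
	}
	fmt.Println(needGC(peerMove, now))   // true: 45s > 3*10s
	fmt.Println(needGC(leaderXfer, now)) // false: 45s <= 3*60s
}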