From a314b29e7a6d1393eab62d0b8614db702b628929 Mon Sep 17 00:00:00 2001 From: nolouch Date: Sat, 28 Mar 2020 14:09:02 +0800 Subject: [PATCH 1/3] schedulers: add load expectations for hot scheduler Signed-off-by: nolouch --- pkg/mock/mockcluster/mockcluster.go | 20 ++++++++--- server/schedulers/hot_region.go | 49 +++++++++++++++++++++++--- server/schedulers/hot_region_config.go | 14 ++++++++ server/schedulers/hot_test.go | 36 ++++++++++--------- server/schedulers/metrics.go | 9 +++++ server/schedulers/utils.go | 4 +++ 6 files changed, 108 insertions(+), 24 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 08739da942a..8284d1df3c1 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -394,10 +394,13 @@ func (mc *Cluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio } // UpdateStorageWrittenBytes updates store written bytes. -func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64) { +func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64, updateSingleOption ...struct{}) { store := mc.GetStore(storeID) newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) newStats.BytesWritten = bytesWritten + if len(updateSingleOption) == 0 { + newStats.KeysWritten = bytesWritten / 100 + } now := time.Now().Second() interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} newStats.Interval = interval @@ -407,10 +410,13 @@ func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64 } // UpdateStorageReadBytes updates store read bytes. 
-func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) { +func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64, updateSingleOption ...struct{}) { store := mc.GetStore(storeID) newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) newStats.BytesRead = bytesRead + if len(updateSingleOption) == 0 { + newStats.KeysRead = bytesRead / 100 + } now := time.Now().Second() interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} newStats.Interval = interval @@ -420,10 +426,13 @@ func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) { } // UpdateStorageWrittenKeys updates store written keys. -func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64) { +func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64, updateSingleOption ...struct{}) { store := mc.GetStore(storeID) newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) newStats.KeysWritten = keysWritten + if len(updateSingleOption) == 0 { + newStats.BytesWritten = keysWritten * 100 + } now := time.Now().Second() interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} newStats.Interval = interval @@ -433,10 +442,13 @@ func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64) } // UpdateStorageReadKeys updates store read bytes. 
-func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64) { +func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64, updateSingleOption ...struct{}) { store := mc.GetStore(storeID) newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) newStats.KeysRead = keysRead + if len(updateSingleOption) == 0 { + newStats.BytesRead = keysRead * 100 + } now := time.Now().Second() interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} newStats.Interval = interval diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 68aa949174e..886927993e0 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -19,6 +19,7 @@ import ( "math/rand" "net/http" "sort" + "strconv" "sync" "time" @@ -254,6 +255,9 @@ func summaryStoresLoad( kind core.ResourceKind, ) map[uint64]*storeLoadDetail { loadDetail := make(map[uint64]*storeLoadDetail, len(storeByteRate)) + allByteSum := 0.0 + allKeySum := 0.0 + allCount := 0.0 // Stores without byte rate statistics is not available to schedule. for id, byteRate := range storeByteRate { @@ -285,6 +289,9 @@ func summaryStoresLoad( hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keySum) } } + allByteSum += byteRate + allKeySum += keyRate + allCount += float64(len(hotPeers)) // Build store load prediction from current load and pending influence. 
stLoadPred := (&storeLoad{ @@ -299,6 +306,29 @@ func summaryStoresLoad( HotPeers: hotPeers, } } + storeLen := float64(len(storeByteRate)) + + for id, detail := range loadDetail { + byteExp := allByteSum / storeLen + keyExp := allKeySum / storeLen + countExp := allCount / storeLen + detail.LoadPred.Future.ExpByteRate = byteExp + detail.LoadPred.Future.ExpKeyRate = keyExp + detail.LoadPred.Future.ExpCount = countExp + // Debug + { + ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String() + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(byteExp) + } + { + ty := "exp-key-rate-" + rwTy.String() + "-" + kind.String() + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keyExp) + } + { + ty := "exp-count-rate-" + rwTy.String() + "-" + kind.String() + hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(countExp) + } + } return loadDetail } @@ -543,7 +573,12 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail { if len(detail.HotPeers) == 0 { continue } - ret[id] = detail + if detail.LoadPred.min().ByteRate > bs.sche.conf.GetToleranceRatio()*detail.LoadPred.Future.ExpByteRate && + detail.LoadPred.min().KeyRate > bs.sche.conf.GetToleranceRatio()*detail.LoadPred.Future.ExpKeyRate { + ret[id] = detail + balanceHotRegionCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc() + } + balanceHotRegionCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc() } return ret } @@ -706,6 +741,13 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { ret := make(map[uint64]*storeLoadDetail, len(candidates)) for _, store := range candidates { if filter.Target(bs.cluster, store, filters) { + detail := bs.stLoadDetail[store.GetID()] + if detail.LoadPred.max().ByteRate*bs.sche.conf.GetToleranceRatio() < detail.LoadPred.Future.ExpByteRate && + detail.LoadPred.max().KeyRate*bs.sche.conf.GetToleranceRatio() < detail.LoadPred.Future.ExpKeyRate { + ret[store.GetID()] = 
bs.stLoadDetail[store.GetID()] + balanceHotRegionCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10)).Inc() + } + balanceHotRegionCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10)).Inc() ret[store.GetID()] = bs.stLoadDetail[store.GetID()] } } @@ -721,9 +763,8 @@ func (bs *balanceSolver) calcProgressiveRank() { rank := int64(0) if bs.rwTy == write && bs.opTy == transferLeader { // In this condition, CPU usage is the matter. - // Only consider about count and key rate. - if srcLd.Count > dstLd.Count && - srcLd.KeyRate >= dstLd.KeyRate+peer.GetKeyRate() { + // Only consider about key rate. + if srcLd.KeyRate >= dstLd.KeyRate+peer.GetKeyRate() { rank = -1 } } else { diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go index 2db258ed44e..855acab81a5 100644 --- a/server/schedulers/hot_region_config.go +++ b/server/schedulers/hot_region_config.go @@ -42,6 +42,7 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { GreatDecRatio: 0.95, MinorDecRatio: 0.99, MaxPeerNum: 1000, + ToleranceRatio: 1.02, } } @@ -61,6 +62,7 @@ type hotRegionSchedulerConfig struct { CountRankStepRatio float64 `json:"count-rank-step-ratio"` GreatDecRatio float64 `json:"great-dec-ratio"` MinorDecRatio float64 `json:"minor-dec-ratio"` + ToleranceRatio float64 `json:"minorDecRatio"` } func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) { @@ -81,6 +83,18 @@ func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int { return conf.MaxPeerNum } +func (conf *hotRegionSchedulerConfig) GetToleranceRatio() float64 { + conf.RLock() + defer conf.RUnlock() + return conf.ToleranceRatio +} + +func (conf *hotRegionSchedulerConfig) SetToleranceRatio(tol float64) { + conf.Lock() + defer conf.Unlock() + conf.ToleranceRatio = tol +} + func (conf *hotRegionSchedulerConfig) GetByteRankStepRatio() float64 { conf.RLock() defer conf.RUnlock() diff --git a/server/schedulers/hot_test.go 
b/server/schedulers/hot_test.go index 5e5e9adff4b..b4d8fb2d2e2 100644 --- a/server/schedulers/hot_test.go +++ b/server/schedulers/hot_test.go @@ -298,6 +298,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) { opt := mockoption.NewScheduleOptions() hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) c.Assert(err, IsNil) + hb.(*hotScheduler).conf.SetToleranceRatio(1) opt.HotRegionCacheHitsThreshold = 0 tc := mockcluster.NewCluster(opt) @@ -313,11 +314,12 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) { tc.UpdateStorageWrittenBytes(4, 9*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageWrittenBytes(5, 8.9*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenKeys(1, 10*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenKeys(5, 9.2*MB*statistics.StoreHeartBeatReportInterval) + updateSingel := struct{}{} + tc.UpdateStorageWrittenKeys(1, 10.2*MB*statistics.StoreHeartBeatReportInterval, updateSingel) + tc.UpdateStorageWrittenKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, updateSingel) + tc.UpdateStorageWrittenKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval, updateSingel) + tc.UpdateStorageWrittenKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval, updateSingel) + tc.UpdateStorageWrittenKeys(5, 9.2*MB*statistics.StoreHeartBeatReportInterval, updateSingel) addRegionInfo(tc, write, []testRegionInfo{ {1, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB}, @@ -331,19 +333,19 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) { // byteDecRatio <= 0.95 && keyDecRatio <= 0.95 testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 4) // store byte 
rate (min, max): (10, 10.5) | 9.5 | 9.5 | (9, 9.5) | 8.9 - // store key rate (min, max): (9.5, 10) | 9.5 | 9.8 | (9, 9.5) | 9.2 + // store key rate (min, max): (9.7, 10.2) | 9.5 | 9.8 | (9, 9.5) | 9.2 op = hb.Schedule(tc)[0] // byteDecRatio <= 0.99 && keyDecRatio <= 0.95 testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 3, 5) // store byte rate (min, max): (10, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 8.95) - // store key rate (min, max): (9.5, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3) + // store key rate (min, max): (9.7, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3) op = hb.Schedule(tc)[0] // byteDecRatio <= 0.95 testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 5) // store byte rate (min, max): (9.5, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 9.45) - // store key rate (min, max): (9, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8) + // store key rate (min, max): (9.2, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8) } } @@ -586,6 +588,7 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) { opt := mockoption.NewScheduleOptions() hb, err := schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) c.Assert(err, IsNil) + hb.(*hotScheduler).conf.SetToleranceRatio(1) opt.HotRegionCacheHitsThreshold = 0 tc := mockcluster.NewCluster(opt) @@ -601,11 +604,12 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) { tc.UpdateStorageReadBytes(4, 9*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageReadBytes(5, 8.9*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageReadKeys(1, 10*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageReadKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageReadKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageReadKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageReadKeys(5, 9.2*MB*statistics.StoreHeartBeatReportInterval) + 
updateSingel := struct{}{} + tc.UpdateStorageReadKeys(1, 10.2*MB*statistics.StoreHeartBeatReportInterval, updateSingel) + tc.UpdateStorageReadKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, updateSingel) + tc.UpdateStorageReadKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval, updateSingel) + tc.UpdateStorageReadKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval, updateSingel) + tc.UpdateStorageReadKeys(5, 9.2*MB*statistics.StoreHeartBeatReportInterval, updateSingel) addRegionInfo(tc, read, []testRegionInfo{ {1, []uint64{1, 2, 4}, 0.5 * MB, 0.5 * MB}, @@ -619,19 +623,19 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) { // byteDecRatio <= 0.95 && keyDecRatio <= 0.95 testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 1, 4) // store byte rate (min, max): (10, 10.5) | 9.5 | 9.5 | (9, 9.5) | 8.9 - // store key rate (min, max): (9.5, 10) | 9.5 | 9.8 | (9, 9.5) | 9.2 + // store key rate (min, max): (9.7, 10.2) | 9.5 | 9.8 | (9, 9.5) | 9.2 op = hb.Schedule(tc)[0] // byteDecRatio <= 0.99 && keyDecRatio <= 0.95 testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 3, 5) // store byte rate (min, max): (10, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 8.95) - // store key rate (min, max): (9.5, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3) + // store key rate (min, max): (9.7, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3) op = hb.Schedule(tc)[0] // byteDecRatio <= 0.95 testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 5) // store byte rate (min, max): (9.5, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 9.45) - // store key rate (min, max): (9, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8) + // store key rate (min, max): (9.2, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8) } } diff --git a/server/schedulers/metrics.go b/server/schedulers/metrics.go index fd2299543f8..768a5c1d92a 100644 --- a/server/schedulers/metrics.go +++ b/server/schedulers/metrics.go @@ -71,6 +71,14 @@ var balanceRegionCounter = 
prometheus.NewCounterVec( Help: "Counter of balance region scheduler.", }, []string{"type", "address", "store"}) +var balanceHotRegionCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "scheduler", + Name: "hot_region", + Help: "Counter of hot region scheduler.", + }, []string{"type", "store"}) + var balanceDirectionCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", @@ -101,6 +109,7 @@ func init() { prometheus.MustRegister(hotPeerSummary) prometheus.MustRegister(balanceLeaderCounter) prometheus.MustRegister(balanceRegionCounter) + prometheus.MustRegister(balanceHotRegionCounter) prometheus.MustRegister(balanceDirectionCounter) prometheus.MustRegister(scatterRangeLeaderCounter) prometheus.MustRegister(scatterRangeRegionCounter) diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 7cce93ce527..d0272517e62 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -208,6 +208,10 @@ type storeLoad struct { ByteRate float64 KeyRate float64 Count float64 + + ExpByteRate float64 + ExpKeyRate float64 + ExpCount float64 } func (load *storeLoad) ToLoadPred(infl Influence) *storeLoadPred { From 2cc7db5c6ffdf94d4487a02451e215f07676b689 Mon Sep 17 00:00:00 2001 From: nolouch Date: Sat, 28 Mar 2020 14:19:07 +0800 Subject: [PATCH 2/3] fix typo Signed-off-by: nolouch --- server/schedulers/hot_region_config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go index 855acab81a5..d2b4f70d5f7 100644 --- a/server/schedulers/hot_region_config.go +++ b/server/schedulers/hot_region_config.go @@ -62,7 +62,7 @@ type hotRegionSchedulerConfig struct { CountRankStepRatio float64 `json:"count-rank-step-ratio"` GreatDecRatio float64 `json:"great-dec-ratio"` MinorDecRatio float64 `json:"minor-dec-ratio"` - ToleranceRatio float64 `json:"minorDecRatio"` + ToleranceRatio float64 
`json:"tolerance-ratio"` } func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) { From 859fdf0a72c1ad47c22c0b8a4cfa238bbac7905c Mon Sep 17 00:00:00 2001 From: nolouch Date: Sat, 28 Mar 2020 18:26:39 +0800 Subject: [PATCH 3/3] address comments Signed-off-by: nolouch --- pkg/mock/mockcluster/mockcluster.go | 52 ++++++++++++++++++-------- server/schedulers/hot_region_config.go | 2 +- server/schedulers/hot_test.go | 34 +++++------------- 3 files changed, 47 insertions(+), 41 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 8284d1df3c1..aa58cba8c6e 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -393,14 +393,40 @@ func (mc *Cluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio mc.PutStore(newStore) } +// UpdateStorageWrittenStats updates store written stats. +func (mc *Cluster) UpdateStorageWrittenStats(storeID, bytesWritten, keysWritten uint64) { + store := mc.GetStore(storeID) + newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) + newStats.BytesWritten = bytesWritten + newStats.KeysWritten = keysWritten + now := time.Now().Second() + interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} + newStats.Interval = interval + newStore := store.Clone(core.SetStoreStats(newStats)) + mc.Set(storeID, newStats) + mc.PutStore(newStore) +} + +// UpdateStorageReadStats updates store read stats. 
+func (mc *Cluster) UpdateStorageReadStats(storeID, bytesWritten, keysWritten uint64) { + store := mc.GetStore(storeID) + newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) + newStats.BytesRead = bytesWritten + newStats.KeysRead = keysWritten + now := time.Now().Second() + interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} + newStats.Interval = interval + newStore := store.Clone(core.SetStoreStats(newStats)) + mc.Set(storeID, newStats) + mc.PutStore(newStore) +} + // UpdateStorageWrittenBytes updates store written bytes. -func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64, updateSingleOption ...struct{}) { +func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64) { store := mc.GetStore(storeID) newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) newStats.BytesWritten = bytesWritten - if len(updateSingleOption) == 0 { - newStats.KeysWritten = bytesWritten / 100 - } + newStats.KeysWritten = bytesWritten / 100 now := time.Now().Second() interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} newStats.Interval = interval @@ -410,13 +436,11 @@ func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64 } // UpdateStorageReadBytes updates store read bytes. 
-func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64, updateSingleOption ...struct{}) { +func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) { store := mc.GetStore(storeID) newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) newStats.BytesRead = bytesRead - if len(updateSingleOption) == 0 { - newStats.KeysRead = bytesRead / 100 - } + newStats.KeysRead = bytesRead / 100 now := time.Now().Second() interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} newStats.Interval = interval @@ -426,13 +450,11 @@ func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64, upda } // UpdateStorageWrittenKeys updates store written keys. -func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64, updateSingleOption ...struct{}) { +func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64) { store := mc.GetStore(storeID) newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) newStats.KeysWritten = keysWritten - if len(updateSingleOption) == 0 { - newStats.BytesWritten = keysWritten * 100 - } + newStats.BytesWritten = keysWritten * 100 now := time.Now().Second() interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} newStats.Interval = interval @@ -442,13 +464,11 @@ func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64, } // UpdateStorageReadKeys updates store read bytes. 
-func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64, updateSingleOption ...struct{}) { +func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64) { store := mc.GetStore(storeID) newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats) newStats.KeysRead = keysRead - if len(updateSingleOption) == 0 { - newStats.BytesRead = keysRead * 100 - } + newStats.BytesRead = keysRead * 100 now := time.Now().Second() interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)} newStats.Interval = interval diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go index d2b4f70d5f7..2c72e8046d8 100644 --- a/server/schedulers/hot_region_config.go +++ b/server/schedulers/hot_region_config.go @@ -42,7 +42,7 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { GreatDecRatio: 0.95, MinorDecRatio: 0.99, MaxPeerNum: 1000, - ToleranceRatio: 1.02, + ToleranceRatio: 1.02, // Tolerate 2% difference } } diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go index b4d8fb2d2e2..a3999da5b47 100644 --- a/server/schedulers/hot_test.go +++ b/server/schedulers/hot_test.go @@ -308,18 +308,11 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) { tc.AddRegionStore(4, 20) tc.AddRegionStore(5, 20) - tc.UpdateStorageWrittenBytes(1, 10.5*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenBytes(2, 9.5*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenBytes(3, 9.5*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenBytes(4, 9*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageWrittenBytes(5, 8.9*MB*statistics.StoreHeartBeatReportInterval) - - updateSingel := struct{}{} - tc.UpdateStorageWrittenKeys(1, 10.2*MB*statistics.StoreHeartBeatReportInterval, updateSingel) - tc.UpdateStorageWrittenKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 
updateSingel) - tc.UpdateStorageWrittenKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval, updateSingel) - tc.UpdateStorageWrittenKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval, updateSingel) - tc.UpdateStorageWrittenKeys(5, 9.2*MB*statistics.StoreHeartBeatReportInterval, updateSingel) + tc.UpdateStorageWrittenStats(1, 10.5*MB*statistics.StoreHeartBeatReportInterval, 10.2*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(3, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.8*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(4, 9*MB*statistics.StoreHeartBeatReportInterval, 9*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageWrittenStats(5, 8.9*MB*statistics.StoreHeartBeatReportInterval, 9.2*MB*statistics.StoreHeartBeatReportInterval) addRegionInfo(tc, write, []testRegionInfo{ {1, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB}, @@ -598,18 +591,11 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) { tc.AddRegionStore(4, 20) tc.AddRegionStore(5, 20) - tc.UpdateStorageReadBytes(1, 10.5*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageReadBytes(2, 9.5*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageReadBytes(3, 9.5*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageReadBytes(4, 9*MB*statistics.StoreHeartBeatReportInterval) - tc.UpdateStorageReadBytes(5, 8.9*MB*statistics.StoreHeartBeatReportInterval) - - updateSingel := struct{}{} - tc.UpdateStorageReadKeys(1, 10.2*MB*statistics.StoreHeartBeatReportInterval, updateSingel) - tc.UpdateStorageReadKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, updateSingel) - tc.UpdateStorageReadKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval, updateSingel) - tc.UpdateStorageReadKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval, updateSingel) - tc.UpdateStorageReadKeys(5, 
9.2*MB*statistics.StoreHeartBeatReportInterval, updateSingel) + tc.UpdateStorageReadStats(1, 10.5*MB*statistics.StoreHeartBeatReportInterval, 10.2*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(3, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.8*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(4, 9*MB*statistics.StoreHeartBeatReportInterval, 9*MB*statistics.StoreHeartBeatReportInterval) + tc.UpdateStorageReadStats(5, 8.9*MB*statistics.StoreHeartBeatReportInterval, 9.2*MB*statistics.StoreHeartBeatReportInterval) addRegionInfo(tc, read, []testRegionInfo{ {1, []uint64{1, 2, 4}, 0.5 * MB, 0.5 * MB},