From 8e3564d9def64c5d3cfc5c58fcf8997d044e0f26 Mon Sep 17 00:00:00 2001
From: Song Gao
Date: Tue, 11 May 2021 14:19:38 +0800
Subject: [PATCH] statistics: use separate report interval for hot peer stat
 (#3661)

* separate interval

Signed-off-by: yisaer

* fix lint

Signed-off-by: yisaer

* fix test

Signed-off-by: yisaer

* address the comment

Signed-off-by: yisaer

* fix test

Signed-off-by: yisaer

* remove useless code

Signed-off-by: yisaer

* remove useless code

Signed-off-by: yisaer

* address the comment

Signed-off-by: yisaer

Co-authored-by: Ti Chi Robot
---
 server/schedulers/hot_test.go       | 37 ++++++++++++--------
 server/schedulers/scheduler_test.go | 22 ++++++------
 server/statistics/hot_peer.go       | 12 +++++--
 server/statistics/hot_peer_cache.go | 36 ++++++++++++--------
 tests/pdctl/hot/hot_test.go         | 52 +++++++++++++++++++++--------
 5 files changed, 104 insertions(+), 55 deletions(-)

diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go
index 175c6895e7f..b3d93430253 100644
--- a/server/schedulers/hot_test.go
+++ b/server/schedulers/hot_test.go
@@ -692,7 +692,7 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
 	testutil.CheckTransferLeader(c, hb.Schedule(tc)[0], operator.OpHotRegion, 1, 3)
 	hb.(*hotScheduler).clearPendingInfluence()
 	// assume handle the operator
-	tc.AddLeaderRegionWithReadInfo(3, 3, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 2})
+	tc.AddLeaderRegionWithReadInfo(3, 3, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 2})
 
 	// After transfer a hot region leader from store 1 to store 3
 	// the three region leader will be evenly distributed in three stores
@@ -1071,12 +1071,16 @@ func addRegionInfo(tc *mockcluster.Cluster, rwTy rwType, regions []testRegionInf
 	if rwTy == write {
 		addFunc = tc.AddLeaderRegionWithWriteInfo
 	}
+	reportIntervalSecs := statistics.WriteReportInterval
+	if rwTy == read {
+		reportIntervalSecs = statistics.ReadReportInterval
+	}
 	for _, r := range regions {
 		addFunc(
 			r.id, r.peers[0],
-			uint64(r.byteRate*statistics.RegionHeartBeatReportInterval),
-			uint64(r.keyRate*statistics.RegionHeartBeatReportInterval),
-			statistics.RegionHeartBeatReportInterval,
+			uint64(r.byteRate*float64(reportIntervalSecs)),
+			uint64(r.keyRate*float64(reportIntervalSecs)),
+			uint64(reportIntervalSecs),
 			r.peers[1:],
 		)
 	}
@@ -1105,17 +1109,21 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h
 	tc.AddRegionStore(2, 20)
 	tc.UpdateStorageReadStats(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval)
 
+	reportInterval := uint64(statistics.WriteReportInterval)
+	if kind == read {
+		reportInterval = uint64(statistics.ReadReportInterval)
+	}
 	// hot degree increase
-	heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-	heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-	items := heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
+	heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1)
+	heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1)
+	items := heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1)
 	c.Check(len(items), Greater, 0)
 	for _, item := range items {
 		c.Check(item.HotDegree, Equals, 3)
 	}
 
 	// transfer leader, skip the first heartbeat and schedule.
-	items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3}, 1)
+	items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 3}, 1)
 	for _, item := range items {
 		if !item.IsNeedDelete() {
 			c.Check(item.HotDegree, Equals, 3)
@@ -1133,12 +1141,12 @@
 	tc.SetHotRegionCacheHitsThreshold(threshold)
 
 	// move peer: add peer and remove peer
-	items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3, 4}, 1)
+	items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 3, 4}, 1)
 	c.Check(len(items), Greater, 0)
 	for _, item := range items {
 		c.Check(item.HotDegree, Equals, 4)
 	}
-	items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 4}, 1)
+	items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 4}, 1)
 	c.Check(len(items), Greater, 0)
 	for _, item := range items {
 		if item.StoreID == 3 {
@@ -1158,19 +1166,20 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) {
 	tc.SetLocationLabels([]string{"zone", "host"})
 	tc.DisableFeature(versioninfo.JointConsensus)
 	// some peers are hot, and some are cold #3198
+	rate := uint64(512 * KB)
 
 	for i := 0; i < statistics.TopNN; i++ {
 		for j := 0; j < statistics.DefaultAotSize; j++ {
-			tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
+			tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1)
 		}
 	}
-	items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
+	items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1)
 	c.Check(items[0].GetThresholds()[0], Equals, float64(rate)*statistics.HotThresholdRatio)
 	// Threshold of store 1,2,3 is 409.6 KB and others are 1 KB
 	// Make the hot threshold of some store is high and the others are low
 	rate = 10 * KB
-	tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3, 4}, 1)
-	items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{3, 4}, 1)
+	tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3, 4}, 1)
+	items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1)
 	for _, item := range items {
 		if item.StoreID < 4 {
 			c.Check(item.IsNeedDelete(), IsTrue)
diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go
index 11cf59b18fe..de24f4bf0a0 100644
--- a/server/schedulers/scheduler_test.go
+++ b/server/schedulers/scheduler_test.go
@@ -177,9 +177,9 @@ func (s *testShuffleHotRegionSchedulerSuite) checkBalance(c *C, tc *mockcluster.
 	//| 1 | 1 | 2 | 3 | 512KB |
 	//| 2 | 1 | 3 | 4 | 512KB |
 	//| 3 | 1 | 2 | 4 | 512KB |
-	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
-	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{3, 4})
-	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 4})
+	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
+	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{3, 4})
+	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 4})
 	tc.SetHotRegionCacheHitsThreshold(0)
 
 	// try to get an operator
@@ -217,9 +217,9 @@ func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) {
 	tc.UpdateStorageReadBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval)
 	tc.UpdateStorageReadBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval)
-	tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2})
-	tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3})
-	tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
+	tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{2})
+	tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 3})
+	tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{2, 3})
 	tc.SetHotRegionCacheHitsThreshold(0)
 	c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue)
 	c.Assert(hb.Schedule(tc), IsNil)
@@ -386,11 +386,11 @@ func (s *testSpecialUseSuite) TestSpecialUseHotRegion(c *C) {
 	tc.UpdateStorageWrittenBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval)
 	tc.UpdateStorageWrittenBytes(4, 0)
 	tc.UpdateStorageWrittenBytes(5, 0)
-	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
-	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
-	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
-	tc.AddLeaderRegionWithWriteInfo(4, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3})
-	tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 2})
+	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
+	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
+	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
+	tc.AddLeaderRegionWithWriteInfo(4, 2, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{1, 3})
+	tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{1, 2})
 	ops = hs.Schedule(tc)
 	c.Assert(ops, HasLen, 1)
 	testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 4)
diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go
index 70d2be3ff62..45dc96eacfc 100644
--- a/server/statistics/hot_peer.go
+++ b/server/statistics/hot_peer.go
@@ -35,8 +35,7 @@ type dimStat struct {
 	LastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in last second as instantaneous speed.
 }
 
-func newDimStat(typ RegionStatKind) *dimStat {
-	reportInterval := RegionHeartBeatReportInterval * time.Second
+func newDimStat(typ RegionStatKind, reportInterval time.Duration) *dimStat {
 	return &dimStat{
 		typ:     typ,
 		Rolling: movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval),
@@ -129,7 +128,7 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi
 
 // IsNeedCoolDownTransferLeader use cooldown time after transfer leader to avoid unnecessary schedule
 func (stat *HotPeerStat) IsNeedCoolDownTransferLeader(minHotDegree int) bool {
-	return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*HotStatReportInterval)
+	return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*stat.hotStatReportInterval())
 }
 
 // IsNeedDelete to delete the item in cache.
@@ -192,3 +191,10 @@ func (stat *HotPeerStat) clearLastAverage() {
 		l.clearLastAverage()
 	}
 }
+
+func (stat *HotPeerStat) hotStatReportInterval() int {
+	if stat.Kind == ReadFlow {
+		return ReadReportInterval
+	}
+	return WriteReportInterval
+}
diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go
index e08b96e94f2..cbced39d18b 100644
--- a/server/statistics/hot_peer_cache.go
+++ b/server/statistics/hot_peer_cache.go
@@ -30,10 +30,11 @@ const (
 	TopNN = 60
 	// HotThresholdRatio is used to calculate hot thresholds
 	HotThresholdRatio = 0.8
-	// HotStatReportInterval indicates the interval between each data reporting
-	// TODO: change into StoreHeartBeatReportInterval when we use store heartbeat to report data
-	HotStatReportInterval = RegionHeartBeatReportInterval
-	topNTTL = 3 * HotStatReportInterval * time.Second
+	// WriteReportInterval indicates the interval between write stats report
+	WriteReportInterval = RegionHeartBeatReportInterval
+	// ReadReportInterval indicates the interval between read stats report
+	// TODO: use StoreHeartBeatReportInterval in future
+	ReadReportInterval = RegionHeartBeatReportInterval
 
 	rollingWindowsSize = 5
 
@@ -52,20 +53,29 @@ var minHotThresholds = [RegionStatCount]float64{
 
 // hotPeerCache saves the hot peer's statistics.
 type hotPeerCache struct {
-	kind           FlowKind
-	peersOfStore   map[uint64]*TopN               // storeID -> hot peers
-	storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
-	inheritItem    map[uint64]*HotPeerStat        // regionID -> HotPeerStat
+	kind               FlowKind
+	peersOfStore       map[uint64]*TopN               // storeID -> hot peers
+	storesOfRegion     map[uint64]map[uint64]struct{} // regionID -> storeIDs
+	inheritItem        map[uint64]*HotPeerStat        // regionID -> HotPeerStat
+	topNTTL            time.Duration
+	reportIntervalSecs int
 }
 
 // NewHotStoresStats creates a HotStoresStats
 func NewHotStoresStats(kind FlowKind) *hotPeerCache {
-	return &hotPeerCache{
+	c := &hotPeerCache{
 		kind:           kind,
 		peersOfStore:   make(map[uint64]*TopN),
 		storesOfRegion: make(map[uint64]map[uint64]struct{}),
 		inheritItem:    make(map[uint64]*HotPeerStat),
 	}
+	if kind == WriteFlow {
+		c.reportIntervalSecs = WriteReportInterval
+	} else {
+		c.reportIntervalSecs = ReadReportInterval
+	}
+	c.topNTTL = 3 * time.Duration(c.reportIntervalSecs) * time.Second
+	return c
 }
 
 // TODO: rename RegionStats as PeerStats
@@ -99,7 +109,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
 	} else {
 		peers, ok := f.peersOfStore[item.StoreID]
 		if !ok {
-			peers = NewTopN(DimLen, TopNN, topNTTL)
+			peers = NewTopN(DimLen, TopNN, f.topNTTL)
 			f.peersOfStore[item.StoreID] = peers
 		}
 		peers.Put(item)
@@ -383,7 +393,7 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
 }
 
 func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian {
-	return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, HotStatReportInterval*time.Second)
+	return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second)
 }
 
 func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat {
@@ -403,14 +413,14 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa
 	if !isHot {
 		return nil
 	}
-	if interval.Seconds() >= HotStatReportInterval {
+	if interval.Seconds() >= float64(f.reportIntervalSecs) {
 		newItem.HotDegree = 1
 		newItem.AntiCount = hotRegionAntiCount
 	}
 	newItem.isNew = true
 	newItem.rollingLoads = make([]*dimStat, len(regionStats))
 	for i, k := range regionStats {
-		ds := newDimStat(k)
+		ds := newDimStat(k, time.Duration(newItem.hotStatReportInterval())*time.Second)
 		ds.Add(deltaLoads[k], interval)
 		if ds.isFull() {
 			ds.clearLastAverage()
diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go
index 2a30f354593..f061f686687 100644
--- a/tests/pdctl/hot/hot_test.go
+++ b/tests/pdctl/hot/hot_test.go
@@ -117,23 +117,47 @@ func (s *hotTestSuite) TestHot(c *C) {
 			c.Assert(hotRegion.AsLeader[hotStoreID].Stats[count-1].RegionID, Equals, hotRegionID)
 		}
 	}
+
+	regionIDCounter := uint64(1)
+	testCommand := func(reportIntervals []uint64, hotType string) {
+		for _, reportInterval := range reportIntervals {
+			hotRegionID := regionIDCounter
+			regionIDCounter++
+			switch hotType {
+			case "read":
+				pdctl.MustPutRegion(c, cluster, hotRegionID, hotStoreID, []byte("b"), []byte("c"), core.SetReadBytes(1000000000), core.SetReportInterval(reportInterval))
+				time.Sleep(5000 * time.Millisecond)
+				if reportInterval >= statistics.RegionHeartBeatReportInterval {
+					count++
+				}
+				testHot(hotRegionID, hotStoreID, "read")
+			case "write":
+				pdctl.MustPutRegion(c, cluster, hotRegionID, hotStoreID, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000), core.SetReportInterval(reportInterval))
+				time.Sleep(5000 * time.Millisecond)
+				if reportInterval >= statistics.RegionHeartBeatReportInterval {
+					count++
+				}
+				testHot(hotRegionID, hotStoreID, "write")
+			}
+		}
+	}
 	reportIntervals := []uint64{
 		statistics.HotRegionReportMinInterval,
 		statistics.HotRegionReportMinInterval + 1,
-		statistics.RegionHeartBeatReportInterval,
-		statistics.RegionHeartBeatReportInterval + 1,
-		statistics.RegionHeartBeatReportInterval * 2,
-		statistics.RegionHeartBeatReportInterval*2 + 1,
+		statistics.WriteReportInterval,
+		statistics.WriteReportInterval + 1,
+		statistics.WriteReportInterval * 2,
+		statistics.WriteReportInterval*2 + 1,
 	}
-	for _, reportInterval := range reportIntervals {
-		hotReadRegionID, hotWriteRegionID := reportInterval, reportInterval+100
-		pdctl.MustPutRegion(c, cluster, hotReadRegionID, hotStoreID, []byte("b"), []byte("c"), core.SetReadBytes(1000000000), core.SetReportInterval(reportInterval))
-		pdctl.MustPutRegion(c, cluster, hotWriteRegionID, hotStoreID, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000), core.SetReportInterval(reportInterval))
-		time.Sleep(5000 * time.Millisecond)
-		if reportInterval >= statistics.RegionHeartBeatReportInterval {
-			count++
-		}
-		testHot(hotReadRegionID, hotStoreID, "read")
-		testHot(hotWriteRegionID, hotStoreID, "write")
+	testCommand(reportIntervals, "write")
+	count = 0
+	reportIntervals = []uint64{
+		statistics.HotRegionReportMinInterval,
+		statistics.HotRegionReportMinInterval + 1,
+		statistics.ReadReportInterval,
+		statistics.ReadReportInterval + 1,
+		statistics.ReadReportInterval * 2,
+		statistics.ReadReportInterval*2 + 1,
 	}
+	testCommand(reportIntervals, "read")
 }
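Note for reviewers: the heart of this change is that each hotPeerCache now derives its report interval, and therefore its TopN TTL, from its FlowKind instead of the single HotStatReportInterval constant. The standalone Go sketch below only illustrates that selection logic under stated assumptions; the FlowKind stand-in, the reportIntervalSecs helper, and the 60-second values are placeholders for illustration (in this patch both new constants still equal RegionHeartBeatReportInterval), not the real statistics package.

```go
package main

import (
	"fmt"
	"time"
)

// FlowKind is a local stand-in for statistics.FlowKind.
type FlowKind int

const (
	WriteFlow FlowKind = iota
	ReadFlow
)

// Assumed values for illustration: in this patch both constants are still
// defined as RegionHeartBeatReportInterval (60 seconds); they are only split
// so the read side can later move to a different reporting period.
const (
	WriteReportInterval = 60 // seconds
	ReadReportInterval  = 60 // seconds
)

// reportIntervalSecs mirrors the per-kind selection that NewHotStoresStats
// performs when it fills hotPeerCache.reportIntervalSecs.
func reportIntervalSecs(kind FlowKind) int {
	if kind == WriteFlow {
		return WriteReportInterval
	}
	return ReadReportInterval
}

func main() {
	for _, kind := range []FlowKind{WriteFlow, ReadFlow} {
		secs := reportIntervalSecs(kind)
		// topNTTL is derived the same way as in the patch: three report periods.
		topNTTL := 3 * time.Duration(secs) * time.Second
		fmt.Printf("kind=%d reportIntervalSecs=%d topNTTL=%s\n", kind, secs, topNTTL)
	}
}
```

Splitting the write and read constants changes no behaviour today, but it leaves room for the read-side interval to follow the shorter store-heartbeat reporting period later, which is what the in-code TODO points at.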