statistics: use separate report interval for hot peer stat #3661

Merged · 11 commits · May 11, 2021
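
In short: the shared HotStatReportInterval constant is split into WriteReportInterval and ReadReportInterval, and hotPeerCache and HotPeerStat now pick the interval by flow kind. Both constants still alias RegionHeartBeatReportInterval, so behavior is unchanged until the intervals diverge (the TODO in the diff defers moving read stats to the store heartbeat). A minimal sketch of the selection pattern the PR applies throughout; reportIntervalFor is a hypothetical helper, not part of the PR:

func reportIntervalFor(kind FlowKind) int {
	if kind == WriteFlow {
		return WriteReportInterval
	}
	// read stats may later move to StoreHeartBeatReportInterval, per the TODO in the diff
	return ReadReportInterval
}
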
37 changes: 23 additions & 14 deletions server/schedulers/hot_test.go
@@ -692,7 +692,7 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
testutil.CheckTransferLeader(c, hb.Schedule(tc)[0], operator.OpHotRegion, 1, 3)
hb.(*hotScheduler).clearPendingInfluence()
// assume handle the operator
tc.AddLeaderRegionWithReadInfo(3, 3, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 2})
tc.AddLeaderRegionWithReadInfo(3, 3, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 2})
// After transfer a hot region leader from store 1 to store 3
// the three region leader will be evenly distributed in three stores

@@ -1071,12 +1071,16 @@ func addRegionInfo(tc *mockcluster.Cluster, rwTy rwType, regions []testRegionInf
if rwTy == write {
addFunc = tc.AddLeaderRegionWithWriteInfo
}
reportIntervalSecs := statistics.WriteReportInterval
if rwTy == read {
reportIntervalSecs = statistics.ReadReportInterval
}
for _, r := range regions {
addFunc(
r.id, r.peers[0],
uint64(r.byteRate*statistics.RegionHeartBeatReportInterval),
uint64(r.keyRate*statistics.RegionHeartBeatReportInterval),
statistics.RegionHeartBeatReportInterval,
uint64(r.byteRate*float64(reportIntervalSecs)),
uint64(r.keyRate*float64(reportIntervalSecs)),
uint64(reportIntervalSecs),
r.peers[1:],
)
}
@@ -1105,17 +1109,21 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h

tc.AddRegionStore(2, 20)
tc.UpdateStorageReadStats(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval)
reportInterval := uint64(statistics.WriteReportInterval)
if kind == read {
reportInterval = uint64(statistics.ReadReportInterval)
}
// hot degree increase
heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
items := heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1)
heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1)
items := heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1)
c.Check(len(items), Greater, 0)
for _, item := range items {
c.Check(item.HotDegree, Equals, 3)
}

// transfer leader, skip the first heartbeat and schedule.
items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3}, 1)
items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 3}, 1)
for _, item := range items {
if !item.IsNeedDelete() {
c.Check(item.HotDegree, Equals, 3)
@@ -1133,12 +1141,12 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h
tc.SetHotRegionCacheHitsThreshold(threshold)

// move peer: add peer and remove peer
items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3, 4}, 1)
items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 3, 4}, 1)
c.Check(len(items), Greater, 0)
for _, item := range items {
c.Check(item.HotDegree, Equals, 4)
}
items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 4}, 1)
items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 4}, 1)
c.Check(len(items), Greater, 0)
for _, item := range items {
if item.StoreID == 3 {
@@ -1158,19 +1166,20 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) {
tc.SetLocationLabels([]string{"zone", "host"})
tc.DisableFeature(versioninfo.JointConsensus)
// some peers are hot, and some are cold #3198

rate := uint64(512 * KB)
for i := 0; i < statistics.TopNN; i++ {
for j := 0; j < statistics.DefaultAotSize; j++ {
tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1)
}
}
items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1)
c.Check(items[0].GetThresholds()[0], Equals, float64(rate)*statistics.HotThresholdRatio)
// Threshold of store 1,2,3 is 409.6 KB and others are 1 KB
// Make the hot threshold of some stores high and the others low
rate = 10 * KB
tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3, 4}, 1)
items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{3, 4}, 1)
tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3, 4}, 1)
items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1)
for _, item := range items {
if item.StoreID < 4 {
c.Check(item.IsNeedDelete(), IsTrue)
22 changes: 11 additions & 11 deletions server/schedulers/scheduler_test.go
@@ -177,9 +177,9 @@ func (s *testShuffleHotRegionSchedulerSuite) checkBalance(c *C, tc *mockcluster.
//| 1 | 1 | 2 | 3 | 512KB |
//| 2 | 1 | 3 | 4 | 512KB |
//| 3 | 1 | 2 | 4 | 512KB |
tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{3, 4})
tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 4})
tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{3, 4})
tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 4})
tc.SetHotRegionCacheHitsThreshold(0)

// try to get an operator
@@ -217,9 +217,9 @@ func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) {
tc.UpdateStorageReadBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval)

tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2})
tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3})
tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{2})
tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 3})
tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{2, 3})
tc.SetHotRegionCacheHitsThreshold(0)
c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue)
c.Assert(hb.Schedule(tc), IsNil)
@@ -386,11 +386,11 @@ func (s *testSpecialUseSuite) TestSpecialUseHotRegion(c *C) {
tc.UpdateStorageWrittenBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(4, 0)
tc.UpdateStorageWrittenBytes(5, 0)
tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(4, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3})
tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 2})
tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(4, 2, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{1, 3})
tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{1, 2})
ops = hs.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 4)
12 changes: 9 additions & 3 deletions server/statistics/hot_peer.go
@@ -35,8 +35,7 @@ type dimStat struct {
LastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in last second as instantaneous speed.
}

func newDimStat(typ RegionStatKind) *dimStat {
reportInterval := RegionHeartBeatReportInterval * time.Second
func newDimStat(typ RegionStatKind, reportInterval time.Duration) *dimStat {
return &dimStat{
typ: typ,
Rolling: movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval),
@@ -129,7 +128,7 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi

// IsNeedCoolDownTransferLeader uses a cooldown period after a leader transfer to avoid unnecessary scheduling
func (stat *HotPeerStat) IsNeedCoolDownTransferLeader(minHotDegree int) bool {
return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*HotStatReportInterval)
return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*stat.hotStatReportInterval())
}

// IsNeedDelete to delete the item in cache.
@@ -192,3 +191,10 @@ func (stat *HotPeerStat) clearLastAverage() {
l.clearLastAverage()
}
}

func (stat *HotPeerStat) hotStatReportInterval() int {
if stat.Kind == ReadFlow {
return ReadReportInterval
}
return WriteReportInterval
}
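
With the helper above, the cooldown checked by IsNeedCoolDownTransferLeader scales with the per-kind report interval instead of the old shared HotStatReportInterval. A sketch of the resulting window, assuming RegionHeartBeatReportInterval is 60 seconds (PD's default); coolDownWindow is a hypothetical helper, not part of the PR:

func coolDownWindow(stat *HotPeerStat, minHotDegree int) time.Duration {
	// with minHotDegree = 3 this is 3 * 60s = 180s for either flow kind today
	return time.Duration(minHotDegree*stat.hotStatReportInterval()) * time.Second
}
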
36 changes: 23 additions & 13 deletions server/statistics/hot_peer_cache.go
@@ -30,10 +30,11 @@ const (
TopNN = 60
// HotThresholdRatio is used to calculate hot thresholds
HotThresholdRatio = 0.8
// HotStatReportInterval indicates the interval between each data reporting
// TODO: change into StoreHeartBeatReportInterval when we use store heartbeat to report data
HotStatReportInterval = RegionHeartBeatReportInterval
topNTTL = 3 * HotStatReportInterval * time.Second
// WriteReportInterval indicates the interval between write stats reports
WriteReportInterval = RegionHeartBeatReportInterval
// ReadReportInterval indicates the interval between read stats reports
// TODO: use StoreHeartBeatReportInterval in the future
ReadReportInterval = RegionHeartBeatReportInterval

rollingWindowsSize = 5

@@ -52,20 +53,29 @@ var minHotThresholds = [RegionStatCount]float64{

// hotPeerCache saves the hot peer's statistics.
type hotPeerCache struct {
kind FlowKind
peersOfStore map[uint64]*TopN // storeID -> hot peers
storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
inheritItem map[uint64]*HotPeerStat // regionID -> HotPeerStat
kind FlowKind
peersOfStore map[uint64]*TopN // storeID -> hot peers
storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
inheritItem map[uint64]*HotPeerStat // regionID -> HotPeerStat
topNTTL time.Duration
reportIntervalSecs int
}

// NewHotStoresStats creates a HotStoresStats
func NewHotStoresStats(kind FlowKind) *hotPeerCache {
return &hotPeerCache{
c := &hotPeerCache{
kind: kind,
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
inheritItem: make(map[uint64]*HotPeerStat),
}
if kind == WriteFlow {
c.reportIntervalSecs = WriteReportInterval
} else {
c.reportIntervalSecs = ReadReportInterval
}
c.topNTTL = 3 * time.Duration(c.reportIntervalSecs) * time.Second
return c
}

// TODO: rename RegionStats as PeerStats
@@ -99,7 +109,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
} else {
peers, ok := f.peersOfStore[item.StoreID]
if !ok {
peers = NewTopN(DimLen, TopNN, topNTTL)
peers = NewTopN(DimLen, TopNN, f.topNTTL)
f.peersOfStore[item.StoreID] = peers
}
peers.Put(item)
@@ -383,7 +393,7 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
}

func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian {
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, HotStatReportInterval*time.Second)
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second)
}

func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat {
@@ -403,14 +413,14 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa
if !isHot {
return nil
}
if interval.Seconds() >= HotStatReportInterval {
if interval.Seconds() >= float64(f.reportIntervalSecs) {
newItem.HotDegree = 1
newItem.AntiCount = hotRegionAntiCount
}
newItem.isNew = true
newItem.rollingLoads = make([]*dimStat, len(regionStats))
for i, k := range regionStats {
ds := newDimStat(k)
ds := newDimStat(k, time.Duration(newItem.hotStatReportInterval())*time.Second)
ds.Add(deltaLoads[k], interval)
if ds.isFull() {
ds.clearLastAverage()
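
A usage sketch of the reworked constructor, under the same 60-second assumption; newCaches is a hypothetical wrapper, while the constructor and constants are from the diff:

func newCaches() (read, write *hotPeerCache) {
	// each cache now carries its own report interval and derives topNTTL from it,
	// so read and write TopN items can age out on different schedules
	read = NewHotStoresStats(ReadFlow)   // reportIntervalSecs = ReadReportInterval; topNTTL = 3 * 60s
	write = NewHotStoresStats(WriteFlow) // reportIntervalSecs = WriteReportInterval; topNTTL = 3 * 60s
	return read, write
}
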
52 changes: 38 additions & 14 deletions tests/pdctl/hot/hot_test.go
@@ -117,23 +117,47 @@ func (s *hotTestSuite) TestHot(c *C) {
c.Assert(hotRegion.AsLeader[hotStoreID].Stats[count-1].RegionID, Equals, hotRegionID)
}
}

regionIDCounter := uint64(1)
testCommand := func(reportIntervals []uint64, hotType string) {
for _, reportInterval := range reportIntervals {
hotRegionID := regionIDCounter
regionIDCounter++
switch hotType {
case "read":
pdctl.MustPutRegion(c, cluster, hotRegionID, hotStoreID, []byte("b"), []byte("c"), core.SetReadBytes(1000000000), core.SetReportInterval(reportInterval))
time.Sleep(5000 * time.Millisecond)
if reportInterval >= statistics.RegionHeartBeatReportInterval {
count++
}
testHot(hotRegionID, hotStoreID, "read")
case "write":
pdctl.MustPutRegion(c, cluster, hotRegionID, hotStoreID, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000), core.SetReportInterval(reportInterval))
time.Sleep(5000 * time.Millisecond)
if reportInterval >= statistics.RegionHeartBeatReportInterval {
count++
}
testHot(hotRegionID, hotStoreID, "write")
}
}
}
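// In both passes below, count grows only when the report interval is at
// least RegionHeartBeatReportInterval, which WriteReportInterval and
// ReadReportInterval currently both equal; the HotRegionReportMinInterval
// entries exercise intervals too short to count as hot.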
reportIntervals := []uint64{
statistics.HotRegionReportMinInterval,
statistics.HotRegionReportMinInterval + 1,
statistics.RegionHeartBeatReportInterval,
statistics.RegionHeartBeatReportInterval + 1,
statistics.RegionHeartBeatReportInterval * 2,
statistics.RegionHeartBeatReportInterval*2 + 1,
statistics.WriteReportInterval,
statistics.WriteReportInterval + 1,
statistics.WriteReportInterval * 2,
statistics.WriteReportInterval*2 + 1,
}
for _, reportInterval := range reportIntervals {
hotReadRegionID, hotWriteRegionID := reportInterval, reportInterval+100
pdctl.MustPutRegion(c, cluster, hotReadRegionID, hotStoreID, []byte("b"), []byte("c"), core.SetReadBytes(1000000000), core.SetReportInterval(reportInterval))
pdctl.MustPutRegion(c, cluster, hotWriteRegionID, hotStoreID, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000), core.SetReportInterval(reportInterval))
time.Sleep(5000 * time.Millisecond)
if reportInterval >= statistics.RegionHeartBeatReportInterval {
count++
}
testHot(hotReadRegionID, hotStoreID, "read")
testHot(hotWriteRegionID, hotStoreID, "write")
testCommand(reportIntervals, "write")
count = 0
reportIntervals = []uint64{
statistics.HotRegionReportMinInterval,
statistics.HotRegionReportMinInterval + 1,
statistics.ReadReportInterval,
statistics.ReadReportInterval + 1,
statistics.ReadReportInterval * 2,
statistics.ReadReportInterval*2 + 1,
}
testCommand(reportIntervals, "read")
}