Commit

separate interval
Signed-off-by: yisaer <[email protected]>
Yisaer committed May 10, 2021
1 parent df1614b commit 3636d1a
Showing 5 changed files with 101 additions and 56 deletions.
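The heart of the change: the single HotStatReportInterval constant is split into WriteReportInterval and ReadReportInterval (both still equal to RegionHeartBeatReportInterval for now), and the hot peer cache, the per-peer stats, and the tests each pick whichever interval matches their flow kind. A minimal standalone sketch of that pattern in Go; the FlowKind branch and the 3x TopN TTL rule come from the diff below, while the package scaffolding and printout are illustrative only:

package main

import (
	"fmt"
	"time"
)

const (
	RegionHeartBeatReportInterval = 60 // seconds, as in PD
	StoreHeartBeatReportInterval  = 10 // seconds, as in PD

	WriteReportInterval = RegionHeartBeatReportInterval
	// the commit's TODO: reads should eventually be reported per store heartbeat
	ReadReportInterval = RegionHeartBeatReportInterval
)

type FlowKind int

const (
	WriteFlow FlowKind = iota
	ReadFlow
)

// reportIntervalSecs mirrors the branch added in NewHotStoresStats.
func reportIntervalSecs(kind FlowKind) int {
	if kind == WriteFlow {
		return WriteReportInterval
	}
	return ReadReportInterval
}

func main() {
	for _, kind := range []FlowKind{WriteFlow, ReadFlow} {
		secs := reportIntervalSecs(kind)
		ttl := 3 * time.Duration(secs) * time.Second // TopN items survive three report intervals
		fmt.Printf("kind=%d interval=%ds topNTTL=%s\n", kind, secs, ttl)
	}
}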
37 changes: 23 additions & 14 deletions server/schedulers/hot_test.go
@@ -692,7 +692,7 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
testutil.CheckTransferLeader(c, hb.Schedule(tc)[0], operator.OpHotRegion, 1, 3)
hb.(*hotScheduler).clearPendingInfluence()
// assume the operator has been handled
tc.AddLeaderRegionWithReadInfo(3, 3, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 2})
tc.AddLeaderRegionWithReadInfo(3, 3, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 2})
// After transferring a hot region leader from store 1 to store 3,
// the three region leaders will be evenly distributed across the three stores

@@ -1071,12 +1071,16 @@ func addRegionInfo(tc *mockcluster.Cluster, rwTy rwType, regions []testRegionInf
if rwTy == write {
addFunc = tc.AddLeaderRegionWithWriteInfo
}
reportIntervalSecs := statistics.WriteReportInterval
if rwTy == read {
reportIntervalSecs = statistics.ReadReportInterval
}
for _, r := range regions {
addFunc(
r.id, r.peers[0],
uint64(r.byteRate*statistics.RegionHeartBeatReportInterval),
uint64(r.keyRate*statistics.RegionHeartBeatReportInterval),
statistics.RegionHeartBeatReportInterval,
uint64(r.byteRate*float64(reportIntervalSecs)),
uint64(r.keyRate*float64(reportIntervalSecs)),
uint64(reportIntervalSecs),
r.peers[1:],
)
}
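The conversion in addRegionInfo is easy to sanity-check by hand. A worked sketch with hypothetical numbers, assuming KB = 1024 and the 60s interval that both report constants currently resolve to:

package main

import "fmt"

func main() {
	// a heartbeat covering one full report interval carries totals, not rates
	byteRate := 512.0 * 1024                               // 512 KB/s, as in the tests
	intervalSecs := 60                                     // reportIntervalSecs for the write kind
	reported := uint64(byteRate * float64(intervalSecs))   // 31457280 bytes per heartbeat
	recovered := float64(reported) / float64(intervalSecs) // the cache recovers 512 KB/s
	fmt.Println(reported, recovered)
}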
@@ -1105,17 +1109,21 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h

tc.AddRegionStore(2, 20)
tc.UpdateStorageReadStats(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval)
reportInterval := statistics.WriteReportInterval
if kind == read {
reportInterval = statistics.ReadReportInterval
}
// hot degree increase
heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
items := heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
heartbeat(1, 1, 512*KB*uint64(reportInterval), 0, uint64(reportInterval), []uint64{2, 3}, 1)
heartbeat(1, 1, 512*KB*uint64(reportInterval), 0, uint64(reportInterval), []uint64{2, 3}, 1)
items := heartbeat(1, 1, 512*KB*uint64(reportInterval), 0, uint64(reportInterval), []uint64{2, 3}, 1)
c.Check(len(items), Greater, 0)
for _, item := range items {
c.Check(item.HotDegree, Equals, 3)
}

// transfer leader, skip the first heartbeat and schedule.
items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3}, 1)
items = heartbeat(1, 2, 512*KB*uint64(reportInterval), 0, uint64(reportInterval), []uint64{1, 3}, 1)
for _, item := range items {
if !item.IsNeedDelete() {
c.Check(item.HotDegree, Equals, 3)
@@ -1133,12 +1141,12 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h
tc.SetHotRegionCacheHitsThreshold(threshold)

// move peer: add peer and remove peer
items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3, 4}, 1)
items = heartbeat(1, 2, 512*KB*uint64(reportInterval), 0, uint64(reportInterval), []uint64{1, 3, 4}, 1)
c.Check(len(items), Greater, 0)
for _, item := range items {
c.Check(item.HotDegree, Equals, 4)
}
items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 4}, 1)
items = heartbeat(1, 2, 512*KB*uint64(reportInterval), 0, uint64(reportInterval), []uint64{1, 4}, 1)
c.Check(len(items), Greater, 0)
for _, item := range items {
if item.StoreID == 3 {
@@ -1158,19 +1166,20 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) {
tc.SetLocationLabels([]string{"zone", "host"})
tc.DisableFeature(versioninfo.JointConsensus)
// some peers are hot, and some are cold #3198

rate := uint64(512 * KB)
for i := 0; i < statistics.TopNN; i++ {
for j := 0; j < statistics.DefaultAotSize; j++ {
tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1)
}
}
items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1)
c.Check(items[0].GetThresholds()[0], Equals, float64(rate)*statistics.HotThresholdRatio)
// Thresholds of stores 1, 2 and 3 are 409.6 KB; the others are 1 KB
// Make the hot threshold of some stores high and the others low
rate = 10 * KB
tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3, 4}, 1)
items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{3, 4}, 1)
tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3, 4}, 1)
items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1)
for _, item := range items {
if item.StoreID < 4 {
c.Check(item.IsNeedDelete(), IsTrue)
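The 409.6 KB figure in the comment above is just the hot-threshold ratio applied to the warmed-up rate. A worked check, assuming KB = 1024 and the HotThresholdRatio of 0.8 defined in hot_peer_cache.go below:

package main

import "fmt"

func main() {
	const HotThresholdRatio = 0.8         // from hot_peer_cache.go
	rate := 512.0 * 1024                  // bytes/s reported while warming up TopN
	threshold := rate * HotThresholdRatio // 419430.4 bytes = 409.6 KB on stores 1-3
	newRate := 10.0 * 1024                // the later, colder 10 KB/s report
	fmt.Println(newRate < threshold)      // true: stores below the threshold drop the peer
}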
22 changes: 11 additions & 11 deletions server/schedulers/scheduler_test.go
@@ -177,9 +177,9 @@ func (s *testShuffleHotRegionSchedulerSuite) checkBalance(c *C, tc *mockcluster.
//| 1 | 1 | 2 | 3 | 512KB |
//| 2 | 1 | 3 | 4 | 512KB |
//| 3 | 1 | 2 | 4 | 512KB |
tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{3, 4})
tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 4})
tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{3, 4})
tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 4})
tc.SetHotRegionCacheHitsThreshold(0)

// try to get an operator
@@ -217,9 +217,9 @@ func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) {
tc.UpdateStorageReadBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval)

tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2})
tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3})
tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{2})
tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 3})
tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{2, 3})
tc.SetHotRegionCacheHitsThreshold(0)
c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue)
c.Assert(hb.Schedule(tc), IsNil)
@@ -386,11 +386,11 @@ func (s *testSpecialUseSuite) TestSpecialUseHotRegion(c *C) {
tc.UpdateStorageWrittenBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(4, 0)
tc.UpdateStorageWrittenBytes(5, 0)
tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(4, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3})
tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 2})
tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
tc.AddLeaderRegionWithWriteInfo(4, 2, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{1, 3})
tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{1, 2})
ops = hs.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 4)
4 changes: 3 additions & 1 deletion server/statistics/hot_peer.go
@@ -82,6 +82,8 @@ type HotPeerStat struct {
Kind FlowKind `json:"-"`
Loads []float64 `json:"loads"`

expectReportIntervalSecs int

// rolling statistics, recording some recently added records.
rollingLoads []*dimStat

@@ -129,7 +131,7 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi

// IsNeedCoolDownTransferLeader uses a cooldown window after a leader transfer to avoid unnecessary scheduling
func (stat *HotPeerStat) IsNeedCoolDownTransferLeader(minHotDegree int) bool {
return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*HotStatReportInterval)
return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*stat.expectReportIntervalSecs)
}

// IsNeedDelete to delete the item in cache.
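The cooldown check above now scales with the per-kind interval instead of the old global constant; since both kinds still report every 60s, behaviour is unchanged for now. A worked sketch under those assumptions, where minHotDegree = 3 is a hypothetical config value:

package main

import (
	"fmt"
	"time"
)

func main() {
	minHotDegree := 3              // hypothetical scheduler config value
	expectReportIntervalSecs := 60 // both report intervals resolve to 60s today
	lastTransfer := time.Now().Add(-2 * time.Minute)

	// mirrors IsNeedCoolDownTransferLeader: the window is 3 * 60s = 180s
	cooling := time.Since(lastTransfer).Seconds() < float64(minHotDegree*expectReportIntervalSecs)
	fmt.Println(cooling) // true: only 120s have elapsed, so scheduling is still suppressed
}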
40 changes: 27 additions & 13 deletions server/statistics/hot_peer_cache.go
@@ -29,11 +29,10 @@ const (
// TopNN is the threshold which means we can get hot threshold from store.
TopNN = 60
// HotThresholdRatio is used to calculate hot thresholds
HotThresholdRatio = 0.8
// HotStatReportInterval indicates the interval between each data reporting
// TODO: change into StoreHeartBeatReportInterval when we use store heartbeat to report data
HotStatReportInterval = RegionHeartBeatReportInterval
topNTTL = 3 * HotStatReportInterval * time.Second
HotThresholdRatio = 0.8
// WriteReportInterval indicates the interval between each write-flow data report
WriteReportInterval = RegionHeartBeatReportInterval
// ReadReportInterval indicates the interval between each read-flow data report
// TODO: use StoreHeartBeatReportInterval in future
ReadReportInterval = RegionHeartBeatReportInterval

rollingWindowsSize = 5

@@ -52,20 +51,30 @@ var minHotThresholds = [RegionStatCount]float64{

// hotPeerCache saves the hot peer's statistics.
type hotPeerCache struct {
kind FlowKind
peersOfStore map[uint64]*TopN // storeID -> hot peers
storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
inheritItem map[uint64]*HotPeerStat // regionID -> HotPeerStat
kind FlowKind
peersOfStore map[uint64]*TopN // storeID -> hot peers
storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
inheritItem map[uint64]*HotPeerStat // regionID -> HotPeerStat
topNTTL time.Duration
reportIntervalSecs int
}

// NewHotStoresStats creates a HotStoresStats
func NewHotStoresStats(kind FlowKind) *hotPeerCache {
return &hotPeerCache{
c := &hotPeerCache{
kind: kind,
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
inheritItem: make(map[uint64]*HotPeerStat),
}
if kind == WriteFlow {
c.reportIntervalSecs = WriteReportInterval
c.topNTTL = 3 * WriteReportInterval * time.Second
} else {
c.reportIntervalSecs = ReadReportInterval
c.topNTTL = 3 * ReadReportInterval * time.Second
}
return c
}

// TODO: rename RegionStats as PeerStats
@@ -99,7 +108,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
} else {
peers, ok := f.peersOfStore[item.StoreID]
if !ok {
peers = NewTopN(DimLen, TopNN, topNTTL)
peers = NewTopN(DimLen, TopNN, f.topNTTL)
f.peersOfStore[item.StoreID] = peers
}
peers.Put(item)
@@ -217,6 +226,11 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf
peers: peers,
thresholds: thresholds,
}
if f.kind == WriteFlow {
newItem.expectReportIntervalSecs = WriteReportInterval
} else {
newItem.expectReportIntervalSecs = ReadReportInterval
}
if oldItem == nil {
inheritItem := f.takeInheritItem(region.GetID())
if inheritItem != nil {
@@ -383,7 +397,7 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
}

func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian {
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, HotStatReportInterval*time.Second)
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second)
}

func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat {
@@ -403,7 +417,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa
if !isHot {
return nil
}
if interval.Seconds() >= HotStatReportInterval {
if interval.Seconds() >= float64(f.reportIntervalSecs) {
newItem.HotDegree = 1
newItem.AntiCount = hotRegionAntiCount
}
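The last hunk is the behavioural crux: a newly seen peer only gets HotDegree = 1 when the reported interval covers at least one full expected report interval, and that expectation is now per flow kind. A small standalone sketch of the guard with hypothetical interval values:

package main

import "fmt"

func main() {
	reportIntervalSecs := 60 // f.reportIntervalSecs, 60s for either kind today
	for _, got := range []float64{30, 60, 61} {
		if got >= float64(reportIntervalSecs) {
			fmt.Println(got, "-> HotDegree = 1") // a full interval was covered
		} else {
			fmt.Println(got, "-> hot degree not initialized yet")
		}
	}
}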
54 changes: 37 additions & 17 deletions tests/pdctl/hot/hot_test.go
@@ -117,23 +117,43 @@ func (s *hotTestSuite) TestHot(c *C) {
c.Assert(hotRegion.AsLeader[hotStoreID].Stats[count-1].RegionID, Equals, hotRegionID)
}
}
reportIntervals := []uint64{
statistics.HotRegionReportMinInterval,
statistics.HotRegionReportMinInterval + 1,
statistics.RegionHeartBeatReportInterval,
statistics.RegionHeartBeatReportInterval + 1,
statistics.RegionHeartBeatReportInterval * 2,
statistics.RegionHeartBeatReportInterval*2 + 1,
}
for _, reportInterval := range reportIntervals {
hotReadRegionID, hotWriteRegionID := reportInterval, reportInterval+100
pdctl.MustPutRegion(c, cluster, hotReadRegionID, hotStoreID, []byte("b"), []byte("c"), core.SetReadBytes(1000000000), core.SetReportInterval(reportInterval))
pdctl.MustPutRegion(c, cluster, hotWriteRegionID, hotStoreID, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000), core.SetReportInterval(reportInterval))
time.Sleep(5000 * time.Millisecond)
if reportInterval >= statistics.RegionHeartBeatReportInterval {
count++
testInterval := func(hotType string) {
reportIntervals := []uint64{
statistics.HotRegionReportMinInterval,
statistics.HotRegionReportMinInterval + 1,
statistics.WriteReportInterval,
statistics.WriteReportInterval + 1,
statistics.WriteReportInterval * 2,
statistics.WriteReportInterval*2 + 1,
}
if hotType == "read" {
reportIntervals = []uint64{
statistics.HotRegionReportMinInterval,
statistics.HotRegionReportMinInterval + 1,
statistics.ReadReportInterval,
statistics.ReadReportInterval + 1,
statistics.ReadReportInterval * 2,
statistics.ReadReportInterval*2 + 1,
}
}
for _, reportInterval := range reportIntervals {
hotReadRegionID, hotWriteRegionID := reportInterval, reportInterval+100
if hotType == "read" {
pdctl.MustPutRegion(c, cluster, hotReadRegionID, hotStoreID, []byte("b"), []byte("c"), core.SetReadBytes(1000000000), core.SetReportInterval(reportInterval))
} else {
pdctl.MustPutRegion(c, cluster, hotWriteRegionID, hotStoreID, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000), core.SetReportInterval(reportInterval))
}
time.Sleep(5000 * time.Millisecond)
expectInterval := statistics.WriteReportInterval
if hotType == "read" {
expectInterval = statistics.ReadReportInterval
}
if reportInterval >= uint64(expectInterval) {
count++
}
if hotType == "read" {
testHot(hotReadRegionID, hotStoreID, "read")
} else {
testHot(hotWriteRegionID, hotStoreID, "write")
}
}
testHot(hotReadRegionID, hotStoreID, "read")
testHot(hotWriteRegionID, hotStoreID, "write")
}
testInterval("read")
testInterval("write")
}
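Note that the read and write interval lists above are identical today, so the two testInterval calls exercise the same boundaries; the split only pays off once the TODO lands and ReadReportInterval drops to the store-heartbeat interval. A sketch of how the boundary sets could then differ; the 10s figure is an assumption taken from the TODO, not current behaviour:

package main

import "fmt"

func main() {
	writeIntervals := []uint64{60, 61, 120, 121} // WriteReportInterval stays at 60s
	readIntervals := []uint64{10, 11, 20, 21}    // if ReadReportInterval becomes 10s
	fmt.Println(writeIntervals, readIntervals)
}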
