Commit

Merge branch 'master' into simple-pending
ti-chi-bot authored May 11, 2021
2 parents 21c9551 + 8e3564d commit d4f505f
Showing 5 changed files with 104 additions and 55 deletions.
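This merge splits the shared statistics.HotStatReportInterval constant into separate WriteReportInterval and ReadReportInterval constants (both still aliases of RegionHeartBeatReportInterval for now) and threads the per-flow-kind interval through dimStat, HotPeerStat, and hotPeerCache. Per the TODO in hot_peer_cache.go, this prepares read statistics to move to the store-heartbeat interval later without affecting write statistics; the tests are updated to use the constant matching each flow kind.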
37 changes: 23 additions & 14 deletions server/schedulers/hot_test.go
@@ -687,7 +687,7 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
testutil.CheckTransferLeader(c, hb.Schedule(tc)[0], operator.OpHotRegion, 1, 3)
hb.(*hotScheduler).clearPendingInfluence()
// assume the operator has been applied
- tc.AddLeaderRegionWithReadInfo(3, 3, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 2})
+ tc.AddLeaderRegionWithReadInfo(3, 3, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 2})
// After transferring a hot region leader from store 1 to store 3,
// the three region leaders will be evenly distributed across the three stores

@@ -1066,12 +1066,16 @@ func addRegionInfo(tc *mockcluster.Cluster, rwTy rwType, regions []testRegionInf
if rwTy == write {
addFunc = tc.AddLeaderRegionWithWriteInfo
}
+ reportIntervalSecs := statistics.WriteReportInterval
+ if rwTy == read {
+ 	reportIntervalSecs = statistics.ReadReportInterval
+ }
for _, r := range regions {
addFunc(
r.id, r.peers[0],
- uint64(r.byteRate*statistics.RegionHeartBeatReportInterval),
- uint64(r.keyRate*statistics.RegionHeartBeatReportInterval),
- statistics.RegionHeartBeatReportInterval,
+ uint64(r.byteRate*float64(reportIntervalSecs)),
+ uint64(r.keyRate*float64(reportIntervalSecs)),
+ uint64(reportIntervalSecs),
r.peers[1:],
)
}
@@ -1100,17 +1104,21 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h

tc.AddRegionStore(2, 20)
tc.UpdateStorageReadStats(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval)
+ reportInterval := uint64(statistics.WriteReportInterval)
+ if kind == read {
+ 	reportInterval = uint64(statistics.ReadReportInterval)
+ }
// hot degree increase
- heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
- heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
- items := heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
+ heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1)
+ heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1)
+ items := heartbeat(1, 1, 512*KB*reportInterval, 0, reportInterval, []uint64{2, 3}, 1)
c.Check(len(items), Greater, 0)
for _, item := range items {
c.Check(item.HotDegree, Equals, 3)
}

// transfer leader, skip the first heartbeat and schedule.
- items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3}, 1)
+ items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 3}, 1)
for _, item := range items {
if !item.IsNeedDelete() {
c.Check(item.HotDegree, Equals, 3)
@@ -1128,12 +1136,12 @@
tc.SetHotRegionCacheHitsThreshold(threshold)

// move peer: add peer and remove peer
- items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3, 4}, 1)
+ items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 3, 4}, 1)
c.Check(len(items), Greater, 0)
for _, item := range items {
c.Check(item.HotDegree, Equals, 4)
}
- items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 4}, 1)
+ items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 4}, 1)
c.Check(len(items), Greater, 0)
for _, item := range items {
if item.StoreID == 3 {
@@ -1153,19 +1161,20 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) {
tc.SetLocationLabels([]string{"zone", "host"})
tc.DisableFeature(versioninfo.JointConsensus)
// some peers are hot, and some are cold #3198

rate := uint64(512 * KB)
for i := 0; i < statistics.TopNN; i++ {
for j := 0; j < statistics.DefaultAotSize; j++ {
- tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
+ tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1)
}
}
- items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
+ items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3}, 1)
c.Check(items[0].GetThresholds()[0], Equals, float64(rate)*statistics.HotThresholdRatio)
// The threshold of stores 1, 2 and 3 is 409.6 KB; the others are 1 KB
// Make the hot threshold of some stores high and the others low
rate = 10 * KB
- tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3, 4}, 1)
- items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{3, 4}, 1)
+ tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3, 4}, 1)
+ items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1)
for _, item := range items {
if item.StoreID < 4 {
c.Check(item.IsNeedDelete(), IsTrue)
22 changes: 11 additions & 11 deletions server/schedulers/scheduler_test.go
@@ -177,9 +177,9 @@ func (s *testShuffleHotRegionSchedulerSuite) checkBalance(c *C, tc *mockcluster.
//| 1 | 1 | 2 | 3 | 512KB |
//| 2 | 1 | 3 | 4 | 512KB |
//| 3 | 1 | 2 | 4 | 512KB |
- tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
- tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{3, 4})
- tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 4})
+ tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
+ tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{3, 4})
+ tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 4})
tc.SetHotRegionCacheHitsThreshold(0)

// try to get an operator
@@ -217,9 +217,9 @@ func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) {
tc.UpdateStorageReadBytes(2, 4.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadBytes(3, 4.5*MB*statistics.StoreHeartBeatReportInterval)

- tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2})
- tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3})
- tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
+ tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{2})
+ tc.AddLeaderRegionWithReadInfo(2, 2, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{1, 3})
+ tc.AddLeaderRegionWithReadInfo(3, 1, 512*KB*statistics.ReadReportInterval, 0, statistics.ReadReportInterval, []uint64{2, 3})
tc.SetHotRegionCacheHitsThreshold(0)
c.Assert(tc.IsRegionHot(tc.GetRegion(1)), IsTrue)
c.Assert(hb.Schedule(tc), IsNil)
@@ -386,11 +386,11 @@ func (s *testSpecialUseSuite) TestSpecialUseHotRegion(c *C) {
tc.UpdateStorageWrittenBytes(3, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(4, 0)
tc.UpdateStorageWrittenBytes(5, 0)
- tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
- tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
- tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
- tc.AddLeaderRegionWithWriteInfo(4, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3})
- tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 2})
+ tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
+ tc.AddLeaderRegionWithWriteInfo(2, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
+ tc.AddLeaderRegionWithWriteInfo(3, 1, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{2, 3})
+ tc.AddLeaderRegionWithWriteInfo(4, 2, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{1, 3})
+ tc.AddLeaderRegionWithWriteInfo(5, 3, 512*KB*statistics.WriteReportInterval, 0, statistics.WriteReportInterval, []uint64{1, 2})
ops = hs.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 4)
12 changes: 9 additions & 3 deletions server/statistics/hot_peer.go
@@ -35,8 +35,7 @@ type dimStat struct {
LastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in last second as instantaneous speed.
}

- func newDimStat(typ RegionStatKind) *dimStat {
- 	reportInterval := RegionHeartBeatReportInterval * time.Second
+ func newDimStat(typ RegionStatKind, reportInterval time.Duration) *dimStat {
return &dimStat{
typ: typ,
Rolling: movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval),
@@ -129,7 +128,7 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi

// IsNeedCoolDownTransferLeader uses a cooldown period after a leader transfer to avoid unnecessary scheduling
func (stat *HotPeerStat) IsNeedCoolDownTransferLeader(minHotDegree int) bool {
- return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*HotStatReportInterval)
+ return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*stat.hotStatReportInterval())
}

// IsNeedDelete indicates whether the item in the cache should be deleted.
@@ -192,3 +191,10 @@ func (stat *HotPeerStat) clearLastAverage() {
l.clearLastAverage()
}
}
+
+ func (stat *HotPeerStat) hotStatReportInterval() int {
+ 	if stat.Kind == ReadFlow {
+ 		return ReadReportInterval
+ 	}
+ 	return WriteReportInterval
+ }
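Aside: the cooldown check in this file now scales with the flow kind's report interval rather than a shared constant. Below is a self-contained sketch of that rule; the types are simplified and the 60-second values (mirroring RegionHeartBeatReportInterval) are assumptions of this example, not a copy of PD's full HotPeerStat.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed values: in PD both constants currently alias
// RegionHeartBeatReportInterval (60 seconds).
const (
	ReadReportInterval  = 60
	WriteReportInterval = 60
)

type FlowKind int

const (
	WriteFlow FlowKind = iota
	ReadFlow
)

// peerStat keeps only the fields the cooldown check needs; the real
// HotPeerStat carries many more.
type peerStat struct {
	Kind                   FlowKind
	lastTransferLeaderTime time.Time
}

func (s *peerStat) hotStatReportInterval() int {
	if s.Kind == ReadFlow {
		return ReadReportInterval
	}
	return WriteReportInterval
}

// isNeedCoolDownTransferLeader mirrors the patched method: suppress
// scheduling until minHotDegree report intervals have elapsed since
// the last leader transfer.
func (s *peerStat) isNeedCoolDownTransferLeader(minHotDegree int) bool {
	return time.Since(s.lastTransferLeaderTime).Seconds() < float64(minHotDegree*s.hotStatReportInterval())
}

func main() {
	s := &peerStat{Kind: ReadFlow, lastTransferLeaderTime: time.Now().Add(-90 * time.Second)}
	fmt.Println(s.isNeedCoolDownTransferLeader(2)) // true: 90s < 2*60s, still cooling down
}
```

With minHotDegree = 2 and a read peer whose leader moved 90 seconds ago, the check still reports a cooldown, since 90 seconds is less than two 60-second report intervals.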
36 changes: 23 additions & 13 deletions server/statistics/hot_peer_cache.go
@@ -30,10 +30,11 @@ const (
TopNN = 60
// HotThresholdRatio is used to calculate hot thresholds
HotThresholdRatio = 0.8
- // HotStatReportInterval indicates the interval between each data reporting
- // TODO: change into StoreHeartBeatReportInterval when we use store heartbeat to report data
- HotStatReportInterval = RegionHeartBeatReportInterval
- topNTTL = 3 * HotStatReportInterval * time.Second
+ // WriteReportInterval indicates the interval between write stats reports
+ WriteReportInterval = RegionHeartBeatReportInterval
+ // ReadReportInterval indicates the interval between read stats reports
+ // TODO: use StoreHeartBeatReportInterval in the future
+ ReadReportInterval = RegionHeartBeatReportInterval

rollingWindowsSize = 5

@@ -52,20 +53,29 @@ var minHotThresholds = [RegionStatCount]float64{

// hotPeerCache saves the hot peer's statistics.
type hotPeerCache struct {
- kind FlowKind
- peersOfStore map[uint64]*TopN // storeID -> hot peers
- storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
- inheritItem map[uint64]*HotPeerStat // regionID -> HotPeerStat
+ kind               FlowKind
+ peersOfStore       map[uint64]*TopN               // storeID -> hot peers
+ storesOfRegion     map[uint64]map[uint64]struct{} // regionID -> storeIDs
+ inheritItem        map[uint64]*HotPeerStat        // regionID -> HotPeerStat
+ topNTTL            time.Duration
+ reportIntervalSecs int
}

// NewHotStoresStats creates a HotStoresStats
func NewHotStoresStats(kind FlowKind) *hotPeerCache {
- return &hotPeerCache{
+ c := &hotPeerCache{
kind: kind,
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
inheritItem: make(map[uint64]*HotPeerStat),
}
+ if kind == WriteFlow {
+ 	c.reportIntervalSecs = WriteReportInterval
+ } else {
+ 	c.reportIntervalSecs = ReadReportInterval
+ }
+ c.topNTTL = 3 * time.Duration(c.reportIntervalSecs) * time.Second
+ return c
}

// TODO: rename RegionStats as PeerStats
@@ -99,7 +109,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
} else {
peers, ok := f.peersOfStore[item.StoreID]
if !ok {
- peers = NewTopN(DimLen, TopNN, topNTTL)
+ peers = NewTopN(DimLen, TopNN, f.topNTTL)
f.peersOfStore[item.StoreID] = peers
}
peers.Put(item)
@@ -383,7 +393,7 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
}

func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian {
- return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, HotStatReportInterval*time.Second)
+ return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second)
}

func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat {
@@ -403,14 +413,14 @@
if !isHot {
return nil
}
- if interval.Seconds() >= HotStatReportInterval {
+ if interval.Seconds() >= float64(f.reportIntervalSecs) {
newItem.HotDegree = 1
newItem.AntiCount = hotRegionAntiCount
}
newItem.isNew = true
newItem.rollingLoads = make([]*dimStat, len(regionStats))
for i, k := range regionStats {
- ds := newDimStat(k)
+ ds := newDimStat(k, time.Duration(newItem.hotStatReportInterval())*time.Second)
ds.Add(deltaLoads[k], interval)
if ds.isFull() {
ds.clearLastAverage()
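The constructor change above is the core of the refactor: each hotPeerCache now owns its report interval and derives its TopN TTL (three missed reports) from it, instead of reading package-level constants. A standalone sketch of the same derivation follows; the struct is trimmed and the 60-second constants are assumptions mirroring RegionHeartBeatReportInterval.

```go
package main

import (
	"fmt"
	"time"
)

type FlowKind int

const (
	WriteFlow FlowKind = iota
	ReadFlow
)

// Both currently equal RegionHeartBeatReportInterval (60s); the split
// lets ReadReportInterval change independently later.
const (
	WriteReportInterval = 60
	ReadReportInterval  = 60
)

type hotPeerCache struct {
	kind               FlowKind
	topNTTL            time.Duration
	reportIntervalSecs int
}

// newCache mirrors NewHotStoresStats: pick the interval by flow kind and
// keep TopN items alive for three missed reports before eviction.
func newCache(kind FlowKind) *hotPeerCache {
	c := &hotPeerCache{kind: kind}
	if kind == WriteFlow {
		c.reportIntervalSecs = WriteReportInterval
	} else {
		c.reportIntervalSecs = ReadReportInterval
	}
	c.topNTTL = 3 * time.Duration(c.reportIntervalSecs) * time.Second
	return c
}

func main() {
	fmt.Println(newCache(ReadFlow).topNTTL) // 3m0s
}
```

Keeping the TTL on the cache, rather than as a package constant, is what lets a future read cache with a different interval age its entries on its own schedule.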
52 changes: 38 additions & 14 deletions tests/pdctl/hot/hot_test.go
@@ -117,23 +117,47 @@ func (s *hotTestSuite) TestHot(c *C) {
c.Assert(hotRegion.AsLeader[hotStoreID].Stats[count-1].RegionID, Equals, hotRegionID)
}
}

+ regionIDCounter := uint64(1)
+ testCommand := func(reportIntervals []uint64, hotType string) {
+ 	for _, reportInterval := range reportIntervals {
+ 		hotRegionID := regionIDCounter
+ 		regionIDCounter++
+ 		switch hotType {
+ 		case "read":
+ 			pdctl.MustPutRegion(c, cluster, hotRegionID, hotStoreID, []byte("b"), []byte("c"), core.SetReadBytes(1000000000), core.SetReportInterval(reportInterval))
+ 			time.Sleep(5000 * time.Millisecond)
+ 			if reportInterval >= statistics.RegionHeartBeatReportInterval {
+ 				count++
+ 			}
+ 			testHot(hotRegionID, hotStoreID, "read")
+ 		case "write":
+ 			pdctl.MustPutRegion(c, cluster, hotRegionID, hotStoreID, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000), core.SetReportInterval(reportInterval))
+ 			time.Sleep(5000 * time.Millisecond)
+ 			if reportInterval >= statistics.RegionHeartBeatReportInterval {
+ 				count++
+ 			}
+ 			testHot(hotRegionID, hotStoreID, "write")
+ 		}
+ 	}
+ }
reportIntervals := []uint64{
statistics.HotRegionReportMinInterval,
statistics.HotRegionReportMinInterval + 1,
- statistics.RegionHeartBeatReportInterval,
- statistics.RegionHeartBeatReportInterval + 1,
- statistics.RegionHeartBeatReportInterval * 2,
- statistics.RegionHeartBeatReportInterval*2 + 1,
+ statistics.WriteReportInterval,
+ statistics.WriteReportInterval + 1,
+ statistics.WriteReportInterval * 2,
+ statistics.WriteReportInterval*2 + 1,
}
- for _, reportInterval := range reportIntervals {
- 	hotReadRegionID, hotWriteRegionID := reportInterval, reportInterval+100
- 	pdctl.MustPutRegion(c, cluster, hotReadRegionID, hotStoreID, []byte("b"), []byte("c"), core.SetReadBytes(1000000000), core.SetReportInterval(reportInterval))
- 	pdctl.MustPutRegion(c, cluster, hotWriteRegionID, hotStoreID, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000), core.SetReportInterval(reportInterval))
- 	time.Sleep(5000 * time.Millisecond)
- 	if reportInterval >= statistics.RegionHeartBeatReportInterval {
- 		count++
- 	}
- 	testHot(hotReadRegionID, hotStoreID, "read")
- 	testHot(hotWriteRegionID, hotStoreID, "write")
+ testCommand(reportIntervals, "write")
+ count = 0
+ reportIntervals = []uint64{
+ 	statistics.HotRegionReportMinInterval,
+ 	statistics.HotRegionReportMinInterval + 1,
+ 	statistics.ReadReportInterval,
+ 	statistics.ReadReportInterval + 1,
+ 	statistics.ReadReportInterval * 2,
+ 	statistics.ReadReportInterval*2 + 1,
+ }
+ testCommand(reportIntervals, "read")
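Note the count = 0 reset between the two passes: testCommand increments the shared count only when the report interval reaches statistics.RegionHeartBeatReportInterval, and the read and write hot-region caches are tracked separately, so each pass verifies its own expected tally against its own interval set.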
}
