From 396d23cdd045da7a5650de9ae8a349d74cbb0908 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Sat, 28 Mar 2020 17:15:58 +0800 Subject: [PATCH] statistic: add moving average for stable hot (#2286) * add moving average --- server/statistics/avg_over_time.go | 73 ++++++++++++++++++------- server/statistics/avg_over_time_test.go | 4 +- server/statistics/store.go | 30 +++++----- tests/pdctl/hot/hot_test.go | 8 ++- 4 files changed, 76 insertions(+), 39 deletions(-) diff --git a/server/statistics/avg_over_time.go b/server/statistics/avg_over_time.go index eacfba71c5c..ead36715aa7 100644 --- a/server/statistics/avg_over_time.go +++ b/server/statistics/avg_over_time.go @@ -19,6 +19,11 @@ import ( "github.com/phf/go-queue/queue" ) +const ( + // StoreHeartBeatReportInterval is the heartbeat report interval of a store. + StoreHeartBeatReportInterval = 10 +) + type deltaWithInterval struct { delta float64 interval time.Duration @@ -48,38 +53,68 @@ func NewAvgOverTime(interval time.Duration) *AvgOverTime { // Get returns change rate in the last interval. func (aot *AvgOverTime) Get() float64 { - if aot.intervalSum.Seconds() < 1 { - return 0 - } return aot.deltaSum / aot.intervalSum.Seconds() } +// Clear clears the AvgOverTime. +func (aot *AvgOverTime) Clear() { + aot.que = queue.New() + aot.intervalSum = 0 + aot.deltaSum = 0 +} + // Add adds recent change to AvgOverTime. func (aot *AvgOverTime) Add(delta float64, interval time.Duration) { aot.que.PushBack(deltaWithInterval{delta, interval}) aot.deltaSum += delta aot.intervalSum += interval - if aot.intervalSum <= aot.avgInterval { - return - } - for aot.que.Len() > 0 { - front := aot.que.Front().(deltaWithInterval) - if aot.intervalSum-front.interval >= aot.avgInterval { - aot.que.PopFront() - aot.deltaSum -= front.delta - aot.intervalSum -= front.interval - } else { - break - } - } } // Set sets AvgOverTime to the given average. func (aot *AvgOverTime) Set(avg float64) { - for aot.que.Len() > 0 { - aot.que.PopFront() - } + aot.Clear() aot.deltaSum = avg * aot.avgInterval.Seconds() aot.intervalSum = aot.avgInterval aot.que.PushBack(deltaWithInterval{delta: aot.deltaSum, interval: aot.intervalSum}) } + +// TimeMedian is AvgOverTime + MedianFilter +// Size of MedianFilter should be larger than double size of AvgOverTime to denoisy. +// Delay is aotSize * mfSize * StoreHeartBeatReportInterval /4 +type TimeMedian struct { + aotInterval time.Duration + aot *AvgOverTime + mf *MedianFilter +} + +// NewTimeMedian returns a TimeMedian with given size. +func NewTimeMedian(aotSize, mfSize int) *TimeMedian { + interval := time.Duration(aotSize*StoreHeartBeatReportInterval) * time.Second + return &TimeMedian{ + aotInterval: interval, + aot: NewAvgOverTime(interval), + mf: NewMedianFilter(mfSize), + } +} + +// Get returns change rate in the median of the several intervals. +func (t *TimeMedian) Get() float64 { + return t.mf.Get() +} + +// Add adds recent change to TimeMedian. +func (t *TimeMedian) Add(delta float64, interval time.Duration) { + if interval < 1 { + return + } + t.aot.Add(delta, interval) + if t.aot.intervalSum >= t.aotInterval { + t.mf.Add(t.aot.Get()) + t.aot.Clear() + } +} + +// Set sets the given average. +func (t *TimeMedian) Set(avg float64) { + t.mf.Set(avg) +} diff --git a/server/statistics/avg_over_time_test.go b/server/statistics/avg_over_time_test.go index c617e6bdc73..cbf436b1c12 100644 --- a/server/statistics/avg_over_time_test.go +++ b/server/statistics/avg_over_time_test.go @@ -55,7 +55,7 @@ func (t *testAvgOverTimeSuite) TestChange(c *C) { for i := 0; i < 5; i++ { aot.Add(500, time.Second) } - c.Assert(aot.Get(), LessEqual, 505.) + c.Assert(aot.Get(), LessEqual, 900.) c.Assert(aot.Get(), GreaterEqual, 495.) for i := 0; i < 15; i++ { aot.Add(500, time.Second) @@ -65,6 +65,6 @@ func (t *testAvgOverTimeSuite) TestChange(c *C) { for i := 0; i < 5; i++ { aot.Add(100, time.Second) } - c.Assert(aot.Get(), LessEqual, 101.) + c.Assert(aot.Get(), LessEqual, 678.) c.Assert(aot.Get(), GreaterEqual, 99.) } diff --git a/server/statistics/store.go b/server/statistics/store.go index 02eb5732767..7c3a4e35ad8 100644 --- a/server/statistics/store.go +++ b/server/statistics/store.go @@ -21,11 +21,6 @@ import ( "github.com/pingcap/pd/v4/server/core" ) -const ( - // StoreHeartBeatReportInterval is the heartbeat report interval of a store. - StoreHeartBeatReportInterval = 10 -) - // StoresStats is a cache hold hot regions. type StoresStats struct { sync.RWMutex @@ -261,25 +256,30 @@ func (s *StoresStats) GetStoresKeysReadStat() map[uint64]float64 { // RollingStoreStats are multiple sets of recent historical records with specified windows size. type RollingStoreStats struct { sync.RWMutex - bytesWriteRate *AvgOverTime - bytesReadRate *AvgOverTime - keysWriteRate *AvgOverTime - keysReadRate *AvgOverTime + bytesWriteRate *TimeMedian + bytesReadRate *TimeMedian + keysWriteRate *TimeMedian + keysReadRate *TimeMedian totalCPUUsage MovingAvg totalBytesDiskReadRate MovingAvg totalBytesDiskWriteRate MovingAvg } -const storeStatsRollingWindows = 3 -const storeAvgInterval time.Duration = 3 * StoreHeartBeatReportInterval * time.Second +const ( + storeStatsRollingWindows = 3 + // DefaultAotSize is default size of average over time. + DefaultAotSize = 2 + // DefaultMfSize is default size of median filter + DefaultMfSize = 5 +) // NewRollingStoreStats creates a RollingStoreStats. func newRollingStoreStats() *RollingStoreStats { return &RollingStoreStats{ - bytesWriteRate: NewAvgOverTime(storeAvgInterval), - bytesReadRate: NewAvgOverTime(storeAvgInterval), - keysWriteRate: NewAvgOverTime(storeAvgInterval), - keysReadRate: NewAvgOverTime(storeAvgInterval), + bytesWriteRate: NewTimeMedian(DefaultAotSize, DefaultMfSize), + bytesReadRate: NewTimeMedian(DefaultAotSize, DefaultMfSize), + keysWriteRate: NewTimeMedian(DefaultAotSize, DefaultMfSize), + keysReadRate: NewTimeMedian(DefaultAotSize, DefaultMfSize), totalCPUUsage: NewMedianFilter(storeStatsRollingWindows), totalBytesDiskReadRate: NewMedianFilter(storeStatsRollingWindows), totalBytesDiskWriteRate: NewMedianFilter(storeStatsRollingWindows), diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index b9ec8ac3d2b..f5af3439ff3 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -70,7 +70,6 @@ func (s *hotTestSuite) TestHot(c *C) { // test hot store ss := leaderServer.GetStore(1) now := time.Now().Second() - interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - 10), EndTimestamp: uint64(now)} newStats := proto.Clone(ss.GetStoreStats()).(*pdpb.StoreStats) bytesWritten := uint64(8 * 1024 * 1024) bytesRead := uint64(16 * 1024 * 1024) @@ -80,9 +79,12 @@ func (s *hotTestSuite) TestHot(c *C) { newStats.BytesRead = bytesRead newStats.KeysWritten = keysWritten newStats.KeysRead = keysRead - newStats.Interval = interval rc := leaderServer.GetRaftCluster() - rc.GetStoresStats().Observe(ss.GetID(), newStats) + for i := statistics.DefaultMfSize; i > 0; i-- { + newStats.Interval = &pdpb.TimeInterval{StartTimestamp: uint64(now - 10*i), EndTimestamp: uint64(now - 10*i + 10)} + rc.GetStoresStats().Observe(ss.GetID(), newStats) + } + args := []string{"-u", pdAddr, "hot", "store"} _, output, err := pdctl.ExecuteCommandC(cmd, args...) c.Assert(err, IsNil)