Skip to content

Commit

Permalink
statistic: add moving average for stable hot (tikv#2286)
Browse files Browse the repository at this point in the history
* add moving average
  • Loading branch information
lhy1024 authored Mar 28, 2020
1 parent 9c709cf commit 396d23c
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 39 deletions.
73 changes: 54 additions & 19 deletions server/statistics/avg_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import (
"github.com/phf/go-queue/queue"
)

const (
// StoreHeartBeatReportInterval is the heartbeat report interval of a store.
StoreHeartBeatReportInterval = 10
)

type deltaWithInterval struct {
delta float64
interval time.Duration
Expand Down Expand Up @@ -48,38 +53,68 @@ func NewAvgOverTime(interval time.Duration) *AvgOverTime {

// Get returns change rate in the last interval.
func (aot *AvgOverTime) Get() float64 {
if aot.intervalSum.Seconds() < 1 {
return 0
}
return aot.deltaSum / aot.intervalSum.Seconds()
}

// Clear clears the AvgOverTime.
func (aot *AvgOverTime) Clear() {
aot.que = queue.New()
aot.intervalSum = 0
aot.deltaSum = 0
}

// Add adds recent change to AvgOverTime.
func (aot *AvgOverTime) Add(delta float64, interval time.Duration) {
aot.que.PushBack(deltaWithInterval{delta, interval})
aot.deltaSum += delta
aot.intervalSum += interval
if aot.intervalSum <= aot.avgInterval {
return
}
for aot.que.Len() > 0 {
front := aot.que.Front().(deltaWithInterval)
if aot.intervalSum-front.interval >= aot.avgInterval {
aot.que.PopFront()
aot.deltaSum -= front.delta
aot.intervalSum -= front.interval
} else {
break
}
}
}

// Set sets AvgOverTime to the given average.
func (aot *AvgOverTime) Set(avg float64) {
for aot.que.Len() > 0 {
aot.que.PopFront()
}
aot.Clear()
aot.deltaSum = avg * aot.avgInterval.Seconds()
aot.intervalSum = aot.avgInterval
aot.que.PushBack(deltaWithInterval{delta: aot.deltaSum, interval: aot.intervalSum})
}

// TimeMedian is AvgOverTime + MedianFilter
// Size of MedianFilter should be larger than double size of AvgOverTime to denoisy.
// Delay is aotSize * mfSize * StoreHeartBeatReportInterval /4
type TimeMedian struct {
aotInterval time.Duration
aot *AvgOverTime
mf *MedianFilter
}

// NewTimeMedian returns a TimeMedian with given size.
func NewTimeMedian(aotSize, mfSize int) *TimeMedian {
interval := time.Duration(aotSize*StoreHeartBeatReportInterval) * time.Second
return &TimeMedian{
aotInterval: interval,
aot: NewAvgOverTime(interval),
mf: NewMedianFilter(mfSize),
}
}

// Get returns change rate in the median of the several intervals.
func (t *TimeMedian) Get() float64 {
return t.mf.Get()
}

// Add adds recent change to TimeMedian.
func (t *TimeMedian) Add(delta float64, interval time.Duration) {
if interval < 1 {
return
}
t.aot.Add(delta, interval)
if t.aot.intervalSum >= t.aotInterval {
t.mf.Add(t.aot.Get())
t.aot.Clear()
}
}

// Set sets the given average.
func (t *TimeMedian) Set(avg float64) {
t.mf.Set(avg)
}
4 changes: 2 additions & 2 deletions server/statistics/avg_over_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (t *testAvgOverTimeSuite) TestChange(c *C) {
for i := 0; i < 5; i++ {
aot.Add(500, time.Second)
}
c.Assert(aot.Get(), LessEqual, 505.)
c.Assert(aot.Get(), LessEqual, 900.)
c.Assert(aot.Get(), GreaterEqual, 495.)
for i := 0; i < 15; i++ {
aot.Add(500, time.Second)
Expand All @@ -65,6 +65,6 @@ func (t *testAvgOverTimeSuite) TestChange(c *C) {
for i := 0; i < 5; i++ {
aot.Add(100, time.Second)
}
c.Assert(aot.Get(), LessEqual, 101.)
c.Assert(aot.Get(), LessEqual, 678.)
c.Assert(aot.Get(), GreaterEqual, 99.)
}
30 changes: 15 additions & 15 deletions server/statistics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ import (
"github.com/pingcap/pd/v4/server/core"
)

const (
// StoreHeartBeatReportInterval is the heartbeat report interval of a store.
StoreHeartBeatReportInterval = 10
)

// StoresStats is a cache hold hot regions.
type StoresStats struct {
sync.RWMutex
Expand Down Expand Up @@ -261,25 +256,30 @@ func (s *StoresStats) GetStoresKeysReadStat() map[uint64]float64 {
// RollingStoreStats are multiple sets of recent historical records with specified windows size.
type RollingStoreStats struct {
sync.RWMutex
bytesWriteRate *AvgOverTime
bytesReadRate *AvgOverTime
keysWriteRate *AvgOverTime
keysReadRate *AvgOverTime
bytesWriteRate *TimeMedian
bytesReadRate *TimeMedian
keysWriteRate *TimeMedian
keysReadRate *TimeMedian
totalCPUUsage MovingAvg
totalBytesDiskReadRate MovingAvg
totalBytesDiskWriteRate MovingAvg
}

const storeStatsRollingWindows = 3
const storeAvgInterval time.Duration = 3 * StoreHeartBeatReportInterval * time.Second
const (
storeStatsRollingWindows = 3
// DefaultAotSize is default size of average over time.
DefaultAotSize = 2
// DefaultMfSize is default size of median filter
DefaultMfSize = 5
)

// NewRollingStoreStats creates a RollingStoreStats.
func newRollingStoreStats() *RollingStoreStats {
return &RollingStoreStats{
bytesWriteRate: NewAvgOverTime(storeAvgInterval),
bytesReadRate: NewAvgOverTime(storeAvgInterval),
keysWriteRate: NewAvgOverTime(storeAvgInterval),
keysReadRate: NewAvgOverTime(storeAvgInterval),
bytesWriteRate: NewTimeMedian(DefaultAotSize, DefaultMfSize),
bytesReadRate: NewTimeMedian(DefaultAotSize, DefaultMfSize),
keysWriteRate: NewTimeMedian(DefaultAotSize, DefaultMfSize),
keysReadRate: NewTimeMedian(DefaultAotSize, DefaultMfSize),
totalCPUUsage: NewMedianFilter(storeStatsRollingWindows),
totalBytesDiskReadRate: NewMedianFilter(storeStatsRollingWindows),
totalBytesDiskWriteRate: NewMedianFilter(storeStatsRollingWindows),
Expand Down
8 changes: 5 additions & 3 deletions tests/pdctl/hot/hot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func (s *hotTestSuite) TestHot(c *C) {
// test hot store
ss := leaderServer.GetStore(1)
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - 10), EndTimestamp: uint64(now)}
newStats := proto.Clone(ss.GetStoreStats()).(*pdpb.StoreStats)
bytesWritten := uint64(8 * 1024 * 1024)
bytesRead := uint64(16 * 1024 * 1024)
Expand All @@ -80,9 +79,12 @@ func (s *hotTestSuite) TestHot(c *C) {
newStats.BytesRead = bytesRead
newStats.KeysWritten = keysWritten
newStats.KeysRead = keysRead
newStats.Interval = interval
rc := leaderServer.GetRaftCluster()
rc.GetStoresStats().Observe(ss.GetID(), newStats)
for i := statistics.DefaultMfSize; i > 0; i-- {
newStats.Interval = &pdpb.TimeInterval{StartTimestamp: uint64(now - 10*i), EndTimestamp: uint64(now - 10*i + 10)}
rc.GetStoresStats().Observe(ss.GetID(), newStats)
}

args := []string{"-u", pdAddr, "hot", "store"}
_, output, err := pdctl.ExecuteCommandC(cmd, args...)
c.Assert(err, IsNil)
Expand Down

0 comments on commit 396d23c

Please sign in to comment.