From dc62563cb269e4253e603cbd9fc8a20eeb75b1ae Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Mon, 1 Apr 2024 10:33:15 +0800 Subject: [PATCH] This is an automated cherry-pick of #7722 close tikv/pd#7726 Signed-off-by: ti-chi-bot --- pkg/progress/progress.go | 103 +++++++++++++++++++++---- pkg/progress/progress_test.go | 138 ++++++++++++++++++++++++++++++++-- pkg/schedule/coordinator.go | 22 +++++- server/cluster/cluster.go | 18 +++-- 4 files changed, 250 insertions(+), 31 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index 345e4928c41..6940ad1dd01 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -24,8 +24,14 @@ import ( "github.com/tikv/pd/pkg/utils/syncutil" ) -// speedStatisticalWindow is the speed calculation window -const speedStatisticalWindow = 10 * time.Minute +const ( + // maxSpeedCalculationWindow is the maximum size of the time window used to calculate the speed, + // but it does not mean that all data in it will be used to calculate the speed, + // which data is used depends on the patrol region duration + maxSpeedCalculationWindow = 2 * time.Hour + // minSpeedCalculationWindow is the minimum speed calculation window + minSpeedCalculationWindow = 10 * time.Minute +) // Manager is used to maintain the progresses we care about. type Manager struct { @@ -46,12 +52,28 @@ type progressIndicator struct { remaining float64 // We use a fixed interval's history to calculate the latest average speed. history *list.List - // We use speedStatisticalWindow / updateInterval to get the windowLengthLimit. - // Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4]. + // We use (maxSpeedCalculationWindow / updateInterval + 1) to get the windowCapacity. + // Assume that the windowCapacity is 4, the init value is 1. After update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4]. // Then we update it again with 5, the window will become [2, 3, 4, 5]. - windowLengthLimit int - updateInterval time.Duration - lastSpeed float64 + windowCapacity int + // windowLength is used to determine what data will be computed. + // Assume that the windowLength is 2, the init value is 1. The value that will be calculated are [1]. + // After update 3 times with 2, 3, 4 separately. The value that will be calculated are [3,4] and the values in queue are [(1,2),3,4]. + // It helps us avoid calculation results jumping change when patrol-region-interval changes. + windowLength int + // front is the first element which should be used. + // currentWindowLength indicates where the front is currently in the queue. + // Assume that the windowLength is 2, the init value is 1. The front is [1] and currentWindowLength is 1. + // After update 3 times with 2, 3, 4 separately. + // The front is [3], the currentWindowLength is 2, and values in queue are [(1,2),3,4] + // ^ front + // - - currentWindowLength = len([3,4]) = 2 + // We will always keep the currentWindowLength equal to windowLength if the actual size is enough. + front *list.Element + currentWindowLength int + + updateInterval time.Duration + lastSpeed float64 } // Reset resets the progress manager. @@ -62,13 +84,29 @@ func (m *Manager) Reset() { m.progesses = make(map[string]*progressIndicator) } +// Option is used to do some action for progressIndicator. +type Option func(*progressIndicator) + +// WindowDurationOption changes the time window size. 
+func WindowDurationOption(dur time.Duration) func(*progressIndicator) { + return func(pi *progressIndicator) { + if dur < minSpeedCalculationWindow { + dur = minSpeedCalculationWindow + } else if dur > maxSpeedCalculationWindow { + dur = maxSpeedCalculationWindow + } + pi.windowLength = int(dur/pi.updateInterval) + 1 + } +} + // AddProgress adds a progress into manager if it doesn't exist. -func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) { +func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration, opts ...Option) (exist bool) { m.Lock() defer m.Unlock() history := list.New() history.PushBack(current) +<<<<<<< HEAD if _, exist = m.progesses[progress]; !exist { m.progesses[progress] = &progressIndicator{ total: total, @@ -76,38 +114,73 @@ func (m *Manager) AddProgress(progress string, current, total float64, updateInt history: history, windowLengthLimit: int(speedStatisticalWindow / updateInterval), updateInterval: updateInterval, +======= + if _, exist = m.progresses[progress]; !exist { + pi := &progressIndicator{ + total: total, + remaining: total, + history: history, + windowCapacity: int(maxSpeedCalculationWindow/updateInterval) + 1, + windowLength: int(minSpeedCalculationWindow / updateInterval), + updateInterval: updateInterval, +>>>>>>> 945e29c03 (cluster: dynamic progress time window for offline scene (#7722)) + } + for _, op := range opts { + op(pi) } + m.progresses[progress] = pi + pi.front = history.Front() + pi.currentWindowLength = 1 } return } // UpdateProgress updates the progress if it exists. -func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) { +func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool, opts ...Option) { m.Lock() defer m.Unlock() +<<<<<<< HEAD if p, exist := m.progesses[progress]; exist { +======= + if p, exist := m.progresses[progress]; exist { + for _, op := range opts { + op(p) + } +>>>>>>> 945e29c03 (cluster: dynamic progress time window for offline scene (#7722)) p.remaining = remaining if p.total < remaining { p.total = remaining } - if p.history.Len() > p.windowLengthLimit { + p.history.PushBack(current) + p.currentWindowLength++ + + // try to move `front` into correct place. 
+ for p.currentWindowLength > p.windowLength { + p.front = p.front.Next() + p.currentWindowLength-- + } + for p.currentWindowLength < p.windowLength && p.front.Prev() != nil { + p.front = p.front.Prev() + p.currentWindowLength++ + } + + for p.history.Len() > p.windowCapacity { p.history.Remove(p.history.Front()) } - p.history.PushBack(current) // It means it just init and we haven't update the progress if p.history.Len() <= 1 { p.lastSpeed = 0 } else if isInc { // the value increases, e.g., [1, 2, 3] - p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) / - (float64(p.history.Len()-1) * p.updateInterval.Seconds()) + p.lastSpeed = (current - p.front.Value.(float64)) / + (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) } else { // the value decreases, e.g., [3, 2, 1] - p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) / - (float64(p.history.Len()-1) * p.updateInterval.Seconds()) + p.lastSpeed = (p.front.Value.(float64) - current) / + (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) } if p.lastSpeed < 0 { p.lastSpeed = 0 diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go index e6799fb0ff8..524d216d0ea 100644 --- a/pkg/progress/progress_test.go +++ b/pkg/progress/progress_test.go @@ -24,7 +24,6 @@ import ( ) func TestProgress(t *testing.T) { - t.Parallel() re := require.New(t) n := "test" m := NewManager() @@ -41,15 +40,17 @@ func TestProgress(t *testing.T) { p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.7, p) - // 30/(70/1s+) > 30/70 - re.Greater(ls, 30.0/70.0) - // 70/1s+ > 70 - re.Less(cs, 70.0) + re.Less(math.Abs(ls-30.0/7.0), 1e-6) + re.Less(math.Abs(cs-7), 1e-6) // there is no scheduling - for i := 0; i < 100; i++ { + for i := 0; i < 1000; i++ { m.UpdateProgress(n, 30, 30, false) } +<<<<<<< HEAD re.Equal(61, m.progesses[n].history.Len()) +======= + re.Equal(721, m.progresses[n].history.Len()) +>>>>>>> 945e29c03 (cluster: dynamic progress time window for offline scene (#7722)) p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.7, p) @@ -70,7 +71,6 @@ func TestProgress(t *testing.T) { } func TestAbnormal(t *testing.T) { - t.Parallel() re := require.New(t) n := "test" m := NewManager() @@ -95,3 +95,127 @@ func TestAbnormal(t *testing.T) { re.Equal(0.0, ls) re.Equal(0.0, cs) } + +func TestProgressWithDynamicWindow(t *testing.T) { + // The full capacity of queue is 721. 
+ re := require.New(t) + n := "test" + m := NewManager() + re.False(m.AddProgress(n, 100, 100, 10*time.Second)) + p, ls, cs, err := m.Status(n) + re.NoError(err) + re.Equal(0.0, p) + re.Equal(math.MaxFloat64, ls) + re.Equal(0.0, cs) + time.Sleep(time.Second) + re.True(m.AddProgress(n, 100, 100, 10*time.Second)) + + m.UpdateProgress(n, 31, 31, false) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.69, p) + re.Less(math.Abs(ls-31.0/6.9), 1e-6) + re.Less(math.Abs(cs-6.9), 1e-6) + re.Equal(2, m.progresses[n].currentWindowLength) + re.Equal(100.0, m.progresses[n].front.Value.(float64)) + + m.UpdateProgress(n, 30, 30, false, WindowDurationOption(time.Minute*20)) + re.Equal(3, m.progresses[n].currentWindowLength) + re.Equal(100.0, m.progresses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.7, p) + re.Less(math.Abs(ls-30.0/(7.0/2)), 1e-6) + re.Less(math.Abs(cs-3.5), 1e-6) + + for i := 0; i < 1000; i++ { + m.UpdateProgress(n, 30, 30, false) + } + re.Equal(721, m.progresses[n].history.Len()) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.7, p) + re.Equal(math.MaxFloat64, ls) + re.Equal(0.0, cs) + m.UpdateProgress(n, 29, 29, false, WindowDurationOption(time.Minute*20)) + re.Equal(121, m.progresses[n].currentWindowLength) + re.Equal(30.0, m.progresses[n].front.Value.(float64)) + re.Equal(721, m.progresses[n].history.Len()) + + for i := 0; i < 60; i++ { + m.UpdateProgress(n, 28, 28, false) + } + re.Equal(721, m.progresses[n].history.Len()) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.72, p) + re.Equal(float64(28/(2./120)*10.), ls) + re.Equal(float64(2./120/10.), cs) + + m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*10)) + re.Equal(721, m.progresses[n].history.Len()) + re.Equal(61, m.progresses[n].currentWindowLength) + re.Equal(28.0, m.progresses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.72, p) + re.Equal(math.MaxFloat64, ls) + re.Equal(0.0, cs) + + m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*20)) + re.Equal(121, m.progresses[n].currentWindowLength) + re.Equal(30.0, m.progresses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.72, p) + re.Equal(float64(28/(2./120)*10.), ls) + re.Equal(float64(2./120/10.), cs) + + m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*12)) + re.Equal(73, m.progresses[n].currentWindowLength) + re.Equal(30.0, m.progresses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(float64(1/(29./72)*10.), ls) + re.Equal(float64(29./72/10.), cs) + + m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5)) + re.Equal(61, m.progresses[n].currentWindowLength) + re.Equal(28.0, m.progresses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(float64(1/(27./60)*10.), ls) + re.Equal(float64(27./60/10.), cs) + + m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*180)) + p, ls, cs, err = m.Status(n) + re.Equal(721, m.progresses[n].currentWindowLength) + re.Equal(30.0, m.progresses[n].front.Value.(float64)) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(float64(1/(29./720)*10.), ls) + re.Equal(float64(29./720/10.), cs) + for i := 0; i < 2000; i++ { + m.UpdateProgress(n, 1, 1, false) + } + re.Equal(721, m.progresses[n].history.Len()) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(math.MaxFloat64, 
ls) + re.Equal(0.0, cs) + + ps := m.GetProgresses(func(p string) bool { + return strings.Contains(p, n) + }) + re.Len(ps, 1) + re.Equal(n, ps[0]) + ps = m.GetProgresses(func(p string) bool { + return strings.Contains(p, "a") + }) + re.Empty(ps) + re.True(m.RemoveProgress(n)) + re.False(m.RemoveProgress(n)) +} diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 7f4dcf09727..fcdd8c9a32c 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -74,6 +74,7 @@ type Coordinator struct { cancel context.CancelFunc schedulersInitialized bool + patrolRegionsDuration time.Duration cluster sche.ClusterInformer prepareChecker *prepareChecker @@ -110,6 +111,22 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams } } +// GetPatrolRegionsDuration returns the duration of the last patrol region round. +func (c *Coordinator) GetPatrolRegionsDuration() time.Duration { + if c == nil { + return 0 + } + c.RLock() + defer c.RUnlock() + return c.patrolRegionsDuration +} + +func (c *Coordinator) setPatrolRegionsDuration(dur time.Duration) { + c.Lock() + defer c.Unlock() + c.patrolRegionsDuration = dur +} + // markSchedulersInitialized marks the scheduler initialization is finished. func (c *Coordinator) markSchedulersInitialized() { c.Lock() @@ -157,6 +174,7 @@ func (c *Coordinator) PatrolRegions() { ticker.Reset(c.cluster.GetCheckerConfig().GetPatrolRegionInterval()) case <-c.ctx.Done(): patrolCheckRegionsGauge.Set(0) + c.setPatrolRegionsDuration(0) log.Info("patrol regions has been stopped") return } @@ -178,7 +196,9 @@ func (c *Coordinator) PatrolRegions() { // Updates the label level isolation statistics. c.cluster.UpdateRegionsLabelLevelStats(regions) if len(key) == 0 { - patrolCheckRegionsGauge.Set(time.Since(start).Seconds()) + dur := time.Since(start) + patrolCheckRegionsGauge.Set(dur.Seconds()) + c.setPatrolRegionsDuration(dur) start = time.Now() } failpoint.Inject("break-patrol", func() { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 043c0996acc..d6454f48ee3 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1513,7 +1513,7 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro if err == nil { regionSize := float64(c.core.GetStoreRegionSize(storeID)) c.resetProgress(storeID, store.GetAddress()) - c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval) + c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration())) // record the current store limit in memory c.prevStoreLimit[storeID] = map[storelimit.Type]float64{ storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer), @@ -2059,21 +2059,23 @@ func updateTopology(topology map[string]interface{}, sortedLabels []*metapb.Stor func (c *RaftCluster) updateProgress(storeID uint64, storeAddress, action string, current, remaining float64, isInc bool) { storeLabel := strconv.FormatUint(storeID, 10) - var progress string + var progressName string + var opts []progress.Option switch action { case removingAction: - progress = encodeRemovingProgressKey(storeID) + progressName = encodeRemovingProgressKey(storeID) + opts = []progress.Option{progress.WindowDurationOption(c.coordinator.GetPatrolRegionsDuration())} case preparingAction: - progress = encodePreparingProgressKey(storeID) + progressName = 
encodePreparingProgressKey(storeID) } - if exist := c.progressManager.AddProgress(progress, current, remaining, nodeStateCheckJobInterval); !exist { + if exist := c.progressManager.AddProgress(progressName, current, remaining, nodeStateCheckJobInterval, opts...); !exist { return } - c.progressManager.UpdateProgress(progress, current, remaining, isInc) - process, ls, cs, err := c.progressManager.Status(progress) + c.progressManager.UpdateProgress(progressName, current, remaining, isInc, opts...) + process, ls, cs, err := c.progressManager.Status(progressName) if err != nil { - log.Error("get progress status failed", zap.String("progress", progress), zap.Float64("remaining", remaining), errs.ZapError(err)) + log.Error("get progress status failed", zap.String("progress", progressName), zap.Float64("remaining", remaining), errs.ZapError(err)) return } storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(process)
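
Note (reviewer sketch, not part of the patch): the self-contained Go program below approximates the sliding-window speed calculation this change introduces, to make the windowCapacity / windowLength / front bookkeeping easier to follow in isolation. All names here (window, newWindow, push, speed) are hypothetical simplifications of mine; the real logic lives in pkg/progress/progress.go above, uses the same container/list queue, and is guarded by the Manager's lock.

package main

import (
	"container/list"
	"fmt"
	"time"
)

// window approximates the progressIndicator bookkeeping added by this patch:
// the queue keeps up to capacity samples, but only the newest windowLength
// samples (tracked via front) feed the speed calculation.
type window struct {
	history        *list.List
	front          *list.Element
	capacity       int // roughly maxSpeedCalculationWindow/updateInterval + 1
	windowLength   int // derived from the clamped window duration
	currentLength  int
	updateInterval time.Duration
}

// newWindow clamps the requested duration the same way WindowDurationOption
// does in the patch, then seeds the queue with one initial sample.
func newWindow(initial float64, updateInterval, dur, minDur, maxDur time.Duration) *window {
	if dur < minDur {
		dur = minDur
	} else if dur > maxDur {
		dur = maxDur
	}
	w := &window{
		history:        list.New(),
		capacity:       int(maxDur/updateInterval) + 1,
		windowLength:   int(dur/updateInterval) + 1,
		currentLength:  1,
		updateInterval: updateInterval,
	}
	w.history.PushBack(initial)
	w.front = w.history.Front()
	return w
}

// push appends a sample, slides front so that at most windowLength samples
// are "active", and trims the queue to capacity, mirroring UpdateProgress.
func (w *window) push(v float64) {
	w.history.PushBack(v)
	w.currentLength++
	for w.currentLength > w.windowLength {
		w.front = w.front.Next()
		w.currentLength--
	}
	for w.currentLength < w.windowLength && w.front.Prev() != nil {
		w.front = w.front.Prev()
		w.currentLength++
	}
	for w.history.Len() > w.capacity {
		w.history.Remove(w.history.Front())
	}
}

// speed mirrors the decreasing-value branch (isInc == false):
// (front value - latest value) / elapsed seconds inside the active window.
func (w *window) speed(latest float64) float64 {
	if w.currentLength <= 1 {
		return 0
	}
	s := (w.front.Value.(float64) - latest) /
		(float64(w.currentLength-1) * w.updateInterval.Seconds())
	if s < 0 {
		return 0
	}
	return s
}

func main() {
	// 10s update interval, 20min requested window, clamped to [10min, 2h].
	w := newWindow(100, 10*time.Second, 20*time.Minute, 10*time.Minute, 2*time.Hour)
	for _, remaining := range []float64{95, 90, 85} {
		w.push(remaining)
	}
	fmt.Println(w.speed(85)) // 0.5: remaining drops by 15 over 3 intervals of 10s
}

With a 10s update interval this gives capacity 721 (two hours of samples, matching the 721 expected in the tests) and windowLength 121 for a 20-minute window, which is why shrinking or growing the window via WindowDurationOption only changes which prefix of the retained history contributes to the reported speed.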