Skip to content

Commit

Permalink
cluster: dynamic progress time window for offline scene (#7722) (#8006)
Browse files Browse the repository at this point in the history
close #7726

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: Cabinfever_B <[email protected]>

Co-authored-by: Yongbo Jiang <[email protected]>
Co-authored-by: Cabinfever_B <[email protected]>
  • Loading branch information
ti-chi-bot and CabinfeverB authored Apr 1, 2024
1 parent 256d793 commit f11bf75
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 48 deletions.
121 changes: 90 additions & 31 deletions pkg/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,25 @@ import (
"github.com/tikv/pd/pkg/syncutil"
)

// speedStatisticalWindow is the speed calculation window
const speedStatisticalWindow = 10 * time.Minute
const (
// maxSpeedCalculationWindow is the maximum size of the time window used to calculate the speed,
// but it does not mean that all data in it will be used to calculate the speed,
// which data is used depends on the patrol region duration
maxSpeedCalculationWindow = 2 * time.Hour
// minSpeedCalculationWindow is the minimum speed calculation window
minSpeedCalculationWindow = 10 * time.Minute
)

// Manager is used to maintain the progresses we care about.
type Manager struct {
syncutil.RWMutex
progesses map[string]*progressIndicator
progresses map[string]*progressIndicator
}

// NewManager creates a new Manager.
func NewManager() *Manager {
return &Manager{
progesses: make(map[string]*progressIndicator),
progresses: make(map[string]*progressIndicator),
}
}

Expand All @@ -46,68 +52,121 @@ type progressIndicator struct {
remaining float64
// We use a fixed interval's history to calculate the latest average speed.
history *list.List
// We use speedStatisticalWindow / updateInterval to get the windowLengthLimit.
// Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
// We use (maxSpeedCalculationWindow / updateInterval + 1) to get the windowCapacity.
// Assume that the windowCapacity is 4, the init value is 1. After update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
// Then we update it again with 5, the window will become [2, 3, 4, 5].
windowLengthLimit int
updateInterval time.Duration
lastSpeed float64
windowCapacity int
// windowLength is used to determine what data will be computed.
// Assume that the windowLength is 2, the init value is 1. The value that will be calculated are [1].
// After update 3 times with 2, 3, 4 separately. The value that will be calculated are [3,4] and the values in queue are [(1,2),3,4].
// It helps us avoid calculation results jumping change when patrol-region-interval changes.
windowLength int
// front is the first element which should be used.
// currentWindowLength indicates where the front is currently in the queue.
// Assume that the windowLength is 2, the init value is 1. The front is [1] and currentWindowLength is 1.
// After update 3 times with 2, 3, 4 separately.
// The front is [3], the currentWindowLength is 2, and values in queue are [(1,2),3,4]
// ^ front
// - - currentWindowLength = len([3,4]) = 2
// We will always keep the currentWindowLength equal to windowLength if the actual size is enough.
front *list.Element
currentWindowLength int

updateInterval time.Duration
lastSpeed float64
}

// Reset resets the progress manager.
func (m *Manager) Reset() {
m.Lock()
defer m.Unlock()

m.progesses = make(map[string]*progressIndicator)
m.progresses = make(map[string]*progressIndicator)
}

// Option is used to do some action for progressIndicator.
type Option func(*progressIndicator)

// WindowDurationOption changes the time window size.
func WindowDurationOption(dur time.Duration) func(*progressIndicator) {
return func(pi *progressIndicator) {
if dur < minSpeedCalculationWindow {
dur = minSpeedCalculationWindow
} else if dur > maxSpeedCalculationWindow {
dur = maxSpeedCalculationWindow
}
pi.windowLength = int(dur/pi.updateInterval) + 1
}
}

// AddProgress adds a progress into manager if it doesn't exist.
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) {
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration, opts ...Option) (exist bool) {
m.Lock()
defer m.Unlock()

history := list.New()
history.PushBack(current)
if _, exist = m.progesses[progress]; !exist {
m.progesses[progress] = &progressIndicator{
total: total,
remaining: total,
history: history,
windowLengthLimit: int(speedStatisticalWindow / updateInterval),
updateInterval: updateInterval,
if _, exist = m.progresses[progress]; !exist {
pi := &progressIndicator{
total: total,
remaining: total,
history: history,
windowCapacity: int(maxSpeedCalculationWindow/updateInterval) + 1,
windowLength: int(minSpeedCalculationWindow / updateInterval),
updateInterval: updateInterval,
}
for _, op := range opts {
op(pi)
}
m.progresses[progress] = pi
pi.front = history.Front()
pi.currentWindowLength = 1
}
return
}

// UpdateProgress updates the progress if it exists.
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) {
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool, opts ...Option) {
m.Lock()
defer m.Unlock()

if p, exist := m.progesses[progress]; exist {
if p, exist := m.progresses[progress]; exist {
for _, op := range opts {
op(p)
}
p.remaining = remaining
if p.total < remaining {
p.total = remaining
}

if p.history.Len() > p.windowLengthLimit {
p.history.PushBack(current)
p.currentWindowLength++

// try to move `front` into correct place.
for p.currentWindowLength > p.windowLength {
p.front = p.front.Next()
p.currentWindowLength--
}
for p.currentWindowLength < p.windowLength && p.front.Prev() != nil {
p.front = p.front.Prev()
p.currentWindowLength++
}

for p.history.Len() > p.windowCapacity {
p.history.Remove(p.history.Front())
}
p.history.PushBack(current)

// It means it just init and we haven't update the progress
if p.history.Len() <= 1 {
p.lastSpeed = 0
} else if isInc {
// the value increases, e.g., [1, 2, 3]
p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (current - p.front.Value.(float64)) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
} else {
// the value decreases, e.g., [3, 2, 1]
p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (p.front.Value.(float64) - current) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
}
if p.lastSpeed < 0 {
p.lastSpeed = 0
Expand All @@ -120,7 +179,7 @@ func (m *Manager) UpdateProgressTotal(progress string, total float64) {
m.Lock()
defer m.Unlock()

if p, exist := m.progesses[progress]; exist {
if p, exist := m.progresses[progress]; exist {
p.total = total
}
}
Expand All @@ -130,8 +189,8 @@ func (m *Manager) RemoveProgress(progress string) (exist bool) {
m.Lock()
defer m.Unlock()

if _, exist = m.progesses[progress]; exist {
delete(m.progesses, progress)
if _, exist = m.progresses[progress]; exist {
delete(m.progresses, progress)
return
}
return
Expand All @@ -143,7 +202,7 @@ func (m *Manager) GetProgresses(filter func(p string) bool) []string {
defer m.RUnlock()

processes := []string{}
for p := range m.progesses {
for p := range m.progresses {
if filter(p) {
processes = append(processes, p)
}
Expand All @@ -156,7 +215,7 @@ func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed fl
m.RLock()
defer m.RUnlock()

if p, exist := m.progesses[progress]; exist {
if p, exist := m.progresses[progress]; exist {
process = 1 - p.remaining/p.total
if process < 0 {
process = 0
Expand Down
136 changes: 128 additions & 8 deletions pkg/progress/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
)

func TestProgress(t *testing.T) {
t.Parallel()
re := require.New(t)
n := "test"
m := NewManager()
Expand All @@ -41,15 +40,13 @@ func TestProgress(t *testing.T) {
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
// 30/(70/1s+) > 30/70
re.Greater(ls, 30.0/70.0)
// 70/1s+ > 70
re.Less(cs, 70.0)
re.Less(math.Abs(ls-30.0/7.0), 1e-6)
re.Less(math.Abs(cs-7), 1e-6)
// there is no scheduling
for i := 0; i < 100; i++ {
for i := 0; i < 1000; i++ {
m.UpdateProgress(n, 30, 30, false)
}
re.Equal(61, m.progesses[n].history.Len())
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
Expand All @@ -70,7 +67,6 @@ func TestProgress(t *testing.T) {
}

func TestAbnormal(t *testing.T) {
t.Parallel()
re := require.New(t)
n := "test"
m := NewManager()
Expand All @@ -95,3 +91,127 @@ func TestAbnormal(t *testing.T) {
re.Equal(0.0, ls)
re.Equal(0.0, cs)
}

func TestProgressWithDynamicWindow(t *testing.T) {
// The full capacity of queue is 721.
re := require.New(t)
n := "test"
m := NewManager()
re.False(m.AddProgress(n, 100, 100, 10*time.Second))
p, ls, cs, err := m.Status(n)
re.NoError(err)
re.Equal(0.0, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)
time.Sleep(time.Second)
re.True(m.AddProgress(n, 100, 100, 10*time.Second))

m.UpdateProgress(n, 31, 31, false)
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.69, p)
re.Less(math.Abs(ls-31.0/6.9), 1e-6)
re.Less(math.Abs(cs-6.9), 1e-6)
re.Equal(2, m.progresses[n].currentWindowLength)
re.Equal(100.0, m.progresses[n].front.Value.(float64))

m.UpdateProgress(n, 30, 30, false, WindowDurationOption(time.Minute*20))
re.Equal(3, m.progresses[n].currentWindowLength)
re.Equal(100.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
re.Less(math.Abs(ls-30.0/(7.0/2)), 1e-6)
re.Less(math.Abs(cs-3.5), 1e-6)

for i := 0; i < 1000; i++ {
m.UpdateProgress(n, 30, 30, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)
m.UpdateProgress(n, 29, 29, false, WindowDurationOption(time.Minute*20))
re.Equal(121, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
re.Equal(721, m.progresses[n].history.Len())

for i := 0; i < 60; i++ {
m.UpdateProgress(n, 28, 28, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(float64(28/(2./120)*10.), ls)
re.Equal(float64(2./120/10.), cs)

m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*10))
re.Equal(721, m.progresses[n].history.Len())
re.Equal(61, m.progresses[n].currentWindowLength)
re.Equal(28.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)

m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*20))
re.Equal(121, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(float64(28/(2./120)*10.), ls)
re.Equal(float64(2./120/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*12))
re.Equal(73, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(29./72)*10.), ls)
re.Equal(float64(29./72/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5))
re.Equal(61, m.progresses[n].currentWindowLength)
re.Equal(28.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(27./60)*10.), ls)
re.Equal(float64(27./60/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*180))
p, ls, cs, err = m.Status(n)
re.Equal(721, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(29./720)*10.), ls)
re.Equal(float64(29./720/10.), cs)
for i := 0; i < 2000; i++ {
m.UpdateProgress(n, 1, 1, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)

ps := m.GetProgresses(func(p string) bool {
return strings.Contains(p, n)
})
re.Len(ps, 1)
re.Equal(n, ps[0])
ps = m.GetProgresses(func(p string) bool {
return strings.Contains(p, "a")
})
re.Empty(ps)
re.True(m.RemoveProgress(n))
re.False(m.RemoveProgress(n))
}
Loading

0 comments on commit f11bf75

Please sign in to comment.