Skip to content

Commit

Permalink
Merge branch 'master' into fix-issue-4769
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 17, 2022
2 parents 04b24dd + 7088c95 commit f297bf2
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 158 deletions.
153 changes: 83 additions & 70 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -1062,99 +1062,112 @@
"type": "gauge"
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 6,
"w": 8,
"w": 4,
"x": 8,
"y": 13
"y": 14
},
"hiddenSeries": false,
"id": 1451,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"id": 1111,
"options": {
"alertThreshold": true
"showHeader": false
},
"percentage": false,
"pluginVersion": "7.5.11",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "pd_cluster_eta{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}",
"legendFormat": "{{action}}-{{store}}",
"interval": "",
"legendFormat": "{{store}}-{{action}}",
"exemplar": true,
"queryType": "randomWalk",
"refId": "A"
"refId": "A",
"instant": true
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Left seconds",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
"type": "table",
"fieldConfig": {
"defaults": {
"custom": {
"align": "left",
"filterable": false
},
"mappings": [
{
"id": 1,
"type": 1,
"from": "",
"to": "",
"text": "N/A",
"value": "1.7976931348623157e+308"
}
],
"unit": "s"
},
"overrides": []
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
"transformations": [
{
"id": "reduce",
"options": {
"reducers": [
"last"
]
}
}
],
"timeFrom": null,
"timeShift": null
},
{
"type": "table",
"title": "Current scaling speed",
"datasource": "${DS_TEST-CLUSTER}",
"gridPos": {
"x": 12,
"y": 13,
"w": 4,
"h": 6
},
"yaxes": [
"id": 1112,
"targets": [
{
"format": "s",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
"expr": "pd_cluster_speed{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}",
"legendFormat": "{{action}}-{{store}}",
"interval": "",
"exemplar": true,
"refId": "A",
"queryType": "randomWalk",
"instant": true
}
],
"options": {
"showHeader": false
},
"fieldConfig": {
"defaults": {
"custom": {
"align": "left",
"filterable": false
},
"mappings": [],
"unit": "MBs"
},
"overrides": []
},
"transformations": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
"id": "reduce",
"options": {
"reducers": [
"last"
]
}
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
"timeFrom": null,
"timeShift": null
},
{
"collapsed": true,
Expand Down
69 changes: 42 additions & 27 deletions pkg/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package progress

import (
"container/list"
"fmt"
"math"
"time"
Expand All @@ -23,8 +24,8 @@ import (
"github.com/tikv/pd/pkg/syncutil"
)

// speedStatisticalInterval is the speed calculation interval
var speedStatisticalInterval = 5 * time.Minute
// speedStatisticalWindow is the speed calculation window
const speedStatisticalWindow = 10 * time.Minute

// Manager is used to maintain the progresses we care about.
type Manager struct {
Expand All @@ -43,10 +44,14 @@ func NewManager() *Manager {
type progressIndicator struct {
total float64
remaining float64
// we use a fixed interval to calculate the latest average speed.
lastTimeRemaining float64
// We use a fixed interval's history to calculate the latest average speed.
history *list.List
// We use speedStatisticalWindow / updateInterval to get the windowLengthLimit.
// Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
// Then we update it again with 5, the window will become [2, 3, 4, 5].
windowLengthLimit int
updateInterval time.Duration
lastSpeed float64
lastTime time.Time
}

// Reset resets the progress manager.
Expand All @@ -58,23 +63,26 @@ func (m *Manager) Reset() {
}

// AddProgress adds a progress into manager if it doesn't exist.
func (m *Manager) AddProgress(progress string, total float64) (exist bool) {
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) {
m.Lock()
defer m.Unlock()

history := list.New()
history.PushBack(current)
if _, exist = m.progesses[progress]; !exist {
m.progesses[progress] = &progressIndicator{
total: total,
remaining: total,
lastTimeRemaining: total,
lastTime: time.Now(),
history: history,
windowLengthLimit: int(speedStatisticalWindow / updateInterval),
updateInterval: updateInterval,
}
}
return
}

// UpdateProgressRemaining updates the remaining value of a progress if it exists.
func (m *Manager) UpdateProgressRemaining(progress string, remaining float64) {
// UpdateProgress updates the progress if it exists.
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) {
m.Lock()
defer m.Unlock()

Expand All @@ -83,18 +91,26 @@ func (m *Manager) UpdateProgressRemaining(progress string, remaining float64) {
if p.total < remaining {
p.total = remaining
}
if p.lastTimeRemaining < remaining {
p.lastTimeRemaining = remaining

if p.history.Len() > p.windowLengthLimit {
p.history.Remove(p.history.Front())
}
p.history.PushBack(current)

// It means it just init and we haven't update the progress
if p.history.Len() <= 1 {
p.lastSpeed = 0
} else if isInc {
// the value increases, e.g., [1, 2, 3]
p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
} else {
// the value decreases, e.g., [3, 2, 1]
p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
}
// calculate the average speed for every `speedStatisticalInterval`
if time.Since(p.lastTime) >= speedStatisticalInterval {
if (p.lastTimeRemaining - remaining) <= 0 {
p.lastSpeed = 0
} else {
p.lastSpeed = (p.lastTimeRemaining - remaining) / time.Since(p.lastTime).Seconds()
}
p.lastTime = time.Now()
p.lastTimeRemaining = remaining
if p.lastSpeed < 0 {
p.lastSpeed = 0
}
}
}
Expand Down Expand Up @@ -147,13 +163,12 @@ func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed fl
err = errs.ErrProgressWrongStatus.FastGenByArgs(fmt.Sprintf("the remaining: %v is larger than the total: %v", p.remaining, p.total))
return
}
currentSpeed = 0
// when the progress is newly added
if p.lastSpeed == 0 && time.Since(p.lastTime) < speedStatisticalInterval {
currentSpeed = (p.lastTimeRemaining - p.remaining) / time.Since(p.lastTime).Seconds()
} else {
currentSpeed = p.lastSpeed
currentSpeed = p.lastSpeed
// When the progress is newly added, there is no last speed.
if p.lastSpeed == 0 && p.history.Len() <= 1 {
currentSpeed = 0
}

leftSeconds = p.remaining / currentSpeed
if math.IsNaN(leftSeconds) || math.IsInf(leftSeconds, 0) {
leftSeconds = math.MaxFloat64
Expand Down
25 changes: 11 additions & 14 deletions pkg/progress/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,16 @@ type testProgressSuite struct{}
func (s *testProgressSuite) Test(c *C) {
n := "test"
m := NewManager()
c.Assert(m.AddProgress(n, 100), IsFalse)
c.Assert(m.AddProgress(n, 100, 100, 10*time.Second), IsFalse)
p, ls, cs, err := m.Status(n)
c.Assert(err, IsNil)
c.Assert(p, Equals, 0.0)
c.Assert(ls, Equals, math.MaxFloat64)
c.Assert(cs, Equals, 0.0)
time.Sleep(time.Second)
c.Assert(m.AddProgress(n, 100), IsTrue)
speedStatisticalInterval = time.Millisecond
defer func() {
speedStatisticalInterval = 5 * time.Minute
}()
time.Sleep(time.Millisecond)
m.UpdateProgressRemaining(n, 30)
c.Assert(m.AddProgress(n, 100, 100, 10*time.Second), IsTrue)

m.UpdateProgress(n, 30, 30, false)
p, ls, cs, err = m.Status(n)
c.Assert(err, IsNil)
c.Assert(p, Equals, 0.7)
Expand All @@ -56,12 +52,13 @@ func (s *testProgressSuite) Test(c *C) {
// 70/1s+ > 70
c.Assert(cs, Less, 70.0)
// there is no scheduling
time.Sleep(time.Millisecond)
m.UpdateProgressRemaining(n, 30)
for i := 0; i < 100; i++ {
m.UpdateProgress(n, 30, 30, false)
}
c.Assert(m.progesses[n].history.Len(), Equals, 61)
p, ls, cs, err = m.Status(n)
c.Assert(err, IsNil)
c.Assert(p, Equals, 0.7)
// the speed in previous `speedStatisticalInterval` is zero
c.Assert(ls, Equals, math.MaxFloat64)
c.Assert(cs, Equals, 0.0)

Expand All @@ -81,14 +78,14 @@ func (s *testProgressSuite) Test(c *C) {
func (s *testProgressSuite) TestAbnormal(c *C) {
n := "test"
m := NewManager()
c.Assert(m.AddProgress(n, 100), IsFalse)
c.Assert(m.AddProgress(n, 100, 100, 10*time.Second), IsFalse)
p, ls, cs, err := m.Status(n)
c.Assert(err, IsNil)
c.Assert(p, Equals, 0.0)
c.Assert(ls, Equals, math.MaxFloat64)
c.Assert(cs, Equals, 0.0)
// When offline a store, but there are operators moving region to it, e.g., merge operation
m.UpdateProgressRemaining(n, 110)
// When offline a store, but there are still many write operations
m.UpdateProgress(n, 110, 110, false)
p, ls, cs, err = m.Status(n)
c.Assert(err, IsNil)
c.Assert(p, Equals, 0.0)
Expand Down
Loading

0 comments on commit f297bf2

Please sign in to comment.