From 7088c95c0ec02da9dd16f26fb1fce0b3a6e874d5 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 17 May 2022 17:10:39 +0800 Subject: [PATCH] pkg: make speed estimation more accurate (#4963) ref tikv/pd#4640 Signed-off-by: Ryan Leung --- metrics/grafana/pd.json | 153 ++++++++++++++++-------------- pkg/progress/progress.go | 69 ++++++++------ pkg/progress/progress_test.go | 25 +++-- server/cluster/cluster.go | 39 ++++---- server/cluster/cluster_test.go | 11 +-- server/cluster/metrics.go | 19 +++- tests/pdctl/config/config_test.go | 5 + tests/server/api/api_test.go | 143 ++++++++++++++++++++++++---- 8 files changed, 306 insertions(+), 158 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 1fc9844bc32..3ed153d32f6 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -1062,99 +1062,112 @@ "type": "gauge" }, { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "fill": 1, - "fillGradient": 0, "gridPos": { "h": 6, - "w": 8, + "w": 4, "x": 8, - "y": 13 + "y": 14 }, - "hiddenSeries": false, - "id": 1451, - "legend": { - "alignAsTable": true, - "avg": false, - "current": true, - "hideEmpty": true, - "hideZero": true, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "nullPointMode": "null", + "id": 1111, "options": { - "alertThreshold": true + "showHeader": false }, - "percentage": false, "pluginVersion": "7.5.11", - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, "targets": [ { - "exemplar": true, "expr": "pd_cluster_eta{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}", + "legendFormat": "{{action}}-{{store}}", "interval": "", - "legendFormat": "{{store}}-{{action}}", + "exemplar": true, "queryType": "randomWalk", - "refId": "A" + "refId": "A", + "instant": true } ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, "title": "Left seconds", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" + "type": "table", + "fieldConfig": { + "defaults": { + "custom": { + "align": "left", + "filterable": false + }, + "mappings": [ + { + "id": 1, + "type": 1, + "from": "", + "to": "", + "text": "N/A", + "value": "1.7976931348623157e+308" + } + ], + "unit": "s" + }, + "overrides": [] }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] + "transformations": [ + { + "id": "reduce", + "options": { + "reducers": [ + "last" + ] + } + } + ], + "timeFrom": null, + "timeShift": null + }, + { + "type": "table", + "title": "Current scaling speed", + "datasource": "${DS_TEST-CLUSTER}", + "gridPos": { + "x": 12, + "y": 13, + "w": 4, + "h": 6 }, - "yaxes": [ + "id": 1112, + "targets": [ { - "format": "s", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true + "expr": "pd_cluster_speed{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}", + "legendFormat": "{{action}}-{{store}}", + "interval": "", + "exemplar": true, + "refId": "A", + "queryType": "randomWalk", + "instant": true + } + ], + "options": { + "showHeader": false + }, + "fieldConfig": { + "defaults": { + "custom": { + "align": "left", + "filterable": false + }, + "mappings": [], + "unit": "MBs" }, + "overrides": [] + }, + "transformations": [ { - 
"format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true + "id": "reduce", + "options": { + "reducers": [ + "last" + ] + } } ], - "yaxis": { - "align": false, - "alignLevel": null - } + "timeFrom": null, + "timeShift": null }, { "collapsed": true, diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index 918456c3984..cd5abf40863 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -15,6 +15,7 @@ package progress import ( + "container/list" "fmt" "math" "time" @@ -23,8 +24,8 @@ import ( "github.com/tikv/pd/pkg/syncutil" ) -// speedStatisticalInterval is the speed calculation interval -var speedStatisticalInterval = 5 * time.Minute +// speedStatisticalWindow is the speed calculation window +const speedStatisticalWindow = 10 * time.Minute // Manager is used to maintain the progresses we care about. type Manager struct { @@ -43,10 +44,14 @@ func NewManager() *Manager { type progressIndicator struct { total float64 remaining float64 - // we use a fixed interval to calculate the latest average speed. - lastTimeRemaining float64 + // We use a fixed interval's history to calculate the latest average speed. + history *list.List + // We use speedStatisticalWindow / updateInterval to get the windowLengthLimit. + // Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4]. + // Then we update it again with 5, the window will become [2, 3, 4, 5]. + windowLengthLimit int + updateInterval time.Duration lastSpeed float64 - lastTime time.Time } // Reset resets the progress manager. @@ -58,23 +63,26 @@ func (m *Manager) Reset() { } // AddProgress adds a progress into manager if it doesn't exist. -func (m *Manager) AddProgress(progress string, total float64) (exist bool) { +func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) { m.Lock() defer m.Unlock() + history := list.New() + history.PushBack(current) if _, exist = m.progesses[progress]; !exist { m.progesses[progress] = &progressIndicator{ total: total, remaining: total, - lastTimeRemaining: total, - lastTime: time.Now(), + history: history, + windowLengthLimit: int(speedStatisticalWindow / updateInterval), + updateInterval: updateInterval, } } return } -// UpdateProgressRemaining updates the remaining value of a progress if it exists. -func (m *Manager) UpdateProgressRemaining(progress string, remaining float64) { +// UpdateProgress updates the progress if it exists. 
+func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) {
 	m.Lock()
 	defer m.Unlock()
 
@@ -83,18 +91,26 @@ func (m *Manager) UpdateProgressRemaining(progress string, remaining float64) {
 		if p.total < remaining {
 			p.total = remaining
 		}
-		if p.lastTimeRemaining < remaining {
-			p.lastTimeRemaining = remaining
+
+		if p.history.Len() > p.windowLengthLimit {
+			p.history.Remove(p.history.Front())
+		}
+		p.history.PushBack(current)
+
+		// It means the progress has just been initialized and has not been updated yet.
+		if p.history.Len() <= 1 {
+			p.lastSpeed = 0
+		} else if isInc {
+			// the value increases, e.g., [1, 2, 3]
+			p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) /
+				(float64(p.history.Len()-1) * p.updateInterval.Seconds())
+		} else {
+			// the value decreases, e.g., [3, 2, 1]
+			p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) /
+				(float64(p.history.Len()-1) * p.updateInterval.Seconds())
 		}
-		// calculate the average speed for every `speedStatisticalInterval`
-		if time.Since(p.lastTime) >= speedStatisticalInterval {
-			if (p.lastTimeRemaining - remaining) <= 0 {
-				p.lastSpeed = 0
-			} else {
-				p.lastSpeed = (p.lastTimeRemaining - remaining) / time.Since(p.lastTime).Seconds()
-			}
-			p.lastTime = time.Now()
-			p.lastTimeRemaining = remaining
+		if p.lastSpeed < 0 {
+			p.lastSpeed = 0
 		}
 	}
 }
@@ -147,13 +163,12 @@ func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed fl
 		err = errs.ErrProgressWrongStatus.FastGenByArgs(fmt.Sprintf("the remaining: %v is larger than the total: %v", p.remaining, p.total))
 		return
 	}
-	currentSpeed = 0
-	// when the progress is newly added
-	if p.lastSpeed == 0 && time.Since(p.lastTime) < speedStatisticalInterval {
-		currentSpeed = (p.lastTimeRemaining - p.remaining) / time.Since(p.lastTime).Seconds()
-	} else {
-		currentSpeed = p.lastSpeed
+	currentSpeed = p.lastSpeed
+	// When the progress is newly added, there is no last speed.
+	if p.lastSpeed == 0 && p.history.Len() <= 1 {
+		currentSpeed = 0
 	}
+
 	leftSeconds = p.remaining / currentSpeed
 	if math.IsNaN(leftSeconds) || math.IsInf(leftSeconds, 0) {
 		leftSeconds = math.MaxFloat64
diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go
index c42e0388fe2..c4b030941f8 100644
--- a/pkg/progress/progress_test.go
+++ b/pkg/progress/progress_test.go
@@ -34,20 +34,16 @@ type testProgressSuite struct{}
 func (s *testProgressSuite) Test(c *C) {
 	n := "test"
 	m := NewManager()
-	c.Assert(m.AddProgress(n, 100), IsFalse)
+	c.Assert(m.AddProgress(n, 100, 100, 10*time.Second), IsFalse)
 	p, ls, cs, err := m.Status(n)
 	c.Assert(err, IsNil)
 	c.Assert(p, Equals, 0.0)
 	c.Assert(ls, Equals, math.MaxFloat64)
 	c.Assert(cs, Equals, 0.0)
 	time.Sleep(time.Second)
-	c.Assert(m.AddProgress(n, 100), IsTrue)
-	speedStatisticalInterval = time.Millisecond
-	defer func() {
-		speedStatisticalInterval = 5 * time.Minute
-	}()
-	time.Sleep(time.Millisecond)
-	m.UpdateProgressRemaining(n, 30)
+	c.Assert(m.AddProgress(n, 100, 100, 10*time.Second), IsTrue)
+
+	m.UpdateProgress(n, 30, 30, false)
 	p, ls, cs, err = m.Status(n)
 	c.Assert(err, IsNil)
 	c.Assert(p, Equals, 0.7)
@@ -56,12 +52,13 @@ func (s *testProgressSuite) Test(c *C) {
 	// 70/1s+ > 70
 	c.Assert(cs, Less, 70.0)
 	// there is no scheduling
-	time.Sleep(time.Millisecond)
-	m.UpdateProgressRemaining(n, 30)
+	for i := 0; i < 100; i++ {
+		m.UpdateProgress(n, 30, 30, false)
+	}
+	c.Assert(m.progesses[n].history.Len(), Equals, 61)
 	p, ls, cs, err = m.Status(n)
 	c.Assert(err, IsNil)
 	c.Assert(p, Equals, 0.7)
-	// the speed in previous `speedStatisticalInterval` is zero
 	c.Assert(ls, Equals, math.MaxFloat64)
 	c.Assert(cs, Equals, 0.0)
 
@@ -81,14 +78,14 @@ func (s *testProgressSuite) Test(c *C) {
 func (s *testProgressSuite) TestAbnormal(c *C) {
 	n := "test"
 	m := NewManager()
-	c.Assert(m.AddProgress(n, 100), IsFalse)
+	c.Assert(m.AddProgress(n, 100, 100, 10*time.Second), IsFalse)
 	p, ls, cs, err := m.Status(n)
 	c.Assert(err, IsNil)
 	c.Assert(p, Equals, 0.0)
 	c.Assert(ls, Equals, math.MaxFloat64)
 	c.Assert(cs, Equals, 0.0)
-	// When offline a store, but there are operators moving region to it, e.g., merge operation
-	m.UpdateProgressRemaining(n, 110)
+	// When a store is going offline, there may still be many write operations to it.
+	m.UpdateProgress(n, 110, 110, false)
 	p, ls, cs, err = m.Status(n)
 	c.Assert(err, IsNil)
 	c.Assert(p, Equals, 0.0)
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 2cdc334304d..1c99cce8823 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -59,10 +59,6 @@ import (
 )
 
 var (
-	// metricsCollectionJobInterval is the interval to run metrics collection job.
-	metricsCollectionJobInterval = 10 * time.Second
-	// nodeStateCheckJobInterval is the interval to run node state check job.
-	nodeStateCheckJobInterval = 10 * time.Second
 	// DefaultMinResolvedTSPersistenceInterval is the default value of min resolved ts persistence interval.
 	DefaultMinResolvedTSPersistenceInterval = 10 * time.Second
 )
@@ -71,8 +67,12 @@ var (
 const regionLabelGCInterval = time.Hour
 
 const (
-	clientTimeout = 3 * time.Second
-	defaultChangedRegionsLimit = 10000
+	// nodeStateCheckJobInterval is the interval to run node state check job.
+	nodeStateCheckJobInterval = 10 * time.Second
+	// metricsCollectionJobInterval is the interval to run metrics collection job.
+ metricsCollectionJobInterval = 10 * time.Second + clientTimeout = 3 * time.Second + defaultChangedRegionsLimit = 10000 // persistLimitRetryTimes is used to reduce the probability of the persistent error // since the once the store is add or remove, we shouldn't return an error even if the store limit is failed to persist. persistLimitRetryTimes = 5 @@ -372,10 +372,11 @@ func (c *RaftCluster) runMetricsCollectionJob() { defer logutil.LogPanic() defer c.wg.Done() + ticker := time.NewTicker(metricsCollectionJobInterval) failpoint.Inject("highFrequencyClusterJobs", func() { - metricsCollectionJobInterval = time.Microsecond + ticker = time.NewTicker(time.Microsecond) }) - ticker := time.NewTicker(metricsCollectionJobInterval) + defer ticker.Stop() for { @@ -395,10 +396,10 @@ func (c *RaftCluster) runNodeStateCheckJob() { defer logutil.LogPanic() defer c.wg.Done() + ticker := time.NewTicker(nodeStateCheckJobInterval) failpoint.Inject("highFrequencyClusterJobs", func() { - nodeStateCheckJobInterval = time.Microsecond + ticker = time.NewTicker(2 * time.Second) }) - ticker := time.NewTicker(nodeStateCheckJobInterval) defer ticker.Stop() for { @@ -1171,8 +1172,9 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro zap.Bool("physically-destroyed", newStore.IsPhysicallyDestroyed())) err := c.putStoreLocked(newStore) if err == nil { - c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), float64(c.core.GetStoreRegionSize(storeID))) + regionSize := float64(c.core.GetStoreRegionSize(storeID)) c.resetProgress(storeID, store.GetAddress(), preparingAction) + c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval) // record the current store limit in memory c.prevStoreLimit[storeID] = map[storelimit.Type]float64{ storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer), @@ -1451,7 +1453,7 @@ func (c *RaftCluster) checkStores() { remaining := threshold - regionSize // If we add multiple stores, the total will need to be changed. c.progressManager.UpdateProgressTotal(encodePreparingProgressKey(storeID), threshold) - c.updateProgress(storeID, store.GetAddress(), preparingAction, remaining) + c.updateProgress(storeID, store.GetAddress(), preparingAction, regionSize, remaining, true /* inc */) } } @@ -1465,7 +1467,7 @@ func (c *RaftCluster) checkStores() { offlineStore := store.GetMeta() id := offlineStore.GetId() regionSize := c.core.GetStoreRegionSize(id) - c.updateProgress(id, store.GetAddress(), removingAction, float64(regionSize)) + c.updateProgress(id, store.GetAddress(), removingAction, float64(regionSize), float64(regionSize), false /* dec */) regionCount := c.core.GetStoreRegionCount(id) // If the store is empty, it can be buried. 
if regionCount == 0 { @@ -1620,7 +1622,7 @@ func updateTopology(topology map[string]interface{}, sortedLabels []*metapb.Stor } } -func (c *RaftCluster) updateProgress(storeID uint64, storeAddress string, action string, remaining float64) { +func (c *RaftCluster) updateProgress(storeID uint64, storeAddress, action string, current, remaining float64, isInc bool) { storeLabel := strconv.FormatUint(storeID, 10) var progress string switch action { @@ -1630,16 +1632,17 @@ func (c *RaftCluster) updateProgress(storeID uint64, storeAddress string, action progress = encodePreparingProgressKey(storeID) } - if exist := c.progressManager.AddProgress(progress, remaining); !exist { + if exist := c.progressManager.AddProgress(progress, current, remaining, nodeStateCheckJobInterval); !exist { return } - c.progressManager.UpdateProgressRemaining(progress, remaining) - process, ls, _, err := c.progressManager.Status(progress) + c.progressManager.UpdateProgress(progress, current, remaining, isInc) + process, ls, cs, err := c.progressManager.Status(progress) if err != nil { log.Error("get progress status failed", zap.String("progress", progress), zap.Float64("remaining", remaining), errs.ZapError(err)) return } storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(process) + storesSpeedGauge.WithLabelValues(storeAddress, storeLabel, action).Set(cs) storesETAGauge.WithLabelValues(storeAddress, storeLabel, action).Set(ls) } @@ -1655,6 +1658,7 @@ func (c *RaftCluster) resetProgress(storeID uint64, storeAddress string, action if exist := c.progressManager.RemoveProgress(progress); exist { storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(0) + storesSpeedGauge.WithLabelValues(storeAddress, storeLabel, action).Set(0) storesETAGauge.WithLabelValues(storeAddress, storeLabel, action).Set(0) } } @@ -1771,6 +1775,7 @@ func (c *RaftCluster) resetHealthStatus() { func (c *RaftCluster) resetProgressIndicator() { c.progressManager.Reset() storesProgressGauge.Reset() + storesSpeedGauge.Reset() storesETAGauge.Reset() } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 4130bc2cfa2..a7062e66885 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -466,7 +466,6 @@ func (s *testClusterInfoSuite) TestRemovingProcess(c *C) { cluster.DropCacheRegion(region.GetID()) i++ } - time.Sleep(time.Second) cluster.checkStores() p, l, cs, err = cluster.progressManager.Status(process) c.Assert(err, IsNil) @@ -474,12 +473,10 @@ func (s *testClusterInfoSuite) TestRemovingProcess(c *C) { // process = 5 / 20 = 0.25 c.Assert(p, Equals, 0.25) // Each region is 100MB, we use more than 1s to move 5 region. 
-	// speed = 5 * 100MB / 1s+ ~= 400MB/s+
-	c.Assert(cs, Greater, 400.0)
-	c.Assert(cs, Less, 500.0)
-	// left second = 15 * 100MB / 400MB/s+ ~= 3s+
-	c.Assert(l, Greater, 3.0)
-	c.Assert(l, Less, 4.0)
+	// speed = 5 * 100MB / 20s = 25MB/s
+	c.Assert(cs, Equals, 25.0)
+	// left seconds = 15 * 100MB / 25MB/s = 60s
+	c.Assert(l, Equals, 60.0)
 }
 
 func (s *testClusterInfoSuite) TestDeleteStoreUpdatesClusterVersion(c *C) {
diff --git a/server/cluster/metrics.go b/server/cluster/metrics.go
index 8b70ec30cf8..8afb441d65f 100644
--- a/server/cluster/metrics.go
+++ b/server/cluster/metrics.go
@@ -96,13 +96,13 @@ var (
 			Help: "The current progress of corresponding action",
 		}, []string{"address", "store", "action"})
 
-	storeSyncConfigEvent = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
+	storesSpeedGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
 			Namespace: "pd",
 			Subsystem: "cluster",
-			Name: "store_sync",
-			Help: "The state of store sync config",
-		}, []string{"address", "state"})
+			Name: "speed",
+			Help: "The current speed of corresponding action",
+		}, []string{"address", "store", "action"})
 
 	storesETAGauge = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
@@ -111,6 +111,14 @@ var (
 			Name: "eta",
 			Help: "The ETA of corresponding action",
 		}, []string{"address", "store", "action"})
+
+	storeSyncConfigEvent = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "pd",
+			Subsystem: "cluster",
+			Name: "store_sync",
+			Help: "The state of store sync config",
+		}, []string{"address", "state"})
 )
 
 func init() {
@@ -124,6 +132,7 @@ func init() {
 	prometheus.MustRegister(regionListGauge)
 	prometheus.MustRegister(bucketEventCounter)
 	prometheus.MustRegister(storesProgressGauge)
+	prometheus.MustRegister(storesSpeedGauge)
 	prometheus.MustRegister(storesETAGauge)
 	prometheus.MustRegister(storeSyncConfigEvent)
 }
diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go
index 66220d0338b..297cc538606 100644
--- a/tests/pdctl/config/config_test.go
+++ b/tests/pdctl/config/config_test.go
@@ -198,6 +198,11 @@ func (s *configTestSuite) TestConfig(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(svr.GetScheduleConfig().MaxStorePreparingTime, Equals, typeutil.NewDuration(10*time.Minute))
 
+	args = []string{"-u", pdAddr, "config", "set", "max-store-preparing-time", "0s"}
+	_, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, IsNil)
+	c.Assert(svr.GetScheduleConfig().MaxStorePreparingTime, Equals, typeutil.NewDuration(0))
+
 	// test config read and write
 	testItems := []testItem{
 		{"leader-schedule-limit", uint64(64), func(scheduleConfig *config.ScheduleConfig) interface{} {
diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go
index ae7c8ba7d6a..b4a39aa8501 100644
--- a/tests/server/api/api_test.go
+++ b/tests/server/api/api_test.go
@@ -502,7 +502,7 @@ var _ = Suite(&testProgressSuite{})
 
 type testProgressSuite struct{}
 
-func (s *testProgressSuite) TestProgress(c *C) {
+func (s *testProgressSuite) TestRemovingProgress(c *C) {
 	c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/hasPrepared", `return(true)`), IsNil)
 	c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`), IsNil)
 	ctx, cancel := context.WithCancel(context.Background())
@@ -579,7 +579,7 @@ func (s *testProgressSuite) TestProgress(c *C) {
 	pdctl.MustPutRegion(c, cluster, 1000, 1, []byte("a"), []byte("b"), core.SetApproximateSize(20))
 	pdctl.MustPutRegion(c, cluster, 1001, 2, []byte("c"), []byte("d"), core.SetApproximateSize(10))
 
-	time.Sleep(time.Second)
+	time.Sleep(2 * time.Second)
 	output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=removing", http.MethodGet, http.StatusOK)
 	c.Assert(json.Unmarshal(output, &p), IsNil)
 	c.Assert(p.Action, Equals, "removing")
@@ -587,28 +587,135 @@ func (s *testProgressSuite) TestProgress(c *C) {
 	// store 2: (30-10)/(30+40) ~= 0.28
 	// average progress ~= (0.36+0.28)/2 = 0.32
 	c.Assert(fmt.Sprintf("%.2f", p.Progress), Equals, "0.32")
-	// store 1: 40/1s+ < 40
-	// store 2: 20/1s+ < 20
-	// average speed ~= (20+40)/2/1s+ < 30
-	c.Assert(p.CurrentSpeed, Less, 30.0)
-	c.Assert(p.CurrentSpeed, Greater, 25.0)
-	// store 1: (20+50)/40 ~= 1.75s+
-	// store 2: (10+40)/20 ~= 2.5s+
-	// average time ~= (1.75+2.5)/2 = 2.125s+
-	c.Assert(p.LeftSeconds, Greater, 2.125)
-	c.Assert(p.LeftSeconds, Less, 2.5)
+	// store 1: 40/10s = 4
+	// store 2: 20/10s = 2
+	// average speed = (2+4)/2 = 3
+	c.Assert(p.CurrentSpeed, Equals, 3.0)
+	// store 1: (20+50)/4 = 17.5s
+	// store 2: (10+40)/2 = 25s
+	// average time = (17.5+25)/2 = 21.25s
+	c.Assert(p.LeftSeconds, Equals, 21.25)
 
 	output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?id=2", http.MethodGet, http.StatusOK)
 	c.Assert(json.Unmarshal(output, &p), IsNil)
 	c.Assert(p.Action, Equals, "removing")
 	// store 2: (30-10)/(30+40) ~= 0.285
 	c.Assert(fmt.Sprintf("%.2f", p.Progress), Equals, "0.29")
-	// store 2: 20/1s+ < 20
-	c.Assert(p.CurrentSpeed, Less, 20.0)
-	c.Assert(p.CurrentSpeed, Greater, 15.0)
-	// store 2: (10+40)/20 ~= 2.5s+
-	c.Assert(p.LeftSeconds, Greater, 2.5)
-	c.Assert(p.LeftSeconds, Less, 3.0)
+	// store 2: 20/10s = 2
+	c.Assert(p.CurrentSpeed, Equals, 2.0)
+	// store 2: (10+40)/2 = 25s
+	c.Assert(p.LeftSeconds, Equals, 25.0)
+
+	c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/hasPrepared"), IsNil)
+	c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"), IsNil)
+}
+
+func (s *testProgressSuite) TestPreparingProgress(c *C) {
+	c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/hasPrepared", `return(true)`), IsNil)
+	c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`), IsNil)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, serverName string) {
+		
conf.Replication.MaxReplicas = 1 + }) + c.Assert(err, IsNil) + defer cluster.Destroy() + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + + cluster.WaitLeader() + leader := cluster.GetServer(cluster.GetLeader()) + grpcPDClient := testutil.MustNewGrpcClient(c, leader.GetAddr()) + clusterID := leader.GetClusterID() + req := &pdpb.BootstrapRequest{ + Header: testutil.NewRequestHeader(clusterID), + Store: &metapb.Store{Id: 1, Address: "127.0.0.1:0"}, + Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}}, + } + _, err = grpcPDClient.Bootstrap(context.Background(), req) + c.Assert(err, IsNil) + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Preparing, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 5, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Preparing, + LastHeartbeat: time.Now().UnixNano(), + }, + } + + for _, store := range stores { + pdctl.MustPutStore(c, leader.GetServer(), store) + } + for i := 0; i < 100; i++ { + pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10)) + } + // no store preparing + output := sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound) + c.Assert(strings.Contains((string(output)), "no progress found for the action"), IsTrue) + output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?id=4", http.MethodGet, http.StatusNotFound) + c.Assert(strings.Contains((string(output)), "no progress found for the given store ID"), IsTrue) + + time.Sleep(2 * time.Second) + // size is not changed. 
+ output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) + var p api.Progress + c.Assert(json.Unmarshal(output, &p), IsNil) + c.Assert(p.Action, Equals, "preparing") + c.Assert(p.Progress, Equals, 0.0) + c.Assert(p.CurrentSpeed, Equals, 0.0) + c.Assert(p.LeftSeconds, Equals, math.MaxFloat64) + + // update size + pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10)) + pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40)) + time.Sleep(2 * time.Second) + output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) + c.Assert(json.Unmarshal(output, &p), IsNil) + c.Assert(p.Action, Equals, "preparing") + // store 4: 10/(210*0.9) ~= 0.05 + // store 5: 40/(210*0.9) ~= 0.21 + // average progress ~= (0.05+0.21)/2 = 0.13 + c.Assert(fmt.Sprintf("%.2f", p.Progress), Equals, "0.13") + // store 4: 10/10s = 1 + // store 5: 40/10s = 4 + // average speed = (1+4)/2 = 2.5 + c.Assert(p.CurrentSpeed, Equals, 2.5) + // store 4: 179/1 ~= 179 + // store 5: 149/4 ~= 37.25 + // average time ~= (179+37.25)/2 = 108.125 + c.Assert(p.LeftSeconds, Equals, 108.125) + + output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?id=4", http.MethodGet, http.StatusOK) + c.Assert(json.Unmarshal(output, &p), IsNil) + c.Assert(p.Action, Equals, "preparing") + c.Assert(fmt.Sprintf("%.2f", p.Progress), Equals, "0.05") + c.Assert(p.CurrentSpeed, Equals, 1.0) + c.Assert(p.LeftSeconds, Equals, 179.0) c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/hasPrepared"), IsNil) c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"), IsNil)