From d6557df699e2ab25a4b820343acb60d8a94cab0b Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 16 May 2022 12:10:37 +0800 Subject: [PATCH 1/5] * : return error if get progress status failed (#4930) ref tikv/pd#4640 Signed-off-by: Ryan Leung Co-authored-by: Ti Chi Robot --- errors.toml | 10 +++++++++ pkg/errs/errno.go | 6 +++++ pkg/progress/progress.go | 15 +++++++++++-- pkg/progress/progress_test.go | 34 ++++++++++++++++++++++++++--- server/api/store.go | 12 +++++----- server/cluster/cluster.go | 33 +++++++++++++++++++--------- server/cluster/cluster_test.go | 6 +++-- server/core/region.go | 2 ++ server/handler.go | 4 ++-- server/schedulers/balance_region.go | 4 +--- 10 files changed, 98 insertions(+), 28 deletions(-) diff --git a/errors.toml b/errors.toml index 42bf009bafb..94589772634 100644 --- a/errors.toml +++ b/errors.toml @@ -501,6 +501,16 @@ error = ''' failed to lookup plugin function ''' +["PD:progress:ErrProgressNotFound"] +error = ''' +no progress found for %s +''' + +["PD:progress:ErrProgressWrongStatus"] +error = ''' +progress status is wrong +''' + ["PD:prometheus:ErrPrometheusCreateClient"] error = ''' create client error diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index f89ee049943..f11da06626f 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -332,3 +332,9 @@ var ( ErrUnsafeRecoveryIsRunning = errors.Normalize("unsafe recovery is running", errors.RFCCodeText("PD:unsaferecovery:ErrUnsafeRecoveryIsRunning")) ErrUnsafeRecoveryInvalidInput = errors.Normalize("invalid input %s", errors.RFCCodeText("PD:unsaferecovery:ErrUnsafeRecoveryInvalidInput")) ) + +// progress errors +var ( + ErrProgressWrongStatus = errors.Normalize("progress status is wrong", errors.RFCCodeText("PD:progress:ErrProgressWrongStatus")) + ErrProgressNotFound = errors.Normalize("no progress found for %s", errors.RFCCodeText("PD:progress:ErrProgressNotFound")) +) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index bcdb5a5e270..918456c3984 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -15,9 +15,11 @@ package progress import ( + "fmt" "math" "time" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/syncutil" ) @@ -81,6 +83,9 @@ func (m *Manager) UpdateProgressRemaining(progress string, remaining float64) { if p.total < remaining { p.total = remaining } + if p.lastTimeRemaining < remaining { + p.lastTimeRemaining = remaining + } // calculate the average speed for every `speedStatisticalInterval` if time.Since(p.lastTime) >= speedStatisticalInterval { if (p.lastTimeRemaining - remaining) <= 0 { @@ -131,12 +136,17 @@ func (m *Manager) GetProgresses(filter func(p string) bool) []string { } // Status returns the current progress status of a give name. 
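As caller-side orientation for the four-value Status signature introduced below, a minimal standalone sketch (hypothetical caller; the manager API and error codes are the ones added in this patch). The three float results are only meaningful when err is nil, which is why the stores API handler and updateProgress now check the error first.

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/progress"
)

func main() {
	m := progress.NewManager()
	m.AddProgress("removing-1", 100)
	m.UpdateProgressRemaining("removing-1", 70)

	process, leftSeconds, currentSpeed, err := m.Status("removing-1")
	if err != nil {
		// errs.ErrProgressNotFound: the name was never registered.
		// errs.ErrProgressWrongStatus: the recorded remaining exceeds the total.
		fmt.Println("progress status unavailable:", err)
		return
	}
	// leftSeconds stays math.MaxFloat64 until a speed sample has been collected.
	fmt.Printf("%.0f%% done, ETA %.0fs, speed %.2f\n", process*100, leftSeconds, currentSpeed)
}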
-func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed float64) { +func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed float64, err error) { m.RLock() defer m.RUnlock() if p, exist := m.progesses[progress]; exist { process = 1 - p.remaining/p.total + if process < 0 { + process = 0 + err = errs.ErrProgressWrongStatus.FastGenByArgs(fmt.Sprintf("the remaining: %v is larger than the total: %v", p.remaining, p.total)) + return + } currentSpeed = 0 // when the progress is newly added if p.lastSpeed == 0 && time.Since(p.lastTime) < speedStatisticalInterval { @@ -150,5 +160,6 @@ func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed fl } return } - return 0, 0, 0 + err = errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the progress: %s", progress)) + return } diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go index 546d9b56bb1..c42e0388fe2 100644 --- a/pkg/progress/progress_test.go +++ b/pkg/progress/progress_test.go @@ -35,7 +35,8 @@ func (s *testProgressSuite) Test(c *C) { n := "test" m := NewManager() c.Assert(m.AddProgress(n, 100), IsFalse) - p, ls, cs := m.Status(n) + p, ls, cs, err := m.Status(n) + c.Assert(err, IsNil) c.Assert(p, Equals, 0.0) c.Assert(ls, Equals, math.MaxFloat64) c.Assert(cs, Equals, 0.0) @@ -47,7 +48,8 @@ func (s *testProgressSuite) Test(c *C) { }() time.Sleep(time.Millisecond) m.UpdateProgressRemaining(n, 30) - p, ls, cs = m.Status(n) + p, ls, cs, err = m.Status(n) + c.Assert(err, IsNil) c.Assert(p, Equals, 0.7) // 30/(70/1s+) > 30/70 c.Assert(ls, Greater, 30.0/70.0) @@ -56,7 +58,8 @@ func (s *testProgressSuite) Test(c *C) { // there is no scheduling time.Sleep(time.Millisecond) m.UpdateProgressRemaining(n, 30) - p, ls, cs = m.Status(n) + p, ls, cs, err = m.Status(n) + c.Assert(err, IsNil) c.Assert(p, Equals, 0.7) // the speed in previous `speedStatisticalInterval` is zero c.Assert(ls, Equals, math.MaxFloat64) @@ -74,3 +77,28 @@ func (s *testProgressSuite) Test(c *C) { c.Assert(m.RemoveProgress(n), IsTrue) c.Assert(m.RemoveProgress(n), IsFalse) } + +func (s *testProgressSuite) TestAbnormal(c *C) { + n := "test" + m := NewManager() + c.Assert(m.AddProgress(n, 100), IsFalse) + p, ls, cs, err := m.Status(n) + c.Assert(err, IsNil) + c.Assert(p, Equals, 0.0) + c.Assert(ls, Equals, math.MaxFloat64) + c.Assert(cs, Equals, 0.0) + // When offline a store, but there are operators moving region to it, e.g., merge operation + m.UpdateProgressRemaining(n, 110) + p, ls, cs, err = m.Status(n) + c.Assert(err, IsNil) + c.Assert(p, Equals, 0.0) + c.Assert(ls, Equals, math.MaxFloat64) + c.Assert(cs, Equals, 0.0) + // It usually won't happens + m.UpdateProgressTotal(n, 10) + p, ls, cs, err = m.Status(n) + c.Assert(err, NotNil) + c.Assert(p, Equals, 0.0) + c.Assert(ls, Equals, 0.0) + c.Assert(cs, Equals, 0.0) +} diff --git a/server/api/store.go b/server/api/store.go index 8d6740ddf4a..27aa7b59655 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -633,9 +633,9 @@ func (h *storesHandler) GetStoresProgress(w http.ResponseWriter, r *http.Request return } - action, progress, leftSeconds, currentSpeed := h.Handler.GetProgressByID(v) - if progress == 0 && leftSeconds == 0 && currentSpeed == 0 { - h.rd.JSON(w, http.StatusNotFound, "no progress found for the given store ID") + action, progress, leftSeconds, currentSpeed, err := h.Handler.GetProgressByID(v) + if err != nil { + h.rd.JSON(w, http.StatusNotFound, err.Error()) return } sp := &Progress{ @@ -650,9 +650,9 @@ func (h *storesHandler) 
GetStoresProgress(w http.ResponseWriter, r *http.Request return } if v := r.URL.Query().Get("action"); v != "" { - progress, leftSeconds, currentSpeed := h.Handler.GetProgressByAction(v) - if progress == 0 && leftSeconds == 0 && currentSpeed == 0 { - h.rd.JSON(w, http.StatusNotFound, "no progress found for the action") + progress, leftSeconds, currentSpeed, err := h.Handler.GetProgressByAction(v) + if err != nil { + h.rd.JSON(w, http.StatusNotFound, err.Error()) return } sp := &Progress{ diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 5bd1d81f16f..cc7a3efee0f 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1432,7 +1432,8 @@ func (c *RaftCluster) checkStores() { threshold := c.getThreshold(stores, store) log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold)) regionSize := float64(store.GetRegionSize()) - if store.GetUptime() > c.opt.GetMaxStorePreparingTime() || regionSize >= threshold { + if store.GetUptime() > c.opt.GetMaxStorePreparingTime() || regionSize >= threshold || + c.GetRegionCount() < core.InitClusterRegionThreshold { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", zap.Stringer("store", store.GetMeta()), @@ -1457,8 +1458,9 @@ func (c *RaftCluster) checkStores() { id := offlineStore.GetId() regionSize := c.core.GetStoreRegionSize(id) c.updateProgress(id, store.GetAddress(), removingAction, float64(regionSize)) + regionCount := c.core.GetStoreRegionCount(id) // If the store is empty, it can be buried. - if regionSize == 0 { + if regionCount == 0 { if err := c.BuryStore(id, false); err != nil { log.Error("bury store failed", zap.Stringer("store", offlineStore), @@ -1624,7 +1626,11 @@ func (c *RaftCluster) updateProgress(storeID uint64, storeAddress string, action return } c.progressManager.UpdateProgressRemaining(progress, remaining) - process, ls, _ := c.progressManager.Status(progress) + process, ls, _, err := c.progressManager.Status(progress) + if err != nil { + log.Error("get progress status failed", zap.String("progress", progress), zap.Float64("remaining", remaining), errs.ZapError(err)) + return + } storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(process) storesETAGauge.WithLabelValues(storeAddress, storeLabel, action).Set(ls) } @@ -2153,14 +2159,17 @@ func (c *RaftCluster) GetEtcdClient() *clientv3.Client { } // GetProgressByID returns the progress details for a given store ID. -func (c *RaftCluster) GetProgressByID(storeID string) (action string, process, ls, cs float64) { +func (c *RaftCluster) GetProgressByID(storeID string) (action string, process, ls, cs float64, err error) { filter := func(progress string) bool { s := strings.Split(progress, "-") return len(s) == 2 && s[1] == storeID } progress := c.progressManager.GetProgresses(filter) if len(progress) != 0 { - process, ls, cs = c.progressManager.Status(progress[0]) + process, ls, cs, err = c.progressManager.Status(progress[0]) + if err != nil { + return + } if strings.HasPrefix(progress[0], removingAction) { action = removingAction } else if strings.HasPrefix(progress[0], preparingAction) { @@ -2168,24 +2177,28 @@ func (c *RaftCluster) GetProgressByID(storeID string) (action string, process, l } return } - return "", 0, 0, 0 + return "", 0, 0, 0, errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the given store ID: %s", storeID)) } // GetProgressByAction returns the progress details for a given action. 
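The handlers above return these errors verbatim with a 404 (h.rd.JSON(w, http.StatusNotFound, err.Error())). A tiny sketch of how the normalized message is produced (hypothetical argument, mirroring GetProgressByID above; the exact rendering of the RFC-code prefix depends on the errors library):

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/errs"
)

func main() {
	// FastGenByArgs fills the %s placeholder declared in errors.toml, so the
	// text carries both the RFC code PD:progress:ErrProgressNotFound and the
	// caller-supplied detail.
	err := errs.ErrProgressNotFound.FastGenByArgs("the given store ID: 1")
	fmt.Println(err)
}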
-func (c *RaftCluster) GetProgressByAction(action string) (process, ls, cs float64) { +func (c *RaftCluster) GetProgressByAction(action string) (process, ls, cs float64, err error) { filter := func(progress string) bool { return strings.HasPrefix(progress, action) } progresses := c.progressManager.GetProgresses(filter) if len(progresses) == 0 { - return 0, 0, 0 + return 0, 0, 0, errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the action: %s", action)) } + var p, l, s float64 for _, progress := range progresses { - p, l, c := c.progressManager.Status(progress) + p, l, s, err = c.progressManager.Status(progress) + if err != nil { + return + } process += p ls += l - cs += c + cs += s } num := float64(len(progresses)) process /= num diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 6b40538542e..4130bc2cfa2 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -452,7 +452,8 @@ func (s *testClusterInfoSuite) TestRemovingProcess(c *C) { cluster.checkStores() process := "removing-1" // no region moving - p, l, cs := cluster.progressManager.Status(process) + p, l, cs, err := cluster.progressManager.Status(process) + c.Assert(err, IsNil) c.Assert(p, Equals, 0.0) c.Assert(l, Equals, math.MaxFloat64) c.Assert(cs, Equals, 0.0) @@ -467,7 +468,8 @@ func (s *testClusterInfoSuite) TestRemovingProcess(c *C) { } time.Sleep(time.Second) cluster.checkStores() - p, l, cs = cluster.progressManager.Status(process) + p, l, cs, err = cluster.progressManager.Status(process) + c.Assert(err, IsNil) // In above we delete 5 region from store 1, the total count of region in store 1 is 20. // process = 5 / 20 = 0.25 c.Assert(p, Equals, 0.25) diff --git a/server/core/region.go b/server/core/region.go index e9edc235d12..7e303e68de6 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -120,6 +120,8 @@ const ( // Only statistics within this interval limit are valid. statsReportMinInterval = 3 // 3s statsReportMaxInterval = 5 * 60 // 5min + // InitClusterRegionThreshold is a threshold which represent a new cluster. + InitClusterRegionThreshold = 50 ) // RegionFromHeartbeat constructs a Region from region heartbeat. diff --git a/server/handler.go b/server/handler.go index 88d164e6482..fec75c5fb69 100644 --- a/server/handler.go +++ b/server/handler.go @@ -938,12 +938,12 @@ func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scen } // GetProgressByID returns the progress details for a given store ID. -func (h *Handler) GetProgressByID(storeID string) (action string, p, ls, cs float64) { +func (h *Handler) GetProgressByID(storeID string) (action string, p, ls, cs float64, err error) { return h.s.GetRaftCluster().GetProgressByID(storeID) } // GetProgressByAction returns the progress details for a given action. -func (h *Handler) GetProgressByAction(action string) (p, ls, cs float64) { +func (h *Handler) GetProgressByAction(action string) (p, ls, cs float64, err error) { return h.s.GetRaftCluster().GetProgressByAction(action) } diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go index b1a55f6916c..f8d4679acbc 100644 --- a/server/schedulers/balance_region.go +++ b/server/schedulers/balance_region.go @@ -62,8 +62,6 @@ const ( BalanceRegionName = "balance-region-scheduler" // BalanceRegionType is balance region scheduler type. BalanceRegionType = "balance-region" - // BalanceEmptyRegionThreshold is a threshold which allow balance the empty region if the region number is less than this threshold. 
- balanceEmptyRegionThreshold = 50 ) type balanceRegionSchedulerConfig struct { @@ -265,7 +263,7 @@ func (s *balanceRegionScheduler) transferPeer(plan *balancePlan) *operator.Opera // isEmptyRegionAllowBalance checks if a region is an empty region and can be balanced. func isEmptyRegionAllowBalance(cluster schedule.Cluster, region *core.RegionInfo) bool { - return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetRegionCount() < balanceEmptyRegionThreshold + return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetRegionCount() < core.InitClusterRegionThreshold } // isAllowBalanceEmptyRegion returns a function that checks if a region is an empty region and can be balanced. From d1270542850aaa5288704031f86e73b78e76b7b6 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 16 May 2022 12:20:37 +0800 Subject: [PATCH 2/5] cluster: reset stores cache before loading cluster info (#4942) close tikv/pd#4941 Signed-off-by: Ryan Leung Co-authored-by: ShuNing Co-authored-by: Ti Chi Robot --- server/cluster/cluster.go | 1 + server/core/basic_cluster.go | 7 ++++ tests/server/cluster/cluster_test.go | 57 ++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index cc7a3efee0f..794dc351813 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -339,6 +339,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { return nil, nil } + c.core.ResetStores() start := time.Now() if err := c.storage.LoadStores(c.core.PutStore); err != nil { return nil, err diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index f83a7809c23..071847a8b89 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -338,6 +338,13 @@ func (bc *BasicCluster) PutStore(store *StoreInfo) { bc.Stores.SetStore(store) } +// ResetStores resets the store cache. +func (bc *BasicCluster) ResetStores() { + bc.Lock() + defer bc.Unlock() + bc.Stores = NewStoresInfo() +} + // DeleteStore deletes a store. 
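A tiny standalone sketch of the ResetStores helper added above (hypothetical usage; the store fields are placeholders). LoadClusterInfo relies on it so that a re-elected PD leader rebuilds the store cache from storage instead of keeping entries that were tombstoned and purged while it was a follower; the regression test added below exercises that flow end to end.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/server/core"
)

func main() {
	bc := core.NewBasicCluster()
	bc.PutStore(core.NewStoreInfo(&metapb.Store{Id: 1, Address: "mock://1"}))
	fmt.Println(len(bc.GetStores())) // 1

	// ResetStores drops the whole cached store set under the write lock.
	bc.ResetStores()
	fmt.Println(len(bc.GetStores())) // 0
}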
func (bc *BasicCluster) DeleteStore(store *StoreInfo) { bc.Lock() diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index f931f0577ca..bbb10c525aa 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -1343,3 +1343,60 @@ func (s *clusterTestSuite) TestMinResolvedTS(c *C) { ts = rc.GetMinResolvedTS() c.Assert(ts, Equals, store5TS) } + +// See https://github.com/tikv/pd/issues/4941 +func (s *clusterTestSuite) TestTransferLeaderBack(c *C) { + tc, err := tests.NewTestCluster(s.ctx, 2) + defer tc.Destroy() + c.Assert(err, IsNil) + err = tc.RunInitialServers() + c.Assert(err, IsNil) + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + svr := leaderServer.GetServer() + rc := cluster.NewRaftCluster(s.ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster()) + storage := rc.GetStorage() + meta := &metapb.Cluster{Id: 123} + c.Assert(storage.SaveMeta(meta), IsNil) + n := 4 + stores := make([]*metapb.Store, 0, n) + for i := 1; i <= n; i++ { + store := &metapb.Store{Id: uint64(i), State: metapb.StoreState_Up} + stores = append(stores, store) + } + + for _, store := range stores { + c.Assert(storage.SaveStore(store), IsNil) + } + rc, err = rc.LoadClusterInfo() + c.Assert(err, IsNil) + c.Assert(rc, NotNil) + // offline a store + c.Assert(rc.RemoveStore(1, false), IsNil) + c.Assert(rc.GetStore(1).GetState(), Equals, metapb.StoreState_Offline) + + // transfer PD leader to another PD + tc.ResignLeader() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + svr1 := leaderServer.GetServer() + rc1 := svr1.GetRaftCluster() + c.Assert(err, IsNil) + c.Assert(rc1, NotNil) + // tombstone a store, and remove its record + c.Assert(rc1.BuryStore(1, false), IsNil) + c.Assert(rc1.RemoveTombStoneRecords(), IsNil) + + // transfer PD leader back to the previous PD + tc.ResignLeader() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + svr = leaderServer.GetServer() + rc = svr.GetRaftCluster() + c.Assert(rc, NotNil) + + // check store count + c.Assert(rc.GetMetaCluster(), DeepEquals, meta) + c.Assert(rc.GetStoreCount(), Equals, 3) +} From eed19e01ad410bbc7c418c5a519530d53b95570b Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 16 May 2022 14:28:37 +0800 Subject: [PATCH 3/5] statistics: make aot more stable (#4951) ref tikv/pd#4949 Signed-off-by: lhy1024 Co-authored-by: Ti Chi Robot --- server/cluster/cluster.go | 5 +++ server/statistics/hot_peer_cache_test.go | 50 ++++++++++++++++++++++++ server/statistics/util.go | 2 +- tests/pdctl/hot/hot_test.go | 26 +++++++++--- 4 files changed, 77 insertions(+), 6 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 794dc351813..516ffa2eb08 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -615,6 +615,11 @@ func (c *RaftCluster) GetSuspectRegions() []uint64 { return c.coordinator.checkers.GetSuspectRegions() } +// GetHotStat gets hot stat for test. +func (c *RaftCluster) GetHotStat() *statistics.HotStat { + return c.hotStat +} + // RemoveSuspectRegion removes region from suspect list. 
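The hot-peer cache test added further below pins down how the shortened average-over-time window (DefaultAotSize 2 -> 1) reacts to spiky input. As orientation, a minimal standalone sketch of the same TimeMedian usage, with the constants and calls taken from that test:

package main

import (
	"fmt"
	"time"

	"github.com/tikv/pd/pkg/movingaverage"
	"github.com/tikv/pd/server/statistics"
)

func main() {
	interval := time.Second
	tm := movingaverage.NewTimeMedian(statistics.DefaultAotSize, statistics.DefaultWriteMfSize, interval)
	// A single 1-unit spike in a stream of zeros is absorbed by the median
	// filter, so the reported rate stays at 0 (the second case in TestUnstableData).
	for _, v := range []float64{0, 0, 0, 0, 0, 1, 0, 0, 0, 0} {
		tm.Add(v, interval)
		fmt.Println(tm.Get())
	}
}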
func (c *RaftCluster) RemoveSuspectRegion(id uint64) { c.coordinator.checkers.RemoveSuspectRegion(id) diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index 4698cc4769a..347e2a423d8 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -22,6 +22,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/movingaverage" "github.com/tikv/pd/server/core" ) @@ -603,3 +604,52 @@ func BenchmarkCheckRegionFlow(b *testing.B) { } } } + +type testMovingAverageCase struct { + report []float64 + expect []float64 +} + +func checkMovingAverage(c *C, testCase *testMovingAverageCase) { + interval := 1 * time.Second + tm := movingaverage.NewTimeMedian(DefaultAotSize, DefaultWriteMfSize, interval) + var results []float64 + for _, data := range testCase.report { + tm.Add(data, interval) + results = append(results, tm.Get()) + } + c.Assert(results, DeepEquals, testCase.expect) +} + +// +func (t *testHotPeerCache) TestUnstableData(c *C) { + cases := []*testMovingAverageCase{ + { + report: []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, + expect: []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, + }, + { + report: []float64{0, 0, 0, 0, 0, 1, 0, 0, 0, 0}, + expect: []float64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + { + report: []float64{0, 0, 0, 0, 0, 1, 1, 0, 0, 0}, + expect: []float64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + { + report: []float64{0, 0, 0, 0, 1, 1, 1, 0, 0, 0}, + expect: []float64{0, 0, 0, 0, 0, 0, 1, 1, 1, 0}, + }, + { + report: []float64{0, 0, 0, 0, 0, 1, 0, 1, 0, 0}, + expect: []float64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + { + report: []float64{0, 0, 0, 0, 0, 1, 0, 1, 0, 1}, + expect: []float64{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, + }, + } + for i := range cases { + checkMovingAverage(c, cases[i]) + } +} diff --git a/server/statistics/util.go b/server/statistics/util.go index 05e7db9e0a8..8c3a89d9c8e 100644 --- a/server/statistics/util.go +++ b/server/statistics/util.go @@ -24,7 +24,7 @@ const ( // RegionHeartBeatReportInterval is the heartbeat report interval of a region. RegionHeartBeatReportInterval = 60 // DefaultAotSize is default size of average over time. - DefaultAotSize = 2 + DefaultAotSize = 1 // DefaultWriteMfSize is default size of write median filter. DefaultWriteMfSize = 5 // DefaultReadMfSize is default size of read median filter. diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index dc11efced6b..de40564c2d9 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -119,7 +119,7 @@ func (s *hotTestSuite) TestHot(c *C) { _, err = pdctl.ExecuteCommand(cmd, args...) 
c.Assert(err, IsNil) - hotStoreID := uint64(1) + hotStoreID := store1.Id count := 0 testHot := func(hotRegionID, hotStoreID uint64, hotType string) { args = []string{"-u", pdAddr, "hot", hotType} @@ -141,16 +141,32 @@ func (s *hotTestSuite) TestHot(c *C) { regionIDCounter++ switch hotType { case "read": - pdctl.MustPutRegion(c, cluster, hotRegionID, hotStoreID, []byte("b"), []byte("c"), core.SetReadBytes(1000000000), core.SetReportInterval(reportInterval)) + loads := []float64{ + statistics.RegionReadBytes: float64(1000000000 * reportInterval), + statistics.RegionReadKeys: float64(1000000000 * reportInterval), + statistics.RegionReadQuery: float64(1000000000 * reportInterval), + statistics.RegionWriteBytes: 0, + statistics.RegionWriteKeys: 0, + statistics.RegionWriteQuery: 0, + } + leader := &metapb.Peer{ + Id: 100 + regionIDCounter, + StoreId: hotStoreID, + } + peerInfo := core.NewPeerInfo(leader, loads, reportInterval) + region := core.NewRegionInfo(&metapb.Region{ + Id: hotRegionID, + }, leader) + rc.GetHotStat().CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) time.Sleep(5000 * time.Millisecond) - if reportInterval >= statistics.RegionHeartBeatReportInterval { + if reportInterval >= statistics.ReadReportInterval { count++ } testHot(hotRegionID, hotStoreID, "read") case "write": - pdctl.MustPutRegion(c, cluster, hotRegionID, hotStoreID, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000), core.SetReportInterval(reportInterval)) + pdctl.MustPutRegion(c, cluster, hotRegionID, hotStoreID, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000*reportInterval), core.SetReportInterval(reportInterval)) time.Sleep(5000 * time.Millisecond) - if reportInterval >= statistics.RegionHeartBeatReportInterval { + if reportInterval >= statistics.WriteReportInterval { count++ } testHot(hotRegionID, hotStoreID, "write") From f1d026ed9aecf03c6daac0c522019694bce9c58e Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Mon, 16 May 2022 17:50:37 +0800 Subject: [PATCH 4/5] cluster: Lower sync config log level (#4929) close tikv/pd#4923 Signed-off-by: bufferflies <1045931706@qq.com> --- server/cluster/cluster.go | 4 +++- server/cluster/metrics.go | 9 +++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 516ffa2eb08..2cdc334304d 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -319,9 +319,11 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bo // it will try next store if the current store is failed. address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress()) if err := manager.ObserveConfig(address); err != nil { - log.Warn("sync store config failed, it will try next store", zap.Error(err)) + storeSyncConfigEvent.WithLabelValues(address, "fail").Inc() + log.Debug("sync store config failed, it will try next store", zap.Error(err)) continue } + storeSyncConfigEvent.WithLabelValues(address, "succ").Inc() // it will only try one store. 
return true } diff --git a/server/cluster/metrics.go b/server/cluster/metrics.go index 09e204b9c42..8b70ec30cf8 100644 --- a/server/cluster/metrics.go +++ b/server/cluster/metrics.go @@ -96,6 +96,14 @@ var ( Help: "The current progress of corresponding action", }, []string{"address", "store", "action"}) + storeSyncConfigEvent = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "cluster", + Name: "store_sync", + Help: "The state of store sync config", + }, []string{"address", "state"}) + storesETAGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", @@ -117,4 +125,5 @@ func init() { prometheus.MustRegister(bucketEventCounter) prometheus.MustRegister(storesProgressGauge) prometheus.MustRegister(storesETAGauge) + prometheus.MustRegister(storeSyncConfigEvent) } From 9dad950a9c3f95879d26ce3699124a7d47fa3759 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 16 May 2022 18:28:39 +0800 Subject: [PATCH 5/5] scheduler: refactor calcProgressiveRank in hot-scheduler (#4950) ref tikv/pd#4949 Signed-off-by: lhy1024 Co-authored-by: Ti Chi Robot --- server/schedulers/hot_region.go | 121 ++++++++++++++----------- server/schedulers/hot_region_config.go | 2 +- 2 files changed, 67 insertions(+), 56 deletions(-) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 6a15dda3ac1..8158370a84a 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -324,6 +324,38 @@ func (h *hotScheduler) balanceHotWriteRegions(cluster schedule.Cluster) []*opera return nil } +type solution struct { + srcStore *statistics.StoreLoadDetail + srcPeerStat *statistics.HotPeerStat + region *core.RegionInfo + dstStore *statistics.StoreLoadDetail + + // progressiveRank measures the contribution for balance. + // The smaller the rank, the better this solution is. + // If rank < 0, this solution makes thing better. + progressiveRank int64 +} + +// getExtremeLoad returns the min load of the src store and the max load of the dst store. +func (s *solution) getExtremeLoad(dim int) (src float64, dst float64) { + return s.srcStore.LoadPred.Min().Loads[dim], s.dstStore.LoadPred.Max().Loads[dim] +} + +// getCurrentLoad returns the current load of the src store and the dst store. +func (s *solution) getCurrentLoad(dim int) (src float64, dst float64) { + return s.srcStore.LoadPred.Current.Loads[dim], s.dstStore.LoadPred.Current.Loads[dim] +} + +// getPendingLoad returns the pending load of the src store and the dst store. +func (s *solution) getPendingLoad(dim int) (src float64, dst float64) { + return s.srcStore.LoadPred.Pending().Loads[dim], s.dstStore.LoadPred.Pending().Loads[dim] +} + +// getPeerRate returns the load of the peer. +func (s *solution) getPeerRate(rw statistics.RWType, dim int) float64 { + return s.srcPeerStat.GetLoad(statistics.GetRegionStatKind(rw, dim)) +} + type balanceSolver struct { schedule.Cluster sche *hotScheduler @@ -344,18 +376,9 @@ type balanceSolver struct { // they may be byte(0), key(1), query(2), and always less than dimLen firstPriority int secondPriority int -} -type solution struct { - srcStore *statistics.StoreLoadDetail - srcPeerStat *statistics.HotPeerStat - region *core.RegionInfo - dstStore *statistics.StoreLoadDetail - - // progressiveRank measures the contribution for balance. - // The smaller the rank, the better this solution is. - // If rank < 0, this solution makes thing better. 
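As a plain-language companion to the rewritten calcProgressiveRank further below, a small illustrative sketch of how the rank values read; the booleans are hypothetical stand-ins for bs.isBetter and bs.isNotWorsened, and the write-leader path only ever assigns -1.

package main

import "fmt"

// rankOf mirrors the decision order of calcProgressiveRank for the
// non-write-leader case.
func rankOf(betterFirst, notWorsenedFirst, betterSecond bool) int64 {
	switch {
	case betterFirst && betterSecond:
		return -3 // both priority dimensions become more balanced: best choice
	case notWorsenedFirst && betterSecond:
		return -2 // first dimension is not worsened, second improves
	case betterFirst:
		return -1 // only the first dimension improves
	}
	return 0 // no improvement: the candidate is skipped
}

func main() {
	fmt.Println(rankOf(true, true, true))    // -3
	fmt.Println(rankOf(false, true, true))   // -2
	fmt.Println(rankOf(true, true, false))   // -1
	fmt.Println(rankOf(false, false, false)) // 0
}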
- progressiveRank int64 + greatDecRatio float64 + minorDecRatio float64 } func (bs *balanceSolver) init() { @@ -392,6 +415,7 @@ func (bs *balanceSolver) init() { } bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities()) + bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio() } func (bs *balanceSolver) isSelectedDim(dim int) bool { @@ -746,44 +770,23 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist // calcProgressiveRank calculates `bs.cur.progressiveRank`. // See the comments of `solution.progressiveRank` for more about progressive rank. func (bs *balanceSolver) calcProgressiveRank() { - src := bs.cur.srcStore - dst := bs.cur.dstStore - srcLd := src.LoadPred.Min() - dstLd := dst.LoadPred.Max() bs.cur.progressiveRank = 0 - peer := bs.cur.srcPeerStat if toResourceType(bs.rwTy, bs.opTy) == writeLeader { - if !bs.isTolerance(src, dst, bs.firstPriority) { - return - } - srcRate := srcLd.Loads[bs.firstPriority] - dstRate := dstLd.Loads[bs.firstPriority] - peerRate := peer.GetLoad(statistics.GetRegionStatKind(bs.rwTy, bs.firstPriority)) - if srcRate-peerRate >= dstRate+peerRate { + // For write leader, only compare the first priority. + if bs.isBetterForWriteLeader() { bs.cur.progressiveRank = -1 } } else { - firstPriorityDimHot, firstPriorityDecRatio, secondPriorityDimHot, secondPriorityDecRatio := bs.getHotDecRatioByPriorities(srcLd, dstLd, peer) - greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio() switch { - case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio: + case bs.isBetter(bs.firstPriority) && bs.isBetter(bs.secondPriority): // If belong to the case, two dim will be more balanced, the best choice. - if !bs.isTolerance(src, dst, bs.firstPriority) || !bs.isTolerance(src, dst, bs.secondPriority) { - return - } bs.cur.progressiveRank = -3 - case firstPriorityDecRatio <= minorDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio: + case bs.isNotWorsened(bs.firstPriority) && bs.isBetter(bs.secondPriority): // If belong to the case, first priority dim will be not worsened, second priority dim will be more balanced. - if !bs.isTolerance(src, dst, bs.secondPriority) { - return - } bs.cur.progressiveRank = -2 - case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio: + case bs.isBetter(bs.firstPriority): // If belong to the case, first priority dim will be more balanced, ignore the second priority dim. - if !bs.isTolerance(src, dst, bs.firstPriority) { - return - } bs.cur.progressiveRank = -1 } } @@ -791,20 +794,18 @@ func (bs *balanceSolver) calcProgressiveRank() { // isTolerance checks source store and target store by checking the difference value with pendingAmpFactor * pendingPeer. // This will make the hot region scheduling slow even serializely running when each 2 store's pending influence is close. 
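Before the signature change below, a small numeric illustration of the tolerance check; the loads are made-up values and pendingAmpFactor is assumed to be 2.0 for the sketch, while the inequality itself is the one isTolerance evaluates.

package main

import "fmt"

// tolerated mirrors the inequality used by isTolerance:
// srcRate - pendingAmp*srcPending > dstRate + pendingAmp*dstPending,
// with pendingAmp = 1 + pendingAmpFactor*srcRate/(srcRate-dstRate).
func tolerated(srcRate, dstRate, srcPending, dstPending float64) bool {
	const pendingAmpFactor = 2.0 // assumed value, for illustration only
	if srcRate <= dstRate {
		return false
	}
	pendingAmp := 1 + pendingAmpFactor*srcRate/(srcRate-dstRate)
	return srcRate-pendingAmp*srcPending > dstRate+pendingAmp*dstPending
}

func main() {
	// Wide gap: pendingAmp = 1 + 2*100/40 = 6, and 100-3 > 60+3, so the move is tolerated.
	fmt.Println(tolerated(100, 60, 0.5, 0.5)) // true
	// Narrow gap: pendingAmp = 1 + 2*100/2 = 101, and 100-50.5 < 98+50.5, so it is rejected.
	fmt.Println(tolerated(100, 98, 0.5, 0.5)) // false
}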
-func (bs *balanceSolver) isTolerance(src, dst *statistics.StoreLoadDetail, dim int) bool { - srcRate := src.LoadPred.Current.Loads[dim] - dstRate := dst.LoadPred.Current.Loads[dim] +func (bs *balanceSolver) isTolerance(dim int) bool { + srcRate, dstRate := bs.cur.getCurrentLoad(dim) if srcRate <= dstRate { return false } + srcPending, dstPending := bs.cur.getPendingLoad(dim) pendingAmp := (1 + pendingAmpFactor*srcRate/(srcRate-dstRate)) - srcPending := src.LoadPred.Pending().Loads[dim] - dstPending := dst.LoadPred.Pending().Loads[dim] - hotPendingStatus.WithLabelValues(bs.rwTy.String(), strconv.FormatUint(src.GetID(), 10), strconv.FormatUint(dst.GetID(), 10)).Set(pendingAmp) + hotPendingStatus.WithLabelValues(bs.rwTy.String(), strconv.FormatUint(bs.cur.srcStore.GetID(), 10), strconv.FormatUint(bs.cur.dstStore.GetID(), 10)).Set(pendingAmp) return srcRate-pendingAmp*srcPending > dstRate+pendingAmp*dstPending } -func (bs *balanceSolver) getHotDecRatioByPriorities(srcLd, dstLd *statistics.StoreLoad, peer *statistics.HotPeerStat) (bool, float64, bool, float64) { +func (bs *balanceSolver) getHotDecRatioByPriorities(dim int) (bool, float64) { // we use DecRatio(Decline Ratio) to expect that the dst store's rate should still be less // than the src store's rate after scheduling one peer. getSrcDecRate := func(a, b float64) float64 { @@ -813,17 +814,27 @@ func (bs *balanceSolver) getHotDecRatioByPriorities(srcLd, dstLd *statistics.Sto } return a - b } - checkHot := func(dim int) (bool, float64) { - srcRate := srcLd.Loads[dim] - dstRate := dstLd.Loads[dim] - peerRate := peer.GetLoad(statistics.GetRegionStatKind(bs.rwTy, dim)) - decRatio := (dstRate + peerRate) / getSrcDecRate(srcRate, peerRate) - isHot := peerRate >= bs.getMinRate(dim) - return isHot, decRatio - } - firstHot, firstDecRatio := checkHot(bs.firstPriority) - secondHot, secondDecRatio := checkHot(bs.secondPriority) - return firstHot, firstDecRatio, secondHot, secondDecRatio + srcRate, dstRate := bs.cur.getExtremeLoad(dim) + peerRate := bs.cur.getPeerRate(bs.rwTy, dim) + isHot := peerRate >= bs.getMinRate(dim) + decRatio := (dstRate + peerRate) / getSrcDecRate(srcRate, peerRate) + return isHot, decRatio +} + +func (bs *balanceSolver) isBetterForWriteLeader() bool { + srcRate, dstRate := bs.cur.getExtremeLoad(bs.firstPriority) + peerRate := bs.cur.getPeerRate(bs.rwTy, bs.firstPriority) + return srcRate-peerRate >= dstRate+peerRate && bs.isTolerance(bs.firstPriority) +} + +func (bs *balanceSolver) isBetter(dim int) bool { + isHot, decRatio := bs.getHotDecRatioByPriorities(dim) + return isHot && decRatio <= bs.greatDecRatio && bs.isTolerance(dim) +} + +func (bs *balanceSolver) isNotWorsened(dim int) bool { + isHot, decRatio := bs.getHotDecRatioByPriorities(dim) + return !isHot || decRatio <= bs.minorDecRatio } func (bs *balanceSolver) getMinRate(dim int) float64 { diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go index 31e65d2e8c6..53228cf6a79 100644 --- a/server/schedulers/hot_region_config.go +++ b/server/schedulers/hot_region_config.go @@ -219,7 +219,7 @@ func (conf *hotRegionSchedulerConfig) GetGreatDecRatio() float64 { return conf.GreatDecRatio } -func (conf *hotRegionSchedulerConfig) GetMinorGreatDecRatio() float64 { +func (conf *hotRegionSchedulerConfig) GetMinorDecRatio() float64 { conf.RLock() defer conf.RUnlock() return conf.MinorDecRatio