From 9c2b83aae17466f0ec53e2501c8f486d17e87f60 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 5 Dec 2024 15:51:51 +0800 Subject: [PATCH] cleanup Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/cluster.go | 9 +- pkg/schedule/coordinator.go | 39 ++++-- pkg/schedule/core/prepare_checker.go | 13 +- server/cluster/cluster_test.go | 115 ++++++++++++++++-- tests/integrations/mcs/scheduling/api_test.go | 2 + .../mcs/scheduling/config_test.go | 3 - .../mcs/scheduling/server_test.go | 8 +- tests/scheduling_cluster.go | 2 +- tests/server/cluster/cluster_test.go | 4 +- tools/pd-ctl/tests/store/store_test.go | 6 - 10 files changed, 158 insertions(+), 43 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 1da9a0a6543..33e2be96f3b 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -8,6 +8,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/schedulingpb" @@ -66,6 +67,7 @@ type Cluster struct { const ( regionLabelGCInterval = time.Hour requestTimeout = 3 * time.Second + collectWaitTime = time.Minute // heartbeat relative const heartbeatTaskRunner = "heartbeat-task-runner" @@ -489,7 +491,12 @@ func (c *Cluster) runUpdateStoreStats() { func (c *Cluster) runCoordinator() { defer logutil.LogPanic() defer c.wg.Done() - c.coordinator.RunUntilStop() + // force wait for 1 minute to make prepare checker won't be directly skipped + runCollectWaitTime := collectWaitTime + failpoint.Inject("changeRunCollectWaitTime", func() { + runCollectWaitTime = 1 * time.Second + }) + c.coordinator.RunUntilStop(runCollectWaitTime) } func (c *Cluster) runMetricsCollectionJob() { diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index bdb7fcb464d..09d613f1058 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -43,7 +43,8 @@ import ( ) const ( - runPrepareCheckerInterval = 3 * time.Second + runSchedulerCheckInterval = 3 * time.Second + collectTimeout = 5 * time.Minute maxLoadConfigRetries = 10 // pushOperatorTickInterval is the interval try to push the operator. pushOperatorTickInterval = 500 * time.Millisecond @@ -207,10 +208,7 @@ func (c *Coordinator) runPrepareChecker() { defer logutil.LogPanic() defer c.wg.Done() - ticker := time.NewTicker(runPrepareCheckerInterval) - failpoint.Inject("changeCoordinatorTicker", func() { - ticker.Reset(100 * time.Millisecond) - }) + ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() for { select { @@ -219,7 +217,7 @@ func (c *Coordinator) runPrepareChecker() { case <-ticker.C: if !c.prepareChecker.IsPrepared() { if c.prepareChecker.Check(c.cluster.GetBasicCluster()) { - log.Info("prepare checker is finished") + log.Info("prepare checker is ready") } } } @@ -227,8 +225,8 @@ func (c *Coordinator) runPrepareChecker() { } // RunUntilStop runs the coordinator until receiving the stop signal. -func (c *Coordinator) RunUntilStop() { - c.Run() +func (c *Coordinator) RunUntilStop(collectWaitTime ...time.Duration) { + c.Run(collectWaitTime...) <-c.ctx.Done() log.Info("coordinator is stopping") c.GetSchedulersController().Wait() @@ -237,7 +235,25 @@ func (c *Coordinator) RunUntilStop() { } // Run starts coordinator. -func (c *Coordinator) Run() { +func (c *Coordinator) Run(collectWaitTime ...time.Duration) { + ticker := time.NewTicker(runSchedulerCheckInterval) + failpoint.Inject("changeCoordinatorTicker", func() { + ticker.Reset(100 * time.Millisecond) + }) + defer ticker.Stop() + log.Info("coordinator starts to collect cluster information") + for { + if c.ShouldRun(collectWaitTime...) { + log.Info("coordinator has finished cluster information preparation") + break + } + select { + case <-ticker.C: + case <-c.ctx.Done(): + log.Info("coordinator stops running") + return + } + } log.Info("coordinator starts to run schedulers") c.InitSchedulers(true) @@ -552,6 +568,11 @@ func ResetHotSpotMetrics() { schedulers.HotPendingSum.Reset() } +// ShouldRun returns true if the coordinator should run. +func (c *Coordinator) ShouldRun(collectWaitTime ...time.Duration) bool { + return c.prepareChecker.Check(c.cluster.GetBasicCluster(), collectWaitTime...) +} + // GetSchedulersController returns the schedulers controller. func (c *Coordinator) GetSchedulersController() *schedulers.Controller { return c.schedulers diff --git a/pkg/schedule/core/prepare_checker.go b/pkg/schedule/core/prepare_checker.go index ffc2a507175..8a758a593f2 100644 --- a/pkg/schedule/core/prepare_checker.go +++ b/pkg/schedule/core/prepare_checker.go @@ -40,19 +40,22 @@ func NewPrepareChecker() *PrepareChecker { } // Check checks if the coordinator has finished cluster information preparation. -func (checker *PrepareChecker) Check(c *core.BasicCluster) bool { +func (checker *PrepareChecker) Check(c *core.BasicCluster, collectWaitTime ...time.Duration) bool { checker.Lock() defer checker.Unlock() + if checker.prepared { + return true + } if time.Since(checker.start) > collectTimeout { checker.prepared = true return true } - notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt() - totalRegionsCnt := c.GetTotalRegionCount() - if totalRegionsCnt == 0 { + if len(collectWaitTime) > 0 && time.Since(checker.start) < collectWaitTime[0] { return false } + notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt() + totalRegionsCnt := c.GetTotalRegionCount() // The number of active regions should be more than total region of all stores * core.CollectFactor if float64(totalRegionsCnt)*core.CollectFactor > float64(notLoadedFromRegionsCnt) { return false @@ -63,7 +66,7 @@ func (checker *PrepareChecker) Check(c *core.BasicCluster) bool { } storeID := store.GetID() // It is used to avoid sudden scheduling when scheduling service is just started. - if float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) { + if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) { return false } if !c.IsStorePrepared(storeID) { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 9b856ebc390..b2a7548eed9 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2389,12 +2389,12 @@ func (c *testCluster) updateLeaderCount(storeID uint64, leaderCount int) error { return c.setStore(newStore) } -func (c *testCluster) addLeaderStore(storeID uint64) error { +func (c *testCluster) addLeaderStore(storeID uint64, leaderCount int) error { stats := &pdpb.StoreStats{} newStore := core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetStoreStats(stats), - core.SetLeaderCount(1), - core.SetLeaderSize(10), + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), core.SetLastHeartbeatTS(time.Now()), ) @@ -3029,7 +3029,6 @@ func TestPeerState(t *testing.T) { tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) defer cleanup() - co.GetPrepareChecker().SetPrepared() // Transfer peer from store 4 to store 1. re.NoError(tc.addRegionStore(1, 10)) @@ -3069,12 +3068,104 @@ func TestPeerState(t *testing.T) { waitNoResponse(re, stream) } +func TestShouldRun(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 5)) + re.NoError(tc.addLeaderStore(2, 2)) + re.NoError(tc.addLeaderStore(3, 0)) + re.NoError(tc.addLeaderStore(4, 0)) + re.NoError(tc.LoadRegion(1, 1, 2, 3)) + re.NoError(tc.LoadRegion(2, 1, 2, 3)) + re.NoError(tc.LoadRegion(3, 1, 2, 3)) + re.NoError(tc.LoadRegion(4, 1, 2, 3)) + re.NoError(tc.LoadRegion(5, 1, 2, 3)) + re.NoError(tc.LoadRegion(6, 2, 1, 4)) + re.NoError(tc.LoadRegion(7, 2, 1, 4)) + re.False(co.ShouldRun()) + re.Equal(2, tc.GetStoreRegionCount(4)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + // store4 needs Collect two region + {6, false}, + {7, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) + re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) + re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion)) + re.Equal(7, tc.GetClusterNotFromStorageRegionsCnt()) +} + +func TestShouldRunWithNonLeaderRegions(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 10)) + re.NoError(tc.addLeaderStore(2, 0)) + re.NoError(tc.addLeaderStore(3, 0)) + for i := range 10 { + re.NoError(tc.LoadRegion(uint64(i+1), 1, 2, 3)) + } + re.False(co.ShouldRun()) + re.Equal(10, tc.GetStoreRegionCount(1)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + {6, false}, + {7, false}, + {8, false}, + {9, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) + re.NoError(tc.processRegionHeartbeat(core.ContextTODO(), nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) + re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion)) + re.Equal(9, tc.GetClusterNotFromStorageRegionsCnt()) + + // Now, after server is prepared, there exist some regions with no leader. + re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId()) +} + func TestAddScheduler(t *testing.T) { re := require.New(t) tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) defer cleanup() - co.GetPrepareChecker().SetPrepared() controller := co.GetSchedulersController() re.Len(controller.GetSchedulerNames(), len(sc.DefaultSchedulers)) re.NoError(controller.RemoveScheduler(types.BalanceLeaderScheduler.String())) @@ -3086,9 +3177,9 @@ func TestAddScheduler(t *testing.T) { stream := mockhbstream.NewHeartbeatStream() // Add stores 1,2,3 - re.NoError(tc.addLeaderStore(1)) - re.NoError(tc.addLeaderStore(2)) - re.NoError(tc.addLeaderStore(3)) + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + re.NoError(tc.addLeaderStore(3, 1)) // Add regions 1 with leader in store 1 and followers in stores 2,3 re.NoError(tc.addLeaderRegion(1, 1, 2, 3)) // Add regions 2 with leader in store 2 and followers in stores 1,3 @@ -3152,8 +3243,8 @@ func TestPersistScheduler(t *testing.T) { defer cleanup() defaultCount := len(sc.DefaultSchedulers) // Add stores 1,2 - re.NoError(tc.addLeaderStore(1)) - re.NoError(tc.addLeaderStore(2)) + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) controller := co.GetSchedulersController() re.Len(controller.GetSchedulerNames(), defaultCount) @@ -3268,8 +3359,8 @@ func TestRemoveScheduler(t *testing.T) { defer cleanup() // Add stores 1,2 - re.NoError(tc.addLeaderStore(1)) - re.NoError(tc.addLeaderStore(2)) + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) defaultCount := len(sc.DefaultSchedulers) controller := co.GetSchedulersController() re.Len(controller.GetSchedulerNames(), defaultCount) diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 0a80aea1c01..f3e7f235018 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -41,6 +41,7 @@ func TestAPI(t *testing.T) { func (suite *apiTestSuite) SetupSuite() { re := suite.Require() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`)) suite.env = tests.NewSchedulingTestEnvironment(suite.T()) } @@ -48,6 +49,7 @@ func (suite *apiTestSuite) TearDownSuite() { suite.env.Cleanup() re := suite.Require() re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime")) } func (suite *apiTestSuite) TestGetCheckerByName() { diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index ba4d9e40eef..6a41ad0823e 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -21,7 +21,6 @@ import ( "time" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/log" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/cache" @@ -37,7 +36,6 @@ import ( "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/server/api" - "go.uber.org/zap" ) type configTestSuite struct { @@ -135,7 +133,6 @@ func (suite *configTestSuite) TestConfigWatch() { // Manually trigger the config persistence in the PD API server side. func persistConfig(re *require.Assertions, pdLeaderServer *tests.TestServer) { err := pdLeaderServer.GetPersistOptions().Persist(pdLeaderServer.GetServer().GetStorage()) - log.Info("persistConfig", zap.Reflect("opts", pdLeaderServer.GetPersistOptions())) re.NoError(err) } diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index fc07436b92f..a1ebf1fbe19 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -61,6 +61,7 @@ func (suite *serverTestSuite) SetupSuite() { var err error re := suite.Require() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) @@ -81,6 +82,7 @@ func (suite *serverTestSuite) TearDownSuite() { suite.cluster.Destroy() suite.cancel() re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker")) } @@ -498,13 +500,9 @@ func (suite *serverTestSuite) TestStoreLimit() { tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) re.NoError(err) defer tc.Destroy() - leaderServer := suite.pdLeader.GetServer() tc.WaitForPrimaryServing(re) - testutil.Eventually(re, func() bool { - return leaderServer.GetRaftCluster().IsServiceIndependent(constant.SchedulingServiceName) - }) oc := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetOperatorController() - + leaderServer := suite.pdLeader.GetServer() conf := leaderServer.GetReplicationConfig().Clone() conf.MaxReplicas = 1 leaderServer.SetReplicationConfig(*conf) diff --git a/tests/scheduling_cluster.go b/tests/scheduling_cluster.go index 23a59d564b8..3f7c39eb81c 100644 --- a/tests/scheduling_cluster.go +++ b/tests/scheduling_cluster.go @@ -107,7 +107,7 @@ func (tc *TestSchedulingCluster) WaitForPrimaryServing(re *require.Assertions) * var primary *scheduling.Server testutil.Eventually(re, func() bool { for _, server := range tc.servers { - if server.IsServing() && server.GetCoordinator().AreSchedulersInitialized() { + if server.IsServing() { primary = server return true } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 2b88e18b0dd..326f2cb9978 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -842,6 +842,9 @@ func TestSetScheduleOpt(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() + grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + clusterID := leaderServer.GetClusterID() + bootstrapCluster(re, clusterID, grpcPDClient) cfg := config.NewConfig() cfg.Schedule.TolerantSizeRatio = 5 @@ -1831,7 +1834,6 @@ func TestPatrolRegionConfigChange(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestCluster(ctx, 1) defer tc.Destroy() re.NoError(err) diff --git a/tools/pd-ctl/tests/store/store_test.go b/tools/pd-ctl/tests/store/store_test.go index 36f7bd6a4f6..7fdc876ea3d 100644 --- a/tools/pd-ctl/tests/store/store_test.go +++ b/tools/pd-ctl/tests/store/store_test.go @@ -31,7 +31,6 @@ import ( "github.com/tikv/pd/pkg/response" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/utils/grpcutil" - "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" pdTests "github.com/tikv/pd/tests" ctl "github.com/tikv/pd/tools/pd-ctl/pdctl" @@ -55,11 +54,6 @@ func TestStoreLimitV2(t *testing.T) { re.NoError(leaderServer.BootstrapCluster()) defer cluster.Destroy() - // TODO: fix https://github.com/tikv/pd/issues/7464 - testutil.Eventually(re, func() bool { - return leaderServer.GetRaftCluster().GetCoordinator().AreSchedulersInitialized() - }) - // store command args := []string{"-u", pdAddr, "config", "set", "store-limit-version", "v2"} output, err := tests.ExecuteCommand(cmd, args...)