diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index a96a40676384..61cbf8489dcb 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -671,6 +671,10 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error {
 	return nil
 }
 
+func (c *RaftCluster) IsPrepared() bool {
+	return c.coordinator.prepareChecker.isPrepared()
+}
+
 var regionGuide = core.GenerateRegionGuideFunc(true)
 
 // processRegionHeartbeat updates the region information.
@@ -737,7 +741,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 		regionEventCounter.WithLabelValues("update_cache").Inc()
 	}
 
-	if !c.coordinator.prepareChecker.isPrepared() || isNew {
+	if !c.IsPrepared() || isNew {
 		c.coordinator.prepareChecker.collect(region)
 	}
 
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index 4e7426132842..7b9ea908213f 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -42,7 +42,6 @@ import (
 )
 
 const (
-	runSchedulerCheckInterval  = 3 * time.Second
 	checkSuspectRangesInterval = 100 * time.Millisecond
 	collectFactor              = 0.8
 	collectTimeout             = 5 * time.Minute
@@ -56,6 +55,8 @@ const (
 	PluginUnload = "PluginUnload"
 )
 
+var runSchedulerCheckInterval = 3 * time.Second
+
 // coordinator is used to manage all schedulers and checkers to decide if the region needs to be scheduled.
 type coordinator struct {
 	syncutil.RWMutex
@@ -310,6 +311,9 @@ func (c *coordinator) runUntilStop() {
 }
 
 func (c *coordinator) run() {
+	failpoint.Inject("runSchedulerCheckInterval", func() {
+		runSchedulerCheckInterval = 100 * time.Millisecond
+	})
 	ticker := time.NewTicker(runSchedulerCheckInterval)
 	defer ticker.Stop()
 	log.Info("coordinator starts to collect cluster information")
diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go
index f7bb3d776c5a..259e8e9debe9 100644
--- a/tests/server/region_syncer/region_syncer_test.go
+++ b/tests/server/region_syncer/region_syncer_test.go
@@ -210,6 +210,51 @@ func (s *regionSyncerTestSuite) TestFullSyncWithAddMember(c *C) {
 	c.Assert(loadRegions, HasLen, regionLen)
 }
 
+func (s *regionSyncerTestSuite) TestPrepareChecker(c *C) {
+	c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/runSchedulerCheckInterval", `return(true)`), IsNil)
+	defer failpoint.Disable("github.com/tikv/pd/server/cluster/runSchedulerCheckInterval")
+	cluster, err := tests.NewTestCluster(s.ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
+	defer cluster.Destroy()
+	c.Assert(err, IsNil)
+
+	err = cluster.RunInitialServers()
+	c.Assert(err, IsNil)
+	cluster.WaitLeader()
+	leaderServer := cluster.GetServer(cluster.GetLeader())
+	c.Assert(leaderServer.BootstrapCluster(), IsNil)
+	rc := leaderServer.GetServer().GetRaftCluster()
+	c.Assert(rc, NotNil)
+	regionLen := 110
+	regions := initRegions(regionLen)
+	for _, region := range regions {
+		err = rc.HandleRegionHeartbeat(region)
+		c.Assert(err, IsNil)
+	}
+
+	// ensure flush to region storage
+	time.Sleep(3 * time.Second)
+	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
+
+	// join new PD
+	pd2, err := cluster.Join(s.ctx)
+	c.Assert(err, IsNil)
+	err = pd2.Run()
+	c.Assert(err, IsNil)
+	// waiting for synchronization to complete
+	time.Sleep(3 * time.Second)
+	err = cluster.ResignLeader()
+	c.Assert(err, IsNil)
+	c.Assert(cluster.WaitLeader(), Equals, "pd2")
+	leaderServer = cluster.GetServer(cluster.GetLeader())
+	rc = leaderServer.GetServer().GetRaftCluster()
+	for _, region := range regions {
+		err = rc.HandleRegionHeartbeat(region)
+		c.Assert(err, IsNil)
+	}
+	time.Sleep(time.Second)
+	c.Assert(rc.IsPrepared(), IsTrue)
+}
+
 func initRegions(regionLen int) []*core.RegionInfo {
 	allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
 	regions := make([]*core.RegionInfo, 0, regionLen)
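For context on the `IsPrepared()` assertion used in the test above, here is a minimal, self-contained sketch of the prepare-checker idea. It is not pd's actual implementation: the type, fields, and counting logic below are illustrative assumptions, with only `collectFactor` and `collectTimeout` taken from the constants visible in `coordinator.go`. The cluster counts as "prepared" once heartbeats have been collected for a large enough fraction of the regions known at start-up, or once the collection timeout has elapsed.

```go
package main

import (
	"fmt"
	"time"
)

// Constants mirroring the ones visible in coordinator.go.
const (
	collectFactor  = 0.8
	collectTimeout = 5 * time.Minute
)

// prepareChecker is a simplified stand-in, not pd's real type:
// it only tracks how many region heartbeats have been collected.
type prepareChecker struct {
	start        time.Time
	sum          int // regions observed via heartbeat so far
	totalRegions int // regions loaded from region storage at start-up
	prepared     bool
}

// collect records one region heartbeat.
func (p *prepareChecker) collect() { p.sum++ }

// isPrepared reports whether enough regions have been collected,
// or enough time has passed since collection started.
func (p *prepareChecker) isPrepared() bool {
	if p.prepared {
		return true
	}
	if time.Since(p.start) > collectTimeout ||
		float64(p.sum) >= collectFactor*float64(p.totalRegions) {
		p.prepared = true
	}
	return p.prepared
}

func main() {
	pc := &prepareChecker{start: time.Now(), totalRegions: 110}
	for i := 0; i < 110; i++ {
		pc.collect()
	}
	fmt.Println("prepared:", pc.isPrepared()) // true: 110 >= 0.8*110
}
```

Under this model, the 110 heartbeats replayed after the leader transfer are enough to cross the 0.8 threshold, which is why the test can assert `rc.IsPrepared()` shortly afterwards.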