add tests
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Apr 29, 2022
1 parent 45c0d23 commit 1beeaa3
Showing 4 changed files with 59 additions and 5 deletions.
7 changes: 6 additions & 1 deletion server/cluster/cluster.go
@@ -671,6 +671,11 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error {
return nil
}

// IsPreapred returns whether the prepare checker is ready.
func (c *RaftCluster) IsPreapred() bool {
return c.coordinator.prepareChecker.isPrepared()
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
@@ -737,7 +742,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if !c.coordinator.prepareChecker.isPrepared() || isNew {
if !c.IsPreapred() || isNew {
c.coordinator.prepareChecker.collect(region)
}

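The exported IsPreapred accessor added above lets code outside the cluster package, tests in particular, check scheduling readiness without reaching into the coordinator. Below is a minimal sketch, not part of this commit, of how a caller might poll the accessor with a deadline instead of sleeping for a fixed period; the helper name, package, and timeout handling are illustrative assumptions.

package tests

import (
	"time"

	"github.com/tikv/pd/server/cluster"
)

// waitPrepared is a hypothetical helper: it polls the exported readiness
// accessor until it reports true or the timeout expires.
func waitPrepared(rc *cluster.RaftCluster, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if rc.IsPreapred() {
			return true
		}
		time.Sleep(100 * time.Millisecond)
	}
	return false
}

A test could then assert on waitPrepared(rc, someTimeout) instead of pairing a fixed time.Sleep with a one-shot assertion.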
6 changes: 5 additions & 1 deletion server/cluster/coordinator.go
@@ -42,7 +42,6 @@ import (
)

const (
runSchedulerCheckInterval = 3 * time.Second
checkSuspectRangesInterval = 100 * time.Millisecond
collectFactor = 0.8
collectTimeout = 5 * time.Minute
@@ -56,6 +55,8 @@ const (
PluginUnload = "PluginUnload"
)

var runSchedulerCheckInterval = 3 * time.Second

// coordinator is used to manage all schedulers and checkers to decide if the region needs to be scheduled.
type coordinator struct {
syncutil.RWMutex
@@ -310,6 +311,9 @@ func (c *coordinator) runUntilStop() {
}

func (c *coordinator) run() {
failpoint.Inject("runSchedulerCheckInterval", func() {
runSchedulerCheckInterval = 100 * time.Millisecond
})
ticker := time.NewTicker(runSchedulerCheckInterval)
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
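For context on what isPrepared means in the hunks above: the prepare checker treats the cluster as ready once heartbeats have been collected for a large enough share of the known regions (collectFactor) or once collectTimeout has elapsed since startup. The following is a simplified, self-contained sketch of that idea; it does not reproduce PD's actual prepareChecker fields or per-store bookkeeping.

package prepare

import (
	"sync"
	"time"
)

const (
	collectFactor  = 0.8
	collectTimeout = 5 * time.Minute
)

// prepareChecker is a simplified stand-in: it only counts how many distinct
// regions have reported a heartbeat since startup.
type prepareChecker struct {
	sync.RWMutex
	start     time.Time
	collected map[uint64]struct{} // region IDs seen so far
	total     int                 // number of regions known from storage
}

func newPrepareChecker(total int) *prepareChecker {
	return &prepareChecker{start: time.Now(), collected: map[uint64]struct{}{}, total: total}
}

// collect records a heartbeat for one region.
func (p *prepareChecker) collect(regionID uint64) {
	p.Lock()
	defer p.Unlock()
	p.collected[regionID] = struct{}{}
}

// isPrepared reports readiness once enough regions have been seen or the
// collection window has timed out.
func (p *prepareChecker) isPrepared() bool {
	p.RLock()
	defer p.RUnlock()
	if time.Since(p.start) > collectTimeout {
		return true
	}
	return p.total > 0 && float64(len(p.collected)) >= collectFactor*float64(p.total)
}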
6 changes: 3 additions & 3 deletions server/cluster/unsafe_recovery_controller_test.go
@@ -522,7 +522,7 @@ func (s *testUnsafeRecoverSuite) TestPlanExecution(c *C) {
_, opt, _ := newTestScheduleConfig()
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
// Manually fill the coordinator up to allow calling on cluster.PauseOrResumeSchedulers().
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.getClusterID(), cluster, true))
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
recoveryController := newUnsafeRecoveryController(cluster)
recoveryController.stage = recovering
recoveryController.failedStores = map[uint64]string{
@@ -629,7 +629,7 @@ func (s *testUnsafeRecoverSuite) TestPlanExecution(c *C) {
func (s *testUnsafeRecoverSuite) TestRemoveFailedStores(c *C) {
_, opt, _ := newTestScheduleConfig()
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.getClusterID(), cluster, true))
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
cluster.coordinator.run()
stores := newTestStores(2, "5.3.0")
stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now()))
@@ -661,7 +661,7 @@ func (s *testUnsafeRecoverSuite) TestRemoveFailedStores(c *C) {
func (s *testUnsafeRecoverSuite) TestSplitPaused(c *C) {
_, opt, _ := newTestScheduleConfig()
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.getClusterID(), cluster, true))
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
cluster.coordinator.run()
stores := newTestStores(2, "5.3.0")
stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now()))
45 changes: 45 additions & 0 deletions tests/server/region_syncer/region_syncer_test.go
@@ -210,6 +210,51 @@ func (s *regionSyncerTestSuite) TestFullSyncWithAddMember(c *C) {
c.Assert(loadRegions, HasLen, regionLen)
}

func (s *regionSyncerTestSuite) TestPrepareChecker(c *C) {
c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/runSchedulerCheckInterval", `return(true)`), IsNil)
defer failpoint.Disable("github.com/tikv/pd/server/cluster/runSchedulerCheckInterval")
cluster, err := tests.NewTestCluster(s.ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
defer cluster.Destroy()
c.Assert(err, IsNil)

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer := cluster.GetServer(cluster.GetLeader())
c.Assert(leaderServer.BootstrapCluster(), IsNil)
rc := leaderServer.GetServer().GetRaftCluster()
c.Assert(rc, NotNil)
regionLen := 110
regions := initRegions(regionLen)
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}

// ensure flush to region storage
time.Sleep(3 * time.Second)
c.Assert(leaderServer.GetRaftCluster().IsPreapred(), IsTrue)

// join new PD
pd2, err := cluster.Join(s.ctx)
c.Assert(err, IsNil)
err = pd2.Run()
c.Assert(err, IsNil)
// waiting for synchronization to complete
time.Sleep(3 * time.Second)
err = cluster.ResignLeader()
c.Assert(err, IsNil)
c.Assert(cluster.WaitLeader(), Equals, "pd2")
leaderServer = cluster.GetServer(cluster.GetLeader())
rc = leaderServer.GetServer().GetRaftCluster()
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}
time.Sleep(time.Second)
c.Assert(rc.IsPreapred(), IsTrue)
}

func initRegions(regionLen int) []*core.RegionInfo {
allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
regions := make([]*core.RegionInfo, 0, regionLen)
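The new test depends on the coordinator change above: turning runSchedulerCheckInterval into a package-level var and adding a failpoint.Inject hook lets the test shorten the scheduler check interval from 3 seconds to 100 milliseconds, so the prepare checker is driven quickly. Enabling the failpoint by its full package path is what activates that hook. Below is a small sketch of a reusable enable/disable wrapper; it assumes the pingcap/failpoint package and only has an effect in builds where the failpoint markers have been rewritten by the failpoint tooling.

package fputil

import "github.com/pingcap/failpoint"

// withFailpoint enables the named failpoint for the duration of fn and
// disables it again afterwards. The name is the full package path of the
// failpoint.Inject call site followed by the failpoint identifier.
func withFailpoint(name, term string, fn func()) error {
	if err := failpoint.Enable(name, term); err != nil {
		return err
	}
	defer failpoint.Disable(name)
	fn()
	return nil
}

// Usage mirroring the test above:
//
//	withFailpoint("github.com/tikv/pd/server/cluster/runSchedulerCheckInterval", `return(true)`, func() {
//		// run the scenario that needs the shortened scheduler check interval
//	})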
