Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Apr 29, 2022
1 parent 45c0d23 commit f8399aa
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 2 deletions.
6 changes: 5 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,10 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error {
return nil
}

func (c *RaftCluster) IsPreapred() bool {
return c.coordinator.prepareChecker.isPrepared()
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
Expand Down Expand Up @@ -737,7 +741,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if !c.coordinator.prepareChecker.isPrepared() || isNew {
if !c.IsPreapred() || isNew {
c.coordinator.prepareChecker.collect(region)
}

Expand Down
6 changes: 5 additions & 1 deletion server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
)

const (
runSchedulerCheckInterval = 3 * time.Second
checkSuspectRangesInterval = 100 * time.Millisecond
collectFactor = 0.8
collectTimeout = 5 * time.Minute
Expand All @@ -56,6 +55,8 @@ const (
PluginUnload = "PluginUnload"
)

var runSchedulerCheckInterval = 3 * time.Second

// coordinator is used to manage all schedulers and checkers to decide if the region needs to be scheduled.
type coordinator struct {
syncutil.RWMutex
Expand Down Expand Up @@ -310,6 +311,9 @@ func (c *coordinator) runUntilStop() {
}

func (c *coordinator) run() {
failpoint.Inject("runSchedulerCheckInterval", func() {
runSchedulerCheckInterval = 100 * time.Millisecond
})
ticker := time.NewTicker(runSchedulerCheckInterval)
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
Expand Down
45 changes: 45 additions & 0 deletions tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,51 @@ func (s *regionSyncerTestSuite) TestFullSyncWithAddMember(c *C) {
c.Assert(loadRegions, HasLen, regionLen)
}

func (s *regionSyncerTestSuite) TestPrepareChecker(c *C) {
c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/runSchedulerCheckInterval", `return(true)`), IsNil)
defer failpoint.Disable("github.com/tikv/pd/server/cluster/runSchedulerCheckInterval")
cluster, err := tests.NewTestCluster(s.ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
defer cluster.Destroy()
c.Assert(err, IsNil)

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer := cluster.GetServer(cluster.GetLeader())
c.Assert(leaderServer.BootstrapCluster(), IsNil)
rc := leaderServer.GetServer().GetRaftCluster()
c.Assert(rc, NotNil)
regionLen := 110
regions := initRegions(regionLen)
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}

// ensure flush to region storage
time.Sleep(3 * time.Second)
c.Assert(leaderServer.GetRaftCluster().IsPreapred(), IsTrue)

// join new PD
pd2, err := cluster.Join(s.ctx)
c.Assert(err, IsNil)
err = pd2.Run()
c.Assert(err, IsNil)
// waiting for synchronization to complete
time.Sleep(3 * time.Second)
err = cluster.ResignLeader()
c.Assert(err, IsNil)
c.Assert(cluster.WaitLeader(), Equals, "pd2")
leaderServer = cluster.GetServer(cluster.GetLeader())
rc = leaderServer.GetServer().GetRaftCluster()
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}
time.Sleep(time.Second)
c.Assert(rc.IsPreapred(), IsTrue)
}

func initRegions(regionLen int) []*core.RegionInfo {
allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
regions := make([]*core.RegionInfo, 0, regionLen)
Expand Down

0 comments on commit f8399aa

Please sign in to comment.