Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix scheduling cannot immediately start after transferring leader #4875

Merged
merged 8 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,11 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error {
return nil
}

// IsPrepared returns true if the prepare checker is ready, i.e. the
// coordinator has collected enough cluster information to start scheduling.
func (c *RaftCluster) IsPrepared() bool {
	return c.coordinator.prepareChecker.isPrepared()
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
Expand Down Expand Up @@ -813,7 +818,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if isNew {
if !c.IsPrepared() && isNew {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we check the condition in the function collect directly?

c.coordinator.prepareChecker.collect(region)
}

Expand Down Expand Up @@ -867,12 +872,6 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

// getClusterID returns the cluster ID recorded in the cluster meta,
// guarded by the cluster's read lock.
func (c *RaftCluster) getClusterID() uint64 {
	c.RLock()
	defer c.RUnlock()
	return c.meta.GetId()
}

func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
if c.storage != nil {
if err := c.storage.SaveMeta(meta); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ func (c *coordinator) runUntilStop() {

func (c *coordinator) run() {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker = time.NewTicker(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
for {
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
func (s *testCoordinatorSuite) TestDispatch(c *C) {
tc, co, cleanup := prepare(nil, nil, nil, c)
defer cleanup()
co.prepareChecker.isPrepared = true
co.prepareChecker.prepared = true
// Transfer peer from store 4 to store 1.
c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 30), IsNil)
Expand Down Expand Up @@ -286,7 +286,7 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
setCfg(cfg)
}
tc := newTestCluster(ctx, opt)
hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.getClusterID(), tc, true /* need to run */)
hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */)
if setTc != nil {
setTc(tc)
}
Expand Down
16 changes: 13 additions & 3 deletions server/cluster/prepare_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type prepareChecker struct {
reactiveRegions map[uint64]int
start time.Time
sum int
isPrepared bool
prepared bool
}

func newPrepareChecker() *prepareChecker {
Expand All @@ -40,7 +40,11 @@ func newPrepareChecker() *prepareChecker {
func (checker *prepareChecker) check(c *core.BasicCluster) bool {
checker.RLock()
defer checker.RUnlock()
if checker.isPrepared || time.Since(checker.start) > collectTimeout {
if checker.prepared {
return true
}
if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
Expand All @@ -57,7 +61,7 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
return false
}
}
checker.isPrepared = true
checker.prepared = true
return true
}

Expand All @@ -69,3 +73,9 @@ func (checker *prepareChecker) collect(region *core.RegionInfo) {
}
checker.sum++
}

// isPrepared reports whether the checker has already been marked prepared.
// It takes the read lock, so it is safe to call concurrently with check/collect.
func (checker *prepareChecker) isPrepared() bool {
	checker.RLock()
	defer checker.RUnlock()
	return checker.prepared
}
6 changes: 3 additions & 3 deletions server/cluster/unsafe_recovery_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func (s *testUnsafeRecoverSuite) TestPlanExecution(c *C) {
_, opt, _ := newTestScheduleConfig()
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
// Manually fill the coordinator up to allow calling on cluster.PauseOrResumeSchedulers().
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.getClusterID(), cluster, true))
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
recoveryController := newUnsafeRecoveryController(cluster)
recoveryController.stage = recovering
recoveryController.failedStores = map[uint64]string{
Expand Down Expand Up @@ -629,7 +629,7 @@ func (s *testUnsafeRecoverSuite) TestPlanExecution(c *C) {
func (s *testUnsafeRecoverSuite) TestRemoveFailedStores(c *C) {
_, opt, _ := newTestScheduleConfig()
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.getClusterID(), cluster, true))
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
cluster.coordinator.run()
stores := newTestStores(2, "5.3.0")
stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now()))
Expand Down Expand Up @@ -661,7 +661,7 @@ func (s *testUnsafeRecoverSuite) TestRemoveFailedStores(c *C) {
func (s *testUnsafeRecoverSuite) TestSplitPaused(c *C) {
_, opt, _ := newTestScheduleConfig()
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.getClusterID(), cluster, true))
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
cluster.coordinator.run()
stores := newTestStores(2, "5.3.0")
stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now()))
Expand Down
11 changes: 10 additions & 1 deletion server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type RegionInfo struct {
QueryStats *pdpb.QueryStats
flowRoundDivisor uint64
// buckets is not thread-safe, it should be accessed by the request `report buckets` with greater version.
buckets unsafe.Pointer
buckets unsafe.Pointer
fromHeartbeat bool
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -540,6 +541,11 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio
return r.replicationStatus
}

// IsFromHeartbeat returns whether the region info was built from a region
// heartbeat (as opposed to, e.g., being synced from the leader).
func (r *RegionInfo) IsFromHeartbeat() bool {
	return r.fromHeartbeat
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
Expand Down Expand Up @@ -624,6 +630,9 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
}
if !origin.IsFromHeartbeat() {
isNew = true
Copy link
Member

@AndreMouche AndreMouche May 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set saveKV, saveCache, needSync to true if it is not from heartbeat?

}
}
return
}
Expand Down
7 changes: 7 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
region.interval = interval
}
}

// SetFromHeartbeat returns a RegionCreateOption that records whether the
// region info originates from a region heartbeat request.
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
	return func(r *RegionInfo) {
		r.fromHeartbeat = fromHeartbeat
	}
}
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request, flowRoundOption)
region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down
3 changes: 2 additions & 1 deletion server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
core.SetWrittenKeys(stats[i].KeysWritten),
core.SetReadBytes(stats[i].BytesRead),
core.SetReadKeys(stats[i].KeysRead),
core.SetFromHeartbeat(false),
)
} else {
region = core.NewRegionInfo(r, regionLeader)
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we do not need to call SetFromHeartbeat(false) here, because we do not process the value of isNew on L218

_, saveKV, _, _ := regionGuide(region, origin)

}

origin, err := bc.PreCheckPutRegion(region)
Expand Down
45 changes: 45 additions & 0 deletions tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,51 @@ func (s *regionSyncerTestSuite) TestFullSyncWithAddMember(c *C) {
c.Assert(loadRegions, HasLen, regionLen)
}

// TestPrepareChecker verifies that the prepare checker becomes ready on the
// current leader after region heartbeats are handled, and — after a new member
// joins and the leadership is transferred to it — that the new leader also
// becomes prepared once it receives heartbeats, so scheduling can start
// immediately after a leader transfer.
func (s *regionSyncerTestSuite) TestPrepareChecker(c *C) {
	// Shrink the coordinator's check ticker via failpoint so the prepare
	// checker is evaluated quickly enough for this test.
	c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil)
	defer failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker")
	cluster, err := tests.NewTestCluster(s.ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
	defer cluster.Destroy()
	c.Assert(err, IsNil)

	err = cluster.RunInitialServers()
	c.Assert(err, IsNil)
	cluster.WaitLeader()
	leaderServer := cluster.GetServer(cluster.GetLeader())
	c.Assert(leaderServer.BootstrapCluster(), IsNil)
	rc := leaderServer.GetServer().GetRaftCluster()
	c.Assert(rc, NotNil)
	regionLen := 110
	regions := initRegions(regionLen)
	// Feed heartbeats for all regions so the prepare checker can collect them.
	for _, region := range regions {
		err = rc.HandleRegionHeartbeat(region)
		c.Assert(err, IsNil)
	}

	// ensure flush to region storage
	time.Sleep(3 * time.Second)
	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)

	// join new PD
	pd2, err := cluster.Join(s.ctx)
	c.Assert(err, IsNil)
	err = pd2.Run()
	c.Assert(err, IsNil)
	// waiting for synchronization to complete
	time.Sleep(3 * time.Second)
	err = cluster.ResignLeader()
	c.Assert(err, IsNil)
	c.Assert(cluster.WaitLeader(), Equals, "pd2")
	leaderServer = cluster.GetServer(cluster.GetLeader())
	rc = leaderServer.GetServer().GetRaftCluster()
	// Re-send the heartbeats to the new leader; it should become prepared
	// without waiting for the full collect timeout.
	for _, region := range regions {
		err = rc.HandleRegionHeartbeat(region)
		c.Assert(err, IsNil)
	}
	time.Sleep(time.Second)
	c.Assert(rc.IsPrepared(), IsTrue)
}

func initRegions(regionLen int) []*core.RegionInfo {
allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
regions := make([]*core.RegionInfo, 0, regionLen)
Expand Down