diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 1c99cce8823..888163bceaa 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -755,6 +755,11 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error {
 	return nil
 }
 
+// IsPrepared returns true if the prepare checker is ready.
+func (c *RaftCluster) IsPrepared() bool {
+	return c.coordinator.prepareChecker.isPrepared()
+}
+
 var regionGuide = core.GenerateRegionGuideFunc(true)
 
 // processRegionHeartbeat updates the region information.
@@ -821,7 +826,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 		regionEventCounter.WithLabelValues("update_cache").Inc()
 	}
 
-	if isNew {
+	if !c.IsPrepared() && isNew {
 		c.coordinator.prepareChecker.collect(region)
 	}
 
@@ -875,12 +880,6 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
 	c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
 }
 
-func (c *RaftCluster) getClusterID() uint64 {
-	c.RLock()
-	defer c.RUnlock()
-	return c.meta.GetId()
-}
-
 func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
 	if c.storage != nil {
 		if err := c.storage.SaveMeta(meta); err != nil {
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index 5b837cb2e4b..973c628662c 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -311,6 +311,9 @@ func (c *coordinator) runUntilStop() {
 
 func (c *coordinator) run() {
 	ticker := time.NewTicker(runSchedulerCheckInterval)
+	failpoint.Inject("changeCoordinatorTicker", func() {
+		ticker = time.NewTicker(100 * time.Millisecond)
+	})
 	defer ticker.Stop()
 	log.Info("coordinator starts to collect cluster information")
 	for {
diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go
index cfdd182decf..4757a582a70 100644
--- a/server/cluster/coordinator_test.go
+++ b/server/cluster/coordinator_test.go
@@ -193,7 +193,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
 func (s *testCoordinatorSuite) TestDispatch(c *C) {
 	tc, co, cleanup := prepare(nil, nil, nil, c)
 	defer cleanup()
-	co.prepareChecker.isPrepared = true
+	co.prepareChecker.prepared = true
 	// Transfer peer from store 4 to store 1.
 	c.Assert(tc.addRegionStore(4, 40), IsNil)
 	c.Assert(tc.addRegionStore(3, 30), IsNil)
@@ -286,7 +286,7 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
 		setCfg(cfg)
 	}
 	tc := newTestCluster(ctx, opt)
-	hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.getClusterID(), tc, true /* need to run */)
+	hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */)
 	if setTc != nil {
 		setTc(tc)
 	}
diff --git a/server/cluster/prepare_checker.go b/server/cluster/prepare_checker.go
index 438d6e2da46..92c8123f74f 100644
--- a/server/cluster/prepare_checker.go
+++ b/server/cluster/prepare_checker.go
@@ -26,7 +26,7 @@ type prepareChecker struct {
 	reactiveRegions map[uint64]int
 	start           time.Time
 	sum             int
-	isPrepared      bool
+	prepared        bool
 }
 
 func newPrepareChecker() *prepareChecker {
@@ -40,7 +40,11 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
 	checker.RLock()
 	defer checker.RUnlock()
-	if checker.isPrepared || time.Since(checker.start) > collectTimeout {
+	if checker.prepared {
+		return true
+	}
+	if time.Since(checker.start) > collectTimeout {
+		checker.prepared = true
+		return true
+	}
 	// The number of active regions should be more than total region of all stores * collectFactor
@@ -57,7 +61,7 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
 			return false
 		}
 	}
-	checker.isPrepared = true
+	checker.prepared = true
 	return true
 }
 
@@ -69,3 +73,9 @@ func (checker *prepareChecker) collect(region *core.RegionInfo) {
 	}
 	checker.sum++
 }
+
+func (checker *prepareChecker) isPrepared() bool {
+	checker.RLock()
+	defer checker.RUnlock()
+	return checker.prepared
+}
diff --git a/server/cluster/unsafe_recovery_controller_test.go b/server/cluster/unsafe_recovery_controller_test.go
index e8a48970a8f..8455b201c98 100644
--- a/server/cluster/unsafe_recovery_controller_test.go
+++ b/server/cluster/unsafe_recovery_controller_test.go
@@ -522,7 +522,7 @@ func (s *testUnsafeRecoverSuite) TestPlanExecution(c *C) {
 	_, opt, _ := newTestScheduleConfig()
 	cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
 	// Manually fill the coordinator up to allow calling on cluster.PauseOrResumeSchedulers().
-	cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.getClusterID(), cluster, true))
+	cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
 	recoveryController := newUnsafeRecoveryController(cluster)
 	recoveryController.stage = recovering
 	recoveryController.failedStores = map[uint64]string{
@@ -629,7 +629,7 @@ func (s *testUnsafeRecoverSuite) TestPlanExecution(c *C) {
 func (s *testUnsafeRecoverSuite) TestRemoveFailedStores(c *C) {
 	_, opt, _ := newTestScheduleConfig()
 	cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
-	cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.getClusterID(), cluster, true))
+	cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
 	cluster.coordinator.run()
 	stores := newTestStores(2, "5.3.0")
 	stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now()))
@@ -661,7 +661,7 @@ func (s *testUnsafeRecoverSuite) TestRemoveFailedStores(c *C) {
 func (s *testUnsafeRecoverSuite) TestSplitPaused(c *C) {
 	_, opt, _ := newTestScheduleConfig()
 	cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
-	cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.getClusterID(), cluster, true))
+	cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
 	cluster.coordinator.run()
 	stores := newTestStores(2, "5.3.0")
 	stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now()))
diff --git a/server/core/region.go b/server/core/region.go
index 7e303e68de6..e2997113624 100644
--- a/server/core/region.go
+++ b/server/core/region.go
@@ -61,7 +61,8 @@ type RegionInfo struct {
 	QueryStats       *pdpb.QueryStats
 	flowRoundDivisor uint64
 	// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
-	buckets unsafe.Pointer
+	buckets       unsafe.Pointer
+	fromHeartbeat bool
 }
 
 // NewRegionInfo creates RegionInfo with region's meta and leader peer.
@@ -540,6 +541,11 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio
 	return r.replicationStatus
 }
 
+// IsFromHeartbeat returns whether the region info is from the region heartbeat.
+func (r *RegionInfo) IsFromHeartbeat() bool {
+	return r.fromHeartbeat
+}
+
 // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
 // and new region information.
 type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
@@ -624,6 +630,9 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 					region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
 				saveCache = true
 			}
+			if !origin.IsFromHeartbeat() {
+				isNew = true
+			}
 		}
 		return
 	}
diff --git a/server/core/region_option.go b/server/core/region_option.go
index fca63eb4992..7f76c515340 100644
--- a/server/core/region_option.go
+++ b/server/core/region_option.go
@@ -315,3 +315,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
 		region.interval = interval
 	}
 }
+
+// SetFromHeartbeat sets if the region info comes from the region heartbeat.
+func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
+	return func(region *RegionInfo) {
+		region.fromHeartbeat = fromHeartbeat
+	}
+}
diff --git a/server/grpc_service.go b/server/grpc_service.go
index d6aacb70755..0d7cd6a8374 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -877,7 +877,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
 			lastBind = time.Now()
 		}
 
-		region := core.RegionFromHeartbeat(request, flowRoundOption)
+		region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
 		if region.GetLeader() == nil {
 			log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
 			regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go
index 37cdf0fbc9f..3f2419ad720 100644
--- a/server/region_syncer/client.go
+++ b/server/region_syncer/client.go
@@ -204,9 +204,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 					core.SetWrittenKeys(stats[i].KeysWritten),
 					core.SetReadBytes(stats[i].BytesRead),
 					core.SetReadKeys(stats[i].KeysRead),
+					core.SetFromHeartbeat(false),
 				)
 			} else {
-				region = core.NewRegionInfo(r, regionLeader)
+				region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
 			}
 
 			origin, err := bc.PreCheckPutRegion(region)
diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go
index f7bb3d776c5..dd7dc1fb5f1 100644
--- a/tests/server/region_syncer/region_syncer_test.go
+++ b/tests/server/region_syncer/region_syncer_test.go
@@ -210,6 +210,51 @@ func (s *regionSyncerTestSuite) TestFullSyncWithAddMember(c *C) {
 	c.Assert(loadRegions, HasLen, regionLen)
 }
 
+func (s *regionSyncerTestSuite) TestPrepareChecker(c *C) {
+	c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil)
+	defer failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker")
+	cluster, err := tests.NewTestCluster(s.ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
+	defer cluster.Destroy()
+	c.Assert(err, IsNil)
+
+	err = cluster.RunInitialServers()
+	c.Assert(err, IsNil)
+	cluster.WaitLeader()
+	leaderServer := cluster.GetServer(cluster.GetLeader())
+	c.Assert(leaderServer.BootstrapCluster(), IsNil)
+	rc := leaderServer.GetServer().GetRaftCluster()
+	c.Assert(rc, NotNil)
+	regionLen := 110
+	regions := initRegions(regionLen)
+	for _, region := range regions {
+		err = rc.HandleRegionHeartbeat(region)
+		c.Assert(err, IsNil)
+	}
+
+	// ensure flush to region storage
+	time.Sleep(3 * time.Second)
+	c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)
+
+	// join new PD
+	pd2, err := cluster.Join(s.ctx)
+	c.Assert(err, IsNil)
+	err = pd2.Run()
+	c.Assert(err, IsNil)
+	// waiting for synchronization to complete
+	time.Sleep(3 * time.Second)
+	err = cluster.ResignLeader()
+	c.Assert(err, IsNil)
+	c.Assert(cluster.WaitLeader(), Equals, "pd2")
+	leaderServer = cluster.GetServer(cluster.GetLeader())
+	rc = leaderServer.GetServer().GetRaftCluster()
+	for _, region := range regions {
+		err = rc.HandleRegionHeartbeat(region)
+		c.Assert(err, IsNil)
+	}
+	time.Sleep(time.Second)
+	c.Assert(rc.IsPrepared(), IsTrue)
+}
+
 func initRegions(regionLen int) []*core.RegionInfo {
 	allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
 	regions := make([]*core.RegionInfo, 0, regionLen)