diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b2aefad6f74..2ba35645855 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -593,6 +593,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { return nil } +// IsPrepared return true if the prepare checker is ready. +func (c *RaftCluster) IsPrepared() bool { + return c.prepareChecker.isPrepared() +} + var regionGuide = core.GenerateRegionGuideFunc(true) // processRegionHeartbeat updates the region information. @@ -663,7 +668,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { regionEventCounter.WithLabelValues("update_cache").Inc() } - if isNew { + if !c.IsPrepared() && isNew { c.prepareChecker.collect(region) } @@ -718,13 +723,6 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) { c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize) } -//nolint:unused -func (c *RaftCluster) getClusterID() uint64 { - c.RLock() - defer c.RUnlock() - return c.meta.GetId() -} - func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error { if c.storage != nil { if err := c.storage.SaveMeta(meta); err != nil { @@ -1445,13 +1443,6 @@ func (c *RaftCluster) GetComponentManager() *component.Manager { return c.componentManager } -// isPrepared if the cluster information is collected -func (c *RaftCluster) isPrepared() bool { - c.RLock() - defer c.RUnlock() - return c.prepareChecker.check(c) -} - // GetStoresLoads returns load stats of all stores. func (c *RaftCluster) GetStoresLoads() map[uint64][]float64 { c.RLock() @@ -1504,10 +1495,11 @@ func (c *RaftCluster) FitRegion(region *core.RegionInfo) *placement.RegionFit { } type prepareChecker struct { + sync.RWMutex reactiveRegions map[uint64]int start time.Time sum int - isPrepared bool + prepared bool } func newPrepareChecker() *prepareChecker { @@ -1519,7 +1511,13 @@ func newPrepareChecker() *prepareChecker { // Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration. func (checker *prepareChecker) check(c *RaftCluster) bool { - if checker.isPrepared || time.Since(checker.start) > collectTimeout { + checker.Lock() + defer checker.Unlock() + if checker.prepared { + return true + } + if time.Since(checker.start) > collectTimeout { + checker.prepared = true return true } // The number of active regions should be more than total region of all stores * collectFactor @@ -1536,17 +1534,25 @@ func (checker *prepareChecker) check(c *RaftCluster) bool { return false } } - checker.isPrepared = true + checker.prepared = true return true } func (checker *prepareChecker) collect(region *core.RegionInfo) { + checker.Lock() + defer checker.Unlock() for _, p := range region.GetPeers() { checker.reactiveRegions[p.GetStoreId()]++ } checker.sum++ } +func (checker *prepareChecker) isPrepared() bool { + checker.RLock() + defer checker.RUnlock() + return checker.prepared +} + // GetHotWriteRegions gets hot write regions' info. func (c *RaftCluster) GetHotWriteRegions(storeIDs ...uint64) *statistics.StoreHotPeersInfos { c.RLock() diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index d52f2f63dcf..8b1850c57fd 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -279,6 +279,9 @@ func (c *coordinator) drivePushOperator() { func (c *coordinator) run() { ticker := time.NewTicker(runSchedulerCheckInterval) + failpoint.Inject("changeCoordinatorTicker", func() { + ticker = time.NewTicker(100 * time.Millisecond) + }) defer ticker.Stop() log.Info("coordinator starts to collect cluster information") for { @@ -604,7 +607,7 @@ func (c *coordinator) resetHotSpotMetrics() { } func (c *coordinator) shouldRun() bool { - return c.cluster.isPrepared() + return c.cluster.prepareChecker.check(c.cluster) } func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string) error { diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index fadf3a66dee..471ad972cb3 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -189,9 +189,8 @@ func (s *testCoordinatorSuite) TestBasic(c *C) { } func (s *testCoordinatorSuite) TestDispatch(c *C) { - tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.isPrepared = true }, nil, c) + tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.prepared = true }, nil, c) defer cleanup() - // Transfer peer from store 4 to store 1. c.Assert(tc.addRegionStore(4, 40), IsNil) c.Assert(tc.addRegionStore(3, 30), IsNil) @@ -284,7 +283,7 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run setCfg(cfg) } tc := newTestCluster(ctx, opt) - hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.getClusterID(), tc, true /* need to run */) + hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */) if setTc != nil { setTc(tc) } diff --git a/server/core/region.go b/server/core/region.go index b1917d3fd58..ff3925fe08d 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -57,6 +57,7 @@ type RegionInfo struct { replicationStatus *replication_modepb.RegionReplicationStatus QueryStats *pdpb.QueryStats flowRoundDivisor uint64 + fromHeartbeat bool } // NewRegionInfo creates RegionInfo with region's meta and leader peer. @@ -476,6 +477,11 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio return r.replicationStatus } +// IsFromHeartbeat returns whether the region info is from the region heartbeat. +func (r *RegionInfo) IsFromHeartbeat() bool { + return r.fromHeartbeat +} + // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) @@ -560,6 +566,9 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) { saveCache = true } + if !origin.IsFromHeartbeat() { + isNew = true + } } return } diff --git a/server/core/region_option.go b/server/core/region_option.go index 1cab90444c9..c5a8f470a6f 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -304,3 +304,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption { region.interval = interval } } + +// SetFromHeartbeat sets if the region info comes from the region heartbeat. +func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption { + return func(region *RegionInfo) { + region.fromHeartbeat = fromHeartbeat + } +} diff --git a/server/grpc_service.go b/server/grpc_service.go index f12a2b812db..2df311cd16d 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -589,7 +589,7 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { lastBind = time.Now() } - region := core.RegionFromHeartbeat(request, flowRoundOption) + region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true)) if region.GetLeader() == nil { log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil)) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc() diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index f76bc0a6525..132b096ee55 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -201,9 +201,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { core.SetWrittenKeys(stats[i].KeysWritten), core.SetReadBytes(stats[i].BytesRead), core.SetReadKeys(stats[i].KeysRead), + core.SetFromHeartbeat(false), ) } else { - region = core.NewRegionInfo(r, regionLeader) + region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false)) } origin, err := bc.PreCheckPutRegion(region) diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index bf14298b5d1..4599bd9dd29 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -19,6 +19,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/testutil" @@ -195,6 +196,51 @@ func (s *regionSyncerTestSuite) TestFullSyncWithAddMember(c *C) { c.Assert(len(loadRegions), Equals, regionLen) } +func (s *regionSyncerTestSuite) TestPrepareChecker(c *C) { + c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil) + defer failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker") + cluster, err := tests.NewTestCluster(s.ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true }) + defer cluster.Destroy() + c.Assert(err, IsNil) + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + leaderServer := cluster.GetServer(cluster.GetLeader()) + c.Assert(leaderServer.BootstrapCluster(), IsNil) + rc := leaderServer.GetServer().GetRaftCluster() + c.Assert(rc, NotNil) + regionLen := 110 + regions := initRegions(regionLen) + for _, region := range regions { + err = rc.HandleRegionHeartbeat(region) + c.Assert(err, IsNil) + } + + // ensure flush to region storage + time.Sleep(3 * time.Second) + c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue) + + // join new PD + pd2, err := cluster.Join(s.ctx) + c.Assert(err, IsNil) + err = pd2.Run() + c.Assert(err, IsNil) + // waiting for synchronization to complete + time.Sleep(3 * time.Second) + err = cluster.ResignLeader() + c.Assert(err, IsNil) + c.Assert(cluster.WaitLeader(), Equals, "pd2") + leaderServer = cluster.GetServer(cluster.GetLeader()) + rc = leaderServer.GetServer().GetRaftCluster() + for _, region := range regions { + err = rc.HandleRegionHeartbeat(region) + c.Assert(err, IsNil) + } + time.Sleep(time.Second) + c.Assert(rc.IsPrepared(), IsTrue) +} + func initRegions(regionLen int) []*core.RegionInfo { allocator := &idAllocator{allocator: mockid.NewIDAllocator()} regions := make([]*core.RegionInfo, 0, regionLen)