diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index ad39598a89c..c87bfd1bcc9 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -517,6 +517,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { return nil } +// IsPrepared return true if the prepare checker is ready. +func (c *RaftCluster) IsPrepared() bool { + return c.coordinator.prepareChecker.isPrepared() +} + var regionGuide = core.GenerateRegionGuideFunc(true) // processRegionHeartbeat updates the region information. @@ -587,8 +592,13 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { regionEventCounter.WithLabelValues("update_cache").Inc() } +<<<<<<< HEAD if isNew { c.prepareChecker.collect(region) +======= + if !c.IsPrepared() && isNew { + c.coordinator.prepareChecker.collect(region) +>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875)) } if c.regionStats != nil { @@ -635,6 +645,7 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) { c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize) } +<<<<<<< HEAD //nolint:unused func (c *RaftCluster) getClusterID() uint64 { c.RLock() @@ -642,6 +653,8 @@ func (c *RaftCluster) getClusterID() uint64 { return c.meta.GetId() } +======= +>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875)) func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error { if c.storage != nil { if err := c.storage.SaveMeta(meta); err != nil { diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index fc5ddd91c7b..926f1c29bd5 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -212,6 +212,9 @@ func (c *coordinator) drivePushOperator() { func (c *coordinator) run() { ticker := time.NewTicker(runSchedulerCheckInterval) + failpoint.Inject("changeCoordinatorTicker", func() { + ticker = time.NewTicker(100 * time.Millisecond) + }) defer ticker.Stop() log.Info("coordinator starts to collect cluster information") for { diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index 93e33e0131a..cdbb6bd483f 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -188,7 +188,11 @@ func (s *testCoordinatorSuite) TestBasic(c *C) { func (s *testCoordinatorSuite) TestDispatch(c *C) { tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.isPrepared = true }, nil, c) defer cleanup() +<<<<<<< HEAD +======= + co.prepareChecker.prepared = true +>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875)) // Transfer peer from store 4 to store 1. 
c.Assert(tc.addRegionStore(4, 40), IsNil) c.Assert(tc.addRegionStore(3, 30), IsNil) @@ -290,8 +294,13 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run if setCfg != nil { setCfg(cfg) } +<<<<<<< HEAD tc := newTestCluster(opt) hbStreams := mockhbstream.NewHeartbeatStreams(tc.getClusterID(), false /* need to run */) +======= + tc := newTestCluster(ctx, opt) + hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */) +>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875)) if setTc != nil { setTc(tc) } diff --git a/server/cluster/prepare_checker.go b/server/cluster/prepare_checker.go new file mode 100644 index 00000000000..92c8123f74f --- /dev/null +++ b/server/cluster/prepare_checker.go @@ -0,0 +1,81 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "time" + + "github.com/tikv/pd/pkg/syncutil" + "github.com/tikv/pd/server/core" +) + +type prepareChecker struct { + syncutil.RWMutex + reactiveRegions map[uint64]int + start time.Time + sum int + prepared bool +} + +func newPrepareChecker() *prepareChecker { + return &prepareChecker{ + start: time.Now(), + reactiveRegions: make(map[uint64]int), + } +} + +// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration. +func (checker *prepareChecker) check(c *core.BasicCluster) bool { + checker.RLock() + defer checker.RUnlock() + if checker.prepared { + return true + } + if time.Since(checker.start) > collectTimeout { + checker.prepared = true + return true + } + // The number of active regions should be more than total region of all stores * collectFactor + if float64(c.GetRegionCount())*collectFactor > float64(checker.sum) { + return false + } + for _, store := range c.GetStores() { + if !store.IsPreparing() && !store.IsServing() { + continue + } + storeID := store.GetID() + // For each store, the number of active regions should be more than total region of the store * collectFactor + if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) { + return false + } + } + checker.prepared = true + return true +} + +func (checker *prepareChecker) collect(region *core.RegionInfo) { + checker.Lock() + defer checker.Unlock() + for _, p := range region.GetPeers() { + checker.reactiveRegions[p.GetStoreId()]++ + } + checker.sum++ +} + +func (checker *prepareChecker) isPrepared() bool { + checker.RLock() + defer checker.RUnlock() + return checker.prepared +} diff --git a/server/cluster/unsafe_recovery_controller_test.go b/server/cluster/unsafe_recovery_controller_test.go new file mode 100644 index 00000000000..8455b201c98 --- /dev/null +++ b/server/cluster/unsafe_recovery_controller_test.go @@ -0,0 +1,685 @@ +// Copyright 2021 TiKV Project Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "bytes" + "context" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/raft_serverpb" + "github.com/tikv/pd/pkg/mock/mockid" + "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/schedule/hbstream" + "github.com/tikv/pd/server/storage" +) + +var _ = Suite(&testUnsafeRecoverSuite{}) + +type testUnsafeRecoverSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testUnsafeRecoverSuite) TearDownTest(c *C) { + s.cancel() +} + +func (s *testUnsafeRecoverSuite) SetUpTest(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + +func (s *testUnsafeRecoverSuite) TestPlanGenerationOneHealthyRegion(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + recoveryController := newUnsafeRecoveryController(cluster) + recoveryController.failedStores = map[uint64]string{ + 3: "", + } + recoveryController.storeReports = map[uint64]*pdpb.StoreReport{ + 1: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + }}, + 2: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + }}, + } + recoveryController.generateRecoveryPlan() + // Rely on PD replica checker to remove failed stores. 
+ c.Assert(len(recoveryController.storeRecoveryPlans), Equals, 0) +} + +func (s *testUnsafeRecoverSuite) TestPlanGenerationOneUnhealthyRegion(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + recoveryController := newUnsafeRecoveryController(cluster) + recoveryController.failedStores = map[uint64]string{ + 2: "", + 3: "", + } + recoveryController.storeReports = map[uint64]*pdpb.StoreReport{ + 1: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + }}, + } + recoveryController.generateRecoveryPlan() + c.Assert(len(recoveryController.storeRecoveryPlans), Equals, 1) + store1Plan, ok := recoveryController.storeRecoveryPlans[1] + c.Assert(ok, IsTrue) + c.Assert(len(store1Plan.Updates), Equals, 1) + update := store1Plan.Updates[0] + c.Assert(bytes.Compare(update.StartKey, []byte("")), Equals, 0) + c.Assert(bytes.Compare(update.EndKey, []byte("")), Equals, 0) + c.Assert(len(update.Peers), Equals, 1) + c.Assert(update.Peers[0].StoreId, Equals, uint64(1)) +} + +func (s *testUnsafeRecoverSuite) TestPlanGenerationEmptyRange(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + recoveryController := newUnsafeRecoveryController(cluster) + recoveryController.failedStores = map[uint64]string{ + 3: "", + } + recoveryController.storeReports = map[uint64]*pdpb.StoreReport{ + 1: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + EndKey: []byte("c"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + }}, + 2: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 2, + StartKey: []byte("d"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}, + Peers: []*metapb.Peer{ + {Id: 12, StoreId: 1}, {Id: 22, StoreId: 2}, {Id: 32, StoreId: 3}}}}}, + }}, + } + recoveryController.generateRecoveryPlan() + c.Assert(len(recoveryController.storeRecoveryPlans), Equals, 1) + for storeID, plan := range recoveryController.storeRecoveryPlans { + c.Assert(len(plan.Creates), Equals, 1) + create := plan.Creates[0] + c.Assert(bytes.Compare(create.StartKey, []byte("c")), Equals, 0) + c.Assert(bytes.Compare(create.EndKey, []byte("d")), Equals, 0) + c.Assert(len(create.Peers), Equals, 1) + c.Assert(create.Peers[0].StoreId, Equals, storeID) + c.Assert(create.Peers[0].Role, Equals, metapb.PeerRole_Voter) + } +} + +func (s *testUnsafeRecoverSuite) TestPlanGenerationEmptyRangeAtTheEnd(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + recoveryController := newUnsafeRecoveryController(cluster) + recoveryController.failedStores = map[uint64]string{ + 3: "", + } + recoveryController.storeReports = 
map[uint64]*pdpb.StoreReport{ + 1: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte("c"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + }}, + 2: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte("c"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + }}, + } + recoveryController.generateRecoveryPlan() + c.Assert(len(recoveryController.storeRecoveryPlans), Equals, 1) + for storeID, plan := range recoveryController.storeRecoveryPlans { + c.Assert(len(plan.Creates), Equals, 1) + create := plan.Creates[0] + c.Assert(bytes.Compare(create.StartKey, []byte("c")), Equals, 0) + c.Assert(bytes.Compare(create.EndKey, []byte("")), Equals, 0) + c.Assert(len(create.Peers), Equals, 1) + c.Assert(create.Peers[0].StoreId, Equals, storeID) + c.Assert(create.Peers[0].Role, Equals, metapb.PeerRole_Voter) + } +} + +func (s *testUnsafeRecoverSuite) TestPlanGenerationUseNewestRanges(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + recoveryController := newUnsafeRecoveryController(cluster) + recoveryController.failedStores = map[uint64]string{ + 3: "", + 4: "", + } + recoveryController.storeReports = map[uint64]*pdpb.StoreReport{ + 1: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte("c"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 20}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 31, StoreId: 3}, {Id: 41, StoreId: 4}}}}}, + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 2, + StartKey: []byte("a"), + EndKey: []byte("c"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 10}, + Peers: []*metapb.Peer{ + {Id: 12, StoreId: 1}, {Id: 22, StoreId: 2}, {Id: 32, StoreId: 3}}}}}, + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 4, + StartKey: []byte("m"), + EndKey: []byte("p"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 10}, + Peers: []*metapb.Peer{ + {Id: 14, StoreId: 1}, {Id: 24, StoreId: 2}, {Id: 44, StoreId: 4}}}}}, + }}, + 2: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 3, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 5}, + Peers: []*metapb.Peer{ + {Id: 23, StoreId: 2}, {Id: 33, StoreId: 3}, {Id: 43, StoreId: 4}}}}}, + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 2, + StartKey: []byte("a"), + EndKey: []byte("c"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 10}, + Peers: []*metapb.Peer{ + {Id: 12, 
StoreId: 1}, {Id: 22, StoreId: 2}, {Id: 32, StoreId: 3}}}}}, + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 4, + StartKey: []byte("m"), + EndKey: []byte("p"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 10}, + Peers: []*metapb.Peer{ + {Id: 14, StoreId: 1}, {Id: 24, StoreId: 2}, {Id: 44, StoreId: 4}}}}}, + }}, + } + recoveryController.generateRecoveryPlan() + c.Assert(len(recoveryController.storeRecoveryPlans), Equals, 2) + store1Plan, ok := recoveryController.storeRecoveryPlans[1] + c.Assert(ok, IsTrue) + updatedRegion1 := store1Plan.Updates[0] + c.Assert(updatedRegion1.Id, Equals, uint64(1)) + c.Assert(len(updatedRegion1.Peers), Equals, 1) + c.Assert(bytes.Compare(updatedRegion1.StartKey, []byte("")), Equals, 0) + c.Assert(bytes.Compare(updatedRegion1.EndKey, []byte("a")), Equals, 0) + + store2Plan := recoveryController.storeRecoveryPlans[2] + updatedRegion3 := store2Plan.Updates[0] + c.Assert(updatedRegion3.Id, Equals, uint64(3)) + c.Assert(len(updatedRegion3.Peers), Equals, 1) + c.Assert(bytes.Compare(updatedRegion3.StartKey, []byte("c")), Equals, 0) + c.Assert(bytes.Compare(updatedRegion3.EndKey, []byte("m")), Equals, 0) + create := store2Plan.Creates[0] + c.Assert(bytes.Compare(create.StartKey, []byte("p")), Equals, 0) + c.Assert(bytes.Compare(create.EndKey, []byte("")), Equals, 0) +} + +func (s *testUnsafeRecoverSuite) TestPlanGenerationMembershipChange(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + recoveryController := newUnsafeRecoveryController(cluster) + recoveryController.failedStores = map[uint64]string{ + 4: "", + 5: "", + } + recoveryController.storeReports = map[uint64]*pdpb.StoreReport{ + 1: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + EndKey: []byte("c"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 41, StoreId: 4}, {Id: 51, StoreId: 5}}}}}, + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 2, + StartKey: []byte("c"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + }}, + 2: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 2, + StartKey: []byte("c"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + }}, + 3: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, 
StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 2, + StartKey: []byte("c"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + }}, + } + recoveryController.generateRecoveryPlan() + c.Assert(len(recoveryController.storeRecoveryPlans), Equals, 3) + store1Plan, ok := recoveryController.storeRecoveryPlans[1] + c.Assert(ok, IsTrue) + updatedRegion1 := store1Plan.Updates[0] + c.Assert(updatedRegion1.Id, Equals, uint64(1)) + c.Assert(len(updatedRegion1.Peers), Equals, 1) + c.Assert(bytes.Compare(updatedRegion1.StartKey, []byte("")), Equals, 0) + c.Assert(bytes.Compare(updatedRegion1.EndKey, []byte("c")), Equals, 0) + + store2Plan := recoveryController.storeRecoveryPlans[2] + deleteStaleRegion1 := store2Plan.Deletes[0] + c.Assert(deleteStaleRegion1, Equals, uint64(1)) + + store3Plan := recoveryController.storeRecoveryPlans[3] + deleteStaleRegion1 = store3Plan.Deletes[0] + c.Assert(deleteStaleRegion1, Equals, uint64(1)) +} + +func (s *testUnsafeRecoverSuite) TestPlanGenerationPromotingLearner(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + recoveryController := newUnsafeRecoveryController(cluster) + recoveryController.failedStores = map[uint64]string{ + 2: "", + 3: "", + } + recoveryController.storeReports = map[uint64]*pdpb.StoreReport{ + 1: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1, Role: metapb.PeerRole_Learner}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}}}}}, + }}, + } + recoveryController.generateRecoveryPlan() + c.Assert(len(recoveryController.storeRecoveryPlans), Equals, 1) + store1Plan, ok := recoveryController.storeRecoveryPlans[1] + c.Assert(ok, IsTrue) + c.Assert(len(store1Plan.Updates), Equals, 1) + update := store1Plan.Updates[0] + c.Assert(bytes.Compare(update.StartKey, []byte("")), Equals, 0) + c.Assert(bytes.Compare(update.EndKey, []byte("")), Equals, 0) + c.Assert(len(update.Peers), Equals, 1) + c.Assert(update.Peers[0].StoreId, Equals, uint64(1)) + c.Assert(update.Peers[0].Role, Equals, metapb.PeerRole_Voter) +} + +func (s *testUnsafeRecoverSuite) TestPlanGenerationKeepingOneReplica(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + recoveryController := newUnsafeRecoveryController(cluster) + recoveryController.failedStores = map[uint64]string{ + 3: "", + 4: "", + } + recoveryController.storeReports = map[uint64]*pdpb.StoreReport{ + 1: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}, {Id: 41, StoreId: 4}}}}}, + }}, + 2: {PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + 
Region: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}, {Id: 41, StoreId: 4}}}}}, + }}, + } + recoveryController.generateRecoveryPlan() + c.Assert(len(recoveryController.storeRecoveryPlans), Equals, 2) + foundUpdate := false + foundDelete := false + for storeID, plan := range recoveryController.storeRecoveryPlans { + if len(plan.Updates) == 1 { + foundUpdate = true + update := plan.Updates[0] + c.Assert(bytes.Compare(update.StartKey, []byte("")), Equals, 0) + c.Assert(bytes.Compare(update.EndKey, []byte("")), Equals, 0) + c.Assert(len(update.Peers), Equals, 1) + c.Assert(update.Peers[0].StoreId, Equals, storeID) + } else if len(plan.Deletes) == 1 { + foundDelete = true + c.Assert(plan.Deletes[0], Equals, uint64(1)) + } + } + c.Assert(foundUpdate, Equals, true) + c.Assert(foundDelete, Equals, true) +} + +func (s *testUnsafeRecoverSuite) TestReportCollection(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + recoveryController := newUnsafeRecoveryController(cluster) + recoveryController.stage = collectingClusterInfo + recoveryController.failedStores = map[uint64]string{ + 3: "", + 4: "", + } + recoveryController.storeReports[uint64(1)] = nil + recoveryController.storeReports[uint64(2)] = nil + store1Report := &pdpb.StoreReport{ + PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}, {Id: 41, StoreId: 4}}}}}, + }} + store2Report := &pdpb.StoreReport{ + PeerReports: []*pdpb.PeerReport{ + { + RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10}, + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1}, {Id: 21, StoreId: 2}, {Id: 31, StoreId: 3}, {Id: 41, StoreId: 4}}}}}, + }} + heartbeat := &pdpb.StoreHeartbeatRequest{Stats: &pdpb.StoreStats{StoreId: 1}} + resp := &pdpb.StoreHeartbeatResponse{} + recoveryController.HandleStoreHeartbeat(heartbeat, resp) + c.Assert(resp.RequireDetailedReport, Equals, true) + // Second and following heartbeats in a short period of time are ignored. 
+ resp = &pdpb.StoreHeartbeatResponse{} + recoveryController.HandleStoreHeartbeat(heartbeat, resp) + c.Assert(resp.RequireDetailedReport, Equals, false) + + heartbeat.StoreReport = store1Report + recoveryController.HandleStoreHeartbeat(heartbeat, resp) + c.Assert(recoveryController.numStoresReported, Equals, 1) + c.Assert(recoveryController.storeReports[uint64(1)], Equals, store1Report) + + heartbeat.Stats.StoreId = uint64(2) + heartbeat.StoreReport = store2Report + recoveryController.HandleStoreHeartbeat(heartbeat, resp) + c.Assert(recoveryController.numStoresReported, Equals, 2) + c.Assert(recoveryController.storeReports[uint64(2)], Equals, store2Report) +} + +func (s *testUnsafeRecoverSuite) TestPlanExecution(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + // Manually fill the coordinator up to allow calling on cluster.PauseOrResumeSchedulers(). + cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true)) + recoveryController := newUnsafeRecoveryController(cluster) + recoveryController.stage = recovering + recoveryController.failedStores = map[uint64]string{ + 3: "", + 4: "", + } + recoveryController.storeReports[uint64(1)] = nil + recoveryController.storeReports[uint64(2)] = nil + recoveryController.storeRecoveryPlans[uint64(1)] = &pdpb.RecoveryPlan{ + Creates: []*metapb.Region{ + { + Id: 4, + StartKey: []byte("f"), + Peers: []*metapb.Peer{{Id: 14, StoreId: 1}}, + }, + }, + Updates: []*metapb.Region{ + { + Id: 5, + StartKey: []byte("c"), + EndKey: []byte("f"), + Peers: []*metapb.Peer{{Id: 15, StoreId: 1}}, + }, + }, + } + recoveryController.storeRecoveryPlans[uint64(2)] = &pdpb.RecoveryPlan{ + Updates: []*metapb.Region{ + { + Id: 3, + EndKey: []byte("c"), + Peers: []*metapb.Peer{{Id: 23, StoreId: 2}}, + }, + }, + Deletes: []uint64{2}, + } + store1Report := &pdpb.StoreReport{ + PeerReports: []*pdpb.PeerReport{ + { + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 4, + StartKey: []byte("f"), + Peers: []*metapb.Peer{{Id: 14, StoreId: 1}}}}, + }, + { + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 5, + StartKey: []byte("c"), + EndKey: []byte("f"), + Peers: []*metapb.Peer{{Id: 15, StoreId: 1}}}}, + }, + }} + heartbeat := &pdpb.StoreHeartbeatRequest{Stats: &pdpb.StoreStats{StoreId: 1}, StoreReport: store1Report} + resp := &pdpb.StoreHeartbeatResponse{} + recoveryController.HandleStoreHeartbeat(heartbeat, resp) + c.Assert(recoveryController.numStoresPlanExecuted, Equals, 1) + + store2Report := &pdpb.StoreReport{ + PeerReports: []*pdpb.PeerReport{ + { + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 2, + StartKey: []byte("g"), + Peers: []*metapb.Peer{{Id: 12, StoreId: 2}}}}, + }, + }} + heartbeat.Stats.StoreId = uint64(2) + heartbeat.StoreReport = store2Report + recoveryController.HandleStoreHeartbeat(heartbeat, resp) + c.Assert(recoveryController.numStoresPlanExecuted, Equals, 1) + + store2Report = &pdpb.StoreReport{ + PeerReports: []*pdpb.PeerReport{ + { + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 3, + EndKey: []byte("f"), + Peers: []*metapb.Peer{{Id: 13, StoreId: 2}}}}, + }, + }} + heartbeat.StoreReport = store2Report + recoveryController.HandleStoreHeartbeat(heartbeat, resp) + c.Assert(recoveryController.numStoresPlanExecuted, Equals, 1) + + store2Report = 
&pdpb.StoreReport{ + PeerReports: []*pdpb.PeerReport{ + { + RegionState: &raft_serverpb.RegionLocalState{ + Region: &metapb.Region{ + Id: 3, + EndKey: []byte("c"), + Peers: []*metapb.Peer{{Id: 13, StoreId: 2}}}}, + }, + }} + heartbeat.StoreReport = store2Report + recoveryController.HandleStoreHeartbeat(heartbeat, resp) + c.Assert(recoveryController.numStoresPlanExecuted, Equals, 2) + c.Assert(recoveryController.stage, Equals, finished) +} + +func (s *testUnsafeRecoverSuite) TestRemoveFailedStores(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true)) + cluster.coordinator.run() + stores := newTestStores(2, "5.3.0") + stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now())) + for _, store := range stores { + c.Assert(cluster.PutStore(store.GetMeta()), IsNil) + } + recoveryController := newUnsafeRecoveryController(cluster) + failedStores := map[uint64]string{ + 1: "", + 3: "", + } + + c.Assert(recoveryController.RemoveFailedStores(failedStores), IsNil) + c.Assert(cluster.GetStore(uint64(1)).IsRemoved(), IsTrue) + for _, s := range cluster.GetSchedulers() { + paused, err := cluster.IsSchedulerAllowed(s) + c.Assert(err, IsNil) + c.Assert(paused, IsTrue) + } + + // Store 2's last heartbeat is recent, and is not allowed to be removed. + failedStores = map[uint64]string{ + 2: "", + } + + c.Assert(recoveryController.RemoveFailedStores(failedStores), NotNil) +} + +func (s *testUnsafeRecoverSuite) TestSplitPaused(c *C) { + _, opt, _ := newTestScheduleConfig() + cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true)) + cluster.coordinator.run() + stores := newTestStores(2, "5.3.0") + stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now())) + for _, store := range stores { + c.Assert(cluster.PutStore(store.GetMeta()), IsNil) + } + recoveryController := newUnsafeRecoveryController(cluster) + cluster.Lock() + cluster.unsafeRecoveryController = recoveryController + cluster.Unlock() + failedStores := map[uint64]string{ + 1: "", + } + c.Assert(recoveryController.RemoveFailedStores(failedStores), IsNil) + askSplitReq := &pdpb.AskSplitRequest{} + _, err := cluster.HandleAskSplit(askSplitReq) + c.Assert(err.Error(), Equals, "[PD:unsaferecovery:ErrUnsafeRecoveryIsRunning]unsafe recovery is running") + askBatchSplitReq := &pdpb.AskBatchSplitRequest{} + _, err = cluster.HandleAskBatchSplit(askBatchSplitReq) + c.Assert(err.Error(), Equals, "[PD:unsaferecovery:ErrUnsafeRecoveryIsRunning]unsafe recovery is running") +} diff --git a/server/core/region.go b/server/core/region.go index af8a3a6387b..5e7dc580896 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -55,6 +55,14 @@ type RegionInfo struct { approximateKeys int64 interval *pdpb.TimeInterval replicationStatus *replication_modepb.RegionReplicationStatus +<<<<<<< HEAD +======= + QueryStats *pdpb.QueryStats + flowRoundDivisor uint64 + // buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version. 
+ buckets unsafe.Pointer + fromHeartbeat bool +>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875)) } // NewRegionInfo creates RegionInfo with region's meta and leader peer. @@ -439,6 +447,11 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio return r.replicationStatus } +// IsFromHeartbeat returns whether the region info is from the region heartbeat. +func (r *RegionInfo) IsFromHeartbeat() bool { + return r.fromHeartbeat +} + // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. type RegionGuideFunc func(region, origin *RegionInfo, traceRegionFlow bool) (isNew, saveKV, saveCache, needSync bool) @@ -523,6 +536,9 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) { saveCache = true } + if !origin.IsFromHeartbeat() { + isNew = true + } } return } diff --git a/server/core/region_option.go b/server/core/region_option.go index 1292ec89965..fccdb76096e 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -276,3 +276,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption { region.interval = interval } } + +// SetFromHeartbeat sets if the region info comes from the region heartbeat. +func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption { + return func(region *RegionInfo) { + region.fromHeartbeat = fromHeartbeat + } +} diff --git a/server/grpc_service.go b/server/grpc_service.go index 557f70ac57c..1d0e5569b62 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -402,7 +402,11 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { lastBind = time.Now() } +<<<<<<< HEAD region := core.RegionFromHeartbeat(request) +======= + region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true)) +>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875)) if region.GetLeader() == nil { log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil)) continue diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index 9eb037c2679..63781756b31 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -202,9 +202,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { core.SetWrittenKeys(stats[i].KeysWritten), core.SetReadBytes(stats[i].BytesRead), core.SetReadKeys(stats[i].KeysRead), + core.SetFromHeartbeat(false), ) } else { - region = core.NewRegionInfo(r, regionLeader) + region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false)) } origin, err := bc.PreCheckPutRegion(region) diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index c75f7182f8c..3bdedeb187e 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -223,5 +223,85 @@ func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) { c.Assert(err, IsNil) c.Assert(cluster.WaitLeader(), Equals, "pd2") loadRegions := pd2.GetServer().GetRaftCluster().GetRegions() +<<<<<<< HEAD c.Assert(len(loadRegions), Equals, regionLen) +======= + c.Assert(loadRegions, HasLen, regionLen) +} + +func (s *regionSyncerTestSuite) TestPrepareChecker(c *C) { + 
c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil) + defer failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker") + cluster, err := tests.NewTestCluster(s.ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true }) + defer cluster.Destroy() + c.Assert(err, IsNil) + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + leaderServer := cluster.GetServer(cluster.GetLeader()) + c.Assert(leaderServer.BootstrapCluster(), IsNil) + rc := leaderServer.GetServer().GetRaftCluster() + c.Assert(rc, NotNil) + regionLen := 110 + regions := initRegions(regionLen) + for _, region := range regions { + err = rc.HandleRegionHeartbeat(region) + c.Assert(err, IsNil) + } + + // ensure flush to region storage + time.Sleep(3 * time.Second) + c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue) + + // join new PD + pd2, err := cluster.Join(s.ctx) + c.Assert(err, IsNil) + err = pd2.Run() + c.Assert(err, IsNil) + // waiting for synchronization to complete + time.Sleep(3 * time.Second) + err = cluster.ResignLeader() + c.Assert(err, IsNil) + c.Assert(cluster.WaitLeader(), Equals, "pd2") + leaderServer = cluster.GetServer(cluster.GetLeader()) + rc = leaderServer.GetServer().GetRaftCluster() + for _, region := range regions { + err = rc.HandleRegionHeartbeat(region) + c.Assert(err, IsNil) + } + time.Sleep(time.Second) + c.Assert(rc.IsPrepared(), IsTrue) +} + +func initRegions(regionLen int) []*core.RegionInfo { + allocator := &idAllocator{allocator: mockid.NewIDAllocator()} + regions := make([]*core.RegionInfo, 0, regionLen) + for i := 0; i < regionLen; i++ { + r := &metapb.Region{ + Id: allocator.alloc(), + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + StartKey: []byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + Peers: []*metapb.Peer{ + {Id: allocator.alloc(), StoreId: uint64(0)}, + {Id: allocator.alloc(), StoreId: uint64(0)}, + }, + } + region := core.NewRegionInfo(r, r.Peers[0]) + if i < regionLen/2 { + buckets := &metapb.Buckets{ + RegionId: r.Id, + Keys: [][]byte{r.StartKey, r.EndKey}, + Version: 1, + } + region.UpdateBuckets(buckets, region.GetBuckets()) + } + regions = append(regions, region) + } + return regions +>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875)) }
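
A few illustrative sketches follow; none of them are part of the patch above. The patch gates `prepareChecker.collect` behind `!c.IsPrepared()` and exposes `IsPrepared()` so callers can tell when the coordinator has seen enough region heartbeats to start scheduling. The readiness rule in the new `check` method boils down to: prepared once the collected region count reaches `collectFactor` of the total, or unconditionally after `collectTimeout`. The concrete values of `collectFactor` and `collectTimeout` are not shown in this diff (they are package-level constants elsewhere in `server/cluster`), so the numbers below are assumptions used only to make the sketch runnable.

```go
package main

import (
	"fmt"
	"time"
)

// Standalone sketch of the readiness rule in prepareChecker.check.
// collectFactor and collectTimeout are assumed values, not the PD constants.
const (
	collectFactor  = 0.8
	collectTimeout = 5 * time.Minute
)

func prepared(start time.Time, totalRegions, collectedRegions int) bool {
	if time.Since(start) > collectTimeout {
		return true // stop waiting and let scheduling start anyway
	}
	// Enough regions have been reported via heartbeat.
	return float64(totalRegions)*collectFactor <= float64(collectedRegions)
}

func main() {
	start := time.Now()
	fmt.Println(prepared(start, 100, 50)) // false: only half of the regions reported
	fmt.Println(prepared(start, 100, 85)) // true: above the collect threshold
}
```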
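One detail worth flagging in the new `prepare_checker.go`: `check` acquires `RLock`/`RUnlock` but assigns `checker.prepared = true` on both the timeout path and the success path, i.e. it writes under the read lock. Whether that needs changing is a call for the PD authors; the snippet below is only a hedged, self-contained sketch of the same method shape holding the write lock instead.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Minimal standalone model of the locking concern: the method both reads and,
// on some paths, writes `prepared`, so it holds the write lock here.
type toyChecker struct {
	sync.RWMutex
	start    time.Time
	prepared bool
}

func (c *toyChecker) check(timeout time.Duration) bool {
	c.Lock() // write lock, because c.prepared may be set below
	defer c.Unlock()
	if c.prepared {
		return true
	}
	if time.Since(c.start) > timeout {
		c.prepared = true
		return true
	}
	return false
}

func main() {
	c := &toyChecker{start: time.Now()}
	fmt.Println(c.check(time.Minute)) // false: neither prepared nor timed out yet
}
```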
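The core of the fix lives in `region.go` and `region_syncer/client.go`: regions loaded through the region syncer are created with `core.SetFromHeartbeat(false)`, and `GenerateRegionGuideFunc` now treats a cached region as new when the cached copy did not come from a heartbeat. After a leader transfer, the first real heartbeat for each synced region is therefore collected by the prepare checker again, so scheduling can start without waiting for the collect timeout. A toy restatement of just that decision (the real guide also weighs epoch, leader, and flow changes):

```go
package main

import "fmt"

// Toy model of the isNew condition added to the region guide: a region that
// exists in the cache but was filled by the syncer rather than a heartbeat
// is still treated as "new" so the prepare checker collects it again.
type regionInfo struct {
	fromHeartbeat bool
}

func isNew(origin *regionInfo) bool {
	if origin == nil {
		return true // never seen before
	}
	return !origin.fromHeartbeat // cached copy came from the syncer, not a heartbeat
}

func main() {
	fmt.Println(isNew(nil))                               // true
	fmt.Println(isNew(&regionInfo{fromHeartbeat: false})) // true: synced region, re-collect
	fmt.Println(isNew(&regionInfo{fromHeartbeat: true}))  // false: already heartbeated
}
```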
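`coordinator.run` also gains a `changeCoordinatorTicker` failpoint so that tests such as `TestPrepareChecker` can shrink the scheduler-check interval to 100ms instead of waiting on the default ticker. For reference, this is the call shape the test uses; failpoints are no-ops unless the tree has been rewritten with failpoint-ctl (in PD that is typically done through a make target such as `make failpoint-enable`, which is an assumption here, not something this diff shows).

```go
package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	// Activate the failpoint by its full package path plus name, and make
	// sure it is disabled again when the test finishes.
	fp := "github.com/tikv/pd/server/cluster/changeCoordinatorTicker"
	if err := failpoint.Enable(fp, `return(true)`); err != nil {
		fmt.Println("enable failed:", err)
		return
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			fmt.Println("disable failed:", err)
		}
	}()
}
```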
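Finally, the new `unsafe_recovery_controller_test.go` cases all exercise the same invariant: for a region whose peers on failed stores can no longer form a quorum, the generated plan keeps a single surviving peer (promoted to Voter if it was a learner) and tells the other surviving stores to delete their stale copies, while regions that still have quorum are left to the replica checker. The controller itself is not in this diff, so the following is only a toy check of that quorum condition, matching the test scenarios:

```go
package main

import "fmt"

// Toy quorum check mirroring the unsafe-recovery test scenarios: a region
// whose peers live on `stores` has lost quorum once the surviving replicas
// are no more than half of the total voters.
func lostQuorum(stores []uint64, failed map[uint64]bool) bool {
	alive := 0
	for _, s := range stores {
		if !failed[s] {
			alive++
		}
	}
	return alive*2 <= len(stores)
}

func main() {
	failed := map[uint64]bool{2: true, 3: true}
	fmt.Println(lostQuorum([]uint64{1, 2, 3}, failed)) // true: only store 1 survives, plan must force-recover
	fmt.Println(lostQuorum([]uint64{1, 4, 5}, failed)) // false: quorum intact, leave it to the replica checker
}
```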