This is an automated cherry-pick of tikv#4875
close tikv#4769

Signed-off-by: ti-chi-bot <[email protected]>
rleungx authored and ti-chi-bot committed May 17, 2022
1 parent b99630d commit dcabcac
Showing 10 changed files with 238 additions and 3 deletions.
13 changes: 13 additions & 0 deletions server/cluster/cluster.go
@@ -616,6 +616,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
return nil
}

// IsPrepared returns true if the prepare checker is ready.
func (c *RaftCluster) IsPrepared() bool {
return c.coordinator.prepareChecker.isPrepared()
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
@@ -688,8 +693,13 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

<<<<<<< HEAD
if isNew {
c.prepareChecker.collect(region)
=======
if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
}

if c.regionStats != nil {
@@ -743,13 +753,16 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

<<<<<<< HEAD
//nolint:unused
func (c *RaftCluster) getClusterID() uint64 {
c.RLock()
defer c.RUnlock()
return c.meta.GetId()
}

=======
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
if c.storage != nil {
if err := c.storage.SaveMeta(meta); err != nil {
3 changes: 3 additions & 0 deletions server/cluster/coordinator.go
@@ -292,6 +292,9 @@ func (c *coordinator) drivePushOperator() {

func (c *coordinator) run() {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker = time.NewTicker(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
for {
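The changeCoordinatorTicker failpoint above lets tests shrink the coordinator tick from runSchedulerCheckInterval down to 100ms instead of waiting out the real interval. A minimal sketch of switching it on with the pingcap/failpoint package, mirroring what TestPrepareChecker does later in this diff; the speedUpCoordinatorTicker helper is illustrative only, not part of this change:

package cluster_test

import "github.com/pingcap/failpoint"

// speedUpCoordinatorTicker enables the changeCoordinatorTicker failpoint so that
// coordinator.run() ticks every 100ms, and returns a function that restores it.
func speedUpCoordinatorTicker() (restore func(), err error) {
	const fp = "github.com/tikv/pd/server/cluster/changeCoordinatorTicker"
	if err := failpoint.Enable(fp, `return(true)`); err != nil {
		return nil, err
	}
	return func() { _ = failpoint.Disable(fp) }, nil
}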
6 changes: 5 additions & 1 deletion server/cluster/coordinator_test.go
@@ -192,7 +192,11 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
func (s *testCoordinatorSuite) TestDispatch(c *C) {
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.isPrepared = true }, nil, c)
defer cleanup()
<<<<<<< HEAD

=======
co.prepareChecker.prepared = true
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
// Transfer peer from store 4 to store 1.
c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 30), IsNil)
@@ -285,7 +289,7 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
setCfg(cfg)
}
tc := newTestCluster(ctx, opt)
hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.getClusterID(), tc, true /* need to run */)
hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */)
if setTc != nil {
setTc(tc)
}
81 changes: 81 additions & 0 deletions server/cluster/prepare_checker.go
@@ -0,0 +1,81 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
"time"

"github.com/tikv/pd/pkg/syncutil"
"github.com/tikv/pd/server/core"
)

type prepareChecker struct {
syncutil.RWMutex
reactiveRegions map[uint64]int
start time.Time
sum int
prepared bool
}

func newPrepareChecker() *prepareChecker {
return &prepareChecker{
start: time.Now(),
reactiveRegions: make(map[uint64]int),
}
}

// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
func (checker *prepareChecker) check(c *core.BasicCluster) bool {
checker.Lock()
defer checker.Unlock()
if checker.prepared {
return true
}
if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
}
// The number of active regions should be more than the total number of regions of all stores * collectFactor
if float64(c.GetRegionCount())*collectFactor > float64(checker.sum) {
return false
}
for _, store := range c.GetStores() {
if !store.IsPreparing() && !store.IsServing() {
continue
}
storeID := store.GetID()
// For each store, the number of active regions should be more than the store's total number of regions * collectFactor
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
return false
}
}
checker.prepared = true
return true
}

func (checker *prepareChecker) collect(region *core.RegionInfo) {
checker.Lock()
defer checker.Unlock()
for _, p := range region.GetPeers() {
checker.reactiveRegions[p.GetStoreId()]++
}
checker.sum++
}

func (checker *prepareChecker) isPrepared() bool {
checker.RLock()
defer checker.RUnlock()
return checker.prepared
}
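
In short, collect() tallies, per store, the peers of every region reported since startup, and check() marks the checker prepared once either collectTimeout has elapsed or at least collectFactor of the regions of the cluster and of each preparing/serving store have been observed. A rough sketch of how the checker could be driven, assuming an already populated *core.BasicCluster; the driveUntilPrepared helper is illustrative and not part of this change:

package cluster

import "github.com/tikv/pd/server/core"

// driveUntilPrepared feeds region heartbeats into the checker until it decides
// that enough cluster information has been collected to start scheduling.
func driveUntilPrepared(checker *prepareChecker, bc *core.BasicCluster, heartbeats <-chan *core.RegionInfo) {
	for region := range heartbeats {
		checker.collect(region)
		if checker.check(bc) {
			return // prepared: scheduling may start
		}
	}
}

In the real coordinator the check is polled from run() on the scheduler-check ticker rather than on every heartbeat, which is why the failpoint above is useful for tests.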
67 changes: 67 additions & 0 deletions server/cluster/unsafe_recovery_controller_test.go
@@ -514,7 +514,13 @@ func (s *testUnsafeRecoverSuite) TestReportCollection(c *C) {

func (s *testUnsafeRecoverSuite) TestPlanExecution(c *C) {
_, opt, _ := newTestScheduleConfig()
<<<<<<< HEAD
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
=======
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
// Manually fill in the coordinator to allow calling cluster.PauseOrResumeSchedulers().
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
recoveryController := newUnsafeRecoveryController(cluster)
recoveryController.stage = recovering
recoveryController.failedStores = map[uint64]string{
@@ -617,3 +623,64 @@ func (s *testUnsafeRecoverSuite) TestPlanExecution(c *C) {
c.Assert(recoveryController.numStoresPlanExecuted, Equals, 2)
c.Assert(recoveryController.stage, Equals, finished)
}
<<<<<<< HEAD
=======

func (s *testUnsafeRecoverSuite) TestRemoveFailedStores(c *C) {
_, opt, _ := newTestScheduleConfig()
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
cluster.coordinator.run()
stores := newTestStores(2, "5.3.0")
stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now()))
for _, store := range stores {
c.Assert(cluster.PutStore(store.GetMeta()), IsNil)
}
recoveryController := newUnsafeRecoveryController(cluster)
failedStores := map[uint64]string{
1: "",
3: "",
}

c.Assert(recoveryController.RemoveFailedStores(failedStores), IsNil)
c.Assert(cluster.GetStore(uint64(1)).IsRemoved(), IsTrue)
for _, s := range cluster.GetSchedulers() {
paused, err := cluster.IsSchedulerAllowed(s)
c.Assert(err, IsNil)
c.Assert(paused, IsTrue)
}

// Store 2's last heartbeat is recent, so it is not allowed to be removed.
failedStores = map[uint64]string{
2: "",
}

c.Assert(recoveryController.RemoveFailedStores(failedStores), NotNil)
}

func (s *testUnsafeRecoverSuite) TestSplitPaused(c *C) {
_, opt, _ := newTestScheduleConfig()
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
cluster.coordinator.run()
stores := newTestStores(2, "5.3.0")
stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now()))
for _, store := range stores {
c.Assert(cluster.PutStore(store.GetMeta()), IsNil)
}
recoveryController := newUnsafeRecoveryController(cluster)
cluster.Lock()
cluster.unsafeRecoveryController = recoveryController
cluster.Unlock()
failedStores := map[uint64]string{
1: "",
}
c.Assert(recoveryController.RemoveFailedStores(failedStores), IsNil)
askSplitReq := &pdpb.AskSplitRequest{}
_, err := cluster.HandleAskSplit(askSplitReq)
c.Assert(err.Error(), Equals, "[PD:unsaferecovery:ErrUnsafeRecoveryIsRunning]unsafe recovery is running")
askBatchSplitReq := &pdpb.AskBatchSplitRequest{}
_, err = cluster.HandleAskBatchSplit(askBatchSplitReq)
c.Assert(err.Error(), Equals, "[PD:unsaferecovery:ErrUnsafeRecoveryIsRunning]unsafe recovery is running")
}
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
14 changes: 14 additions & 0 deletions server/core/region.go
@@ -58,6 +58,12 @@ type RegionInfo struct {
replicationStatus *replication_modepb.RegionReplicationStatus
QueryStats *pdpb.QueryStats
flowRoundDivisor uint64
<<<<<<< HEAD
=======
// buckets is not thread safe; it should only be accessed via the `report buckets` request with a greater version.
buckets unsafe.Pointer
fromHeartbeat bool
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
@@ -493,6 +499,11 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio
return r.replicationStatus
}

// IsFromHeartbeat returns whether the region info is from the region heartbeat.
func (r *RegionInfo) IsFromHeartbeat() bool {
return r.fromHeartbeat
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
@@ -577,6 +588,9 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
}
if !origin.IsFromHeartbeat() {
isNew = true
}
}
return
}
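The effect of the new isNew branch above: a cached region whose info did not come from a heartbeat (for example, a copy replicated by the region syncer before a leader transfer) is still reported as new on its first real heartbeat, so the new leader's prepare checker gets to count it. A small sketch, with hypothetical region metadata used purely for illustration:

package core_test

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/server/core"
)

// Sketch: origin was created with SetFromHeartbeat(false), as the region syncer
// does, so the guide function treats the next heartbeat of the same region as new.
func Example_fromHeartbeatMarksRegionNew() {
	meta := &metapb.Region{
		Id:          1,
		RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
		Peers:       []*metapb.Peer{{Id: 101, StoreId: 1}},
	}
	leader := meta.Peers[0]
	origin := core.NewRegionInfo(meta, leader, core.SetFromHeartbeat(false)) // synced copy
	region := core.NewRegionInfo(meta, leader, core.SetFromHeartbeat(true))  // real heartbeat

	guide := core.GenerateRegionGuideFunc(false)
	isNew, _, _, _ := guide(region, origin)
	fmt.Println(isNew)
	// Output: true
}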
7 changes: 7 additions & 0 deletions server/core/region_option.go
@@ -315,3 +315,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
region.interval = interval
}
}

// SetFromHeartbeat sets whether the region info comes from the region heartbeat.
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
return func(region *RegionInfo) {
region.fromHeartbeat = fromHeartbeat
}
}
2 changes: 1 addition & 1 deletion server/grpc_service.go
@@ -754,7 +754,7 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request, flowRoundOption)
region := core.RegionFromHeartbeat(request, flowRoundOption, core.SetFromHeartbeat(true))
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
3 changes: 2 additions & 1 deletion server/region_syncer/client.go
Expand Up @@ -202,9 +202,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
core.SetWrittenKeys(stats[i].KeysWritten),
core.SetReadBytes(stats[i].BytesRead),
core.SetReadKeys(stats[i].KeysRead),
core.SetFromHeartbeat(false),
)
} else {
region = core.NewRegionInfo(r, regionLeader)
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
}

origin, err := bc.PreCheckPutRegion(region)
45 changes: 45 additions & 0 deletions tests/server/region_syncer/region_syncer_test.go
@@ -196,6 +196,51 @@ func (s *regionSyncerTestSuite) TestFullSyncWithAddMember(c *C) {
c.Assert(loadRegions, HasLen, regionLen)
}

func (s *regionSyncerTestSuite) TestPrepareChecker(c *C) {
c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil)
defer failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker")
cluster, err := tests.NewTestCluster(s.ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
defer cluster.Destroy()
c.Assert(err, IsNil)

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer := cluster.GetServer(cluster.GetLeader())
c.Assert(leaderServer.BootstrapCluster(), IsNil)
rc := leaderServer.GetServer().GetRaftCluster()
c.Assert(rc, NotNil)
regionLen := 110
regions := initRegions(regionLen)
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}

// ensure flush to region storage
time.Sleep(3 * time.Second)
c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)

// join new PD
pd2, err := cluster.Join(s.ctx)
c.Assert(err, IsNil)
err = pd2.Run()
c.Assert(err, IsNil)
// wait for synchronization to complete
time.Sleep(3 * time.Second)
err = cluster.ResignLeader()
c.Assert(err, IsNil)
c.Assert(cluster.WaitLeader(), Equals, "pd2")
leaderServer = cluster.GetServer(cluster.GetLeader())
rc = leaderServer.GetServer().GetRaftCluster()
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}
time.Sleep(time.Second)
c.Assert(rc.IsPrepared(), IsTrue)
}

func initRegions(regionLen int) []*core.RegionInfo {
allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
regions := make([]*core.RegionInfo, 0, regionLen)
