
*: fix scheduling can not immediately start after transfer leader (#4875) (#4973)

close #4769, ref #4875

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ryan Leung <[email protected]>
ti-chi-bot and rleungx authored Sep 20, 2022
1 parent 55142dc commit 9f14d0a
Showing 8 changed files with 125 additions and 22 deletions.
42 changes: 27 additions & 15 deletions server/cluster/cluster.go
@@ -538,6 +538,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
return nil
}

// IsPrepared returns true if the prepare checker is ready.
func (c *RaftCluster) IsPrepared() bool {
return c.prepareChecker.isPrepared()
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
@@ -608,7 +613,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if isNew {
if !c.IsPrepared() && isNew {
c.prepareChecker.collect(region)
}

@@ -656,7 +661,6 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

//nolint:unused
func (c *RaftCluster) getClusterID() uint64 {
c.RLock()
defer c.RUnlock()
@@ -1381,13 +1385,6 @@ func (c *RaftCluster) GetComponentManager() *component.Manager {
return c.componentManager
}

// isPrepared if the cluster information is collected
func (c *RaftCluster) isPrepared() bool {
c.RLock()
defer c.RUnlock()
return c.prepareChecker.check(c)
}

// GetStoresLoads returns load stats of all stores.
func (c *RaftCluster) GetStoresLoads() map[uint64][]float64 {
c.RLock()
@@ -1447,10 +1444,11 @@ func (c *RaftCluster) FitRegion(region *core.RegionInfo) *placement.RegionFit {
}

type prepareChecker struct {
sync.RWMutex
reactiveRegions map[uint64]int
start time.Time
sum int
isPrepared bool
prepared bool
}

func newPrepareChecker() *prepareChecker {
@@ -1461,12 +1459,18 @@ func newPrepareChecker() *prepareChecker {
}

// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
func (checker *prepareChecker) check(c *RaftCluster) bool {
if checker.isPrepared || time.Since(checker.start) > collectTimeout {
func (checker *prepareChecker) check(c *core.BasicCluster) bool {
checker.Lock()
defer checker.Unlock()
if checker.prepared {
return true
}
if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
if float64(c.core.GetRegionCount())*collectFactor > float64(checker.sum) {
if float64(c.GetRegionCount())*collectFactor > float64(checker.sum) {
return false
}
for _, store := range c.GetStores() {
@@ -1475,21 +1479,29 @@ func (checker *prepareChecker) check(c *RaftCluster) bool {
}
storeID := store.GetID()
// For each store, the number of active regions should be more than total region of the store * collectFactor
if float64(c.core.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
return false
}
}
checker.isPrepared = true
checker.prepared = true
return true
}

func (checker *prepareChecker) collect(region *core.RegionInfo) {
checker.Lock()
defer checker.Unlock()
for _, p := range region.GetPeers() {
checker.reactiveRegions[p.GetStoreId()]++
}
checker.sum++
}

func (checker *prepareChecker) isPrepared() bool {
checker.RLock()
defer checker.RUnlock()
return checker.prepared
}

// GetHotWriteRegions gets hot write regions' info.
func (c *RaftCluster) GetHotWriteRegions() *statistics.StoreHotPeersInfos {
c.RLock()
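The heart of the cluster.go change is that prepareChecker now carries its own sync.RWMutex, the prepared flag replaces isPrepared, regions are only collected while the cluster is not yet prepared, and the checker also reports ready once collectTimeout has elapsed. Below is a minimal, self-contained sketch of that pattern; the names, thresholds, and the per-store bookkeeping are simplified stand-ins rather than the exact PD code:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// prepareChecker tracks how many regions have reported in since startup and
// flips to "prepared" once enough have been seen or a timeout has elapsed.
type prepareChecker struct {
	sync.RWMutex
	start    time.Time
	sum      int
	prepared bool
}

// collect is called for every region heartbeat received before the cluster is prepared.
func (c *prepareChecker) collect() {
	c.Lock()
	defer c.Unlock()
	c.sum++
}

// check reports whether scheduling may start, given the expected total region count.
func (c *prepareChecker) check(totalRegions int, collectFactor float64, timeout time.Duration) bool {
	c.Lock()
	defer c.Unlock()
	if c.prepared {
		return true
	}
	if time.Since(c.start) > timeout {
		c.prepared = true // give up waiting and let the scheduler run
		return true
	}
	if float64(totalRegions)*collectFactor > float64(c.sum) {
		return false // not enough regions have reported yet
	}
	c.prepared = true
	return true
}

// isPrepared exposes the flag without re-running the check, mirroring RaftCluster.IsPrepared.
func (c *prepareChecker) isPrepared() bool {
	c.RLock()
	defer c.RUnlock()
	return c.prepared
}

func main() {
	checker := &prepareChecker{start: time.Now()}
	for i := 0; i < 80; i++ {
		checker.collect()
	}
	// With 100 expected regions and a 0.8 collect factor, 80 collected regions are enough.
	fmt.Println(checker.check(100, 0.8, 5*time.Minute)) // true
	fmt.Println(checker.isPrepared())                   // true
}
```

Because the real checker is queried both from the coordinator goroutine and from the heartbeat path, giving it an internal mutex removes the need to hold the RaftCluster lock just to read the flag.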
5 changes: 4 additions & 1 deletion server/cluster/coordinator.go
@@ -252,6 +252,9 @@ func (c *coordinator) drivePushOperator() {

func (c *coordinator) run() {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker = time.NewTicker(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
for {
@@ -569,7 +572,7 @@ func (c *coordinator) resetHotSpotMetrics() {
}

func (c *coordinator) shouldRun() bool {
return c.cluster.isPrepared()
return c.cluster.prepareChecker.check(c.cluster.core)
}

func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string) error {
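coordinator.shouldRun now consults the shared prepareChecker directly against the core BasicCluster, and the new changeCoordinatorTicker failpoint lets tests shorten the scheduler-check interval from runSchedulerCheckInterval to 100 milliseconds. The failpoint only fires when the markers are rewritten and enabled, as the new test does; the waiting loop it speeds up looks roughly like this condensed sketch (not the actual coordinator code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntilPrepared mimics the coordinator's run loop: it polls shouldRun on a
// ticker and only proceeds to start the schedulers once the check succeeds.
// Shrinking the ticker interval is exactly what the changeCoordinatorTicker
// failpoint does in tests, so they do not wait for the full production interval.
func waitUntilPrepared(ctx context.Context, interval time.Duration, shouldRun func() bool) bool {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return false
		case <-ticker.C:
			if shouldRun() {
				return true
			}
		}
	}
}

func main() {
	start := time.Now()
	// Pretend the cluster becomes prepared after ~300ms of heartbeats.
	shouldRun := func() bool { return time.Since(start) > 300*time.Millisecond }

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if waitUntilPrepared(ctx, 100*time.Millisecond, shouldRun) {
		fmt.Println("coordinator can start schedulers after", time.Since(start))
	}
}
```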
3 changes: 1 addition & 2 deletions server/cluster/coordinator_test.go
@@ -189,9 +189,8 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
}

func (s *testCoordinatorSuite) TestDispatch(c *C) {
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.isPrepared = true }, nil, c)
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.prepared = true }, nil, c)
defer cleanup()

// Transfer peer from store 4 to store 1.
c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 30), IsNil)
15 changes: 14 additions & 1 deletion server/core/region.go
@@ -55,6 +55,7 @@ type RegionInfo struct {
approximateKeys int64
interval *pdpb.TimeInterval
replicationStatus *replication_modepb.RegionReplicationStatus
fromHeartbeat bool
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
@@ -98,7 +99,7 @@
)

// RegionFromHeartbeat constructs a Region from region heartbeat.
func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo {
func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
// Convert unit to MB.
// If region is empty or less than 1MB, use 1MB instead.
regionSize := heartbeat.GetApproximateSize() / (1 << 20)
@@ -122,6 +123,10 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo {
replicationStatus: heartbeat.GetReplicationStatus(),
}

for _, opt := range opts {
opt(region)
}

if region.writtenKeys >= ImpossibleFlowSize || region.writtenBytes >= ImpossibleFlowSize {
region.writtenKeys = 0
region.writtenBytes = 0
@@ -439,6 +444,11 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicationStatus {
return r.replicationStatus
}

// IsFromHeartbeat returns whether the region info is from the region heartbeat.
func (r *RegionInfo) IsFromHeartbeat() bool {
return r.fromHeartbeat
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo, traceRegionFlow bool) (isNew, saveKV, saveCache, needSync bool)
@@ -523,6 +533,9 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
}
if !origin.IsFromHeartbeat() {
isNew = true
}
}
return
}
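The region.go change is what actually fixes #4769: a cached region whose origin did not come from a heartbeat, for example one loaded by the region syncer on a newly elected leader, is treated as new again, so the first real heartbeat after a leader transfer still feeds the prepare checker. A condensed stand-in for that decision (not the actual core.GenerateRegionGuideFunc) is:

```go
package main

import "fmt"

// decideIsNew condenses the relevant part of the region guide: a heartbeat is
// treated as "new" either when there is no cached region at all, or when the
// cached copy was loaded by the region syncer rather than reported by a store.
// Only "new" regions feed the prepare checker.
func decideIsNew(hasOrigin, originFromHeartbeat bool) bool {
	if !hasOrigin {
		return true
	}
	return !originFromHeartbeat
}

func main() {
	// Fresh cluster: no cached region, so the heartbeat counts toward preparation.
	fmt.Println(decideIsNew(false, false)) // true
	// After a leader transfer: the new leader's cache was filled by the syncer,
	// so the first heartbeat for each region still counts toward preparation.
	fmt.Println(decideIsNew(true, false)) // true
	// Steady state: the cached copy already came from a heartbeat.
	fmt.Println(decideIsNew(true, true)) // false
}
```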
7 changes: 7 additions & 0 deletions server/core/region_option.go
@@ -276,3 +276,10 @@ func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
region.interval = interval
}
}

// SetFromHeartbeat sets if the region info comes from the region heartbeat.
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
return func(region *RegionInfo) {
region.fromHeartbeat = fromHeartbeat
}
}
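
SetFromHeartbeat follows the existing RegionCreateOption functional-options pattern: an option is just a closure that mutates the RegionInfo under construction, which is why RegionFromHeartbeat only needed a variadic opts parameter and a small loop. A toy version of the pattern, using simplified types rather than the real core package:

```go
package main

import "fmt"

// RegionInfo is a stripped-down stand-in for core.RegionInfo.
type RegionInfo struct {
	id            uint64
	fromHeartbeat bool
}

// RegionCreateOption mutates a RegionInfo during construction,
// mirroring core.RegionCreateOption.
type RegionCreateOption func(*RegionInfo)

// SetFromHeartbeat records whether the region info came from a region heartbeat.
func SetFromHeartbeat(fromHeartbeat bool) RegionCreateOption {
	return func(r *RegionInfo) { r.fromHeartbeat = fromHeartbeat }
}

// NewRegionInfo builds a RegionInfo and applies any options in order.
func NewRegionInfo(id uint64, opts ...RegionCreateOption) *RegionInfo {
	r := &RegionInfo{id: id}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

// IsFromHeartbeat reports whether the info originated from a heartbeat.
func (r *RegionInfo) IsFromHeartbeat() bool { return r.fromHeartbeat }

func main() {
	// A region loaded by the syncer is marked as not coming from a heartbeat...
	synced := NewRegionInfo(1, SetFromHeartbeat(false))
	// ...while the gRPC heartbeat path marks its regions as coming from one.
	heartbeat := NewRegionInfo(1, SetFromHeartbeat(true))
	fmt.Println(!synced.IsFromHeartbeat(), heartbeat.IsFromHeartbeat()) // true true
}
```

In the rest of this commit the two call sites differ only in the flag they pass: the gRPC heartbeat handler uses core.SetFromHeartbeat(true), while the region syncer client uses core.SetFromHeartbeat(false).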
2 changes: 1 addition & 1 deletion server/grpc_service.go
@@ -586,7 +586,7 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request)
region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true))
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
3 changes: 2 additions & 1 deletion server/region_syncer/client.go
@@ -201,9 +201,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
core.SetWrittenKeys(stats[i].KeysWritten),
core.SetReadBytes(stats[i].BytesRead),
core.SetReadKeys(stats[i].KeysRead),
core.SetFromHeartbeat(false),
)
} else {
region = core.NewRegionInfo(r, regionLeader)
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
}

origin, err := bc.PreCheckPutRegion(region)
70 changes: 69 additions & 1 deletion tests/server/region_syncer/region_syncer_test.go
@@ -19,6 +19,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/testutil"
@@ -223,5 +224,72 @@ func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) {
c.Assert(err, IsNil)
c.Assert(cluster.WaitLeader(), Equals, "pd2")
loadRegions := pd2.GetServer().GetRaftCluster().GetRegions()
c.Assert(len(loadRegions), Equals, regionLen)
c.Assert(loadRegions, HasLen, regionLen)
}

func (s *serverTestSuite) TestPrepareChecker(c *C) {
c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil)
defer failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker")
cluster, err := tests.NewTestCluster(s.ctx, 1, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true })
defer cluster.Destroy()
c.Assert(err, IsNil)

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer := cluster.GetServer(cluster.GetLeader())
c.Assert(leaderServer.BootstrapCluster(), IsNil)
rc := leaderServer.GetServer().GetRaftCluster()
c.Assert(rc, NotNil)
regionLen := 110
regions := initRegions(regionLen)
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}

// ensure flush to region storage
time.Sleep(3 * time.Second)
c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue)

// join new PD
pd2, err := cluster.Join(s.ctx)
c.Assert(err, IsNil)
err = pd2.Run()
c.Assert(err, IsNil)
// waiting for synchronization to complete
time.Sleep(3 * time.Second)
err = cluster.ResignLeader()
c.Assert(err, IsNil)
c.Assert(cluster.WaitLeader(), Equals, "pd2")
leaderServer = cluster.GetServer(cluster.GetLeader())
rc = leaderServer.GetServer().GetRaftCluster()
for _, region := range regions {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}
time.Sleep(time.Second)
c.Assert(rc.IsPrepared(), IsTrue)
}

func initRegions(regionLen int) []*core.RegionInfo {
allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
regions := make([]*core.RegionInfo, 0, regionLen)
for i := 0; i < regionLen; i++ {
r := &metapb.Region{
Id: allocator.alloc(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
Peers: []*metapb.Peer{
{Id: allocator.alloc(), StoreId: uint64(0)},
{Id: allocator.alloc(), StoreId: uint64(0)},
},
}
regions = append(regions, core.NewRegionInfo(r, r.Peers[0]))
}
return regions
}
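
initRegions above relies on an idAllocator helper that is defined elsewhere in this test file and therefore does not appear in the diff. A minimal wrapper compatible with how it is used here might look like the following; this is an assumption about the surrounding test code, not part of the commit:

```go
// An assumed sketch of the idAllocator helper used by initRegions; it wraps
// github.com/tikv/pd/pkg/mock/mockid, which is already imported in this test file.
type idAllocator struct {
	allocator *mockid.IDAllocator
}

// alloc returns the next mock ID; the error is ignored for test brevity.
func (i *idAllocator) alloc() uint64 {
	v, _ := i.allocator.Alloc()
	return v
}
```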

