This is an automated cherry-pick of tikv#4875
close tikv#4769

Signed-off-by: ti-chi-bot <[email protected]>
rleungx authored and ti-chi-bot committed May 17, 2022
1 parent cbd89cf commit 82ecde4
Showing 10 changed files with 900 additions and 1 deletion.
13 changes: 13 additions & 0 deletions server/cluster/cluster.go
@@ -517,6 +517,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
return nil
}

// IsPrepared returns true if the prepare checker is ready.
func (c *RaftCluster) IsPrepared() bool {
return c.coordinator.prepareChecker.isPrepared()
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
@@ -587,8 +592,13 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

<<<<<<< HEAD
if isNew {
c.prepareChecker.collect(region)
=======
if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
}

if c.regionStats != nil {
@@ -635,13 +645,16 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

<<<<<<< HEAD
//nolint:unused
func (c *RaftCluster) getClusterID() uint64 {
c.RLock()
defer c.RUnlock()
return c.meta.GetId()
}

=======
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
if c.storage != nil {
if err := c.storage.SaveMeta(meta); err != nil {
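The bot commits these conflict markers as-is; they have to be resolved by hand before the branch builds. A plausible resolution, sketched under the assumption that this branch adopts the coordinator-owned prepareChecker from #4875 while keeping its own older test helpers, takes the incoming side of the first hunk:

    // Collect region heartbeats only while the prepare checker is still warming up.
    if !c.IsPrepared() && isNew {
        c.coordinator.prepareChecker.collect(region)
    }

and keeps the HEAD side of the second hunk, retaining getClusterID, which the test helper in coordinator_test.go below still calls.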
3 changes: 3 additions & 0 deletions server/cluster/coordinator.go
@@ -212,6 +212,9 @@ func (c *coordinator) drivePushOperator() {

func (c *coordinator) run() {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker = time.NewTicker(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
for {
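The changeCoordinatorTicker failpoint shrinks the scheduler-check interval from runSchedulerCheckInterval to 100ms so tests do not have to wait on the default ticker. A minimal sketch of driving it with the pingcap/failpoint API; the failpoint path assumes the package's import path is github.com/tikv/pd/server/cluster:

    import "github.com/pingcap/failpoint"

    fp := "github.com/tikv/pd/server/cluster/changeCoordinatorTicker"
    c.Assert(failpoint.Enable(fp, `return(true)`), IsNil)
    defer failpoint.Disable(fp) //nolint:errcheck
    // co.run() now re-checks cluster readiness every 100 milliseconds.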
9 changes: 9 additions & 0 deletions server/cluster/coordinator_test.go
@@ -188,7 +188,11 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
func (s *testCoordinatorSuite) TestDispatch(c *C) {
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.isPrepared = true }, nil, c)
defer cleanup()
<<<<<<< HEAD

=======
co.prepareChecker.prepared = true
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
// Transfer peer from store 4 to store 1.
c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 30), IsNil)
@@ -290,8 +294,13 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
if setCfg != nil {
setCfg(cfg)
}
<<<<<<< HEAD
tc := newTestCluster(opt)
hbStreams := mockhbstream.NewHeartbeatStreams(tc.getClusterID(), false /* need to run */)
=======
tc := newTestCluster(ctx, opt)
hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */)
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
if setTc != nil {
setTc(tc)
}
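Resolving the prepare() conflict hinges on which helper signatures this branch has. If newTestCluster and the mock heartbeat streams still match HEAD (no context parameter), one plausible resolution keeps the HEAD construction:

    tc := newTestCluster(opt)
    hbStreams := mockhbstream.NewHeartbeatStreams(tc.getClusterID(), false /* need to run */)

and, in TestDispatch above, takes the incoming line (co.prepareChecker.prepared = true), since the old isPrepared field disappears once prepare_checker.go lands.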
81 changes: 81 additions & 0 deletions server/cluster/prepare_checker.go
@@ -0,0 +1,81 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
"time"

"github.com/tikv/pd/pkg/syncutil"
"github.com/tikv/pd/server/core"
)

type prepareChecker struct {
syncutil.RWMutex
reactiveRegions map[uint64]int
start time.Time
sum int
prepared bool
}

func newPrepareChecker() *prepareChecker {
return &prepareChecker{
start: time.Now(),
reactiveRegions: make(map[uint64]int),
}
}

// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
func (checker *prepareChecker) check(c *core.BasicCluster) bool {
checker.Lock()
defer checker.Unlock()
if checker.prepared {
return true
}
if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
}
// The number of collected regions should be no less than the total region count of all stores * collectFactor
if float64(c.GetRegionCount())*collectFactor > float64(checker.sum) {
return false
}
for _, store := range c.GetStores() {
if !store.IsPreparing() && !store.IsServing() {
continue
}
storeID := store.GetID()
// For each store, the number of collected regions should be no less than the total region count of that store * collectFactor
if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
return false
}
}
checker.prepared = true
return true
}

func (checker *prepareChecker) collect(region *core.RegionInfo) {
checker.Lock()
defer checker.Unlock()
for _, p := range region.GetPeers() {
checker.reactiveRegions[p.GetStoreId()]++
}
checker.sum++
}

func (checker *prepareChecker) isPrepared() bool {
checker.RLock()
defer checker.RUnlock()
return checker.prepared
}
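The prepared condition is plain threshold arithmetic. As a worked example — assuming collectFactor is 0.8, since the constant is defined elsewhere in the package — a cluster whose stores report 100 regions in total becomes prepared once heartbeats for at least 80 of them have been collected, with the same proportion enforced per store:

    package main

    import "fmt"

    func main() {
        const collectFactor = 0.8 // assumed value; defined elsewhere in package cluster
        totalRegions, collected := 100, 80
        // Mirrors prepareChecker.check: not prepared while
        // totalRegions*collectFactor still exceeds the collected count.
        prepared := !(float64(totalRegions)*collectFactor > float64(collected))
        fmt.Println(prepared) // true: 100*0.8 = 80 is not greater than 80
    }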