Skip to content

Commit

Permalink
scheduler: allow empty region to be scheduled and use a separate toler…
Browse files Browse the repository at this point in the history
…ance config in scatter range scheduler (#4106)

* scheduler: allow empty region to be scheduled and use a separate tolerance config in scatter range scheduler

Signed-off-by: lhy1024 <[email protected]>

* fix lint

Signed-off-by: lhy1024 <[email protected]>

* address comments

Signed-off-by: lhy1024 <[email protected]>

* fix lint

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
lhy1024 and ti-chi-bot committed Sep 14, 2021
1 parent 9420551 commit ff080cc
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 19 deletions.
19 changes: 15 additions & 4 deletions server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,34 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
return stores[i].RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), iOp, -1) >
stores[j].RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), jOp, -1)
})

var allowBalanceEmptyRegion func(*core.RegionInfo) bool

switch cluster.(type) {
case *schedule.RangeCluster:
// allow empty region to be scheduled in range cluster
allowBalanceEmptyRegion = func(region *core.RegionInfo) bool { return true }
default:
allowBalanceEmptyRegion = opt.AllowBalanceEmptyRegion(cluster)
}

for _, plan.source = range stores {
retryLimit := s.retryQuota.GetLimit(plan.source)
for i := 0; i < retryLimit; i++ {
// Prioritize picking a region that has a pending peer.
// A pending region may mean the disk is overloaded, so remove the pending region first.
plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
if plan.region == nil {
// Then pick the region that has a follower in the source store.
plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
// Then pick the region that has the leader in the source store.
plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
// Finally pick learner.
plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc()
Expand Down
27 changes: 13 additions & 14 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,28 +1052,29 @@ func (s *testRandomMergeSchedulerSuite) TestMerge(c *C) {
c.Assert(mb.IsScheduleAllowed(tc), IsFalse)
}

var _ = Suite(&testScatterRangeLeaderSuite{})
var _ = Suite(&testScatterRangeSuite{})

type testScatterRangeLeaderSuite struct {
type testScatterRangeSuite struct {
ctx context.Context
cancel context.CancelFunc
}

func (s *testScatterRangeLeaderSuite) SetUpSuite(c *C) {
func (s *testScatterRangeSuite) SetUpSuite(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *testScatterRangeLeaderSuite) TearDownSuite(c *C) {
func (s *testScatterRangeSuite) TearDownSuite(c *C) {
s.cancel()
}

func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
func (s *testScatterRangeSuite) TestBalance(c *C) {
opt := config.NewTestOptions()
// TODO: enable placement rules
opt.SetPlacementRuleEnabled(false)
tc := mockcluster.NewCluster(s.ctx, opt)
tc.DisableFeature(versioninfo.JointConsensus)
tc.SetTolerantSizeRatio(2.5)
// the range cluster uses its own tolerant ratio, so this cluster option has no effect
tc.SetTolerantSizeRatio(10000)
// Add stores 1,2,3,4,5.
tc.AddRegionStore(1, 0)
tc.AddRegionStore(2, 0)
Expand All @@ -1098,17 +1099,16 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
})
id += 4
}
// empty case
// empty region case
regions[49].EndKey = []byte("")
for _, meta := range regions {
leader := rand.Intn(4) % 3
regionInfo := core.NewRegionInfo(
meta,
meta.Peers[leader],
core.SetApproximateKeys(96),
core.SetApproximateSize(96),
core.SetApproximateKeys(1),
core.SetApproximateSize(1),
)

tc.Regions.SetRegion(regionInfo)
}
for i := 0; i < 100; i++ {
Expand All @@ -1132,7 +1132,7 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
}
}

func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
func (s *testScatterRangeSuite) TestBalanceLeaderLimit(c *C) {
opt := config.NewTestOptions()
opt.SetPlacementRuleEnabled(false)
tc := mockcluster.NewCluster(s.ctx, opt)
Expand Down Expand Up @@ -1163,7 +1163,6 @@ func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
id += 4
}

// empty case
regions[49].EndKey = []byte("")
for _, meta := range regions {
leader := rand.Intn(4) % 3
Expand Down Expand Up @@ -1208,7 +1207,7 @@ func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
c.Check(maxLeaderCount-minLeaderCount, Greater, 10)
}

func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) {
func (s *testScatterRangeSuite) TestConcurrencyUpdateConfig(c *C) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(s.ctx, opt)
oc := schedule.NewOperatorController(s.ctx, nil, nil)
Expand All @@ -1234,7 +1233,7 @@ func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) {
ch <- struct{}{}
}

func (s *testScatterRangeLeaderSuite) TestBalanceWhenRegionNotHeartbeat(c *C) {
func (s *testScatterRangeSuite) TestBalanceWhenRegionNotHeartbeat(c *C) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(s.ctx, opt)
// Add stores 1,2,3.
Expand Down
10 changes: 9 additions & 1 deletion server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/statistics"
Expand Down Expand Up @@ -133,7 +134,14 @@ func (p *balancePlan) getTolerantResource() int64 {
}

func adjustTolerantRatio(cluster opt.Cluster, kind core.ScheduleKind) float64 {
tolerantSizeRatio := cluster.GetOpts().GetTolerantSizeRatio()
var tolerantSizeRatio float64
switch c := cluster.(type) {
case *schedule.RangeCluster:
// the range cluster uses a separate configuration
tolerantSizeRatio = c.GetTolerantSizeRatio()
default:
tolerantSizeRatio = cluster.GetOpts().GetTolerantSizeRatio()
}
if kind.Resource == core.LeaderKind && kind.Policy == core.ByCount {
if tolerantSizeRatio == 0 {
return leaderTolerantSizeRatio
Expand Down

0 comments on commit ff080cc

Please sign in to comment.