Skip to content

Commit

Permalink
scheduler: allow empty region to be scheduled and use a sperate toler…
Browse files Browse the repository at this point in the history
…ance config in scatter range scheduler (#4106) (#4117)

* schedulers: avoid recalculate tolerantRatio (#3745)

* schedulers: avoid recalculate tolerantRatio

Signed-off-by: disksing <[email protected]>

* fix test

Signed-off-by: disksing <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>

* scheduler: allow empty region to be scheduled and use a sperate tolerance config in scatter range scheduler (#4106)

* scheduler: allow empty region to be scheduled and use a sperate tolerance config in scatter range scheduler

Signed-off-by: lhy1024 <[email protected]>

* fix lint

Signed-off-by: lhy1024 <[email protected]>

* address comments

Signed-off-by: lhy1024 <[email protected]>

* fix lint

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>

Co-authored-by: disksing <[email protected]>
Co-authored-by: lhy1024 <[email protected]>
  • Loading branch information
3 people authored Sep 14, 2021
1 parent edfecb9 commit 2b8584e
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 45 deletions.
19 changes: 15 additions & 4 deletions server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,34 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
return stores[i].RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), iOp, -1) >
stores[j].RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), jOp, -1)
})

var allowBalanceEmptyRegion func(*core.RegionInfo) bool

switch cluster.(type) {
case *schedule.RangeCluster:
// allow empty region to be scheduled in range cluster
allowBalanceEmptyRegion = func(region *core.RegionInfo) bool { return true }
default:
allowBalanceEmptyRegion = opt.AllowBalanceEmptyRegion(cluster)
}

for _, plan.source = range stores {
retryLimit := s.retryQuota.GetLimit(plan.source)
for i := 0; i < retryLimit; i++ {
// Priority pick the region that has a pending peer.
// Pending region may means the disk is overload, remove the pending region firstly.
plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
if plan.region == nil {
// Then pick the region that has a follower in the source store.
plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
// Then pick the region has the leader in the source store.
plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
// Finally pick learner.
plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if plan.region == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc()
Expand Down
63 changes: 39 additions & 24 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,22 +171,38 @@ func (s *testBalanceSuite) TestTolerantRatio(c *C) {
tbl := []struct {
ratio float64
kind core.ScheduleKind
expectTolerantResource func() int64
expectTolerantResource func(core.ScheduleKind) int64
}{
{0, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.ByCount}, func() int64 { return int64(leaderTolerantSizeRatio) }},
{0, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.BySize}, func() int64 { return int64(adjustTolerantRatio(tc) * float64(regionSize)) }},
{0, core.ScheduleKind{Resource: core.RegionKind, Policy: core.ByCount}, func() int64 { return int64(adjustTolerantRatio(tc) * float64(regionSize)) }},
{0, core.ScheduleKind{Resource: core.RegionKind, Policy: core.BySize}, func() int64 { return int64(adjustTolerantRatio(tc) * float64(regionSize)) }},
{10, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.ByCount}, func() int64 { return int64(tc.GetScheduleConfig().TolerantSizeRatio) }},
{10, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.BySize}, func() int64 { return int64(adjustTolerantRatio(tc) * float64(regionSize)) }},
{10, core.ScheduleKind{Resource: core.RegionKind, Policy: core.ByCount}, func() int64 { return int64(adjustTolerantRatio(tc) * float64(regionSize)) }},
{10, core.ScheduleKind{Resource: core.RegionKind, Policy: core.BySize}, func() int64 { return int64(adjustTolerantRatio(tc) * float64(regionSize)) }},
{0, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.ByCount}, func(k core.ScheduleKind) int64 {
return int64(leaderTolerantSizeRatio)
}},
{0, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.BySize}, func(k core.ScheduleKind) int64 {
return int64(adjustTolerantRatio(tc, k) * float64(regionSize))
}},
{0, core.ScheduleKind{Resource: core.RegionKind, Policy: core.ByCount}, func(k core.ScheduleKind) int64 {
return int64(adjustTolerantRatio(tc, k) * float64(regionSize))
}},
{0, core.ScheduleKind{Resource: core.RegionKind, Policy: core.BySize}, func(k core.ScheduleKind) int64 {
return int64(adjustTolerantRatio(tc, k) * float64(regionSize))
}},
{10, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.ByCount}, func(k core.ScheduleKind) int64 {
return int64(tc.GetScheduleConfig().TolerantSizeRatio)
}},
{10, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.BySize}, func(k core.ScheduleKind) int64 {
return int64(adjustTolerantRatio(tc, k) * float64(regionSize))
}},
{10, core.ScheduleKind{Resource: core.RegionKind, Policy: core.ByCount}, func(k core.ScheduleKind) int64 {
return int64(adjustTolerantRatio(tc, k) * float64(regionSize))
}},
{10, core.ScheduleKind{Resource: core.RegionKind, Policy: core.BySize}, func(k core.ScheduleKind) int64 {
return int64(adjustTolerantRatio(tc, k) * float64(regionSize))
}},
}
for i, t := range tbl {
tc.SetTolerantSizeRatio(t.ratio)
plan := newBalancePlan(t.kind, tc, operator.OpInfluence{})
plan.region = region
c.Assert(plan.getTolerantResource(), Equals, t.expectTolerantResource(), Commentf("case #%d", i+1))
c.Assert(plan.getTolerantResource(), Equals, t.expectTolerantResource(t.kind), Commentf("case #%d", i+1))
}
}

Expand Down Expand Up @@ -1036,28 +1052,29 @@ func (s *testRandomMergeSchedulerSuite) TestMerge(c *C) {
c.Assert(mb.IsScheduleAllowed(tc), IsFalse)
}

var _ = Suite(&testScatterRangeLeaderSuite{})
var _ = Suite(&testScatterRangeSuite{})

type testScatterRangeLeaderSuite struct {
type testScatterRangeSuite struct {
ctx context.Context
cancel context.CancelFunc
}

func (s *testScatterRangeLeaderSuite) SetUpSuite(c *C) {
func (s *testScatterRangeSuite) SetUpSuite(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *testScatterRangeLeaderSuite) TearDownSuite(c *C) {
func (s *testScatterRangeSuite) TearDownSuite(c *C) {
s.cancel()
}

func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
func (s *testScatterRangeSuite) TestBalance(c *C) {
opt := config.NewTestOptions()
// TODO: enable palcementrules
opt.SetPlacementRuleEnabled(false)
tc := mockcluster.NewCluster(s.ctx, opt)
tc.DisableFeature(versioninfo.JointConsensus)
tc.SetTolerantSizeRatio(2.5)
// range cluster use a special tolerant ratio, cluster opt take no impact
tc.SetTolerantSizeRatio(10000)
// Add stores 1,2,3,4,5.
tc.AddRegionStore(1, 0)
tc.AddRegionStore(2, 0)
Expand All @@ -1082,17 +1099,16 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
})
id += 4
}
// empty case
// empty region case
regions[49].EndKey = []byte("")
for _, meta := range regions {
leader := rand.Intn(4) % 3
regionInfo := core.NewRegionInfo(
meta,
meta.Peers[leader],
core.SetApproximateKeys(96),
core.SetApproximateSize(96),
core.SetApproximateKeys(1),
core.SetApproximateSize(1),
)

tc.Regions.SetRegion(regionInfo)
}
for i := 0; i < 100; i++ {
Expand All @@ -1116,7 +1132,7 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
}
}

func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
func (s *testScatterRangeSuite) TestBalanceLeaderLimit(c *C) {
opt := config.NewTestOptions()
opt.SetPlacementRuleEnabled(false)
tc := mockcluster.NewCluster(s.ctx, opt)
Expand Down Expand Up @@ -1147,7 +1163,6 @@ func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
id += 4
}

// empty case
regions[49].EndKey = []byte("")
for _, meta := range regions {
leader := rand.Intn(4) % 3
Expand Down Expand Up @@ -1192,7 +1207,7 @@ func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
c.Check(maxLeaderCount-minLeaderCount, Greater, 10)
}

func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) {
func (s *testScatterRangeSuite) TestConcurrencyUpdateConfig(c *C) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(s.ctx, opt)
oc := schedule.NewOperatorController(s.ctx, nil, nil)
Expand All @@ -1218,7 +1233,7 @@ func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) {
ch <- struct{}{}
}

func (s *testScatterRangeLeaderSuite) TestBalanceWhenRegionNotHeartbeat(c *C) {
func (s *testScatterRangeSuite) TestBalanceWhenRegionNotHeartbeat(c *C) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(s.ctx, opt)
// Add stores 1,2,3.
Expand Down
44 changes: 27 additions & 17 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/statistics"
Expand All @@ -39,9 +40,10 @@ const (
)

type balancePlan struct {
kind core.ScheduleKind
cluster opt.Cluster
opInfluence operator.OpInfluence
kind core.ScheduleKind
cluster opt.Cluster
opInfluence operator.OpInfluence
tolerantSizeRatio float64

source *core.StoreInfo
target *core.StoreInfo
Expand All @@ -53,9 +55,10 @@ type balancePlan struct {

func newBalancePlan(kind core.ScheduleKind, cluster opt.Cluster, opInfluence operator.OpInfluence) *balancePlan {
return &balancePlan{
kind: kind,
cluster: cluster,
opInfluence: opInfluence,
kind: kind,
cluster: cluster,
opInfluence: opInfluence,
tolerantSizeRatio: adjustTolerantRatio(cluster, kind),
}
}

Expand Down Expand Up @@ -121,24 +124,31 @@ func (p *balancePlan) shouldBalance(scheduleName string) bool {

func (p *balancePlan) getTolerantResource() int64 {
if p.kind.Resource == core.LeaderKind && p.kind.Policy == core.ByCount {
tolerantSizeRatio := p.cluster.GetOpts().GetTolerantSizeRatio()
if tolerantSizeRatio == 0 {
tolerantSizeRatio = leaderTolerantSizeRatio
}
leaderCount := int64(1.0 * tolerantSizeRatio)
return leaderCount
return int64(p.tolerantSizeRatio)
}

regionSize := p.region.GetApproximateSize()
if regionSize < p.cluster.GetAverageRegionSize() {
regionSize = p.cluster.GetAverageRegionSize()
}
regionSize = int64(float64(regionSize) * adjustTolerantRatio(p.cluster))
return regionSize
return int64(float64(regionSize) * p.tolerantSizeRatio)
}

func adjustTolerantRatio(cluster opt.Cluster) float64 {
tolerantSizeRatio := cluster.GetOpts().GetTolerantSizeRatio()
func adjustTolerantRatio(cluster opt.Cluster, kind core.ScheduleKind) float64 {
var tolerantSizeRatio float64
switch c := cluster.(type) {
case *schedule.RangeCluster:
// range cluster use a separate configuration
tolerantSizeRatio = c.GetTolerantSizeRatio()
default:
tolerantSizeRatio = cluster.GetOpts().GetTolerantSizeRatio()
}
if kind.Resource == core.LeaderKind && kind.Policy == core.ByCount {
if tolerantSizeRatio == 0 {
return leaderTolerantSizeRatio
}
return tolerantSizeRatio
}

if tolerantSizeRatio == 0 {
var maxRegionCount float64
stores := cluster.GetStores()
Expand Down

0 comments on commit 2b8584e

Please sign in to comment.