scheduler: fix policy with write-leader in hot scheduler #5526

Merged (2 commits) on Sep 17, 2022

51 changes: 35 additions & 16 deletions server/schedulers/hot_region.go
@@ -420,6 +420,8 @@ type balanceSolver struct {
minorDecRatio float64
maxPeerNum int
minHotDegree int

checkByPriorityAndTolerance func(loads []float64, f func(int) bool) bool
}

func (bs *balanceSolver) init() {
@@ -460,6 +462,18 @@ func (bs *balanceSolver) init() {
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetOpts().GetHotRegionCacheHitsThreshold()
bs.pickCheckPolicy()
}

func (bs *balanceSolver) pickCheckPolicy() {
switch {
case bs.resourceTy == writeLeader:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly
case bs.sche.conf.IsStrictPickingStoreEnabled():
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAllOf
default:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAnyOf
}
}

func (bs *balanceSolver) isSelectedDim(dim int) bool {
@@ -483,14 +497,14 @@ func (bs *balanceSolver) getPriorities() []string {
}

func newBalanceSolver(sche *hotScheduler, cluster schedule.Cluster, rwTy statistics.RWType, opTy opType) *balanceSolver {
solver := &balanceSolver{
bs := &balanceSolver{
Cluster: cluster,
sche: sche,
rwTy: rwTy,
opTy: opTy,
}
solver.init()
return solver
bs.init()
return bs
}

func (bs *balanceSolver) isValid() bool {
@@ -688,7 +702,7 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai
continue
}

if bs.checkSrcByDimPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
if bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
@@ -698,7 +712,7 @@
return ret
}

func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
return bs.checkByPriorityAndTolerance(minLoad.Loads, func(i int) bool {
return minLoad.Loads[i] > toleranceRatio*expectLoad.Loads[i]
})
@@ -919,23 +933,28 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist
})
}

func (bs *balanceSolver) checkByPriorityAndTolerance(s interface{}, p func(int) bool) bool {
if bs.sche.conf.IsStrictPickingStoreEnabled() {
return slice.AllOf(s, func(i int) bool {
if bs.isSelectedDim(i) {
return p(i)
}
return true
})
}
return slice.AnyOf(s, func(i int) bool {
func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f func(int) bool) bool {
return slice.AllOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return p(i)
return f(i)
}
return true
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f func(int) bool) bool {
return slice.AnyOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return f(i)
}
return false
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceFirstOnly(loads []float64, f func(int) bool) bool {
return f(bs.firstPriority)
}

func (bs *balanceSolver) isUniformFirstPriority(store *statistics.StoreLoadDetail) bool {
// first priority should be more uniform than second priority
return store.IsUniform(bs.firstPriority, stddevThreshold*0.5)
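
For illustration only (not part of the diff): a minimal, self-contained Go sketch of how the three check policies picked by pickCheckPolicy behave. The dimension order (byte, key, query), the selected-dimension set, and the predicate are assumptions modeled on the code above, not the scheduler's real wiring.

package main

import "fmt"

// allOf: every selected dimension must pass f (strict-picking-store enabled).
func allOf(loads []float64, selected map[int]bool, f func(int) bool) bool {
    for i := range loads {
        if selected[i] && !f(i) {
            return false
        }
    }
    return true
}

// anyOf: at least one selected dimension must pass f (strict picking disabled).
func anyOf(loads []float64, selected map[int]bool, f func(int) bool) bool {
    for i := range loads {
        if selected[i] && f(i) {
            return true
        }
    }
    return false
}

// firstOnly: only the first-priority dimension is checked (the write-leader case).
func firstOnly(firstPriority int, f func(int) bool) bool {
    return f(firstPriority)
}

func main() {
    // Hypothetical dims: 0 = byte, 1 = key, 2 = query; key is the first priority.
    loads := []float64{1.0, 2.0, 1.0}
    expect := []float64{1.0, 1.0, 1.0}
    selected := map[int]bool{0: true, 1: true} // byte and key are the configured priorities
    toleranceRatio := 1.0
    // Source-side predicate, mirroring checkSrcByPriorityAndTolerance above.
    higher := func(i int) bool { return loads[i] > toleranceRatio*expect[i] }

    fmt.Println(allOf(loads, selected, higher)) // false: the byte dim is not above expect
    fmt.Println(anyOf(loads, selected, higher)) // true: the key dim is above expect
    fmt.Println(firstOnly(1, higher))           // true: write-leader checks only the key dim
}

In the scheduler itself these roles are played by checkByPriorityAndToleranceAllOf, checkByPriorityAndToleranceAnyOf, and checkByPriorityAndToleranceFirstOnly, bound once per solver in pickCheckPolicy.
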
223 changes: 223 additions & 0 deletions server/schedulers/hot_region_test.go
@@ -2598,3 +2598,226 @@ func TestMaxZombieDuration(t *testing.T) {
re.Equal(time.Duration(testCase.maxZombieDur)*time.Second, bs.calcMaxZombieDur())
}
}

type expectTestCase struct {
strict bool
isSrc bool
allow bool
toleranceRatio float64
rs resourceType
load *statistics.StoreLoad
expect *statistics.StoreLoad
}

func TestExpect(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
hb, err := schedule.CreateScheduler(HotRegionType, schedule.NewOperatorController(ctx, tc, nil), storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder("hot-region", nil))
re.NoError(err)
testCases := []expectTestCase{
// test src: scheduling is allowed when loads are higher than expect
{
strict: true, // all of
load: &statistics.StoreLoad{ // all dims are higher than expect, allow schedule
Loads: []float64{2.0, 2.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: true,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // all dims are higher than expect, but lower than expect*toleranceRatio, not allow schedule
Loads: []float64{2.0, 2.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
toleranceRatio: 2.2,
allow: false,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // only queryDim is lower, but the dim is not selected, allow schedule
Loads: []float64{2.0, 2.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: true,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // only keyDim is lower, and the dim is selected, not allow schedule
Loads: []float64{2.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: false,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // keyDim is higher, and the dim is selected, allow schedule
Loads: []float64{1.0, 2.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: true,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // although queryDim is higher, the dim is not selected, not allow schedule
Loads: []float64{1.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: false,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // all dims are lower than expect, not allow schedule
Loads: []float64{1.0, 1.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: true,
allow: false,
},
{
strict: true,
rs: writeLeader,
load: &statistics.StoreLoad{ // only keyDim is higher, but write leader only considers the first priority
Loads: []float64{1.0, 2.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: true,
allow: true,
},
// test dst: scheduling is allowed when loads are lower than expect
{
strict: true, // all of
load: &statistics.StoreLoad{ // all dims are lower than expect, allow schedule
Loads: []float64{1.0, 1.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: true,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // all dims are lower than expect, but higher than expect*toleranceRatio, not allow schedule
Loads: []float64{1.0, 1.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
toleranceRatio: 2.0,
allow: false,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // although queryDim is higher, the dim is not selected, allow schedule
Loads: []float64{1.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: true,
},
{
strict: true, // all of
load: &statistics.StoreLoad{ // byteDim is higher, and the dim is selected, not allow schedule
Loads: []float64{2.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: false,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // keyDim is lower, the dim is selected, allow schedule
Loads: []float64{2.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: true,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // although queryDim is lower, the dim is not selected, not allow schedule
Loads: []float64{2.0, 2.0, 1.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: false,
},
{
strict: false, // any of
load: &statistics.StoreLoad{ // all dims are higher than expect, not allow schedule
Loads: []float64{2.0, 2.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{1.0, 1.0, 1.0},
},
isSrc: false,
allow: false,
},
{
strict: true,
rs: writeLeader,
load: &statistics.StoreLoad{ // only keyDim is lower, but write leader only considers the first priority
Loads: []float64{2.0, 1.0, 2.0},
},
expect: &statistics.StoreLoad{
Loads: []float64{2.0, 2.0, 2.0},
},
isSrc: false,
allow: true,
},
}
for _, testCase := range testCases {
toleranceRatio := testCase.toleranceRatio
if toleranceRatio == 0.0 {
toleranceRatio = 1.0 // default for test case
}
bs := &balanceSolver{
sche: hb.(*hotScheduler),
firstPriority: statistics.KeyDim,
secondPriority: statistics.ByteDim,
resourceTy: testCase.rs,
}
bs.sche.conf.StrictPickingStore = testCase.strict
bs.pickCheckPolicy()
if testCase.isSrc {
re.Equal(testCase.allow, bs.checkSrcByPriorityAndTolerance(testCase.load, testCase.expect, toleranceRatio))
} else {
re.Equal(testCase.allow, bs.checkDstByPriorityAndTolerance(testCase.load, testCase.expect, toleranceRatio))
}
}
}
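
Assuming the upstream PD repository layout shown in the file paths above and a working Go toolchain, these cases can be exercised locally with: go test ./server/schedulers -run TestExpect -v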