Skip to content

Commit

Permalink
add uniform filter
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Aug 3, 2022
1 parent ed6e4b8 commit 8e652c5
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 12 deletions.
35 changes: 23 additions & 12 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,24 @@ func (bs *balanceSolver) isValid() bool {
return true
}

func (bs *balanceSolver) filterUniformStore() (string, bool) {
// Because region is available for src and dst, so stddev is the same for both, only need to calcurate one.
isUniformFirstPriority, isUniformSecondPriority := bs.isUniformFirstPriority(bs.cur.srcStore), bs.isUniformSecondPriority(bs.cur.srcStore)
if isUniformFirstPriority && isUniformSecondPriority {
// If both dims are enough uniform, any schedule is unnecessary.
return "all-dim", true
}
if isUniformFirstPriority && (bs.cur.progressiveRank == -1 || bs.cur.progressiveRank == -3) {
// If first priority dim is enough uniform, -1 is unnecessary and maybe lead to worse balance for second priority dim
return dimToString(bs.firstPriority), true
}
if isUniformSecondPriority && bs.cur.progressiveRank == -2 {
// If second priority dim is enough uniform, -2 is unnecessary and maybe lead to worse balance for first priority dim
return dimToString(bs.secondPriority), true
}
return "", false
}

// solve travels all the src stores, hot peers, dst stores and select each one of them to make a best scheduling solution.
// The comparing between solutions is based on calcProgressiveRank.
func (bs *balanceSolver) solve() []*operator.Operator {
Expand All @@ -515,11 +533,9 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}

bs.cur = &solution{}
tryUpdateBestSolution := func(isUniformFirstPriority bool) {
if bs.cur.progressiveRank == -1 && isUniformFirstPriority {
// Because region is available for src and dst, so stddev is the same for both, only need to calcurate one.
// If first priority dim is enough uniform, -1 is unnecessary and maybe lead to worse balance for second priority dim
hotSchedulerResultCounter.WithLabelValues("skip-uniform-store", strconv.FormatUint(bs.cur.dstStore.GetID(), 10)).Inc()
tryUpdateBestSolution := func() {
if label, ok := bs.filterUniformStore(); ok {
schedulerCounter.WithLabelValues(bs.sche.GetName(), fmt.Sprintf("%s-skip-%s-uniform-store", bs.rwTy.String(), label)).Inc()
return
}
if bs.cur.isAvailable() && bs.betterThan(bs.best) {
Expand Down Expand Up @@ -547,11 +563,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore
srcStoreID := srcStore.GetID()
isUniformFirstPriority, isUniformSecondPriority := bs.isUniformFirstPriority(srcStore), bs.isUniformSecondPriority(srcStore)
if isUniformFirstPriority && isUniformSecondPriority {
hotSchedulerResultCounter.WithLabelValues("skip-uniform-store", strconv.FormatUint(srcStore.GetID(), 10)).Inc()
continue
}
for _, mainPeerStat := range bs.filterHotPeers(srcStore) {
if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil {
continue
Expand All @@ -564,7 +575,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for _, dstStore := range bs.filterDstStores() {
bs.cur.dstStore = dstStore
bs.calcProgressiveRank()
tryUpdateBestSolution(isUniformFirstPriority)
tryUpdateBestSolution()

if searchRevertRegions && (bs.cur.progressiveRank >= -1 && bs.cur.progressiveRank <= 0) &&
(bs.best == nil || bs.best.progressiveRank >= -1 || bs.best.revertRegion != nil) {
Expand All @@ -586,7 +597,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
bs.cur.revertPeerStat = revertPeerStat
bs.cur.revertRegion = revertRegion
bs.calcProgressiveRank()
tryUpdateBestSolution(isUniformFirstPriority)
tryUpdateBestSolution()
}
bs.cur.revertPeerStat = nil
bs.cur.revertRegion = nil
Expand Down
12 changes: 12 additions & 0 deletions server/schedulers/hot_region_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,18 @@ func (conf *hotRegionSchedulerConfig) GetGreatDecRatio() float64 {
return conf.GreatDecRatio
}

func (conf *hotRegionSchedulerConfig) SetGreatDecRatio(r float64) {
conf.RLock()
defer conf.RUnlock()
conf.GreatDecRatio = r
}

func (conf *hotRegionSchedulerConfig) SetStrictPickingStore(v bool) {
conf.RLock()
defer conf.RUnlock()
conf.StrictPickingStore = v
}

func (conf *hotRegionSchedulerConfig) GetMinorDecRatio() float64 {
conf.RLock()
defer conf.RUnlock()
Expand Down
94 changes: 94 additions & 0 deletions server/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,100 @@ func TestHotReadRegionScheduleWithPendingInfluence(t *testing.T) {
}
}

func TestSkipUniformStore(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Read.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.SetStrictPickingStore(false)
// hb.(*hotScheduler).conf.SetGreatDecRatio(1.1) //avoid similar store but cannot schedule
hb.(*hotScheduler).conf.ReadPriorities = []string{BytePriority, KeyPriority}

tc := mockcluster.NewCluster(ctx, opt)
tc.SetHotRegionCacheHitsThreshold(0)
tc.AddRegionStore(1, 20)
tc.AddRegionStore(2, 20)
tc.AddRegionStore(3, 20)
tc.AddRegionStore(4, 20)
tc.AddRegionStore(5, 20)

// Case1: two dim are both enough uniform

tc.UpdateStorageReadStats(1, 10.05*units.MB*statistics.StoreHeartBeatReportInterval, 10.05*units.MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(2, 9.45*units.MB*statistics.StoreHeartBeatReportInterval, 9.45*units.MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(3, 10.0*units.MB*statistics.StoreHeartBeatReportInterval, 10.0*units.MB*statistics.StoreHeartBeatReportInterval)
addRegionInfo(tc, statistics.Read, []testRegionInfo{
{1, []uint64{1, 2, 3}, 0.05 * units.MB, 0.05 * units.MB, 0},
})

// when there is no uniform store filter, still schedule although the cluster is enough uniform
stddevThreshold = 0.0
ops, _ := hb.Schedule(tc, false)
re.Len(ops, 1)
testutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 1, 2)
clearPendingInfluence(hb.(*hotScheduler))

// when there is uniform store filter, not schedule
stddevThreshold = 0.1
ops, _ = hb.Schedule(tc, false)
re.Len(ops, 0)
clearPendingInfluence(hb.(*hotScheduler))

// Case2: the first dim is enough uniform, we should schedule the second dim

tc.UpdateStorageReadStats(1, 10.15*units.MB*statistics.StoreHeartBeatReportInterval, 10.05*units.MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(2, 9.45*units.MB*statistics.StoreHeartBeatReportInterval, 9.85*units.MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(3, 9.85*units.MB*statistics.StoreHeartBeatReportInterval, 16.0*units.MB*statistics.StoreHeartBeatReportInterval)
addRegionInfo(tc, statistics.Read, []testRegionInfo{
{1, []uint64{1, 2, 3}, 0.05 * units.MB, 0.05 * units.MB, 0},
{2, []uint64{3, 2, 1}, 0.05 * units.MB, 2 * units.MB, 0},
})

// when there is no uniform store filter, still schedule although the first dim is enough uniform
stddevThreshold = 0.0
ops, _ = hb.Schedule(tc, false)
re.Len(ops, 1)
testutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 1, 2)
clearPendingInfluence(hb.(*hotScheduler))

// when there is uniform store filter, schedule the second dim, which is no uniform
stddevThreshold = 0.1
ops, _ = hb.Schedule(tc, false)
re.Len(ops, 1)
testutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 3, 2)
clearPendingInfluence(hb.(*hotScheduler))

// Case3: the second dim is enough uniform, we should schedule the first dim, although its rank is higher than the second dim

tc.UpdateStorageReadStats(1, 10.05*units.MB*statistics.StoreHeartBeatReportInterval, 10.05*units.MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(2, 9.85*units.MB*statistics.StoreHeartBeatReportInterval, 9.45*units.MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(3, 16*units.MB*statistics.StoreHeartBeatReportInterval, 9.85*units.MB*statistics.StoreHeartBeatReportInterval)
addRegionInfo(tc, statistics.Read, []testRegionInfo{
{1, []uint64{1, 2, 3}, 0.05 * units.MB, 0.05 * units.MB, 0},
{2, []uint64{3, 2, 1}, 2 * units.MB, 0.05 * units.MB, 0},
})

// when there is no uniform store filter, still schedule although the second dim is enough uniform
stddevThreshold = 0.0
ops, _ = hb.Schedule(tc, false)
re.Len(ops, 1)
testutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 1, 2)
clearPendingInfluence(hb.(*hotScheduler))

// when there is uniform store filter, schedule the first dim, which is no uniform
stddevThreshold = 0.1
ops, _ = hb.Schedule(tc, false)
re.Len(ops, 1)
testutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 3, 2)
clearPendingInfluence(hb.(*hotScheduler))

}

func TestHotCacheUpdateCache(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 8e652c5

Please sign in to comment.