scheduler: enlarge the search space so that the hot-scheduler can still schedule under dimensional conflicts #4912

Merged · 11 commits · Jun 7, 2022
66 changes: 45 additions & 21 deletions server/schedulers/hot_region.go
@@ -79,6 +79,9 @@ var (
schedulePeerPr = 0.66
// pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when the loads of two stores are close
pendingAmpFactor = 2.0
// If the relative standard deviation of a dimension across stores is below stddevThreshold,
// the dimension is considered sufficiently uniform and scheduling will no longer be based on it.
stddevThreshold = 0.1
)
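For context, a minimal sketch (not part of the diff) of how a relative-stddev gate like `stddevThreshold` behaves; `relativeStddev` is a hypothetical helper here, while the real computation lives in `summaryStoresLoadByEngine` in server/statistics/store_hot_peers_infos.go below.

```go
// Illustration only: a dimension whose relative stddev (stddev / mean) across
// stores falls below stddevThreshold is treated as already uniform.
package main

import (
	"fmt"
	"math"
)

// relativeStddev returns the population stddev of loads divided by their mean,
// i.e. the coefficient of variation used for the uniformity check.
func relativeStddev(loads []float64) float64 {
	mean := 0.0
	for _, l := range loads {
		mean += l
	}
	mean /= float64(len(loads))
	variance := 0.0
	for _, l := range loads {
		variance += (l - mean) * (l - mean)
	}
	variance /= float64(len(loads))
	return math.Sqrt(variance) / mean
}

func main() {
	byteRates := []float64{5.0, 5.3, 5.0, 5.0, 4.8} // MB/s written per store
	fmt.Println(relativeStddev(byteRates) < 0.1)    // true: the byte dim is uniform enough to skip
}
```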

type hotScheduler struct {
@@ -384,6 +387,8 @@ type balanceSolver struct
minorDecRatio float64
maxPeerNum int
minHotDegree int

pick func(s interface{}, p func(int) bool) bool
}

func (bs *balanceSolver) init() {
@@ -423,6 +428,11 @@ func (bs *balanceSolver) init() {
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetOpts().GetHotRegionCacheHitsThreshold()

bs.pick = slice.AnyOf
if bs.sche.conf.IsStrictPickingStoreEnabled() {
bs.pick = slice.AllOf
}
}
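The `pick` field switches between `slice.AllOf` (strict picking: every selected dim must pass) and `slice.AnyOf` (loose picking: one passing dim is enough). A simplified sketch of the semantics implied by the `func(s interface{}, p func(int) bool) bool` signature — an illustration, not the actual `pkg/slice` source:

```go
package main

import (
	"fmt"
	"reflect"
)

// anyOf reports whether the predicate holds for at least one index of the slice s.
func anyOf(s interface{}, p func(int) bool) bool {
	l := reflect.ValueOf(s).Len()
	for i := 0; i < l; i++ {
		if p(i) {
			return true
		}
	}
	return false
}

// allOf reports whether the predicate holds for every index of the slice s.
func allOf(s interface{}, p func(int) bool) bool {
	return !anyOf(s, func(i int) bool { return !p(i) })
}

func main() {
	// Load of each dim relative to the expectation: the first dim passes, the second does not.
	loads := []float64{1.2, 0.4}
	pass := func(i int) bool { return loads[i] > 1.0 }
	fmt.Println(allOf(loads, pass)) // false: strict picking rejects the store
	fmt.Println(anyOf(loads, pass)) // true: loose picking accepts it
}
```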

func (bs *balanceSolver) isSelectedDim(dim int) bool {
@@ -472,7 +482,14 @@ func (bs *balanceSolver) solve() []*operator.Operator {
return nil
}
bs.cur = &solution{}
tryUpdateBestSolution := func() {

tryUpdateBestSolution := func(isUniformFirstPriority bool) {
if bs.cur.progressiveRank == -1 && isUniformFirstPriority {
// The stddev is cluster-wide, so src and dst share the same value and it only needs to be calculated once.
// A rank of -1 means the operator only balances the first priority dim; if that dim is already uniform enough,
// it is unnecessary and may even worsen the balance of the second priority dim.
hotSchedulerResultCounter.WithLabelValues("skip-uniform-store", strconv.FormatUint(bs.cur.dstStore.GetID(), 10)).Inc()
return
}
if bs.cur.progressiveRank < 0 && bs.betterThan(bs.best) {
if newOps, newInfl := bs.buildOperators(); len(newOps) > 0 {
bs.ops = newOps
@@ -486,7 +503,11 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore
srcStoreID := srcStore.GetID()

isUniformFirstPriority, isUniformSecondPriority := bs.isUniformFirstPriority(bs.cur.srcStore), bs.isUniformSecondPriority(bs.cur.srcStore)
if isUniformFirstPriority && isUniformSecondPriority {
hotSchedulerResultCounter.WithLabelValues("skip-uniform-store", strconv.FormatUint(bs.cur.srcStore.GetID(), 10)).Inc()
continue
}
for _, srcPeerStat := range bs.filterHotPeers(srcStore) {
if bs.cur.region = bs.getRegion(srcPeerStat, srcStoreID); bs.cur.region == nil {
continue
@@ -499,7 +520,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for _, dstStore := range bs.filterDstStores() {
bs.cur.dstStore = dstStore
bs.calcProgressiveRank()
tryUpdateBestSolution()
tryUpdateBestSolution(isUniformFirstPriority)
}
}
}
@@ -564,15 +585,12 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai
}

func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
if bs.sche.conf.IsStrictPickingStoreEnabled() {
return slice.AllOf(minLoad.Loads, func(i int) bool {
if bs.isSelectedDim(i) {
return minLoad.Loads[i] > toleranceRatio*expectLoad.Loads[i]
}
return true
})
}
return minLoad.Loads[bs.firstPriority] > toleranceRatio*expectLoad.Loads[bs.firstPriority]
return bs.pick(minLoad.Loads, func(i int) bool {
if bs.isSelectedDim(i) {
return minLoad.Loads[i] > toleranceRatio*expectLoad.Loads[i]
}
return true
})
}

// filterHotPeers filters hot peers from statistics.HotPeerStat and drops a peer if its region is in pending status.
@@ -770,15 +788,21 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st
}

func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statistics.StoreLoad, toleranceRatio float64) bool {
if bs.sche.conf.IsStrictPickingStoreEnabled() {
return slice.AllOf(maxLoad.Loads, func(i int) bool {
if bs.isSelectedDim(i) {
return maxLoad.Loads[i]*toleranceRatio < expect.Loads[i]
}
return true
})
}
return maxLoad.Loads[bs.firstPriority]*toleranceRatio < expect.Loads[bs.firstPriority]
return bs.pick(maxLoad.Loads, func(i int) bool {
if bs.isSelectedDim(i) {
return maxLoad.Loads[i]*toleranceRatio < expect.Loads[i]
}
return true
})
}

func (bs *balanceSolver) isUniformFirstPriority(store *statistics.StoreLoadDetail) bool {
// The first priority dim uses a stricter bound (stddevThreshold*0.5): it must be more uniform than the second priority dim before it is treated as uniform.
return store.IsUniform(bs.firstPriority, stddevThreshold*0.5)
}

func (bs *balanceSolver) isUniformSecondPriority(store *statistics.StoreLoadDetail) bool {
return store.IsUniform(bs.secondPriority, stddevThreshold)
}
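A note on the asymmetric thresholds above: with the default `stddevThreshold = 0.1`, the first-priority dim is only treated as uniform when its relative stddev is below 0.05, while the second-priority dim uses the full 0.1. A dim with a relative stddev of, say, 0.07 therefore counts as uniform as the second priority but not as the first, which makes it harder for the scheduler to give up on balancing its primary target.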

// calcProgressiveRank calculates `bs.cur.progressiveRank`.
88 changes: 83 additions & 5 deletions server/schedulers/hot_region_test.go
@@ -872,6 +872,12 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithRuleEnabled(c *C) {
tc.SetHotRegionCacheHitsThreshold(0)
key, err := hex.DecodeString("")
c.Assert(err, IsNil)
// skip stddev check
origin := stddevThreshold
stddevThreshold = -1.0
defer func() {
stddevThreshold = origin
}()

tc.AddRegionStore(1, 20)
tc.AddRegionStore(2, 20)
@@ -1776,6 +1782,12 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) {
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1.05)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05)
// skip stddev check
origin := stddevThreshold
stddevThreshold = -1.0
defer func() {
stddevThreshold = origin
}()

tc := mockcluster.NewCluster(ctx, opt)
tc.SetHotRegionCacheHitsThreshold(0)
@@ -1831,28 +1843,94 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) {

hb, err = schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.StrictPickingStore = false

// assert loose store picking
tc.UpdateStorageWrittenStats(1, 10*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 6.1*MB*statistics.StoreHeartBeatReportInterval, 6.1*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 1*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval)
hb.(*hotScheduler).conf.WritePeerPriorities = []string{BytePriority, KeyPriority}
hb.(*hotScheduler).conf.StrictPickingStore = true
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 0)
hb.(*hotScheduler).conf.StrictPickingStore = false
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 5)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 2, 5) // picking store 2 improves both dims
clearPendingInfluence(hb.(*hotScheduler))

tc.UpdateStorageWrittenStats(1, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 6.1*MB*statistics.StoreHeartBeatReportInterval, 6.1*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 1*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 1*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval)
hb.(*hotScheduler).conf.WritePeerPriorities = []string{KeyPriority, BytePriority}
hb.(*hotScheduler).conf.StrictPickingStore = true
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 0)
hb.(*hotScheduler).conf.StrictPickingStore = false
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 4, 5)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 2, 5) // picking store 2 improves both dims
clearPendingInfluence(hb.(*hotScheduler))
}

func (s *testHotSchedulerSuite) TestHotScheduleWithStddev(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetDstToleranceRatio(0.0)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(0.0)
tc := mockcluster.NewCluster(ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetHotRegionCacheHitsThreshold(0)
tc.AddRegionStore(1, 20)
tc.AddRegionStore(2, 20)
tc.AddRegionStore(3, 20)
tc.AddRegionStore(4, 20)
tc.AddRegionStore(5, 20)
hb.(*hotScheduler).conf.StrictPickingStore = false

// skip uniform cluster
tc.UpdateStorageWrittenStats(1, 5*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 5.3*MB*statistics.StoreHeartBeatReportInterval, 5.3*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 5*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 5*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 4.8*MB*statistics.StoreHeartBeatReportInterval, 4.8*MB*statistics.StoreHeartBeatReportInterval)
addRegionInfo(tc, statistics.Write, []testRegionInfo{
{6, []uint64{3, 4, 2}, 0.1 * MB, 0.1 * MB, 0},
})
hb.(*hotScheduler).conf.WritePeerPriorities = []string{BytePriority, KeyPriority}
stddevThreshold = 0.1
ops := hb.Schedule(tc)
c.Assert(ops, HasLen, 0)
stddevThreshold = -1.0
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 2, 5)
clearPendingInfluence(hb.(*hotScheduler))

// skip the rank=-1 case: the first priority (byte) dim is uniform, so operators that only balance it are skipped
tc.UpdateStorageWrittenStats(1, 5*MB*statistics.StoreHeartBeatReportInterval, 100*MB*statistics.StoreHeartBeatReportInterval) // the key dim is not uniform, the byte dim is
tc.UpdateStorageWrittenStats(2, 5.3*MB*statistics.StoreHeartBeatReportInterval, 4.8*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 5*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 5*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 4.8*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
addRegionInfo(tc, statistics.Write, []testRegionInfo{
{6, []uint64{3, 4, 2}, 0.1 * MB, 0.1 * MB, 0},
})
hb.(*hotScheduler).conf.WritePeerPriorities = []string{BytePriority, KeyPriority}
stddevThreshold = 0.1
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 0)
stddevThreshold = -1.0
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 2, 5)
clearPendingInfluence(hb.(*hotScheduler))
}

27 changes: 26 additions & 1 deletion server/statistics/store_hot_peers_infos.go
@@ -16,6 +16,7 @@ package statistics

import (
"fmt"
"math"

"github.com/tikv/pd/server/core"
)
@@ -186,6 +187,19 @@ func summaryStoresLoadByEngine(
for i := range expectLoads {
expectLoads[i] = allStoreLoadSum[i] / float64(allStoreCount)
}

stddevLoads := make([]float64, len(allStoreLoadSum))
if allHotPeersCount != 0 {
for _, detail := range loadDetail {
for i := range expectLoads {
stddevLoads[i] += math.Pow(detail.LoadPred.Current.Loads[i]-expectLoads[i], 2)
}
}
for i := range stddevLoads {
stddevLoads[i] = math.Sqrt(stddevLoads[i]/float64(allStoreCount)) / expectLoads[i]
}
}
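Written out, the loops above compute the per-dimension coefficient of variation of the current store loads: stddevLoads[i] = sqrt(Σ_s (L[s][i] − expectLoads[i])² / N) / expectLoads[i], where L[s][i] is store s's current load in dim i and N is the store count. This relative value is what `IsUniform` later compares against `stddevThreshold`.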

{
// Metric for debug.
engine := collector.Engine()
@@ -197,13 +211,24 @@ func summaryStoresLoadByEngine(
hotPeerSummary.WithLabelValues(ty, engine).Set(expectLoads[QueryDim])
ty = "exp-count-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, engine).Set(expectCount)
ty = "stddev-byte-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, engine).Set(stddevLoads[ByteDim])
ty = "stddev-key-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, engine).Set(stddevLoads[KeyDim])
ty = "stddev-query-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, engine).Set(stddevLoads[QueryDim])
}
expect := StoreLoad{
Loads: expectLoads,
Count: float64(allHotPeersCount) / float64(allStoreCount),
Count: expectCount,
}
stddev := StoreLoad{
Loads: stddevLoads,
Count: expectCount,
}
for _, detail := range loadDetail {
detail.LoadPred.Expect = expect
detail.LoadPred.Stddev = stddev
}
return loadDetail
}
6 changes: 6 additions & 0 deletions server/statistics/store_load.go
@@ -73,6 +73,11 @@ func (li *StoreLoadDetail) ToHotPeersStat() *HotPeersStat {
}
}

// IsUniform returns true if the given dimension's load is uniform across stores, i.e. its relative stddev is below the threshold.
func (li *StoreLoadDetail) IsUniform(dim int, threshold float64) bool {
return li.LoadPred.Stddev.Loads[dim] < threshold
}
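Usage note: `summaryStoresLoadByEngine` attaches the same cluster-wide `Stddev` to every store's `LoadPred`, so any single store's detail can answer the uniformity question. This is why the hot scheduler only checks the source store in `isUniformFirstPriority`/`isUniformSecondPriority`, and why the comment in `tryUpdateBestSolution` notes that the value only needs to be calculated once.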

func toHotPeerStatShow(p *HotPeerStat, kind RWType) HotPeerStatShow {
b, k, q := GetRegionStatKind(kind, ByteDim), GetRegionStatKind(kind, KeyDim), GetRegionStatKind(kind, QueryDim)
byteRate := p.Loads[b]
@@ -206,6 +211,7 @@ type StoreLoadPred struct {
Current StoreLoad
Future StoreLoad
Expect StoreLoad
Stddev StoreLoad
}

// Min returns the min load between current and future.