scheduler: enlarge the search space so that the hot-scheduler can still schedule under dimensional conflicts (tikv#4912)

ref tikv#4949

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ShuNing <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
3 people authored Jun 7, 2022
1 parent 1f3c305 commit 726d345
Showing 4 changed files with 160 additions and 27 deletions.
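
Editor's note: before this change, disabling strict store picking meant the src/dst tolerance checks only looked at the first priority dimension. The diff below instead pre-binds a pick function in init() (slice.AllOf when StrictPickingStore is enabled, slice.AnyOf otherwise) and applies one predicate across the priority dimensions, so a candidate can still be picked when the two dimensions conflict. The following is a minimal, self-contained sketch of that idea; allOf/anyOf are simplified stand-ins for pd's slice.AllOf/slice.AnyOf, and checkSrc condenses checkSrcByDimPriorityAndTolerance to the selected dims only.

```go
// Sketch only: illustrates strict (all dims) vs. loose (any dim) store picking.
package main

import "fmt"

// allOf reports whether p holds for every index in [0, n).
func allOf(n int, p func(int) bool) bool {
	for i := 0; i < n; i++ {
		if !p(i) {
			return false
		}
	}
	return true
}

// anyOf reports whether p holds for at least one index in [0, n).
func anyOf(n int, p func(int) bool) bool {
	for i := 0; i < n; i++ {
		if p(i) {
			return true
		}
	}
	return false
}

// checkSrc mimics the shape of checkSrcByDimPriorityAndTolerance: a source
// store qualifies if its load exceeds the expected load (scaled by the
// tolerance ratio) on all priority dims (strict) or on any of them (loose).
func checkSrc(minLoads, expectLoads []float64, toleranceRatio float64, strict bool) bool {
	pick := anyOf
	if strict {
		pick = allOf
	}
	return pick(len(minLoads), func(i int) bool {
		return minLoads[i] > toleranceRatio*expectLoads[i]
	})
}

func main() {
	minLoads := []float64{12, 4} // e.g. byte rate is hot, key rate is not
	expectLoads := []float64{10, 10}
	fmt.Println(checkSrc(minLoads, expectLoads, 1.0, true))  // false: the key dim blocks strict picking
	fmt.Println(checkSrc(minLoads, expectLoads, 1.0, false)) // true: loose picking keeps the candidate
}
```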
66 changes: 45 additions & 21 deletions server/schedulers/hot_region.go
@@ -79,6 +79,9 @@ var (
schedulePeerPr = 0.66
// pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together
pendingAmpFactor = 2.0
// If the normalized stddev of a dimension's load distribution is below this threshold, scheduling will no longer be based on
// that dimension, as it implies the dimension is already sufficiently uniform.
stddevThreshold = 0.1
)

type hotScheduler struct {
@@ -384,6 +387,8 @@ type balanceSolver struct {
minorDecRatio float64
maxPeerNum int
minHotDegree int

pick func(s interface{}, p func(int) bool) bool
}

func (bs *balanceSolver) init() {
@@ -423,6 +428,11 @@ func (bs *balanceSolver) init() {
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetOpts().GetHotRegionCacheHitsThreshold()

bs.pick = slice.AnyOf
if bs.sche.conf.IsStrictPickingStoreEnabled() {
bs.pick = slice.AllOf
}
}

func (bs *balanceSolver) isSelectedDim(dim int) bool {
@@ -472,7 +482,14 @@ func (bs *balanceSolver) solve() []*operator.Operator {
return nil
}
bs.cur = &solution{}
tryUpdateBestSolution := func() {

tryUpdateBestSolution := func(isUniformFirstPriority bool) {
if bs.cur.progressiveRank == -1 && isUniformFirstPriority {
// The stddev is cluster-wide and identical for the src and dst stores, so it only needs to be checked once.
// If the first priority dim is already uniform enough, rank -1 is unnecessary and may lead to worse balance on the second priority dim.
hotSchedulerResultCounter.WithLabelValues("skip-uniform-store", strconv.FormatUint(bs.cur.dstStore.GetID(), 10)).Inc()
return
}
if bs.cur.progressiveRank < 0 && bs.betterThan(bs.best) {
if newOps, newInfl := bs.buildOperators(); len(newOps) > 0 {
bs.ops = newOps
@@ -486,7 +503,11 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore
srcStoreID := srcStore.GetID()

isUniformFirstPriority, isUniformSecondPriority := bs.isUniformFirstPriority(bs.cur.srcStore), bs.isUniformSecondPriority(bs.cur.srcStore)
if isUniformFirstPriority && isUniformSecondPriority {
hotSchedulerResultCounter.WithLabelValues("skip-uniform-store", strconv.FormatUint(bs.cur.srcStore.GetID(), 10)).Inc()
continue
}
for _, srcPeerStat := range bs.filterHotPeers(srcStore) {
if bs.cur.region = bs.getRegion(srcPeerStat, srcStoreID); bs.cur.region == nil {
continue
@@ -499,7 +520,7 @@
for _, dstStore := range bs.filterDstStores() {
bs.cur.dstStore = dstStore
bs.calcProgressiveRank()
tryUpdateBestSolution()
tryUpdateBestSolution(isUniformFirstPriority)
}
}
}
@@ -564,15 +585,12 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai
}

func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
if bs.sche.conf.IsStrictPickingStoreEnabled() {
return slice.AllOf(minLoad.Loads, func(i int) bool {
if bs.isSelectedDim(i) {
return minLoad.Loads[i] > toleranceRatio*expectLoad.Loads[i]
}
return true
})
}
return minLoad.Loads[bs.firstPriority] > toleranceRatio*expectLoad.Loads[bs.firstPriority]
return bs.pick(minLoad.Loads, func(i int) bool {
if bs.isSelectedDim(i) {
return minLoad.Loads[i] > toleranceRatio*expectLoad.Loads[i]
}
return true
})
}

// filterHotPeers filters hot peers from statistics.HotPeerStat and drops a peer if its region is in pending status.
@@ -770,15 +788,21 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st
}

func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statistics.StoreLoad, toleranceRatio float64) bool {
if bs.sche.conf.IsStrictPickingStoreEnabled() {
return slice.AllOf(maxLoad.Loads, func(i int) bool {
if bs.isSelectedDim(i) {
return maxLoad.Loads[i]*toleranceRatio < expect.Loads[i]
}
return true
})
}
return maxLoad.Loads[bs.firstPriority]*toleranceRatio < expect.Loads[bs.firstPriority]
return bs.pick(maxLoad.Loads, func(i int) bool {
if bs.isSelectedDim(i) {
return maxLoad.Loads[i]*toleranceRatio < expect.Loads[i]
}
return true
})
}

func (bs *balanceSolver) isUniformFirstPriority(store *statistics.StoreLoadDetail) bool {
// The first priority dim uses a stricter threshold since it should be more uniform than the second priority dim.
return store.IsUniform(bs.firstPriority, stddevThreshold*0.5)
}

func (bs *balanceSolver) isUniformSecondPriority(store *statistics.StoreLoadDetail) bool {
return store.IsUniform(bs.secondPriority, stddevThreshold)
}

// calcProgressiveRank calculates `bs.cur.progressiveRank`.
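
Editor's note: the other half of the change uses the new per-dimension stddev to avoid churn on already-balanced dimensions: a candidate source store is skipped outright when both priority dims are uniform, and rank -1 solutions (those that only improve the first priority dim) are dropped when the first priority dim alone is uniform. Below is a hedged sketch of that decision flow; the storeDetail type and the shouldSkip helper are illustrative, only the thresholds (stddevThreshold and stddevThreshold*0.5 for the first priority dim) mirror the diff.

```go
// Sketch only: condenses the two uniformity-based skip points in balanceSolver.solve.
package main

import "fmt"

const stddevThreshold = 0.1

// storeDetail stands in for statistics.StoreLoadDetail: stddev[i] holds the
// cluster-wide normalized stddev of dim i, which is shared by every store.
type storeDetail struct {
	id     uint64
	stddev []float64
}

func (s *storeDetail) isUniform(dim int, threshold float64) bool {
	return s.stddev[dim] < threshold
}

// shouldSkip mimics the two skip points:
//   - skip the store entirely if both priority dims are uniform;
//   - otherwise, drop rank -1 solutions (first-priority-only improvements)
//     when the first priority dim is already uniform.
func shouldSkip(src *storeDetail, first, second int, progressiveRank int64) (skipStore, skipSolution bool) {
	uniformFirst := src.isUniform(first, stddevThreshold*0.5)
	uniformSecond := src.isUniform(second, stddevThreshold)
	if uniformFirst && uniformSecond {
		return true, false
	}
	return false, progressiveRank == -1 && uniformFirst
}

func main() {
	src := &storeDetail{id: 1, stddev: []float64{0.02, 0.3}} // byte dim uniform, key dim not
	skipStore, skipSolution := shouldSkip(src, 0, 1, -1)
	fmt.Println(skipStore, skipSolution) // false true: keep the store, but drop the rank -1 solution
}
```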
88 changes: 83 additions & 5 deletions server/schedulers/hot_region_test.go
@@ -872,6 +872,12 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithRuleEnabled(c *C) {
tc.SetHotRegionCacheHitsThreshold(0)
key, err := hex.DecodeString("")
c.Assert(err, IsNil)
// skip stddev check
origin := stddevThreshold
stddevThreshold = -1.0
defer func() {
stddevThreshold = origin
}()

tc.AddRegionStore(1, 20)
tc.AddRegionStore(2, 20)
@@ -1776,6 +1782,12 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) {
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1.05)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05)
// skip stddev check
origin := stddevThreshold
stddevThreshold = -1.0
defer func() {
stddevThreshold = origin
}()

tc := mockcluster.NewCluster(ctx, opt)
tc.SetHotRegionCacheHitsThreshold(0)
@@ -1831,28 +1843,94 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) {

hb, err = schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.StrictPickingStore = false

// assert loose store picking
tc.UpdateStorageWrittenStats(1, 10*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 6.1*MB*statistics.StoreHeartBeatReportInterval, 6.1*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 1*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval)
hb.(*hotScheduler).conf.WritePeerPriorities = []string{BytePriority, KeyPriority}
hb.(*hotScheduler).conf.StrictPickingStore = true
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 0)
hb.(*hotScheduler).conf.StrictPickingStore = false
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 5)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 2, 5) // taking both dims into account, store 2 is the better source
clearPendingInfluence(hb.(*hotScheduler))

tc.UpdateStorageWrittenStats(1, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 6.1*MB*statistics.StoreHeartBeatReportInterval, 6.1*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 1*MB*statistics.StoreHeartBeatReportInterval, 10*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 1*MB*statistics.StoreHeartBeatReportInterval, 1*MB*statistics.StoreHeartBeatReportInterval)
hb.(*hotScheduler).conf.WritePeerPriorities = []string{KeyPriority, BytePriority}
hb.(*hotScheduler).conf.StrictPickingStore = true
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 0)
hb.(*hotScheduler).conf.StrictPickingStore = false
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 4, 5)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 2, 5) // taking both dims into account, store 2 is the better source
clearPendingInfluence(hb.(*hotScheduler))
}

func (s *testHotSchedulerSuite) TestHotScheduleWithStddev(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
opt := config.NewTestOptions()
hb, err := schedule.CreateScheduler(statistics.Write.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil)
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetDstToleranceRatio(0.0)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(0.0)
tc := mockcluster.NewCluster(ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetHotRegionCacheHitsThreshold(0)
tc.AddRegionStore(1, 20)
tc.AddRegionStore(2, 20)
tc.AddRegionStore(3, 20)
tc.AddRegionStore(4, 20)
tc.AddRegionStore(5, 20)
hb.(*hotScheduler).conf.StrictPickingStore = false

// skip uniform cluster
tc.UpdateStorageWrittenStats(1, 5*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 5.3*MB*statistics.StoreHeartBeatReportInterval, 5.3*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 5*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 5*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 4.8*MB*statistics.StoreHeartBeatReportInterval, 4.8*MB*statistics.StoreHeartBeatReportInterval)
addRegionInfo(tc, statistics.Write, []testRegionInfo{
{6, []uint64{3, 4, 2}, 0.1 * MB, 0.1 * MB, 0},
})
hb.(*hotScheduler).conf.WritePeerPriorities = []string{BytePriority, KeyPriority}
stddevThreshold = 0.1
ops := hb.Schedule(tc)
c.Assert(ops, HasLen, 0)
stddevThreshold = -1.0
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 2, 5)
clearPendingInfluence(hb.(*hotScheduler))

// skip the rank -1 case (the first priority dim is uniform)
tc.UpdateStorageWrittenStats(1, 5*MB*statistics.StoreHeartBeatReportInterval, 100*MB*statistics.StoreHeartBeatReportInterval) // byte dim is uniform while key dim is not
tc.UpdateStorageWrittenStats(2, 5.3*MB*statistics.StoreHeartBeatReportInterval, 4.8*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 5*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 5*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 4.8*MB*statistics.StoreHeartBeatReportInterval, 5*MB*statistics.StoreHeartBeatReportInterval)
addRegionInfo(tc, statistics.Write, []testRegionInfo{
{6, []uint64{3, 4, 2}, 0.1 * MB, 0.1 * MB, 0},
})
hb.(*hotScheduler).conf.WritePeerPriorities = []string{BytePriority, KeyPriority}
stddevThreshold = 0.1
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 0)
stddevThreshold = -1.0
ops = hb.Schedule(tc)
c.Assert(ops, HasLen, 1)
testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 2, 5)
clearPendingInfluence(hb.(*hotScheduler))
}

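
Editor's note: the tests above temporarily disable or tighten the new stddev check by mutating the package-level stddevThreshold and restoring it with defer. A minimal sketch of that override-and-restore pattern follows; the package name, the stand-in variable, and the standard-library testing style (rather than the gocheck suites used above) are illustrative.

```go
// Sketch only: override a package-level tuning variable in a test and restore it.
package schedulers

import "testing"

// stddevThresholdStandIn plays the role of the real package-level stddevThreshold.
var stddevThresholdStandIn = 0.1

func TestSkipStddevCheck(t *testing.T) {
	origin := stddevThresholdStandIn
	stddevThresholdStandIn = -1.0 // a negative threshold disables the uniformity skip
	defer func() {
		stddevThresholdStandIn = origin // restore so later tests see the default again
	}()

	// ... exercise the scheduler here: with the skip disabled, rank -1
	// solutions are kept even when the first priority dim looks uniform.
}
```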
27 changes: 26 additions & 1 deletion server/statistics/store_hot_peers_infos.go
@@ -16,6 +16,7 @@ package statistics

import (
"fmt"
"math"

"github.com/tikv/pd/server/core"
)
@@ -186,6 +187,19 @@ func summaryStoresLoadByEngine(
for i := range expectLoads {
expectLoads[i] = allStoreLoadSum[i] / float64(allStoreCount)
}

stddevLoads := make([]float64, len(allStoreLoadSum))
if allHotPeersCount != 0 {
for _, detail := range loadDetail {
for i := range expectLoads {
stddevLoads[i] += math.Pow(detail.LoadPred.Current.Loads[i]-expectLoads[i], 2)
}
}
for i := range stddevLoads {
stddevLoads[i] = math.Sqrt(stddevLoads[i]/float64(allStoreCount)) / expectLoads[i]
}
}

{
// Metric for debug.
engine := collector.Engine()
@@ -197,13 +211,24 @@
hotPeerSummary.WithLabelValues(ty, engine).Set(expectLoads[QueryDim])
ty = "exp-count-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, engine).Set(expectCount)
ty = "stddev-byte-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, engine).Set(stddevLoads[ByteDim])
ty = "stddev-key-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, engine).Set(stddevLoads[KeyDim])
ty = "stddev-query-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, engine).Set(stddevLoads[QueryDim])
}
expect := StoreLoad{
Loads: expectLoads,
Count: float64(allHotPeersCount) / float64(allStoreCount),
Count: expectCount,
}
stddev := StoreLoad{
Loads: stddevLoads,
Count: expectCount,
}
for _, detail := range loadDetail {
detail.LoadPred.Expect = expect
detail.LoadPred.Stddev = stddev
}
return loadDetail
}
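
Editor's note: the value compared against stddevThreshold is computed in the hunk above: for each dimension, summaryStoresLoadByEngine takes the standard deviation of the per-store loads and divides it by the expected (mean) load, yielding a scale-free coefficient of variation that is stored in StoreLoadPred.Stddev and later read by IsUniform. A self-contained sketch of that calculation, assuming the input is simply one load slice per store in a fixed dimension order (and with a zero-mean guard added for the standalone example):

```go
// Sketch only: per-dimension normalized stddev (stddev / mean) across stores.
package main

import (
	"fmt"
	"math"
)

// normalizedStddev mirrors: sqrt(sum((load - expect)^2) / n) / expect, per dim.
func normalizedStddev(storeLoads [][]float64) []float64 {
	if len(storeLoads) == 0 {
		return nil
	}
	dims := len(storeLoads[0])
	n := float64(len(storeLoads))

	// expected (mean) load per dimension
	expect := make([]float64, dims)
	for _, loads := range storeLoads {
		for i, l := range loads {
			expect[i] += l
		}
	}
	for i := range expect {
		expect[i] /= n
	}

	// stddev per dimension, normalized by the mean
	stddev := make([]float64, dims)
	for _, loads := range storeLoads {
		for i, l := range loads {
			stddev[i] += math.Pow(l-expect[i], 2)
		}
	}
	for i := range stddev {
		if expect[i] > 0 {
			stddev[i] = math.Sqrt(stddev[i]/n) / expect[i]
		}
	}
	return stddev
}

func main() {
	// byte rate (dim 0) is nearly uniform, key rate (dim 1) is not.
	loads := [][]float64{{5, 100}, {5.3, 4.8}, {5, 5}, {5, 5}, {4.8, 5}}
	fmt.Println(normalizedStddev(loads)) // dim 0 well below 0.1, dim 1 far above it
}
```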
6 changes: 6 additions & 0 deletions server/statistics/store_load.go
@@ -73,6 +73,11 @@ func (li *StoreLoadDetail) ToHotPeersStat() *HotPeersStat {
}
}

// IsUniform returns true if the load of the given dim is distributed uniformly enough across stores,
// i.e. its normalized stddev is below the threshold.
func (li *StoreLoadDetail) IsUniform(dim int, threshold float64) bool {
return li.LoadPred.Stddev.Loads[dim] < threshold
}

func toHotPeerStatShow(p *HotPeerStat, kind RWType) HotPeerStatShow {
b, k, q := GetRegionStatKind(kind, ByteDim), GetRegionStatKind(kind, KeyDim), GetRegionStatKind(kind, QueryDim)
byteRate := p.Loads[b]
@@ -206,6 +211,7 @@ type StoreLoadPred struct {
Current StoreLoad
Future StoreLoad
Expect StoreLoad
Stddev StoreLoad
}

// Min returns the min load between current and future.
