Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedulers: add load expectations for hot scheduler #2297

Merged
merged 4 commits into from
Mar 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,40 @@ func (mc *Cluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio
mc.PutStore(newStore)
}

// UpdateStorageWrittenStats updates the written-bytes and written-keys
// statistics of the given store, stamping the stats with a heartbeat
// interval ending now.
func (mc *Cluster) UpdateStorageWrittenStats(storeID, bytesWritten, keysWritten uint64) {
	store := mc.GetStore(storeID)
	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
	newStats.BytesWritten = bytesWritten
	newStats.KeysWritten = keysWritten
	// Use the Unix timestamp, not time.Now().Second(): Second() is only the
	// seconds within the current minute (0-59), so subtracting the report
	// interval could go negative and underflow when cast to uint64.
	now := time.Now().Unix()
	interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
	newStats.Interval = interval
	newStore := store.Clone(core.SetStoreStats(newStats))
	mc.Set(storeID, newStats)
	mc.PutStore(newStore)
}

// UpdateStorageReadStats updates the read-bytes and read-keys statistics of
// the given store, stamping the stats with a heartbeat interval ending now.
// (The original comment said "written bytes" and named the parameters
// bytesWritten/keysWritten, which was a copy-paste slip — they fill the
// read-side fields.)
func (mc *Cluster) UpdateStorageReadStats(storeID, bytesRead, keysRead uint64) {
	store := mc.GetStore(storeID)
	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
	newStats.BytesRead = bytesRead
	newStats.KeysRead = keysRead
	// Use the Unix timestamp, not time.Now().Second(): Second() is only the
	// seconds within the current minute (0-59), so subtracting the report
	// interval could go negative and underflow when cast to uint64.
	now := time.Now().Unix()
	interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
	newStats.Interval = interval
	newStore := store.Clone(core.SetStoreStats(newStats))
	mc.Set(storeID, newStats)
	mc.PutStore(newStore)
}

// UpdateStorageWrittenBytes updates store written bytes.
func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.BytesWritten = bytesWritten
newStats.KeysWritten = bytesWritten / 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand All @@ -411,6 +440,7 @@ func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.BytesRead = bytesRead
newStats.KeysRead = bytesRead / 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand All @@ -424,6 +454,7 @@ func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64)
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.KeysWritten = keysWritten
newStats.BytesWritten = keysWritten * 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand All @@ -437,6 +468,7 @@ func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.KeysRead = keysRead
newStats.BytesRead = keysRead * 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand Down
48 changes: 44 additions & 4 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ func summaryStoresLoad(
kind core.ResourceKind,
) map[uint64]*storeLoadDetail {
loadDetail := make(map[uint64]*storeLoadDetail, len(storeByteRate))
allByteSum := 0.0
allKeySum := 0.0
allCount := 0.0

// Stores without byte rate statistics is not available to schedule.
for id, byteRate := range storeByteRate {
Expand Down Expand Up @@ -295,6 +298,9 @@ func summaryStoresLoad(
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keySum)
}
}
allByteSum += byteRate
allKeySum += keyRate
allCount += float64(len(hotPeers))

// Build store load prediction from current load and pending influence.
stLoadPred := (&storeLoad{
Expand All @@ -309,6 +315,29 @@ func summaryStoresLoad(
HotPeers: hotPeers,
}
}
storeLen := float64(len(storeByteRate))

for id, detail := range loadDetail {
byteExp := allByteSum / storeLen
keyExp := allKeySum / storeLen
countExp := allCount / storeLen
detail.LoadPred.Future.ExpByteRate = byteExp
detail.LoadPred.Future.ExpKeyRate = keyExp
detail.LoadPred.Future.ExpCount = countExp
// Debug
{
ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(byteExp)
}
{
ty := "exp-key-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keyExp)
}
{
ty := "exp-count-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(countExp)
}
}
return loadDetail
}

Expand Down Expand Up @@ -553,7 +582,12 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
if len(detail.HotPeers) == 0 {
continue
}
ret[id] = detail
if detail.LoadPred.min().ByteRate > bs.sche.conf.GetToleranceRatio()*detail.LoadPred.Future.ExpByteRate &&
detail.LoadPred.min().KeyRate > bs.sche.conf.GetToleranceRatio()*detail.LoadPred.Future.ExpKeyRate {
ret[id] = detail
balanceHotRegionCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
}
balanceHotRegionCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc()
}
return ret
}
Expand Down Expand Up @@ -716,6 +750,13 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail, len(candidates))
for _, store := range candidates {
if filter.Target(bs.cluster, store, filters) {
detail := bs.stLoadDetail[store.GetID()]
if detail.LoadPred.max().ByteRate*bs.sche.conf.GetToleranceRatio() < detail.LoadPred.Future.ExpByteRate &&
detail.LoadPred.max().KeyRate*bs.sche.conf.GetToleranceRatio() < detail.LoadPred.Future.ExpKeyRate {
ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
balanceHotRegionCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10)).Inc()
}
balanceHotRegionCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10)).Inc()
ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
}
}
Expand All @@ -731,9 +772,8 @@ func (bs *balanceSolver) calcProgressiveRank() {
rank := int64(0)
if bs.rwTy == write && bs.opTy == transferLeader {
// In this condition, CPU usage is the matter.
// Only consider about count and key rate.
if srcLd.Count > dstLd.Count &&
srcLd.KeyRate >= dstLd.KeyRate+peer.GetKeyRate() {
// Only consider about key rate.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is its effect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The srcCount may be higher but the keyRate may be lower.

if srcLd.KeyRate >= dstLd.KeyRate+peer.GetKeyRate() {
rank = -1
}
} else {
Expand Down
14 changes: 14 additions & 0 deletions server/schedulers/hot_region_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
GreatDecRatio: 0.95,
MinorDecRatio: 0.99,
MaxPeerNum: 1000,
ToleranceRatio: 1.02, // Tolerate 2% difference
}
}

Expand All @@ -61,6 +62,7 @@ type hotRegionSchedulerConfig struct {
CountRankStepRatio float64 `json:"count-rank-step-ratio"`
GreatDecRatio float64 `json:"great-dec-ratio"`
MinorDecRatio float64 `json:"minor-dec-ratio"`
ToleranceRatio float64 `json:"tolerance-ratio"`
}

func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
Expand All @@ -81,6 +83,18 @@ func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int {
return conf.MaxPeerNum
}

// GetToleranceRatio returns the configured tolerance ratio under the read lock.
func (conf *hotRegionSchedulerConfig) GetToleranceRatio() float64 {
	conf.RLock()
	ratio := conf.ToleranceRatio
	conf.RUnlock()
	return ratio
}

// SetToleranceRatio stores a new tolerance ratio under the write lock.
func (conf *hotRegionSchedulerConfig) SetToleranceRatio(tol float64) {
	conf.Lock()
	conf.ToleranceRatio = tol
	conf.Unlock()
}

func (conf *hotRegionSchedulerConfig) GetByteRankStepRatio() float64 {
conf.RLock()
defer conf.RUnlock()
Expand Down
46 changes: 18 additions & 28 deletions server/schedulers/hot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) {
opt := mockoption.NewScheduleOptions()
hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetToleranceRatio(1)
opt.HotRegionCacheHitsThreshold = 0

tc := mockcluster.NewCluster(opt)
Expand All @@ -307,17 +308,11 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) {
tc.AddRegionStore(4, 20)
tc.AddRegionStore(5, 20)

tc.UpdateStorageWrittenBytes(1, 10.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(3, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(4, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(5, 8.9*MB*statistics.StoreHeartBeatReportInterval)

tc.UpdateStorageWrittenKeys(1, 10*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenKeys(5, 9.2*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(1, 10.5*MB*statistics.StoreHeartBeatReportInterval, 10.2*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.8*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 9*MB*statistics.StoreHeartBeatReportInterval, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 8.9*MB*statistics.StoreHeartBeatReportInterval, 9.2*MB*statistics.StoreHeartBeatReportInterval)

addRegionInfo(tc, write, []testRegionInfo{
{1, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB},
Expand All @@ -331,19 +326,19 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) {
// byteDecRatio <= 0.95 && keyDecRatio <= 0.95
testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 4)
// store byte rate (min, max): (10, 10.5) | 9.5 | 9.5 | (9, 9.5) | 8.9
// store key rate (min, max): (9.5, 10) | 9.5 | 9.8 | (9, 9.5) | 9.2
// store key rate (min, max): (9.7, 10.2) | 9.5 | 9.8 | (9, 9.5) | 9.2

op = hb.Schedule(tc)[0]
// byteDecRatio <= 0.99 && keyDecRatio <= 0.95
testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 3, 5)
// store byte rate (min, max): (10, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 8.95)
// store key rate (min, max): (9.5, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3)
// store key rate (min, max): (9.7, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3)

op = hb.Schedule(tc)[0]
// byteDecRatio <= 0.95
testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 5)
// store byte rate (min, max): (9.5, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 9.45)
// store key rate (min, max): (9, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8)
// store key rate (min, max): (9.2, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8)
}
}

Expand Down Expand Up @@ -586,6 +581,7 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) {
opt := mockoption.NewScheduleOptions()
hb, err := schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetToleranceRatio(1)
opt.HotRegionCacheHitsThreshold = 0

tc := mockcluster.NewCluster(opt)
Expand All @@ -595,17 +591,11 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) {
tc.AddRegionStore(4, 20)
tc.AddRegionStore(5, 20)

tc.UpdateStorageReadBytes(1, 10.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadBytes(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadBytes(3, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadBytes(4, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadBytes(5, 8.9*MB*statistics.StoreHeartBeatReportInterval)

tc.UpdateStorageReadKeys(1, 10*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadKeys(5, 9.2*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(1, 10.5*MB*statistics.StoreHeartBeatReportInterval, 10.2*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(3, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.8*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(4, 9*MB*statistics.StoreHeartBeatReportInterval, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(5, 8.9*MB*statistics.StoreHeartBeatReportInterval, 9.2*MB*statistics.StoreHeartBeatReportInterval)

addRegionInfo(tc, read, []testRegionInfo{
{1, []uint64{1, 2, 4}, 0.5 * MB, 0.5 * MB},
Expand All @@ -619,19 +609,19 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) {
// byteDecRatio <= 0.95 && keyDecRatio <= 0.95
testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 1, 4)
// store byte rate (min, max): (10, 10.5) | 9.5 | 9.5 | (9, 9.5) | 8.9
// store key rate (min, max): (9.5, 10) | 9.5 | 9.8 | (9, 9.5) | 9.2
// store key rate (min, max): (9.7, 10.2) | 9.5 | 9.8 | (9, 9.5) | 9.2

op = hb.Schedule(tc)[0]
// byteDecRatio <= 0.99 && keyDecRatio <= 0.95
testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 3, 5)
// store byte rate (min, max): (10, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 8.95)
// store key rate (min, max): (9.5, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3)
// store key rate (min, max): (9.7, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3)

op = hb.Schedule(tc)[0]
// byteDecRatio <= 0.95
testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 5)
// store byte rate (min, max): (9.5, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 9.45)
// store key rate (min, max): (9, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8)
// store key rate (min, max): (9.2, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8)
}
}

Expand Down
4 changes: 4 additions & 0 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ type storeLoad struct {
ByteRate float64
KeyRate float64
Count float64

ExpByteRate float64
ExpKeyRate float64
ExpCount float64
}

func (load *storeLoad) ToLoadPred(infl Influence) *storeLoadPred {
Expand Down