Skip to content

Commit

Permalink
schedulers: add load expectations for hot scheduler (tikv#2297)
Browse files Browse the repository at this point in the history
* schedulers: add load expectations for hot scheduler

Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch authored and hsqlu committed Mar 29, 2020
1 parent 43c3e95 commit f8e60b2
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 32 deletions.
32 changes: 32 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,40 @@ func (mc *Cluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio
mc.PutStore(newStore)
}

// UpdateStorageWrittenStats updates store written bytes.
func (mc *Cluster) UpdateStorageWrittenStats(storeID, bytesWritten, keysWritten uint64) {
	store := mc.GetStore(storeID)
	// Work on a deep copy so the cached stats are not mutated in place.
	stats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
	stats.BytesWritten = bytesWritten
	stats.KeysWritten = keysWritten
	// Pretend the stats cover one full heartbeat interval ending now.
	end := time.Now().Second()
	stats.Interval = &pdpb.TimeInterval{
		StartTimestamp: uint64(end - statistics.StoreHeartBeatReportInterval),
		EndTimestamp:   uint64(end),
	}
	mc.Set(storeID, stats)
	mc.PutStore(store.Clone(core.SetStoreStats(stats)))
}

// UpdateStorageReadStats updates store read bytes and read keys.
func (mc *Cluster) UpdateStorageReadStats(storeID, bytesRead, keysRead uint64) {
	store := mc.GetStore(storeID)
	// Clone the stats so the cached copy is not mutated in place.
	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
	newStats.BytesRead = bytesRead
	newStats.KeysRead = keysRead
	// Report the stats as covering one full heartbeat interval ending now.
	now := time.Now().Second()
	interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
	newStats.Interval = interval
	newStore := store.Clone(core.SetStoreStats(newStats))
	mc.Set(storeID, newStats)
	mc.PutStore(newStore)
}

// UpdateStorageWrittenBytes updates store written bytes.
func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.BytesWritten = bytesWritten
newStats.KeysWritten = bytesWritten / 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand All @@ -411,6 +440,7 @@ func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.BytesRead = bytesRead
newStats.KeysRead = bytesRead / 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand All @@ -424,6 +454,7 @@ func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64)
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.KeysWritten = keysWritten
newStats.BytesWritten = keysWritten * 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand All @@ -437,6 +468,7 @@ func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.KeysRead = keysRead
newStats.BytesRead = keysRead * 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand Down
48 changes: 44 additions & 4 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ func summaryStoresLoad(
kind core.ResourceKind,
) map[uint64]*storeLoadDetail {
loadDetail := make(map[uint64]*storeLoadDetail, len(storeByteRate))
allByteSum := 0.0
allKeySum := 0.0
allCount := 0.0

// Stores without byte rate statistics is not available to schedule.
for id, byteRate := range storeByteRate {
Expand Down Expand Up @@ -295,6 +298,9 @@ func summaryStoresLoad(
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keySum)
}
}
allByteSum += byteRate
allKeySum += keyRate
allCount += float64(len(hotPeers))

// Build store load prediction from current load and pending influence.
stLoadPred := (&storeLoad{
Expand All @@ -309,6 +315,29 @@ func summaryStoresLoad(
HotPeers: hotPeers,
}
}
storeLen := float64(len(storeByteRate))

for id, detail := range loadDetail {
byteExp := allByteSum / storeLen
keyExp := allKeySum / storeLen
countExp := allCount / storeLen
detail.LoadPred.Future.ExpByteRate = byteExp
detail.LoadPred.Future.ExpKeyRate = keyExp
detail.LoadPred.Future.ExpCount = countExp
// Debug
{
ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(byteExp)
}
{
ty := "exp-key-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keyExp)
}
{
ty := "exp-count-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(countExp)
}
}
return loadDetail
}

Expand Down Expand Up @@ -553,7 +582,12 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
if len(detail.HotPeers) == 0 {
continue
}
ret[id] = detail
if detail.LoadPred.min().ByteRate > bs.sche.conf.GetToleranceRatio()*detail.LoadPred.Future.ExpByteRate &&
detail.LoadPred.min().KeyRate > bs.sche.conf.GetToleranceRatio()*detail.LoadPred.Future.ExpKeyRate {
ret[id] = detail
balanceHotRegionCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
}
balanceHotRegionCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc()
}
return ret
}
Expand Down Expand Up @@ -716,6 +750,13 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail, len(candidates))
for _, store := range candidates {
if filter.Target(bs.cluster, store, filters) {
detail := bs.stLoadDetail[store.GetID()]
if detail.LoadPred.max().ByteRate*bs.sche.conf.GetToleranceRatio() < detail.LoadPred.Future.ExpByteRate &&
detail.LoadPred.max().KeyRate*bs.sche.conf.GetToleranceRatio() < detail.LoadPred.Future.ExpKeyRate {
ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
balanceHotRegionCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10)).Inc()
}
balanceHotRegionCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10)).Inc()
ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
}
}
Expand All @@ -731,9 +772,8 @@ func (bs *balanceSolver) calcProgressiveRank() {
rank := int64(0)
if bs.rwTy == write && bs.opTy == transferLeader {
// In this condition, CPU usage is the matter.
// Only consider about count and key rate.
if srcLd.Count > dstLd.Count &&
srcLd.KeyRate >= dstLd.KeyRate+peer.GetKeyRate() {
// Only consider about key rate.
if srcLd.KeyRate >= dstLd.KeyRate+peer.GetKeyRate() {
rank = -1
}
} else {
Expand Down
14 changes: 14 additions & 0 deletions server/schedulers/hot_region_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
GreatDecRatio: 0.95,
MinorDecRatio: 0.99,
MaxPeerNum: 1000,
ToleranceRatio: 1.02, // Tolerate 2% difference
}
}

Expand All @@ -61,6 +62,7 @@ type hotRegionSchedulerConfig struct {
CountRankStepRatio float64 `json:"count-rank-step-ratio"`
GreatDecRatio float64 `json:"great-dec-ratio"`
MinorDecRatio float64 `json:"minor-dec-ratio"`
ToleranceRatio float64 `json:"tolerance-ratio"`
}

func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
Expand All @@ -81,6 +83,18 @@ func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int {
return conf.MaxPeerNum
}

// GetToleranceRatio returns the tolerance ratio of the hot region scheduler.
// It is safe for concurrent use: the read is guarded by the config's RWMutex.
func (conf *hotRegionSchedulerConfig) GetToleranceRatio() float64 {
	conf.RLock()
	defer conf.RUnlock()
	return conf.ToleranceRatio
}

// SetToleranceRatio sets the tolerance ratio of the hot region scheduler.
// It is safe for concurrent use: the write is guarded by the config's mutex.
func (conf *hotRegionSchedulerConfig) SetToleranceRatio(tol float64) {
	conf.Lock()
	defer conf.Unlock()
	conf.ToleranceRatio = tol
}

func (conf *hotRegionSchedulerConfig) GetByteRankStepRatio() float64 {
conf.RLock()
defer conf.RUnlock()
Expand Down
46 changes: 18 additions & 28 deletions server/schedulers/hot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) {
opt := mockoption.NewScheduleOptions()
hb, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetToleranceRatio(1)
opt.HotRegionCacheHitsThreshold = 0

tc := mockcluster.NewCluster(opt)
Expand All @@ -307,17 +308,11 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) {
tc.AddRegionStore(4, 20)
tc.AddRegionStore(5, 20)

tc.UpdateStorageWrittenBytes(1, 10.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(3, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(4, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(5, 8.9*MB*statistics.StoreHeartBeatReportInterval)

tc.UpdateStorageWrittenKeys(1, 10*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenKeys(5, 9.2*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(1, 10.5*MB*statistics.StoreHeartBeatReportInterval, 10.2*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(3, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.8*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(4, 9*MB*statistics.StoreHeartBeatReportInterval, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenStats(5, 8.9*MB*statistics.StoreHeartBeatReportInterval, 9.2*MB*statistics.StoreHeartBeatReportInterval)

addRegionInfo(tc, write, []testRegionInfo{
{1, []uint64{2, 1, 3}, 0.5 * MB, 0.5 * MB},
Expand All @@ -331,19 +326,19 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) {
// byteDecRatio <= 0.95 && keyDecRatio <= 0.95
testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 4)
// store byte rate (min, max): (10, 10.5) | 9.5 | 9.5 | (9, 9.5) | 8.9
// store key rate (min, max): (9.5, 10) | 9.5 | 9.8 | (9, 9.5) | 9.2
// store key rate (min, max): (9.7, 10.2) | 9.5 | 9.8 | (9, 9.5) | 9.2

op = hb.Schedule(tc)[0]
// byteDecRatio <= 0.99 && keyDecRatio <= 0.95
testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 3, 5)
// store byte rate (min, max): (10, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 8.95)
// store key rate (min, max): (9.5, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3)
// store key rate (min, max): (9.7, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3)

op = hb.Schedule(tc)[0]
// byteDecRatio <= 0.95
testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 5)
// store byte rate (min, max): (9.5, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 9.45)
// store key rate (min, max): (9, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8)
// store key rate (min, max): (9.2, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8)
}
}

Expand Down Expand Up @@ -586,6 +581,7 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) {
opt := mockoption.NewScheduleOptions()
hb, err := schedule.CreateScheduler(HotReadRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetToleranceRatio(1)
opt.HotRegionCacheHitsThreshold = 0

tc := mockcluster.NewCluster(opt)
Expand All @@ -595,17 +591,11 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) {
tc.AddRegionStore(4, 20)
tc.AddRegionStore(5, 20)

tc.UpdateStorageReadBytes(1, 10.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadBytes(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadBytes(3, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadBytes(4, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadBytes(5, 8.9*MB*statistics.StoreHeartBeatReportInterval)

tc.UpdateStorageReadKeys(1, 10*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadKeys(2, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadKeys(3, 9.8*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadKeys(4, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadKeys(5, 9.2*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(1, 10.5*MB*statistics.StoreHeartBeatReportInterval, 10.2*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(3, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.8*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(4, 9*MB*statistics.StoreHeartBeatReportInterval, 9*MB*statistics.StoreHeartBeatReportInterval)
tc.UpdateStorageReadStats(5, 8.9*MB*statistics.StoreHeartBeatReportInterval, 9.2*MB*statistics.StoreHeartBeatReportInterval)

addRegionInfo(tc, read, []testRegionInfo{
{1, []uint64{1, 2, 4}, 0.5 * MB, 0.5 * MB},
Expand All @@ -619,19 +609,19 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) {
// byteDecRatio <= 0.95 && keyDecRatio <= 0.95
testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 1, 4)
// store byte rate (min, max): (10, 10.5) | 9.5 | 9.5 | (9, 9.5) | 8.9
// store key rate (min, max): (9.5, 10) | 9.5 | 9.8 | (9, 9.5) | 9.2
// store key rate (min, max): (9.7, 10.2) | 9.5 | 9.8 | (9, 9.5) | 9.2

op = hb.Schedule(tc)[0]
// byteDecRatio <= 0.99 && keyDecRatio <= 0.95
testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 3, 5)
// store byte rate (min, max): (10, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 8.95)
// store key rate (min, max): (9.5, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3)
// store key rate (min, max): (9.7, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.3)

op = hb.Schedule(tc)[0]
// byteDecRatio <= 0.95
testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 5)
// store byte rate (min, max): (9.5, 10.5) | 9.5 | (9.45, 9.5) | (9, 9.5) | (8.9, 9.45)
// store key rate (min, max): (9, 10) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8)
// store key rate (min, max): (9.2, 10.2) | 9.5 | (9.7, 9.8) | (9, 9.5) | (9.2, 9.8)
}
}

Expand Down
4 changes: 4 additions & 0 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ type storeLoad struct {
ByteRate float64
KeyRate float64
Count float64

ExpByteRate float64
ExpKeyRate float64
ExpCount float64
}

func (load *storeLoad) ToLoadPred(infl Influence) *storeLoadPred {
Expand Down
81 changes: 81 additions & 0 deletions tools/pd-ctl/pdctl/command/completion_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package command

import (
"os"

"github.com/spf13/cobra"
)

const (
	// completionLongDesc explains what the completion subcommand emits and
	// how the generated script is meant to be used.
	completionLongDesc = `
Output shell completion code for the specified shell (bash).
The shell code must be evaluated to provide interactive
completion of pd-ctl commands. This can be done by sourcing it from
the .bash_profile.
`

	// completionExample shows installation steps on macOS and Linux.
	// NOTE: the original text referenced "tkctl" (copy-pasted from another
	// project); it is corrected to "pd-ctl" here.
	completionExample = `
# Installing bash completion on macOS using homebrew
## If running Bash 3.2 included with macOS
brew install bash-completion
## or, if running Bash 4.1+
brew install bash-completion@2
## If pd-ctl is installed via homebrew, this should start working immediately.
## If you've installed via other means, you may need add the completion to your completion directory
pd-ctl completion bash > $(brew --prefix)/etc/bash_completion.d/pd-ctl
# Installing bash completion on Linux
## If bash-completion is not installed on Linux, please install the 'bash-completion' package
## via your distribution's package manager.
## Load the pd-ctl completion code for bash into the current shell
source <(pd-ctl completion bash)
## Write bash completion code to a file and source it from .bash_profile
pd-ctl completion bash > ~/completion.bash.inc
printf "
# pd-ctl shell completion
source '$HOME/completion.bash.inc'
" >> $HOME/.bash_profile
source $HOME/.bash_profile
`
)

// NewCompletionCommand returns a completion subcommand of the root command.
// Only bash completion is currently supported.
func NewCompletionCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:                   "completion SHELL",
		DisableFlagsInUseLine: true,
		Short:                 "Output shell completion code for the specified shell (bash)",
		Long:                  completionLongDesc,
		Example:               completionExample,
		Run: func(cmd *cobra.Command, args []string) {
			if len(args) > 1 {
				cmd.Println("Too many arguments. Expected only the shell type.")
				if err := cmd.Usage(); err != nil {
					cmd.Println(err)
				}
				return
			}
			if len(args) == 1 && args[0] != "bash" {
				cmd.Printf("Unsupported shell type %s.\n", args[0])
				if err := cmd.Usage(); err != nil {
					cmd.Println(err)
				}
				return
			}
			// Surface generation failures instead of silently ignoring them.
			if err := cmd.Root().GenBashCompletion(os.Stdout); err != nil {
				cmd.Println(err)
			}
		},
		ValidArgs: []string{"bash"},
	}

	return cmd
}
1 change: 1 addition & 0 deletions tools/pd-ctl/pdctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func getBasicCmd() *cobra.Command {
command.NewLogCommand(),
command.NewPluginCommand(),
command.NewComponentCommand(),
command.NewCompletionCommand(),
)

rootCmd.Flags().ParseErrorsWhitelist.UnknownFlags = true
Expand Down

0 comments on commit f8e60b2

Please sign in to comment.