scheduler: adjust hot region config when pd-ctl return (#3982) (#3992)
* adjust config when pd-ctl return

Signed-off-by: lhy1024 <[email protected]>

* address comments

Signed-off-by: lhy1024 <[email protected]>

* address comments

Signed-off-by: lhy1024 <[email protected]>

* address comments

Signed-off-by: lhy1024 <[email protected]>

* address comment

Signed-off-by: lhy1024 <[email protected]>

* address comment

Signed-off-by: lhy1024 <[email protected]>

* address comment

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: 混沌DM <[email protected]>

Co-authored-by: lhy1024 <[email protected]>
Co-authored-by: 混沌DM <[email protected]>
3 people authored Aug 16, 2021
1 parent 4bf6b18 commit 4c176e8
Showing 6 changed files with 119 additions and 43 deletions.
7 changes: 7 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -734,6 +734,13 @@ func (mc *Cluster) DisableFeature(fs ...versioninfo.Feature) {
}
}

// EnableFeature marks that these features are supported in the cluster.
func (mc *Cluster) EnableFeature(fs ...versioninfo.Feature) {
for _, f := range fs {
delete(mc.disabledFeatures, f)
}
}

// IsFeatureSupported checks if the feature is supported by current cluster.
func (mc *Cluster) IsFeatureSupported(f versioninfo.Feature) bool {
_, ok := mc.disabledFeatures[f]
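The mock keeps a "disabled set" rather than an enabled one: a feature counts as supported unless its key is present in disabledFeatures, so the new EnableFeature only has to delete the key. A minimal self-contained sketch of that pattern (the Feature type here is a stand-in, and the !ok return is an assumption, since the diff cuts off inside IsFeatureSupported):

package mocksketch

type Feature int

const HotScheduleWithQuery Feature = iota

type Cluster struct {
	disabledFeatures map[Feature]struct{}
}

// DisableFeature records the given features as unsupported.
func (mc *Cluster) DisableFeature(fs ...Feature) {
	for _, f := range fs {
		mc.disabledFeatures[f] = struct{}{}
	}
}

// EnableFeature re-enables features by removing them from the disabled set.
func (mc *Cluster) EnableFeature(fs ...Feature) {
	for _, f := range fs {
		delete(mc.disabledFeatures, f)
	}
}

// IsFeatureSupported treats absence from the disabled set as support.
func (mc *Cluster) IsFeatureSupported(f Feature) bool {
	_, ok := mc.disabledFeatures[f]
	return !ok // assumed: supported unless explicitly disabled
}

Defaulting to "supported" keeps existing mock-cluster tests unaffected when new features are introduced.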
51 changes: 14 additions & 37 deletions server/schedulers/hot_region.go
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/versioninfo"
"go.uber.org/zap"
)

@@ -403,52 +402,30 @@ func (bs *balanceSolver) init() {
Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
}

bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
}

func (bs *balanceSolver) isSelectedDim(dim int) bool {
return dim == bs.firstPriority || dim == bs.secondPriority
}

func (bs *balanceSolver) getPriorities() []string {
querySupport := bs.sche.conf.checkQuerySupport(bs.cluster)
// For read, transfer-leader and move-peer have the same priority config
// For write, they are different
switch bs.rwTy {
case read:
bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetReadPriorities(), getReadPriorities)
return adjustConfig(querySupport, bs.sche.conf.GetReadPriorities(), getReadPriorities)
case write:
switch bs.opTy {
case transferLeader:
bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities)
return adjustConfig(querySupport, bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities)
case movePeer:
bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities)
return adjustConfig(querySupport, bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities)
}
}
}

func (bs *balanceSolver) isSelectedDim(dim int) bool {
return dim == bs.firstPriority || dim == bs.secondPriority
}

// adjustConfig adjusts the priority config for clusters with low-version TiKV.
// Because TiKV below 5.2.0 does not report query information, byte and key are used as the scheduling dimensions instead.
func (bs *balanceSolver) adjustConfig(origins []string, getPriorities func(*prioritiesConfig) []string) (first, second int) {
querySupport := bs.cluster.IsFeatureSupported(versioninfo.HotScheduleWithQuery)
withQuery := slice.AnyOf(origins, func(i int) bool {
return origins[i] == QueryPriority
})
compatibles := getPriorities(&compatibleConfig)
if !querySupport && withQuery {
schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-compatible-config").Inc()
return prioritiesToDim(compatibles)
}

defaults := getPriorities(&defaultConfig)
isLegal := slice.AllOf(origins, func(i int) bool {
return origins[i] == BytePriority || origins[i] == KeyPriority || origins[i] == QueryPriority
})
if len(defaults) == len(origins) && isLegal && origins[0] != origins[1] {
return prioritiesToDim(origins)
}

if !querySupport {
schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-compatible-config").Inc()
return prioritiesToDim(compatibles)
}
schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-default-config").Inc()
return prioritiesToDim(defaults)
log.Error("illegal type or illegal operator while getting the priority", zap.String("type", bs.rwTy.String()), zap.String("operator", bs.opTy.String()))
return []string{}
}

func newBalanceSolver(sche *hotScheduler, cluster opt.Cluster, rwTy rwType, opTy opType) *balanceSolver {
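With this refactor, getPriorities returns the priority names as strings (e.g. []string{"query", "byte"}) and init converts them once through prioritiesToDim. That helper is outside this diff; the sketch below shows the mapping it presumably performs, with stand-in constants mirroring statistics.ByteDim/KeyDim/QueryDim (all names here are assumptions):

package dimsketch

// Stand-ins for statistics.ByteDim, statistics.KeyDim, statistics.QueryDim.
const (
	ByteDim = iota
	KeyDim
	QueryDim
)

// stringToDim is hypothetical; the real mapping lives elsewhere in the
// schedulers package.
func stringToDim(name string) int {
	switch name {
	case "key":
		return KeyDim
	case "query":
		return QueryDim
	default: // "byte"
		return ByteDim
	}
}

// prioritiesToDim turns a two-element priority list into the first and
// second scheduling dimensions used by balanceSolver.
func prioritiesToDim(priorities []string) (first, second int) {
	return stringToDim(priorities[0]), stringToDim(priorities[1])
}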
71 changes: 69 additions & 2 deletions server/schedulers/hot_region_config.go
@@ -24,10 +24,15 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/versioninfo"
"github.com/unrolled/render"
"go.uber.org/zap"
)

const (
@@ -79,9 +84,33 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
return cfg
}

func (conf *hotRegionSchedulerConfig) getValidConf() *hotRegionSchedulerConfig {
return &hotRegionSchedulerConfig{
MinHotByteRate: conf.MinHotByteRate,
MinHotKeyRate: conf.MinHotKeyRate,
MinHotQueryRate: conf.MinHotQueryRate,
MaxZombieRounds: conf.MaxZombieRounds,
MaxPeerNum: conf.MaxPeerNum,
ByteRateRankStepRatio: conf.ByteRateRankStepRatio,
KeyRateRankStepRatio: conf.KeyRateRankStepRatio,
QueryRateRankStepRatio: conf.QueryRateRankStepRatio,
CountRankStepRatio: conf.CountRankStepRatio,
GreatDecRatio: conf.GreatDecRatio,
MinorDecRatio: conf.MinorDecRatio,
SrcToleranceRatio: conf.SrcToleranceRatio,
DstToleranceRatio: conf.DstToleranceRatio,
ReadPriorities: adjustConfig(conf.lastQuerySupported, conf.ReadPriorities, getReadPriorities),
WriteLeaderPriorities: adjustConfig(conf.lastQuerySupported, conf.WriteLeaderPriorities, getWriteLeaderPriorities),
WritePeerPriorities: adjustConfig(conf.lastQuerySupported, conf.WritePeerPriorities, getWritePeerPriorities),
StrictPickingStore: conf.StrictPickingStore,
EnableForTiFlash: conf.EnableForTiFlash,
}
}

type hotRegionSchedulerConfig struct {
sync.RWMutex
storage *core.Storage
storage *core.Storage
lastQuerySupported bool

MinHotByteRate float64 `json:"min-hot-byte-rate"`
MinHotKeyRate float64 `json:"min-hot-key-rate"`
@@ -259,7 +288,7 @@ func (conf *hotRegionSchedulerConfig) handleGetConfig(w http.ResponseWriter, r *
conf.RLock()
defer conf.RUnlock()
rd := render.New(render.Options{IndentJSON: true})
rd.JSON(w, http.StatusOK, conf)
rd.JSON(w, http.StatusOK, conf.getValidConf())
}

func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r *http.Request) {
@@ -312,6 +341,19 @@ func (conf *hotRegionSchedulerConfig) persist() error {
return conf.storage.SaveScheduleConfig(HotRegionName, data)
}

func (conf *hotRegionSchedulerConfig) checkQuerySupport(cluster opt.Cluster) bool {
querySupport := cluster.IsFeatureSupported(versioninfo.HotScheduleWithQuery)
if querySupport != conf.lastQuerySupported {
log.Info("query supported changed",
zap.Bool("last-query-support", conf.lastQuerySupported),
zap.String("cluster-version", cluster.GetOpts().GetClusterVersion().String()),
zap.Reflect("config", conf),
zap.Reflect("valid-config", conf.getValidConf()))
conf.lastQuerySupported = querySupport
}
return querySupport
}

type prioritiesConfig struct {
read []string
writeLeader []string
@@ -335,3 +377,28 @@ func getWriteLeaderPriorities(c *prioritiesConfig) []string {
func getWritePeerPriorities(c *prioritiesConfig) []string {
return c.writePeer
}

// adjustConfig adjusts the priority config for clusters with low-version TiKV.
// Because TiKV below 5.2.0 does not report query information, byte and key are used as the scheduling dimensions instead.
func adjustConfig(querySupport bool, origins []string, getPriorities func(*prioritiesConfig) []string) []string {
withQuery := slice.AnyOf(origins, func(i int) bool {
return origins[i] == QueryPriority
})
compatibles := getPriorities(&compatibleConfig)
if !querySupport && withQuery {
return compatibles
}

defaults := getPriorities(&defaultConfig)
isLegal := slice.AllOf(origins, func(i int) bool {
return origins[i] == BytePriority || origins[i] == KeyPriority || origins[i] == QueryPriority
})
if len(defaults) == len(origins) && isLegal && origins[0] != origins[1] {
return origins
}

if !querySupport {
return compatibles
}
return defaults
}
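
Since handleGetConfig now serves conf.getValidConf(), pd-ctl sees the priorities after this adjustment rather than the raw stored values. To make the fallback order concrete, below is a self-contained restatement of the read-priority case with a few inputs worked through; the compatible and default values ([]string{"byte", "key"} and []string{"query", "byte"}) are assumptions consistent with the scheduler test expectations further down:

package main

import "fmt"

const (
	BytePriority  = "byte"
	KeyPriority   = "key"
	QueryPriority = "query"
)

// Assumed values: pre-5.2 clusters fall back to byte/key, newer ones
// default to query/byte.
var (
	compatibleRead = []string{BytePriority, KeyPriority}
	defaultRead    = []string{QueryPriority, BytePriority}
)

// adjustRead mirrors the branch order of adjustConfig for the read dimension.
func adjustRead(querySupport bool, origins []string) []string {
	withQuery := false
	legal := true
	for _, o := range origins {
		if o == QueryPriority {
			withQuery = true
		}
		if o != BytePriority && o != KeyPriority && o != QueryPriority {
			legal = false
		}
	}
	if !querySupport && withQuery {
		return compatibleRead // query requested but the cluster cannot report it
	}
	if len(origins) == len(defaultRead) && legal && origins[0] != origins[1] {
		return origins // user config is legal and distinct: keep it
	}
	if !querySupport {
		return compatibleRead
	}
	return defaultRead
}

func main() {
	fmt.Println(adjustRead(false, []string{"query", "byte"})) // [byte key]
	fmt.Println(adjustRead(true, []string{"query", "byte"}))  // [query byte]
	fmt.Println(adjustRead(true, []string{"byte", "byte"}))   // [query byte]: duplicates rejected
	fmt.Println(adjustRead(false, []string{"qps", "byte"}))   // [byte key]: unknown name, no query support
}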
11 changes: 11 additions & 0 deletions server/schedulers/hot_region_test.go
@@ -1948,6 +1948,17 @@ func (s *testHotSchedulerSuite) TestCompatibility(c *C) {
{statistics.KeyDim, statistics.ByteDim},
{statistics.ByteDim, statistics.KeyDim},
})
// test version change
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version5_0))
c.Assert(hb.(*hotScheduler).conf.lastQuerySupported, IsFalse)
tc.EnableFeature(versioninfo.HotScheduleWithQuery)
c.Assert(hb.(*hotScheduler).conf.lastQuerySupported, IsFalse) // it will be updated after scheduling
checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{
{statistics.QueryDim, statistics.ByteDim},
{statistics.KeyDim, statistics.ByteDim},
{statistics.ByteDim, statistics.KeyDim},
})
c.Assert(hb.(*hotScheduler).conf.lastQuerySupported, IsTrue)
}

func (s *testHotSchedulerSuite) TestCompatibilityConfig(c *C) {
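The IsFalse assertion immediately after EnableFeature pins down an intentional wrinkle: lastQuerySupported is a cache that checkQuerySupport refreshes only when the scheduler runs, so enabling the feature changes nothing until the next scheduling pass (hence the checkPriority call before the IsTrue assertion). A tiny sketch of that refresh-on-use pattern, with illustrative names:

package cachesketch

// querySupportCache mimics how hotRegionSchedulerConfig caches the last
// observed feature support (illustrative; the real field is
// lastQuerySupported on the config struct).
type querySupportCache struct {
	last bool
}

// check returns the current support state and records it for later
// comparison; it is only invoked from the scheduling path, which is why
// the cached value lags behind a version change.
func (c *querySupportCache) check(current bool) bool {
	if current != c.last {
		c.last = current
	}
	return current
}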
5 changes: 3 additions & 2 deletions tests/pdctl/helper.go
Expand Up @@ -83,8 +83,9 @@ func CheckRegionsInfo(c *check.C, output *api.RegionsInfo, expected []*core.Regi
// MustPutStore is used for test purpose.
func MustPutStore(c *check.C, svr *server.Server, store *metapb.Store) {
store.Address = fmt.Sprintf("tikv%d", store.GetId())
store.Version = versioninfo.MinSupportedVersion(versioninfo.Version2_0).String()

if len(store.Version) == 0 {
store.Version = versioninfo.MinSupportedVersion(versioninfo.Version2_0).String()
}
_, err := svr.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: svr.ClusterID()},
Store: store,
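With this guard, MustPutStore fills in the 2.0 minimum only when the caller left Version empty, which is what allows the scheduler test below to upgrade stores to the query-reporting minimum. A hypothetical call site showing both paths (test-body fragment; c and svr assumed in scope, as in the surrounding helpers):

// Hypothetical test-body fragment; c (*check.C) and svr (*server.Server)
// are assumed to be in scope.
storeDefault := &metapb.Store{Id: 1}
pdctl.MustPutStore(c, svr, storeDefault)
// storeDefault.Version is now MinSupportedVersion(Version2_0).String()

storePinned := &metapb.Store{Id: 2}
storePinned.Version = versioninfo.MinSupportedVersion(versioninfo.HotScheduleWithQuery).String()
pdctl.MustPutStore(c, svr, storePinned)
// storePinned.Version is preserved, so the store reports query support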
17 changes: 15 additions & 2 deletions tests/pdctl/scheduler/scheduler_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/versioninfo"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/pdctl"
pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl"
@@ -270,7 +271,7 @@ func (s *schedulerTestSuite) TestScheduler(c *C) {
"minor-dec-ratio": 0.99,
"src-tolerance-ratio": 1.05,
"dst-tolerance-ratio": 1.05,
"read-priorities": []interface{}{"query", "byte"},
"read-priorities": []interface{}{"byte", "key"},
"write-leader-priorities": []interface{}{"key", "byte"},
"write-peer-priorities": []interface{}{"byte", "key"},
"strict-picking-store": "true",
@@ -315,11 +316,23 @@
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil)
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
c.Assert(conf1, DeepEquals, expected1)
// test compatibility
for _, store := range stores {
version := versioninfo.HotScheduleWithQuery
store.Version = versioninfo.MinSupportedVersion(version).String()
pdctl.MustPutStore(c, leaderServer.GetServer(), store)
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
}
conf["read-priorities"] = []interface{}{"query", "byte"}
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
// cannot set qps as write-peer-priorities
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil)
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
c.Assert(conf1, DeepEquals, expected1)

// test remove and add
mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil)
mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil)
c.Assert(conf1, DeepEquals, expected1)
// test show scheduler with paused and disabled status.
checkSchedulerWithStatusCommand := func(args []string, status string, expected []string) {
if args != nil {
