diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 0847fb02ea7..198d526cf6b 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -734,6 +734,13 @@ func (mc *Cluster) DisableFeature(fs ...versioninfo.Feature) { } } +// EnableFeature marks that these features are supported in the cluster. +func (mc *Cluster) EnableFeature(fs ...versioninfo.Feature) { + for _, f := range fs { + delete(mc.disabledFeatures, f) + } +} + // IsFeatureSupported checks if the feature is supported by current cluster. func (mc *Cluster) IsFeatureSupported(f versioninfo.Feature) bool { _, ok := mc.disabledFeatures[f] diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index f345682dec0..b06afe723e7 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/schedule/opt" "github.com/tikv/pd/server/statistics" - "github.com/tikv/pd/server/versioninfo" "go.uber.org/zap" ) @@ -403,52 +402,30 @@ func (bs *balanceSolver) init() { Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(), } + bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities()) +} + +func (bs *balanceSolver) isSelectedDim(dim int) bool { + return dim == bs.firstPriority || dim == bs.secondPriority +} + +func (bs *balanceSolver) getPriorities() []string { + querySupport := bs.sche.conf.checkQuerySupport(bs.cluster) // For read, transfer-leader and move-peer have the same priority config // For write, they are different switch bs.rwTy { case read: - bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetReadPriorities(), getReadPriorities) + return adjustConfig(querySupport, bs.sche.conf.GetReadPriorities(), getReadPriorities) case write: switch bs.opTy { case transferLeader: - bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities) + return adjustConfig(querySupport, bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities) case movePeer: - bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities) + return adjustConfig(querySupport, bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities) } } -} - -func (bs *balanceSolver) isSelectedDim(dim int) bool { - return dim == bs.firstPriority || dim == bs.secondPriority -} - -// adjustConfig will adjust config for cluster with low version tikv -// because tikv below 5.2.0 does not report query information, we will use byte and key as the scheduling dimensions -func (bs *balanceSolver) adjustConfig(origins []string, getPriorities func(*prioritiesConfig) []string) (first, second int) { - querySupport := bs.cluster.IsFeatureSupported(versioninfo.HotScheduleWithQuery) - withQuery := slice.AnyOf(origins, func(i int) bool { - return origins[i] == QueryPriority - }) - compatibles := getPriorities(&compatibleConfig) - if !querySupport && withQuery { - schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-compatible-config").Inc() - return prioritiesToDim(compatibles) - } - - defaults := getPriorities(&defaultConfig) - isLegal := slice.AllOf(origins, func(i int) bool { - return origins[i] == BytePriority || origins[i] == KeyPriority || origins[i] == QueryPriority - }) - if len(defaults) == len(origins) && isLegal && origins[0] != origins[1] { - return prioritiesToDim(origins) - } - - if !querySupport { - schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-compatible-config").Inc() - return prioritiesToDim(compatibles) - } - schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-default-config").Inc() - return prioritiesToDim(defaults) + log.Error("illegal type or illegal operator while getting the priority", zap.String("type", bs.rwTy.String()), zap.String("operator", bs.opTy.String())) + return []string{} } func newBalanceSolver(sche *hotScheduler, cluster opt.Cluster, rwTy rwType, opTy opType) *balanceSolver { diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go index 2a4655fefec..08dbe5fa947 100644 --- a/server/schedulers/hot_region_config.go +++ b/server/schedulers/hot_region_config.go @@ -24,10 +24,15 @@ import ( "time" "github.com/gorilla/mux" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule" + "github.com/tikv/pd/server/schedule/opt" "github.com/tikv/pd/server/statistics" + "github.com/tikv/pd/server/versioninfo" "github.com/unrolled/render" + "go.uber.org/zap" ) const ( @@ -79,9 +84,33 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { return cfg } +func (conf *hotRegionSchedulerConfig) getValidConf() *hotRegionSchedulerConfig { + return &hotRegionSchedulerConfig{ + MinHotByteRate: conf.MinHotByteRate, + MinHotKeyRate: conf.MinHotKeyRate, + MinHotQueryRate: conf.MinHotQueryRate, + MaxZombieRounds: conf.MaxZombieRounds, + MaxPeerNum: conf.MaxPeerNum, + ByteRateRankStepRatio: conf.ByteRateRankStepRatio, + KeyRateRankStepRatio: conf.KeyRateRankStepRatio, + QueryRateRankStepRatio: conf.QueryRateRankStepRatio, + CountRankStepRatio: conf.CountRankStepRatio, + GreatDecRatio: conf.GreatDecRatio, + MinorDecRatio: conf.MinorDecRatio, + SrcToleranceRatio: conf.SrcToleranceRatio, + DstToleranceRatio: conf.DstToleranceRatio, + ReadPriorities: adjustConfig(conf.lastQuerySupported, conf.ReadPriorities, getReadPriorities), + WriteLeaderPriorities: adjustConfig(conf.lastQuerySupported, conf.WriteLeaderPriorities, getWriteLeaderPriorities), + WritePeerPriorities: adjustConfig(conf.lastQuerySupported, conf.WritePeerPriorities, getWritePeerPriorities), + StrictPickingStore: conf.StrictPickingStore, + EnableForTiFlash: conf.EnableForTiFlash, + } +} + type hotRegionSchedulerConfig struct { sync.RWMutex - storage *core.Storage + storage *core.Storage + lastQuerySupported bool MinHotByteRate float64 `json:"min-hot-byte-rate"` MinHotKeyRate float64 `json:"min-hot-key-rate"` @@ -259,7 +288,7 @@ func (conf *hotRegionSchedulerConfig) handleGetConfig(w http.ResponseWriter, r * conf.RLock() defer conf.RUnlock() rd := render.New(render.Options{IndentJSON: true}) - rd.JSON(w, http.StatusOK, conf) + rd.JSON(w, http.StatusOK, conf.getValidConf()) } func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r *http.Request) { @@ -312,6 +341,19 @@ func (conf *hotRegionSchedulerConfig) persist() error { return conf.storage.SaveScheduleConfig(HotRegionName, data) } +func (conf *hotRegionSchedulerConfig) checkQuerySupport(cluster opt.Cluster) bool { + querySupport := cluster.IsFeatureSupported(versioninfo.HotScheduleWithQuery) + if querySupport != conf.lastQuerySupported { + log.Info("query supported changed", + zap.Bool("last-query-support", conf.lastQuerySupported), + zap.String("cluster-version", cluster.GetOpts().GetClusterVersion().String()), + zap.Reflect("config", conf), + zap.Reflect("valid-config", conf.getValidConf())) + conf.lastQuerySupported = querySupport + } + return querySupport +} + type prioritiesConfig struct { read []string writeLeader []string @@ -335,3 +377,28 @@ func getWriteLeaderPriorities(c *prioritiesConfig) []string { func getWritePeerPriorities(c *prioritiesConfig) []string { return c.writePeer } + +// adjustConfig will adjust config for cluster with low version tikv +// because tikv below 5.2.0 does not report query information, we will use byte and key as the scheduling dimensions +func adjustConfig(querySupport bool, origins []string, getPriorities func(*prioritiesConfig) []string) []string { + withQuery := slice.AnyOf(origins, func(i int) bool { + return origins[i] == QueryPriority + }) + compatibles := getPriorities(&compatibleConfig) + if !querySupport && withQuery { + return compatibles + } + + defaults := getPriorities(&defaultConfig) + isLegal := slice.AllOf(origins, func(i int) bool { + return origins[i] == BytePriority || origins[i] == KeyPriority || origins[i] == QueryPriority + }) + if len(defaults) == len(origins) && isLegal && origins[0] != origins[1] { + return origins + } + + if !querySupport { + return compatibles + } + return defaults +} diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index e437cae0a99..3b011de09c3 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -1948,6 +1948,17 @@ func (s *testHotSchedulerSuite) TestCompatibility(c *C) { {statistics.KeyDim, statistics.ByteDim}, {statistics.ByteDim, statistics.KeyDim}, }) + // test version change + tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version5_0)) + c.Assert(hb.(*hotScheduler).conf.lastQuerySupported, IsFalse) + tc.EnableFeature(versioninfo.HotScheduleWithQuery) + c.Assert(hb.(*hotScheduler).conf.lastQuerySupported, IsFalse) // it will updated after scheduling + checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{ + {statistics.QueryDim, statistics.ByteDim}, + {statistics.KeyDim, statistics.ByteDim}, + {statistics.ByteDim, statistics.KeyDim}, + }) + c.Assert(hb.(*hotScheduler).conf.lastQuerySupported, IsTrue) } func (s *testHotSchedulerSuite) TestCompatibilityConfig(c *C) { diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go index 6a93ff23403..af0ee27daec 100644 --- a/tests/pdctl/helper.go +++ b/tests/pdctl/helper.go @@ -83,8 +83,9 @@ func CheckRegionsInfo(c *check.C, output *api.RegionsInfo, expected []*core.Regi // MustPutStore is used for test purpose. func MustPutStore(c *check.C, svr *server.Server, store *metapb.Store) { store.Address = fmt.Sprintf("tikv%d", store.GetId()) - store.Version = versioninfo.MinSupportedVersion(versioninfo.Version2_0).String() - + if len(store.Version) == 0 { + store.Version = versioninfo.MinSupportedVersion(versioninfo.Version2_0).String() + } _, err := svr.PutStore(context.Background(), &pdpb.PutStoreRequest{ Header: &pdpb.RequestHeader{ClusterId: svr.ClusterID()}, Store: store, diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index fa958566154..0599ed8c2ba 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" + "github.com/tikv/pd/server/versioninfo" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" @@ -270,7 +271,7 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { "minor-dec-ratio": 0.99, "src-tolerance-ratio": 1.05, "dst-tolerance-ratio": 1.05, - "read-priorities": []interface{}{"query", "byte"}, + "read-priorities": []interface{}{"byte", "key"}, "write-leader-priorities": []interface{}{"key", "byte"}, "write-peer-priorities": []interface{}{"byte", "key"}, "strict-picking-store": "true", @@ -315,11 +316,23 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) c.Assert(conf1, DeepEquals, expected1) + // test compatibility + for _, store := range stores { + version := versioninfo.HotScheduleWithQuery + store.Version = versioninfo.MinSupportedVersion(version).String() + pdctl.MustPutStore(c, leaderServer.GetServer(), store) + mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + } + conf["read-priorities"] = []interface{}{"query", "byte"} + mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) // cannot set qps as write-peer-priorities mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) c.Assert(conf1, DeepEquals, expected1) - + // test remove and add + mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) + mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil) + c.Assert(conf1, DeepEquals, expected1) // test show scheduler with paused and disabled status. checkSchedulerWithStatusCommand := func(args []string, status string, expected []string) { if args != nil {