diff --git a/server/api/scheduler_test.go b/server/api/scheduler_test.go
index ca569ff2c18..609c4d213bf 100644
--- a/server/api/scheduler_test.go
+++ b/server/api/scheduler_test.go
@@ -98,7 +98,46 @@ func (s *testScheduleSuite) TestAPI(c *C) {
 		extraTestFunc func(name string, c *C)
 	}{
 		{name: "balance-leader-scheduler"},
-		{name: "balance-hot-region-scheduler"},
+		{
+			name: "balance-hot-region-scheduler",
+			extraTestFunc: func(name string, c *C) {
+				resp := make(map[string]interface{})
+				listURL := fmt.Sprintf("%s%s%s/%s/list", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
+				c.Assert(readJSON(listURL, &resp), IsNil)
+				expectMap := map[string]float64{
+					"min-hot-byte-rate":         100,
+					"min-hot-key-rate":          10,
+					"max-zombie-rounds":         3,
+					"max-peer-number":           1000,
+					"byte-rate-rank-step-ratio": 0.05,
+					"key-rate-rank-step-ratio":  0.05,
+					"count-rank-step-ratio":     0.01,
+					"great-dec-ratio":           0.95,
+					"minor-dec-ratio":           0.99,
+				}
+				for key := range expectMap {
+					c.Assert(resp[key], DeepEquals, expectMap[key])
+				}
+				dataMap := make(map[string]interface{})
+				dataMap["max-zombie-rounds"] = 5.0
+				expectMap["max-zombie-rounds"] = 5.0
+				updateURL := fmt.Sprintf("%s%s%s/%s/config", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
+				body, err := json.Marshal(dataMap)
+				c.Assert(err, IsNil)
+				c.Assert(postJSON(updateURL, body), IsNil)
+				resp = make(map[string]interface{})
+				c.Assert(readJSON(listURL, &resp), IsNil)
+				for key := range expectMap {
+					c.Assert(resp[key], DeepEquals, expectMap[key])
+				}
+				// update again
+				err = postJSON(updateURL, body, func(res []byte, code int) {
+					c.Assert(string(res), Equals, "no changed")
+					c.Assert(code, Equals, 200)
+				})
+				c.Assert(err, IsNil)
+			},
+		},
 		{name: "balance-region-scheduler"},
 		{name: "shuffle-leader-scheduler"},
 		{name: "shuffle-region-scheduler"},
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index a36c97e9e38..9bdf7d4bf31 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -17,6 +17,7 @@ import (
 	"fmt"
 	"math"
 	"math/rand"
+	"net/http"
 	"sort"
 	"sync"
 	"time"
@@ -39,21 +40,29 @@ func init() {
 		}
 	})
 	schedule.RegisterScheduler(HotRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
-		return newHotScheduler(opController), nil
+		conf := initHotRegionScheduleConfig()
+		if err := decoder(conf); err != nil {
+			return nil, err
+		}
+		conf.storage = storage
+		return newHotScheduler(opController, conf), nil
 	})
+
 	// FIXME: remove this two schedule after the balance test move in schedulers package
-	schedule.RegisterScheduler(HotWriteRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
-		return newHotWriteScheduler(opController), nil
-	})
-	schedule.RegisterScheduler(HotReadRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
-		return newHotReadScheduler(opController), nil
-	})
+	{
+		schedule.RegisterScheduler(HotWriteRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
+			return newHotWriteScheduler(opController, initHotRegionScheduleConfig()), nil
+		})
+		schedule.RegisterScheduler(HotReadRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
+			return newHotReadScheduler(opController, initHotRegionScheduleConfig()), nil
+		})
+
+	}
 }
 
 const (
 	// HotRegionName is balance hot region scheduler name.
 	HotRegionName = "balance-hot-region-scheduler"
-
 	// HotRegionType is balance hot region scheduler type.
 	HotRegionType = "hot-region"
 	// HotReadRegionType is hot read region scheduler type.
@@ -62,24 +71,6 @@ (
 	HotWriteRegionType = "hot-write-region"
 
 	hotRegionLimitFactor = 0.75
-
-	maxPeerNum = 1000
-
-	maxZombieDur time.Duration = statistics.StoreHeartBeatReportInterval * time.Second
-
-	minRegionScheduleInterval time.Duration = statistics.StoreHeartBeatReportInterval * time.Second
-
-	minHotByteRate = 100
-	minHotKeyRate  = 10
-
-	// rank step ratio decide the step when calculate rank
-	// step = max current * rank step ratio
-	byteRateRankStepRatio = 0.05
-	keyRateRankStepRatio  = 0.05
-	countRankStepRatio    = 0.1
-
-	greatDecRatio = 0.95
-	minorDecRatio = 0.99
 )
 
 type hotScheduler struct {
@@ -98,9 +89,11 @@ type hotScheduler struct {
 	// temporary states but exported to API or metrics
 	stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail
 	pendingSums [resourceTypeLen]map[uint64]Influence
+	// config of hot scheduler
+	conf *hotRegionSchedulerConfig
 }
 
-func newHotScheduler(opController *schedule.OperatorController) *hotScheduler {
+func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler {
 	base := NewBaseScheduler(opController)
 	ret := &hotScheduler{
 		name:           HotRegionName,
@@ -110,6 +103,7 @@ func newHotScheduler(opController *schedule.OperatorController) *hotScheduler {
 		types:          []rwType{write, read},
 		r:              rand.New(rand.NewSource(time.Now().UnixNano())),
 		regionPendings: make(map[uint64][2]*operator.Operator),
+		conf:           conf,
 	}
 	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
 		ret.pendings[ty] = map[*pendingInfluence]struct{}{}
@@ -118,15 +112,15 @@
 	return ret
 }
 
-func newHotReadScheduler(opController *schedule.OperatorController) *hotScheduler {
-	ret := newHotScheduler(opController)
+func newHotReadScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler {
+	ret := newHotScheduler(opController, conf)
 	ret.name = ""
 	ret.types = []rwType{read}
 	return ret
 }
 
-func newHotWriteScheduler(opController *schedule.OperatorController) *hotScheduler {
-	ret := newHotScheduler(opController)
+func newHotWriteScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler {
+	ret := newHotScheduler(opController, conf)
 	ret.name = ""
 	ret.types = []rwType{write}
 	return ret
@@ -140,6 +134,10 @@ func (h *hotScheduler) GetType() string {
 	return HotRegionType
 }
 
+func (h *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	h.conf.ServeHTTP(w, r)
+}
+
 func (h *hotScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
 	return h.allowBalanceLeader(cluster) || h.allowBalanceRegion(cluster)
 }
@@ -218,7 +216,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 
 func (h *hotScheduler) summaryPendingInfluence() {
 	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
-		h.pendingSums[ty] = summaryPendingInfluence(h.pendings[ty], calcPendingWeight)
+		h.pendingSums[ty] = summaryPendingInfluence(h.pendings[ty], h.calcPendingWeight)
 	}
 	h.gcRegionPendings()
 }
@@ -228,7 +226,7 @@ func (h *hotScheduler) gcRegionPendings() {
 		empty := true
 		for ty, op := range pendings {
 			if op != nil && op.IsEnd() {
-				if time.Now().After(op.GetCreateTime().Add(minRegionScheduleInterval)) {
+				if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) {
 					schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
 					pendings[ty] = nil
 				}
@@ -320,23 +318,27 @@ func filterHotPeers(
 	return ret
 }
 
-func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType) {
-	influence := newPendingInfluence(op, srcStore, dstStore, infl)
+func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType) bool {
 	regionID := op.RegionID()
+	_, ok := h.regionPendings[regionID]
+	if ok {
+		schedulerStatus.WithLabelValues(h.GetName(), "pending_op_fails").Inc()
+		return false
+	}
 
+	influence := newPendingInfluence(op, srcStore, dstStore, infl)
 	rcTy := toResourceType(rwTy, opTy)
 	h.pendings[rcTy][influence] = struct{}{}
 
-	if _, ok := h.regionPendings[regionID]; !ok {
-		h.regionPendings[regionID] = [2]*operator.Operator{nil, nil}
-	}
+	h.regionPendings[regionID] = [2]*operator.Operator{nil, nil}
 	{ // h.pendingOpInfos[regionID][ty] = influence
 		tmp := h.regionPendings[regionID]
 		tmp[opTy] = op
 		h.regionPendings[regionID] = tmp
 	}
 
-	schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
+	schedulerStatus.WithLabelValues(h.GetName(), "pending_op_create").Inc()
+	return true
 }
 
 func (h *hotScheduler) balanceHotReadRegions(cluster opt.Cluster) []*operator.Operator {
@@ -429,9 +431,9 @@ func (bs *balanceSolver) init() {
 	}
 
 	bs.rankStep = &storeLoad{
-		ByteRate: maxCur.ByteRate * byteRateRankStepRatio,
-		KeyRate:  maxCur.KeyRate * keyRateRankStepRatio,
-		Count:    maxCur.Count * countRankStepRatio,
+		ByteRate: maxCur.ByteRate * bs.sche.conf.GetByteRankStepRatio(),
+		KeyRate:  maxCur.KeyRate * bs.sche.conf.GetKeyRankStepRatio(),
+		Count:    maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
 	}
 }
 
@@ -499,7 +501,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 			for dstStoreID := range bs.filterDstStores() {
 				bs.cur.dstStoreID = dstStoreID
 				bs.calcProgressiveRank()
-
 				if bs.cur.progressiveRank < 0 && bs.betterThan(best) {
 					if newOps, newInfls := bs.buildOperators(); len(newOps) > 0 {
 						ops = newOps
@@ -513,7 +514,10 @@
 	}
 
 	for i := 0; i < len(ops); i++ {
-		bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i], bs.rwTy, bs.opTy)
+		// TODO: multiple operators need to be atomic.
+		if !bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i], bs.rwTy, bs.opTy) {
+			return nil
+		}
 	}
 	return ops
 }
@@ -546,9 +550,22 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
 
 func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
 	ret := bs.stLoadDetail[bs.cur.srcStoreID].HotPeers
-	// Return at most maxPeerNum peers, to prevent balanceSolver.solve() too slow.
+	// Return at most MaxPeerNum peers, to prevent balanceSolver.solve() too slow.
+	maxPeerNum := bs.sche.conf.GetMaxPeerNumber()
+
+	// filter pending region
+	appendItem := func(items []*statistics.HotPeerStat, item *statistics.HotPeerStat) []*statistics.HotPeerStat {
+		if _, ok := bs.sche.regionPendings[item.ID()]; !ok {
+			items = append(items, item)
+		}
+		return items
+	}
 	if len(ret) <= maxPeerNum {
-		return ret
+		nret := make([]*statistics.HotPeerStat, 0, len(ret))
+		for _, peer := range ret {
+			nret = appendItem(nret, peer)
+		}
+		return nret
 	}
 
 	byteSort := make([]*statistics.HotPeerStat, len(ret))
@@ -583,7 +600,7 @@ func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
 	}
 	ret = make([]*statistics.HotPeerStat, 0, len(union))
 	for peer := range union {
-		ret = append(ret, peer)
+		ret = appendItem(ret, peer)
 	}
 	return ret
 }
@@ -709,9 +726,10 @@ func (bs *balanceSolver) calcProgressiveRank() {
 		}
 	} else {
 		keyDecRatio := (dstLd.KeyRate + peer.GetKeyRate()) / (srcLd.KeyRate + 1)
-		keyHot := peer.GetKeyRate() >= minHotKeyRate
+		keyHot := peer.GetKeyRate() >= bs.sche.conf.GetMinHotKeyRate()
 		byteDecRatio := (dstLd.ByteRate + peer.GetByteRate()) / (srcLd.ByteRate + 1)
-		byteHot := peer.GetByteRate() > minHotByteRate
+		byteHot := peer.GetByteRate() > bs.sche.conf.GetMinHotByteRate()
+		greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio()
 		switch {
 		case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio:
 			// Both byte rate and key rate are balanced, the best choice.
@@ -1000,7 +1018,7 @@ func (h *hotScheduler) copyPendingInfluence(ty resourceType) map[uint64]Influenc
 	return ret
 }
 
-func calcPendingWeight(op *operator.Operator) float64 {
+func (h *hotScheduler) calcPendingWeight(op *operator.Operator) float64 {
 	if op.CheckExpired() || op.CheckTimeout() {
 		return 0
 	}
@@ -1011,6 +1029,7 @@ func calcPendingWeight(op *operator.Operator) float64 {
 	switch status {
 	case operator.SUCCESS:
 		zombieDur := time.Since(op.GetReachTimeOf(status))
+		maxZombieDur := h.conf.GetMaxZombieDuration()
 		if zombieDur >= maxZombieDur {
 			return 0
 		}
diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go
new file mode 100644
index 00000000000..2db258ed44e
--- /dev/null
+++ b/server/schedulers/hot_region_config.go
@@ -0,0 +1,190 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedulers
+
+import (
+	"bytes"
+	"encoding/json"
+	"io/ioutil"
+	"net/http"
+	"reflect"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/gorilla/mux"
+	"github.com/pingcap/pd/v4/server/core"
+	"github.com/pingcap/pd/v4/server/schedule"
+	"github.com/pingcap/pd/v4/server/statistics"
+	"github.com/unrolled/render"
+)
+
+// params about hot region.
+func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
+	return &hotRegionSchedulerConfig{
+		MinHotByteRate:        100,
+		MinHotKeyRate:         10,
+		MaxZombieRounds:       3,
+		ByteRateRankStepRatio: 0.05,
+		KeyRateRankStepRatio:  0.05,
+		CountRankStepRatio:    0.01,
+		GreatDecRatio:         0.95,
+		MinorDecRatio:         0.99,
+		MaxPeerNum:            1000,
+	}
+}
+
+type hotRegionSchedulerConfig struct {
+	sync.RWMutex
+	storage *core.Storage
+
+	MinHotByteRate  float64 `json:"min-hot-byte-rate"`
+	MinHotKeyRate   float64 `json:"min-hot-key-rate"`
+	MaxZombieRounds int     `json:"max-zombie-rounds"`
+	MaxPeerNum      int     `json:"max-peer-number"`
+
+	// rank step ratio decide the step when calculate rank
+	// step = max current * rank step ratio
+	ByteRateRankStepRatio float64 `json:"byte-rate-rank-step-ratio"`
+	KeyRateRankStepRatio  float64 `json:"key-rate-rank-step-ratio"`
+	CountRankStepRatio    float64 `json:"count-rank-step-ratio"`
+	GreatDecRatio         float64 `json:"great-dec-ratio"`
+	MinorDecRatio         float64 `json:"minor-dec-ratio"`
+}
+
+func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
+	conf.RLock()
+	defer conf.RUnlock()
+	return schedule.EncodeConfig(conf)
+}
+
+func (conf *hotRegionSchedulerConfig) GetMaxZombieDuration() time.Duration {
+	conf.RLock()
+	defer conf.RUnlock()
+	return time.Duration(conf.MaxZombieRounds) * statistics.StoreHeartBeatReportInterval * time.Second
+}
+
+func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.MaxPeerNum
+}
+
+func (conf *hotRegionSchedulerConfig) GetByteRankStepRatio() float64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.ByteRateRankStepRatio
+}
+
+func (conf *hotRegionSchedulerConfig) GetKeyRankStepRatio() float64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.KeyRateRankStepRatio
+}
+
+func (conf *hotRegionSchedulerConfig) GetCountRankStepRatio() float64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.CountRankStepRatio
+}
+
+func (conf *hotRegionSchedulerConfig) GetGreatDecRatio() float64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.GreatDecRatio
+}
+
+func (conf *hotRegionSchedulerConfig) GetMinorGreatDecRatio() float64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.MinorDecRatio
+}
+
+func (conf *hotRegionSchedulerConfig) GetMinHotKeyRate() float64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.MinHotKeyRate
+}
+
+func (conf *hotRegionSchedulerConfig) GetMinHotByteRate() float64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.MinHotByteRate
+}
+
+func (conf *hotRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	router := mux.NewRouter()
+	router.HandleFunc("/list", conf.handleGetConfig).Methods("GET")
+	router.HandleFunc("/config", conf.handleSetConfig).Methods("POST")
+	router.ServeHTTP(w, r)
+}
+
+func (conf *hotRegionSchedulerConfig) handleGetConfig(w http.ResponseWriter, r *http.Request) {
+	conf.RLock()
+	defer conf.RUnlock()
+	rd := render.New(render.Options{IndentJSON: true})
+	rd.JSON(w, http.StatusOK, conf)
+}
+
+func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r *http.Request) {
+	conf.Lock()
+	defer conf.Unlock()
+	rd := render.New(render.Options{IndentJSON: true})
+	oldc, _ := json.Marshal(conf)
+	data, err := ioutil.ReadAll(r.Body)
+	r.Body.Close()
+	if err != nil {
+		rd.JSON(w, http.StatusInternalServerError, err.Error())
+		return
+	}
+
+	if err := json.Unmarshal(data, conf); err != nil {
+		rd.JSON(w, http.StatusInternalServerError, err.Error())
+		return
+	}
+	newc, _ := json.Marshal(conf)
+	if !bytes.Equal(oldc, newc) {
+		conf.persist()
+		rd.Text(w, http.StatusOK, "success")
+		return
+	}
+
+	m := make(map[string]interface{})
+	if err := json.Unmarshal(data, &m); err != nil {
+		rd.JSON(w, http.StatusInternalServerError, err.Error())
+		return
+	}
+	t := reflect.TypeOf(conf).Elem()
+	for i := 0; i < t.NumField(); i++ {
+		jsonTag := t.Field(i).Tag.Get("json")
+		if i := strings.Index(jsonTag, ","); i != -1 { // trim 'foobar,string' to 'foobar'
+			jsonTag = jsonTag[:i]
+		}
+		if _, ok := m[jsonTag]; ok {
+			rd.Text(w, http.StatusOK, "no changed")
+			return
+		}
+	}
+
+	rd.Text(w, http.StatusBadRequest, "config item not found")
+}
+
+func (conf *hotRegionSchedulerConfig) persist() error {
+	data, err := schedule.EncodeConfig(conf)
+	if err != nil {
+		return err
+	}
+	return conf.storage.SaveScheduleConfig(HotRegionName, data)
+}
diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go
index 1317df6bbd5..d937fff7686 100644
--- a/server/schedulers/hot_test.go
+++ b/server/schedulers/hot_test.go
@@ -40,7 +40,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
 	opt := mockoption.NewScheduleOptions()
 	newTestReplication(opt, 3, "zone", "host")
 	tc := mockcluster.NewCluster(opt)
-	sche, err := schedule.CreateScheduler(HotRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
+	sche, err := schedule.CreateScheduler(HotRegionType, schedule.NewOperatorController(ctx, tc, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigJSONDecoder([]byte("null")))
 	c.Assert(err, IsNil)
 	hb := sche.(*hotScheduler)
 
@@ -67,7 +67,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
 	}
 	shouldRemoveOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
 		op := doneOp(region, ty)
-		operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-minRegionScheduleInterval))
+		operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second))
 		return op
 	}
 	opCreaters := [4]func(region *core.RegionInfo, ty opType) *operator.Operator{nilOp, shouldRemoveOp, notDoneOp, doneOp}
diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go
index aa565dbddcd..76def9b4deb 100644
--- a/server/statistics/hot_peer.go
+++ b/server/statistics/hot_peer.go
@@ -29,7 +29,7 @@ type HotPeerStat struct {
 	// HotDegree records the hot region update times
 	HotDegree int `json:"hot_degree"`
 	// AntiCount used to eliminate some noise when remove region in cache
-	AntiCount int
+	AntiCount int `json:"anti_count"`
 
 	Kind     FlowKind `json:"kind"`
 	ByteRate float64  `json:"flow_bytes"`
@@ -42,7 +42,7 @@ type HotPeerStat struct {
 	// LastUpdateTime used to calculate average write
 	LastUpdateTime time.Time `json:"last_update_time"`
 	// Version used to check the region split times
-	Version uint64
+	Version uint64 `json:"version"`
 
 	needDelete bool
 	isLeader   bool
diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go
index 8f64d8039e7..5335eb12916 100644
--- a/server/statistics/hot_peer_cache.go
+++ b/server/statistics/hot_peer_cache.go
@@ -116,9 +116,13 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *Sto
 	byteRate := totalBytes / float64(interval)
 	keyRate := totalKeys / float64(interval)
 
-	for storeID := range storeIDs {
+	var tmpItem *HotPeerStat
+	for _, storeID := range storeIDs {
 		isExpired := f.isRegionExpired(region, storeID)
 		oldItem := f.getOldHotPeerStat(region.GetID(), storeID)
+		if isExpired && oldItem != nil {
+			tmpItem = oldItem
+		}
 
 		// This is used for the simulator. Ignore if report too fast.
 		if !isExpired && Denoising && interval < hotRegionReportMinInterval {
@@ -137,6 +141,11 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *Sto
 			isLeader: region.GetLeader().GetStoreId() == storeID,
 		}
 
+		// use the tmpItem cached from another store
+		if oldItem == nil && tmpItem != nil {
+			oldItem = tmpItem
+		}
+
 		newItem = f.updateHotPeerStat(newItem, oldItem, storesStats)
 		if newItem != nil {
 			ret = append(ret, newItem)
@@ -224,13 +233,15 @@ func (f *hotPeerCache) calcHotThresholds(stats *StoresStats, storeID uint64) [di
 }
 
 // gets the storeIDs, including old region and new region
-func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) map[uint64]struct{} {
+func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 {
 	storeIDs := make(map[uint64]struct{})
+	ret := make([]uint64, 0, len(region.GetPeers()))
 	// old stores
 	ids, ok := f.storesOfRegion[region.GetID()]
 	if ok {
 		for storeID := range ids {
 			storeIDs[storeID] = struct{}{}
+			ret = append(ret, storeID)
 		}
 	}
 
@@ -242,10 +253,11 @@ func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) map[uint64]struct
 		}
 		if _, ok := storeIDs[peer.GetStoreId()]; !ok {
 			storeIDs[peer.GetStoreId()] = struct{}{}
+			ret = append(ret, peer.GetStoreId())
 		}
 	}
 
-	return storeIDs
+	return ret
 }
 
 func (f *hotPeerCache) isRegionHotWithAnyPeers(region *core.RegionInfo, hotDegree int) bool {
diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go
index aa7cd68aafc..92975a4924b 100644
--- a/tools/pd-ctl/pdctl/command/scheduler.go
+++ b/tools/pd-ctl/pdctl/command/scheduler.go
@@ -430,11 +430,29 @@ func NewConfigSchedulerCommand() *cobra.Command {
 	c.AddCommand(
 		newConfigEvictLeaderCommand(),
 		newConfigGrantLeaderCommand(),
+		newConfigHotRegionCommand(),
 		newConfigShuffleRegionCommand(),
 	)
 	return c
}
 
+func newConfigHotRegionCommand() *cobra.Command {
+	c := &cobra.Command{
+		Use:   "balance-hot-region-scheduler",
+		Short: "show balance-hot-region-scheduler config",
+		Run:   listSchedulerConfigCommandFunc,
+	}
+	c.AddCommand(&cobra.Command{
+		Use:   "list",
+		Short: "list the config item",
+		Run:   listSchedulerConfigCommandFunc})
+	c.AddCommand(&cobra.Command{
+		Use:   "set <key> <value>",
+		Short: "set the config item",
+		Run:   func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) }})
+	return c
+}
+
 func newConfigEvictLeaderCommand() *cobra.Command {
 	c := &cobra.Command{
 		Use:   "evict-leader-scheduler",
@@ -519,6 +537,22 @@ func listSchedulerConfigCommandFunc(cmd *cobra.Command, args []string) {
 	cmd.Println(r)
 }
 
+func postSchedulerConfigCommandFunc(cmd *cobra.Command, schedulerName string, args []string) {
+	if len(args) != 2 {
+		cmd.Println(cmd.UsageString())
+		return
+	}
+	var val interface{}
+	input := make(map[string]interface{})
+	key, value := args[0], args[1]
+	val, err := strconv.ParseFloat(value, 64)
+	if err != nil {
+		val = value
+	}
+	input[key] = val
+	postJSON(cmd, path.Join(schedulerConfigPrefix, schedulerName, "config"), input)
+}
+
 // convertReomveConfigToReomveScheduler make cmd can be used at removeCommandFunc
 func convertReomveConfigToReomveScheduler(cmd *cobra.Command) {
 	setCommandUse(cmd, "remove")
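
Usage sketch (not part of the patch): the snippet below shows one way a client could exercise the HTTP endpoints that hotRegionSchedulerConfig.ServeHTTP registers above. The /list and /config sub-paths come from the router in this diff; the PD address and the /pd/api/v1/scheduler-config prefix are assumptions for illustration and may differ in a real deployment.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Assumed PD address and scheduler config prefix; adjust to the actual deployment.
	base := "http://127.0.0.1:2379/pd/api/v1/scheduler-config/balance-hot-region-scheduler"

	// GET /list returns the current hot region scheduler config as JSON.
	resp, err := http.Get(base + "/list")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	conf := make(map[string]interface{})
	if err := json.NewDecoder(resp.Body).Decode(&conf); err != nil {
		panic(err)
	}
	fmt.Println("current config:", conf)

	// POST /config updates the given items; per the handler above, an unchanged
	// value yields "no changed" and an unknown key yields "config item not found".
	body, _ := json.Marshal(map[string]interface{}{"max-zombie-rounds": 5})
	post, err := http.Post(base+"/config", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer post.Body.Close()
	result, _ := ioutil.ReadAll(post.Body)
	fmt.Println("update result:", string(result))
}

The same operations are exposed through pd-ctl by the newConfigHotRegionCommand added in this diff, roughly as "scheduler config balance-hot-region-scheduler list" and "scheduler config balance-hot-region-scheduler set <key> <value>".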