Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hot region schedule supports store weight #2240

Closed
wants to merge 12 commits into from
16 changes: 16 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,22 @@ func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) {
mc.PutStore(store)
}

// AddRegionStoreWithWeight adds store with specified count of region and hotRegionWeight.
func (mc *Cluster) AddRegionStoreWithWeight(storeID uint64, regionCount int, hotRegionWeight float64) {
stats := &pdpb.StoreStats{}
stats.Capacity = 1000 * (1 << 20)
stats.Available = stats.Capacity - uint64(regionCount)*10
store := core.NewStoreInfo(
&metapb.Store{Id: storeID},
core.SetStoreStats(stats),
core.SetRegionCount(regionCount),
core.SetRegionSize(int64(regionCount)*10),
core.SetLastHeartbeatTS(time.Now()),
core.SetHotRegionWeight(hotRegionWeight),
)
mc.PutStore(store)
}

// AddRegionStoreWithLeader adds store with specified count of region and leader.
func (mc *Cluster) AddRegionStoreWithLeader(storeID uint64, regionCount int, leaderCounts ...int) {
leaderCount := regionCount
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func createRouter(ctx context.Context, prefix string, svr *server.Server) (*mux.
clusterRouter.HandleFunc("/store/{id}/label", storeHandler.SetLabels).Methods("POST")
clusterRouter.HandleFunc("/store/{id}/weight", storeHandler.SetWeight).Methods("POST")
clusterRouter.HandleFunc("/store/{id}/limit", storeHandler.SetLimit).Methods("POST")
clusterRouter.HandleFunc("/store/{id}/hot-weight", storeHandler.SetHotRegionWeight).Methods("POST")
storesHandler := newStoresHandler(handler, rd)
clusterRouter.Handle("/stores", storesHandler).Methods("GET")
clusterRouter.HandleFunc("/stores/remove-tombstone", storesHandler.RemoveTombStone).Methods("DELETE")
Expand Down
35 changes: 35 additions & 0 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type StoreStatus struct {
StartTS *time.Time `json:"start_ts,omitempty"`
LastHeartbeatTS *time.Time `json:"last_heartbeat_ts,omitempty"`
Uptime *typeutil.Duration `json:"uptime,omitempty"`
HotRegionWeight float64 `json:"hot_region_weight"`
}

// StoreInfo contains information about a store.
Expand Down Expand Up @@ -93,6 +94,7 @@ func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo
ReceivingSnapCount: store.GetReceivingSnapCount(),
ApplyingSnapCount: store.GetApplyingSnapCount(),
IsBusy: store.IsBusy(),
HotRegionWeight: store.GetHotRegionWeight(),
},
}

Expand Down Expand Up @@ -283,6 +285,39 @@ func (h *storeHandler) SetWeight(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storeHandler) SetHotRegionWeight(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r.Context())
vars := mux.Vars(r)
storeID, errParse := apiutil.ParseUint64VarsField(vars, "id")
if errParse != nil {
apiutil.ErrorResp(h.rd, w, errcode.NewInvalidInputErr(errParse))
return
}

var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
}

hotVal, ok := input["hot-weight"]
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "hot region weight unset")
return
}
hot, ok := hotVal.(float64)
if !ok || hot < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat hot region weight")
return
}

if err := rc.SetHotRegionWeight(storeID, hot); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storeHandler) SetLimit(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
storeID, errParse := apiutil.ParseUint64VarsField(vars, "id")
Expand Down
21 changes: 21 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,27 @@ func (c *RaftCluster) SetStoreWeight(storeID uint64, leaderWeight, regionWeight
return c.putStoreLocked(newStore)
}

// SetHotRegionWeight sets up a store's hot region weight.
func (c *RaftCluster) SetHotRegionWeight(storeID uint64, hotRegionWeight float64) error {
c.Lock()
defer c.Unlock()

store := c.GetStore(storeID)
if store == nil {
return core.NewStoreNotFoundErr(storeID)
}

if err := c.storage.SaveHotRegionWeight(storeID, hotRegionWeight); err != nil {
return err
}

newStore := store.Clone(
core.SetHotRegionWeight(hotRegionWeight),
)

return c.putStoreLocked(newStore)
}

func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error {
if c.storage != nil {
if err := c.storage.SaveStore(store.GetMeta()); err != nil {
Expand Down
16 changes: 15 additions & 1 deletion server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (s *Storage) storeRegionWeightPath(storeID uint64) string {
return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "region")
}

func (s *Storage) storeHotRegionWeightPath(storeID uint64) string {
return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "hot")
}

// SaveScheduleConfig saves the config of scheduler.
func (s *Storage) SaveScheduleConfig(scheduleName string, data []byte) error {
configPath := path.Join(customScheduleConfigPath, scheduleName)
Expand Down Expand Up @@ -283,7 +287,11 @@ func (s *Storage) LoadStores(f func(store *StoreInfo)) error {
if err != nil {
return err
}
newStoreInfo := NewStoreInfo(store, SetLeaderWeight(leaderWeight), SetRegionWeight(regionWeight))
hotRegionWeight, err := s.loadFloatWithDefaultValue(s.storeHotRegionWeightPath(store.GetId()), 1.0)
if err != nil {
return err
}
newStoreInfo := NewStoreInfo(store, SetLeaderWeight(leaderWeight), SetRegionWeight(regionWeight), SetHotRegionWeight(hotRegionWeight))

nextID = store.GetId() + 1
f(newStoreInfo)
Expand All @@ -304,6 +312,12 @@ func (s *Storage) SaveStoreWeight(storeID uint64, leader, region float64) error
return s.Save(s.storeRegionWeightPath(storeID), regionValue)
}

// SaveHotRegionWeight saves a store's hot region weight to storage.
func (s *Storage) SaveHotRegionWeight(storeID uint64, hot float64) error {
leaderValue := strconv.FormatFloat(hot, 'f', -1, 64)
return s.Save(s.storeHotRegionWeightPath(storeID), leaderValue)
}

func (s *Storage) loadFloatWithDefaultValue(path string, def float64) (float64, error) {
res, err := s.Load(path)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions server/core/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,16 @@ func (s *testKVSuite) TestStoreWeight(c *C) {
mustSaveStores(c, storage, n)
c.Assert(storage.SaveStoreWeight(1, 2.0, 3.0), IsNil)
c.Assert(storage.SaveStoreWeight(2, 0.2, 0.3), IsNil)
c.Assert(storage.SaveHotRegionWeight(1, 2.0), IsNil)
c.Assert(storage.SaveHotRegionWeight(2, 0.4), IsNil)
c.Assert(storage.LoadStores(cache.SetStore), IsNil)
leaderWeights := []float64{1.0, 2.0, 0.2}
regionWeights := []float64{1.0, 3.0, 0.3}
hotRegionWeights := []float64{1.0, 2.0, 0.4}
for i := 0; i < n; i++ {
c.Assert(cache.GetStore(uint64(i)).GetLeaderWeight(), Equals, leaderWeights[i])
c.Assert(cache.GetStore(uint64(i)).GetRegionWeight(), Equals, regionWeights[i])
c.Assert(cache.GetStore(uint64(i)).GetHotRegionWeight(), Equals, hotRegionWeights[i])
}
}

Expand Down
16 changes: 12 additions & 4 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ type StoreInfo struct {
leaderWeight float64
regionWeight float64
available func() bool
hotRegionWeight float64
}

// NewStoreInfo creates StoreInfo with meta data.
func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo {
storeInfo := &StoreInfo{
meta: store,
stats: &pdpb.StoreStats{},
leaderWeight: 1.0,
regionWeight: 1.0,
meta: store,
stats: &pdpb.StoreStats{},
leaderWeight: 1.0,
regionWeight: 1.0,
hotRegionWeight: 1.0,
}
for _, opt := range opts {
opt(storeInfo)
Expand All @@ -77,6 +79,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
leaderWeight: s.leaderWeight,
regionWeight: s.regionWeight,
available: s.available,
hotRegionWeight: s.hotRegionWeight,
}

for _, opt := range opts {
Expand Down Expand Up @@ -243,6 +246,11 @@ func (s *StoreInfo) GetRegionWeight() float64 {
return s.regionWeight
}

// GetHotRegionWeight returns the hot-region-weight of the store.
func (s *StoreInfo) GetHotRegionWeight() float64 {
return s.hotRegionWeight
}

// GetLastHeartbeatTS returns the last heartbeat timestamp of the store.
func (s *StoreInfo) GetLastHeartbeatTS() time.Time {
return time.Unix(0, s.meta.GetLastHeartbeat())
Expand Down
7 changes: 7 additions & 0 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ func SetRegionWeight(regionWeight float64) StoreCreateOption {
}
}

// SetHotRegionWeight sets the hot-region-weight for the store.
func SetHotRegionWeight(hotRegionWeight float64) StoreCreateOption {
return func(store *StoreInfo) {
store.hotRegionWeight = hotRegionWeight
}
}

// SetLastHeartbeatTS sets the time of last heartbeat for the store.
func SetLastHeartbeatTS(lastHeartbeatTS time.Time) StoreCreateOption {
return func(store *StoreInfo) {
Expand Down
60 changes: 45 additions & 15 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ const (
HotWriteRegionType = "hot-write-region"

hotRegionLimitFactor = 0.75

minWeight = 1e-6
)

type hotScheduler struct {
Expand Down Expand Up @@ -177,6 +179,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
storesStat := cluster.GetStoresStats()

minHotDegree := cluster.GetHotRegionCacheHitsThreshold()
storesHotWeight := getStoresHotWeight(cluster)
{ // update read statistics
regionRead := cluster.RegionReadStats()
storeByte := storesStat.GetStoresBytesReadStat()
Expand All @@ -188,7 +191,9 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
h.pendingSums[readLeader],
regionRead,
minHotDegree,
read, core.LeaderKind)
read,
core.LeaderKind,
storesHotWeight)
}

{ // update write statistics
Expand All @@ -202,15 +207,19 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
h.pendingSums[writeLeader],
regionWrite,
minHotDegree,
write, core.LeaderKind)
write,
core.LeaderKind,
storesHotWeight)

h.stLoadInfos[writePeer] = summaryStoresLoad(
storeByte,
storeKey,
h.pendingSums[writePeer],
regionWrite,
minHotDegree,
write, core.RegionKind)
write,
core.RegionKind,
storesHotWeight)
}
}

Expand Down Expand Up @@ -252,6 +261,7 @@ func summaryStoresLoad(
minHotDegree int,
rwTy rwType,
kind core.ResourceKind,
storesHotWeight map[uint64]float64,
) map[uint64]*storeLoadDetail {
loadDetail := make(map[uint64]*storeLoadDetail, len(storeByteRate))

Expand Down Expand Up @@ -286,12 +296,13 @@ func summaryStoresLoad(
}
}

hotWeight := storesHotWeight[id]
// Build store load prediction from current load and pending influence.
stLoadPred := (&storeLoad{
ByteRate: byteRate,
KeyRate: keyRate,
ByteRate: byteRate / hotWeight,
KeyRate: keyRate / hotWeight,
Count: float64(len(hotPeers)),
}).ToLoadPred(pendings[id])
}).ToLoadPred(pendings[id], hotWeight)

// Construct store load info.
loadDetail[id] = &storeLoadDetail{
Expand Down Expand Up @@ -719,17 +730,20 @@ func (bs *balanceSolver) calcProgressiveRank() {
dstLd := bs.stLoadDetail[bs.cur.dstStoreID].LoadPred.max()
peer := bs.cur.srcPeerStat
rank := int64(0)
peerKeyRate := bs.getRateWithHotRegionWeight(peer.GetKeyRate(), bs.cur.dstStoreID)
peerByteRate := bs.getRateWithHotRegionWeight(peer.GetByteRate(), bs.cur.dstStoreID)

if bs.rwTy == write && bs.opTy == transferLeader {
// In this condition, CPU usage is the matter.
// Only consider about count and key rate.
if srcLd.Count > dstLd.Count &&
srcLd.KeyRate >= dstLd.KeyRate+peer.GetKeyRate() {
srcLd.KeyRate >= dstLd.KeyRate+peerKeyRate {
rank = -1
}
} else {
keyDecRatio := (dstLd.KeyRate + peer.GetKeyRate()) / (srcLd.KeyRate + 1)
keyDecRatio := (dstLd.KeyRate + peerKeyRate) / (srcLd.KeyRate + 1)
keyHot := peer.GetKeyRate() >= bs.sche.conf.GetMinHotKeyRate()
byteDecRatio := (dstLd.ByteRate + peer.GetByteRate()) / (srcLd.ByteRate + 1)
byteDecRatio := (dstLd.ByteRate + peerByteRate) / (srcLd.ByteRate + 1)
byteHot := peer.GetByteRate() > bs.sche.conf.GetMinHotByteRate()
greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio()
switch {
Expand Down Expand Up @@ -774,18 +788,20 @@ func (bs *balanceSolver) betterThan(old *solution) bool {

if bs.cur.srcPeerStat != old.srcPeerStat {
// compare region

srcPeerKeyRate := bs.getRateWithHotRegionWeight(bs.cur.srcPeerStat.GetKeyRate(), bs.cur.srcStoreID)
oldSrcPeerKeyRate := bs.getRateWithHotRegionWeight(old.srcPeerStat.GetKeyRate(), old.srcStoreID)
srcPeerByteRate := bs.getRateWithHotRegionWeight(bs.cur.srcPeerStat.GetByteRate(), bs.cur.srcStoreID)
oldSrcPeerByteRate := bs.getRateWithHotRegionWeight(old.srcPeerStat.GetByteRate(), old.srcStoreID)
if bs.rwTy == write && bs.opTy == transferLeader {
switch {
case bs.cur.srcPeerStat.GetKeyRate() > old.srcPeerStat.GetKeyRate():
case srcPeerKeyRate > oldSrcPeerKeyRate:
return true
case bs.cur.srcPeerStat.GetKeyRate() < old.srcPeerStat.GetKeyRate():
case srcPeerKeyRate < oldSrcPeerKeyRate:
return false
}
} else {
byteRkCmp := rankCmp(bs.cur.srcPeerStat.GetByteRate(), old.srcPeerStat.GetByteRate(), stepRank(0, 100))
keyRkCmp := rankCmp(bs.cur.srcPeerStat.GetKeyRate(), old.srcPeerStat.GetKeyRate(), stepRank(0, 10))

byteRkCmp := rankCmp(srcPeerByteRate, oldSrcPeerByteRate, stepRank(0, 100))
keyRkCmp := rankCmp(srcPeerKeyRate, oldSrcPeerKeyRate, stepRank(0, 10))
switch bs.cur.progressiveRank {
case -2: // greatDecRatio < byteDecRatio <= minorDecRatio && keyDecRatio <= greatDecRatio
if keyRkCmp != 0 {
Expand Down Expand Up @@ -885,6 +901,12 @@ func (bs *balanceSolver) compareDstStore(st1, st2 uint64) int {
return 0
}

// get the keyRate byteRate divide by hotRegionWeight.
func (bs *balanceSolver) getRateWithHotRegionWeight(realRate float64, storeID uint64) float64 {
hotRegionWeight := bs.cluster.GetStore(storeID).GetHotRegionWeight()
return realRate / math.Max(hotRegionWeight, minWeight)
}

func stepRank(rk0 float64, step float64) func(float64) int64 {
return func(rate float64) int64 {
return int64((rate - rk0) / step)
Expand Down Expand Up @@ -1110,3 +1132,11 @@ func toResourceType(rwTy rwType, opTy opType) resourceType {
}
panic(fmt.Sprintf("invalid arguments for toResourceType: rwTy = %v, opTy = %v", rwTy, opTy))
}

func getStoresHotWeight(cluster opt.Cluster) map[uint64]float64 {
res := make(map[uint64]float64, len(cluster.GetStores()))
for _, store := range cluster.GetStores() {
res[store.GetID()] = math.Max(store.GetHotRegionWeight(), minWeight)
}
return res
}
Loading