Skip to content

Commit

Permalink
*: Parameterize hotspot scheduling and adjust hot cache (tikv#2239)
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Apr 10, 2020
1 parent 044382f commit e11d5b1
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 58 deletions.
41 changes: 40 additions & 1 deletion server/api/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,46 @@ func (s *testScheduleSuite) TestAPI(c *C) {
extraTestFunc func(name string, c *C)
}{
{name: "balance-leader-scheduler"},
{name: "balance-hot-region-scheduler"},
{
name: "balance-hot-region-scheduler",
extraTestFunc: func(name string, c *C) {
resp := make(map[string]interface{})
listURL := fmt.Sprintf("%s%s%s/%s/list", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
c.Assert(readJSON(listURL, &resp), IsNil)
expectMap := map[string]float64{
"min-hot-byte-rate": 100,
"min-hot-key-rate": 10,
"max-zombie-rounds": 3,
"max-peer-number": 1000,
"byte-rate-rank-step-ratio": 0.05,
"key-rate-rank-step-ratio": 0.05,
"count-rank-step-ratio": 0.01,
"great-dec-ratio": 0.95,
"minor-dec-ratio": 0.99,
}
for key := range expectMap {
c.Assert(resp[key], DeepEquals, expectMap[key])
}
dataMap := make(map[string]interface{})
dataMap["max-zombie-rounds"] = 5.0
expectMap["max-zombie-rounds"] = 5.0
updateURL := fmt.Sprintf("%s%s%s/%s/config", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
body, err := json.Marshal(dataMap)
c.Assert(err, IsNil)
c.Assert(postJSON(updateURL, body), IsNil)
resp = make(map[string]interface{})
c.Assert(readJSON(listURL, &resp), IsNil)
for key := range expectMap {
c.Assert(resp[key], DeepEquals, expectMap[key])
}
// update again
err = postJSON(updateURL, body, func(res []byte, code int) {
c.Assert(string(res), Equals, "no changed")
c.Assert(code, Equals, 200)
})
c.Assert(err, IsNil)
},
},
{name: "balance-region-scheduler"},
{name: "shuffle-leader-scheduler"},
{name: "shuffle-region-scheduler"},
Expand Down
119 changes: 69 additions & 50 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"math"
"math/rand"
"net/http"
"sort"
"sync"
"time"
Expand All @@ -39,21 +40,29 @@ func init() {
}
})
schedule.RegisterScheduler(HotRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newHotScheduler(opController), nil
conf := initHotRegionScheduleConfig()
if err := decoder(conf); err != nil {
return nil, err
}
conf.storage = storage
return newHotScheduler(opController, conf), nil
})

// FIXME: remove these two schedulers after the balance tests are moved into the schedulers package
schedule.RegisterScheduler(HotWriteRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newHotWriteScheduler(opController), nil
})
schedule.RegisterScheduler(HotReadRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newHotReadScheduler(opController), nil
})
{
schedule.RegisterScheduler(HotWriteRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newHotWriteScheduler(opController, initHotRegionScheduleConfig()), nil
})
schedule.RegisterScheduler(HotReadRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newHotReadScheduler(opController, initHotRegionScheduleConfig()), nil
})

}
}

const (
// HotRegionName is balance hot region scheduler name.
HotRegionName = "balance-hot-region-scheduler"

// HotRegionType is balance hot region scheduler type.
HotRegionType = "hot-region"
// HotReadRegionType is hot read region scheduler type.
Expand All @@ -62,24 +71,6 @@ const (
HotWriteRegionType = "hot-write-region"

hotRegionLimitFactor = 0.75

maxPeerNum = 1000

maxZombieDur time.Duration = statistics.StoreHeartBeatReportInterval * time.Second

minRegionScheduleInterval time.Duration = statistics.StoreHeartBeatReportInterval * time.Second

minHotByteRate = 100
minHotKeyRate = 10

// The rank step ratio decides the step size used when calculating the rank:
// step = max current * rank step ratio
byteRateRankStepRatio = 0.05
keyRateRankStepRatio = 0.05
countRankStepRatio = 0.1

greatDecRatio = 0.95
minorDecRatio = 0.99
)

type hotScheduler struct {
Expand All @@ -98,9 +89,11 @@ type hotScheduler struct {
// temporary states but exported to API or metrics
stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail
pendingSums [resourceTypeLen]map[uint64]Influence
// config of hot scheduler
conf *hotRegionSchedulerConfig
}

func newHotScheduler(opController *schedule.OperatorController) *hotScheduler {
func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler {
base := NewBaseScheduler(opController)
ret := &hotScheduler{
name: HotRegionName,
Expand All @@ -110,6 +103,7 @@ func newHotScheduler(opController *schedule.OperatorController) *hotScheduler {
types: []rwType{write, read},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
regionPendings: make(map[uint64][2]*operator.Operator),
conf: conf,
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
ret.pendings[ty] = map[*pendingInfluence]struct{}{}
Expand All @@ -118,15 +112,15 @@ func newHotScheduler(opController *schedule.OperatorController) *hotScheduler {
return ret
}

func newHotReadScheduler(opController *schedule.OperatorController) *hotScheduler {
ret := newHotScheduler(opController)
func newHotReadScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler {
ret := newHotScheduler(opController, conf)
ret.name = ""
ret.types = []rwType{read}
return ret
}

func newHotWriteScheduler(opController *schedule.OperatorController) *hotScheduler {
ret := newHotScheduler(opController)
func newHotWriteScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler {
ret := newHotScheduler(opController, conf)
ret.name = ""
ret.types = []rwType{write}
return ret
Expand All @@ -140,6 +134,10 @@ func (h *hotScheduler) GetType() string {
return HotRegionType
}

// ServeHTTP makes hotScheduler satisfy http.Handler so the scheduler's
// runtime configuration can be read and updated over the config API.
// It delegates every request to the underlying hot region scheduler
// config, which serves the list/update endpoints.
func (h *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.conf.ServeHTTP(w, r)
}

func (h *hotScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
return h.allowBalanceLeader(cluster) || h.allowBalanceRegion(cluster)
}
Expand Down Expand Up @@ -218,7 +216,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {

func (h *hotScheduler) summaryPendingInfluence() {
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
h.pendingSums[ty] = summaryPendingInfluence(h.pendings[ty], calcPendingWeight)
h.pendingSums[ty] = summaryPendingInfluence(h.pendings[ty], h.calcPendingWeight)
}
h.gcRegionPendings()
}
Expand All @@ -228,7 +226,7 @@ func (h *hotScheduler) gcRegionPendings() {
empty := true
for ty, op := range pendings {
if op != nil && op.IsEnd() {
if time.Now().After(op.GetCreateTime().Add(minRegionScheduleInterval)) {
if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) {
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
pendings[ty] = nil
}
Expand Down Expand Up @@ -320,23 +318,27 @@ func filterHotPeers(
return ret
}

func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType) {
influence := newPendingInfluence(op, srcStore, dstStore, infl)
func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
if ok {
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_fails").Inc()
return false
}

influence := newPendingInfluence(op, srcStore, dstStore, infl)
rcTy := toResourceType(rwTy, opTy)
h.pendings[rcTy][influence] = struct{}{}

if _, ok := h.regionPendings[regionID]; !ok {
h.regionPendings[regionID] = [2]*operator.Operator{nil, nil}
}
h.regionPendings[regionID] = [2]*operator.Operator{nil, nil}
{ // h.pendingOpInfos[regionID][ty] = influence
tmp := h.regionPendings[regionID]
tmp[opTy] = op
h.regionPendings[regionID] = tmp
}

schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_create").Inc()
return true
}

func (h *hotScheduler) balanceHotReadRegions(cluster opt.Cluster) []*operator.Operator {
Expand Down Expand Up @@ -429,9 +431,9 @@ func (bs *balanceSolver) init() {
}

bs.rankStep = &storeLoad{
ByteRate: maxCur.ByteRate * byteRateRankStepRatio,
KeyRate: maxCur.KeyRate * keyRateRankStepRatio,
Count: maxCur.Count * countRankStepRatio,
ByteRate: maxCur.ByteRate * bs.sche.conf.GetByteRankStepRatio(),
KeyRate: maxCur.KeyRate * bs.sche.conf.GetKeyRankStepRatio(),
Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
}
}

Expand Down Expand Up @@ -499,7 +501,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for dstStoreID := range bs.filterDstStores() {
bs.cur.dstStoreID = dstStoreID
bs.calcProgressiveRank()

if bs.cur.progressiveRank < 0 && bs.betterThan(best) {
if newOps, newInfls := bs.buildOperators(); len(newOps) > 0 {
ops = newOps
Expand All @@ -513,7 +514,10 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}

for i := 0; i < len(ops); i++ {
bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i], bs.rwTy, bs.opTy)
// TODO: multiple operators need to be atomic.
if !bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i], bs.rwTy, bs.opTy) {
return nil
}
}
return ops
}
Expand Down Expand Up @@ -546,9 +550,22 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {

func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
ret := bs.stLoadDetail[bs.cur.srcStoreID].HotPeers
// Return at most maxPeerNum peers, to prevent balanceSolver.solve() too slow.
// Return at most MaxPeerNum peers, to prevent balanceSolver.solve() from becoming too slow.
maxPeerNum := bs.sche.conf.GetMaxPeerNumber()

// filter pending region
appendItem := func(items []*statistics.HotPeerStat, item *statistics.HotPeerStat) []*statistics.HotPeerStat {
if _, ok := bs.sche.regionPendings[item.ID()]; !ok {
items = append(items, item)
}
return items
}
if len(ret) <= maxPeerNum {
return ret
nret := make([]*statistics.HotPeerStat, 0, len(ret))
for _, peer := range ret {
nret = appendItem(nret, peer)
}
return nret
}

byteSort := make([]*statistics.HotPeerStat, len(ret))
Expand Down Expand Up @@ -583,7 +600,7 @@ func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
}
ret = make([]*statistics.HotPeerStat, 0, len(union))
for peer := range union {
ret = append(ret, peer)
ret = appendItem(ret, peer)
}
return ret
}
Expand Down Expand Up @@ -709,9 +726,10 @@ func (bs *balanceSolver) calcProgressiveRank() {
}
} else {
keyDecRatio := (dstLd.KeyRate + peer.GetKeyRate()) / (srcLd.KeyRate + 1)
keyHot := peer.GetKeyRate() >= minHotKeyRate
keyHot := peer.GetKeyRate() >= bs.sche.conf.GetMinHotKeyRate()
byteDecRatio := (dstLd.ByteRate + peer.GetByteRate()) / (srcLd.ByteRate + 1)
byteHot := peer.GetByteRate() > minHotByteRate
byteHot := peer.GetByteRate() > bs.sche.conf.GetMinHotByteRate()
greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio()
switch {
case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio:
// Both byte rate and key rate are balanced, the best choice.
Expand Down Expand Up @@ -1000,7 +1018,7 @@ func (h *hotScheduler) copyPendingInfluence(ty resourceType) map[uint64]Influenc
return ret
}

func calcPendingWeight(op *operator.Operator) float64 {
func (h *hotScheduler) calcPendingWeight(op *operator.Operator) float64 {
if op.CheckExpired() || op.CheckTimeout() {
return 0
}
Expand All @@ -1011,6 +1029,7 @@ func calcPendingWeight(op *operator.Operator) float64 {
switch status {
case operator.SUCCESS:
zombieDur := time.Since(op.GetReachTimeOf(status))
maxZombieDur := h.conf.GetMaxZombieDuration()
if zombieDur >= maxZombieDur {
return 0
}
Expand Down
Loading

0 comments on commit e11d5b1

Please sign in to comment.