*: Parameterize hotspot scheduling and adjust hot cache #2239

Merged Mar 18, 2020 (17 commits; diff shown from 11 commits)
41 changes: 40 additions & 1 deletion server/api/scheduler_test.go
@@ -98,7 +98,46 @@ func (s *testScheduleSuite) TestAPI(c *C) {
extraTestFunc func(name string, c *C)
}{
{name: "balance-leader-scheduler"},
{name: "balance-hot-region-scheduler"},
{
name: "balance-hot-region-scheduler",
extraTestFunc: func(name string, c *C) {
resp := make(map[string]interface{})
listURL := fmt.Sprintf("%s%s%s/%s/list", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
c.Assert(readJSON(listURL, &resp), IsNil)
expectMap := map[string]float64{
"min-hot-byte-rate": 100,
"min-hot-key-rate": 10,
"max-zombie-rounds": 3,
"max-peer-number": 1000,
"byte-rate-rank-step-ratio": 0.05,
"key-rate-rank-step-ratio": 0.05,
"count-rank-step-ratio": 0.01,
"great-dec-ratio": 0.95,
"minor-dec-ratio": 0.99,
}
for key := range expectMap {
c.Assert(resp[key], DeepEquals, expectMap[key])
}
dataMap := make(map[string]interface{})
dataMap["max-zombie-rounds"] = 5.0
expectMap["max-zombie-rounds"] = 5.0
updateURL := fmt.Sprintf("%s%s%s/%s/config", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
body, err := json.Marshal(dataMap)
c.Assert(err, IsNil)
c.Assert(postJSON(updateURL, body), IsNil)
resp = make(map[string]interface{})
c.Assert(readJSON(listURL, &resp), IsNil)
for key := range expectMap {
c.Assert(resp[key], DeepEquals, expectMap[key])
}
// update again
err = postJSON(updateURL, body, func(res []byte, code int) {
c.Assert(string(res), Equals, "no changed")
c.Assert(code, Equals, 200)
})
c.Assert(err, IsNil)
},
},
{name: "balance-region-scheduler"},
{name: "shuffle-leader-scheduler"},
{name: "shuffle-region-scheduler"},
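The JSON keys asserted above map to the scheduler's new persisted configuration. For orientation, here is a minimal sketch of the struct those keys imply; the struct name, embedded lock, and field layout are assumptions, since the actual definition (e.g. a hot_region_config.go) is not part of this diff:

type hotRegionSchedulerConfig struct {
	sync.RWMutex
	storage *core.Storage

	MinHotByteRate        float64 `json:"min-hot-byte-rate"`
	MinHotKeyRate         float64 `json:"min-hot-key-rate"`
	MaxZombieRounds       int     `json:"max-zombie-rounds"`
	MaxPeerNum            int     `json:"max-peer-number"`
	ByteRateRankStepRatio float64 `json:"byte-rate-rank-step-ratio"`
	KeyRateRankStepRatio  float64 `json:"key-rate-rank-step-ratio"`
	CountRankStepRatio    float64 `json:"count-rank-step-ratio"`
	GreatDecRatio         float64 `json:"great-dec-ratio"`
	MinorDecRatio         float64 `json:"minor-dec-ratio"`
}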
117 changes: 69 additions & 48 deletions server/schedulers/hot_region.go
@@ -17,6 +17,7 @@ import (
"fmt"
"math"
"math/rand"
"net/http"
"sort"
"sync"
"time"
@@ -39,21 +40,29 @@ func init() {
}
})
schedule.RegisterScheduler(HotRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newHotScheduler(opController), nil
conf := initHotRegionScheduleConfig()
if err := decoder(conf); err != nil {
return nil, err
}
conf.storage = storage
return newHotScheduler(opController, conf), nil
})

// FIXME: remove these two schedulers after the balance tests move into the schedulers package
schedule.RegisterScheduler(HotWriteRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newHotWriteScheduler(opController), nil
})
schedule.RegisterScheduler(HotReadRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newHotReadScheduler(opController), nil
})
{
schedule.RegisterScheduler(HotWriteRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newHotWriteScheduler(opController, initHotRegionScheduleConfig()), nil
})
schedule.RegisterScheduler(HotReadRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
return newHotReadScheduler(opController, initHotRegionScheduleConfig()), nil
})

}
}

const (
// HotRegionName is balance hot region scheduler name.
HotRegionName = "balance-hot-region-scheduler"

// HotRegionType is balance hot region scheduler type.
HotRegionType = "hot-region"
// HotReadRegionType is hot read region scheduler type.
@@ -63,23 +72,7 @@ const (

hotRegionLimitFactor = 0.75

maxPeerNum = 1000

maxZombieDur time.Duration = statistics.StoreHeartBeatReportInterval * time.Second

minRegionScheduleInterval time.Duration = statistics.StoreHeartBeatReportInterval * time.Second

minHotByteRate = 100
minHotKeyRate = 10

// rank step ratio decides the step when calculating the rank:
// step = max current * rank step ratio
byteRateRankStepRatio = 0.05
keyRateRankStepRatio = 0.05
countRankStepRatio = 0.1

greatDecRatio = 0.95
minorDecRatio = 0.99
)
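The constants removed above survive as defaults on the config object. A plausible sketch of initHotRegionScheduleConfig (its real definition is outside this diff), assuming the defaults mirror the old constants except count-rank-step-ratio, which the API test above expects to be 0.01 rather than the former 0.1:

func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
	return &hotRegionSchedulerConfig{
		MinHotByteRate:        100,
		MinHotKeyRate:         10,
		MaxZombieRounds:       3,
		MaxPeerNum:            1000,
		ByteRateRankStepRatio: 0.05,
		KeyRateRankStepRatio:  0.05,
		CountRankStepRatio:    0.01,
		GreatDecRatio:         0.95,
		MinorDecRatio:         0.99,
	}
}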

type hotScheduler struct {
@@ -98,9 +91,11 @@ type hotScheduler struct {
// temporary states but exported to API or metrics
stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail
pendingSums [resourceTypeLen]map[uint64]Influence
// config of hot scheduler
conf *hotRegionSchedulerConfig
}

func newHotScheduler(opController *schedule.OperatorController) *hotScheduler {
func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler {
base := NewBaseScheduler(opController)
ret := &hotScheduler{
name: HotRegionName,
@@ -110,6 +105,7 @@ func newHotScheduler(opController *schedule.OperatorController) *hotScheduler {
types: []rwType{write, read},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
regionPendings: make(map[uint64][2]*operator.Operator),
conf: conf,
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
ret.pendings[ty] = map[*pendingInfluence]struct{}{}
@@ -118,15 +114,15 @@ func newHotScheduler(opController *schedule.OperatorController) *hotScheduler {
return ret
}

func newHotReadScheduler(opController *schedule.OperatorController) *hotScheduler {
ret := newHotScheduler(opController)
func newHotReadScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler {
ret := newHotScheduler(opController, conf)
ret.name = ""
ret.types = []rwType{read}
return ret
}

func newHotWriteScheduler(opController *schedule.OperatorController) *hotScheduler {
ret := newHotScheduler(opController)
func newHotWriteScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler {
ret := newHotScheduler(opController, conf)
ret.name = ""
ret.types = []rwType{write}
return ret
@@ -140,6 +136,10 @@ func (h *hotScheduler) GetType() string {
return HotRegionType
}

func (h *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.conf.ServeHTTP(w, r)
}
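Delegating ServeHTTP to the config is what backs the /list and /config endpoints exercised in scheduler_test.go above. A hedged sketch of the dispatch, assuming a gorilla/mux router; the handler names are illustrative, not taken from this diff:

func (conf *hotRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	router := mux.NewRouter()
	router.HandleFunc("/list", conf.handleGetConfig).Methods("GET")
	// Assumed to reply "no changed" with HTTP 200 when the posted values
	// leave the config unmodified, matching the test's assertion.
	router.HandleFunc("/config", conf.handleSetConfig).Methods("POST")
	router.ServeHTTP(w, r)
}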

func (h *hotScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
return h.allowBalanceLeader(cluster) || h.allowBalanceRegion(cluster)
}
@@ -218,7 +218,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {

func (h *hotScheduler) summaryPendingInfluence() {
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
h.pendingSums[ty] = summaryPendingInfluence(h.pendings[ty], calcPendingWeight)
h.pendingSums[ty] = summaryPendingInfluence(h.pendings[ty], h.calcPendingWeight)
}
h.gcRegionPendings()
}
@@ -228,7 +228,7 @@ func (h *hotScheduler) gcRegionPendings() {
empty := true
for ty, op := range pendings {
if op != nil && op.IsEnd() {
if time.Now().After(op.GetCreateTime().Add(minRegionScheduleInterval)) {
if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) {
Review comment (Member): GetMaxZombieDuration?

Reply (Author): Yes, it still influences the store, which should be consistent with the pendingSum.

schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
pendings[ty] = nil
}
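As the thread above notes, the zombie window now also gates garbage collection of regionPendings, keeping it consistent with the pending-influence weighting. A sketch of the getter, assuming one "zombie round" equals one store heartbeat report interval:

func (conf *hotRegionSchedulerConfig) GetMaxZombieDuration() time.Duration {
	conf.RLock()
	defer conf.RUnlock()
	return time.Duration(conf.MaxZombieRounds*statistics.StoreHeartBeatReportInterval) * time.Second
}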
@@ -320,23 +320,27 @@ func filterHotPeers(
return ret
}

func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType) {
influence := newPendingInfluence(op, srcStore, dstStore, infl)
func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
if ok {
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_fails").Inc()
return false
}

influence := newPendingInfluence(op, srcStore, dstStore, infl)
rcTy := toResourceType(rwTy, opTy)
h.pendings[rcTy][influence] = struct{}{}

if _, ok := h.regionPendings[regionID]; !ok {
h.regionPendings[regionID] = [2]*operator.Operator{nil, nil}
}
h.regionPendings[regionID] = [2]*operator.Operator{nil, nil}
{ // h.pendingOpInfos[regionID][ty] = influence
tmp := h.regionPendings[regionID]
tmp[opTy] = op
h.regionPendings[regionID] = tmp
}

schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_create").Inc()
return true
}

func (h *hotScheduler) balanceHotReadRegions(cluster opt.Cluster) []*operator.Operator {
@@ -429,9 +433,9 @@ func (bs *balanceSolver) init() {
}

bs.rankStep = &storeLoad{
ByteRate: maxCur.ByteRate * byteRateRankStepRatio,
KeyRate: maxCur.KeyRate * keyRateRankStepRatio,
Count: maxCur.Count * countRankStepRatio,
ByteRate: maxCur.ByteRate * bs.sche.conf.GetByteRankStepRatio(),
KeyRate: maxCur.KeyRate * bs.sche.conf.GetKeyRankStepRatio(),
Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
}
}
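The rank step discretizes load comparisons: stores whose loads differ by less than one step are treated as equally loaded. Illustrative arithmetic with made-up numbers (the bucketing helper below is for exposition only, not the solver's actual mechanism):

// With a peak byte rate of 100 MiB/s across stores and
// byte-rate-rank-step-ratio = 0.05, the byte-rate step is 5 MiB/s.
step := 100.0 * 0.05
// Compared in whole steps, 97 MiB/s and 99 MiB/s rank the same:
sameBucket := int(97/step) == int(99/step) // true; both are bucket 19
_ = sameBucket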

@@ -499,7 +503,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for dstStoreID := range bs.filterDstStores() {
bs.cur.dstStoreID = dstStoreID
bs.calcProgressiveRank()

if bs.cur.progressiveRank < 0 && bs.betterThan(best) {
if newOps, newInfls := bs.buildOperators(); len(newOps) > 0 {
ops = newOps
@@ -513,7 +516,10 @@ }
}

for i := 0; i < len(ops); i++ {
bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i], bs.rwTy, bs.opTy)
// TODO: multiple operators need to be atomic.
if !bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i], bs.rwTy, bs.opTy) {
return nil
}
}
return ops
}
@@ -546,9 +552,22 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {

func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
ret := bs.stLoadDetail[bs.cur.srcStoreID].HotPeers
// Return at most maxPeerNum peers, to prevent balanceSolver.solve() too slow.
// Return at most maxPeerNum peers, to keep balanceSolver.solve() from being too slow.
maxPeerNum := bs.sche.conf.GetMaxPeerNumber()

// filter out peers whose regions already have pending operators
appendItem := func(items []*statistics.HotPeerStat, item *statistics.HotPeerStat) []*statistics.HotPeerStat {
if _, ok := bs.sche.regionPendings[item.ID()]; !ok {
items = append(items, item)
}
return items
}
if len(ret) <= maxPeerNum {
return ret
nret := make([]*statistics.HotPeerStat, 0, len(ret))
for _, peer := range ret {
nret = appendItem(nret, peer)
}
return nret
}

byteSort := make([]*statistics.HotPeerStat, len(ret))
@@ -583,7 +602,7 @@ func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
}
ret = make([]*statistics.HotPeerStat, 0, len(union))
for peer := range union {
ret = append(ret, peer)
ret = appendItem(ret, peer)
}
return ret
}
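When a store carries more than maxPeerNum hot peers, the elided code below these sorts keeps only the hottest. A sketch of the strategy, assuming a keySort slice sorted by key rate exists alongside byteSort and that the union takes the top maxPeerNum from each dimension:

// A peer hot on either the byte or the key dimension survives the cut.
union := make(map[*statistics.HotPeerStat]struct{}, maxPeerNum)
for i := 0; i < maxPeerNum && i < len(byteSort); i++ {
	union[byteSort[i]] = struct{}{}
}
for i := 0; i < maxPeerNum && i < len(keySort); i++ {
	union[keySort[i]] = struct{}{}
}
// The union is then copied into ret via appendItem, which also drops
// peers whose regions have pending operators.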
@@ -709,9 +728,10 @@ func (bs *balanceSolver) calcProgressiveRank() {
}
} else {
keyDecRatio := (dstLd.KeyRate + peer.GetKeyRate()) / (srcLd.KeyRate + 1)
keyHot := peer.GetKeyRate() >= minHotKeyRate
keyHot := peer.GetKeyRate() >= bs.sche.conf.GetMinHotKeyRate()
byteDecRatio := (dstLd.ByteRate + peer.GetByteRate()) / (srcLd.ByteRate + 1)
byteHot := peer.GetByteRate() > minHotByteRate
byteHot := peer.GetByteRate() > bs.sche.conf.GetMinHotByteRate()
greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio()
switch {
case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio:
// Both byte rate and key rate are balanced, the best choice.
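The rank compares post-move destination load against pre-move source load. Illustrative arithmetic with made-up numbers:

// Moving a 5 MB/s peer from a 100 MB/s source store to an 80 MB/s
// destination store (the +1 in the denominator guards against a zero
// source load):
byteDecRatio := (80.0 + 5.0) / (100.0 + 1) // ≈ 0.84
// 0.84 <= great-dec-ratio (0.95), so the byte dimension is a "great"
// decrease; if the key dimension clears the same bar, this is the
// best case of the switch.
_ = byteDecRatio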
@@ -1000,7 +1020,7 @@ func (h *hotScheduler) copyPendingInfluence(ty resourceType) map[uint64]Influence {
return ret
}

func calcPendingWeight(op *operator.Operator) float64 {
func (h *hotScheduler) calcPendingWeight(op *operator.Operator) float64 {
if op.CheckExpired() || op.CheckTimeout() {
return 0
}
@@ -1011,6 +1031,7 @@ func calcPendingWeight(op *operator.Operator) float64 {
switch status {
case operator.SUCCESS:
zombieDur := time.Since(op.GetReachTimeOf(status))
maxZombieDur := h.conf.GetMaxZombieDuration()
if zombieDur >= maxZombieDur {
return 0
}