*: add hot region schedule #611

Merged
merged 34 commits into from
Apr 27, 2017
Changes from 18 commits
e2982b7
*: add hot region schedule
nolouch Apr 7, 2017
2a2ef7f
*: adjust state outqueue state
nolouch Apr 8, 2017
ca4c248
*:calculate avg bytes write
nolouch Apr 8, 2017
7d8e7bf
*: clean up
nolouch Apr 10, 2017
0f6c35c
address
nolouch Apr 11, 2017
382986e
tiny clean
nolouch Apr 11, 2017
465e6e7
some fix
nolouch Apr 12, 2017
20e78b0
change to lruCache
nolouch Apr 13, 2017
0d21c20
some clean
nolouch Apr 13, 2017
5050c94
address comments
nolouch Apr 15, 2017
f5c9de4
address comments
nolouch Apr 16, 2017
e4f281d
*:tiny fix
nolouch Apr 17, 2017
034e975
balance: add some test
nolouch Apr 17, 2017
9228598
tiny clean
nolouch Apr 17, 2017
a787547
Merge branch 'master' into hot-region-schedule
nolouch Apr 17, 2017
1097525
address comments
nolouch Apr 18, 2017
ef4fe14
Merge branch 'master' into shuning/hot-region-schedule
nolouch Apr 20, 2017
3eae118
address comment
nolouch Apr 20, 2017
f18d1c4
address comments
nolouch Apr 23, 2017
d9ad9f7
fix
nolouch Apr 23, 2017
95e3530
address comments
nolouch Apr 24, 2017
5e8f5a8
tiny fix
nolouch Apr 24, 2017
174782e
address comment
nolouch Apr 24, 2017
1c3605a
tiny fix
nolouch Apr 24, 2017
30bcb69
address comment
nolouch Apr 24, 2017
ce794ce
Merge branch 'master' into shuning/hot-region-schedule
disksing Apr 25, 2017
bad2c8a
fix ci
nolouch Apr 25, 2017
18e60e9
Merge branch 'shuning/hot-region-schedule' of https://github.com/ping…
nolouch Apr 25, 2017
c2469d7
tiny fix
nolouch Apr 25, 2017
7d80df0
Merge branch 'master' into shuning/hot-region-schedule
zhangjinpeng87 Apr 26, 2017
2760bc9
address comment
nolouch Apr 26, 2017
79a39d8
Merge branch 'shuning/hot-region-schedule' of https://github.com/ping…
nolouch Apr 26, 2017
79b9de3
address comment
nolouch Apr 26, 2017
91e69c6
address comment
nolouch Apr 27, 2017
41 changes: 41 additions & 0 deletions server/api/hot_status.go
@@ -0,0 +1,41 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"net/http"

"github.com/pingcap/pd/server"
"github.com/unrolled/render"
)

type hotStatusHandler struct {
svr *server.Server
rd *render.Render
}

func newHotStatusHandler(svr *server.Server, rd *render.Render) *hotStatusHandler {
return &hotStatusHandler{
svr: svr,
rd: rd,
}
}

func (h *hotStatusHandler) GetHotRegions(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, h.svr.GetHotRegions())
}

func (h *hotStatusHandler) GetHotStores(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, h.svr.GetHotStores())
}
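
The handler pattern above can be sketched in a self-contained form. This is only an illustration under assumptions: it uses the standard library's `encoding/json` instead of `render.Render`, and `hotSource`, `fakeServer`, and the trimmed `storeHotRegions` type are hypothetical stand-ins, since the real return type of `GetHotStores()` is not shown in this diff.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// storeHotRegions is a trimmed, hypothetical stand-in for StoreHotRegions.
type storeHotRegions struct {
	TotalWrittenBytes uint64 `json:"total_written"`
	RegionCount       int    `json:"region_count"`
}

// hotSource is a hypothetical interface standing in for *server.Server.
type hotSource interface {
	GetHotStores() map[uint64]*storeHotRegions
}

// fakeServer returns fixed data so the sketch runs without a cluster.
type fakeServer struct{}

func (fakeServer) GetHotStores() map[uint64]*storeHotRegions {
	return map[uint64]*storeHotRegions{1: {TotalWrittenBytes: 2048, RegionCount: 2}}
}

// hotStoresHandler writes the hot-store statistics as JSON with status 200.
func hotStoresHandler(src hotSource) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_ = json.NewEncoder(w).Encode(src.GetHotStores())
	}
}

// fetchHotStores runs the handler against an in-memory request
// and returns the status code and response body.
func fetchHotStores() (int, string) {
	req := httptest.NewRequest(http.MethodGet, "/api/v1/hotspot/stores", nil)
	rec := httptest.NewRecorder()
	hotStoresHandler(fakeServer{})(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := fetchHotStores()
	fmt.Println(code, body)
}
```

Depending on a small interface rather than the concrete server keeps the handler testable without starting PD.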
4 changes: 4 additions & 0 deletions server/api/router.go
@@ -60,6 +60,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/labels", labelsHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/labels/stores", labelsHandler.GetStores).Methods("GET")

hotStatusHandler := newHotStatusHandler(svr, rd)
Contributor

hot or hotspot?

Contributor

/api/v1/hotspot/host
/api/v1/hotspot/store?

router.HandleFunc("/api/v1/hotspot/regions", hotStatusHandler.GetHotRegions).Methods("GET")
router.HandleFunc("/api/v1/hotspot/stores", hotStatusHandler.GetHotStores).Methods("GET")

router.Handle("/api/v1/events", newEventsHandler(svr, rd)).Methods("GET")
router.Handle("/api/v1/feed", newFeedHandler(svr, rd)).Methods("GET")

261 changes: 261 additions & 0 deletions server/balancer.go
@@ -15,6 +15,8 @@ package server

import (
"math"
"math/rand"
"sort"
"time"

"github.com/montanaflynn/stats"
@@ -370,3 +372,262 @@ func (r *replicaChecker) checkBestReplacement(region *RegionInfo) Operator {
}
return newTransferPeer(region, oldPeer, newPeer)
}

// RegionStat records each hot region's statistics
type RegionStat struct {
RegionID uint64 `json:"region_id"`
WrittenBytes uint64 `json:"written_bytes"`
HotDegree int `json:"hot_degree"`
LastUpdateTime time.Time `json:"last_update_time"`
StoreID uint64 `json:"-"`
antiCount int
Member

Please add a comment for this member.

version uint64
}
Contributor

Should we explain what WrittenBytes, HotDegree, and antiCount mean and what they are used for?

Contributor Author

They are for JSON encoding.

Contributor

I guess @disksing wants to know the meaning of some fields, not only that they are for JSON.

Contributor

Yep. I know what they do generally, but I think there should be some explanation here for potential future readers.

// ListRegionsStat is a list of region statistics.
type ListRegionsStat []RegionStat
Member

s/ListRegionsStat/RegionsStat

Contributor

RegionStats or RegionStatList? cc @zhangjinpeng1987 What's your opinion?

Member

RegionStats

func (m ListRegionsStat) Len() int { return len(m) }
func (m ListRegionsStat) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m ListRegionsStat) Less(i, j int) bool { return m[i].WrittenBytes < m[j].WrittenBytes }
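
Because `Less` orders by ascending `WrittenBytes`, wrapping the slice in `sort.Reverse` puts the hottest region first. A minimal sketch of that idea, with trimmed stand-in types (`regionStat`, `regionStats`) and a hypothetical helper `hottestFirst`:

```go
package main

import (
	"fmt"
	"sort"
)

// regionStat is a trimmed stand-in for RegionStat.
type regionStat struct {
	RegionID     uint64
	WrittenBytes uint64
}

// regionStats implements sort.Interface in ascending WrittenBytes order.
type regionStats []regionStat

func (m regionStats) Len() int           { return len(m) }
func (m regionStats) Swap(i, j int)      { m[i], m[j] = m[j], m[i] }
func (m regionStats) Less(i, j int) bool { return m[i].WrittenBytes < m[j].WrittenBytes }

// hottestFirst sorts in place so that the region with the most
// written bytes ends up at index 0.
func hottestFirst(stats regionStats) regionStats {
	sort.Sort(sort.Reverse(stats))
	return stats
}

func main() {
	stats := hottestFirst(regionStats{{1, 100}, {2, 900}, {3, 400}})
	for _, s := range stats {
		fmt.Println(s.RegionID, s.WrittenBytes)
	}
}
```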

// StoreHotRegions records all hot regions in one store, in order.
type StoreHotRegions struct {
TotalWrittenBytes uint64 `json:"total_written"`
RegionCount int `json:"region_count"`
Contributor

Is RegionCount always equal to RegionsStat.Len()? If so, it seems it can be dropped.

Contributor Author

With it, the API shows the count directly, which makes it easier to see how many hot regions are in that store.

RegionsStat ListRegionsStat `json:"status"`
Contributor

json:"stats"?

}

type balanceHotRegionScheduler struct {
opt *scheduleOption
limit uint64
scoreStatus map[uint64]*StoreHotRegions // store id -> regions status in this store
r *rand.Rand
Contributor

Why not use the global Rand directly? Using a Source directly is not thread-safe.

Contributor Author

It is only used in one thread.

}
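
On the thread-safety question: a `*rand.Rand` built with `rand.New(rand.NewSource(...))` is not safe for concurrent use, while the top-level `math/rand` functions are. If the scheduler were ever called from more than one goroutine, one option would be to guard the private generator with a mutex. The `lockedRand` type and `drawConcurrently` helper below are hypothetical and not part of this PR.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// lockedRand guards a private *rand.Rand with a mutex so that
// several goroutines can share it.
type lockedRand struct {
	mu sync.Mutex
	r  *rand.Rand
}

func newLockedRand(seed int64) *lockedRand {
	return &lockedRand{r: rand.New(rand.NewSource(seed))}
}

// Int31n returns a value in [0, n) while holding the lock.
func (l *lockedRand) Int31n(n int32) int32 {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.r.Int31n(n)
}

// drawConcurrently calls Int31n from several goroutines and reports
// whether every drawn value fell inside [0, n).
func drawConcurrently(lr *lockedRand, workers int, n int32) bool {
	var wg sync.WaitGroup
	results := make([]int32, workers)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = lr.Int31n(n)
		}(i)
	}
	wg.Wait()
	for _, v := range results {
		if v < 0 || v >= n {
			return false
		}
	}
	return true
}

func main() {
	lr := newLockedRand(time.Now().UnixNano())
	fmt.Println(drawConcurrently(lr, 8, 10))
}
```

If the scheduler stays single-threaded, as the author says, the plain `*rand.Rand` field is sufficient and avoids the lock.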

func newBalanceHotRegionScheduler(opt *scheduleOption) *balanceHotRegionScheduler {
return &balanceHotRegionScheduler{
opt: opt,
limit: 1,
Member

Can we modify the limit?

Member

Use a const.

scoreStatus: make(map[uint64]*StoreHotRegions),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

func (l *balanceHotRegionScheduler) GetName() string {
return "balance-hot-region-scheduler"
}

func (l *balanceHotRegionScheduler) GetResourceKind() ResourceKind {
return priorityKind
}

func (l *balanceHotRegionScheduler) GetResourceLimit() uint64 {
Member

s/l/s/

return l.limit
}

func (l *balanceHotRegionScheduler) Prepare(cluster *clusterInfo) error { return nil }

func (l *balanceHotRegionScheduler) Cleanup(cluster *clusterInfo) {}

func (l *balanceHotRegionScheduler) Schedule(cluster *clusterInfo) Operator {
l.calculateScore(cluster)
region := l.SelectSourceRegion(cluster)
if region == nil {
return nil
}
newLeader := l.selectTransferLeader(region)
if newLeader != nil {
return newPriorityTransferLeader(region, newLeader)
}
peer := l.selectTransferPeer(region, cluster)
if peer != nil {
return newPriorityTransferPeer(region, region.Leader, peer)
}
return nil
}

func (l *balanceHotRegionScheduler) clearScore() {
l.scoreStatus = make(map[uint64]*StoreHotRegions)
}
Contributor

I don't think we need to abstract a function here. It's only one line and is called in only one place.

func (l *balanceHotRegionScheduler) calculateScore(cluster *clusterInfo) {
l.clearScore()
items := cluster.writeStatistics.elems()
for _, item := range items {
Member

Is it safe to access items after the lock is released?

r, ok := item.value.(RegionStat)
if !ok {
continue
}
if r.HotDegree < hotRegionLowThreshold {
continue
}

regionInfo := cluster.getRegion(r.RegionID)
storeID := regionInfo.Leader.GetStoreId()
status, ok := l.scoreStatus[storeID]
if !ok {
status = &StoreHotRegions{
RegionsStat: make(ListRegionsStat, 0, 100),
}
l.scoreStatus[storeID] = status
}
status.TotalWrittenBytes += r.WrittenBytes
status.RegionsStat = append(status.RegionsStat, RegionStat{
RegionID: r.RegionID,
WrittenBytes: r.WrittenBytes,
HotDegree: r.HotDegree,
LastUpdateTime: r.LastUpdateTime,
StoreID: storeID,
antiCount: r.antiCount,
version: r.version,
})
status.RegionCount++
}

for _, rs := range l.scoreStatus {
sort.Sort(sort.Reverse(rs.RegionsStat))
}
Contributor

for _, rs := range l.scoreStatus {
    sort.Sort(rs.MetaStatus)
}

should be ok.

}
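
The grouping step in `calculateScore` can be sketched without the cluster types. This is a simplified illustration, not the PR's code: `stat` and `storeHot` are trimmed stand-ins, the value of `hotDegreeThreshold` is an assumption (the value of `hotRegionLowThreshold` is not shown in this diff), and `sort.Slice` is swapped in for `sort.Sort(sort.Reverse(...))`.

```go
package main

import (
	"fmt"
	"sort"
)

// hotDegreeThreshold is a hypothetical value chosen for this sketch.
const hotDegreeThreshold = 3

// stat is a trimmed stand-in for RegionStat; StoreID is the leader's store.
type stat struct {
	RegionID     uint64
	StoreID      uint64
	WrittenBytes uint64
	HotDegree    int
}

// storeHot is a trimmed stand-in for StoreHotRegions.
type storeHot struct {
	TotalWrittenBytes uint64
	Regions           []stat
}

// groupByStore drops regions below the threshold, groups the rest by
// store, sums written bytes per store, and sorts each store's regions
// from hottest to coldest.
func groupByStore(stats []stat) map[uint64]*storeHot {
	out := make(map[uint64]*storeHot)
	for _, s := range stats {
		if s.HotDegree < hotDegreeThreshold {
			continue
		}
		sh, ok := out[s.StoreID]
		if !ok {
			sh = &storeHot{}
			out[s.StoreID] = sh
		}
		sh.TotalWrittenBytes += s.WrittenBytes
		sh.Regions = append(sh.Regions, s)
	}
	for _, sh := range out {
		regions := sh.Regions
		sort.Slice(regions, func(i, j int) bool {
			return regions[i].WrittenBytes > regions[j].WrittenBytes
		})
	}
	return out
}

func main() {
	grouped := groupByStore([]stat{
		{RegionID: 1, StoreID: 1, WrittenBytes: 500, HotDegree: 5},
		{RegionID: 2, StoreID: 1, WrittenBytes: 900, HotDegree: 4},
		{RegionID: 3, StoreID: 2, WrittenBytes: 100, HotDegree: 1},
	})
	fmt.Println(len(grouped), grouped[1].TotalWrittenBytes)
}
```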

func (l *balanceHotRegionScheduler) SelectSourceRegion(cluster *clusterInfo) *RegionInfo {
var (
maxWritten uint64
sourceStore uint64
maxHotStoreRegionCount int
)
// Choose a hot store as the transfer source.
// The number of hot regions in a store takes priority over TotalWrittenBytes.
for sid, s := range l.scoreStatus {
if s.RegionsStat.Len() < 2 {
continue
}

if maxHotStoreRegionCount < s.RegionsStat.Len() {
maxHotStoreRegionCount = s.RegionsStat.Len()
maxWritten = s.TotalWrittenBytes
sourceStore = sid
continue
}

if maxHotStoreRegionCount == s.RegionsStat.Len() && maxWritten < s.TotalWrittenBytes {
maxWritten = s.TotalWrittenBytes
sourceStore = sid
}
}

if sourceStore == 0 {
return nil
}

length := l.scoreStatus[sourceStore].RegionsStat.Len()
// the hottest region in the store is never moved
// randomly pick a region from 1 .. length-1
// TODO: consider hot degree when picking
Member

Please add a comment: // the hottest region in a store is never moved.

rr := l.r.Int31n(int32(length-1)) + 1
Member

Please add a comment for this algorithm.

pickedRegionStat := l.scoreStatus[sourceStore].RegionsStat[rr]
if pickedRegionStat.antiCount < hotRegionAntiCount {
return nil
Member

Can we try to pick another hot region in that case?

Contributor Author

It will be retried in scheduleController.

}
sourceRegion := cluster.getRegion(pickedRegionStat.RegionID)
if len(sourceRegion.DownPeers) != 0 || len(sourceRegion.PendingPeers) != 0 {
return nil
}
// use written bytes per second
sourceRegion.WrittenBytes = pickedRegionStat.WrittenBytes
l.adjustBalanceLimit(sourceStore)
return sourceRegion
}

func (l *balanceHotRegionScheduler) adjustBalanceLimit(storeID uint64) {
s := l.scoreStatus[storeID]
var hotRegionTotalCount float64
for _, m := range l.scoreStatus {
hotRegionTotalCount += float64(m.RegionsStat.Len())
}

avgRegionCount := hotRegionTotalCount / float64(len(l.scoreStatus))
// Multiply by 0.75 to avoid transferring regions back and forth
limit := uint64((float64(s.RegionsStat.Len()) - avgRegionCount) * 0.75)
Contributor

Define a const for 0.75.

l.limit = maxUint64(1, limit)
Member

What does 1 mean?

Contributor Author

The limit is at least 1.

}
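
A worked example of the limit arithmetic, as a sketch: with hot-region counts of 10, 2, and 3 across three stores, the average is 5, so the source store (10 regions) has a surplus of 5 and the limit becomes `uint64(5 * 0.75) = 3`. The function name `balanceLimit`, the constant `dampingFactor`, and the explicit guard for a non-positive surplus are additions made for this illustration and are not in the PR. The guard matters in general because Go leaves the result of converting a negative float to `uint64` implementation-dependent.

```go
package main

import "fmt"

// dampingFactor mirrors the 0.75 literal in the diff; the name is hypothetical.
const dampingFactor = 0.75

// balanceLimit returns how many operators may run at once: the source
// store's surplus over the average hot-region count, scaled by
// dampingFactor, and never less than 1.
func balanceLimit(sourceCount int, counts []int) uint64 {
	if len(counts) == 0 {
		return 1
	}
	total := 0
	for _, c := range counts {
		total += c
	}
	avg := float64(total) / float64(len(counts))
	surplus := (float64(sourceCount) - avg) * dampingFactor
	if surplus < 1 {
		// Covers zero and negative surpluses before converting to uint64.
		return 1
	}
	return uint64(surplus)
}

func main() {
	fmt.Println(balanceLimit(10, []int{10, 2, 3}))
	fmt.Println(balanceLimit(2, []int{2, 2}))
}
```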

func (l *balanceHotRegionScheduler) GetStatus() map[uint64]*StoreHotRegions {
return l.scoreStatus
}

func (l *balanceHotRegionScheduler) selectTransferLeader(sourceRegion *RegionInfo) *metapb.Peer {
followPeers := sourceRegion.GetFollowers()
storeIDs := make([]uint64, 0, len(followPeers))
for _, peer := range followPeers {
storeIDs = append(storeIDs, peer.GetStoreId())
}

targetStoreID := l.selectBestStore(storeIDs, sourceRegion)
var targetPeer *metapb.Peer
for _, peer := range followPeers {
if peer.GetStoreId() == targetStoreID {
targetPeer = peer
}
}
return targetPeer
}

func (l *balanceHotRegionScheduler) selectTransferPeer(sourceRegion *RegionInfo, cluster *clusterInfo) *metapb.Peer {
filter := newExcludedFilter(sourceRegion.GetStoreIds(), sourceRegion.GetStoreIds())
Contributor

Do we have to consider store labels when moving a peer?
FYI, the balanceRegionScheduler uses distinctScoreFilter to make sure replicas of a region are separated as much as possible.

stores := cluster.getStores()

storeIDs := make([]uint64, 0, len(stores))
for _, store := range stores {
if filter.FilterTarget(store) {
continue
}
storeIDs = append(storeIDs, store.GetId())
}
targetStoreID := l.selectBestStore(storeIDs, sourceRegion)
targetStore := cluster.getStore(targetStoreID)
if targetStore == nil {
return nil
}
newPeer, err := cluster.allocPeer(targetStore.GetId())
if err != nil {
log.Errorf("failed to allocate peer: %v", err)
return nil
}

return newPeer
}

// selectBestStore selects a target store for the transfer.
// It prefers the store with the fewest hot regions,
// and then the store with the fewest total written bytes.
func (l *balanceHotRegionScheduler) selectBestStore(stores []uint64, sourceRegion *RegionInfo) uint64 {
sr := l.scoreStatus[sourceRegion.Leader.GetStoreId()]
sourceStoreWrittenBytes := sr.TotalWrittenBytes
sourceStoreHotRegionCount := sr.RegionsStat.Len()

var (
targetStore uint64
minWrittenBytes uint64 = math.MaxUint64
)
minRegionCount := int(math.MaxInt32)
for _, store := range stores {
if s, ok := l.scoreStatus[store]; ok {
if sourceStoreHotRegionCount-s.RegionsStat.Len() > 1 && minRegionCount > s.RegionsStat.Len() {
targetStore = store
minWrittenBytes = s.TotalWrittenBytes
minRegionCount = s.RegionsStat.Len()
continue
}
if minRegionCount == s.RegionsStat.Len() && minWrittenBytes > s.TotalWrittenBytes &&
uint64(float64(sourceStoreWrittenBytes)*hotRegionScheduleFactor) > s.TotalWrittenBytes+2*sourceRegion.WrittenBytes {
minWrittenBytes = s.TotalWrittenBytes
targetStore = store
}

} else {
targetStore = store
break
}
}
return targetStore
}
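
The target selection above can be exercised in isolation with a sketch that follows the same branch order. Assumptions are labeled: `storeHot` is a trimmed stand-in, the source store's stats and the region's written bytes are passed in directly, and `scheduleFactor = 0.9` is a hypothetical value because `hotRegionScheduleFactor` is not defined in the visible part of the diff.

```go
package main

import (
	"fmt"
	"math"
)

// scheduleFactor is a hypothetical value chosen for this sketch.
const scheduleFactor = 0.9

// storeHot is a trimmed stand-in for StoreHotRegions.
type storeHot struct {
	TotalWrittenBytes uint64
	RegionCount       int
}

// selectBestStore walks the candidates in order. A store with no hot
// regions is taken immediately. Otherwise a store is preferred if it has
// at least two fewer hot regions than the source and fewer than the best
// seen so far; among stores with the same count, the one with fewer
// written bytes wins, provided the move would not overload it.
// It returns 0 when no candidate qualifies.
func selectBestStore(candidates []uint64, hot map[uint64]*storeHot, src *storeHot, regionWritten uint64) uint64 {
	var target uint64
	minWritten := uint64(math.MaxUint64)
	minCount := math.MaxInt32
	for _, id := range candidates {
		s, ok := hot[id]
		if !ok {
			return id
		}
		if src.RegionCount-s.RegionCount > 1 && minCount > s.RegionCount {
			target, minWritten, minCount = id, s.TotalWrittenBytes, s.RegionCount
			continue
		}
		if minCount == s.RegionCount && minWritten > s.TotalWrittenBytes &&
			uint64(float64(src.TotalWrittenBytes)*scheduleFactor) > s.TotalWrittenBytes+2*regionWritten {
			minWritten, target = s.TotalWrittenBytes, id
		}
	}
	return target
}

func main() {
	hot := map[uint64]*storeHot{
		2: {TotalWrittenBytes: 3000, RegionCount: 3},
		3: {TotalWrittenBytes: 1000, RegionCount: 3},
	}
	src := &storeHot{TotalWrittenBytes: 10000, RegionCount: 5}
	fmt.Println(selectBestStore([]uint64{2, 3}, hot, src, 100))
}
```

Note that the result depends on candidate order when a store without hot regions is present, since the loop stops at the first such store.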