Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add hot region schedule #611

Merged
merged 34 commits into from
Apr 27, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e2982b7
*: add hot region schedule
nolouch Apr 7, 2017
2a2ef7f
*: adjust state outqueue state
nolouch Apr 8, 2017
ca4c248
*:calculate avg bytes write
nolouch Apr 8, 2017
7d8e7bf
*: clean up
nolouch Apr 10, 2017
0f6c35c
address
nolouch Apr 11, 2017
382986e
tiny clean
nolouch Apr 11, 2017
465e6e7
some fix
nolouch Apr 12, 2017
20e78b0
change to lruCache
nolouch Apr 13, 2017
0d21c20
some clean
nolouch Apr 13, 2017
5050c94
address comments
nolouch Apr 15, 2017
f5c9de4
address comments
nolouch Apr 16, 2017
e4f281d
*:tiny fix
nolouch Apr 17, 2017
034e975
balance: add some test
nolouch Apr 17, 2017
9228598
tiny clean
nolouch Apr 17, 2017
a787547
Merge branch 'master' into hot-region-schedule
nolouch Apr 17, 2017
1097525
address comments
nolouch Apr 18, 2017
ef4fe14
Merge branch 'master' into shuning/hot-region-schedule
nolouch Apr 20, 2017
3eae118
address comment
nolouch Apr 20, 2017
f18d1c4
address comments
nolouch Apr 23, 2017
d9ad9f7
fix
nolouch Apr 23, 2017
95e3530
address comments
nolouch Apr 24, 2017
5e8f5a8
tiny fix
nolouch Apr 24, 2017
174782e
address comment
nolouch Apr 24, 2017
1c3605a
tiny fix
nolouch Apr 24, 2017
30bcb69
address comment
nolouch Apr 24, 2017
ce794ce
Merge branch 'master' into shuning/hot-region-schedule
disksing Apr 25, 2017
bad2c8a
fix ci
nolouch Apr 25, 2017
18e60e9
Merge branch 'shuning/hot-region-schedule' of https://github.com/ping…
nolouch Apr 25, 2017
c2469d7
tiny fix
nolouch Apr 25, 2017
7d80df0
Merge branch 'master' into shuning/hot-region-schedule
zhangjinpeng87 Apr 26, 2017
2760bc9
address comment
nolouch Apr 26, 2017
79a39d8
Merge branch 'shuning/hot-region-schedule' of https://github.com/ping…
nolouch Apr 26, 2017
79b9de3
address comment
nolouch Apr 26, 2017
91e69c6
address comment
nolouch Apr 27, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions server/api/hot_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package api

import (
"net/http"

"github.com/pingcap/pd/server"
"github.com/unrolled/render"
)

type hotStatusHandler struct {
svr *server.Server
rd *render.Render
}

func newHotStatusHandler(svr *server.Server, rd *render.Render) *hotStatusHandler {
return &hotStatusHandler{
svr: svr,
rd: rd,
}
}

func (h *hotStatusHandler) GetHot(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, h.svr.GetHotRegions())
}

func (h *hotStatusHandler) GetHotStores(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, h.svr.GetHotStores())
}
4 changes: 4 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/labels", labelsHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/labels/stores", labelsHandler.GetStores).Methods("GET")

hotStatusHandler := newHotStatusHandler(svr, rd)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hot or hotspot?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/api/v1/hotspot/host
/api/v1/hotspot/store?

router.HandleFunc("/api/v1/hot", hotStatusHandler.GetHot).Methods("GET")
router.HandleFunc("/api/v1/hotstore", hotStatusHandler.GetHotStores).Methods("GET")

router.Handle("/api/v1/events", newEventsHandler(svr, rd)).Methods("GET")
router.Handle("/api/v1/feed", newFeedHandler(svr, rd)).Methods("GET")

Expand Down
209 changes: 209 additions & 0 deletions server/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package server

import (
"math"
"math/rand"
"sort"
"time"

"github.com/montanaflynn/stats"
Expand Down Expand Up @@ -370,3 +372,210 @@ func (r *replicaChecker) checkBestReplacement(region *RegionInfo) Operator {
}
return newTransferPeer(region, oldPeer, newPeer)
}

type RegionStateValue struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/RegionStateValue/RegionStat

RegionID uint64 `json:"region_id"`
WriteBytes uint64 `json:"write_bytes"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Written

UpdateTimes int `json:"update_times"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UpdateCount

LastUpdateTime time.Time `json:"last_update_time"`
StoreID uint64 `json: "-"`
antiTimes int `json:"-"`
version uint64 `json:"-"`
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any need to explain what do WrittenBytes / HotDegree / antiCount mean and what they are used for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for JSON encoding

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess @disksing wants to know the meaning of some fields, not only for JSON.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I know what they do generally, but I think there should be some explanation here, for potential future readers.

type MetaRegionStatus []RegionStateValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add test for the slice sort


func (m MetaRegionStatus) Len() int { return len(m) }
func (m MetaRegionStatus) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m MetaRegionStatus) Less(i, j int) bool { return m[i].WriteBytes > m[j].WriteBytes }

type RegionHotStatus struct {
TotalWriteBytes uint64 `json:"total_write"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the difference between TotalWriteBytes and MetaStatus.WriteBytes?

MetaStatus MetaRegionStatus `json:"status"`
}

type balanceHotRegionScheduler struct {
opt *scheduleOption
limit uint64
scoreStatus map[uint64]RegionHotStatus // store id -> regions status in this store
r *rand.Rand
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not using global Rand directly? Using Source directly is not thread safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just used in one thread.

}

func newBalanceHotRegionScheduler(opt *scheduleOption) *balanceHotRegionScheduler {
return &balanceHotRegionScheduler{
opt: opt,
limit: 1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we modify the limit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use a const var

scoreStatus: make(map[uint64]RegionHotStatus),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

func (l *balanceHotRegionScheduler) GetName() string {
return "balance-hot-region-scheduler"
}

func (l *balanceHotRegionScheduler) GetResourceKind() ResourceKind {
return priorityKind
}

func (l *balanceHotRegionScheduler) GetResourceLimit() uint64 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/l/s/

return l.limit
}

func (l *balanceHotRegionScheduler) Prepare(cluster *clusterInfo) error { return nil }

func (l *balanceHotRegionScheduler) Cleanup(cluster *clusterInfo) {}

func (l *balanceHotRegionScheduler) Schedule(cluster *clusterInfo) Operator {
l.CalculateScore(cluster)
region, leader := l.SelectTransferLeader(cluster)
if region == nil {
return nil
}
if leader != nil {
return newPriorityTransferLeader(region, leader)
}
peer := l.SelectTransferPeer(cluster, region)
return newPriorityTransferPeer(region, region.Leader, peer)
}

func (l *balanceHotRegionScheduler) clearScore() {
for sid, status := range l.scoreStatus {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer clear the map directly.

status.TotalWriteBytes = 0
if status.MetaStatus.Len() != 0 {
status.MetaStatus = status.MetaStatus[0:0]
}
l.scoreStatus[sid] = status
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to abstract a function here. It's only 1 line and be called only at 1 place.

func (l *balanceHotRegionScheduler) CalculateScore(cluster *clusterInfo) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to public?

l.clearScore()
items := cluster.writeStatus.GetList()
for _, item := range items {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to access items after unlocked?

r := item.(RegionStateValue)
if r.UpdateTimes < 5 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this 5 a const?

continue
}

id := r.RegionID
regionInfo := cluster.getRegion(id)
if regionInfo.WriteBytes == 0 {
log.Infof("Debug error meet 0 write_bytes %d \n", regionInfo.GetId())
continue
}
storeID := regionInfo.Leader.GetStoreId()
status, ok := l.scoreStatus[storeID]
if !ok {
status = RegionHotStatus{
MetaStatus: make(MetaRegionStatus, 0, 100),
}
l.scoreStatus[storeID] = status
}
status.TotalWriteBytes += regionInfo.WriteBytes
status.MetaStatus = append(status.MetaStatus, RegionStateValue{id, regionInfo.WriteBytes, r.UpdateTimes, r.LastUpdateTime, storeID, r.antiTimes, r.version})
l.scoreStatus[storeID] = status
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is redundant.

}

for sid, rs := range l.scoreStatus {
sort.Sort(rs.MetaStatus)
l.scoreStatus[sid] = rs
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for _, rs := range l.scoreStatus {
    sort.Sort(rs.MetaStatus)
}

should be ok.

}
func (l *balanceHotRegionScheduler) SelectSourceRegion(cluster *clusterInfo) *RegionInfo {
var (
maxWrite uint64
sourceStore uint64
)
for sid, s := range l.scoreStatus {
if s.MetaStatus.Len() < 2 {
continue
}
if maxWrite < s.TotalWriteBytes {
maxWrite = s.TotalWriteBytes
sourceStore = sid
}
}
if sourceStore == 0 {
return nil
}

length := l.scoreStatus[sourceStore].MetaStatus.Len()
for i := 0; i < 10; i++ {
rr := l.r.Int31n(int32((length+1)/2)) + int32(length/2)
rid := l.scoreStatus[sourceStore].MetaStatus[rr].RegionID
region := cluster.getRegion(rid)
if len(region.DownPeers) != 0 || len(region.PendingPeers) != 0 {
log.Info("Debug not select peer", region.DownPeers, region.PendingPeers)
continue
}
return region
}
return nil
}

func (l *balanceHotRegionScheduler) GetStatus() map[uint64]RegionHotStatus {
return l.scoreStatus
}

func (l *balanceHotRegionScheduler) SelectTransferLeader(cluster *clusterInfo) (*RegionInfo, *metapb.Peer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function do two different things(select source region and select target leader), the function name make me confused.

sourceRegion := l.SelectSourceRegion(cluster)
if sourceRegion == nil {
log.Info("Debug not select source region")
return nil, nil
}
sourceStoreWriteBytes := l.scoreStatus[sourceRegion.Leader.GetStoreId()].TotalWriteBytes
var targetPeer *metapb.Peer

//select follow to transfer leader
var least uint64 = math.MaxUint64
for _, peer := range sourceRegion.GetFollowers() {
if s, ok := l.scoreStatus[peer.GetStoreId()]; ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comment for the logic of this algorithm.

if least >= s.TotalWriteBytes && uint64(float64(sourceStoreWriteBytes)*hotRegionScheduleFactor) > s.TotalWriteBytes+2*sourceRegion.WriteBytes {
targetPeer = peer
least = s.TotalWriteBytes
}
} else {
targetPeer = peer
least = 0
}
}
if targetPeer != nil {
log.Infof("Debug Transfer Leader Source:%d Target:%d Write:%d,RegionID:%d Key:%s\n", sourceRegion.Leader.GetStoreId(), targetPeer.GetStoreId(), sourceRegion.WriteBytes, sourceRegion.GetId(), sourceRegion.GetEndKey())
}
return sourceRegion, targetPeer
}

func (l *balanceHotRegionScheduler) SelectTransferPeer(cluster *clusterInfo, region *RegionInfo) *metapb.Peer {
filter := newExcludedFilter(region.GetStoreIds(), region.GetStoreIds())
sourceStoreWriteBytes := l.scoreStatus[region.Leader.GetStoreId()].TotalWriteBytes
stores := cluster.getStores()
var bestStore *storeInfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/bestStore/TargetStore

var least uint64 = math.MaxUint64
for _, store := range stores {
if filter.FilterSource(store) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if filter.FilterTarget(..) is more clear.

continue
}
if s, ok := l.scoreStatus[store.GetId()]; ok {
if least >= s.TotalWriteBytes && uint64(float64(sourceStoreWriteBytes)*hotRegionScheduleFactor) > s.TotalWriteBytes+2*region.WriteBytes {
least = s.TotalWriteBytes
bestStore = store
}
} else {
least = 0
bestStore = store
break
}
}
if bestStore == nil {
return nil
}
newPeer, err := cluster.allocPeer(bestStore.GetId())
if err != nil {
log.Errorf("failed to allocate peer: %v", err)
return nil
}
if newPeer != nil {
log.Infof("Debug Transfer Peer Source:%d Target:%d Write:%d RegionID:%d Key:%s \n", region.Leader.GetStoreId(), newPeer.GetStoreId(), region.WriteBytes, region.GetId(), region.GetEndKey())
}

return newPeer
}
Loading