Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add hot region schedule #611

Merged
merged 34 commits into from
Apr 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e2982b7
*: add hot region schedule
nolouch Apr 7, 2017
2a2ef7f
*: adjust state outqueue state
nolouch Apr 8, 2017
ca4c248
*:calculate avg bytes write
nolouch Apr 8, 2017
7d8e7bf
*: clean up
nolouch Apr 10, 2017
0f6c35c
address
nolouch Apr 11, 2017
382986e
tiny clean
nolouch Apr 11, 2017
465e6e7
some fix
nolouch Apr 12, 2017
20e78b0
change to lruCache
nolouch Apr 13, 2017
0d21c20
some clean
nolouch Apr 13, 2017
5050c94
address comments
nolouch Apr 15, 2017
f5c9de4
address comments
nolouch Apr 16, 2017
e4f281d
*:tiny fix
nolouch Apr 17, 2017
034e975
balance: add some test
nolouch Apr 17, 2017
9228598
tiny clean
nolouch Apr 17, 2017
a787547
Merge branch 'master' into hot-region-schedule
nolouch Apr 17, 2017
1097525
address comments
nolouch Apr 18, 2017
ef4fe14
Merge branch 'master' into shuning/hot-region-schedule
nolouch Apr 20, 2017
3eae118
address comment
nolouch Apr 20, 2017
f18d1c4
address comments
nolouch Apr 23, 2017
d9ad9f7
fix
nolouch Apr 23, 2017
95e3530
address comments
nolouch Apr 24, 2017
5e8f5a8
tiny fix
nolouch Apr 24, 2017
174782e
address comment
nolouch Apr 24, 2017
1c3605a
tiny fix
nolouch Apr 24, 2017
30bcb69
address comment
nolouch Apr 24, 2017
ce794ce
Merge branch 'master' into shuning/hot-region-schedule
disksing Apr 25, 2017
bad2c8a
fix ci
nolouch Apr 25, 2017
18e60e9
Merge branch 'shuning/hot-region-schedule' of https://github.com/ping…
nolouch Apr 25, 2017
c2469d7
tiny fix
nolouch Apr 25, 2017
7d80df0
Merge branch 'master' into shuning/hot-region-schedule
zhangjinpeng87 Apr 26, 2017
2760bc9
address comment
nolouch Apr 26, 2017
79a39d8
Merge branch 'shuning/hot-region-schedule' of https://github.com/ping…
nolouch Apr 26, 2017
79b9de3
address comment
nolouch Apr 26, 2017
91e69c6
address comment
nolouch Apr 27, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ package testutil

import (
"fmt"
"os"
"sync/atomic"
)

var unixURLCount uint64

// UnixURL returns a unique unix socket url, used for test only.
func UnixURL() string {
return fmt.Sprintf("unix://localhost:%d%d", os.Getpid(), atomic.AddUint64(&unixURLCount, 1))
return fmt.Sprintf("unix://localhost:%d", atomic.AddUint64(&unixURLCount, 1))
}
41 changes: 41 additions & 0 deletions server/api/hot_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"net/http"

"github.com/pingcap/pd/server"
"github.com/unrolled/render"
)

type hotStatusHandler struct {
*server.Handler
rd *render.Render
}

func newHotStatusHandler(handler *server.Handler, rd *render.Render) *hotStatusHandler {
return &hotStatusHandler{
Handler: handler,
rd: rd,
}
}

func (h *hotStatusHandler) GetHotRegions(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, h.GetHotWriteRegions())
}

func (h *hotStatusHandler) GetHotStores(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, h.GetHotWriteStores())
}
4 changes: 4 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/labels", labelsHandler.Get).Methods("GET")
router.HandleFunc("/api/v1/labels/stores", labelsHandler.GetStores).Methods("GET")

hotStatusHandler := newHotStatusHandler(handler, rd)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please let pd-ctl support this later.

router.HandleFunc("/api/v1/hotspot/regions", hotStatusHandler.GetHotRegions).Methods("GET")
router.HandleFunc("/api/v1/hotspot/stores", hotStatusHandler.GetHotStores).Methods("GET")

router.Handle("/api/v1/events", newEventsHandler(svr, rd)).Methods("GET")
router.Handle("/api/v1/feed", newFeedHandler(svr, rd)).Methods("GET")

Expand Down
278 changes: 278 additions & 0 deletions server/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package server

import (
"math"
"math/rand"
"sort"
"sync"
"time"

log "github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -370,3 +373,278 @@ func (r *replicaChecker) checkBestReplacement(region *RegionInfo) Operator {
}
return newTransferPeer(region, oldPeer, newPeer)
}

// RegionStat records each hot region's statistics
type RegionStat struct {
RegionID uint64 `json:"region_id"`
WrittenBytes uint64 `json:"written_bytes"`
// HotDegree records the hot region update times
HotDegree int `json:"hot_degree"`
// LastUpdateTime used to calculate average write
LastUpdateTime time.Time `json:"last_update_time"`
StoreID uint64 `json:"-"`
// antiCount used to eliminate some noise when remove region in cache
antiCount int
// version used to check the region split times
version uint64
}

// RegionsStat is a list of a group region state type
type RegionsStat []RegionStat

func (m RegionsStat) Len() int { return len(m) }
func (m RegionsStat) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m RegionsStat) Less(i, j int) bool { return m[i].WrittenBytes < m[j].WrittenBytes }

// StoreHotRegions records all hot regions in one store with sequence
type StoreHotRegions struct {
TotalWrittenBytes uint64 `json:"total_written"`
RegionCount int `json:"region_count"`
RegionsStat RegionsStat `json:"stats"`
}

type balanceHotRegionScheduler struct {
sync.RWMutex
opt *scheduleOption
limit uint64
scoreStatus map[uint64]*StoreHotRegions // store id -> regions status in this store
r *rand.Rand
}

func newBalanceHotRegionScheduler(opt *scheduleOption) *balanceHotRegionScheduler {
return &balanceHotRegionScheduler{
opt: opt,
limit: 1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we modify the limit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use a const var

scoreStatus: make(map[uint64]*StoreHotRegions),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

func (h *balanceHotRegionScheduler) GetName() string {
return "balance-hot-region-scheduler"
}

func (h *balanceHotRegionScheduler) GetResourceKind() ResourceKind {
return priorityKind
}

func (h *balanceHotRegionScheduler) GetResourceLimit() uint64 {
return h.limit
}

func (h *balanceHotRegionScheduler) Prepare(cluster *clusterInfo) error { return nil }

func (h *balanceHotRegionScheduler) Cleanup(cluster *clusterInfo) {}

func (h *balanceHotRegionScheduler) Schedule(cluster *clusterInfo) Operator {
h.calculateScore(cluster)
region := h.SelectSourceRegion(cluster)
if region == nil {
return nil
}
newLeader := h.selectTransferLeader(region)
if newLeader != nil {
return newPriorityTransferLeader(region, newLeader)
}
peer := h.selectTransferPeer(region, cluster)
if peer != nil {
return newPriorityTransferPeer(region, region.Leader, peer)
}
return nil
}

func (h *balanceHotRegionScheduler) calculateScore(cluster *clusterInfo) {
h.Lock()
defer h.Unlock()
h.scoreStatus = make(map[uint64]*StoreHotRegions)
items := cluster.writeStatistics.elems()
for _, item := range items {
r, ok := item.value.(*RegionStat)
if !ok {
continue
}
if r.HotDegree < hotRegionLowThreshold {
continue
}

regionInfo := cluster.getRegion(r.RegionID)
storeID := regionInfo.Leader.GetStoreId()
status, ok := h.scoreStatus[storeID]
if !ok {
status = &StoreHotRegions{
RegionsStat: make(RegionsStat, 0, storeHotRegionsDefaultLen),
}
h.scoreStatus[storeID] = status
}
status.TotalWrittenBytes += r.WrittenBytes
status.RegionsStat = append(status.RegionsStat, RegionStat{
RegionID: r.RegionID,
WrittenBytes: r.WrittenBytes,
HotDegree: r.HotDegree,
LastUpdateTime: r.LastUpdateTime,
StoreID: storeID,
antiCount: r.antiCount,
version: r.version,
})
status.RegionCount++
}

for _, rs := range h.scoreStatus {
sort.Sort(sort.Reverse(rs.RegionsStat))
}
}

func (h *balanceHotRegionScheduler) SelectSourceRegion(cluster *clusterInfo) *RegionInfo {
var (
maxWritten uint64
sourceStore uint64
maxHotStoreRegionCount int
)
// choose a hot store as transfer source
// the numbers of the hot regions in that store has higher priority than TotalWrittenBytes
for sid, s := range h.scoreStatus {
if s.RegionsStat.Len() < 2 {
continue
}

if maxHotStoreRegionCount < s.RegionsStat.Len() {
maxHotStoreRegionCount = s.RegionsStat.Len()
maxWritten = s.TotalWrittenBytes
sourceStore = sid
continue
}

if maxHotStoreRegionCount == s.RegionsStat.Len() && maxWritten < s.TotalWrittenBytes {
maxWritten = s.TotalWrittenBytes
sourceStore = sid
}
}

if sourceStore == 0 {
return nil
}

length := h.scoreStatus[sourceStore].RegionsStat.Len()
// the hottest region in the store not move
// randomly pick a region from 1 .. length-1
// TODO: consider hot degree when pick
rr := h.r.Intn(length-1) + 1
pickedRegionStat := h.scoreStatus[sourceStore].RegionsStat[rr]
if pickedRegionStat.antiCount < hotRegionAntiCount {
return nil
}
sourceRegion := cluster.getRegion(pickedRegionStat.RegionID)
if len(sourceRegion.DownPeers) != 0 || len(sourceRegion.PendingPeers) != 0 {
return nil
}
// use written bytes per second
sourceRegion.WrittenBytes = pickedRegionStat.WrittenBytes
h.adjustBalanceLimit(sourceStore)
return sourceRegion
}

func (h *balanceHotRegionScheduler) adjustBalanceLimit(storeID uint64) {
s := h.scoreStatus[storeID]
var hotRegionTotalCount float64
for _, m := range h.scoreStatus {
hotRegionTotalCount += float64(m.RegionsStat.Len())
}

avgRegionCount := hotRegionTotalCount / float64(len(h.scoreStatus))
// Multiplied by hotRegionLimitFactor to avoid transfer back and forth
limit := uint64((float64(s.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor)
h.limit = maxUint64(1, limit)
}

func (h *balanceHotRegionScheduler) GetStatus() map[uint64]*StoreHotRegions {
h.RLock()
defer h.RUnlock()
status := make(map[uint64]*StoreHotRegions)
for id, stat := range h.scoreStatus {
clone := *stat
status[id] = &clone
}
return status
}

func (h *balanceHotRegionScheduler) selectTransferLeader(sourceRegion *RegionInfo) *metapb.Peer {
followPeers := sourceRegion.GetFollowers()
storeIDs := make([]uint64, 0, len(followPeers))
for _, peer := range followPeers {
storeIDs = append(storeIDs, peer.GetStoreId())
}

targetStoreID := h.selectBestStore(storeIDs, sourceRegion)
var targetPeer *metapb.Peer
for _, peer := range followPeers {
if peer.GetStoreId() == targetStoreID {
targetPeer = peer
}
}
return targetPeer
}

func (h *balanceHotRegionScheduler) selectTransferPeer(sourceRegion *RegionInfo, cluster *clusterInfo) *metapb.Peer {
var filters []Filter
stores := cluster.getStores()

filters = append(filters, newExcludedFilter(sourceRegion.GetStoreIds(), sourceRegion.GetStoreIds()))
filters = append(filters, newDistinctScoreFilter(h.opt.GetReplication(), stores, cluster.getLeaderStore(sourceRegion)))
filters = append(filters, newStateFilter(h.opt))
filters = append(filters, newStorageThresholdFilter(h.opt))

storeIDs := make([]uint64, 0, len(stores))
for _, store := range stores {
if filterTarget(store, filters) {
continue
}
storeIDs = append(storeIDs, store.GetId())
}
targetStoreID := h.selectBestStore(storeIDs, sourceRegion)
targetStore := cluster.getStore(targetStoreID)
if targetStore == nil {
return nil
}
newPeer, err := cluster.allocPeer(targetStore.GetId())
if err != nil {
log.Errorf("failed to allocate peer: %v", err)
return nil
}

return newPeer
}

// select a store to transfer peer
// preferred to the store that with the least number of regions
// and then choose the least total written bytes store
func (h *balanceHotRegionScheduler) selectBestStore(stores []uint64, sourceRegion *RegionInfo) uint64 {
sr := h.scoreStatus[sourceRegion.Leader.GetStoreId()]
sourceStoreWrittenBytes := sr.TotalWrittenBytes
sourceStoreHotRegionCount := sr.RegionsStat.Len()

var (
targetStore uint64
minWrittenBytes uint64 = math.MaxUint64
)
minRegionCount := int(math.MaxInt32)
for _, store := range stores {
if s, ok := h.scoreStatus[store]; ok {
if sourceStoreHotRegionCount-s.RegionsStat.Len() > 1 && minRegionCount > s.RegionsStat.Len() {
targetStore = store
minWrittenBytes = s.TotalWrittenBytes
minRegionCount = s.RegionsStat.Len()
continue
}
if minRegionCount == s.RegionsStat.Len() && minWrittenBytes > s.TotalWrittenBytes &&
uint64(float64(sourceStoreWrittenBytes)*hotRegionScheduleFactor) > s.TotalWrittenBytes+2*sourceRegion.WrittenBytes {
minWrittenBytes = s.TotalWrittenBytes
targetStore = store
}

} else {
targetStore = store
break
}
}
return targetStore
}
Loading