add the concept major && minor for hot regions
zhangjinpeng87 committed May 9, 2017
1 parent 19589b9 commit ead6565
Showing 4 changed files with 182 additions and 58 deletions.
160 changes: 120 additions & 40 deletions server/balancer.go
@@ -30,6 +30,13 @@ const (
bootstrapBalanceDiff = 2
)

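+// HotRegionType selects which set of hot-region statistics (major or minor) a balancing pass operates on.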
+type HotRegionType int
+
+const (
+minor HotRegionType = iota
+major HotRegionType = iota
+)
+
// minBalanceDiff returns the minimal diff to do balance. The formula is based
// on experience to let the diff increase along with the count slowly.
func minBalanceDiff(count uint64) float64 {
@@ -410,18 +417,20 @@ type StoreHotRegions struct {

type balanceHotRegionScheduler struct {
sync.RWMutex
-opt *scheduleOption
-limit uint64
-scoreStatus map[uint64]*StoreHotRegions // store id -> regions status in this store
-r *rand.Rand
+opt *scheduleOption
+limit uint64
+majorScoreStatus map[uint64]*StoreHotRegions // store id -> regions status in this store
+minorScoreStatus map[uint64]*StoreHotRegions // store id -> regions status in this store
+r *rand.Rand
}

func newBalanceHotRegionScheduler(opt *scheduleOption) *balanceHotRegionScheduler {
return &balanceHotRegionScheduler{
-opt: opt,
-limit: 1,
-scoreStatus: make(map[uint64]*StoreHotRegions),
-r: rand.New(rand.NewSource(time.Now().UnixNano())),
+opt: opt,
+limit: 1,
+majorScoreStatus: make(map[uint64]*StoreHotRegions),
+minorScoreStatus: make(map[uint64]*StoreHotRegions),
+r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

@@ -444,16 +453,26 @@ func (h *balanceHotRegionScheduler) Cleanup(cluster *clusterInfo) {}
func (h *balanceHotRegionScheduler) Schedule(cluster *clusterInfo) Operator {
h.calculateScores(cluster)

-// balance by peer
-srcRegion, srcPeer, destPeer := h.balanceByPeer(cluster)
-if srcRegion != nil {
-return newPriorityTransferPeer(srcRegion, srcPeer, destPeer)
+// balance major hot regions
+srcRegionMajor, srcPeerMajor, destPeerMajor := h.balanceByPeer(cluster, major)
+if srcRegionMajor != nil {
+return newPriorityTransferPeer(srcRegionMajor, srcPeerMajor, destPeerMajor)
}

+srcRegionMajor, newLeaderMajor := h.balanceByLeader(cluster, major)
+if srcRegionMajor != nil {
+return newPriorityTransferLeader(srcRegionMajor, newLeaderMajor)
+}
+
+// balance minor hot regions
+srcRegionMinor, srcPeerMinor, destPeerMinor := h.balanceByPeer(cluster, minor)
+if srcRegionMinor != nil {
+return newPriorityTransferPeer(srcRegionMinor, srcPeerMinor, destPeerMinor)
+}
+
-// balance by leader
-srcRegion, newLeader := h.balanceByLeader(cluster)
-if srcRegion != nil {
-return newPriorityTransferLeader(srcRegion, newLeader)
+srcRegionMinor, newLeaderMinor := h.balanceByLeader(cluster, minor)
+if srcRegionMinor != nil {
+return newPriorityTransferLeader(srcRegionMinor, newLeaderMinor)
}

return nil
@@ -462,8 +481,23 @@ func (h *balanceHotRegionScheduler) Schedule(cluster *clusterInfo) Operator {
func (h *balanceHotRegionScheduler) calculateScores(cluster *clusterInfo) {
h.Lock()
defer h.Unlock()
-h.scoreStatus = make(map[uint64]*StoreHotRegions)
-items := cluster.writeStatistics.elems()
+
+h.calculateScoresImpl(cluster, major)
+h.calculateScoresImpl(cluster, minor)
+}
+
+func (h *balanceHotRegionScheduler) calculateScoresImpl(cluster *clusterInfo, t HotRegionType) {
+scoreStatus := make(map[uint64]*StoreHotRegions)
+var items []*cacheItem
+switch t {
+case major:
+items = cluster.majorWriteStatistics.elems()
+case minor:
+items = cluster.minorWriteStatistics.elems()
+default:
+panic("Not supported hot region type")
+}
+
for _, item := range items {
r, ok := item.value.(*RegionStat)
if !ok {
@@ -477,13 +511,13 @@ func (h *balanceHotRegionScheduler) calculateScores(cluster *clusterInfo) {
LeaderStoreId := regionInfo.Leader.GetStoreId()
StoreIds := regionInfo.GetStoreIds()
for storeId := range StoreIds {
-statistics, ok := h.scoreStatus[storeId]
+statistics, ok := scoreStatus[storeId]
if !ok {
statistics = &StoreHotRegions{
RegionsStatAsLeader: make(RegionsStat, 0, storeHotRegionsDefaultLen),
RegionsStatAsPeer: make(RegionsStat, 0, storeHotRegionsDefaultLen),
}
-h.scoreStatus[storeId] = statistics
+scoreStatus[storeId] = statistics
}

stat := RegionStat{
@@ -506,17 +540,35 @@ func (h *balanceHotRegionScheduler) calculateScores(cluster *clusterInfo) {
}
}
}

+switch t {
+case major:
+h.majorScoreStatus = scoreStatus
+case minor:
+h.minorScoreStatus = scoreStatus
+default:
+panic("Not supported hot region type")
+}
}

-func (h *balanceHotRegionScheduler) balanceByPeer(cluster *clusterInfo) (*RegionInfo, *metapb.Peer, *metapb.Peer) {
+func (h *balanceHotRegionScheduler) balanceByPeer(cluster *clusterInfo, t HotRegionType) (*RegionInfo, *metapb.Peer, *metapb.Peer) {
var (
maxWrittenBytes uint64
srcStoreId uint64
maxHotStoreRegionCount int
+scoreStatus map[uint64]*StoreHotRegions
)

// get the srcStoreId
-for storeId, statistics := range h.scoreStatus {
+switch t {
+case major:
+scoreStatus = h.majorScoreStatus
+case minor:
+scoreStatus = h.minorScoreStatus
+default:
+panic("Not supported hot region type")
+}
+for storeId, statistics := range scoreStatus {
if statistics.RegionsStatAsPeer.Len() < 2 {
continue
}
@@ -539,8 +591,8 @@ func (h *balanceHotRegionScheduler) balanceByPeer(cluster *clusterInfo) (*Region

stores := cluster.getStores()
var destStoreId uint64
-for _, i := range h.r.Perm(h.scoreStatus[srcStoreId].RegionsStatAsPeer.Len()) {
-rs := h.scoreStatus[srcStoreId].RegionsStatAsPeer[i]
+for _, i := range h.r.Perm(scoreStatus[srcStoreId].RegionsStatAsPeer.Len()) {
+rs := scoreStatus[srcStoreId].RegionsStatAsPeer[i]
srcRegion := cluster.getRegion(rs.RegionID)
if len(srcRegion.DownPeers) != 0 || len(srcRegion.PendingPeers) != 0 {
continue
@@ -559,7 +611,7 @@ func (h *balanceHotRegionScheduler) balanceByPeer(cluster *clusterInfo) (*Region
destStoreIds = append(destStoreIds, store.GetId())
}

-destStoreId = h.selectDestStoreByPeer(destStoreIds, srcRegion, srcStoreId)
+destStoreId = h.selectDestStoreByPeer(destStoreIds, srcRegion, srcStoreId, t)
if destStoreId != 0 {
srcRegion.WrittenBytes = rs.WrittenBytes
h.adjustBalanceLimitByPeer(srcStoreId)
@@ -590,8 +642,17 @@ func (h *balanceHotRegionScheduler) balanceByPeer(cluster *clusterInfo) (*Region
return nil, nil, nil
}

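+// selectDestStoreByPeer picks a destination store for the given hot-region type from the candidates, preferring stores with noticeably fewer hot peer regions than the source store.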
-func (h *balanceHotRegionScheduler) selectDestStoreByPeer(candidateStoreIds []uint64, srcRegion *RegionInfo, srcStoreId uint64) uint64 {
-sr := h.scoreStatus[srcStoreId]
+func (h *balanceHotRegionScheduler) selectDestStoreByPeer(candidateStoreIds []uint64, srcRegion *RegionInfo, srcStoreId uint64, t HotRegionType) uint64 {
+var scoreStatus map[uint64]*StoreHotRegions
+switch t {
+case major:
+scoreStatus = h.majorScoreStatus
+case minor:
+scoreStatus = h.minorScoreStatus
+default:
+panic("Not supported hot region type")
+}
+sr := scoreStatus[srcStoreId]
srcWrittenBytes := sr.WrittenBytesAsPeer
srcHotRegionsCount := sr.RegionsStatAsPeer.Len()

@@ -601,7 +662,7 @@ func (h *balanceHotRegionScheduler) selectDestStoreByPeer(candidateStoreIds []ui
)
minRegionsCount := int(math.MaxInt32)
for _, storeId := range candidateStoreIds {
-if s, ok := h.scoreStatus[storeId]; ok {
+if s, ok := scoreStatus[storeId]; ok {
if srcHotRegionsCount-s.RegionsStatAsPeer.Len() > 1 && minRegionsCount > s.RegionsStatAsLeader.Len() {
destStoreId = storeId
minWrittenBytes = s.WrittenBytesAsPeer
@@ -622,27 +683,36 @@ func (h *balanceHotRegionScheduler) selectDestStoreByPeer(candidateStoreIds []ui
}

func (h *balanceHotRegionScheduler) adjustBalanceLimitByPeer(storeID uint64) {
-s := h.scoreStatus[storeID]
+s := h.majorScoreStatus[storeID]
var hotRegionTotalCount float64
-for _, m := range h.scoreStatus {
+for _, m := range h.majorScoreStatus {
hotRegionTotalCount += float64(m.RegionsStatAsPeer.Len())
}

-avgRegionCount := hotRegionTotalCount / float64(len(h.scoreStatus))
+avgRegionCount := hotRegionTotalCount / float64(len(h.majorScoreStatus))
// Multiplied by hotRegionLimitFactor to avoid transfer back and forth
limit := uint64((float64(s.RegionsStatAsPeer.Len()) - avgRegionCount) * hotRegionLimitFactor)
h.limit = maxUint64(1, limit)
}

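+// balanceByLeader looks for a hot region of the given hot-region type whose leadership should move off its heavily written store, returning the region and the proposed new leader peer.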
-func (h *balanceHotRegionScheduler) balanceByLeader(cluster *clusterInfo) (*RegionInfo, *metapb.Peer) {
+func (h *balanceHotRegionScheduler) balanceByLeader(cluster *clusterInfo, t HotRegionType) (*RegionInfo, *metapb.Peer) {
var (
maxWrittenBytes uint64
srcStoreId uint64
maxHotStoreRegionCount int
+scoreStatus map[uint64]*StoreHotRegions
)
+switch t {
+case major:
+scoreStatus = h.majorScoreStatus
+case minor:
+scoreStatus = h.minorScoreStatus
+default:
+panic("Not supported hot region type")
+}

// select srcStoreId by leader
-for storeId, statistics := range h.scoreStatus {
+for storeId, statistics := range scoreStatus {
if statistics.RegionsStatAsLeader.Len() < 2 {
continue
}
@@ -664,23 +734,33 @@ func (h *balanceHotRegionScheduler) balanceByLeader(cluster *clusterInfo) (*Regi
}

// select destPeer
-for _, i := range h.r.Perm(h.scoreStatus[srcStoreId].RegionsStatAsLeader.Len()) {
-rs := h.scoreStatus[srcStoreId].RegionsStatAsLeader[i]
+for _, i := range h.r.Perm(scoreStatus[srcStoreId].RegionsStatAsLeader.Len()) {
+rs := scoreStatus[srcStoreId].RegionsStatAsLeader[i]
srcRegion := cluster.getRegion(rs.RegionID)
if len(srcRegion.DownPeers) != 0 || len(srcRegion.PendingPeers) != 0 {
continue
}

-destPeer := h.selectDestStoreByLeader(srcRegion)
+destPeer := h.selectDestStoreByLeader(srcRegion, t)
if destPeer != nil {
return srcRegion, destPeer
}
}
return nil, nil
}

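+// selectDestStoreByLeader chooses, among the region's followers, a peer on a store with noticeably fewer hot leader regions for the given hot-region type; nil means no follower qualifies.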
-func (h *balanceHotRegionScheduler) selectDestStoreByLeader(srcRegion *RegionInfo) *metapb.Peer {
-sr := h.scoreStatus[srcRegion.Leader.GetStoreId()]
+func (h *balanceHotRegionScheduler) selectDestStoreByLeader(srcRegion *RegionInfo, t HotRegionType) *metapb.Peer {
+var scoreStatus map[uint64]*StoreHotRegions
+switch t {
+case major:
+scoreStatus = h.majorScoreStatus
+case minor:
+scoreStatus = h.minorScoreStatus
+default:
+panic("Not supported hot region type")
+}
+
+sr := scoreStatus[srcRegion.Leader.GetStoreId()]
srcWrittenBytes := sr.WrittenBytesAsLeader
srcHotRegionsCount := sr.RegionsStatAsLeader.Len()

@@ -690,7 +770,7 @@ func (h *balanceHotRegionScheduler) selectDestStoreByLeader(srcRegion *RegionInf
)
minRegionsCount := int(math.MaxInt32)
for storeId, peer := range srcRegion.GetFollowers() {
-if s, ok := h.scoreStatus[storeId]; ok {
+if s, ok := scoreStatus[storeId]; ok {
if srcHotRegionsCount-s.RegionsStatAsLeader.Len() > 1 && minRegionsCount > s.RegionsStatAsLeader.Len() {
destPeer = peer
minWrittenBytes = s.WrittenBytesAsLeader
@@ -714,7 +794,7 @@ func (h *balanceHotRegionScheduler) GetStatus() map[uint64]*StoreHotRegions {
h.RLock()
defer h.RUnlock()
status := make(map[uint64]*StoreHotRegions)
-for id, stat := range h.scoreStatus {
+for id, stat := range h.majorScoreStatus {
clone := *stat
status[id] = &clone
}
2 changes: 1 addition & 1 deletion server/balancer_test.go
@@ -841,7 +841,7 @@ func (s *testBalanceHotRegionSchedulerSuite) TestBalance(c *C) {
}
hb.calculateScores(tc.clusterInfo)
for _, e := range expect {
-c.Assert(hb.scoreStatus[uint64(e.streID)].RegionCount, Equals, e.hotRegionNumber)
+c.Assert(hb.majorScoreStatus[uint64(e.streID)].RegionCount, Equals, e.hotRegionNumber)
}

// Test adjustLimit