From ead65655a60f085ea886cc36b206be84763d001e Mon Sep 17 00:00:00 2001
From: zhangjinpeng1987
Date: Tue, 9 May 2017 15:01:27 +0800
Subject: [PATCH] add the concepts of major and minor hot regions

---
 server/balancer.go      | 160 ++++++++++++++++++++++++++++++----------
 server/balancer_test.go |   2 +-
 server/cache.go         |  75 +++++++++++++++----
 server/coordinator.go   |   3 +-
 4 files changed, 182 insertions(+), 58 deletions(-)

diff --git a/server/balancer.go b/server/balancer.go
index 1e4e86ac4d5..60dc83f412f 100644
--- a/server/balancer.go
+++ b/server/balancer.go
@@ -30,6 +30,13 @@ const (
 	bootstrapBalanceDiff = 2
 )
 
+type HotRegionType int
+
+const (
+	minor HotRegionType = iota
+	major
+)
+
 // minBalanceDiff returns the minimal diff to do balance. The formula is based
 // on experience to let the diff increase alone with the count slowly.
 func minBalanceDiff(count uint64) float64 {
@@ -410,18 +417,20 @@ type StoreHotRegions struct {
 
 type balanceHotRegionScheduler struct {
 	sync.RWMutex
-	opt         *scheduleOption
-	limit       uint64
-	scoreStatus map[uint64]*StoreHotRegions // store id -> regions status in this store
-	r           *rand.Rand
+	opt              *scheduleOption
+	limit            uint64
+	majorScoreStatus map[uint64]*StoreHotRegions // store id -> major hot regions on this store
+	minorScoreStatus map[uint64]*StoreHotRegions // store id -> minor hot regions on this store
+	r                *rand.Rand
 }
 
 func newBalanceHotRegionScheduler(opt *scheduleOption) *balanceHotRegionScheduler {
 	return &balanceHotRegionScheduler{
-		opt:         opt,
-		limit:       1,
-		scoreStatus: make(map[uint64]*StoreHotRegions),
-		r:           rand.New(rand.NewSource(time.Now().UnixNano())),
+		opt:              opt,
+		limit:            1,
+		majorScoreStatus: make(map[uint64]*StoreHotRegions),
+		minorScoreStatus: make(map[uint64]*StoreHotRegions),
+		r:                rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 }
 
@@ -444,16 +453,26 @@ func (h *balanceHotRegionScheduler) Cleanup(cluster *clusterInfo) {}
 func (h *balanceHotRegionScheduler) Schedule(cluster *clusterInfo) Operator {
 	h.calculateScores(cluster)
 
-	// balance by peer
-	srcRegion, srcPeer, destPeer := h.balanceByPeer(cluster)
-	if srcRegion != nil {
-		return newPriorityTransferPeer(srcRegion, srcPeer, destPeer)
+	// balance major hot regions
+	srcRegionMajor, srcPeerMajor, destPeerMajor := h.balanceByPeer(cluster, major)
+	if srcRegionMajor != nil {
+		return newPriorityTransferPeer(srcRegionMajor, srcPeerMajor, destPeerMajor)
+	}
+
+	srcRegionMajor, newLeaderMajor := h.balanceByLeader(cluster, major)
+	if srcRegionMajor != nil {
+		return newPriorityTransferLeader(srcRegionMajor, newLeaderMajor)
+	}
+
+	// balance minor hot regions
+	srcRegionMinor, srcPeerMinor, destPeerMinor := h.balanceByPeer(cluster, minor)
+	if srcRegionMinor != nil {
+		return newPriorityTransferPeer(srcRegionMinor, srcPeerMinor, destPeerMinor)
 	}
 
-	// balance by leader
-	srcRegion, newLeader := h.balanceByLeader(cluster)
-	if srcRegion != nil {
-		return newPriorityTransferLeader(srcRegion, newLeader)
+	srcRegionMinor, newLeaderMinor := h.balanceByLeader(cluster, minor)
+	if srcRegionMinor != nil {
+		return newPriorityTransferLeader(srcRegionMinor, newLeaderMinor)
 	}
 
 	return nil
@@ -462,8 +481,23 @@ func (h *balanceHotRegionScheduler) Schedule(cluster *clusterInfo) Operator {
 func (h *balanceHotRegionScheduler) calculateScores(cluster *clusterInfo) {
 	h.Lock()
 	defer h.Unlock()
-	h.scoreStatus = make(map[uint64]*StoreHotRegions)
-	items := cluster.writeStatistics.elems()
+
+	h.calculateScoresImpl(cluster, major)
+	h.calculateScoresImpl(cluster, minor)
+}
+
+func (h *balanceHotRegionScheduler) calculateScoresImpl(cluster *clusterInfo, t HotRegionType) {
+	scoreStatus := make(map[uint64]*StoreHotRegions)
+	var items []*cacheItem
+	switch t {
+	case major:
+		items = cluster.majorWriteStatistics.elems()
+	case minor:
+		items = cluster.minorWriteStatistics.elems()
+	default:
+		panic("unsupported hot region type")
+	}
+
 	for _, item := range items {
 		r, ok := item.value.(*RegionStat)
 		if !ok {
@@ -477,13 +511,13 @@ func (h *balanceHotRegionScheduler) calculateScores(cluster *clusterInfo) {
 		LeaderStoreId := regionInfo.Leader.GetStoreId()
 		StoreIds := regionInfo.GetStoreIds()
 		for storeId := range StoreIds {
-			statistics, ok := h.scoreStatus[storeId]
+			statistics, ok := scoreStatus[storeId]
 			if !ok {
 				statistics = &StoreHotRegions{
 					RegionsStatAsLeader: make(RegionsStat, 0, storeHotRegionsDefaultLen),
 					RegionsStatAsPeer:   make(RegionsStat, 0, storeHotRegionsDefaultLen),
 				}
-				h.scoreStatus[storeId] = statistics
+				scoreStatus[storeId] = statistics
 			}
 
 			stat := RegionStat{
@@ -506,17 +540,35 @@ func (h *balanceHotRegionScheduler) calculateScores(cluster *clusterInfo) {
 			}
 		}
 	}
+
+	switch t {
+	case major:
+		h.majorScoreStatus = scoreStatus
+	case minor:
+		h.minorScoreStatus = scoreStatus
+	default:
+		panic("unsupported hot region type")
+	}
 }
 
-func (h *balanceHotRegionScheduler) balanceByPeer(cluster *clusterInfo) (*RegionInfo, *metapb.Peer, *metapb.Peer) {
+func (h *balanceHotRegionScheduler) balanceByPeer(cluster *clusterInfo, t HotRegionType) (*RegionInfo, *metapb.Peer, *metapb.Peer) {
 	var (
 		maxWrittenBytes        uint64
 		srcStoreId             uint64
 		maxHotStoreRegionCount int
+		scoreStatus            map[uint64]*StoreHotRegions
 	)
 
 	// get the srcStoreId
-	for storeId, statistics := range h.scoreStatus {
+	switch t {
+	case major:
+		scoreStatus = h.majorScoreStatus
+	case minor:
+		scoreStatus = h.minorScoreStatus
+	default:
+		panic("unsupported hot region type")
+	}
+	for storeId, statistics := range scoreStatus {
 		if statistics.RegionsStatAsPeer.Len() < 2 {
 			continue
 		}
@@ -539,8 +591,8 @@ func (h *balanceHotRegionScheduler) balanceByPeer(cluster *clusterInfo) (*Region
 	stores := cluster.getStores()
 	var destStoreId uint64
 
-	for _, i := range h.r.Perm(h.scoreStatus[srcStoreId].RegionsStatAsPeer.Len()) {
-		rs := h.scoreStatus[srcStoreId].RegionsStatAsPeer[i]
+	for _, i := range h.r.Perm(scoreStatus[srcStoreId].RegionsStatAsPeer.Len()) {
+		rs := scoreStatus[srcStoreId].RegionsStatAsPeer[i]
 		srcRegion := cluster.getRegion(rs.RegionID)
 		if len(srcRegion.DownPeers) != 0 || len(srcRegion.PendingPeers) != 0 {
 			continue
@@ -559,7 +611,7 @@ func (h *balanceHotRegionScheduler) balanceByPeer(cluster *clusterInfo) (*Region
 			destStoreIds = append(destStoreIds, store.GetId())
 		}
 
-		destStoreId = h.selectDestStoreByPeer(destStoreIds, srcRegion, srcStoreId)
+		destStoreId = h.selectDestStoreByPeer(destStoreIds, srcRegion, srcStoreId, t)
 		if destStoreId != 0 {
 			srcRegion.WrittenBytes = rs.WrittenBytes
 			h.adjustBalanceLimitByPeer(srcStoreId)
@@ -590,8 +642,17 @@ func (h *balanceHotRegionScheduler) balanceByPeer(cluster *clusterInfo) (*Region
 	return nil, nil, nil
 }
 
-func (h *balanceHotRegionScheduler) selectDestStoreByPeer(candidateStoreIds []uint64, srcRegion *RegionInfo, srcStoreId uint64) uint64 {
-	sr := h.scoreStatus[srcStoreId]
+func (h *balanceHotRegionScheduler) selectDestStoreByPeer(candidateStoreIds []uint64, srcRegion *RegionInfo, srcStoreId uint64, t HotRegionType) uint64 {
+	var scoreStatus map[uint64]*StoreHotRegions
+	switch t {
+	case major:
+		scoreStatus = h.majorScoreStatus
+	case minor:
+		scoreStatus = h.minorScoreStatus
+	default:
+		panic("unsupported hot region type")
+	}
+	sr := scoreStatus[srcStoreId]
 	srcWrittenBytes := sr.WrittenBytesAsPeer
 	srcHotRegionsCount := sr.RegionsStatAsPeer.Len()
 
@@ -601,7 +662,7 @@ func (h *balanceHotRegionScheduler) selectDestStoreByPeer(candidateStoreIds []ui
 	)
 	minRegionsCount := int(math.MaxInt32)
 	for _, storeId := range candidateStoreIds {
-		if s, ok := h.scoreStatus[storeId]; ok {
+		if s, ok := scoreStatus[storeId]; ok {
 			if srcHotRegionsCount-s.RegionsStatAsPeer.Len() > 1 && minRegionsCount > s.RegionsStatAsLeader.Len() {
 				destStoreId = storeId
 				minWrittenBytes = s.WrittenBytesAsPeer
@@ -622,27 +683,36 @@ func (h *balanceHotRegionScheduler) selectDestStoreByPeer(candidateStoreIds []ui
 }
 
 func (h *balanceHotRegionScheduler) adjustBalanceLimitByPeer(storeID uint64) {
-	s := h.scoreStatus[storeID]
+	s := h.majorScoreStatus[storeID]
 	var hotRegionTotalCount float64
-	for _, m := range h.scoreStatus {
+	for _, m := range h.majorScoreStatus {
 		hotRegionTotalCount += float64(m.RegionsStatAsPeer.Len())
 	}
 
-	avgRegionCount := hotRegionTotalCount / float64(len(h.scoreStatus))
+	avgRegionCount := hotRegionTotalCount / float64(len(h.majorScoreStatus))
 	// Multiplied by hotRegionLimitFactor to avoid transfer back and forth
 	limit := uint64((float64(s.RegionsStatAsPeer.Len()) - avgRegionCount) * hotRegionLimitFactor)
 	h.limit = maxUint64(1, limit)
 }
 
-func (h *balanceHotRegionScheduler) balanceByLeader(cluster *clusterInfo) (*RegionInfo, *metapb.Peer) {
+func (h *balanceHotRegionScheduler) balanceByLeader(cluster *clusterInfo, t HotRegionType) (*RegionInfo, *metapb.Peer) {
 	var (
 		maxWrittenBytes        uint64
 		srcStoreId             uint64
 		maxHotStoreRegionCount int
+		scoreStatus            map[uint64]*StoreHotRegions
 	)
 
+	switch t {
+	case major:
+		scoreStatus = h.majorScoreStatus
+	case minor:
+		scoreStatus = h.minorScoreStatus
+	default:
+		panic("unsupported hot region type")
+	}
 	// select srcStoreId by leader
-	for storeId, statistics := range h.scoreStatus {
+	for storeId, statistics := range scoreStatus {
 		if statistics.RegionsStatAsLeader.Len() < 2 {
 			continue
 		}
@@ -664,14 +734,14 @@ func (h *balanceHotRegionScheduler) balanceByLeader(cluster *clusterInfo) (*Regi
 	}
 
 	// select destPeer
-	for _, i := range h.r.Perm(h.scoreStatus[srcStoreId].RegionsStatAsLeader.Len()) {
-		rs := h.scoreStatus[srcStoreId].RegionsStatAsLeader[i]
+	for _, i := range h.r.Perm(scoreStatus[srcStoreId].RegionsStatAsLeader.Len()) {
+		rs := scoreStatus[srcStoreId].RegionsStatAsLeader[i]
 		srcRegion := cluster.getRegion(rs.RegionID)
 		if len(srcRegion.DownPeers) != 0 || len(srcRegion.PendingPeers) != 0 {
 			continue
 		}
 
-		destPeer := h.selectDestStoreByLeader(srcRegion)
+		destPeer := h.selectDestStoreByLeader(srcRegion, t)
 		if destPeer != nil {
 			return srcRegion, destPeer
 		}
@@ -679,8 +749,18 @@ func (h *balanceHotRegionScheduler) balanceByLeader(cluster *clusterInfo) (*Regi
 	return nil, nil
 }
 
-func (h *balanceHotRegionScheduler) selectDestStoreByLeader(srcRegion *RegionInfo) *metapb.Peer {
-	sr := h.scoreStatus[srcRegion.Leader.GetStoreId()]
+func (h *balanceHotRegionScheduler) selectDestStoreByLeader(srcRegion *RegionInfo, t HotRegionType) *metapb.Peer {
+	var scoreStatus map[uint64]*StoreHotRegions
+	switch t {
+	case major:
+		scoreStatus = h.majorScoreStatus
+	case minor:
+		scoreStatus = h.minorScoreStatus
+	default:
+		panic("unsupported hot region type")
+	}
+
+	sr := scoreStatus[srcRegion.Leader.GetStoreId()]
 	srcWrittenBytes := sr.WrittenBytesAsLeader
 	srcHotRegionsCount := sr.RegionsStatAsLeader.Len()
 
@@ -690,7 +770,7 @@ func (h *balanceHotRegionScheduler) selectDestStoreByLeader(srcRegion *RegionInf
 	)
 	minRegionsCount := int(math.MaxInt32)
 	for storeId, peer := range srcRegion.GetFollowers() {
-		if s, ok := h.scoreStatus[storeId]; ok {
+		if s, ok := scoreStatus[storeId]; ok {
 			if srcHotRegionsCount-s.RegionsStatAsLeader.Len() > 1 && minRegionsCount > s.RegionsStatAsLeader.Len() {
 				destPeer = peer
 				minWrittenBytes = s.WrittenBytesAsLeader
@@ -714,7 +794,7 @@ func (h *balanceHotRegionScheduler) GetStatus() map[uint64]*StoreHotRegions {
 	h.RLock()
 	defer h.RUnlock()
 	status := make(map[uint64]*StoreHotRegions)
-	for id, stat := range h.scoreStatus {
+	for id, stat := range h.majorScoreStatus {
 		clone := *stat
 		status[id] = &clone
 	}
diff --git a/server/balancer_test.go b/server/balancer_test.go
index e5787987bd6..23a3898f2db 100644
--- a/server/balancer_test.go
+++ b/server/balancer_test.go
@@ -841,7 +841,7 @@ func (s *testBalanceHotRegionSchedulerSuite) TestBalance(c *C) {
 	}
 	hb.calculateScore(tc.clusterInfo)
 	for _, e := range expect {
-		c.Assert(hb.scoreStatus[uint64(e.streID)].RegionCount, Equals, e.hotRegionNumber)
+		c.Assert(hb.majorScoreStatus[uint64(e.streID)].RegionCount, Equals, e.hotRegionNumber)
 	}
 
 	// Test adjustLimit
diff --git a/server/cache.go b/server/cache.go
index 8eceda0720e..22a3799000d 100644
--- a/server/cache.go
+++ b/server/cache.go
@@ -333,16 +333,18 @@ type clusterInfo struct {
 	stores  *storesInfo
 	regions *regionsInfo
 
-	activeRegions   int
-	writeStatistics *lruCache
+	activeRegions        int
+	majorWriteStatistics *lruCache
+	minorWriteStatistics *lruCache
 }
 
 func newClusterInfo(id IDAllocator) *clusterInfo {
 	return &clusterInfo{
-		id:              id,
-		stores:          newStoresInfo(),
-		regions:         newRegionsInfo(),
-		writeStatistics: newLRUCache(writeStatLRUMaxLen),
+		id:                   id,
+		stores:               newStoresInfo(),
+		regions:              newRegionsInfo(),
+		majorWriteStatistics: newLRUCache(writeStatLRUMaxLen),
+		minorWriteStatistics: newLRUCache(writeStatLRUMaxLen),
 	}
 }
 
@@ -490,10 +492,51 @@ func (c *clusterInfo) getRegion(regionID uint64) *RegionInfo {
 }
 
 // updateWriteStatCache updates statistic for a region if it's hot, or remove it from statistics if it cools down
-func (c *clusterInfo) updateWriteStatCache(region *RegionInfo, hotRegionThreshold uint64) {
+func (c *clusterInfo) updateWriteStatCache(region *RegionInfo, majorHotRegionThreshold, minorHotRegionThreshold uint64) {
+	if !c.updateMajorWriteStatCache(region, majorHotRegionThreshold) {
+		c.updateMinorWriteStatCache(region, minorHotRegionThreshold)
+	}
+}
+
+func (c *clusterInfo) updateMajorWriteStatCache(region *RegionInfo, majorHotRegionThreshold uint64) bool {
+	var v *RegionStat
+	key := region.GetId()
+	value, isExist := c.majorWriteStatistics.peek(key)
+	newItem := &RegionStat{
+		RegionID:       region.GetId(),
+		WrittenBytes:   region.WrittenBytes,
+		LastUpdateTime: time.Now(),
+		StoreID:        region.Leader.GetStoreId(),
+		version:        region.GetRegionEpoch().GetVersion(),
+		antiCount:      hotRegionAntiCount,
+	}
+
+	if isExist {
+		v = value.(*RegionStat)
+		newItem.HotDegree = v.HotDegree + 1
+	}
+
+	if region.WrittenBytes < majorHotRegionThreshold {
+		if !isExist {
+			return false
+		}
+		if v.antiCount <= 0 {
+			c.majorWriteStatistics.remove(key)
+			return false
+		}
+		// eliminate some noise
+		newItem.HotDegree = v.HotDegree - 1
+		newItem.antiCount = v.antiCount - 1
+		newItem.WrittenBytes = v.WrittenBytes
+	}
+	c.majorWriteStatistics.add(key, newItem)
+	return true
+}
+
+func (c *clusterInfo) updateMinorWriteStatCache(region *RegionInfo, minorHotRegionThreshold uint64) {
 	var v *RegionStat
 	key := region.GetId()
-	value, isExist := c.writeStatistics.peek(key)
+	value, isExist := c.minorWriteStatistics.peek(key)
 	newItem := &RegionStat{
 		RegionID:       region.GetId(),
 		WrittenBytes:   region.WrittenBytes,
@@ -508,12 +551,12 @@ func (c *clusterInfo) updateWriteStatCache(region *RegionInfo, hotRegionThreshol
 		newItem.HotDegree = v.HotDegree + 1
 	}
 
-	if region.WrittenBytes < hotRegionThreshold {
+	if region.WrittenBytes < minorHotRegionThreshold {
 		if !isExist {
 			return
 		}
 		if v.antiCount <= 0 {
-			c.writeStatistics.remove(key)
+			c.minorWriteStatistics.remove(key)
 			return
 		}
 		// eliminate some noise
@@ -521,7 +564,7 @@ func (c *clusterInfo) updateWriteStatCache(region *RegionInfo, hotRegionThreshol
 		newItem.antiCount = v.antiCount - 1
 		newItem.WrittenBytes = v.WrittenBytes
 	}
-	c.writeStatistics.add(key, newItem)
+	c.minorWriteStatistics.add(key, newItem)
 }
 
 func (c *clusterInfo) searchRegion(regionKey []byte) *RegionInfo {
@@ -725,7 +768,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *RegionInfo) error {
 
 func (c *clusterInfo) updateWriteStatus(region *RegionInfo) {
 	var WrittenBytesPerSec uint64
-	v, isExist := c.writeStatistics.peek(region.GetId())
+	v, isExist := c.majorWriteStatistics.peek(region.GetId())
 	if isExist {
 		interval := time.Now().Sub(v.(*RegionStat).LastUpdateTime).Seconds()
 		if interval < minHotRegionReportInterval {
@@ -742,10 +785,10 @@ func (c *clusterInfo) updateWriteStatus(region *RegionInfo) {
 	// and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of hot regions
 	// divide 2 because the store reports data about two times than the region record write to rocksdb
 	divisor := float64(writeStatLRUMaxLen) * 2 * storeHeartBeatReportInterval
-	hotRegionThreshold := uint64(float64(c.getClusterTotalWrittenBytes()) / divisor)
+	majorHotRegionThreshold := uint64(float64(c.getClusterTotalWrittenBytes()) / divisor)
 
-	if hotRegionThreshold < hotRegionMinWriteRate {
-		hotRegionThreshold = hotRegionMinWriteRate
+	if majorHotRegionThreshold < majorHotRegionMinWriteRate {
+		majorHotRegionThreshold = majorHotRegionMinWriteRate
 	}
-	c.updateWriteStatCache(region, hotRegionThreshold)
+	c.updateWriteStatCache(region, majorHotRegionThreshold, minorHotRegionMinWriteRate)
 }
diff --git a/server/coordinator.go b/server/coordinator.go
index 607f7011fd8..e7fe138b286 100644
--- a/server/coordinator.go
+++ b/server/coordinator.go
@@ -38,7 +38,8 @@ const (
 	storeHotRegionsDefaultLen     = 100
 	hotRegionLimitFactor          = 0.75
 	hotRegionScheduleFactor       = 0.9
-	hotRegionMinWriteRate         = 16 * 1024
+	majorHotRegionMinWriteRate    = 16 * 1024
+	minorHotRegionMinWriteRate    = 1 * 1024
 	regionHeartBeatReportInterval = 60
 	storeHeartBeatReportInterval  = 10
 	minHotRegionReportInterval    = 3
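
Reviewer note (appended after the patch; not part of the diff): with this change a region's write rate goes through a two-tier classification. updateWriteStatCache first lets updateMajorWriteStatCache try to claim the region, and only when the region neither reaches the major threshold nor is already tracked in the major cache does it fall through to the minor cache. The self-contained Go sketch below illustrates that cascade using the default rate floors this patch adds in coordinator.go (16 * 1024 B/s for major, 1 * 1024 B/s for minor). The names hotClass and classify are illustrative only and do not exist in PD, and the sketch deliberately ignores the HotDegree/antiCount bookkeeping:

	package main

	import "fmt"

	type hotClass int

	const (
		notHot hotClass = iota
		minorHot
		majorHot
	)

	// classify mirrors the major-then-minor cascade of updateWriteStatCache,
	// reduced to plain threshold checks.
	func classify(writtenBytesPerSec, majorThreshold, minorThreshold uint64) hotClass {
		if writtenBytesPerSec >= majorThreshold {
			return majorHot
		}
		if writtenBytesPerSec >= minorThreshold {
			return minorHot
		}
		return notHot
	}

	func main() {
		const majorFloor, minorFloor = 16 * 1024, 1 * 1024 // defaults from coordinator.go
		fmt.Println(classify(32*1024, majorFloor, minorFloor)) // 2 (majorHot)
		fmt.Println(classify(4*1024, majorFloor, minorFloor))  // 1 (minorHot)
		fmt.Println(classify(512, majorFloor, minorFloor))     // 0 (notHot)
	}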
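
A smaller follow-up observation: the same switch over HotRegionType is now repeated in calculateScoresImpl, balanceByPeer, selectDestStoreByPeer, balanceByLeader, and selectDestStoreByLeader. A single accessor on the scheduler would remove that duplication. The helper below is only a suggested refactor written against the fields this patch introduces; scoreStatusFor does not exist in the patch:

	// scoreStatusFor returns the per-store hot region statistics for the given
	// type, panicking on unknown types exactly like the inlined switches above.
	func (h *balanceHotRegionScheduler) scoreStatusFor(t HotRegionType) map[uint64]*StoreHotRegions {
		switch t {
		case major:
			return h.majorScoreStatus
		case minor:
			return h.minorScoreStatus
		default:
			panic("unsupported hot region type")
		}
	}

Each call site would then reduce to scoreStatus := h.scoreStatusFor(t).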