Skip to content

Commit

Permalink
tiny fix
Browse files Browse the repository at this point in the history
Signed-off-by: HunDunDM <[email protected]>
  • Loading branch information
HunDunDM committed Sep 15, 2022
1 parent 7c41413 commit fe56259
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 16 deletions.
16 changes: 8 additions & 8 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,9 @@ func (s *solution) getPendingLoad(dim int) (src float64, dst float64) {
func (s *solution) calcPeersRate(dims ...int) {
s.cachedPeersRate = make([]float64, statistics.DimLen)
for _, dim := range dims {
peersRate := s.mainPeerStat.Loads[dim]
peersRate := s.mainPeerStat.GetLoad(dim)
if s.revertPeerStat != nil {
peersRate -= s.revertPeerStat.Loads[dim]
peersRate -= s.revertPeerStat.GetLoad(dim)
}
s.cachedPeersRate[dim] = peersRate
}
Expand Down Expand Up @@ -649,14 +649,14 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
srcStoreID := bs.best.srcStore.GetID()
dstStoreID := bs.best.dstStore.GetID()
infl := statistics.Influence{Loads: make([]float64, statistics.RegionStatCount), Count: 1}
bs.rwTy.SetFullLoads(infl.Loads, bs.best.mainPeerStat.Loads)
bs.rwTy.SetFullLoadRates(infl.Loads, bs.best.mainPeerStat.GetLoads())
if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreID, dstStoreID, infl, maxZombieDur) {
return false
}
// revert peers
if bs.best.revertPeerStat != nil {
infl = statistics.Influence{Loads: make([]float64, statistics.RegionStatCount), Count: 1}
bs.rwTy.SetFullLoads(infl.Loads, bs.best.revertPeerStat.Loads)
bs.rwTy.SetFullLoadRates(infl.Loads, bs.best.revertPeerStat.GetLoads())
if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) {
return false
}
Expand Down Expand Up @@ -737,12 +737,12 @@ func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat) map[*statis
firstSort := make([]*statistics.HotPeerStat, len(ret))
copy(firstSort, ret)
sort.Slice(firstSort, func(i, j int) bool {
return firstSort[i].Loads[bs.firstPriority] > firstSort[j].Loads[bs.firstPriority]
return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority)
})
secondSort := make([]*statistics.HotPeerStat, len(ret))
copy(secondSort, ret)
sort.Slice(secondSort, func(i, j int) bool {
return secondSort[i].Loads[bs.secondPriority] > secondSort[j].Loads[bs.secondPriority]
return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority)
})
union := make(map[*statistics.HotPeerStat]struct{}, bs.maxPeerNum)
for len(union) < bs.maxPeerNum {
Expand Down Expand Up @@ -1315,8 +1315,8 @@ func (bs *balanceSolver) logBestSolution() {
// Log more information on solutions containing revertRegion
srcFirstRate, dstFirstRate := best.getExtremeLoad(bs.firstPriority)
srcSecondRate, dstSecondRate := best.getExtremeLoad(bs.secondPriority)
mainFirstRate := best.mainPeerStat.Loads[bs.firstPriority]
mainSecondRate := best.mainPeerStat.Loads[bs.secondPriority]
mainFirstRate := best.mainPeerStat.GetLoad(bs.firstPriority)
mainSecondRate := best.mainPeerStat.GetLoad(bs.secondPriority)
log.Info("use solution with revert regions",
zap.Uint64("src-store", best.srcStore.GetID()),
zap.Float64("src-first-rate", srcFirstRate),
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ func TestHotReadRegionScheduleByteRateOnly(t *testing.T) {
re.Len(stats, 3)
for _, ss := range stats {
for _, s := range ss {
re.Less(500.0*units.KiB, s.Loads[statistics.ByteDim])
re.Less(500.0*units.KiB, s.GetLoad(statistics.ByteDim))
}
}

Expand Down
34 changes: 34 additions & 0 deletions server/statistics/hot_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/movingaverage"
"github.com/tikv/pd/pkg/slice"
"go.uber.org/zap"
)

// Indicator dims.
Expand Down Expand Up @@ -125,6 +126,27 @@ func (stat *HotPeerStat) Less(dim int, than TopNItem) bool {
return stat.GetLoad(dim) < than.(*HotPeerStat).GetLoad(dim)
}

// Log is used to output some info
func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Field)) {
level(str,
zap.Uint64("interval", stat.interval),
zap.Uint64("region-id", stat.RegionID),
zap.Uint64("store", stat.StoreID),
zap.Bool("is-leader", stat.isLeader),
zap.Bool("is-learner", stat.isLearner),
zap.String("type", stat.Kind.String()),
zap.Float64s("loads", stat.GetLoads()),
zap.Float64s("loads-instant", stat.Loads),
zap.Float64s("thresholds", stat.thresholds),
zap.Int("hot-degree", stat.HotDegree),
zap.Int("hot-anti-count", stat.AntiCount),
zap.Duration("sum-interval", stat.getIntervalSum()),
zap.String("source", stat.source.String()),
zap.Bool("allow-inherited", stat.allowInherited),
zap.String("action-type", stat.actionType.String()),
zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime))
}

// IsNeedCoolDownTransferLeader use cooldown time after transfer leader to avoid unnecessary schedule
func (stat *HotPeerStat) IsNeedCoolDownTransferLeader(minHotDegree int) bool {
return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*stat.hotStatReportInterval())
Expand All @@ -148,6 +170,18 @@ func (stat *HotPeerStat) GetLoad(dim int) float64 {
return math.Round(stat.Loads[dim])
}

// GetLoads returns denoising loads if possible.
func (stat *HotPeerStat) GetLoads() []float64 {
if stat.rollingLoads != nil {
ret := make([]float64, len(stat.rollingLoads))
for dim := range ret {
ret[dim] = math.Round(stat.rollingLoads[dim].Get())
}
return ret
}
return stat.Loads
}

// GetThresholds returns thresholds.
// Only for test purpose.
func (stat *HotPeerStat) GetThresholds() []float64 {
Expand Down
9 changes: 6 additions & 3 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server/core"
)
Expand Down Expand Up @@ -102,6 +103,7 @@ func (f *hotPeerCache) updateStat(item *HotPeerStat) {
switch item.actionType {
case Remove:
f.removeItem(item)
item.Log("region heartbeat remove from cache", log.Debug)
incMetrics("remove_item", item.StoreID, item.Kind)
return
case Add:
Expand All @@ -111,6 +113,7 @@ func (f *hotPeerCache) updateStat(item *HotPeerStat) {
}
// for add and update
f.putItem(item)
item.Log("region heartbeat update", log.Debug)
}

func (f *hotPeerCache) collectPeerMetrics(loads []float64, interval uint64) {
Expand Down Expand Up @@ -173,7 +176,7 @@ func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInf
StoreID: storeID,
RegionID: regionID,
Kind: f.kind,
Loads: f.kind.GetLoadsFromPeer(peer),
Loads: f.kind.GetLoadRatesFromPeer(peer),
LastUpdateTime: time.Now(),
isLeader: region.GetLeader().GetStoreId() == storeID,
isLearner: core.IsLearner(region.GetPeer(storeID)),
Expand Down Expand Up @@ -270,8 +273,8 @@ func (f *hotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat
func (f *hotPeerCache) calcHotThresholds(storeID uint64) []float64 {
statKinds := f.kind.RegionStats()
ret := make([]float64, DimLen)
for i, k := range statKinds {
ret[i] = MinHotThresholds[k]
for dim, kind := range statKinds {
ret[dim] = MinHotThresholds[kind]
}
tn, ok := f.peersOfStore[storeID]
if !ok || tn.Len() < TopNN {
Expand Down
1 change: 1 addition & 0 deletions server/statistics/hot_peer_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ func TestUnstableData(t *testing.T) {
}
}

// Previously, there was a mixed use of dim and kind, which caused inconsistencies in write-related statistics.
func TestHotPeerCacheTopN(t *testing.T) {
re := require.New(t)

Expand Down
8 changes: 4 additions & 4 deletions server/statistics/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ func (rw RWType) RegionStats() []RegionStatKind {
return nil
}

// GetLoadsFromPeer gets the loads of the read or write type from PeerInfo.
func (rw RWType) GetLoadsFromPeer(peer *core.PeerInfo) []float64 {
// GetLoadRatesFromPeer gets the load rates of the read or write type from PeerInfo.
func (rw RWType) GetLoadRatesFromPeer(peer *core.PeerInfo) []float64 {
deltaLoads := peer.GetLoads()
interval := peer.GetInterval()
loads := make([]float64, DimLen)
Expand All @@ -165,8 +165,8 @@ func (rw RWType) GetLoadsFromPeer(peer *core.PeerInfo) []float64 {
return loads
}

// SetFullLoads set loads to full as read or write type.
func (rw RWType) SetFullLoads(full []float64, loads []float64) {
// SetFullLoadRates set load rates to full as read or write type.
func (rw RWType) SetFullLoadRates(full []float64, loads []float64) {
for dim, k := range rw.RegionStats() {
full[k] = loads[dim]
}
Expand Down

0 comments on commit fe56259

Please sign in to comment.