Skip to content

Commit

Permalink
statistic: collect both read and write pending influence at the same …
Browse files Browse the repository at this point in the history
…time (#5521)

ref #4949

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: 混沌DM <[email protected]>
  • Loading branch information
lhy1024 and HunDunDM authored Sep 19, 2022
1 parent 1e1732c commit ccb97f3
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 8 deletions.
5 changes: 5 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func (mc *Cluster) IsRegionHot(region *core.RegionInfo) bool {
return mc.HotCache.IsRegionHot(region, mc.GetHotRegionCacheHitsThreshold())
}

// GetHotPeerStat returns hot peer stat with specified regionID and storeID.
func (mc *Cluster) GetHotPeerStat(rw statistics.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
return mc.HotCache.GetHotPeerStat(rw, regionID, storeID)
}

// RegionReadStats returns hot region's read stats.
// The result only includes peers that are hot enough.
func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
Expand Down
5 changes: 5 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1993,6 +1993,11 @@ func (c *RaftCluster) IsRegionHot(region *core.RegionInfo) bool {
return c.hotStat.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold())
}

// GetHotPeerStat returns hot peer stat with specified regionID and storeID.
func (c *RaftCluster) GetHotPeerStat(rw statistics.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
return c.hotStat.GetHotPeerStat(rw, regionID, storeID)
}

// RegionReadStats returns hot region's read stats.
// The result only includes peers that are hot enough.
// RegionStats is a thread-safe method
Expand Down
17 changes: 13 additions & 4 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,15 +642,13 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
// main peer
srcStoreID := bs.best.srcStore.GetID()
dstStoreID := bs.best.dstStore.GetID()
infl := statistics.Influence{Loads: make([]float64, statistics.RegionStatCount), Count: 1}
bs.rwTy.SetFullLoadRates(infl.Loads, bs.best.mainPeerStat.GetLoads())
infl := bs.collectPendingInfluence(bs.best.mainPeerStat)
if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreID, dstStoreID, infl, maxZombieDur) {
return false
}
// revert peers
if bs.best.revertPeerStat != nil {
infl = statistics.Influence{Loads: make([]float64, statistics.RegionStatCount), Count: 1}
bs.rwTy.SetFullLoadRates(infl.Loads, bs.best.revertPeerStat.GetLoads())
infl := bs.collectPendingInfluence(bs.best.revertPeerStat)
if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) {
return false
}
Expand All @@ -659,6 +657,17 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
return true
}

func (bs *balanceSolver) collectPendingInfluence(peer *statistics.HotPeerStat) statistics.Influence {
infl := statistics.Influence{Loads: make([]float64, statistics.RegionStatCount), Count: 1}
bs.rwTy.SetFullLoadRates(infl.Loads, peer.GetLoads())
inverse := bs.rwTy.Inverse()
another := bs.GetHotPeerStat(inverse, peer.RegionID, peer.StoreID)
if another != nil {
inverse.SetFullLoadRates(infl.Loads, another.GetLoads())
}
return infl
}

// Depending on the source of the statistics used, a different ZombieDuration will be used.
// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
func (bs *balanceSolver) calcMaxZombieDur() time.Duration {
Expand Down
19 changes: 19 additions & 0 deletions server/statistics/hot_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,25 @@ func (w *HotCache) IsRegionHot(region *core.RegionInfo, minHotDegree int) bool {
return false
}

// GetHotPeerStat returns hot peer stat with specified regionID and storeID.
func (w *HotCache) GetHotPeerStat(rw RWType, regionID, storeID uint64) *HotPeerStat {
switch rw {
case Read:
task := newGetHotPeerStatTask(regionID, storeID)
succ := w.CheckReadAsync(task)
if succ {
return task.waitRet(w.ctx)
}
case Write:
task := newGetHotPeerStatTask(regionID, storeID)
succ := w.CheckWriteAsync(task)
if succ {
return task.waitRet(w.ctx)
}
}
return nil
}

// CollectMetrics collects the hot cache metrics.
func (w *HotCache) CollectMetrics() {
writeMetricsTask := newCollectMetricsTask("write")
Expand Down
33 changes: 33 additions & 0 deletions server/statistics/hot_cache_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
collectRegionStatsTaskType
isRegionHotTaskType
collectMetricsTaskType
getHotPeerStatTaskType
)

// FlowItemTask indicates the task in flowItem queue
Expand Down Expand Up @@ -188,3 +189,35 @@ func (t *collectMetricsTask) taskType() flowItemTaskKind {
func (t *collectMetricsTask) runTask(cache *hotPeerCache) {
cache.collectMetrics(t.typ)
}

type getHotPeerStatTask struct {
regionID uint64
storeID uint64
ret chan *HotPeerStat
}

func newGetHotPeerStatTask(regionID, storeID uint64) *getHotPeerStatTask {
return &getHotPeerStatTask{
regionID: regionID,
storeID: storeID,
ret: make(chan *HotPeerStat, 1),
}
}

func (t *getHotPeerStatTask) taskType() flowItemTaskKind {
return getHotPeerStatTaskType
}

func (t *getHotPeerStatTask) runTask(cache *hotPeerCache) {
t.ret <- cache.getHotPeerStat(t.regionID, t.storeID)
}

// TODO: do we need a wait-return timeout?
func (t *getHotPeerStatTask) waitRet(ctx context.Context) *HotPeerStat {
select {
case <-ctx.Done():
return nil
case r := <-t.ret:
return r
}
}
14 changes: 10 additions & 4 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,13 +366,19 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
if peer == nil {
return false
}
storeID := peer.GetStoreId()
if stat := f.getHotPeerStat(region.GetID(), peer.GetStoreId()); stat != nil {
return stat.HotDegree >= hotDegree
}
return false
}

func (f *hotPeerCache) getHotPeerStat(regionID, storeID uint64) *HotPeerStat {
if peers, ok := f.peersOfStore[storeID]; ok {
if stat := peers.Get(region.GetID()); stat != nil {
return stat.(*HotPeerStat).HotDegree >= hotDegree
if stat := peers.Get(regionID); stat != nil {
return stat.(*HotPeerStat)
}
}
return false
return nil
}

func (f *hotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat {
Expand Down
11 changes: 11 additions & 0 deletions server/statistics/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ func (rw RWType) RegionStats() []RegionStatKind {
return nil
}

// Inverse returns the opposite of kind.
func (rw RWType) Inverse() RWType {
switch rw {
case Write:
return Read
case Read:
return Write
}
return Read
}

// GetLoadRatesFromPeer gets the load rates of the read or write type from PeerInfo.
func (rw RWType) GetLoadRatesFromPeer(peer *core.PeerInfo) []float64 {
deltaLoads := peer.GetLoads()
Expand Down
1 change: 1 addition & 0 deletions server/statistics/region_stat_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import "github.com/tikv/pd/server/core"

// RegionStatInformer provides access to a shared informer of statistics.
type RegionStatInformer interface {
GetHotPeerStat(rw RWType, regionID, storeID uint64) *HotPeerStat
IsRegionHot(region *core.RegionInfo) bool
// RegionWriteStats return the storeID -> write stat of peers on this store.
// The result only includes peers that are hot enough.
Expand Down

0 comments on commit ccb97f3

Please sign in to comment.