Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistic: collect both read and write pending influence at the same time #5521

Merged
merged 3 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func (mc *Cluster) IsRegionHot(region *core.RegionInfo) bool {
return mc.HotCache.IsRegionHot(region, mc.GetHotRegionCacheHitsThreshold())
}

// GetHotPeerStat returns hot peer stat with specified regionID and storeID.
// It simply forwards the lookup to the mock cluster's hot cache.
func (mc *Cluster) GetHotPeerStat(rw statistics.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
	stat := mc.HotCache.GetHotPeerStat(rw, regionID, storeID)
	return stat
}

// RegionReadStats returns hot region's read stats.
// The result only includes peers that are hot enough.
func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
Expand Down
5 changes: 5 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1994,6 +1994,11 @@ func (c *RaftCluster) IsRegionHot(region *core.RegionInfo) bool {
return c.hotStat.IsRegionHot(region, c.opt.GetHotRegionCacheHitsThreshold())
}

// GetHotPeerStat returns hot peer stat with specified regionID and storeID.
// The lookup is delegated to the cluster's hot statistics cache.
func (c *RaftCluster) GetHotPeerStat(rw statistics.RWType, regionID, storeID uint64) *statistics.HotPeerStat {
	stat := c.hotStat.GetHotPeerStat(rw, regionID, storeID)
	return stat
}

// RegionReadStats returns hot region's read stats.
// The result only includes peers that are hot enough.
// RegionStats is a thread-safe method
Expand Down
17 changes: 13 additions & 4 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,15 +642,13 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
// main peer
srcStoreID := bs.best.srcStore.GetID()
dstStoreID := bs.best.dstStore.GetID()
infl := statistics.Influence{Loads: make([]float64, statistics.RegionStatCount), Count: 1}
bs.rwTy.SetFullLoadRates(infl.Loads, bs.best.mainPeerStat.GetLoads())
infl := bs.collectPendingInfluence(bs.best.mainPeerStat)
if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreID, dstStoreID, infl, maxZombieDur) {
return false
}
// revert peers
if bs.best.revertPeerStat != nil {
infl = statistics.Influence{Loads: make([]float64, statistics.RegionStatCount), Count: 1}
bs.rwTy.SetFullLoadRates(infl.Loads, bs.best.revertPeerStat.GetLoads())
infl := bs.collectPendingInfluence(bs.best.revertPeerStat)
if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) {
return false
}
Expand All @@ -659,6 +657,17 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
return true
}

func (bs *balanceSolver) collectPendingInfluence(peer *statistics.HotPeerStat) statistics.Influence {
infl := statistics.Influence{Loads: make([]float64, statistics.RegionStatCount), Count: 1}
bs.rwTy.SetFullLoadRates(infl.Loads, peer.GetLoads())
inverse := bs.rwTy.Inverse()
another := bs.GetHotPeerStat(inverse, peer.RegionID, peer.StoreID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer summary all loads at a same time (including read and write) in prepareForBalance, then get it from the stLoadInfos. it is at a same "snapshot" rather than get it one by one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we don't need to use read and write statistics together. I think we can do this when we unify read and write schedulers later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it will be placed into together when we unify read and write schedulers.

if another != nil {
inverse.SetFullLoadRates(infl.Loads, another.GetLoads())
}
return infl
}

// Depending on the source of the statistics used, a different ZombieDuration will be used.
// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
func (bs *balanceSolver) calcMaxZombieDur() time.Duration {
Expand Down
19 changes: 19 additions & 0 deletions server/statistics/hot_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,25 @@ func (w *HotCache) IsRegionHot(region *core.RegionInfo, minHotDegree int) bool {
return false
}

// GetHotPeerStat returns hot peer stat with specified regionID and storeID.
// The query is executed asynchronously on the read or write flow queue
// (depending on rw); nil is returned when the task cannot be queued or the
// cache context is canceled while waiting.
func (w *HotCache) GetHotPeerStat(rw RWType, regionID, storeID uint64) *HotPeerStat {
	task := newGetHotPeerStatTask(regionID, storeID)
	var queued bool
	switch rw {
	case Read:
		queued = w.CheckReadAsync(task)
	case Write:
		queued = w.CheckWriteAsync(task)
	}
	if !queued {
		return nil
	}
	return task.waitRet(w.ctx)
}

// CollectMetrics collects the hot cache metrics.
func (w *HotCache) CollectMetrics() {
writeMetricsTask := newCollectMetricsTask("write")
Expand Down
33 changes: 33 additions & 0 deletions server/statistics/hot_cache_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
collectRegionStatsTaskType
isRegionHotTaskType
collectMetricsTaskType
getHotPeerStatTaskType
)

// FlowItemTask indicates the task in flowItem queue
Expand Down Expand Up @@ -188,3 +189,35 @@ func (t *collectMetricsTask) taskType() flowItemTaskKind {
func (t *collectMetricsTask) runTask(cache *hotPeerCache) {
cache.collectMetrics(t.typ)
}

// getHotPeerStatTask queries a single hot peer stat from the cache goroutine
// and hands the answer back through the ret channel.
type getHotPeerStatTask struct {
	regionID uint64 // region whose peer is queried
	storeID  uint64 // store holding the queried peer
	ret chan *HotPeerStat // buffered (size 1) so runTask never blocks
}

// newGetHotPeerStatTask creates a query task for the peer identified by
// regionID and storeID. The result channel is buffered with capacity 1 so
// the cache goroutine can publish its answer without blocking.
func newGetHotPeerStatTask(regionID, storeID uint64) *getHotPeerStatTask {
	task := getHotPeerStatTask{
		regionID: regionID,
		storeID:  storeID,
		ret:      make(chan *HotPeerStat, 1),
	}
	return &task
}

// taskType identifies this task kind for the flow-item queue dispatcher.
func (t *getHotPeerStatTask) taskType() flowItemTaskKind {
	return getHotPeerStatTaskType
}

// runTask performs the lookup on the cache goroutine and publishes the
// result (nil when the peer is not tracked) on the result channel.
func (t *getHotPeerStatTask) runTask(cache *hotPeerCache) {
	stat := cache.getHotPeerStat(t.regionID, t.storeID)
	t.ret <- stat
}

// waitRet blocks until runTask delivers a result, or returns nil once ctx is
// canceled.
// TODO: do we need a wait-return timeout?
func (t *getHotPeerStatTask) waitRet(ctx context.Context) *HotPeerStat {
	select {
	case stat := <-t.ret:
		return stat
	case <-ctx.Done():
		return nil
	}
}
14 changes: 10 additions & 4 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,13 +366,19 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
if peer == nil {
return false
}
storeID := peer.GetStoreId()
if stat := f.getHotPeerStat(region.GetID(), peer.GetStoreId()); stat != nil {
return stat.HotDegree >= hotDegree
}
return false
}

func (f *hotPeerCache) getHotPeerStat(regionID, storeID uint64) *HotPeerStat {
if peers, ok := f.peersOfStore[storeID]; ok {
if stat := peers.Get(region.GetID()); stat != nil {
return stat.(*HotPeerStat).HotDegree >= hotDegree
if stat := peers.Get(regionID); stat != nil {
return stat.(*HotPeerStat)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there times when there may be insufficient statistical confidence?

}
}
return false
return nil
}

func (f *hotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat {
Expand Down
11 changes: 11 additions & 0 deletions server/statistics/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ func (rw RWType) RegionStats() []RegionStatKind {
return nil
}

// Inverse returns the opposite of kind.
func (rw RWType) Inverse() RWType {
switch rw {
case Write:
return Read
case Read:
return Write
}
return Read
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why default read?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

== Case Write:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are only two kind in RWType

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can panic if not match

}

// GetLoadRatesFromPeer gets the load rates of the read or write type from PeerInfo.
func (rw RWType) GetLoadRatesFromPeer(peer *core.PeerInfo) []float64 {
deltaLoads := peer.GetLoads()
Expand Down
1 change: 1 addition & 0 deletions server/statistics/region_stat_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import "github.com/tikv/pd/server/core"

// RegionStatInformer provides access to a shared informer of statistics.
type RegionStatInformer interface {
GetHotPeerStat(rw RWType, regionID, storeID uint64) *HotPeerStat
IsRegionHot(region *core.RegionInfo) bool
// RegionWriteStats return the storeID -> write stat of peers on this store.
// The result only includes peers that are hot enough.
Expand Down