metrics: add scheduling hot peer load (#5525)
ref #5521, fix #5527

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: 混沌DM <[email protected]>
lhy1024 and HunDunDM authored Oct 14, 2022
1 parent 0a5b7b6 commit bc2019e
Showing 10 changed files with 63 additions and 40 deletions.
4 changes: 4 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -778,6 +778,10 @@ func (mc *Cluster) AddSuspectRegions(ids ...uint64) {
 	}
 }
 
+// SetHotPendingInfluenceMetrics mock method
+func (mc *Cluster) SetHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64) {
+}
+
 // GetBasicCluster mock method
 func (mc *Cluster) GetBasicCluster() *core.BasicCluster {
 	return mc.BasicCluster
5 changes: 5 additions & 0 deletions server/cluster/cluster.go
@@ -1797,6 +1797,11 @@ func (c *RaftCluster) deleteStoreLocked(store *core.StoreInfo) error {
 	return nil
 }
 
+// SetHotPendingInfluenceMetrics sets pending influence in hot scheduler.
+func (c *RaftCluster) SetHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64) {
+	hotPendingSum.WithLabelValues(storeLabel, rwTy, dim).Set(load)
+}
+
 func (c *RaftCluster) collectMetrics() {
 	statsMap := statistics.NewStoreStatisticsMap(c.opt, c.storeConfigManager.GetStoreConfig())
 	stores := c.GetStores()
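SetHotPendingInfluenceMetrics is deliberately exposed through the schedule.Cluster interface (see the server/schedule/cluster.go hunk below): the hot scheduler lives in server/schedulers and cannot reach the gauge owned by server/cluster directly, so the cluster passes itself in as the metrics sink. A minimal sketch of that shape, assuming only what the diff shows (realCluster and mockCluster are illustrative names, not PD types):

```go
package main

import "fmt"

// Cluster mirrors the one method of schedule.Cluster that matters here.
type Cluster interface {
	SetHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64)
}

// realCluster stands in for RaftCluster: it forwards to the Prometheus gauge.
type realCluster struct{}

func (realCluster) SetHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64) {
	fmt.Printf("hot_pending_sum{store=%q,rw=%q,dim=%q} <- %v\n", storeLabel, rwTy, dim, load)
}

// mockCluster stands in for mockcluster.Cluster: tests discard the metric,
// which is exactly what the empty mock method in this commit does.
type mockCluster struct{}

func (mockCluster) SetHotPendingInfluenceMetrics(string, string, string, float64) {}

func main() {
	for _, c := range []Cluster{realCluster{}, mockCluster{}} {
		c.SetHotPendingInfluenceMetrics("1", "write", "byte", 2048)
	}
}
```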
31 changes: 9 additions & 22 deletions server/cluster/coordinator.go
@@ -539,8 +539,6 @@ func (c *coordinator) collectHotSpotMetrics() {
 	collectHotMetrics(c.cluster, stores, statistics.Write)
 	// Collects hot read region metrics.
 	collectHotMetrics(c.cluster, stores, statistics.Read)
-	// Collects pending influence.
-	collectPendingInfluence(stores)
 }
 
 func collectHotMetrics(cluster *RaftCluster, stores []*core.StoreInfo, typ statistics.RWType) {
@@ -563,8 +561,8 @@ func collectHotMetrics(cluster *RaftCluster, stores []*core.StoreInfo, typ statistics.RWType) {
 		storeAddress := s.GetAddress()
 		storeID := s.GetID()
 		storeLabel := strconv.FormatUint(storeID, 10)
-		stat, ok := status.AsLeader[storeID]
-		if ok {
+		stat, hasHotLeader := status.AsLeader[storeID]
+		if hasHotLeader {
 			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_bytes_as_leader").Set(stat.TotalBytesRate)
 			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_keys_as_leader").Set(stat.TotalKeysRate)
 			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_query_as_leader").Set(stat.TotalQueryRate)
@@ -576,8 +574,8 @@ func collectHotMetrics(cluster *RaftCluster, stores []*core.StoreInfo, typ statistics.RWType) {
 			hotSpotStatusGauge.DeleteLabelValues(storeAddress, storeLabel, "hot_"+kind+"_region_as_leader")
 		}
 
-		stat, ok = status.AsPeer[storeID]
-		if ok {
+		stat, hasHotPeer := status.AsPeer[storeID]
+		if hasHotPeer {
 			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_bytes_as_peer").Set(stat.TotalBytesRate)
 			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_keys_as_peer").Set(stat.TotalKeysRate)
 			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_query_as_peer").Set(stat.TotalQueryRate)
@@ -588,29 +586,18 @@ func collectHotMetrics(cluster *RaftCluster, stores []*core.StoreInfo, typ statistics.RWType) {
 			hotSpotStatusGauge.DeleteLabelValues(storeAddress, storeLabel, "total_"+kind+"_query_as_peer")
 			hotSpotStatusGauge.DeleteLabelValues(storeAddress, storeLabel, "hot_"+kind+"_region_as_peer")
 		}
-	}
-}
 
-func collectPendingInfluence(stores []*core.StoreInfo) {
-	pendings := statistics.GetPendingInfluence(stores)
-	for _, s := range stores {
-		storeAddress := s.GetAddress()
-		storeID := s.GetID()
-		storeLabel := strconv.FormatUint(storeID, 10)
-		if infl := pendings[storeID]; infl != nil {
-			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_byte_rate").Set(infl.Loads[statistics.RegionReadBytes])
-			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_key_rate").Set(infl.Loads[statistics.RegionReadKeys])
-			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_query_rate").Set(infl.Loads[statistics.RegionReadQueryNum])
-			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_byte_rate").Set(infl.Loads[statistics.RegionWriteBytes])
-			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_key_rate").Set(infl.Loads[statistics.RegionWriteKeys])
-			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_query_rate").Set(infl.Loads[statistics.RegionWriteQueryNum])
-			hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "pending_influence_count").Set(infl.Count)
+		if !hasHotLeader && !hasHotPeer {
+			statistics.ForeachRegionStats(func(rwTy statistics.RWType, dim int, _ statistics.RegionStatKind) {
+				hotPendingSum.DeleteLabelValues(storeLabel, rwTy.String(), statistics.DimToString(dim))
+			})
 		}
 	}
 }
 
 func (c *coordinator) resetHotSpotMetrics() {
 	hotSpotStatusGauge.Reset()
+	hotPendingSum.Reset()
 }
 
 func (c *coordinator) shouldRun() bool {
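The coordinator now cleans the new gauge up in two ways: per-store series are deleted as soon as a store has neither hot leaders nor hot peers, so idle stores stop exporting stale values, and the whole vector is reset together with the other hot-spot metrics. A small sketch of the difference between the two, using only documented client_golang calls:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	hotPendingSum := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "hot_pending_sum",
	}, []string{"store", "rw", "dim"})

	hotPendingSum.WithLabelValues("1", "write", "byte").Set(10)
	hotPendingSum.WithLabelValues("2", "write", "byte").Set(20)

	// Targeted cleanup, as in collectHotMetrics: drop one store's series.
	fmt.Println(hotPendingSum.DeleteLabelValues("1", "write", "byte")) // true

	// Full cleanup, as in resetHotSpotMetrics: drop every series at once.
	hotPendingSum.Reset()
}
```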
8 changes: 8 additions & 0 deletions server/cluster/metrics.go
@@ -57,6 +57,14 @@
 			Help:      "Status of the hotspot.",
 		}, []string{"address", "store", "type"})
 
+	hotPendingSum = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "pd",
+			Subsystem: "scheduler",
+			Name:      "hot_pending_sum",
+			Help:      "Pending influence sum of store in hot region scheduler.",
+		}, []string{"store", "rw", "dim"})
+
 	patrolCheckRegionsGauge = prometheus.NewGauge(
 		prometheus.GaugeOpts{
 			Namespace: "pd",
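Given the pd namespace and scheduler subsystem, this gauge is scraped as pd_scheduler_hot_pending_sum{store, rw, dim}. A self-contained sketch of setting and reading back one sample (the label values are made up):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	hotPendingSum := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "pd",
		Subsystem: "scheduler",
		Name:      "hot_pending_sum",
		Help:      "Pending influence sum of store in hot region scheduler.",
	}, []string{"store", "rw", "dim"})

	g := hotPendingSum.WithLabelValues("1", "write", "byte")
	g.Set(2048)

	// Scraped as: pd_scheduler_hot_pending_sum{dim="byte",rw="write",store="1"} 2048
	var m dto.Metric
	_ = g.Write(&m)
	fmt.Println(m.GetGauge().GetValue()) // 2048
}
```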
1 change: 1 addition & 0 deletions server/schedule/cluster.go
@@ -35,4 +35,5 @@ type Cluster interface {
 
 	RemoveScheduler(name string) error
 	AddSuspectRegions(ids ...uint64)
+	SetHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64)
 }
15 changes: 13 additions & 2 deletions server/schedulers/hot_region.go
@@ -182,7 +182,7 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster)
 // each store
 func (h *hotScheduler) prepareForBalance(typ statistics.RWType, cluster schedule.Cluster) {
 	h.stInfos = statistics.SummaryStoreInfos(cluster.GetStores())
-	h.summaryPendingInfluence()
+	h.summaryPendingInfluence(cluster)
 	storesLoads := cluster.GetStoresLoads()
 	isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow()
 
@@ -223,7 +223,7 @@ func (h *hotScheduler) prepareForBalance(typ statistics.RWType, cluster schedule.Cluster) {
 // summaryPendingInfluence calculate the summary of pending Influence for each store
 // and clean the region from regionInfluence if they have ended operator.
 // It makes each dim rate or count become `weight` times to the origin value.
-func (h *hotScheduler) summaryPendingInfluence() {
+func (h *hotScheduler) summaryPendingInfluence(cluster schedule.Cluster) {
 	for id, p := range h.regionPendings {
 		from := h.stInfos[p.from]
 		to := h.stInfos[p.to]
@@ -248,6 +248,14 @@ func (h *hotScheduler) summaryPendingInfluence() {
 			to.AddInfluence(&p.origin, weight)
 		}
 	}
+	for storeID, info := range h.stInfos {
+		storeLabel := strconv.FormatUint(storeID, 10)
+		if infl := info.PendingSum; infl != nil {
+			statistics.ForeachRegionStats(func(rwTy statistics.RWType, dim int, kind statistics.RegionStatKind) {
+				cluster.SetHotPendingInfluenceMetrics(storeLabel, rwTy.String(), statistics.DimToString(dim), infl.Loads[kind])
+			})
+		}
+	}
 }
 
 func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
@@ -262,6 +270,9 @@ func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
 	h.regionPendings[regionID] = influence
 
 	schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
+	statistics.ForeachRegionStats(func(rwTy statistics.RWType, dim int, kind statistics.RegionStatKind) {
+		hotPeerHist.WithLabelValues(h.GetName(), rwTy.String(), statistics.DimToString(dim)).Observe(infl.Loads[kind])
+	})
 	return true
 }
 
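Both new emission points drive the same fan-out: ForeachRegionStats visits read and write crossed with the per-type dimensions, and the kind it yields doubles as the index into Influence.Loads. The gauge is refreshed on every scheduling round in summaryPendingInfluence, while the histogram is observed once per newly accepted operator in tryAddPendingInfluence. A self-contained toy mirroring that wiring — the constant layout and the byte/key/query dim names are assumptions about the statistics package, not taken from this diff:

```go
package main

import "fmt"

// Toy mirrors of the statistics package types used in the diff.
type RWType int
type RegionStatKind int

const (
	Read RWType = iota
	Write
)

const (
	RegionReadBytes RegionStatKind = iota
	RegionReadKeys
	RegionReadQueryNum
	RegionWriteBytes
	RegionWriteKeys
	RegionWriteQueryNum
	RegionStatCount
)

func (rw RWType) String() string { return [...]string{"read", "write"}[rw] }

// RegionStats returns the stat kinds of one RWType, indexed by dim.
func (rw RWType) RegionStats() []RegionStatKind {
	if rw == Read {
		return []RegionStatKind{RegionReadBytes, RegionReadKeys, RegionReadQueryNum}
	}
	return []RegionStatKind{RegionWriteBytes, RegionWriteKeys, RegionWriteQueryNum}
}

func DimToString(dim int) string { return [...]string{"byte", "key", "query"}[dim] }

func ForeachRegionStats(f func(RWType, int, RegionStatKind)) {
	for _, rwTy := range []RWType{Read, Write} {
		for dim, kind := range rwTy.RegionStats() {
			f(rwTy, dim, kind)
		}
	}
}

func main() {
	// A pending influence with one hot write peer: 2 MiB/s of write bytes.
	loads := make([]float64, RegionStatCount)
	loads[RegionWriteBytes] = 2 << 20

	// Emits one sample per (rw, dim) pair, picking the load via kind.
	ForeachRegionStats(func(rwTy RWType, dim int, kind RegionStatKind) {
		fmt.Printf("hot_pending_sum{rw=%q,dim=%q} = %v\n", rwTy.String(), DimToString(dim), loads[kind])
	})
}
```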
6 changes: 3 additions & 3 deletions server/schedulers/hot_region_test.go
@@ -121,7 +121,7 @@ func TestGCPendingOpInfos(t *testing.T) {
 		}
 	}
 
-	hb.summaryPendingInfluence() // Calling this function will GC.
+	hb.summaryPendingInfluence(tc) // Calling this function will GC.
 
 	for i := range opInfluenceCreators {
 		for j, typ := range typs {
@@ -1781,7 +1781,7 @@ func TestInfluenceByRWType(t *testing.T) {
 	op := ops[0]
 	re.NotNil(op)
 
-	hb.(*hotScheduler).summaryPendingInfluence()
+	hb.(*hotScheduler).summaryPendingInfluence(tc)
 	stInfos := hb.(*hotScheduler).stInfos
 	re.True(nearlyAbout(stInfos[1].PendingSum.Loads[statistics.RegionWriteKeys], -0.5*units.MiB))
 	re.True(nearlyAbout(stInfos[1].PendingSum.Loads[statistics.RegionWriteBytes], -0.5*units.MiB))
@@ -1806,7 +1806,7 @@ func TestInfluenceByRWType(t *testing.T) {
 	op = ops[0]
 	re.NotNil(op)
 
-	hb.(*hotScheduler).summaryPendingInfluence()
+	hb.(*hotScheduler).summaryPendingInfluence(tc)
 	stInfos = hb.(*hotScheduler).stInfos
 	// assert read/write influence is the sum of write peer and write leader
 	re.True(nearlyAbout(stInfos[1].PendingSum.Loads[statistics.RegionWriteKeys], -1.2*units.MiB))
12 changes: 11 additions & 1 deletion server/schedulers/metrics.go
@@ -109,9 +109,18 @@ var hotPendingStatus = prometheus.NewGaugeVec(
 		Namespace: "pd",
 		Subsystem: "scheduler",
 		Name:      "hot_pending",
-		Help:      "Counter of direction of balance related schedulers.",
+		Help:      "Pending influence status in hot region scheduler.",
 	}, []string{"type", "source", "target"})
 
+var hotPeerHist = prometheus.NewHistogramVec(
+	prometheus.HistogramOpts{
+		Namespace: "pd",
+		Subsystem: "scheduler",
+		Name:      "hot_peer",
+		Help:      "Bucketed histogram of the scheduling hot peer.",
+		Buckets:   prometheus.ExponentialBuckets(1, 2, 30),
+	}, []string{"type", "rw", "dim"})
+
 func init() {
 	prometheus.MustRegister(schedulerCounter)
 	prometheus.MustRegister(schedulerStatus)
@@ -125,4 +134,5 @@ func init() {
 	prometheus.MustRegister(opInfluenceStatus)
 	prometheus.MustRegister(tolerantResourceStatus)
 	prometheus.MustRegister(hotPendingStatus)
+	prometheus.MustRegister(hotPeerHist)
 }
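hotPeerHist uses prometheus.ExponentialBuckets(1, 2, 30): powers of two from 1 up to 2^29 (≈5.4e8), so one histogram comfortably spans query rates, key rates, and byte rates alike; anything larger lands in the implicit +Inf bucket. A quick check of the boundaries:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// ExponentialBuckets(start, factor, count) yields 1, 2, 4, ..., 2^29.
	buckets := prometheus.ExponentialBuckets(1, 2, 30)
	fmt.Println(buckets[0], buckets[1], buckets[2]) // 1 2 4
	fmt.Println(buckets[len(buckets)-1])            // 5.36870912e+08
}
```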
9 changes: 9 additions & 0 deletions server/statistics/kind.go
@@ -208,6 +208,15 @@ func (rw RWType) Inverse() RWType {
 	}
 }
 
+// ForeachRegionStats foreach all region stats of read and write.
+func ForeachRegionStats(f func(RWType, int, RegionStatKind)) {
+	for _, rwTy := range []RWType{Read, Write} {
+		for dim, kind := range rwTy.RegionStats() {
+			f(rwTy, dim, kind)
+		}
+	}
+}
+
 // GetLoadRatesFromPeer gets the load rates of the read or write type from PeerInfo.
 func (rw RWType) GetLoadRatesFromPeer(peer *core.PeerInfo) []float64 {
 	deltaLoads := peer.GetLoads()
12 changes: 0 additions & 12 deletions server/statistics/store_load.go
@@ -144,18 +144,6 @@ func (s *StoreSummaryInfo) SetEngineAsTiFlash() {
 	s.isTiFlash = true
 }
 
-// GetPendingInfluence returns the current pending influence.
-func GetPendingInfluence(stores []*core.StoreInfo) map[uint64]*Influence {
-	stInfos := SummaryStoreInfos(stores)
-	ret := make(map[uint64]*Influence, len(stInfos))
-	for id, info := range stInfos {
-		if info.PendingSum != nil {
-			ret[id] = info.PendingSum
-		}
-	}
-	return ret
-}
-
 // StoreLoad records the current load.
 type StoreLoad struct {
 	Loads []float64
