Skip to content

Commit

Permalink
schedulers: add some metrics for hotreigon (tikv#2295)
Browse files Browse the repository at this point in the history
* schedulers: add some metrics for hotreigon

Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch authored Mar 28, 2020
1 parent 396d23c commit 43c3e95
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 3 deletions.
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
"go.uber.org/zap"
)

var backgroundJobInterval = time.Minute
var backgroundJobInterval = 10 * time.Second

const (
clientTimeout = 3 * time.Second
Expand Down
6 changes: 6 additions & 0 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,18 +402,22 @@ func (c *coordinator) collectHotSpotMetrics() {
stat, ok := status.AsPeer[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(0)
}

stat, ok = status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(stat.TotalKeysRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0)
}

Expand All @@ -434,9 +438,11 @@ func (c *coordinator) collectHotSpotMetrics() {
stat, ok := status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(stat.TotalKeysRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0)
}

Expand Down
17 changes: 16 additions & 1 deletion server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math/rand"
"net/http"
"sort"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -70,7 +71,9 @@ const (
// HotWriteRegionType is hot write region scheduler type.
HotWriteRegionType = "hot-write-region"

hotRegionLimitFactor = 0.75
hotRegionLimitFactor = 0.75
minHotScheduleInterval = time.Second
maxHotScheduleInterval = 20 * time.Second
)

type hotScheduler struct {
Expand Down Expand Up @@ -138,6 +141,13 @@ func (h *hotScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.conf.ServeHTTP(w, r)
}

func (h *hotScheduler) GetMinInterval() time.Duration {
return minHotScheduleInterval
}
func (h *hotScheduler) GetNextInterval(interval time.Duration) time.Duration {
return intervalGrow(h.GetMinInterval(), maxHotScheduleInterval, exponentialGrowth)
}

func (h *hotScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
return h.allowBalanceLeader(cluster) || h.allowBalanceRegion(cluster)
}
Expand Down Expand Up @@ -924,6 +934,9 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
operator.OpHotRegion,
bs.cur.srcStoreID,
dstPeer)

op.Counters = append(op.Counters, balanceHotRegionCounter.WithLabelValues("move-peer", strconv.FormatUint(bs.cur.srcStoreID, 10)+"-out"))
op.Counters = append(op.Counters, balanceHotRegionCounter.WithLabelValues("move-peer", strconv.FormatUint(dstPeer.GetStoreId(), 10)+"-in"))
case transferLeader:
if bs.cur.region.GetStoreVoter(bs.cur.dstStoreID) == nil {
return nil, nil
Expand All @@ -936,6 +949,8 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
bs.cur.srcStoreID,
bs.cur.dstStoreID,
operator.OpHotRegion)
op.Counters = append(op.Counters, balanceHotRegionCounter.WithLabelValues("move-leader", strconv.FormatUint(bs.cur.srcStoreID, 10)+"-out"))
op.Counters = append(op.Counters, balanceHotRegionCounter.WithLabelValues("move-leader", strconv.FormatUint(bs.cur.dstStoreID, 10)+"-in"))
}

if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions server/schedulers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ var balanceRegionCounter = prometheus.NewCounterVec(
Help: "Counter of balance region scheduler.",
}, []string{"type", "address", "store"})

var balanceHotRegionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "scheduler",
Name: "hot_region",
Help: "Counter of hot region scheduler.",
}, []string{"type", "store"})

var balanceDirectionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Expand Down Expand Up @@ -101,6 +109,7 @@ func init() {
prometheus.MustRegister(hotPeerSummary)
prometheus.MustRegister(balanceLeaderCounter)
prometheus.MustRegister(balanceRegionCounter)
prometheus.MustRegister(balanceHotRegionCounter)
prometheus.MustRegister(balanceDirectionCounter)
prometheus.MustRegister(scatterRangeLeaderCounter)
prometheus.MustRegister(scatterRangeRegionCounter)
Expand Down
1 change: 1 addition & 0 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func (li *storeLoadDetail) toHotPeersStat() *statistics.HotPeersStat {
}
return &statistics.HotPeersStat{
TotalBytesRate: li.LoadPred.Current.ByteRate,
TotalKeysRate: li.LoadPred.Current.KeyRate,
Count: len(li.HotPeers),
Stats: peers,
}
Expand Down
1 change: 1 addition & 0 deletions server/statistics/hot_regions_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package statistics
// HotPeersStat records all hot regions statistics
type HotPeersStat struct {
TotalBytesRate float64 `json:"total_flow_bytes"`
TotalKeysRate float64 `json:"total_flow_keys"`
Count int `json:"regions_count"`
Stats []HotPeerStat `json:"statistics"`
}
3 changes: 3 additions & 0 deletions server/statistics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/v4/server/core"
"go.uber.org/zap"
)

// StoresStats is a cache hold hot regions.
Expand Down Expand Up @@ -298,6 +300,7 @@ func collect(records []*pdpb.RecordPair) float64 {
func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats) {
statInterval := stats.GetInterval()
interval := statInterval.GetEndTimestamp() - statInterval.GetStartTimestamp()
log.Debug("update store stats", zap.Uint64("key-write", stats.KeysWritten), zap.Uint64("bytes-write", stats.BytesWritten), zap.Duration("interval", time.Duration(interval)*time.Second), zap.Uint64("store-id", stats.GetStoreId()))
r.Lock()
defer r.Unlock()
r.bytesWriteRate.Add(float64(stats.BytesWritten), time.Duration(interval)*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion tests/pdctl/hot/hot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *hotTestSuite) TestHot(c *C) {
hotReadRegionID, hotWriteRegionID, hotStoreId := uint64(3), uint64(2), uint64(1)
pdctl.MustPutRegion(c, cluster, hotReadRegionID, hotStoreId, []byte("b"), []byte("c"), core.SetReadBytes(1000000000), core.SetReportInterval(reportInterval))
pdctl.MustPutRegion(c, cluster, hotWriteRegionID, hotStoreId, []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000), core.SetReportInterval(reportInterval))
time.Sleep(3200 * time.Millisecond)
time.Sleep(5000 * time.Millisecond)
testHot(hotReadRegionID, hotStoreId, "read")
testHot(hotWriteRegionID, hotStoreId, "write")
}

0 comments on commit 43c3e95

Please sign in to comment.