*: use address instead of storeID for metrics (#1429)
rleungx authored and disksing committed Mar 4, 2019
1 parent 0690cea commit 903e1e8
Showing 13 changed files with 133 additions and 107 deletions.
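
In short: every metric that previously used a synthetic store label (fmt.Sprintf("store_%d", id) in the coordinator, strconv.FormatUint(id, 10) on the gRPC and heartbeat-stream paths) is now labeled with the store's address. A minimal stand-alone illustration; the store type below is a stand-in for PD's core.StoreInfo, not the real thing:

```go
package main

import "fmt"

// store is a stand-in for PD's core.StoreInfo; only the two accessors the
// relabeling relies on are modeled here.
type store struct {
	id      uint64
	address string
}

func (s store) GetID() uint64      { return s.id }
func (s store) GetAddress() string { return s.address }

func main() {
	s := store{id: 1, address: "tikv-1:20160"}

	oldLabel := fmt.Sprintf("store_%d", s.GetID()) // metric label before this commit
	newLabel := s.GetAddress()                     // metric label after this commit

	fmt.Println(oldLabel, "->", newLabel) // store_1 -> tikv-1:20160
}
```

The practical effect is that dashboards group these series by a human-readable address rather than a bare numeric ID.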
29 changes: 14 additions & 15 deletions server/coordinator.go
@@ -15,7 +15,6 @@ package server

import (
"context"
"fmt"
"sync"
"time"

@@ -303,46 +302,46 @@ func (c *coordinator) collectHotSpotMetrics() {
stores := c.cluster.GetStores()
status := s.Scheduler.(hasHotStatus).GetHotWriteStatus()
for _, s := range stores {
store := fmt.Sprintf("store_%d", s.GetID())
storeAddress := s.GetAddress()
stat, ok := status.AsPeer[s.GetID()]
if ok {
totalWriteBytes := float64(stat.TotalFlowBytes)
hotWriteRegionCount := float64(stat.RegionsCount)

hotSpotStatusGauge.WithLabelValues(store, "total_written_bytes_as_peer").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(store, "hot_write_region_as_peer").Set(hotWriteRegionCount)
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_peer").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_peer").Set(hotWriteRegionCount)
} else {
hotSpotStatusGauge.WithLabelValues(store, "total_written_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(store, "hot_write_region_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_peer").Set(0)
}

stat, ok = status.AsLeader[s.GetID()]
if ok {
totalWriteBytes := float64(stat.TotalFlowBytes)
hotWriteRegionCount := float64(stat.RegionsCount)

hotSpotStatusGauge.WithLabelValues(store, "total_written_bytes_as_leader").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(store, "hot_write_region_as_leader").Set(hotWriteRegionCount)
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_leader").Set(totalWriteBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_leader").Set(hotWriteRegionCount)
} else {
hotSpotStatusGauge.WithLabelValues(store, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(store, "hot_write_region_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_leader").Set(0)
}
}

// Collects hot read region metrics.
status = s.Scheduler.(hasHotStatus).GetHotReadStatus()
for _, s := range stores {
store := fmt.Sprintf("store_%d", s.GetID())
storeAddress := s.GetAddress()
stat, ok := status.AsLeader[s.GetID()]
if ok {
totalReadBytes := float64(stat.TotalFlowBytes)
hotReadRegionCount := float64(stat.RegionsCount)

hotSpotStatusGauge.WithLabelValues(store, "total_read_bytes_as_leader").Set(totalReadBytes)
hotSpotStatusGauge.WithLabelValues(store, "hot_read_region_as_leader").Set(hotReadRegionCount)
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_read_bytes_as_leader").Set(totalReadBytes)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_read_region_as_leader").Set(hotReadRegionCount)
} else {
hotSpotStatusGauge.WithLabelValues(store, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(store, "hot_read_region_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_read_region_as_leader").Set(0)
}
}

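The hunk above keeps the collection logic intact: every store still gets a sample, and stores with no hot-region statistics are explicitly zeroed so a previously hot store does not keep exporting its last value; only the first label switches from "store_<ID>" to the store address. A runnable sketch of that pattern, using illustrative metric and label names rather than PD's actual hotSpotStatusGauge definition:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// An illustrative gauge with the same label shape as hotSpotStatusGauge:
// one label for the store (now its address) and one for the metric type.
var hotStatusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "hot_region_status", Help: "Hot region statistics per store."},
	[]string{"address", "type"},
)

// hotStat mirrors the fields the coordinator reads from the hot-region status.
type hotStat struct {
	TotalFlowBytes uint64
	RegionsCount   int
}

// collect labels every store by address and zeroes the series for stores
// that currently have no hot write regions as peers.
func collect(addresses map[uint64]string, asPeer map[uint64]hotStat) {
	for id, addr := range addresses {
		if stat, ok := asPeer[id]; ok {
			hotStatusGauge.WithLabelValues(addr, "total_written_bytes_as_peer").Set(float64(stat.TotalFlowBytes))
			hotStatusGauge.WithLabelValues(addr, "hot_write_region_as_peer").Set(float64(stat.RegionsCount))
		} else {
			hotStatusGauge.WithLabelValues(addr, "total_written_bytes_as_peer").Set(0)
			hotStatusGauge.WithLabelValues(addr, "hot_write_region_as_peer").Set(0)
		}
	}
}

func main() {
	prometheus.MustRegister(hotStatusGauge)
	collect(
		map[uint64]string{1: "tikv-1:20160", 2: "tikv-2:20160"},
		map[uint64]hotStat{1: {TotalFlowBytes: 4 << 20, RegionsCount: 3}},
	)
	fmt.Println("collected hot-write gauges for 2 stores")
}
```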
42 changes: 29 additions & 13 deletions server/coordinator_test.go
@@ -16,6 +16,7 @@ package server
import (
"fmt"
"math/rand"
"path/filepath"
"time"

. "github.com/pingcap/check"
@@ -144,7 +145,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.clusterInfo.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -201,7 +202,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -266,7 +267,7 @@ func (s *testCoordinatorSuite) TestCollectMetrics(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -291,7 +292,7 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
c.Assert(err, IsNil)
cfg.DisableLearner = false
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -350,7 +351,7 @@ func (s *testCoordinatorSuite) TestReplica(c *C) {
cfg.RegionScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -413,7 +414,7 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -463,7 +464,7 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -512,7 +513,7 @@ func (s *testCoordinatorSuite) TestShouldRunWithNonLeaderRegions(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -562,7 +563,7 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
cfg.ReplicaScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()
co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
co.run()
@@ -621,7 +622,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
cfg.ReplicaScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -715,7 +716,7 @@ func (s *testCoordinatorSuite) TestRestart(c *C) {
cfg.RegionScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

// Add 3 stores (1, 2, 3) and a region with 1 replica on store 1.
@@ -810,7 +811,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

c.Assert(tc.addLeaderRegion(1, 1), IsNil)
@@ -888,7 +889,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID())
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -975,3 +976,18 @@ func waitNoResponse(c *C, stream *mockHeartbeatStream) {
return res == nil
})
}

func getHeartBeatStreams(c *C, tc *testClusterInfo) *heartbeatStreams {
config := NewTestSingleConfig(c)
svr, err := CreateServer(config, nil)
c.Assert(err, IsNil)
kvBase := newEtcdKVBase(svr)
path := filepath.Join(svr.cfg.DataDir, "region-meta")
regionKV, err := core.NewRegionKV(path)
c.Assert(err, IsNil)
svr.kv = core.NewKV(kvBase).SetRegionKV(regionKV)
cluster := newRaftCluster(svr, tc.getClusterID())
cluster.cachedCluster = tc.clusterInfo
hbStreams := newHeartbeatStreams(tc.getClusterID(), cluster)
return hbStreams
}
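
The helper above exists because newHeartbeatStreams now takes a *RaftCluster (see server/heartbeat_streams.go below) so the streams can resolve store IDs to addresses; each test therefore has to stand up a test server, an etcd-backed KV, a region KV, and a raft cluster just to obtain heartbeat streams. For contrast, a purely hypothetical sketch of the narrower seam the streams actually consume, an ID-to-address lookup; this interface is not part of the commit and only illustrates what a lighter test double could look like:

```go
package main

import "fmt"

// storeAddressResolver is a hypothetical seam: the only thing the heartbeat
// streams need from the cluster, for metrics purposes, is this lookup.
type storeAddressResolver interface {
	GetStoreAddress(storeID uint64) (string, error)
}

// fakeResolver shows what a test double for that seam could look like.
type fakeResolver map[uint64]string

func (f fakeResolver) GetStoreAddress(storeID uint64) (string, error) {
	addr, ok := f[storeID]
	if !ok {
		return "", fmt.Errorf("store %d not found", storeID)
	}
	return addr, nil
}

func main() {
	var r storeAddressResolver = fakeResolver{1: "tikv-1:20160"}
	addr, err := r.GetStoreAddress(1)
	fmt.Println(addr, err) // tikv-1:20160 <nil>
}
```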
21 changes: 12 additions & 9 deletions server/grpc_service.go
@@ -17,7 +17,6 @@ import (
"context"
"fmt"
"io"
"strconv"
"sync/atomic"
"time"

@@ -347,40 +346,44 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
}

storeID := request.GetLeader().GetStoreId()
storeLabel := strconv.FormatUint(storeID, 10)
store, err := cluster.GetStore(storeID)
if err != nil {
return err
}
storeAddress := store.GetAddress()

regionHeartbeatCounter.WithLabelValues(storeLabel, "report", "recv").Inc()
regionHeartbeatLatency.WithLabelValues(storeLabel).Observe(float64(time.Now().Unix()) - float64(request.GetInterval().GetEndTimestamp()))
regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "recv").Inc()
regionHeartbeatLatency.WithLabelValues(storeAddress).Observe(float64(time.Now().Unix()) - float64(request.GetInterval().GetEndTimestamp()))

cluster.RLock()
hbStreams := cluster.coordinator.hbStreams
cluster.RUnlock()

if time.Since(lastBind) > s.cfg.heartbeatStreamBindInterval.Duration {
regionHeartbeatCounter.WithLabelValues(storeLabel, "report", "bind").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "bind").Inc()
hbStreams.bindStream(storeID, server)
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request)
if region.GetID() == 0 {
msg := fmt.Sprintf("invalid request region, %v", request)
hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeLabel)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
continue
}
if region.GetLeader() == nil {
msg := fmt.Sprintf("invalid request leader, %v", request)
hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeLabel)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
continue
}

err = cluster.HandleRegionHeartbeat(region)
if err != nil {
msg := err.Error()
hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeLabel)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
}

regionHeartbeatCounter.WithLabelValues(storeLabel, "report", "ok").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "ok").Inc()
}
}

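Two things change on the RegionHeartbeat path: the leader's store is looked up once per request, and an unknown store now ends the stream with an error (before, the numeric ID was formatted into a label without any lookup); the resolved address then labels the recv/bind/ok counters, the latency observation, and the error responses passed to sendErr. A self-contained sketch of that flow, with assumed metric names and a plain map standing in for cluster.GetStore:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative metrics with the same label shapes the handler uses; the real
// regionHeartbeatCounter and regionHeartbeatLatency are defined elsewhere in PD.
var (
	heartbeatCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "region_heartbeat_total", Help: "Region heartbeat events."},
		[]string{"address", "type", "status"},
	)
	heartbeatLatency = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{Name: "region_heartbeat_latency_seconds", Help: "Heartbeat reporting latency."},
		[]string{"address"},
	)
)

// storeAddresses stands in for cluster.GetStore(storeID) followed by GetAddress().
var storeAddresses = map[uint64]string{1: "tikv-1:20160"}

func handleHeartbeat(leaderStoreID uint64, intervalEnd int64) error {
	addr, ok := storeAddresses[leaderStoreID]
	if !ok {
		// An unknown store now aborts the stream: without a store record
		// there is no address to label the metrics with.
		return fmt.Errorf("store %d not found", leaderStoreID)
	}
	heartbeatCounter.WithLabelValues(addr, "report", "recv").Inc()
	heartbeatLatency.WithLabelValues(addr).Observe(float64(time.Now().Unix()) - float64(intervalEnd))
	// ... bind the stream, build the region, handle the heartbeat, then:
	heartbeatCounter.WithLabelValues(addr, "report", "ok").Inc()
	return nil
}

func main() {
	prometheus.MustRegister(heartbeatCounter, heartbeatLatency)
	fmt.Println(handleHeartbeat(1, time.Now().Unix())) // <nil>
	fmt.Println(handleHeartbeat(2, time.Now().Unix())) // store 2 not found
}
```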
1 change: 0 additions & 1 deletion server/heartbeat_stream_test.go
@@ -69,7 +69,6 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) {
return 0
}
}

req := &pdpb.RegionHeartbeatRequest{
Header: newRequestHeader(s.svr.clusterID),
Leader: s.region.Peers[0],
42 changes: 30 additions & 12 deletions server/heartbeat_streams.go
@@ -15,7 +15,6 @@ package server

import (
"context"
"strconv"
"sync"
"time"

@@ -45,9 +44,10 @@ type heartbeatStreams struct {
streams map[uint64]heartbeatStream
msgCh chan *pdpb.RegionHeartbeatResponse
streamCh chan streamUpdate
cluster *RaftCluster
}

func newHeartbeatStreams(clusterID uint64) *heartbeatStreams {
func newHeartbeatStreams(clusterID uint64, cluster *RaftCluster) *heartbeatStreams {
ctx, cancel := context.WithCancel(context.Background())
hs := &heartbeatStreams{
ctx: ctx,
@@ -56,6 +56,7 @@ func newHeartbeatStreams(clusterID uint64) *heartbeatStreams {
streams: make(map[uint64]heartbeatStream),
msgCh: make(chan *pdpb.RegionHeartbeatResponse, regionheartbeatSendChanCap),
streamCh: make(chan streamUpdate, 1),
cluster: cluster,
}
hs.wg.Add(1)
go hs.run()
@@ -78,31 +79,48 @@ func (s *heartbeatStreams) run() {
s.streams[update.storeID] = update.stream
case msg := <-s.msgCh:
storeID := msg.GetTargetPeer().GetStoreId()
storeLabel := strconv.FormatUint(storeID, 10)
store, err := s.cluster.GetStore(storeID)
if err != nil {
log.Error("fail to get store",
zap.Uint64("region-id", msg.RegionId),
zap.Uint64("store-id", storeID),
zap.Error(err))
delete(s.streams, storeID)
continue
}
storeAddress := store.GetAddress()
if stream, ok := s.streams[storeID]; ok {
if err := stream.Send(msg); err != nil {
log.Error("send heartbeat message fail",
zap.Uint64("region-id", msg.RegionId), zap.Error(err))
delete(s.streams, storeID)
regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "err").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "err").Inc()
} else {
regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "ok").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "ok").Inc()
}
} else {
log.Debug("heartbeat stream not found, skip send message", zap.Uint64("region-id", msg.RegionId), zap.Uint64("store-id", storeID))
regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "skip").Inc()
log.Debug("heartbeat stream not found, skip send message",
zap.Uint64("region-id", msg.RegionId),
zap.Uint64("store-id", storeID))
regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "skip").Inc()
}
case <-keepAliveTicker.C:
for storeID, stream := range s.streams {
storeLabel := strconv.FormatUint(storeID, 10)
store, err := s.cluster.GetStore(storeID)
if err != nil {
log.Error("fail to get store", zap.Uint64("store-id", storeID), zap.Error(err))
delete(s.streams, storeID)
continue
}
storeAddress := store.GetAddress()
if err := stream.Send(keepAlive); err != nil {
log.Error("send keepalive message fail",
zap.Uint64("target-store-id", storeID),
zap.Error(err))
delete(s.streams, storeID)
regionHeartbeatCounter.WithLabelValues(storeLabel, "keepalive", "err").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, "keepalive", "err").Inc()
} else {
regionHeartbeatCounter.WithLabelValues(storeLabel, "keepalive", "ok").Inc()
regionHeartbeatCounter.WithLabelValues(storeAddress, "keepalive", "ok").Inc()
}
}
case <-s.ctx.Done():
@@ -143,8 +161,8 @@ func (s *heartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHear
}
}

func (s *heartbeatStreams) sendErr(region *core.RegionInfo, errType pdpb.ErrorType, errMsg string, storeLabel string) {
regionHeartbeatCounter.WithLabelValues(storeLabel, "report", "err").Inc()
func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string) {
regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "err").Inc()

msg := &pdpb.RegionHeartbeatResponse{
Header: &pdpb.ResponseHeader{
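heartbeatStreams now carries the cluster precisely so the push and keepalive paths can resolve a store's address before touching regionHeartbeatCounter; when the lookup fails, the stream is dropped instead of being counted under a stale or synthetic label. A minimal sketch of that cleanup pattern, with a plain map in place of *RaftCluster and a print in place of the real counter:

```go
package main

import (
	"errors"
	"fmt"
)

// storeLookup stands in for the *RaftCluster dependency added in this commit;
// only ID-to-address resolution is modeled.
type storeLookup map[uint64]string

var errStoreNotFound = errors.New("store not found")

func (l storeLookup) address(storeID uint64) (string, error) {
	addr, ok := l[storeID]
	if !ok {
		return "", errStoreNotFound
	}
	return addr, nil
}

type stream struct{ name string }

// keepAlive mirrors the new keepalive branch: resolve the address first and
// drop the stream when the store is no longer known to the cluster.
func keepAlive(streams map[uint64]*stream, lookup storeLookup) {
	for storeID := range streams {
		addr, err := lookup.address(storeID)
		if err != nil {
			delete(streams, storeID) // deleting during range is safe in Go
			continue
		}
		fmt.Printf("regionHeartbeatCounter{store=%q, type=keepalive, status=ok}++\n", addr)
	}
}

func main() {
	streams := map[uint64]*stream{1: {name: "s1"}, 7: {name: "s7"}}
	keepAlive(streams, storeLookup{1: "tikv-1:20160"})
	fmt.Println(len(streams)) // 1: the stream for unknown store 7 was removed
}
```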
[Diffs for the remaining 8 changed files are not shown.]
