diff --git a/server/coordinator.go b/server/coordinator.go
index 9f5f0647af0..b228a01b201 100644
--- a/server/coordinator.go
+++ b/server/coordinator.go
@@ -15,7 +15,6 @@ package server
 
 import (
 	"context"
-	"fmt"
 	"sync"
 	"time"
 
@@ -303,17 +302,17 @@ func (c *coordinator) collectHotSpotMetrics() {
 	stores := c.cluster.GetStores()
 	status := s.Scheduler.(hasHotStatus).GetHotWriteStatus()
 	for _, s := range stores {
-		store := fmt.Sprintf("store_%d", s.GetID())
+		storeAddress := s.GetAddress()
 		stat, ok := status.AsPeer[s.GetID()]
 		if ok {
 			totalWriteBytes := float64(stat.TotalFlowBytes)
 			hotWriteRegionCount := float64(stat.RegionsCount)
 
-			hotSpotStatusGauge.WithLabelValues(store, "total_written_bytes_as_peer").Set(totalWriteBytes)
-			hotSpotStatusGauge.WithLabelValues(store, "hot_write_region_as_peer").Set(hotWriteRegionCount)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_peer").Set(totalWriteBytes)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_peer").Set(hotWriteRegionCount)
 		} else {
-			hotSpotStatusGauge.WithLabelValues(store, "total_written_bytes_as_peer").Set(0)
-			hotSpotStatusGauge.WithLabelValues(store, "hot_write_region_as_peer").Set(0)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_peer").Set(0)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_peer").Set(0)
 		}
 
 		stat, ok = status.AsLeader[s.GetID()]
@@ -321,28 +320,28 @@ func (c *coordinator) collectHotSpotMetrics() {
 			totalWriteBytes := float64(stat.TotalFlowBytes)
 			hotWriteRegionCount := float64(stat.RegionsCount)
 
-			hotSpotStatusGauge.WithLabelValues(store, "total_written_bytes_as_leader").Set(totalWriteBytes)
-			hotSpotStatusGauge.WithLabelValues(store, "hot_write_region_as_leader").Set(hotWriteRegionCount)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_leader").Set(totalWriteBytes)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_leader").Set(hotWriteRegionCount)
 		} else {
-			hotSpotStatusGauge.WithLabelValues(store, "total_written_bytes_as_leader").Set(0)
-			hotSpotStatusGauge.WithLabelValues(store, "hot_write_region_as_leader").Set(0)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "total_written_bytes_as_leader").Set(0)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_write_region_as_leader").Set(0)
 		}
 	}
 
 	// Collects hot read region metrics.
 	status = s.Scheduler.(hasHotStatus).GetHotReadStatus()
 	for _, s := range stores {
-		store := fmt.Sprintf("store_%d", s.GetID())
+		storeAddress := s.GetAddress()
 		stat, ok := status.AsLeader[s.GetID()]
 		if ok {
 			totalReadBytes := float64(stat.TotalFlowBytes)
 			hotReadRegionCount := float64(stat.RegionsCount)
 
-			hotSpotStatusGauge.WithLabelValues(store, "total_read_bytes_as_leader").Set(totalReadBytes)
-			hotSpotStatusGauge.WithLabelValues(store, "hot_read_region_as_leader").Set(hotReadRegionCount)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "total_read_bytes_as_leader").Set(totalReadBytes)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_read_region_as_leader").Set(hotReadRegionCount)
 		} else {
-			hotSpotStatusGauge.WithLabelValues(store, "total_read_bytes_as_leader").Set(0)
-			hotSpotStatusGauge.WithLabelValues(store, "hot_read_region_as_leader").Set(0)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "total_read_bytes_as_leader").Set(0)
+			hotSpotStatusGauge.WithLabelValues(storeAddress, "hot_read_region_as_leader").Set(0)
 		}
 	}
 
diff --git a/server/coordinator_test.go b/server/coordinator_test.go
index b9d31765e0a..9e3ef73ce30 100644
--- a/server/coordinator_test.go
+++ b/server/coordinator_test.go
@@ -16,6 +16,7 @@ package server
 import (
 	"fmt"
 	"math/rand"
+	"path/filepath"
 	"time"
 
 	. "github.com/pingcap/check"
@@ -144,7 +145,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.clusterInfo.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -201,7 +202,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -266,7 +267,7 @@ func (s *testCoordinatorSuite) TestCollectMetrics(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -291,7 +292,7 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
 	c.Assert(err, IsNil)
 	cfg.DisableLearner = false
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -350,7 +351,7 @@ func (s *testCoordinatorSuite) TestReplica(c *C) {
 	cfg.RegionScheduleLimit = 0
 
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -413,7 +414,7 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -463,7 +464,7 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -512,7 +513,7 @@ func (s *testCoordinatorSuite) TestShouldRunWithNonLeaderRegions(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -562,7 +563,7 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
 	cfg.ReplicaScheduleLimit = 0
 
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
 	co.run()
@@ -621,7 +622,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
 	cfg.ReplicaScheduleLimit = 0
 
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -715,7 +716,7 @@ func (s *testCoordinatorSuite) TestRestart(c *C) {
 	cfg.RegionScheduleLimit = 0
 
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	// Add 3 stores (1, 2, 3) and a region with 1 replica on store 1.
@@ -810,7 +811,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	c.Assert(tc.addLeaderRegion(1, 1), IsNil)
@@ -888,7 +889,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID())
+	hbStreams := getHeartBeatStreams(c, tc)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -975,3 +976,18 @@ func waitNoResponse(c *C, stream *mockHeartbeatStream) {
 		return res == nil
 	})
 }
+
+func getHeartBeatStreams(c *C, tc *testClusterInfo) *heartbeatStreams {
+	config := NewTestSingleConfig(c)
+	svr, err := CreateServer(config, nil)
+	c.Assert(err, IsNil)
+	kvBase := newEtcdKVBase(svr)
+	path := filepath.Join(svr.cfg.DataDir, "region-meta")
+	regionKV, err := core.NewRegionKV(path)
+	c.Assert(err, IsNil)
+	svr.kv = core.NewKV(kvBase).SetRegionKV(regionKV)
+	cluster := newRaftCluster(svr, tc.getClusterID())
+	cluster.cachedCluster = tc.clusterInfo
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), cluster)
+	return hbStreams
+}
diff --git a/server/grpc_service.go b/server/grpc_service.go
index 2663d7b6ed3..b0e343e5398 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -17,7 +17,6 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"strconv"
 	"sync/atomic"
 	"time"
 
@@ -347,17 +346,21 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
 		}
 
 		storeID := request.GetLeader().GetStoreId()
-		storeLabel := strconv.FormatUint(storeID, 10)
+		store, err := cluster.GetStore(storeID)
+		if err != nil {
+			return err
+		}
+		storeAddress := store.GetAddress()
 
-		regionHeartbeatCounter.WithLabelValues(storeLabel, "report", "recv").Inc()
-		regionHeartbeatLatency.WithLabelValues(storeLabel).Observe(float64(time.Now().Unix()) - float64(request.GetInterval().GetEndTimestamp()))
+		regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "recv").Inc()
+		regionHeartbeatLatency.WithLabelValues(storeAddress).Observe(float64(time.Now().Unix()) - float64(request.GetInterval().GetEndTimestamp()))
 
 		cluster.RLock()
 		hbStreams := cluster.coordinator.hbStreams
 		cluster.RUnlock()
 
 		if time.Since(lastBind) > s.cfg.heartbeatStreamBindInterval.Duration {
-			regionHeartbeatCounter.WithLabelValues(storeLabel, "report", "bind").Inc()
+			regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "bind").Inc()
 			hbStreams.bindStream(storeID, server)
 			lastBind = time.Now()
 		}
@@ -365,22 +368,22 @@
 		region := core.RegionFromHeartbeat(request)
 		if region.GetID() == 0 {
 			msg := fmt.Sprintf("invalid request region, %v", request)
-			hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeLabel)
+			hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
 			continue
 		}
 		if region.GetLeader() == nil {
 			msg := fmt.Sprintf("invalid request leader, %v", request)
-			hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeLabel)
+			hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
 			continue
 		}
 
 		err = cluster.HandleRegionHeartbeat(region)
 		if err != nil {
 			msg := err.Error()
-			hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeLabel)
+			hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
 		}
 
-		regionHeartbeatCounter.WithLabelValues(storeLabel, "report", "ok").Inc()
+		regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "ok").Inc()
 	}
 }
 
diff --git a/server/heartbeat_stream_test.go b/server/heartbeat_stream_test.go
index 48784180c6c..8979ee7ad54 100644
--- a/server/heartbeat_stream_test.go
+++ b/server/heartbeat_stream_test.go
@@ -69,7 +69,6 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) {
 			return 0
 		}
 	}
-
 	req := &pdpb.RegionHeartbeatRequest{
 		Header: newRequestHeader(s.svr.clusterID),
 		Leader: s.region.Peers[0],
diff --git a/server/heartbeat_streams.go b/server/heartbeat_streams.go
index c1bcd2ad835..976a16cd4df 100644
--- a/server/heartbeat_streams.go
+++ b/server/heartbeat_streams.go
@@ -15,7 +15,6 @@ package server
 
 import (
 	"context"
-	"strconv"
 	"sync"
 	"time"
 
@@ -45,9 +44,10 @@ type heartbeatStreams struct {
 	streams  map[uint64]heartbeatStream
 	msgCh    chan *pdpb.RegionHeartbeatResponse
 	streamCh chan streamUpdate
+	cluster  *RaftCluster
 }
 
-func newHeartbeatStreams(clusterID uint64) *heartbeatStreams {
+func newHeartbeatStreams(clusterID uint64, cluster *RaftCluster) *heartbeatStreams {
 	ctx, cancel := context.WithCancel(context.Background())
 	hs := &heartbeatStreams{
 		ctx:      ctx,
@@ -56,6 +56,7 @@ func newHeartbeatStreams(clusterID uint64) *heartbeatStreams {
 		streams:  make(map[uint64]heartbeatStream),
 		msgCh:    make(chan *pdpb.RegionHeartbeatResponse, regionheartbeatSendChanCap),
 		streamCh: make(chan streamUpdate, 1),
+		cluster:  cluster,
 	}
 	hs.wg.Add(1)
 	go hs.run()
@@ -78,31 +79,48 @@ func (s *heartbeatStreams) run() {
 			s.streams[update.storeID] = update.stream
 		case msg := <-s.msgCh:
 			storeID := msg.GetTargetPeer().GetStoreId()
-			storeLabel := strconv.FormatUint(storeID, 10)
+			store, err := s.cluster.GetStore(storeID)
+			if err != nil {
+				log.Error("fail to get store",
+					zap.Uint64("region-id", msg.RegionId),
+					zap.Uint64("store-id", storeID),
+					zap.Error(err))
+				delete(s.streams, storeID)
+				continue
+			}
+			storeAddress := store.GetAddress()
 			if stream, ok := s.streams[storeID]; ok {
 				if err := stream.Send(msg); err != nil {
 					log.Error("send heartbeat message fail",
 						zap.Uint64("region-id", msg.RegionId), zap.Error(err))
 					delete(s.streams, storeID)
-					regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "err").Inc()
+					regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "err").Inc()
 				} else {
-					regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "ok").Inc()
+					regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "ok").Inc()
 				}
 			} else {
-				log.Debug("heartbeat stream not found, skip send message", zap.Uint64("region-id", msg.RegionId), zap.Uint64("store-id", storeID))
-				regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "skip").Inc()
+				log.Debug("heartbeat stream not found, skip send message",
+					zap.Uint64("region-id", msg.RegionId),
+					zap.Uint64("store-id", storeID))
+				regionHeartbeatCounter.WithLabelValues(storeAddress, "push", "skip").Inc()
 			}
 		case <-keepAliveTicker.C:
 			for storeID, stream := range s.streams {
-				storeLabel := strconv.FormatUint(storeID, 10)
+				store, err := s.cluster.GetStore(storeID)
+				if err != nil {
+					log.Error("fail to get store", zap.Uint64("store-id", storeID), zap.Error(err))
+					delete(s.streams, storeID)
+					continue
+				}
+				storeAddress := store.GetAddress()
 				if err := stream.Send(keepAlive); err != nil {
 					log.Error("send keepalive message fail",
 						zap.Uint64("target-store-id", storeID),
 						zap.Error(err))
 					delete(s.streams, storeID)
-					regionHeartbeatCounter.WithLabelValues(storeLabel, "keepalive", "err").Inc()
+					regionHeartbeatCounter.WithLabelValues(storeAddress, "keepalive", "err").Inc()
 				} else {
-					regionHeartbeatCounter.WithLabelValues(storeLabel, "keepalive", "ok").Inc()
+					regionHeartbeatCounter.WithLabelValues(storeAddress, "keepalive", "ok").Inc()
 				}
 			}
 		case <-s.ctx.Done():
@@ -143,8 +161,8 @@ func (s *heartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHear
 	}
 }
 
-func (s *heartbeatStreams) sendErr(region *core.RegionInfo, errType pdpb.ErrorType, errMsg string, storeLabel string) {
-	regionHeartbeatCounter.WithLabelValues(storeLabel, "report", "err").Inc()
+func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string) {
+	regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "err").Inc()
 
 	msg := &pdpb.RegionHeartbeatResponse{
 		Header: &pdpb.ResponseHeader{
diff --git a/server/metrics.go b/server/metrics.go
index a5a04c9b234..dc0916b31d6 100644
--- a/server/metrics.go
+++ b/server/metrics.go
@@ -95,7 +95,7 @@
 			Subsystem: "scheduler",
 			Name:      "region_heartbeat",
 			Help:      "Counter of region hearbeat.",
-		}, []string{"store", "type", "status"})
+		}, []string{"address", "type", "status"})
 
 	regionHeartbeatLatency = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
@@ -104,7 +104,7 @@
 			Name:      "region_heartbeat_latency_seconds",
 			Help:      "Bucketed histogram of latency (s) of receiving heartbeat.",
 			Buckets:   prometheus.ExponentialBuckets(1, 2, 12),
-		}, []string{"store"})
+		}, []string{"address"})
 
 	storeStatusGauge = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
@@ -112,7 +112,7 @@
 			Subsystem: "scheduler",
 			Name:      "store_status",
 			Help:      "Store status for schedule",
-		}, []string{"namespace", "store", "type"})
+		}, []string{"namespace", "address", "type"})
 
 	hotSpotStatusGauge = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
@@ -120,7 +120,7 @@
 			Subsystem: "hotspot",
 			Name:      "status",
 			Help:      "Status of the hotspot.",
-		}, []string{"store", "type"})
+		}, []string{"address", "type"})
 
 	tsoCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
diff --git a/server/schedule/filters.go b/server/schedule/filters.go
index eb25db0dc23..5cc273518c3 100644
--- a/server/schedule/filters.go
+++ b/server/schedule/filters.go
@@ -14,8 +14,6 @@
 package schedule
 
 import (
-	"fmt"
-
 	"github.com/pingcap/pd/server/cache"
 	"github.com/pingcap/pd/server/core"
 	"github.com/pingcap/pd/server/namespace"
@@ -34,10 +32,10 @@ type Filter interface {
 
 // FilterSource checks if store can pass all Filters as source store.
 func FilterSource(opt Options, store *core.StoreInfo, filters []Filter) bool {
-	storeID := fmt.Sprintf("store%d", store.GetID())
+	storeAddress := store.GetAddress()
 	for _, filter := range filters {
 		if filter.FilterSource(opt, store) {
-			filterCounter.WithLabelValues("filter-source", storeID, filter.Type()).Inc()
+			filterCounter.WithLabelValues("filter-source", storeAddress, filter.Type()).Inc()
 			return true
 		}
 	}
@@ -46,10 +44,10 @@ func FilterSource(opt Options, store *core.StoreInfo, filters []Filter) bool {
 
 // FilterTarget checks if store can pass all Filters as target store.
 func FilterTarget(opt Options, store *core.StoreInfo, filters []Filter) bool {
-	storeID := fmt.Sprintf("store%d", store.GetID())
+	storeAddress := store.GetAddress()
 	for _, filter := range filters {
 		if filter.FilterTarget(opt, store) {
-			filterCounter.WithLabelValues("filter-target", storeID, filter.Type()).Inc()
+			filterCounter.WithLabelValues("filter-target", storeAddress, filter.Type()).Inc()
 			return true
 		}
 	}
diff --git a/server/schedule/metrics.go b/server/schedule/metrics.go
index 6c9c2c597e3..d3d3cb9a5d6 100644
--- a/server/schedule/metrics.go
+++ b/server/schedule/metrics.go
@@ -47,7 +47,7 @@
 			Subsystem: "schedule",
 			Name:      "filter",
 			Help:      "Counter of the filter",
-		}, []string{"action", "store", "type"})
+		}, []string{"action", "address", "type"})
 
 	operatorCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go
index ca059a96faa..8540eb2eb7a 100644
--- a/server/schedulers/balance_leader.go
+++ b/server/schedulers/balance_leader.go
@@ -14,9 +14,6 @@
 package schedulers
 
 import (
-	"fmt"
-	"strconv"
-
 	log "github.com/pingcap/log"
 	"github.com/pingcap/pd/server/cache"
 	"github.com/pingcap/pd/server/core"
@@ -92,28 +89,28 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*schedule.
 	}
 	log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("max-store", source.GetID()), zap.Uint64("min-store", target.GetID()))
 
-	sourceStoreLabel := strconv.FormatUint(source.GetID(), 10)
-	targetStoreLabel := strconv.FormatUint(target.GetID(), 10)
-	balanceLeaderCounter.WithLabelValues("high_score", sourceStoreLabel).Inc()
-	balanceLeaderCounter.WithLabelValues("low_score", targetStoreLabel).Inc()
+	sourceAddress := source.GetAddress()
+	targetAddress := target.GetAddress()
+	balanceLeaderCounter.WithLabelValues("high_score", sourceAddress).Inc()
+	balanceLeaderCounter.WithLabelValues("low_score", targetAddress).Inc()
 
 	opInfluence := l.opController.GetOpInfluence(cluster)
 	for i := 0; i < balanceLeaderRetryLimit; i++ {
 		if op := l.transferLeaderOut(source, cluster, opInfluence); op != nil {
-			balanceLeaderCounter.WithLabelValues("transfer_out", sourceStoreLabel).Inc()
+			balanceLeaderCounter.WithLabelValues("transfer_out", sourceAddress).Inc()
 			return op
 		}
 		if op := l.transferLeaderIn(target, cluster, opInfluence); op != nil {
-			balanceLeaderCounter.WithLabelValues("transfer_in", targetStoreLabel).Inc()
+			balanceLeaderCounter.WithLabelValues("transfer_in", targetAddress).Inc()
 			return op
 		}
 	}
 
 	// If no operator can be created for the selected stores, ignore them for a while.
 	log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", source.GetID()), zap.Uint64("target", target.GetID()))
-	balanceLeaderCounter.WithLabelValues("add_taint", strconv.FormatUint(source.GetID(), 10)).Inc()
+	balanceLeaderCounter.WithLabelValues("add_taint", sourceAddress).Inc()
 	l.taintStores.Put(source.GetID())
-	balanceLeaderCounter.WithLabelValues("add_taint", strconv.FormatUint(target.GetID(), 10)).Inc()
+	balanceLeaderCounter.WithLabelValues("add_taint", targetAddress).Inc()
 	l.taintStores.Put(target.GetID())
 	return nil
 }
@@ -180,8 +177,8 @@ func (l *balanceLeaderScheduler) createOperator(region *core.RegionInfo, source,
 	}
 	schedulerCounter.WithLabelValues(l.GetName(), "new_operator").Inc()
-	balanceLeaderCounter.WithLabelValues("move_leader", fmt.Sprintf("store%d-out", source.GetID())).Inc()
-	balanceLeaderCounter.WithLabelValues("move_leader", fmt.Sprintf("store%d-in", target.GetID())).Inc()
+	balanceLeaderCounter.WithLabelValues("move_leader", source.GetAddress()+"-out").Inc()
+	balanceLeaderCounter.WithLabelValues("move_leader", target.GetAddress()+"-in").Inc()
 
 	step := schedule.TransferLeader{FromStore: region.GetLeader().GetStoreId(), ToStore: target.GetID()}
 	op := schedule.NewOperator("balance-leader", region.GetID(), region.GetRegionEpoch(), schedule.OpBalance|schedule.OpLeader, step)
 	return []*schedule.Operator{op}
diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go
index 99d002392b6..7a6f9ad3952 100644
--- a/server/schedulers/balance_region.go
+++ b/server/schedulers/balance_region.go
@@ -14,9 +14,6 @@
 package schedulers
 
 import (
-	"fmt"
-	"strconv"
-
 	"github.com/pingcap/kvproto/pkg/metapb"
 	log "github.com/pingcap/log"
 	"github.com/pingcap/pd/server/cache"
@@ -87,8 +84,8 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.
 	}
 	log.Debug("store has the max region score", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", source.GetID()))
 
-	sourceLabel := strconv.FormatUint(source.GetID(), 10)
-	balanceRegionCounter.WithLabelValues("source_store", sourceLabel).Inc()
+	sourceAddress := source.GetAddress()
+	balanceRegionCounter.WithLabelValues("source_store", sourceAddress).Inc()
 
 	opInfluence := s.opController.GetOpInfluence(cluster)
 	var hasPotentialTarget bool
@@ -134,7 +131,7 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.
 	if !hasPotentialTarget {
 		// If no potential target store can be found for the selected store, ignore it for a while.
 		log.Debug("no operator created for selected store", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", source.GetID()))
-		balanceRegionCounter.WithLabelValues("add_taint", sourceLabel).Inc()
+		balanceRegionCounter.WithLabelValues("add_taint", sourceAddress).Inc()
 		s.taintStores.Put(source.GetID())
 	}
 
@@ -175,8 +172,8 @@ func (s *balanceRegionScheduler) transferPeer(cluster schedule.Cluster, region *
 		schedulerCounter.WithLabelValues(s.GetName(), "no_peer").Inc()
 		return nil
 	}
-	balanceRegionCounter.WithLabelValues("move_peer", fmt.Sprintf("store%d-out", source.GetID())).Inc()
-	balanceRegionCounter.WithLabelValues("move_peer", fmt.Sprintf("store%d-in", target.GetID())).Inc()
+	balanceRegionCounter.WithLabelValues("move_peer", source.GetAddress()+"-out").Inc()
+	balanceRegionCounter.WithLabelValues("move_peer", target.GetAddress()+"-in").Inc()
 
 	return schedule.CreateMovePeerOperator("balance-region", cluster, region, schedule.OpBalance, oldPeer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
 }
diff --git a/server/schedulers/metrics.go b/server/schedulers/metrics.go
index 2830b3c1361..4d5370c2d4f 100644
--- a/server/schedulers/metrics.go
+++ b/server/schedulers/metrics.go
@@ -37,7 +37,7 @@ var balanceLeaderCounter = prometheus.NewCounterVec(
 		Subsystem: "scheduler",
 		Name:      "balance_leader",
 		Help:      "Counter of balance leader scheduler.",
-	}, []string{"type", "store"})
+	}, []string{"type", "address"})
 
 var balanceRegionCounter = prometheus.NewCounterVec(
 	prometheus.CounterOpts{
@@ -45,7 +45,7 @@ var balanceRegionCounter = prometheus.NewCounterVec(
 		Subsystem: "scheduler",
 		Name:      "balance_region",
 		Help:      "Counter of balance region scheduler.",
-	}, []string{"type", "store"})
+	}, []string{"type", "address"})
 
 func init() {
 	prometheus.MustRegister(schedulerCounter)
diff --git a/server/server.go b/server/server.go
index 790b63c2b28..8a32e5ca062 100644
--- a/server/server.go
+++ b/server/server.go
@@ -228,7 +228,7 @@ func (s *Server) startServer() error {
 	}
 	s.kv = core.NewKV(kvBase).SetRegionKV(regionKV)
 	s.cluster = newRaftCluster(s, s.clusterID)
-	s.hbStreams = newHeartbeatStreams(s.clusterID)
+	s.hbStreams = newHeartbeatStreams(s.clusterID, s.cluster)
 	if s.classifier, err = namespace.CreateClassifier(s.cfg.NamespaceClassifier, s.kv, s.idAlloc); err != nil {
 		return err
 	}
diff --git a/server/store_statistics.go b/server/store_statistics.go
index b3b525e49bc..7783f9e9fb9 100644
--- a/server/store_statistics.go
+++ b/server/store_statistics.go
@@ -15,7 +15,6 @@ package server
 
 import (
 	"fmt"
-	"strconv"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/pd/server/core"
@@ -61,7 +60,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) {
 		key := fmt.Sprintf("%s:%s", k, v)
 		s.LabelCounter[key]++
 	}
-	id := strconv.FormatUint(store.GetID(), 10)
+	storeAddress := store.GetAddress()
 	// Store state.
 	switch store.GetState() {
 	case metapb.StoreState_Up:
@@ -78,7 +77,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) {
 		s.Offline++
 	case metapb.StoreState_Tombstone:
 		s.Tombstone++
-		s.resetStoreStatistics(id)
+		s.resetStoreStatistics(storeAddress)
 		return
 	}
 	if store.IsLowSpace(s.opt.GetLowSpaceRatio()) {
@@ -91,15 +90,15 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) {
 	s.RegionCount += store.GetRegionCount()
 	s.LeaderCount += store.GetLeaderCount()
 
-	storeStatusGauge.WithLabelValues(s.namespace, id, "region_score").Set(store.RegionScore(s.opt.GetHighSpaceRatio(), s.opt.GetLowSpaceRatio(), 0))
-	storeStatusGauge.WithLabelValues(s.namespace, id, "leader_score").Set(store.LeaderScore(0))
-	storeStatusGauge.WithLabelValues(s.namespace, id, "region_size").Set(float64(store.GetRegionSize()))
-	storeStatusGauge.WithLabelValues(s.namespace, id, "region_count").Set(float64(store.GetRegionCount()))
-	storeStatusGauge.WithLabelValues(s.namespace, id, "leader_size").Set(float64(store.GetLeaderSize()))
-	storeStatusGauge.WithLabelValues(s.namespace, id, "leader_count").Set(float64(store.GetLeaderCount()))
-	storeStatusGauge.WithLabelValues(s.namespace, id, "store_available").Set(float64(store.GetAvailable()))
-	storeStatusGauge.WithLabelValues(s.namespace, id, "store_used").Set(float64(store.GetUsedSize()))
-	storeStatusGauge.WithLabelValues(s.namespace, id, "store_capacity").Set(float64(store.GetCapacity()))
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "region_score").Set(store.RegionScore(s.opt.GetHighSpaceRatio(), s.opt.GetLowSpaceRatio(), 0))
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "leader_score").Set(store.LeaderScore(0))
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "region_size").Set(float64(store.GetRegionSize()))
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "region_count").Set(float64(store.GetRegionCount()))
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "leader_size").Set(float64(store.GetLeaderSize()))
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "leader_count").Set(float64(store.GetLeaderCount()))
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "store_available").Set(float64(store.GetAvailable()))
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "store_used").Set(float64(store.GetUsedSize()))
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "store_capacity").Set(float64(store.GetCapacity()))
 }
 
 func (s *storeStatistics) Collect() {
@@ -163,16 +162,16 @@
 	}
 }
 
-func (s *storeStatistics) resetStoreStatistics(id string) {
-	storeStatusGauge.WithLabelValues(s.namespace, id, "region_score").Set(0)
-	storeStatusGauge.WithLabelValues(s.namespace, id, "leader_score").Set(0)
-	storeStatusGauge.WithLabelValues(s.namespace, id, "region_size").Set(0)
-	storeStatusGauge.WithLabelValues(s.namespace, id, "region_count").Set(0)
-	storeStatusGauge.WithLabelValues(s.namespace, id, "leader_size").Set(0)
-	storeStatusGauge.WithLabelValues(s.namespace, id, "leader_count").Set(0)
-	storeStatusGauge.WithLabelValues(s.namespace, id, "store_available").Set(0)
-	storeStatusGauge.WithLabelValues(s.namespace, id, "store_used").Set(0)
-	storeStatusGauge.WithLabelValues(s.namespace, id, "store_capacity").Set(0)
+func (s *storeStatistics) resetStoreStatistics(storeAddress string) {
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "region_score").Set(0)
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "leader_score").Set(0)
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "region_size").Set(0)
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "region_count").Set(0)
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "leader_size").Set(0)
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "leader_count").Set(0)
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "store_available").Set(0)
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "store_used").Set(0)
+	storeStatusGauge.WithLabelValues(s.namespace, storeAddress, "store_capacity").Set(0)
 }
 
 type storeStatisticsMap struct {