diff --git a/server/coordinator_test.go b/server/coordinator_test.go
index a695c9f40053..5d379d006e8e 100644
--- a/server/coordinator_test.go
+++ b/server/coordinator_test.go
@@ -144,7 +144,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.clusterInfo.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.clusterInfo.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -201,7 +201,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -266,7 +266,7 @@ func (s *testCoordinatorSuite) TestCollectMetrics(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -291,7 +291,7 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
 	c.Assert(err, IsNil)
 	cfg.DisableLearner = false
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -350,7 +350,7 @@ func (s *testCoordinatorSuite) TestReplica(c *C) {
 	cfg.RegionScheduleLimit = 0
 
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -413,7 +413,7 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -463,7 +463,7 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -515,7 +515,7 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
 	cfg.ReplicaScheduleLimit = 0
 
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
 	co.run()
@@ -574,7 +574,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
 	cfg.ReplicaScheduleLimit = 0
 
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
@@ -668,7 +668,7 @@ func (s *testCoordinatorSuite) TestRestart(c *C) {
 	cfg.RegionScheduleLimit = 0
 
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 
 	// Add 3 stores (1, 2, 3) and a region with 1 replica on store 1.
@@ -763,7 +763,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 
 	c.Assert(tc.addLeaderRegion(1, 1), IsNil)
@@ -841,7 +841,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
 	_, opt, err := newTestScheduleConfig()
 	c.Assert(err, IsNil)
 	tc := newTestClusterInfo(opt)
-	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
+	hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.clusterInfo)
 	defer hbStreams.Close()
 
 	co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
diff --git a/server/grpc_service.go b/server/grpc_service.go
index 6054afa92fa2..9a958a868a80 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -355,19 +355,19 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
 		region := core.RegionFromHeartbeat(request)
 		if region.GetID() == 0 {
 			msg := fmt.Sprintf("invalid request region, %v", request)
-			hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress)
+			hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
 			continue
 		}
 		if region.GetLeader() == nil {
 			msg := fmt.Sprintf("invalid request leader, %v", request)
-			hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress)
+			hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
 			continue
 		}
 
 		err = cluster.HandleRegionHeartbeat(region)
 		if err != nil {
 			msg := err.Error()
-			hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress)
+			hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
 		}
 
 		regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "ok").Inc()
diff --git a/server/heartbeat_stream_test.go b/server/heartbeat_stream_test.go
index c517cee3c114..3c452062ae58 100644
--- a/server/heartbeat_stream_test.go
+++ b/server/heartbeat_stream_test.go
@@ -68,7 +68,6 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) {
 			return 0
 		}
 	}
-
 	req := &pdpb.RegionHeartbeatRequest{
 		Header: newRequestHeader(s.svr.clusterID),
 		Leader: s.region.Peers[0],
diff --git a/server/heartbeat_streams.go b/server/heartbeat_streams.go
index 1b6b0eef498b..fdc4b490d8a6 100644
--- a/server/heartbeat_streams.go
+++ b/server/heartbeat_streams.go
@@ -18,7 +18,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/pd/pkg/logutil"
 	"github.com/pingcap/pd/server/core"
@@ -44,11 +43,10 @@ type heartbeatStreams struct {
 	streams  map[uint64]heartbeatStream
 	msgCh    chan *pdpb.RegionHeartbeatResponse
 	streamCh chan streamUpdate
-	kv       *core.KV
 	cluster  *clusterInfo
 }
 
-func newHeartbeatStreams(clusterID uint64, kv *core.KV, cluster *clusterInfo) *heartbeatStreams {
+func newHeartbeatStreams(clusterID uint64, cluster *clusterInfo) *heartbeatStreams {
 	ctx, cancel := context.WithCancel(context.Background())
 	hs := &heartbeatStreams{
 		ctx:      ctx,
@@ -57,7 +55,6 @@ func newHeartbeatStreams(clusterID uint64, kv *core.KV, cluster *clusterInfo) *h
 		streams:  make(map[uint64]heartbeatStream),
 		msgCh:    make(chan *pdpb.RegionHeartbeatResponse, regionheartbeatSendChanCap),
 		streamCh: make(chan streamUpdate, 1),
-		kv:       kv,
 		cluster:  cluster,
 	}
 	hs.wg.Add(1)
@@ -80,17 +77,10 @@ func (s *heartbeatStreams) run() {
 		case update := <-s.streamCh:
 			s.streams[update.storeID] = update.stream
 		case msg := <-s.msgCh:
-			var storeAddress string
-			store := &metapb.Store{}
 			storeID := msg.GetTargetPeer().GetStoreId()
+			var storeAddress string
 			if s.cluster == nil {
-				ok, err := s.kv.LoadStore(storeID, store)
-				if err != nil {
-					log.Errorf("[region %v] failed to load store %v: %v", msg.RegionId, storeID, err)
-				}
-				if ok {
-					storeAddress = store.GetAddress()
-				}
+				continue
 			} else {
 				storeAddress = s.cluster.GetStore(storeID).GetAddress()
 			}
@@ -109,15 +99,8 @@ func (s *heartbeatStreams) run() {
 		case <-keepAliveTicker.C:
 			for storeID, stream := range s.streams {
 				var storeAddress string
-				store := &metapb.Store{}
 				if s.cluster == nil {
-					ok, err := s.kv.LoadStore(storeID, store)
-					if err != nil {
-						log.Errorf("[store %v] failed to load store: %v", storeID, err)
-					}
-					if ok {
-						storeAddress = store.GetAddress()
-					}
+					continue
 				} else {
 					storeAddress = s.cluster.GetStore(storeID).GetAddress()
 				}
@@ -167,7 +150,7 @@ func (s *heartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHear
 	}
 }
 
-func (s *heartbeatStreams) sendErr(region *core.RegionInfo, errType pdpb.ErrorType, errMsg string, storeAddress string) {
+func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string) {
 	regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "err").Inc()
 
 	msg := &pdpb.RegionHeartbeatResponse{
diff --git a/server/server.go b/server/server.go
index 9103be39b200..bb8568618d86 100644
--- a/server/server.go
+++ b/server/server.go
@@ -221,7 +221,7 @@ func (s *Server) startServer() error {
 	}
 	s.kv = core.NewKV(kvBase).SetRegionKV(regionKV)
 	s.cluster = newRaftCluster(s, s.clusterID)
-	s.hbStreams = newHeartbeatStreams(s.clusterID, s.kv, s.cluster.cachedCluster)
+	s.hbStreams = newHeartbeatStreams(s.clusterID, s.cluster.cachedCluster)
 	if s.classifier, err = namespace.CreateClassifier(s.cfg.NamespaceClassifier, s.kv, s.idAlloc); err != nil {
 		return err
 	}
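
Note (not part of the diff): the change drops the *core.KV fallback from heartbeatStreams, so newHeartbeatStreams no longer takes a kv argument and sendErr no longer takes the region. When no clusterInfo is attached, the stream worker now skips the message or keep-alive instead of loading the store from KV. The standalone Go sketch below only illustrates that control flow; storeLookup, resolveStoreAddress, and fakeCluster are simplified stand-ins invented for this example, not identifiers from the PD code base.

package main

import "fmt"

// storeLookup stands in for the part of clusterInfo the stream worker needs:
// resolving a store ID to the store's address.
type storeLookup interface {
	GetStoreAddress(storeID uint64) string
}

// resolveStoreAddress mirrors the new control flow in heartbeatStreams.run():
// with the KV fallback removed, a missing cluster cache means the message is
// skipped rather than resolved through *core.KV.
func resolveStoreAddress(cluster storeLookup, storeID uint64) (string, bool) {
	if cluster == nil {
		return "", false // previously: s.kv.LoadStore(...) fallback
	}
	return cluster.GetStoreAddress(storeID), true
}

// fakeCluster is a toy implementation of storeLookup for the demo.
type fakeCluster map[uint64]string

func (f fakeCluster) GetStoreAddress(id uint64) string { return f[id] }

func main() {
	c := fakeCluster{1: "127.0.0.1:20160"}
	if addr, ok := resolveStoreAddress(c, 1); ok {
		fmt.Println("send heartbeat response to", addr)
	}
	if _, ok := resolveStoreAddress(nil, 1); !ok {
		fmt.Println("no cluster attached: skip the message")
	}
}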