From 93c847f1296089e70846b5bc31a91a7e234bfa83 Mon Sep 17 00:00:00 2001 From: rleungx Date: Wed, 13 Feb 2019 20:24:26 +0800 Subject: [PATCH] make tests work Signed-off-by: rleungx --- server/coordinator_test.go | 40 ++++++++++++++++++++--------- server/grpc_service.go | 6 ++--- server/heartbeat_stream_test.go | 1 - server/heartbeat_streams.go | 45 +++++++++++---------------------- server/server.go | 2 +- 5 files changed, 47 insertions(+), 47 deletions(-) diff --git a/server/coordinator_test.go b/server/coordinator_test.go index a695c9f40053..a58766acfb67 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -16,6 +16,7 @@ package server import ( "fmt" "math/rand" + "path/filepath" "time" . "github.com/pingcap/check" @@ -144,7 +145,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.clusterInfo.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -201,7 +202,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -266,7 +267,7 @@ func (s *testCoordinatorSuite) TestCollectMetrics(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -291,7 +292,7 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) { c.Assert(err, IsNil) cfg.DisableLearner = false tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -350,7 +351,7 @@ func (s *testCoordinatorSuite) TestReplica(c *C) { cfg.RegionScheduleLimit = 0 tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -413,7 +414,7 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -463,7 +464,7 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -515,7 +516,7 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) { cfg.ReplicaScheduleLimit = 0 tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) co.run() @@ -574,7 +575,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { cfg.ReplicaScheduleLimit = 0 tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -668,7 +669,7 @@ func (s *testCoordinatorSuite) TestRestart(c *C) { cfg.RegionScheduleLimit = 0 tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() // Add 3 stores (1, 2, 3) and a region with 1 replica on store 1. @@ -763,7 +764,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() c.Assert(tc.addLeaderRegion(1, 1), IsNil) @@ -841,7 +842,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) tc := newTestClusterInfo(opt) - hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo) + hbStreams := getHeartBeatStreams(c, tc) defer hbStreams.Close() co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -928,3 +929,18 @@ func waitNoResponse(c *C, stream *mockHeartbeatStream) { return res == nil }) } + +func getHeartBeatStreams(c *C, tc *testClusterInfo) *heartbeatStreams { + config := NewTestSingleConfig(c) + svr, err := CreateServer(config, nil) + c.Assert(err, IsNil) + kvBase := newEtcdKVBase(svr) + path := filepath.Join(svr.cfg.DataDir, "region-meta") + regionKV, err := core.NewRegionKV(path) + c.Assert(err, IsNil) + svr.kv = core.NewKV(kvBase).SetRegionKV(regionKV) + cluster := newRaftCluster(svr, tc.getClusterID()) + cluster.cachedCluster = tc.clusterInfo + hbStreams := newHeartbeatStreams(tc.getClusterID(), cluster) + return hbStreams +} diff --git a/server/grpc_service.go b/server/grpc_service.go index 6054afa92fa2..9a958a868a80 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -355,19 +355,19 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { region := core.RegionFromHeartbeat(request) if region.GetID() == 0 { msg := fmt.Sprintf("invalid request region, %v", request) - hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress) + hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress) continue } if region.GetLeader() == nil { msg := fmt.Sprintf("invalid request leader, %v", request) - hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress) + hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress) continue } err = cluster.HandleRegionHeartbeat(region) if err != nil { msg := err.Error() - hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress) + hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress) } regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "ok").Inc() diff --git a/server/heartbeat_stream_test.go b/server/heartbeat_stream_test.go index c517cee3c114..3c452062ae58 100644 --- a/server/heartbeat_stream_test.go +++ b/server/heartbeat_stream_test.go @@ -68,7 +68,6 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) { return 0 } } - req := &pdpb.RegionHeartbeatRequest{ Header: newRequestHeader(s.svr.clusterID), Leader: s.region.Peers[0], diff --git a/server/heartbeat_streams.go b/server/heartbeat_streams.go index 1b6b0eef498b..aca1665e6f82 100644 --- a/server/heartbeat_streams.go +++ b/server/heartbeat_streams.go @@ -18,7 +18,6 @@ import ( "sync" "time" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/pkg/logutil" "github.com/pingcap/pd/server/core" @@ -44,11 +43,10 @@ type heartbeatStreams struct { streams map[uint64]heartbeatStream msgCh chan *pdpb.RegionHeartbeatResponse streamCh chan streamUpdate - kv *core.KV - cluster *clusterInfo + cluster *RaftCluster } -func newHeartbeatStreams(clusterID uint64, kv *core.KV, cluster *clusterInfo) *heartbeatStreams { +func newHeartbeatStreams(clusterID uint64, cluster *RaftCluster) *heartbeatStreams { ctx, cancel := context.WithCancel(context.Background()) hs := &heartbeatStreams{ ctx: ctx, @@ -57,7 +55,6 @@ func newHeartbeatStreams(clusterID uint64, kv *core.KV, cluster *clusterInfo) *h streams: make(map[uint64]heartbeatStream), msgCh: make(chan *pdpb.RegionHeartbeatResponse, regionheartbeatSendChanCap), streamCh: make(chan streamUpdate, 1), - kv: kv, cluster: cluster, } hs.wg.Add(1) @@ -80,20 +77,14 @@ func (s *heartbeatStreams) run() { case update := <-s.streamCh: s.streams[update.storeID] = update.stream case msg := <-s.msgCh: - var storeAddress string - store := &metapb.Store{} storeID := msg.GetTargetPeer().GetStoreId() - if s.cluster == nil { - ok, err := s.kv.LoadStore(storeID, store) - if err != nil { - log.Errorf("[region %v] failed to load store %v: %v", msg.RegionId, storeID, err) - } - if ok { - storeAddress = store.GetAddress() - } - } else { - storeAddress = s.cluster.GetStore(storeID).GetAddress() + store, err := s.cluster.GetStore(storeID) + if err != nil { + log.Errorf("[region %v] fail to get store %v: %v", msg.RegionId, storeID, err) + delete(s.streams, storeID) + continue } + storeAddress := store.GetAddress() if stream, ok := s.streams[storeID]; ok { if err := stream.Send(msg); err != nil { log.Errorf("[region %v] send heartbeat message fail: %v", msg.RegionId, err) @@ -108,19 +99,13 @@ func (s *heartbeatStreams) run() { } case <-keepAliveTicker.C: for storeID, stream := range s.streams { - var storeAddress string - store := &metapb.Store{} - if s.cluster == nil { - ok, err := s.kv.LoadStore(storeID, store) - if err != nil { - log.Errorf("[store %v] failed to load store: %v", storeID, err) - } - if ok { - storeAddress = store.GetAddress() - } - } else { - storeAddress = s.cluster.GetStore(storeID).GetAddress() + store, err := s.cluster.GetStore(storeID) + if err != nil { + log.Errorf("[store %v] fail to get store: %v", storeID, err) + delete(s.streams, storeID) + continue } + storeAddress := store.GetAddress() if err := stream.Send(keepAlive); err != nil { log.Errorf("[store %v] send keepalive message fail: %v", storeID, err) delete(s.streams, storeID) @@ -167,7 +152,7 @@ func (s *heartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHear } } -func (s *heartbeatStreams) sendErr(region *core.RegionInfo, errType pdpb.ErrorType, errMsg string, storeAddress string) { +func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string) { regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "err").Inc() msg := &pdpb.RegionHeartbeatResponse{ diff --git a/server/server.go b/server/server.go index 9103be39b200..6d23fe059484 100644 --- a/server/server.go +++ b/server/server.go @@ -221,7 +221,7 @@ func (s *Server) startServer() error { } s.kv = core.NewKV(kvBase).SetRegionKV(regionKV) s.cluster = newRaftCluster(s, s.clusterID) - s.hbStreams = newHeartbeatStreams(s.clusterID, s.kv, s.cluster.cachedCluster) + s.hbStreams = newHeartbeatStreams(s.clusterID, s.cluster) if s.classifier, err = namespace.CreateClassifier(s.cfg.NamespaceClassifier, s.kv, s.idAlloc); err != nil { return err }