Skip to content

Commit

Permalink
make tests work
Browse files Browse the repository at this point in the history
Signed-off-by: rleungx <[email protected]>
  • Loading branch information
rleungx committed Feb 13, 2019
1 parent 3150ede commit 93c847f
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 47 deletions.
40 changes: 28 additions & 12 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package server
import (
"fmt"
"math/rand"
"path/filepath"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -144,7 +145,7 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.clusterInfo.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -201,7 +202,7 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -266,7 +267,7 @@ func (s *testCoordinatorSuite) TestCollectMetrics(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand All @@ -291,7 +292,7 @@ func (s *testCoordinatorSuite) TestCheckRegion(c *C) {
c.Assert(err, IsNil)
cfg.DisableLearner = false
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -350,7 +351,7 @@ func (s *testCoordinatorSuite) TestReplica(c *C) {
cfg.RegionScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -413,7 +414,7 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -463,7 +464,7 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -515,7 +516,7 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
cfg.ReplicaScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()
co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
co.run()
Expand Down Expand Up @@ -574,7 +575,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
cfg.ReplicaScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -668,7 +669,7 @@ func (s *testCoordinatorSuite) TestRestart(c *C) {
cfg.RegionScheduleLimit = 0

tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

// Add 3 stores (1, 2, 3) and a region with 1 replica on store 1.
Expand Down Expand Up @@ -763,7 +764,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

c.Assert(tc.addLeaderRegion(1, 1), IsNil)
Expand Down Expand Up @@ -841,7 +842,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestClusterInfo(opt)
hbStreams := newHeartbeatStreams(tc.getClusterID(), tc.kv, tc.clusterInfo)
hbStreams := getHeartBeatStreams(c, tc)
defer hbStreams.Close()

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down Expand Up @@ -928,3 +929,18 @@ func waitNoResponse(c *C, stream *mockHeartbeatStream) {
return res == nil
})
}

func getHeartBeatStreams(c *C, tc *testClusterInfo) *heartbeatStreams {
config := NewTestSingleConfig(c)
svr, err := CreateServer(config, nil)
c.Assert(err, IsNil)
kvBase := newEtcdKVBase(svr)
path := filepath.Join(svr.cfg.DataDir, "region-meta")
regionKV, err := core.NewRegionKV(path)
c.Assert(err, IsNil)
svr.kv = core.NewKV(kvBase).SetRegionKV(regionKV)
cluster := newRaftCluster(svr, tc.getClusterID())
cluster.cachedCluster = tc.clusterInfo
hbStreams := newHeartbeatStreams(tc.getClusterID(), cluster)
return hbStreams
}
6 changes: 3 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,19 +355,19 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
region := core.RegionFromHeartbeat(request)
if region.GetID() == 0 {
msg := fmt.Sprintf("invalid request region, %v", request)
hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
continue
}
if region.GetLeader() == nil {
msg := fmt.Sprintf("invalid request leader, %v", request)
hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
continue
}

err = cluster.HandleRegionHeartbeat(region)
if err != nil {
msg := err.Error()
hbStreams.sendErr(region, pdpb.ErrorType_UNKNOWN, msg, storeAddress)
hbStreams.sendErr(pdpb.ErrorType_UNKNOWN, msg, storeAddress)
}

regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "ok").Inc()
Expand Down
1 change: 0 additions & 1 deletion server/heartbeat_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) {
return 0
}
}

req := &pdpb.RegionHeartbeatRequest{
Header: newRequestHeader(s.svr.clusterID),
Leader: s.region.Peers[0],
Expand Down
45 changes: 15 additions & 30 deletions server/heartbeat_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"sync"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/pkg/logutil"
"github.com/pingcap/pd/server/core"
Expand All @@ -44,11 +43,10 @@ type heartbeatStreams struct {
streams map[uint64]heartbeatStream
msgCh chan *pdpb.RegionHeartbeatResponse
streamCh chan streamUpdate
kv *core.KV
cluster *clusterInfo
cluster *RaftCluster
}

func newHeartbeatStreams(clusterID uint64, kv *core.KV, cluster *clusterInfo) *heartbeatStreams {
func newHeartbeatStreams(clusterID uint64, cluster *RaftCluster) *heartbeatStreams {
ctx, cancel := context.WithCancel(context.Background())
hs := &heartbeatStreams{
ctx: ctx,
Expand All @@ -57,7 +55,6 @@ func newHeartbeatStreams(clusterID uint64, kv *core.KV, cluster *clusterInfo) *h
streams: make(map[uint64]heartbeatStream),
msgCh: make(chan *pdpb.RegionHeartbeatResponse, regionheartbeatSendChanCap),
streamCh: make(chan streamUpdate, 1),
kv: kv,
cluster: cluster,
}
hs.wg.Add(1)
Expand All @@ -80,20 +77,14 @@ func (s *heartbeatStreams) run() {
case update := <-s.streamCh:
s.streams[update.storeID] = update.stream
case msg := <-s.msgCh:
var storeAddress string
store := &metapb.Store{}
storeID := msg.GetTargetPeer().GetStoreId()
if s.cluster == nil {
ok, err := s.kv.LoadStore(storeID, store)
if err != nil {
log.Errorf("[region %v] failed to load store %v: %v", msg.RegionId, storeID, err)
}
if ok {
storeAddress = store.GetAddress()
}
} else {
storeAddress = s.cluster.GetStore(storeID).GetAddress()
store, err := s.cluster.GetStore(storeID)
if err != nil {
log.Errorf("[region %v] fail to get store %v: %v", msg.RegionId, storeID, err)
delete(s.streams, storeID)
continue
}
storeAddress := store.GetAddress()
if stream, ok := s.streams[storeID]; ok {
if err := stream.Send(msg); err != nil {
log.Errorf("[region %v] send heartbeat message fail: %v", msg.RegionId, err)
Expand All @@ -108,19 +99,13 @@ func (s *heartbeatStreams) run() {
}
case <-keepAliveTicker.C:
for storeID, stream := range s.streams {
var storeAddress string
store := &metapb.Store{}
if s.cluster == nil {
ok, err := s.kv.LoadStore(storeID, store)
if err != nil {
log.Errorf("[store %v] failed to load store: %v", storeID, err)
}
if ok {
storeAddress = store.GetAddress()
}
} else {
storeAddress = s.cluster.GetStore(storeID).GetAddress()
store, err := s.cluster.GetStore(storeID)
if err != nil {
log.Errorf("[store %v] fail to get store: %v", storeID, err)
delete(s.streams, storeID)
continue
}
storeAddress := store.GetAddress()
if err := stream.Send(keepAlive); err != nil {
log.Errorf("[store %v] send keepalive message fail: %v", storeID, err)
delete(s.streams, storeID)
Expand Down Expand Up @@ -167,7 +152,7 @@ func (s *heartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHear
}
}

func (s *heartbeatStreams) sendErr(region *core.RegionInfo, errType pdpb.ErrorType, errMsg string, storeAddress string) {
func (s *heartbeatStreams) sendErr(errType pdpb.ErrorType, errMsg string, storeAddress string) {
regionHeartbeatCounter.WithLabelValues(storeAddress, "report", "err").Inc()

msg := &pdpb.RegionHeartbeatResponse{
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (s *Server) startServer() error {
}
s.kv = core.NewKV(kvBase).SetRegionKV(regionKV)
s.cluster = newRaftCluster(s, s.clusterID)
s.hbStreams = newHeartbeatStreams(s.clusterID, s.kv, s.cluster.cachedCluster)
s.hbStreams = newHeartbeatStreams(s.clusterID, s.cluster)
if s.classifier, err = namespace.CreateClassifier(s.cfg.NamespaceClassifier, s.kv, s.idAlloc); err != nil {
return err
}
Expand Down

0 comments on commit 93c847f

Please sign in to comment.