diff --git a/server/grpc_service.go b/server/grpc_service.go
index 585bd434f7a..f12a2b812db 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -42,14 +42,18 @@ import (
 	"google.golang.org/grpc/status"
 )
 
-const slowThreshold = 5 * time.Millisecond
+const (
+	heartbeatSendTimeout = 5 * time.Second
+	slowThreshold        = 5 * time.Millisecond
+)
 
 // gRPC errors
 var (
 	// ErrNotLeader is returned when current server is not the leader and not possible to process request.
 	// TODO: work as proxy.
-	ErrNotLeader  = status.Errorf(codes.Unavailable, "not leader")
-	ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
+	ErrNotLeader            = status.Errorf(codes.Unavailable, "not leader")
+	ErrNotStarted           = status.Errorf(codes.Unavailable, "server not started")
+	ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
 )
 
 // GetMembers implements gRPC PDServer.
@@ -458,10 +462,6 @@ func (s *Server) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHeartbea
 	}, nil
 }
 
-const regionHeartbeatSendTimeout = 5 * time.Second
-
-var errSendRegionHeartbeatTimeout = errors.New("send region heartbeat timeout")
-
 // heartbeatServer wraps PD_RegionHeartbeatServer to ensure when any error
 // occurs on Send() or Recv(), both endpoints will be closed.
 type heartbeatServer struct {
@@ -481,9 +481,9 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
 			atomic.StoreInt32(&s.closed, 1)
 		}
 		return errors.WithStack(err)
-	case <-time.After(regionHeartbeatSendTimeout):
+	case <-time.After(heartbeatSendTimeout):
 		atomic.StoreInt32(&s.closed, 1)
-		return errors.WithStack(errSendRegionHeartbeatTimeout)
+		return ErrSendHeartbeatTimeout
 	}
 }
 
@@ -1243,7 +1243,7 @@ func (s *Server) GetOperator(ctx context.Context, request *pdpb.GetOperatorReque
 // TODO: Call it in gRPC interceptor.
 func (s *Server) validateRequest(header *pdpb.RequestHeader) error {
 	if s.IsClosed() || !s.member.IsLeader() {
-		return errors.WithStack(ErrNotLeader)
+		return ErrNotLeader
 	}
 	if header.GetClusterId() != s.clusterID {
 		return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId())
@@ -1423,7 +1423,7 @@ func (s *Server) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCLocat
 // the gRPC communication between PD servers internally.
 func (s *Server) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLeader bool) error {
 	if s.IsClosed() {
-		return errors.WithStack(ErrNotStarted)
+		return ErrNotStarted
 	}
 	// If onlyAllowLeader is true, check whether the sender is PD leader.
 	if onlyAllowLeader {
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index 48dd36444f8..7e77980e083 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -39,6 +39,8 @@ import (
 	syncer "github.com/tikv/pd/server/region_syncer"
 	"github.com/tikv/pd/server/schedule/operator"
 	"github.com/tikv/pd/tests"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 func Test(t *testing.T) {
@@ -416,6 +418,25 @@ func (s *clusterTestSuite) TestGetPDMembers(c *C) {
 	c.Assert(len(resp.GetMembers()), Not(Equals), 0)
 }
 
+func (s *clusterTestSuite) TestNotLeader(c *C) {
+	tc, err := tests.NewTestCluster(s.ctx, 2)
+	defer tc.Destroy()
+	c.Assert(err, IsNil)
+	c.Assert(tc.RunInitialServers(), IsNil)
+
+	tc.WaitLeader()
+	followerServer := tc.GetServer(tc.GetFollower())
+	grpcPDClient := testutil.MustNewGrpcClient(c, followerServer.GetAddr())
+	clusterID := followerServer.GetClusterID()
+	req := &pdpb.AllocIDRequest{Header: testutil.NewRequestHeader(clusterID)}
+	resp, err := grpcPDClient.AllocID(context.Background(), req)
+	c.Assert(resp, IsNil)
+	grpcStatus, ok := status.FromError(err)
+	c.Assert(ok, IsTrue)
+	c.Assert(grpcStatus.Code(), Equals, codes.Unavailable)
+	c.Assert(grpcStatus.Message(), Equals, "not leader")
+}
+
 func (s *clusterTestSuite) TestStoreVersionChange(c *C) {
 	tc, err := tests.NewTestCluster(s.ctx, 1)
 	defer tc.Destroy()
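Illustrative caller-side sketch (not part of the patch): because validateRequest now returns the gRPC status directly instead of a wrapped Go error, a client can branch on the status code and message exactly as the TestNotLeader assertions do. The isNotLeader helper below is hypothetical and only assumes the standard google.golang.org/grpc/status API.

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isNotLeader reports whether err carries the gRPC status a PD follower
// returns after this change (codes.Unavailable, "not leader").
// The helper name is illustrative, not part of the PD client API.
func isNotLeader(err error) bool {
	s, ok := status.FromError(err)
	return ok && s.Code() == codes.Unavailable && s.Message() == "not leader"
}

func main() {
	// Simulate the error a follower would return from validateRequest.
	err := status.Errorf(codes.Unavailable, "not leader")
	fmt.Println(isNotLeader(err)) // true
}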