diff --git a/server/grpc_service.go b/server/grpc_service.go index 1bc9ebf3314..8d1d544057f 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -45,19 +45,29 @@ import ( "google.golang.org/grpc/status" ) -// GrpcServer wraps Server to provide grpc service. -type GrpcServer struct { - *Server -} +const ( + // tso + maxMergeTSORequests = 10000 + defaultTSOProxyTimeout = 3 * time.Second + + // global config + globalConfigPath = "/global/config/" +) // gRPC errors var ( // ErrNotLeader is returned when current server is not the leader and not possible to process request. // TODO: work as proxy. - ErrNotLeader = status.Errorf(codes.Unavailable, "not leader") - ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") + ErrNotLeader = status.Errorf(codes.Unavailable, "not leader") + ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") + ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") ) +// GrpcServer wraps Server to provide grpc service. +type GrpcServer struct { + *Server +} + // GetMembers implements gRPC PDServer. func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) { // Here we purposely do not check the cluster ID because the client does not know the correct cluster ID @@ -99,11 +109,6 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb }, nil } -const ( - maxMergeTSORequests = 10000 - defaultTSOProxyTimeout = 3 * time.Second -) - // Tso implements gRPC PDServer. func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { var ( @@ -631,8 +636,6 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear const regionHeartbeatSendTimeout = 5 * time.Second -var errSendRegionHeartbeatTimeout = errors.New("send region heartbeat timeout") - // heartbeatServer wraps PD_RegionHeartbeatServer to ensure when any error // occurs on Send() or Recv(), both endpoints will be closed. type heartbeatServer struct { @@ -654,7 +657,7 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error { return errors.WithStack(err) case <-time.After(regionHeartbeatSendTimeout): atomic.StoreInt32(&s.closed, 1) - return errors.WithStack(errSendRegionHeartbeatTimeout) + return ErrSendHeartbeatTimeout } } @@ -1418,7 +1421,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR // TODO: Call it in gRPC interceptor. func (s *GrpcServer) validateRequest(header *pdpb.RequestHeader) error { if s.IsClosed() || !s.member.IsLeader() { - return errors.WithStack(ErrNotLeader) + return ErrNotLeader } if header.GetClusterId() != s.clusterID { return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId()) @@ -1603,7 +1606,7 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL // the gRPC communication between PD servers internally. func (s *GrpcServer) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLeader bool) error { if s.IsClosed() { - return errors.WithStack(ErrNotStarted) + return ErrNotStarted } // If onlyAllowLeader is true, check whether the sender is PD leader. if onlyAllowLeader { @@ -1701,8 +1704,6 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan <-done } -const globalConfigPath = "/global/config/" - // StoreGlobalConfig store global config into etcd by transaction func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { ops := make([]clientv3.Op, len(request.Changes)) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index a35245beae6..1ff1a82da5e 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -39,6 +39,8 @@ import ( syncer "github.com/tikv/pd/server/region_syncer" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/tests" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func Test(t *testing.T) { @@ -408,6 +410,25 @@ func (s *clusterTestSuite) TestGetPDMembers(c *C) { c.Assert(resp.GetMembers(), Not(HasLen), 0) } +func (s *clusterTestSuite) TestNotLeader(c *C) { + tc, err := tests.NewTestCluster(s.ctx, 2) + defer tc.Destroy() + c.Assert(err, IsNil) + c.Assert(tc.RunInitialServers(), IsNil) + + tc.WaitLeader() + followerServer := tc.GetServer(tc.GetFollower()) + grpcPDClient := testutil.MustNewGrpcClient(c, followerServer.GetAddr()) + clusterID := followerServer.GetClusterID() + req := &pdpb.AllocIDRequest{Header: testutil.NewRequestHeader(clusterID)} + resp, err := grpcPDClient.AllocID(context.Background(), req) + c.Assert(resp, IsNil) + grpcStatus, ok := status.FromError(err) + c.Assert(ok, IsTrue) + c.Assert(grpcStatus.Code(), Equals, codes.Unavailable) + c.Assert(grpcStatus.Message(), Equals, "not leader") +} + func (s *clusterTestSuite) TestStoreVersionChange(c *C) { tc, err := tests.NewTestCluster(s.ctx, 1) defer tc.Destroy()