Skip to content

Commit

Permalink
server: partially fix grpc status (#4798) (#5218)
Browse files Browse the repository at this point in the history
ref #4797, ref #4798

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ryan Leung <[email protected]>
  • Loading branch information
ti-chi-bot and rleungx authored Jun 23, 2022
1 parent 0204bfa commit 7646455
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
37 changes: 19 additions & 18 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,29 @@ import (
"google.golang.org/grpc/status"
)

// GrpcServer wraps Server to provide grpc service.
type GrpcServer struct {
*Server
}
const (
// tso
maxMergeTSORequests = 10000
defaultTSOProxyTimeout = 3 * time.Second

// global config
globalConfigPath = "/global/config/"
)

// gRPC errors
var (
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
)

// GrpcServer wraps Server to provide grpc service.
type GrpcServer struct {
*Server
}

// GetMembers implements gRPC PDServer.
func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
Expand Down Expand Up @@ -99,11 +109,6 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb
}, nil
}

const (
maxMergeTSORequests = 10000
defaultTSOProxyTimeout = 3 * time.Second
)

// Tso implements gRPC PDServer.
func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
var (
Expand Down Expand Up @@ -631,8 +636,6 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear

const regionHeartbeatSendTimeout = 5 * time.Second

var errSendRegionHeartbeatTimeout = errors.New("send region heartbeat timeout")

// heartbeatServer wraps PD_RegionHeartbeatServer to ensure when any error
// occurs on Send() or Recv(), both endpoints will be closed.
type heartbeatServer struct {
Expand All @@ -654,7 +657,7 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
return errors.WithStack(err)
case <-time.After(regionHeartbeatSendTimeout):
atomic.StoreInt32(&s.closed, 1)
return errors.WithStack(errSendRegionHeartbeatTimeout)
return ErrSendHeartbeatTimeout
}
}

Expand Down Expand Up @@ -1418,7 +1421,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR
// TODO: Call it in gRPC interceptor.
func (s *GrpcServer) validateRequest(header *pdpb.RequestHeader) error {
if s.IsClosed() || !s.member.IsLeader() {
return errors.WithStack(ErrNotLeader)
return ErrNotLeader
}
if header.GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId())
Expand Down Expand Up @@ -1603,7 +1606,7 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL
// the gRPC communication between PD servers internally.
func (s *GrpcServer) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLeader bool) error {
if s.IsClosed() {
return errors.WithStack(ErrNotStarted)
return ErrNotStarted
}
// If onlyAllowLeader is true, check whether the sender is PD leader.
if onlyAllowLeader {
Expand Down Expand Up @@ -1701,8 +1704,6 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
<-done
}

const globalConfigPath = "/global/config/"

// StoreGlobalConfig store global config into etcd by transaction
func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
ops := make([]clientv3.Op, len(request.Changes))
Expand Down
21 changes: 21 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
syncer "github.com/tikv/pd/server/region_syncer"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/tests"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func Test(t *testing.T) {
Expand Down Expand Up @@ -408,6 +410,25 @@ func (s *clusterTestSuite) TestGetPDMembers(c *C) {
c.Assert(resp.GetMembers(), Not(HasLen), 0)
}

func (s *clusterTestSuite) TestNotLeader(c *C) {
tc, err := tests.NewTestCluster(s.ctx, 2)
defer tc.Destroy()
c.Assert(err, IsNil)
c.Assert(tc.RunInitialServers(), IsNil)

tc.WaitLeader()
followerServer := tc.GetServer(tc.GetFollower())
grpcPDClient := testutil.MustNewGrpcClient(c, followerServer.GetAddr())
clusterID := followerServer.GetClusterID()
req := &pdpb.AllocIDRequest{Header: testutil.NewRequestHeader(clusterID)}
resp, err := grpcPDClient.AllocID(context.Background(), req)
c.Assert(resp, IsNil)
grpcStatus, ok := status.FromError(err)
c.Assert(ok, IsTrue)
c.Assert(grpcStatus.Code(), Equals, codes.Unavailable)
c.Assert(grpcStatus.Message(), Equals, "not leader")
}

func (s *clusterTestSuite) TestStoreVersionChange(c *C) {
tc, err := tests.NewTestCluster(s.ctx, 1)
defer tc.Destroy()
Expand Down

0 comments on commit 7646455

Please sign in to comment.