Skip to content

Commit

Permalink
server: partially fix grpc status (#4798) (#5215)
Browse files Browse the repository at this point in the history
ref #4797, ref #4798

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ryan Leung <[email protected]>
  • Loading branch information
ti-chi-bot and rleungx authored Jul 5, 2022
1 parent e792849 commit 6853e8f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
22 changes: 11 additions & 11 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,18 @@ import (
"google.golang.org/grpc/status"
)

const slowThreshold = 5 * time.Millisecond
const (
heartbeatSendTimeout = 5 * time.Second
slowThreshold = 5 * time.Millisecond
)

// gRPC errors
var (
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
)

// GetMembers implements gRPC PDServer.
Expand Down Expand Up @@ -458,10 +462,6 @@ func (s *Server) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHeartbea
}, nil
}

const regionHeartbeatSendTimeout = 5 * time.Second

var errSendRegionHeartbeatTimeout = errors.New("send region heartbeat timeout")

// heartbeatServer wraps PD_RegionHeartbeatServer to ensure when any error
// occurs on Send() or Recv(), both endpoints will be closed.
type heartbeatServer struct {
Expand All @@ -481,9 +481,9 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
atomic.StoreInt32(&s.closed, 1)
}
return errors.WithStack(err)
case <-time.After(regionHeartbeatSendTimeout):
case <-time.After(heartbeatSendTimeout):
atomic.StoreInt32(&s.closed, 1)
return errors.WithStack(errSendRegionHeartbeatTimeout)
return ErrSendHeartbeatTimeout
}
}

Expand Down Expand Up @@ -1243,7 +1243,7 @@ func (s *Server) GetOperator(ctx context.Context, request *pdpb.GetOperatorReque
// TODO: Call it in gRPC interceptor.
func (s *Server) validateRequest(header *pdpb.RequestHeader) error {
if s.IsClosed() || !s.member.IsLeader() {
return errors.WithStack(ErrNotLeader)
return ErrNotLeader
}
if header.GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId())
Expand Down Expand Up @@ -1423,7 +1423,7 @@ func (s *Server) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCLocat
// the gRPC communication between PD servers internally.
func (s *Server) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLeader bool) error {
if s.IsClosed() {
return errors.WithStack(ErrNotStarted)
return ErrNotStarted
}
// If onlyAllowLeader is true, check whether the sender is PD leader.
if onlyAllowLeader {
Expand Down
21 changes: 21 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
syncer "github.com/tikv/pd/server/region_syncer"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/tests"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func Test(t *testing.T) {
Expand Down Expand Up @@ -416,6 +418,25 @@ func (s *clusterTestSuite) TestGetPDMembers(c *C) {
c.Assert(len(resp.GetMembers()), Not(Equals), 0)
}

func (s *clusterTestSuite) TestNotLeader(c *C) {
tc, err := tests.NewTestCluster(s.ctx, 2)
defer tc.Destroy()
c.Assert(err, IsNil)
c.Assert(tc.RunInitialServers(), IsNil)

tc.WaitLeader()
followerServer := tc.GetServer(tc.GetFollower())
grpcPDClient := testutil.MustNewGrpcClient(c, followerServer.GetAddr())
clusterID := followerServer.GetClusterID()
req := &pdpb.AllocIDRequest{Header: testutil.NewRequestHeader(clusterID)}
resp, err := grpcPDClient.AllocID(context.Background(), req)
c.Assert(resp, IsNil)
grpcStatus, ok := status.FromError(err)
c.Assert(ok, IsTrue)
c.Assert(grpcStatus.Code(), Equals, codes.Unavailable)
c.Assert(grpcStatus.Message(), Equals, "not leader")
}

func (s *clusterTestSuite) TestStoreVersionChange(c *C) {
tc, err := tests.NewTestCluster(s.ctx, 1)
defer tc.Destroy()
Expand Down

0 comments on commit 6853e8f

Please sign in to comment.