diff --git a/errors.toml b/errors.toml index 9bfd4a79190..fbef5617a45 100644 --- a/errors.toml +++ b/errors.toml @@ -516,9 +516,9 @@ error = ''' init file log error, %s ''' -["PD:mcs:ErrNotFoundSchedulingAddr"] +["PD:mcs:ErrNotFoundSchedulingPrimary"] error = ''' -cannot find scheduling address +cannot find scheduling primary address ''' ["PD:mcs:ErrSchedulingServer"] diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index e5c23cffde2..a4f3a373f82 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -14,7 +14,11 @@ package errs -import "github.com/pingcap/errors" +import ( + "github.com/pingcap/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) const ( // NotLeaderErr indicates the non-leader member received the requests which should be received by leader. @@ -31,6 +35,62 @@ const ( NotServedErr = "is not served" ) +// gRPC errors +var ( + // Canceled indicates the operation was canceled (typically by the caller). + ErrStreamClosed = status.Error(codes.Canceled, "stream is closed") + + // Unknown error. An example of where this error may be returned is + // if a Status value received from another address space belongs to + // an error-space that is not known in this address space. Also + // errors raised by APIs that do not return enough error information + // may be converted to this error. + ErrUnknown = func(err error) error { + return status.Error(codes.Unknown, err.Error()) + } + + // DeadlineExceeded means operation expired before completion. + // For operations that change the state of the system, this error may be + // returned even if the operation has completed successfully. For + // example, a successful response from a server could have been delayed + // long enough for the deadline to expire. + ErrForwardTSOTimeout = status.Error(codes.DeadlineExceeded, "forward tso request timeout") + ErrTSOProxyRecvFromClientTimeout = status.Error(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server") + ErrSendHeartbeatTimeout = status.Error(codes.DeadlineExceeded, "send heartbeat timeout") + + // NotFound means some requested entity (e.g., file or directory) was + // not found. + ErrNotFoundTSOAddr = status.Error(codes.NotFound, "not found tso address") + ErrNotFoundSchedulingAddr = status.Error(codes.NotFound, "not found scheduling address") + ErrNotFoundService = status.Error(codes.NotFound, "not found service") + + // ResourceExhausted indicates some resource has been exhausted, perhaps + // a per-user quota, or perhaps the entire file system is out of space. + ErrMaxCountTSOProxyRoutinesExceeded = status.Error(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded") + ErrGRPCRateLimitExceeded = func(err error) error { + return status.Error(codes.ResourceExhausted, err.Error()) + } + + // FailedPrecondition indicates operation was rejected because the + // system is not in a state required for the operation's execution. + // For example, directory to be deleted may be non-empty, an rmdir + // operation is applied to a non-directory, etc. + ErrMismatchClusterID = func(clusterID, requestClusterID uint64) error { + return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", clusterID, requestClusterID) + } + + // Unavailable indicates the service is currently unavailable. + // This is most likely a transient condition and may be corrected + // by retrying with a backoff. Note that it is not always safe to retry + // non-idempotent operations.
+ // ErrNotLeader is returned when current server is not the leader and not possible to process request. + // TODO: work as proxy. + ErrNotLeader = status.Error(codes.Unavailable, "not leader") + ErrNotStarted = status.Error(codes.Unavailable, "server not started") + ErrEtcdNotStarted = status.Error(codes.Unavailable, "server is started, but etcd not started") + ErrFollowerHandlingNotAllowed = status.Error(codes.Unavailable, "not leader and follower handling not allowed") +) + // common error in multiple packages var ( ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:common:ErrGetSourceStore")) @@ -414,6 +474,6 @@ var ( // Micro service errors var ( - ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr")) - ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer")) + ErrNotFoundSchedulingPrimary = errors.Normalize("cannot find scheduling primary address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingPrimary")) + ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer")) ) diff --git a/pkg/mcs/metastorage/server/grpc_service.go b/pkg/mcs/metastorage/server/grpc_service.go index 32b3788906d..bb604e8fac5 100644 --- a/pkg/mcs/metastorage/server/grpc_service.go +++ b/pkg/mcs/metastorage/server/grpc_service.go @@ -22,19 +22,13 @@ import ( "github.com/pingcap/kvproto/pkg/meta_storagepb" "github.com/pingcap/log" bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/keypath" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -var ( - // errNotLeader is returned when current server is not the leader. - errNotLeader = status.Errorf(codes.Unavailable, "not leader") ) var _ meta_storagepb.MetaStorageServer = (*Service)(nil) @@ -79,7 +73,7 @@ func (*Service) RegisterRESTHandler(_ map[string]http.Handler) error { func (s *Service) checkServing() error { if s.manager == nil || s.manager.srv == nil || !s.manager.srv.IsServing() { - return errNotLeader + return errs.ErrNotLeader } return nil } diff --git a/pkg/mcs/resourcemanager/server/grpc_service.go b/pkg/mcs/resourcemanager/server/grpc_service.go index 21681bc0759..83ff3fdbc50 100644 --- a/pkg/mcs/resourcemanager/server/grpc_service.go +++ b/pkg/mcs/resourcemanager/server/grpc_service.go @@ -25,17 +25,11 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -var ( - // errNotLeader is returned when current server is not the leader. 
- errNotLeader = status.Errorf(codes.Unavailable, "not leader") ) var _ rmpb.ResourceManagerServer = (*Service)(nil) @@ -87,7 +81,7 @@ func (s *Service) GetManager() *Manager { func (s *Service) checkServing() error { if s.manager == nil || s.manager.srv == nil || !s.manager.srv.IsServing() { - return errNotLeader + return errs.ErrNotLeader } return nil } diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index f4fe606b403..03910f9f408 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -35,14 +35,6 @@ import ( "github.com/tikv/pd/pkg/versioninfo" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// gRPC errors -var ( - ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") - ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched") ) // SetUpRestHandler is a hook to sets up the REST service. @@ -105,7 +97,7 @@ func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error { return errors.WithStack(err) case <-timer.C: atomic.StoreInt32(&s.closed, 1) - return status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") + return errs.ErrSendHeartbeatTimeout } } diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index 3419fd16221..8c2f906f9dc 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -25,18 +25,11 @@ import ( "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/keypath" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -// gRPC errors -var ( - ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") - ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched") ) var _ tsopb.TSOServer = (*Service)(nil) @@ -100,14 +93,12 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { start := time.Now() // TSO uses leader lease to determine validity. No need to check leader here. 
if s.IsClosed() { - return status.Errorf(codes.Unknown, "server not started") + return errs.ErrNotStarted } header := request.GetHeader() clusterID := header.GetClusterId() if clusterID != keypath.ClusterID() { - return status.Errorf( - codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", - keypath.ClusterID(), clusterID) + return errs.ErrMismatchClusterID(keypath.ClusterID(), clusterID) } keyspaceID := header.GetKeyspaceId() keyspaceGroupID := header.GetKeyspaceGroupId() @@ -117,7 +108,7 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { keyspaceID, keyspaceGroupID, count) if err != nil { - return status.Error(codes.Unknown, err.Error()) + return errs.ErrUnknown(err) } keyspaceGroupIDStr := strconv.FormatUint(uint64(keyspaceGroupID), 10) tsoHandleDuration.WithLabelValues(keyspaceGroupIDStr).Observe(time.Since(start).Seconds()) @@ -218,10 +209,10 @@ func (s *Service) GetMinTS( func (s *Service) validRequest(header *tsopb.RequestHeader) (tsopb.ErrorType, error) { if s.IsClosed() || s.keyspaceGroupManager == nil { - return tsopb.ErrorType_NOT_BOOTSTRAPPED, ErrNotStarted + return tsopb.ErrorType_NOT_BOOTSTRAPPED, errs.ErrNotStarted } if header == nil || header.GetClusterId() != keypath.ClusterID() { - return tsopb.ErrorType_CLUSTER_MISMATCHED, ErrClusterMismatched + return tsopb.ErrorType_CLUSTER_MISMATCHED, errs.ErrMismatchClusterID(keypath.ClusterID(), header.GetClusterId()) } return tsopb.ErrorType_OK, nil } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index d2974075e94..8ead22ba503 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -51,8 +51,6 @@ import ( "github.com/tikv/pd/pkg/versioninfo" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) var _ bs.Server = (*Server)(nil) @@ -277,7 +275,7 @@ func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorM // TODO: Check if the sender is from the global TSO allocator func (s *Server) ValidateInternalRequest(_ *tsopb.RequestHeader, _ bool) error { if s.IsClosed() { - return ErrNotStarted + return errs.ErrNotStarted } return nil } @@ -286,11 +284,10 @@ func (s *Server) ValidateInternalRequest(_ *tsopb.RequestHeader, _ bool) error { // TODO: Check if the keyspace replica is the primary func (s *Server) ValidateRequest(header *tsopb.RequestHeader) error { if s.IsClosed() { - return ErrNotStarted + return errs.ErrNotStarted } if header.GetClusterId() != keypath.ClusterID() { - return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", - keypath.ClusterID(), header.GetClusterId()) + return errs.ErrMismatchClusterID(keypath.ClusterID(), header.GetClusterId()) } return nil } diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 6009bba1d7d..da41e96ca4b 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -36,8 +36,6 @@ import ( "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) const ( @@ -206,7 +204,7 @@ func (s *RegionSyncer) Sync(ctx context.Context, stream pdpb.PD_SyncRegionsServe } clusterID := request.GetHeader().GetClusterId() if clusterID != keypath.ClusterID() { - return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", keypath.ClusterID(), clusterID) + return errs.ErrMismatchClusterID(keypath.ClusterID(), clusterID) } log.Info("establish sync region stream", 
zap.String("requested-server", request.GetMember().GetName()), diff --git a/server/api/admin.go b/server/api/admin.go index 24b9feaea75..979694817f2 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -229,7 +229,7 @@ func (h *adminHandler) recoverAllocID(w http.ResponseWriter, r *http.Request) { func (h *adminHandler) deleteRegionCacheInSchedulingServer(id ...uint64) error { addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), constant.SchedulingServiceName) if !ok { - return errs.ErrNotFoundSchedulingAddr.FastGenByArgs() + return errs.ErrNotFoundSchedulingPrimary.FastGenByArgs() } var idStr string if len(id) > 0 { diff --git a/server/api/config.go b/server/api/config.go index 511f47284a9..66ee6672217 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -564,7 +564,7 @@ func (h *confHandler) GetPDServerConfig(w http.ResponseWriter, _ *http.Request) func (h *confHandler) getSchedulingServerConfig() (*config.Config, error) { addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), constant.SchedulingServiceName) if !ok { - return nil, errs.ErrNotFoundSchedulingAddr.FastGenByArgs() + return nil, errs.ErrNotFoundSchedulingPrimary.FastGenByArgs() } url := fmt.Sprintf("%s/scheduling/api/v1/config", addr) req, err := http.NewRequest(http.MethodGet, url, http.NoBody) diff --git a/server/forward.go b/server/forward.go index 674b3e008b6..ca932a785e6 100644 --- a/server/forward.go +++ b/server/forward.go @@ -35,8 +35,6 @@ import ( "github.com/tikv/pd/server/cluster" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func forwardTSORequest( @@ -105,7 +103,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { maxConcurrentTSOProxyStreamings := int32(s.GetMaxConcurrentTSOProxyStreamings()) if maxConcurrentTSOProxyStreamings >= 0 { if newCount := s.concurrentTSOProxyStreamings.Add(1); newCount > maxConcurrentTSOProxyStreamings { - return errors.WithStack(ErrMaxCountTSOProxyRoutinesExceeded) + return errors.WithStack(errs.ErrMaxCountTSOProxyRoutinesExceeded) } } @@ -130,7 +128,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { } if request.GetCount() == 0 { err = errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive") - return status.Error(codes.Unknown, err.Error()) + return errs.ErrUnknown(err) } forwardCtx, cancelForward, forwardStream, lastForwardedHost, tsoStreamErr, err = s.handleTSOForwarding(forwardCtx, forwardStream, stream, server, request, tsDeadlineCh, lastForwardedHost, cancelForward) if tsoStreamErr != nil { @@ -153,7 +151,7 @@ func (s *GrpcServer) handleTSOForwarding(forwardCtx context.Context, forwardStre ) { forwardedHost, ok := s.GetServicePrimaryAddr(stream.Context(), constant.TSOServiceName) if !ok || len(forwardedHost) == 0 { - return forwardCtx, cancelForward, forwardStream, lastForwardedHost, errors.WithStack(ErrNotFoundTSOAddr), nil + return forwardCtx, cancelForward, forwardStream, lastForwardedHost, errors.WithStack(errs.ErrNotFoundTSOAddr), nil } if forwardStream == nil || lastForwardedHost != forwardedHost { if cancelForward != nil { @@ -456,7 +454,7 @@ func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) { } forwardedHost, ok = s.GetServicePrimaryAddr(ctx, constant.TSOServiceName) if !ok || forwardedHost == "" { - return pdpb.Timestamp{}, ErrNotFoundTSOAddr + return pdpb.Timestamp{}, errs.ErrNotFoundTSOAddr } forwardStream, err = s.getTSOForwardStream(forwardedHost) if err != nil { diff --git 
a/server/grpc_service.go b/server/grpc_service.go index de74916900e..a331088cf2b 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -51,8 +51,6 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) const ( @@ -65,23 +63,6 @@ const ( gRPCServiceName = "pdpb.PD" ) -// gRPC errors -var ( - // ErrNotLeader is returned when current server is not the leader and not possible to process request. - // TODO: work as proxy. - ErrNotLeader = status.Errorf(codes.Unavailable, "not leader") - ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") - ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") - ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address") - ErrNotFoundSchedulingAddr = status.Errorf(codes.NotFound, "not found scheduling address") - ErrNotFoundService = status.Errorf(codes.NotFound, "not found service") - ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout") - ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded") - ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server") - ErrEtcdNotStarted = status.Errorf(codes.Unavailable, "server is started, but etcd not started") - ErrFollowerHandlingNotAllowed = status.Errorf(codes.Unavailable, "not leader and follower handling not allowed") -) - var ( errRegionHeartbeatSend = forwardFailCounter.WithLabelValues("region_heartbeat", "send") errRegionHeartbeatClient = forwardFailCounter.WithLabelValues("region_heartbeat", "client") @@ -135,7 +116,7 @@ func (s *tsoServer) send(m *pdpb.TsoResponse) error { return errors.WithStack(err) case <-timer.C: atomic.StoreInt32(&s.closed, 1) - return ErrForwardTSOTimeout + return errs.ErrForwardTSOTimeout } } @@ -165,7 +146,7 @@ func (s *tsoServer) recv(timeout time.Duration) (*pdpb.TsoRequest, error) { return req.request, nil case <-timer.C: atomic.StoreInt32(&s.closed, 1) - return nil, ErrTSOProxyRecvFromClientTimeout + return nil, errs.ErrTSOProxyRecvFromClientTimeout } } @@ -196,7 +177,7 @@ func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error { return errors.WithStack(err) case <-timer.C: atomic.StoreInt32(&s.closed, 1) - return ErrSendHeartbeatTimeout + return errs.ErrSendHeartbeatTimeout } } @@ -300,7 +281,7 @@ func (s *GrpcServer) GetMinTS( if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -454,7 +435,7 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } // Here we purposely do not check the cluster ID because the client does not know the correct cluster ID @@ -504,7 +485,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return status.Error(codes.ResourceExhausted, err.Error()) + return errs.ErrGRPCRateLimitExceeded(err) } } if s.IsServiceIndependent(constant.TSOServiceName) { @@ -574,7 +555,7 
@@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { if s.IsServiceIndependent(constant.TSOServiceName) { if request.GetCount() == 0 { err = errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive") - return status.Error(codes.Unknown, err.Error()) + return errs.ErrUnknown(err) } forwardCtx, cancelForward, forwardStream, lastForwardedHost, tsoStreamErr, err = s.handleTSOForwarding(forwardCtx, forwardStream, stream, nil, request, tsDeadlineCh, lastForwardedHost, cancelForward) if tsoStreamErr != nil { @@ -589,11 +570,10 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { start := time.Now() // TSO uses leader lease to determine validity. No need to check leader here. if s.IsClosed() { - return status.Errorf(codes.Unknown, "server not started") + return errs.ErrNotStarted } if clusterID := keypath.ClusterID(); request.GetHeader().GetClusterId() != clusterID { - return status.Errorf(codes.FailedPrecondition, - "mismatch cluster id, need %d but got %d", clusterID, request.GetHeader().GetClusterId()) + return errs.ErrMismatchClusterID(clusterID, request.GetHeader().GetClusterId()) } count := request.GetCount() ctx, task := trace.NewTask(ctx, "tso") @@ -601,7 +581,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { task.End() tsoHandleDuration.Observe(time.Since(start).Seconds()) if err != nil { - return status.Error(codes.Unknown, err.Error()) + return errs.ErrUnknown(err) } response := &pdpb.TsoResponse{ Header: wrapHeader(), @@ -622,7 +602,7 @@ func (s *GrpcServer) Bootstrap(ctx context.Context, request *pdpb.BootstrapReque if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -664,7 +644,7 @@ func (s *GrpcServer) IsBootstrapped(ctx context.Context, request *pdpb.IsBootstr if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -691,7 +671,7 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest) if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -725,7 +705,7 @@ func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, _ *pdpb.IsSnapsho if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } // recovering mark is stored in etcd directly, there's no need to forward. 
@@ -749,7 +729,7 @@ func (s *GrpcServer) GetStore(ctx context.Context, request *pdpb.GetStoreRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -803,7 +783,7 @@ func (s *GrpcServer) PutStore(ctx context.Context, request *pdpb.PutStoreRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -858,7 +838,7 @@ func (s *GrpcServer) GetAllStores(ctx context.Context, request *pdpb.GetAllStore if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -901,7 +881,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -990,7 +970,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (*schedulingClient, error) { forwardedHost, _ := s.GetServicePrimaryAddr(ctx, constant.SchedulingServiceName) if forwardedHost == "" { - return nil, ErrNotFoundSchedulingAddr + return nil, errs.ErrNotFoundSchedulingAddr } pre := s.schedulingClient.Load() @@ -1027,7 +1007,7 @@ type bucketHeartbeatServer struct { func (b *bucketHeartbeatServer) send(bucket *pdpb.ReportBucketsResponse) error { if atomic.LoadInt32(&b.closed) == 1 { - return status.Errorf(codes.Canceled, "stream is closed") + return errs.ErrStreamClosed } done := make(chan error, 1) go func() { @@ -1044,7 +1024,7 @@ func (b *bucketHeartbeatServer) send(bucket *pdpb.ReportBucketsResponse) error { return err case <-timer.C: atomic.StoreInt32(&b.closed, 1) - return ErrSendHeartbeatTimeout + return errs.ErrSendHeartbeatTimeout } } @@ -1080,13 +1060,13 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error { if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return status.Error(codes.ResourceExhausted, err.Error()) + return errs.ErrGRPCRateLimitExceeded(err) } } for { request, err := server.recv() failpoint.Inject("grpcClientClosed", func() { - err = status.Error(codes.Canceled, "grpc client closed") + err = errs.ErrStreamClosed request = nil }) if err == io.EOF { @@ -1196,7 +1176,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return status.Error(codes.ResourceExhausted, err.Error()) + return errs.ErrGRPCRateLimitExceeded(err) } } for { @@ -1396,7 +1376,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error // GetRegion implements gRPC PDServer. 
func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error) { failpoint.Inject("rateLimit", func() { - failpoint.Return(nil, status.Error(codes.ResourceExhausted, errs.ErrRateLimitExceeded.Error())) + failpoint.Return(nil, errs.ErrGRPCRateLimitExceeded(errs.ErrRateLimitExceeded)) }) if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { fName := currentFunction() @@ -1404,7 +1384,7 @@ func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionReque if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1466,7 +1446,7 @@ func (s *GrpcServer) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1523,7 +1503,7 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1583,7 +1563,7 @@ func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1639,7 +1619,7 @@ func (s *GrpcServer) BatchScanRegions(ctx context.Context, request *pdpb.BatchSc if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1729,7 +1709,7 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1773,7 +1753,7 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } @@ -1847,7 +1827,7 @@ func (s *GrpcServer) ReportSplit(ctx context.Context, request *pdpb.ReportSplitR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1883,7 +1863,7 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, 
errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1920,7 +1900,7 @@ func (s *GrpcServer) GetClusterConfig(ctx context.Context, request *pdpb.GetClus if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1950,7 +1930,7 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1989,7 +1969,7 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } @@ -2101,7 +2081,7 @@ func (s *GrpcServer) GetGCSafePoint(ctx context.Context, request *pdpb.GetGCSafe if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2132,7 +2112,7 @@ func (s *GrpcServer) GetGCSafePoint(ctx context.Context, request *pdpb.GetGCSafe // SyncRegions syncs the regions. func (s *GrpcServer) SyncRegions(stream pdpb.PD_SyncRegionsServer) error { if s.IsClosed() || s.cluster == nil { - return ErrNotStarted + return errs.ErrNotStarted } if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { fName := currentFunction() @@ -2140,12 +2120,12 @@ func (s *GrpcServer) SyncRegions(stream pdpb.PD_SyncRegionsServer) error { if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return status.Error(codes.ResourceExhausted, err.Error()) + return errs.ErrGRPCRateLimitExceeded(err) } } ctx := s.cluster.Context() if ctx == nil { - return ErrNotStarted + return errs.ErrNotStarted } return s.cluster.GetRegionSyncer().Sync(ctx, stream) } @@ -2158,7 +2138,7 @@ func (s *GrpcServer) UpdateGCSafePoint(ctx context.Context, request *pdpb.Update if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2205,7 +2185,7 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2259,7 +2239,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } @@ -2332,20 +2312,20 @@ func (s *GrpcServer) validateRequest(header *pdpb.RequestHeader) error { // TODO: Call it in gRPC interceptor. 
func (s *GrpcServer) validateRoleInRequest(ctx context.Context, header *pdpb.RequestHeader, allowFollower *bool) error { if s.IsClosed() { - return ErrNotStarted + return errs.ErrNotStarted } if !s.member.IsLeader() { if allowFollower == nil { - return ErrNotLeader + return errs.ErrNotLeader } if !grpcutil.IsFollowerHandleEnabled(ctx) { // TODO: change the error code - return ErrFollowerHandlingNotAllowed + return errs.ErrFollowerHandlingNotAllowed } *allowFollower = true } if clusterID := keypath.ClusterID(); header.GetClusterId() != clusterID { - return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", clusterID, header.GetClusterId()) + return errs.ErrMismatchClusterID(clusterID, header.GetClusterId()) } return nil } @@ -2471,7 +2451,7 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } @@ -2535,7 +2515,7 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2598,7 +2578,7 @@ const globalConfigPath = "/global/config/" // it should be set to `Payload bytes` instead of `Value string` func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { if s.client == nil { - return nil, ErrEtcdNotStarted + return nil, errs.ErrEtcdNotStarted } if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { fName := currentFunction() @@ -2606,7 +2586,7 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } configPath := request.GetConfigPath() @@ -2644,7 +2624,7 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo // - `ConfigPath` if `Names` is nil can get all values and revision of current path func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) { if s.client == nil { - return nil, ErrEtcdNotStarted + return nil, errs.ErrEtcdNotStarted } if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { fName := currentFunction() @@ -2652,7 +2632,7 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } configPath := request.GetConfigPath() @@ -2692,7 +2672,7 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo // Watch on revision which greater than or equal to the required revision. 
func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { if s.client == nil { - return ErrEtcdNotStarted + return errs.ErrEtcdNotStarted } if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { fName := currentFunction() @@ -2700,7 +2680,7 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return status.Error(codes.ResourceExhausted, err.Error()) + return errs.ErrGRPCRateLimitExceeded(err) } } ctx, cancel := context.WithCancel(server.Context()) @@ -2797,7 +2777,7 @@ func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.Repo if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2835,7 +2815,7 @@ func (s *GrpcServer) SetExternalTimestamp(ctx context.Context, request *pdpb.Set if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2871,7 +2851,7 @@ func (s *GrpcServer) GetExternalTimestamp(ctx context.Context, request *pdpb.Get if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, status.Error(codes.ResourceExhausted, err.Error()) + return nil, errs.ErrGRPCRateLimitExceeded(err) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index dfdb9cb8685..53e5db260af 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -35,6 +35,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/dashboard" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/mock/mockserver" @@ -765,7 +766,7 @@ func TestNotLeader(t *testing.T) { grpcStatus, ok := status.FromError(err) re.True(ok) re.Equal(codes.Unavailable, grpcStatus.Code()) - re.ErrorContains(server.ErrNotLeader, grpcStatus.Message()) + re.ErrorContains(errs.ErrNotLeader, grpcStatus.Message()) } func TestStoreVersionChange(t *testing.T) {
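Not part of the diff above — a minimal sketch, assuming the consolidated sentinels in pkg/errs, of how a caller might match them. The helper names (isNotLeader, isClusterMismatch) are illustrative only; errors that crossed a gRPC connection lose pointer identity, so the fallback compares the status code and message via status.FromError, similar to the check in the updated TestNotLeader.

package example

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/tikv/pd/pkg/errs"
)

// isNotLeader reports whether err corresponds to errs.ErrNotLeader.
func isNotLeader(err error) bool {
	if err == errs.ErrNotLeader {
		// Same-process case: the sentinel keeps its identity.
		return true
	}
	// After crossing a gRPC connection the error is rebuilt,
	// so compare the status code and message instead.
	s, ok := status.FromError(err)
	return ok && s.Code() == codes.Unavailable && s.Message() == "not leader"
}

// isClusterMismatch matches errors built by errs.ErrMismatchClusterID.
// Constructor-style errors embed request-specific values in the message,
// so they are matched by status code rather than by identity.
func isClusterMismatch(err error) bool {
	s, ok := status.FromError(err)
	return ok && s.Code() == codes.FailedPrecondition
}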