diff --git a/client/base_client.go b/client/base_client.go
index 16639bc2218..26c4505c608 100644
--- a/client/base_client.go
+++ b/client/base_client.go
@@ -333,6 +333,10 @@ func (c *baseClient) getMembers(ctx context.Context, url string, timeout time.Du
 		attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String())
 		return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
 	}
+	if members.GetHeader().GetError() != nil {
+		attachErr := errors.Errorf("error:%s target:%s status:%s", members.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
+		return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
+	}
 	return members, nil
 }
 
diff --git a/client/client.go b/client/client.go
index 8d9ed44c77d..970b0df2938 100644
--- a/client/client.go
+++ b/client/client.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/tikv/pd/client/errs"
 	"github.com/tikv/pd/client/grpcutil"
 	"go.uber.org/zap"
@@ -585,10 +586,8 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
 	ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
 	resp, err := c.getClient().GetMembers(ctx, req)
 	cancel()
-	if err != nil {
-		cmdFailDurationGetAllMembers.Observe(time.Since(start).Seconds())
-		c.ScheduleCheckLeader()
-		return nil, errors.WithStack(err)
+	if err = c.respForErr(cmdFailDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
+		return nil, err
 	}
 	return resp.GetMembers(), nil
 }
@@ -1365,10 +1364,8 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
 	resp, err := c.getClient().GetRegion(ctx, req)
 	cancel()
 
-	if err != nil {
-		cmdFailDurationGetRegion.Observe(time.Since(start).Seconds())
-		c.ScheduleCheckLeader()
-		return nil, errors.WithStack(err)
+	if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
+		return nil, err
 	}
 	return handleRegionResponse(resp), nil
 }
@@ -1397,7 +1394,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
 			Header:    c.requestHeader(),
 			RegionKey: key,
 		})
-		if err != nil {
+		if err != nil || resp.GetHeader().GetError() != nil {
 			log.Error("[pd] can't get region info", zap.String("member-URL", url), errs.ZapError(err))
 			continue
 		}
@@ -1437,10 +1434,8 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
 	resp, err := c.getClient().GetPrevRegion(ctx, req)
 	cancel()
 
-	if err != nil {
-		cmdFailDurationGetPrevRegion.Observe(time.Since(start).Seconds())
-		c.ScheduleCheckLeader()
-		return nil, errors.WithStack(err)
+	if err = c.respForErr(cmdFailDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
+		return nil, err
 	}
 	return handleRegionResponse(resp), nil
 }
@@ -1467,10 +1462,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
 	resp, err := c.getClient().GetRegionByID(ctx, req)
 	cancel()
 
-	if err != nil {
-		cmdFailedDurationGetRegionByID.Observe(time.Since(start).Seconds())
-		c.ScheduleCheckLeader()
-		return nil, errors.WithStack(err)
+	if err = c.respForErr(cmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
+		return nil, err
 	}
 	return handleRegionResponse(resp), nil
 }
@@ -1498,10 +1491,8 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int)
 
 	scanCtx = grpcutil.BuildForwardContext(scanCtx, c.GetLeaderAddr())
 	resp, err := c.getClient().ScanRegions(scanCtx, req)
-	if err != nil {
-		cmdFailedDurationScanRegions.Observe(time.Since(start).Seconds())
-		c.ScheduleCheckLeader()
-		return nil, errors.WithStack(err)
+	if err = c.respForErr(cmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {
+		return nil, err
 	}
 
 	return handleRegionsResponse(resp), nil
@@ -1552,10 +1543,8 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
 	resp, err := c.getClient().GetStore(ctx, req)
 	cancel()
 
-	if err != nil {
-		cmdFailedDurationGetStore.Observe(time.Since(start).Seconds())
-		c.ScheduleCheckLeader()
-		return nil, errors.WithStack(err)
+	if err = c.respForErr(cmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
+		return nil, err
 	}
 	return handleStoreResponse(resp)
 }
@@ -1594,10 +1583,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
 	resp, err := c.getClient().GetAllStores(ctx, req)
 	cancel()
 
-	if err != nil {
-		cmdFailedDurationGetAllStores.Observe(time.Since(start).Seconds())
-		c.ScheduleCheckLeader()
-		return nil, errors.WithStack(err)
+	if err = c.respForErr(cmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
+		return nil, err
 	}
 	return resp.GetStores(), nil
 }
@@ -1619,10 +1606,8 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
 	resp, err := c.getClient().UpdateGCSafePoint(ctx, req)
 	cancel()
 
-	if err != nil {
-		cmdFailedDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds())
-		c.ScheduleCheckLeader()
-		return 0, errors.WithStack(err)
+	if err = c.respForErr(cmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
+		return 0, err
 	}
 	return resp.GetNewSafePoint(), nil
 }
@@ -1651,10 +1636,8 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
 	resp, err := c.getClient().UpdateServiceGCSafePoint(ctx, req)
 	cancel()
 
-	if err != nil {
-		cmdFailedDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds())
-		c.ScheduleCheckLeader()
-		return 0, errors.WithStack(err)
+	if err = c.respForErr(cmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
+		return 0, err
 	}
 	return resp.GetMinSafePoint(), nil
 }
@@ -1893,3 +1876,15 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem
 	}()
 	return globalConfigWatcherCh, err
 }
+
+func (c *client) respForErr(observer prometheus.Observer, start time.Time, err error, header *pdpb.ResponseHeader) error {
+	if err != nil || header.GetError() != nil {
+		observer.Observe(time.Since(start).Seconds())
+		if err != nil {
+			c.ScheduleCheckLeader()
+			return errors.WithStack(err)
+		}
+		return errors.WithStack(errors.New(header.GetError().String()))
+	}
+	return nil
+}
diff --git a/server/api/label_test.go b/server/api/label_test.go
index c130403364e..4cc89fa98f5 100644
--- a/server/api/label_test.go
+++ b/server/api/label_test.go
@@ -277,7 +277,7 @@ func (suite *strictlyLabelsStoreTestSuite) TestStoreMatch() {
 	}
 
 	for _, testCase := range testCases {
-		_, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
+		resp, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
 			Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()},
 			Store: &metapb.Store{
 				Id:      testCase.store.Id,
@@ -289,8 +289,9 @@
 		})
 		if testCase.valid {
 			suite.NoError(err)
+			suite.Nil(resp.GetHeader().GetError())
 		} else {
-			suite.Contains(err.Error(), testCase.expectError)
+			suite.Contains(resp.GetHeader().GetError().String(), testCase.expectError)
 		}
 	}
 
@@ -301,7 +302,7 @@ func (suite *strictlyLabelsStoreTestSuite) TestStoreMatch() {
 		[]byte(`{"enable-placement-rules":"true"}`), tu.StatusOK(suite.Require())))
 
 	for _, testCase := range testCases {
-		_, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
+		resp, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
 			Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()},
 			Store: &metapb.Store{
 				Id:      testCase.store.Id,
@@ -313,8 +314,9 @@
 		})
 		if testCase.valid {
 			suite.NoError(err)
+			suite.Nil(resp.GetHeader().GetError())
 		} else {
-			suite.Contains(err.Error(), testCase.expectError)
+			suite.Contains(resp.GetHeader().GetError().String(), testCase.expectError)
 		}
 	}
 }
diff --git a/server/api/member.go b/server/api/member.go
index eaf743c0493..e459983f3b9 100644
--- a/server/api/member.go
+++ b/server/api/member.go
@@ -67,6 +67,9 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
+	if members.GetHeader().GetError() != nil {
+		return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String()))
+	}
 	dclocationDistribution, err := svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd()
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
diff --git a/server/grpc_service.go b/server/grpc_service.go
old mode 100644
new mode 100755
index c02e51ed510..29aa5e63c25
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -92,13 +92,22 @@ func (s *GrpcServer) unaryMiddleware(ctx context.Context, header *pdpb.RequestHe
 	return nil, nil
 }
 
+func (s *GrpcServer) wrapErrorToHeader(errorType pdpb.ErrorType, message string) *pdpb.ResponseHeader {
+	return s.errorHeader(&pdpb.Error{
+		Type:    errorType,
+		Message: message,
+	})
+}
+
 // GetMembers implements gRPC PDServer.
 func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
 	// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
 	// at startup and needs to get the cluster ID with the first request (i.e. GetMembers).
 	members, err := s.Server.GetMembers()
 	if err != nil {
-		return nil, status.Errorf(codes.Unknown, err.Error())
+		return &pdpb.GetMembersResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
+		}, nil
 	}
 
 	var etcdLeader, pdLeader *pdpb.Member
@@ -113,7 +122,9 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb
 	tsoAllocatorManager := s.GetTSOAllocatorManager()
 	tsoAllocatorLeaders, err := tsoAllocatorManager.GetLocalAllocatorLeaders()
 	if err != nil {
-		return nil, status.Errorf(codes.Unknown, err.Error())
+		return &pdpb.GetMembersResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
+		}, nil
 	}
 
 	leader := s.member.GetLeader()
@@ -399,7 +410,9 @@ func (s *GrpcServer) Bootstrap(ctx context.Context, request *pdpb.BootstrapReque
 
 	res, err := s.bootstrapCluster(request)
 	if err != nil {
-		return nil, status.Errorf(codes.Unknown, err.Error())
+		return &pdpb.BootstrapResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
+		}, nil
 	}
 
 	res.Header = s.header()
@@ -438,7 +451,9 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest)
 	// We can use an allocator for all types ID allocation.
 	id, err := s.idAllocator.Alloc()
 	if err != nil {
-		return nil, status.Errorf(codes.Unknown, err.Error())
+		return &pdpb.AllocIDResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
+		}, nil
 	}
 
 	return &pdpb.AllocIDResponse{
@@ -466,7 +481,10 @@ func (s *GrpcServer) GetStore(ctx context.Context, request *pdpb.GetStoreRequest
 	storeID := request.GetStoreId()
 	store := rc.GetStore(storeID)
 	if store == nil {
-		return nil, status.Errorf(codes.Unknown, "invalid store ID %d, not found", storeID)
+		return &pdpb.GetStoreResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
+				fmt.Sprintf("invalid store ID %d, not found", storeID)),
+		}, nil
 	}
 	return &pdpb.GetStoreResponse{
 		Header: s.header(),
@@ -515,11 +533,16 @@ func (s *GrpcServer) PutStore(ctx context.Context, request *pdpb.PutStoreRequest
 
 	// NOTE: can be removed when placement rules feature is enabled by default.
 	if !s.GetConfig().Replication.EnablePlacementRules && core.IsStoreContainLabel(store, core.EngineKey, core.EngineTiFlash) {
-		return nil, status.Errorf(codes.FailedPrecondition, "placement rules is disabled")
+		return &pdpb.PutStoreResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
+				"placement rules is disabled"),
+		}, nil
 	}
 
 	if err := rc.PutStore(store); err != nil {
-		return nil, status.Errorf(codes.Unknown, err.Error())
+		return &pdpb.PutStoreResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
+		}, nil
 	}
 
 	log.Info("put store ok", zap.Stringer("store", store))
@@ -592,7 +615,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
 	storeID := request.GetStats().GetStoreId()
 	store := rc.GetStore(storeID)
 	if store == nil {
-		return nil, errors.Errorf("store %v not found", storeID)
+		return &pdpb.StoreHeartbeatResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
+				fmt.Sprintf("store %v not found", storeID)),
+		}, nil
 	}
 
 	// Bypass stats handling if the store report for unsafe recover is not empty.
@@ -603,7 +629,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
 
 	err := rc.HandleStoreHeartbeat(request.GetStats())
 	if err != nil {
-		return nil, status.Errorf(codes.Unknown, err.Error())
+		return &pdpb.StoreHeartbeatResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
+				err.Error()),
+		}, nil
 	}
 
 	s.handleDamagedStore(request.GetStats())
@@ -1063,7 +1092,10 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest
 		return &pdpb.AskSplitResponse{Header: s.notBootstrappedHeader()}, nil
 	}
 	if request.GetRegion() == nil {
-		return nil, errors.New("missing region for split")
+		return &pdpb.AskSplitResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_REGION_NOT_FOUND,
+				"missing region for split"),
+		}, nil
 	}
 	req := &pdpb.AskSplitRequest{
 		Region: request.Region,
@@ -1071,10 +1103,7 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest
 	split, err := rc.HandleAskSplit(req)
 	if err != nil {
 		return &pdpb.AskSplitResponse{
-			Header: s.errorHeader(&pdpb.Error{
-				Type:    pdpb.ErrorType_UNKNOWN,
-				Message: err.Error(),
-			}),
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
 		}, nil
 	}
 
@@ -1105,7 +1134,10 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp
 		return &pdpb.AskBatchSplitResponse{Header: s.incompatibleVersion("batch_split")}, nil
 	}
 	if request.GetRegion() == nil {
-		return nil, errors.New("missing region for split")
+		return &pdpb.AskBatchSplitResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_REGION_NOT_FOUND,
+				"missing region for split"),
+		}, nil
 	}
 	req := &pdpb.AskBatchSplitRequest{
 		Region: request.Region,
@@ -1114,10 +1146,7 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp
 	split, err := rc.HandleAskBatchSplit(req)
 	if err != nil {
 		return &pdpb.AskBatchSplitResponse{
-			Header: s.errorHeader(&pdpb.Error{
-				Type:    pdpb.ErrorType_UNKNOWN,
-				Message: err.Error(),
-			}),
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
 		}, nil
 	}
 
@@ -1144,7 +1173,9 @@ func (s *GrpcServer) ReportSplit(ctx context.Context, request *pdpb.ReportSplitR
 	}
 	_, err := rc.HandleReportSplit(request)
 	if err != nil {
-		return nil, status.Errorf(codes.Unknown, err.Error())
+		return &pdpb.ReportSplitResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
+		}, nil
 	}
 
 	return &pdpb.ReportSplitResponse{
@@ -1170,7 +1201,10 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB
 
 	_, err := rc.HandleBatchReportSplit(request)
 	if err != nil {
-		return nil, status.Errorf(codes.Unknown, err.Error())
+		return &pdpb.ReportBatchSplitResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
+				err.Error()),
+		}, nil
 	}
 
 	return &pdpb.ReportBatchSplitResponse{
@@ -1216,7 +1250,10 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus
 	}
 	conf := request.GetCluster()
 	if err := rc.PutMetaCluster(conf); err != nil {
-		return nil, status.Errorf(codes.Unknown, err.Error())
+		return &pdpb.PutClusterConfigResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
+				err.Error()),
+		}, nil
 	}
 
 	log.Info("put cluster config ok", zap.Reflect("config", conf))
@@ -1258,7 +1295,10 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg
 	if region == nil {
 		if request.GetRegion() == nil {
 			//nolint
-			return nil, errors.Errorf("region %d not found", request.GetRegionId())
+			return &pdpb.ScatterRegionResponse{
+				Header: s.wrapErrorToHeader(pdpb.ErrorType_REGION_NOT_FOUND,
+					fmt.Sprintf("region %d not found", request.GetRegionId())),
+			}, nil
 		}
 		region = core.NewRegionInfo(request.GetRegion(), request.GetLeader())
 	}
@@ -1487,12 +1527,17 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest
 	tsoAllocatorManager := s.GetTSOAllocatorManager()
 	// There is no dc-location found in this server, return err.
 	if tsoAllocatorManager.GetClusterDCLocationsNumber() == 0 {
-		return nil, status.Errorf(codes.Unknown, "empty cluster dc-location found, checker may not work properly")
+		return &pdpb.SyncMaxTSResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
+				"empty cluster dc-location found, checker may not work properly"),
+		}, nil
 	}
 	// Get all Local TSO Allocator leaders
 	allocatorLeaders, err := tsoAllocatorManager.GetHoldingLocalAllocatorLeaders()
 	if err != nil {
-		return nil, status.Errorf(codes.Unknown, err.Error())
+		return &pdpb.SyncMaxTSResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
+		}, nil
 	}
 	if !request.GetSkipCheck() {
 		var maxLocalTS *pdpb.Timestamp
@@ -1505,7 +1550,9 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest
 			}
 			currentLocalTSO, err := allocator.GetCurrentTSO()
 			if err != nil {
-				return nil, status.Errorf(codes.Unknown, err.Error())
+				return &pdpb.SyncMaxTSResponse{
+					Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
+				}, nil
 			}
 			if tsoutil.CompareTimestamp(currentLocalTSO, maxLocalTS) > 0 {
 				maxLocalTS = currentLocalTSO
@@ -1522,10 +1569,16 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest
 		})
 
 		if maxLocalTS == nil {
-			return nil, status.Errorf(codes.Unknown, "local tso allocator leaders have changed during the sync, should retry")
+			return &pdpb.SyncMaxTSResponse{
+				Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
+					"local tso allocator leaders have changed during the sync, should retry"),
+			}, nil
 		}
 		if request.GetMaxTs() == nil {
-			return nil, status.Errorf(codes.Unknown, "empty maxTS in the request, should retry")
+			return &pdpb.SyncMaxTSResponse{
+				Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
+					"empty maxTS in the request, should retry"),
+			}, nil
 		}
 		// Found a bigger or equal maxLocalTS, return it directly.
 		cmpResult := tsoutil.CompareTimestamp(maxLocalTS, request.GetMaxTs())
@@ -1551,7 +1604,9 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest
 			continue
 		}
 		if err := allocator.WriteTSO(request.GetMaxTs()); err != nil {
-			return nil, status.Errorf(codes.Unknown, err.Error())
+			return &pdpb.SyncMaxTSResponse{
+				Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
+			}, nil
 		}
 		syncedDCs = append(syncedDCs, allocator.GetDCLocation())
 	}
@@ -1649,7 +1704,10 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL
 	info, ok := am.GetDCLocationInfo(request.GetDcLocation())
 	if !ok {
 		am.ClusterDCLocationChecker()
-		return nil, status.Errorf(codes.Unknown, "dc-location %s is not found", request.GetDcLocation())
+		return &pdpb.GetDCLocationInfoResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
+				fmt.Sprintf("dc-location %s is not found", request.GetDcLocation())),
+		}, nil
 	}
 	resp := &pdpb.GetDCLocationInfoResponse{
 		Header: s.header(),
@@ -1664,7 +1722,9 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL
 	// when it becomes the Local TSO Allocator leader.
 	// Please take a look at https://github.com/tikv/pd/issues/3260 for more details.
 	if resp.MaxTs, err = am.GetMaxLocalTSO(ctx); err != nil {
-		return nil, status.Errorf(codes.Unknown, err.Error())
+		return &pdpb.GetDCLocationInfoResponse{
+			Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
+		}, nil
 	}
 	return resp, nil
 }
diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go
index f0689ee4713..4ca6d435f12 100644
--- a/server/tso/allocator_manager.go
+++ b/server/tso/allocator_manager.go
@@ -17,6 +17,7 @@ package tso
 import (
 	"context"
 	"fmt"
+	"github.com/pingcap/errors"
 	"math"
 	"path"
 	"strconv"
@@ -1074,6 +1075,9 @@ func (am *AllocatorManager) getDCLocationInfoFromLeader(ctx context.Context, dcL
 	if err != nil {
 		return false, &pdpb.GetDCLocationInfoResponse{}, err
 	}
+	if resp.GetHeader().GetError() != nil {
+		return false, &pdpb.GetDCLocationInfoResponse{}, errors.Errorf("get the dc-location info from leader failed: %s", resp.GetHeader().GetError().String())
+	}
 	return resp.GetSuffix() != 0, resp, nil
 }
 
diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go
old mode 100644
new mode 100755
index 02d7b7438c8..c5aaaaac819
--- a/server/tso/global_allocator.go
+++ b/server/tso/global_allocator.go
@@ -17,6 +17,7 @@ package tso
 import (
 	"context"
 	"fmt"
+	"github.com/pingcap/errors"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -347,6 +348,11 @@ func (gta *GlobalTSOAllocator) SyncMaxTS(
 				log.Error("sync max ts rpc failed, got an error", zap.String("local-allocator-leader-url", leaderConn.Target()), errs.ZapError(err))
 				return
 			}
+			if syncMaxTSResp.rpcRes.GetHeader().GetError() != nil {
+				log.Error("sync max ts rpc failed, got an error", zap.String("local-allocator-leader-url", leaderConn.Target()),
+					errs.ZapError(errors.Errorf("%s", syncMaxTSResp.rpcRes.GetHeader().GetError().String())))
+				return
+			}
 		}(ctx, leaderConn, respCh)
 	}
 	wg.Wait()
diff --git a/tests/client/client_test.go b/tests/client/client_test.go
index 0dca158f75e..84c76508b2a 100644
--- a/tests/client/client_test.go
+++ b/tests/client/client_test.go
@@ -782,8 +782,9 @@ func (suite *clientTestSuite) bootstrapServer(header *pdpb.RequestHeader, client
 		Store:  stores[0],
 		Region: region,
 	}
-	_, err := client.Bootstrap(context.Background(), req)
+	resp, err := client.Bootstrap(context.Background(), req)
 	suite.NoError(err)
+	suite.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 }
 
 func (suite *clientTestSuite) TestNormalTSO() {
diff --git a/tests/cluster.go b/tests/cluster.go
index 6c79d680c7f..83edbd72daa 100644
--- a/tests/cluster.go
+++ b/tests/cluster.go
@@ -260,9 +260,9 @@ func (s *TestServer) GetEtcdLeader() (string, error) {
 	s.RLock()
 	defer s.RUnlock()
 	req := &pdpb.GetMembersRequest{Header: &pdpb.RequestHeader{ClusterId: s.server.ClusterID()}}
-	members, err := s.grpcServer.GetMembers(context.TODO(), req)
-	if err != nil {
-		return "", errors.WithStack(err)
+	members, _ := s.grpcServer.GetMembers(context.TODO(), req)
+	if members.Header.GetError() != nil {
+		return "", errors.WithStack(errors.New(members.Header.GetError().String()))
 	}
 	return members.GetEtcdLeader().GetName(), nil
 }
@@ -276,6 +276,9 @@ func (s *TestServer) GetEtcdLeaderID() (uint64, error) {
 	if err != nil {
 		return 0, errors.WithStack(err)
 	}
+	if members.GetHeader().GetError() != nil {
+		return 0, errors.WithStack(errors.New(members.GetHeader().GetError().String()))
+	}
 	return members.GetEtcdLeader().GetMemberId(), nil
 }
 
@@ -365,10 +368,13 @@ func (s *TestServer) BootstrapCluster() error {
 		Store:  &metapb.Store{Id: 1, Address: "mock://1", LastHeartbeat: time.Now().UnixNano()},
 		Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}},
 	}
-	_, err := s.grpcServer.Bootstrap(context.Background(), bootstrapReq)
+	resp, err := s.grpcServer.Bootstrap(context.Background(), bootstrapReq)
 	if err != nil {
 		return err
 	}
+	if resp.GetHeader().GetError() != nil {
+		return errors.New(resp.GetHeader().GetError().String())
+	}
 	return nil
 }
 
diff --git a/tests/compatibility/version_upgrade_test.go b/tests/compatibility/version_upgrade_test.go
index e600a848b3a..11573e6da2f 100644
--- a/tests/compatibility/version_upgrade_test.go
+++ b/tests/compatibility/version_upgrade_test.go
@@ -19,6 +19,7 @@ import (
 	"testing"
 
 	"github.com/coreos/go-semver/semver"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/stretchr/testify/require"
@@ -76,8 +77,9 @@ func TestStoreRegister(t *testing.T) {
 			Version: "1.0.1",
 		},
 	}
-	_, err = svr.PutStore(context.Background(), putStoreRequest)
-	re.Error(err)
+	putStoreResponse, err := svr.PutStore(context.Background(), putStoreRequest)
+	re.NoError(err)
+	re.Error(errors.New(putStoreResponse.GetHeader().GetError().String()))
 }
 
 func TestRollingUpgrade(t *testing.T) {
diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go
index 8d50b5eaa30..e65711a5b2d 100644
--- a/tests/server/api/api_test.go
+++ b/tests/server/api/api_test.go
@@ -682,8 +682,9 @@ func TestRemovingProgress(t *testing.T) {
 		Store:  &metapb.Store{Id: 1, Address: "127.0.0.1:0"},
 		Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}},
 	}
-	_, err = grpcPDClient.Bootstrap(context.Background(), req)
+	resp, err := grpcPDClient.Bootstrap(context.Background(), req)
 	re.NoError(err)
+	re.Nil(resp.GetHeader().GetError())
 	stores := []*metapb.Store{
 		{
 			Id:            1,
@@ -799,8 +800,9 @@ func TestPreparingProgress(t *testing.T) {
 		Store:  &metapb.Store{Id: 1, Address: "127.0.0.1:0"},
 		Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}},
 	}
-	_, err = grpcPDClient.Bootstrap(context.Background(), req)
+	resp, err := grpcPDClient.Bootstrap(context.Background(), req)
 	re.NoError(err)
+	re.Nil(resp.GetHeader().GetError())
 	stores := []*metapb.Store{
 		{
 			Id:            1,
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index 73a62abee2c..a8d03bde13d 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -160,8 +160,9 @@ func TestDamagedRegion(t *testing.T) {
 	// To put stores.
 	svr := &server.GrpcServer{Server: leaderServer.GetServer()}
 	for _, store := range stores {
-		_, err = svr.PutStore(context.Background(), store)
+		resp, err := svr.PutStore(context.Background(), store)
 		re.NoError(err)
+		re.Nil(resp.GetHeader().GetError())
 	}
 
 	// To validate remove peer op be added.
@@ -234,56 +235,65 @@ func TestGetPutConfig(t *testing.T) {
 
 func testPutStore(re *require.Assertions, clusterID uint64, rc *cluster.RaftCluster, grpcPDClient pdpb.PDClient, store *metapb.Store) {
 	// Update store.
-	_, err := putStore(grpcPDClient, clusterID, store)
+	resp, err := putStore(grpcPDClient, clusterID, store)
 	re.NoError(err)
+	re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
+
 	updatedStore := getStore(re, clusterID, grpcPDClient, store.GetId())
 	re.Equal(store, updatedStore)
 
 	// Update store again.
-	_, err = putStore(grpcPDClient, clusterID, store)
+	resp, err = putStore(grpcPDClient, clusterID, store)
 	re.NoError(err)
+	re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 	rc.GetAllocator().Alloc()
 	id, err := rc.GetAllocator().Alloc()
 	re.NoError(err)
 
 	// Put new store with a duplicated address when old store is up will fail.
-	_, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id)))
-	re.Error(err)
+	resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id)))
+	re.NoError(err)
+	re.Equal(pdpb.ErrorType_UNKNOWN, resp.GetHeader().GetError().GetType())
 
 	id, err = rc.GetAllocator().Alloc()
 	re.NoError(err)
 
 	// Put new store with a duplicated address when old store is offline will fail.
 	resetStoreState(re, rc, store.GetId(), metapb.StoreState_Offline)
-	_, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id)))
-	re.Error(err)
+	resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id)))
+	re.NoError(err)
+	re.Equal(pdpb.ErrorType_UNKNOWN, resp.GetHeader().GetError().GetType())
 
 	id, err = rc.GetAllocator().Alloc()
 	re.NoError(err)
 
 	// Put new store with a duplicated address when old store is tombstone is OK.
 	resetStoreState(re, rc, store.GetId(), metapb.StoreState_Tombstone)
 	rc.GetStore(store.GetId())
-	_, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id)))
+	resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, store.GetAddress(), "2.1.0", metapb.StoreState_Up, getTestDeployPath(id)))
 	re.NoError(err)
+	re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 
 	id, err = rc.GetAllocator().Alloc()
 	re.NoError(err)
 	deployPath := getTestDeployPath(id)
 	// Put a new store.
-	_, err = putStore(grpcPDClient, clusterID, newMetaStore(id, testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, deployPath))
+	resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, deployPath))
 	re.NoError(err)
+	re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 	s := rc.GetStore(id).GetMeta()
 	re.Equal(deployPath, s.DeployPath)
 
 	deployPath = fmt.Sprintf("move/test/store%d", id)
-	_, err = putStore(grpcPDClient, clusterID, newMetaStore(id, testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, deployPath))
+	resp, err = putStore(grpcPDClient, clusterID, newMetaStore(id, testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, deployPath))
 	re.NoError(err)
+	re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 	s = rc.GetStore(id).GetMeta()
 	re.Equal(deployPath, s.DeployPath)
 
 	// Put an existed store with duplicated address with other old stores.
 	resetStoreState(re, rc, store.GetId(), metapb.StoreState_Up)
-	_, err = putStore(grpcPDClient, clusterID, newMetaStore(store.GetId(), testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, getTestDeployPath(store.GetId())))
-	re.Error(err)
+	resp, err = putStore(grpcPDClient, clusterID, newMetaStore(store.GetId(), testMetaStoreAddr, "2.1.0", metapb.StoreState_Up, getTestDeployPath(store.GetId())))
+	re.NoError(err)
+	re.Equal(pdpb.ErrorType_UNKNOWN, resp.GetHeader().GetError().GetType())
 }
 
 func getTestDeployPath(storeID uint64) string {
@@ -486,6 +496,7 @@ func TestGetPDMembers(t *testing.T) {
 	req := &pdpb.GetMembersRequest{Header: testutil.NewRequestHeader(clusterID)}
 	resp, err := grpcPDClient.GetMembers(context.Background(), req)
 	re.NoError(err)
+	re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 	// A more strict test can be found at api/member_test.go
 	re.NotEmpty(resp.GetMembers())
 }
@@ -537,8 +548,9 @@ func TestStoreVersionChange(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		_, err = putStore(grpcPDClient, clusterID, store)
+		resp, err := putStore(grpcPDClient, clusterID, store)
 		re.NoError(err)
+		re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 	}()
 	time.Sleep(100 * time.Millisecond)
 	svr.SetClusterVersion("1.0.0")
@@ -575,8 +587,9 @@ func TestConcurrentHandleRegion(t *testing.T) {
 		re.NoError(err)
 		store := newMetaStore(storeID, addr, "2.1.0", metapb.StoreState_Up, getTestDeployPath(storeID))
 		stores = append(stores, store)
-		_, err = putStore(grpcPDClient, clusterID, store)
+		resp, err := putStore(grpcPDClient, clusterID, store)
 		re.NoError(err)
+		re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 	}
 
 	var wg sync.WaitGroup
@@ -591,8 +604,9 @@ func TestConcurrentHandleRegion(t *testing.T) {
 			},
 		}
 		grpcServer := &server.GrpcServer{Server: leaderServer.GetServer()}
-		_, err := grpcServer.StoreHeartbeat(context.TODO(), req)
+		resp, err := grpcServer.StoreHeartbeat(context.TODO(), req)
 		re.NoError(err)
+		re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 		stream, err := grpcPDClient.RegionHeartbeat(ctx)
 		re.NoError(err)
 		peerID, err := id.Alloc()
@@ -849,15 +863,18 @@ func TestTiFlashWithPlacementRules(t *testing.T) {
 	}
 
 	// cannot put TiFlash node without placement rules
-	_, err = putStore(grpcPDClient, clusterID, tiflashStore)
-	re.Error(err)
+	resp, err := putStore(grpcPDClient, clusterID, tiflashStore)
+	re.NoError(err)
+	re.Equal(pdpb.ErrorType_UNKNOWN, resp.GetHeader().GetError().GetType())
+
 	rep := leaderServer.GetConfig().Replication
 	rep.EnablePlacementRules = true
 	svr := leaderServer.GetServer()
 	err = svr.SetReplicationConfig(rep)
 	re.NoError(err)
-	_, err = putStore(grpcPDClient, clusterID, tiflashStore)
+	resp, err = putStore(grpcPDClient, clusterID, tiflashStore)
 	re.NoError(err)
+	re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 	// test TiFlash store limit
 	expect := map[uint64]config.StoreLimitConfig{11: {AddPeer: 30, RemovePeer: 30}}
 	re.Equal(expect, svr.GetScheduleConfig().StoreLimit)
@@ -923,8 +940,9 @@ func newBootstrapRequest(clusterID uint64) *pdpb.BootstrapRequest {
 // helper function to check and bootstrap.
 func bootstrapCluster(re *require.Assertions, clusterID uint64, grpcPDClient pdpb.PDClient) {
 	req := newBootstrapRequest(clusterID)
-	_, err := grpcPDClient.Bootstrap(context.Background(), req)
+	resp, err := grpcPDClient.Bootstrap(context.Background(), req)
 	re.NoError(err)
+	re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 }
 
 func putStore(grpcPDClient pdpb.PDClient, clusterID uint64, store *metapb.Store) (*pdpb.PutStoreResponse, error) {
@@ -940,6 +958,7 @@ func getStore(re *require.Assertions, clusterID uint64, grpcPDClient pdpb.PDClie
 		StoreId: storeID,
 	})
 	re.NoError(err)
+	re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 	re.Equal(storeID, resp.GetStore().GetId())
 	return resp.GetStore()
 }
@@ -997,8 +1016,9 @@ func TestOfflineStoreLimit(t *testing.T) {
 		storeID, err := id.Alloc()
 		re.NoError(err)
 		store := newMetaStore(storeID, addr, "4.0.0", metapb.StoreState_Up, getTestDeployPath(storeID))
-		_, err = putStore(grpcPDClient, clusterID, store)
+		resp, err := putStore(grpcPDClient, clusterID, store)
 		re.NoError(err)
+		re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 	}
 	for i := uint64(1); i <= 2; i++ {
 		r := &metapb.Region{
@@ -1084,8 +1104,9 @@ func TestUpgradeStoreLimit(t *testing.T) {
 	re.NotNil(rc)
 	rc.SetStorage(storage.NewStorageWithMemoryBackend())
 	store := newMetaStore(1, "127.0.1.1:0", "4.0.0", metapb.StoreState_Up, "test/store1")
-	_, err = putStore(grpcPDClient, clusterID, store)
+	resp, err := putStore(grpcPDClient, clusterID, store)
 	re.NoError(err)
+	re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 	r := &metapb.Region{
 		Id: 1,
 		RegionEpoch: &metapb.RegionEpoch{
@@ -1151,8 +1172,9 @@ func TestStaleTermHeartbeat(t *testing.T) {
 		peerID, err := id.Alloc()
 		re.NoError(err)
 		store := newMetaStore(storeID, addr, "3.0.0", metapb.StoreState_Up, getTestDeployPath(storeID))
-		_, err = putStore(grpcPDClient, clusterID, store)
+		resp, err := putStore(grpcPDClient, clusterID, store)
 		re.NoError(err)
+		re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 		peers = append(peers, &metapb.Peer{
 			Id:      peerID,
 			StoreId: storeID,
@@ -1273,8 +1295,9 @@ func TestMinResolvedTS(t *testing.T) {
 		if isTiflash {
 			store.Labels = []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}
 		}
-		_, err = putStore(grpcPDClient, clusterID, store)
+		resp, err := putStore(grpcPDClient, clusterID, store)
 		re.NoError(err)
+		re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 		req := &pdpb.ReportMinResolvedTsRequest{
 			Header:  testutil.NewRequestHeader(clusterID),
 			StoreId: storeID,
diff --git a/tests/server/id/id_test.go b/tests/server/id/id_test.go
index 3375cc55adb..35637d43f1a 100644
--- a/tests/server/id/id_test.go
+++ b/tests/server/id/id_test.go
@@ -98,6 +98,7 @@ func TestCommand(t *testing.T) {
 	for i := uint64(0); i < 2*allocStep; i++ {
 		resp, err := grpcPDClient.AllocID(context.Background(), req)
 		re.NoError(err)
+		re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
 		re.Greater(resp.GetId(), last)
 		last = resp.GetId()
 	}
diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go
index 18d6746e983..72a58fdd3da 100644
--- a/tools/pd-heartbeat-bench/main.go
+++ b/tools/pd-heartbeat-bench/main.go
@@ -61,6 +61,9 @@ func initClusterID(cli pdpb.PDClient) {
 	if err != nil {
 		log.Fatal(err)
 	}
+	if res.GetHeader().GetError() != nil {
+		log.Fatal(res.GetHeader().GetError())
+	}
 	clusterID = res.GetHeader().GetClusterId()
 	log.Println("ClusterID:", clusterID)
 }
@@ -95,10 +98,13 @@ func bootstrap(cli pdpb.PDClient) {
 		Store:  store,
 		Region: region,
 	}
-	_, err = cli.Bootstrap(context.TODO(), req)
+	resp, err := cli.Bootstrap(context.TODO(), req)
 	if err != nil {
 		log.Fatal(err)
 	}
+	if resp.GetHeader().GetError() != nil {
+		log.Fatalf("bootstrap failed: %s", resp.GetHeader().GetError().String())
+	}
 	log.Println("bootstrapped")
 }
 
@@ -108,10 +114,13 @@ func putStores(cli pdpb.PDClient) {
 			Id:      i,
 			Address: fmt.Sprintf("localhost:%d", i),
 		}
-		_, err := cli.PutStore(context.TODO(), &pdpb.PutStoreRequest{Header: header(), Store: store})
+		resp, err := cli.PutStore(context.TODO(), &pdpb.PutStoreRequest{Header: header(), Store: store})
 		if err != nil {
 			log.Fatal(err)
 		}
+		if resp.GetHeader().GetError() != nil {
+			log.Fatalf("put store failed: %s", resp.GetHeader().GetError().String())
+		}
 	}
 }
diff --git a/tools/pd-simulator/simulator/client.go b/tools/pd-simulator/simulator/client.go
index 13a4b7c0bf1..1306af648e0 100644
--- a/tools/pd-simulator/simulator/client.go
+++ b/tools/pd-simulator/simulator/client.go
@@ -118,6 +118,9 @@ func (c *client) getMembers(ctx context.Context) (*pdpb.GetMembersResponse, erro
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
+	if members.GetHeader().GetError() != nil {
+		return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String()))
+	}
 	return members, nil
 }
 
@@ -245,6 +248,9 @@ func (c *client) AllocID(ctx context.Context) (uint64, error) {
 	if err != nil {
 		return 0, err
 	}
+	if resp.GetHeader().GetError() != nil {
+		return 0, errors.Errorf("alloc id failed: %s", resp.GetHeader().GetError().String())
+	}
 	return resp.GetId(), nil
 }
 
@@ -263,7 +269,7 @@ func (c *client) Bootstrap(ctx context.Context, store *metapb.Store, region *met
 	if err != nil {
 		return err
 	}
-	_, err = c.pdClient().Bootstrap(ctx, &pdpb.BootstrapRequest{
+	res, err := c.pdClient().Bootstrap(ctx, &pdpb.BootstrapRequest{
 		Header: c.requestHeader(),
 		Store:  proto.Clone(store).(*metapb.Store),
 		Region: proto.Clone(region).(*metapb.Region),
@@ -271,6 +277,9 @@ func (c *client) Bootstrap(ctx context.Context, store *metapb.Store, region *met
 	if err != nil {
 		return err
 	}
+	if res.GetHeader().GetError() != nil {
+		return errors.Errorf("bootstrap failed: %s", res.GetHeader().GetError().String())
+	}
 	return nil
 }