Skip to content

Commit

Permalink
*: put gRPC unknown error into the header. (#5310)
Browse files Browse the repository at this point in the history
close #5161, ref #5161

put the unknown error into the header instead of directly returning a gRPC error.

Signed-off-by: husharp <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
HuSharp and ti-chi-bot authored Jul 21, 2022
1 parent 3480a62 commit bde0a1b
Show file tree
Hide file tree
Showing 15 changed files with 234 additions and 107 deletions.
4 changes: 4 additions & 0 deletions client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ func (c *baseClient) getMembers(ctx context.Context, url string, timeout time.Du
attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
}
if members.GetHeader().GetError() != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s", members.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
}
return members, nil
}

Expand Down
69 changes: 32 additions & 37 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -585,10 +586,8 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.getClient().GetMembers(ctx, req)
cancel()
if err != nil {
cmdFailDurationGetAllMembers.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return resp.GetMembers(), nil
}
Expand Down Expand Up @@ -1365,10 +1364,8 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
resp, err := c.getClient().GetRegion(ctx, req)
cancel()

if err != nil {
cmdFailDurationGetRegion.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}
Expand Down Expand Up @@ -1397,7 +1394,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
Header: c.requestHeader(),
RegionKey: key,
})
if err != nil {
if err != nil || resp.GetHeader().GetError() != nil {
log.Error("[pd] can't get region info", zap.String("member-URL", url), errs.ZapError(err))
continue
}
Expand Down Expand Up @@ -1437,10 +1434,8 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
resp, err := c.getClient().GetPrevRegion(ctx, req)
cancel()

if err != nil {
cmdFailDurationGetPrevRegion.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}
Expand All @@ -1467,10 +1462,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
resp, err := c.getClient().GetRegionByID(ctx, req)
cancel()

if err != nil {
cmdFailedDurationGetRegionByID.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}
Expand Down Expand Up @@ -1498,10 +1491,8 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int)
scanCtx = grpcutil.BuildForwardContext(scanCtx, c.GetLeaderAddr())
resp, err := c.getClient().ScanRegions(scanCtx, req)

if err != nil {
cmdFailedDurationScanRegions.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {
return nil, err
}

return handleRegionsResponse(resp), nil
Expand Down Expand Up @@ -1552,10 +1543,8 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
resp, err := c.getClient().GetStore(ctx, req)
cancel()

if err != nil {
cmdFailedDurationGetStore.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleStoreResponse(resp)
}
Expand Down Expand Up @@ -1594,10 +1583,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
resp, err := c.getClient().GetAllStores(ctx, req)
cancel()

if err != nil {
cmdFailedDurationGetAllStores.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return resp.GetStores(), nil
}
Expand All @@ -1619,10 +1606,8 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
resp, err := c.getClient().UpdateGCSafePoint(ctx, req)
cancel()

if err != nil {
cmdFailedDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return 0, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetNewSafePoint(), nil
}
Expand Down Expand Up @@ -1651,10 +1636,8 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
resp, err := c.getClient().UpdateServiceGCSafePoint(ctx, req)
cancel()

if err != nil {
cmdFailedDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return 0, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetMinSafePoint(), nil
}
Expand Down Expand Up @@ -1893,3 +1876,15 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem
}()
return globalConfigWatcherCh, err
}

func (c *client) respForErr(observer prometheus.Observer, start time.Time, err error, header *pdpb.ResponseHeader) error {
if err != nil || header.GetError() != nil {
observer.Observe(time.Since(start).Seconds())
if err != nil {
c.ScheduleCheckLeader()
return errors.WithStack(err)
}
return errors.WithStack(errors.New(header.GetError().String()))
}
return nil
}
10 changes: 6 additions & 4 deletions server/api/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (suite *strictlyLabelsStoreTestSuite) TestStoreMatch() {
}

for _, testCase := range testCases {
_, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
resp, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()},
Store: &metapb.Store{
Id: testCase.store.Id,
Expand All @@ -289,8 +289,9 @@ func (suite *strictlyLabelsStoreTestSuite) TestStoreMatch() {
})
if testCase.valid {
suite.NoError(err)
suite.Nil(resp.GetHeader().GetError())
} else {
suite.Contains(err.Error(), testCase.expectError)
suite.Contains(resp.GetHeader().GetError().String(), testCase.expectError)
}
}

Expand All @@ -301,7 +302,7 @@ func (suite *strictlyLabelsStoreTestSuite) TestStoreMatch() {
[]byte(`{"enable-placement-rules":"true"}`),
tu.StatusOK(suite.Require())))
for _, testCase := range testCases {
_, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
resp, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()},
Store: &metapb.Store{
Id: testCase.store.Id,
Expand All @@ -313,8 +314,9 @@ func (suite *strictlyLabelsStoreTestSuite) TestStoreMatch() {
})
if testCase.valid {
suite.NoError(err)
suite.Nil(resp.GetHeader().GetError())
} else {
suite.Contains(err.Error(), testCase.expectError)
suite.Contains(resp.GetHeader().GetError().String(), testCase.expectError)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
if err != nil {
return nil, errors.WithStack(err)
}
if members.GetHeader().GetError() != nil {
return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String()))
}
dclocationDistribution, err := svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd()
if err != nil {
return nil, errors.WithStack(err)
Expand Down
Loading

0 comments on commit bde0a1b

Please sign in to comment.