Skip to content

Commit

Permalink
*: put gRPC unknown error into the header. (tikv#5310) (tikv#6724)
Browse files Browse the repository at this point in the history
close tikv#5161, ref tikv#5309, ref tikv#5310, ref pingcap/tiflash#7408

put the unknown error into the header instead of directly returning a gRPC error.

Signed-off-by: husharp <[email protected]>

Co-authored-by: Hu# <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
3 people authored Jun 30, 2023
1 parent 0be684b commit 3dc0cc3
Show file tree
Hide file tree
Showing 15 changed files with 227 additions and 107 deletions.
4 changes: 4 additions & 0 deletions client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ func (c *baseClient) getMembers(ctx context.Context, url string, timeout time.Du
attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
}
if members.GetHeader().GetError() != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s", members.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
}
return members, nil
}

Expand Down
69 changes: 32 additions & 37 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -583,10 +584,8 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.getClient().GetMembers(ctx, req)
cancel()
if err != nil {
cmdFailDurationGetAllMembers.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return resp.GetMembers(), nil
}
Expand Down Expand Up @@ -1368,10 +1367,8 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
resp, err := c.getClient().GetRegion(ctx, req)
cancel()

if err != nil {
cmdFailDurationGetRegion.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}
Expand Down Expand Up @@ -1400,7 +1397,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
Header: c.requestHeader(),
RegionKey: key,
})
if err != nil {
if err != nil || resp.GetHeader().GetError() != nil {
log.Error("[pd] can't get region info", zap.String("member-URL", url), errs.ZapError(err))
continue
}
Expand Down Expand Up @@ -1440,10 +1437,8 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
resp, err := c.getClient().GetPrevRegion(ctx, req)
cancel()

if err != nil {
cmdFailDurationGetPrevRegion.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}
Expand All @@ -1470,10 +1465,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
resp, err := c.getClient().GetRegionByID(ctx, req)
cancel()

if err != nil {
cmdFailedDurationGetRegionByID.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}
Expand Down Expand Up @@ -1501,10 +1494,8 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int)
scanCtx = grpcutil.BuildForwardContext(scanCtx, c.GetLeaderAddr())
resp, err := c.getClient().ScanRegions(scanCtx, req)

if err != nil {
cmdFailedDurationScanRegions.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {
return nil, err
}

return handleRegionsResponse(resp), nil
Expand Down Expand Up @@ -1555,10 +1546,8 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
resp, err := c.getClient().GetStore(ctx, req)
cancel()

if err != nil {
cmdFailedDurationGetStore.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleStoreResponse(resp)
}
Expand Down Expand Up @@ -1597,10 +1586,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
resp, err := c.getClient().GetAllStores(ctx, req)
cancel()

if err != nil {
cmdFailedDurationGetAllStores.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return resp.GetStores(), nil
}
Expand All @@ -1622,10 +1609,8 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
resp, err := c.getClient().UpdateGCSafePoint(ctx, req)
cancel()

if err != nil {
cmdFailedDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return 0, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetNewSafePoint(), nil
}
Expand Down Expand Up @@ -1654,10 +1639,8 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
resp, err := c.getClient().UpdateServiceGCSafePoint(ctx, req)
cancel()

if err != nil {
cmdFailedDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return 0, errors.WithStack(err)
if err = c.respForErr(cmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetMinSafePoint(), nil
}
Expand Down Expand Up @@ -1896,3 +1879,15 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem
}()
return globalConfigWatcherCh, err
}

func (c *client) respForErr(observer prometheus.Observer, start time.Time, err error, header *pdpb.ResponseHeader) error {
if err != nil || header.GetError() != nil {
observer.Observe(time.Since(start).Seconds())
if err != nil {
c.ScheduleCheckLeader()
return errors.WithStack(err)
}
return errors.WithStack(errors.New(header.GetError().String()))
}
return nil
}
9 changes: 4 additions & 5 deletions server/api/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package api
import (
"context"
"fmt"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -268,7 +267,7 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) {
}

for _, t := range cases {
_, err := s.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
resp, err := s.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()},
Store: &metapb.Store{
Id: t.store.Id,
Expand All @@ -281,14 +280,14 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) {
if t.valid {
c.Assert(err, IsNil)
} else {
c.Assert(strings.Contains(err.Error(), t.expectError), IsTrue)
c.Assert(resp.GetHeader().GetError(), NotNil)
}
}

// enable placement rules. Report no error any more.
c.Assert(tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/config", s.urlPrefix), []byte(`{"enable-placement-rules":"true"}`), tu.StatusOK(c)), IsNil)
for _, t := range cases {
_, err := s.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
resp, err := s.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()},
Store: &metapb.Store{
Id: t.store.Id,
Expand All @@ -301,7 +300,7 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) {
if t.valid {
c.Assert(err, IsNil)
} else {
c.Assert(strings.Contains(err.Error(), t.expectError), IsTrue)
c.Assert(resp.GetHeader().GetError(), NotNil)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
if err != nil {
return nil, errors.WithStack(err)
}
if members.GetHeader().GetError() != nil {
return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String()))
}
dclocationDistribution, err := svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd()
if err != nil {
return nil, errors.WithStack(err)
Expand Down
Loading

0 comments on commit 3dc0cc3

Please sign in to comment.