Skip to content

Commit

Permalink
address the comment
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Nov 29, 2024
1 parent fc51ff6 commit 642c989
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 69 deletions.
62 changes: 20 additions & 42 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ import (
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
"github.com/tikv/pd/client/pkg/retry"
"github.com/tikv/pd/client/pkg/utils/grpcutil"
"github.com/tikv/pd/client/pkg/utils/tlsutil"
sd "github.com/tikv/pd/client/servicediscovery"
"go.uber.org/zap"
Expand Down Expand Up @@ -541,8 +539,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.GetMembers(ctx, req, grpcutil.WithBackoffer(bo))
resp, err := protoClient.GetMembers(ctx, req)
if err = c.respForErr(metrics.CmdFailedDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
return nil, err
}
Expand Down Expand Up @@ -614,10 +611,9 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
return 0, 0, errs.ErrClientGetProtoClient
}

bo := retry.FromContext(ctx)
resp, err := protoClient.GetMinTS(ctx, &pdpb.GetMinTSRequest{
Header: c.requestHeader(),
}, grpcutil.WithBackoffer(bo))
})
if err != nil {
if strings.Contains(err.Error(), "Unimplemented") {
// If the method is not supported, we fallback to GetTS.
Expand Down Expand Up @@ -721,8 +717,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
return nil, errs.ErrClientGetProtoClient
}

bo := retry.FromContext(ctx)
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req, grpcutil.WithBackoffer(bo))
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand Down Expand Up @@ -761,8 +756,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req, grpcutil.WithBackoffer(bo))
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand Down Expand Up @@ -802,8 +796,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req, grpcutil.WithBackoffer(bo))
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand Down Expand Up @@ -848,9 +841,8 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
//nolint:staticcheck
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req, grpcutil.WithBackoffer(bo))
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req)
failpoint.Inject("responseNil", func() {
resp = nil
})
Expand Down Expand Up @@ -905,8 +897,7 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req, grpcutil.WithBackoffer(bo))
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req)
failpoint.Inject("responseNil", func() {
resp = nil
})
Expand Down Expand Up @@ -990,8 +981,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.GetStore(ctx, req, grpcutil.WithBackoffer(bo))
resp, err := protoClient.GetStore(ctx, req)

if err = c.respForErr(metrics.CmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand Down Expand Up @@ -1035,8 +1025,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) (
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.GetAllStores(ctx, req, grpcutil.WithBackoffer(bo))
resp, err := protoClient.GetAllStores(ctx, req)

if err = c.respForErr(metrics.CmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand All @@ -1063,8 +1052,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
if protoClient == nil {
return 0, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.UpdateGCSafePoint(ctx, req, grpcutil.WithBackoffer(bo))
resp, err := protoClient.UpdateGCSafePoint(ctx, req)

if err = c.respForErr(metrics.CmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
Expand Down Expand Up @@ -1097,8 +1085,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
if protoClient == nil {
return 0, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.UpdateServiceGCSafePoint(ctx, req, grpcutil.WithBackoffer(bo))
resp, err := protoClient.UpdateServiceGCSafePoint(ctx, req)

if err = c.respForErr(metrics.CmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
Expand Down Expand Up @@ -1130,8 +1117,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.ScatterRegion(ctx, req, grpcutil.WithBackoffer(bo))
resp, err := protoClient.ScatterRegion(ctx, req)
if err != nil {
return err
}
Expand Down Expand Up @@ -1175,8 +1161,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
return protoClient.SplitAndScatterRegions(ctx, req, grpcutil.WithBackoffer(bo))
return protoClient.SplitAndScatterRegions(ctx, req)
}

// GetOperator implements the RPCClient interface.
Expand All @@ -1198,8 +1183,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
return protoClient.GetOperator(ctx, req, grpcutil.WithBackoffer(bo))
return protoClient.GetOperator(ctx, req)
}

// SplitRegions split regions by given split keys
Expand All @@ -1225,8 +1209,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...o
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
return protoClient.SplitRegions(ctx, req, grpcutil.WithBackoffer(bo))
return protoClient.SplitRegions(ctx, req)
}

func (c *client) requestHeader() *pdpb.RequestHeader {
Expand Down Expand Up @@ -1258,8 +1241,7 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.ScatterRegion(ctx, req, grpcutil.WithBackoffer(bo))
resp, err := protoClient.ScatterRegion(ctx, req)

if err != nil {
return nil, err
Expand All @@ -1278,8 +1260,7 @@ func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPat
if protoClient == nil {
return nil, 0, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names, ConfigPath: configPath}, grpcutil.WithBackoffer(bo))
resp, err := protoClient.LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names, ConfigPath: configPath})
if err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -1311,8 +1292,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
_, err := protoClient.StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath}, grpcutil.WithBackoffer(bo))
_, err := protoClient.StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath})
if err != nil {
return err
}
Expand Down Expand Up @@ -1379,10 +1359,9 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
if protoClient == nil {
return 0, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.GetExternalTimestamp(ctx, &pdpb.GetExternalTimestampRequest{
Header: c.requestHeader(),
}, grpcutil.WithBackoffer(bo))
})
if err != nil {
return 0, err
}
Expand All @@ -1401,11 +1380,10 @@ func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) err
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.SetExternalTimestamp(ctx, &pdpb.SetExternalTimestampRequest{
Header: c.requestHeader(),
Timestamp: timestamp,
}, grpcutil.WithBackoffer(bo))
})
if err != nil {
return err
}
Expand Down
7 changes: 2 additions & 5 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/retry"
"github.com/tikv/pd/client/pkg/utils/grpcutil"
)

Expand Down Expand Up @@ -79,8 +78,7 @@ func (c *innerClient) Put(ctx context.Context, key, value []byte, opts ...opt.Me
cancel()
return nil, errs.ErrClientGetMetaStorageClient
}
bo := retry.FromContext(ctx)
resp, err := cli.Put(ctx, req, grpcutil.WithBackoffer(bo))
resp, err := cli.Put(ctx, req)
cancel()

if err = c.respForMetaStorageErr(metrics.CmdFailedDurationPut, start, err, resp.GetHeader()); err != nil {
Expand Down Expand Up @@ -119,8 +117,7 @@ func (c *innerClient) Get(ctx context.Context, key []byte, opts ...opt.MetaStora
cancel()
return nil, errs.ErrClientGetMetaStorageClient
}
bo := retry.FromContext(ctx)
resp, err := cli.Get(ctx, req, grpcutil.WithBackoffer(bo))
resp, err := cli.Get(ctx, req)
cancel()

if err = c.respForMetaStorageErr(metrics.CmdFailedDurationGet, start, err, resp.GetHeader()); err != nil {
Expand Down
24 changes: 2 additions & 22 deletions client/pkg/utils/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,10 @@ const (
FollowerHandleMetadataKey = "pd-allow-follower-handle"
)

// Add retry related CallOption
type retryCallOption struct {
grpc.EmptyCallOption
bo *retry.Backoffer
}

// WithBackoffer returns a CallOption that adds a backoffer to the call.
func WithBackoffer(bo *retry.Backoffer) grpc.CallOption {
return &retryCallOption{bo: bo}
}

func getBackofferFromCallOptions(opts []grpc.CallOption) *retry.Backoffer {
for _, opt := range opts {
if bo, ok := opt.(*retryCallOption); ok {
return bo.bo
}
}
return nil
}

// UnaryBackofferInterceptor is a gRPC interceptor that adds a backoffer to the call.
func UnaryBackofferInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
bo := getBackofferFromCallOptions(opts)
bo := retry.FromContext(ctx)
if bo == nil {
return invoker(ctx, method, req, reply, cc, opts...)
}
Expand Down Expand Up @@ -113,7 +93,7 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g
return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause()
}

// Add retry interceptor
// Add backoffer interceptor
retryOpt := grpc.WithUnaryInterceptor(UnaryBackofferInterceptor())

// Add retry related connection parameters
Expand Down

0 comments on commit 642c989

Please sign in to comment.