Skip to content

Commit

Permalink
client: support configuring backoff (#8679)
Browse files Browse the repository at this point in the history
close #8047

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] authored Dec 2, 2024
1 parent a86f3dd commit 29dfaf9
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 4 deletions.
9 changes: 9 additions & 0 deletions client/opt/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/pkg/retry"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -61,6 +62,7 @@ type Option struct {
UseTSOServerProxy bool
MetricsLabels prometheus.Labels
InitMetrics bool
Backoffer *retry.Backoffer

// Dynamic options.
dynamicOptions [dynamicOptionCount]atomic.Value
Expand Down Expand Up @@ -199,6 +201,13 @@ func WithInitMetricsOption(initMetrics bool) ClientOption {
}
}

// WithBackoffer configures the client with backoffer.
func WithBackoffer(bo *retry.Backoffer) ClientOption {
return func(op *Option) {
op.Backoffer = bo
}
}

// GetStoreOp represents available options when getting stores.
type GetStoreOp struct {
ExcludeTombstone bool
Expand Down
22 changes: 22 additions & 0 deletions client/pkg/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,25 @@ func getFunctionName(f any) string {
strs := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".")
return strings.Split(strs[len(strs)-1], "-")[0]
}

// Define context key type
type boCtxKey struct{}

// Key used to store backoffer
var backofferKey = boCtxKey{}

// FromContext retrieves the backoffer from the context
func FromContext(ctx context.Context) *Backoffer {
if ctx == nil {
return nil
}
if bo, ok := ctx.Value(backofferKey).(*Backoffer); ok {
return bo
}
return nil
}

// WithBackoffer stores the backoffer into a new context
func WithBackoffer(ctx context.Context, bo *Backoffer) context.Context {
return context.WithValue(ctx, backofferKey, bo)
}
38 changes: 35 additions & 3 deletions client/pkg/utils/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/pkg/retry"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
Expand All @@ -41,6 +42,33 @@ const (
FollowerHandleMetadataKey = "pd-allow-follower-handle"
)

// UnaryBackofferInterceptor is a gRPC interceptor that adds a backoffer to the call.
func UnaryBackofferInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
bo := retry.FromContext(ctx)
if bo == nil {
return invoker(ctx, method, req, reply, cc, opts...)
}

// Copy a new backoffer
newBo := *bo
var lastErr error
err := newBo.Exec(ctx, func() error {
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
lastErr = err
return err
}
return nil
})

if err != nil {
return lastErr
}
return nil
}
}

// GetClientConn returns a gRPC client connection.
// creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
Expand All @@ -64,8 +92,11 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g
if err != nil {
return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause()
}
// Here we use a shorter MaxDelay to make the connection recover faster.
// The default MaxDelay is 120s, which is too long for us.

// Add backoffer interceptor
retryOpt := grpc.WithUnaryInterceptor(UnaryBackofferInterceptor())

// Add retry related connection parameters
backoffOpts := grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: time.Second,
Expand All @@ -74,7 +105,8 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g
MaxDelay: 3 * time.Second,
},
})
do = append(do, opt, backoffOpts)

do = append(do, opt, retryOpt, backoffOpts)
cc, err := grpc.DialContext(ctx, u.Host, do...)
if err != nil {
return nil, errs.ErrGRPCDial.Wrap(err).GenWithStackByCause()
Expand Down
3 changes: 2 additions & 1 deletion client/servicediscovery/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,8 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
case <-ticker.C:
case <-c.checkMembershipCh:
}
if err := bo.Exec(ctx, c.updateMember); err != nil {
err := bo.Exec(ctx, c.updateMember)
if err != nil {
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
}
}
Expand Down
3 changes: 3 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,9 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error

// GetRegion implements gRPC PDServer.
func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error) {
failpoint.Inject("rateLimit", func() {
failpoint.Return(nil, status.Error(codes.ResourceExhausted, errs.ErrRateLimitExceeded.Error()))
})
if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() {
fName := currentFunction()
limiter := s.GetGRPCRateLimiter()
Expand Down
53 changes: 53 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ import (
"github.com/tikv/pd/tests/integrations/mcs"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/goleak"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -1993,3 +1995,54 @@ func (suite *clientTestSuite) TestBatchScanRegions() {
return err != nil && strings.Contains(err.Error(), "found a hole region between")
})
}

func TestGetRegionWithBackoff(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/rateLimit", "return(true)"))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
defer cluster.Destroy()
endpoints := runServer(re, cluster)

// Define the backoff parameters
base := 100 * time.Millisecond
max := 500 * time.Millisecond
total := 3 * time.Second

// Create a backoff strategy
bo := retry.InitialBackoffer(base, max, total)
bo.SetRetryableChecker(needRetry, true)

// Initialize the client with context and backoff
client, err := pd.NewClientWithContext(ctx, caller.TestComponent, endpoints, pd.SecurityOption{})
re.NoError(err)

// Record the start time
start := time.Now()

ctx = retry.WithBackoffer(ctx, bo)
// Call GetRegion and expect it to handle backoff internally
_, err = client.GetRegion(ctx, []byte("key"))
re.Error(err)
// Calculate the elapsed time
elapsed := time.Since(start)
// Verify that some backoff occurred by checking if the elapsed time is greater than the base backoff
re.Greater(elapsed, total, "Expected some backoff to have occurred")

re.NoError(failpoint.Disable("github.com/tikv/pd/server/rateLimit"))
// Call GetRegion again and expect it to succeed
region, err := client.GetRegion(ctx, []byte("key"))
re.NoError(err)
re.Equal(uint64(2), region.Meta.Id) // Adjust this based on expected region
}

func needRetry(err error) bool {
st, ok := status.FromError(err)
if !ok {
return false
}
return st.Code() == codes.ResourceExhausted
}

0 comments on commit 29dfaf9

Please sign in to comment.