From 8e50b49a77f166be4a028ebc3ac6b5488678c4e6 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Fri, 29 Dec 2023 17:42:28 +0800 Subject: [PATCH] *: follower support to handle `GetRegion` and other region api (#7432) ref tikv/pd#7431 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Signed-off-by: pingandb --- client/client.go | 87 ++++++++++---- client/pd_service_discovery.go | 4 +- pkg/utils/grpcutil/grpcutil.go | 15 ++- server/forward.go | 6 +- server/grpc_service.go | 144 ++++++++++++++++++----- tests/integrations/client/client_test.go | 93 +++++++++++++++ 6 files changed, 295 insertions(+), 54 deletions(-) diff --git a/client/client.go b/client/client.go index 4e03e5e3507d..df153608c05e 100644 --- a/client/client.go +++ b/client/client.go @@ -298,7 +298,7 @@ func (k *serviceModeKeeper) close() { type client struct { keyspaceID uint32 svrUrls []string - pdSvcDiscovery ServiceDiscovery + pdSvcDiscovery *pdServiceDiscovery tokenDispatcher *tokenDispatcher // For service mode switching. @@ -503,7 +503,7 @@ func newClientWithKeyspaceName( return err } // c.keyspaceID is the source of truth for keyspace id. - c.pdSvcDiscovery.(*pdServiceDiscovery).SetKeyspaceID(c.keyspaceID) + c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID) return nil } @@ -733,6 +733,23 @@ func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, contex return pdpb.NewPDClient(serviceClient.GetClientConn()), serviceClient.BuildGRPCTargetContext(ctx, true) } +// getRegionAPIClientAndContext returns the service client used for region APIs together with its gRPC target context. If allowFollower is true, it +// prefers a client of regionAPIKind; otherwise, or when none is available, it falls back to GetServiceClient and returns a nil client if that is nil too. 
+func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower bool) (ServiceClient, context.Context) { + var serviceClient ServiceClient + if allowFollower { + serviceClient = c.pdSvcDiscovery.getServiceClientByKind(regionAPIKind) + if serviceClient != nil { + return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower) + } + } + serviceClient = c.pdSvcDiscovery.GetServiceClient() + if serviceClient == nil { + return nil, ctx + } + return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower) +} + func (c *client) GetTSAsync(ctx context.Context) TSFuture { return c.GetLocalTSAsync(ctx, globalDCLocation) } @@ -885,6 +902,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt start := time.Now() defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }() ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + defer cancel() options := &GetRegionOp{} for _, opt := range opts { @@ -895,13 +913,18 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt RegionKey: key, NeedBuckets: options.needBuckets, } - protoClient, ctx := c.getClientAndContext(ctx) - if protoClient == nil { - cancel() + serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) + if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } - resp, err := protoClient.GetRegion(ctx, req) - cancel() + resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req) + if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) { + protoClient, cctx := c.getClientAndContext(ctx) + if protoClient == nil { + return nil, errs.ErrClientGetProtoClient + } + resp, err = protoClient.GetRegion(cctx, req) + } if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil { return nil, err @@ -917,6 +940,7 @@ func (c *client) GetPrevRegion(ctx context.Context, 
key []byte, opts ...GetRegio start := time.Now() defer func() { cmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }() ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + defer cancel() options := &GetRegionOp{} for _, opt := range opts { @@ -927,13 +951,18 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio RegionKey: key, NeedBuckets: options.needBuckets, } - protoClient, ctx := c.getClientAndContext(ctx) - if protoClient == nil { - cancel() + serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) + if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } - resp, err := protoClient.GetPrevRegion(ctx, req) - cancel() + resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req) + if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) { + protoClient, cctx := c.getClientAndContext(ctx) + if protoClient == nil { + return nil, errs.ErrClientGetProtoClient + } + resp, err = protoClient.GetPrevRegion(cctx, req) + } if err = c.respForErr(cmdFailDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil { return nil, err @@ -949,6 +978,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get start := time.Now() defer func() { cmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }() ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + defer cancel() options := &GetRegionOp{} for _, opt := range opts { @@ -959,13 +989,18 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get RegionId: regionID, NeedBuckets: options.needBuckets, } - protoClient, ctx := c.getClientAndContext(ctx) - if protoClient == nil { - cancel() + serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) + if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } - resp, err := 
protoClient.GetRegionByID(ctx, req) - cancel() + resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req) + if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) { + protoClient, cctx := c.getClientAndContext(ctx) + if protoClient == nil { + return nil, errs.ErrClientGetProtoClient + } + resp, err = protoClient.GetRegionByID(cctx, req) + } if err = c.respForErr(cmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil { return nil, err @@ -987,18 +1022,28 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, scanCtx, cancel = context.WithTimeout(ctx, c.option.timeout) defer cancel() } + options := &GetRegionOp{} + for _, opt := range opts { + opt(options) + } req := &pdpb.ScanRegionsRequest{ Header: c.requestHeader(), StartKey: key, EndKey: endKey, Limit: int32(limit), } - protoClient, scanCtx := c.getClientAndContext(scanCtx) - if protoClient == nil { - cancel() + serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) + if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } - resp, err := protoClient.ScanRegions(scanCtx, req) + resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req) + if !serviceClient.IsConnectedToLeader() && err != nil || resp.Header.GetError() != nil { + protoClient, cctx := c.getClientAndContext(scanCtx) + if protoClient == nil { + return nil, errs.ErrClientGetProtoClient + } + resp, err = protoClient.ScanRegions(cctx, req) + } if err = c.respForErr(cmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil { return nil, err diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index f9bdf888b9f3..efef2940adff 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -53,6 +53,7 @@ type apiKind int const ( forwardAPIKind apiKind = iota + regionAPIKind apiKindCount ) @@ -445,7 +446,7 @@ 
func newPDServiceDiscovery( ctx: ctx, cancel: cancel, wg: wg, - apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn)}, + apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)}, serviceModeUpdateCb: serviceModeUpdateCb, updateKeyspaceIDCb: updateKeyspaceIDCb, keyspaceID: keyspaceID, @@ -563,6 +564,7 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() { } } } + func (c *pdServiceDiscovery) memberHealthCheckLoop() { defer c.wg.Done() diff --git a/pkg/utils/grpcutil/grpcutil.go b/pkg/utils/grpcutil/grpcutil.go index 0030551d0fc9..1bfb64868f34 100644 --- a/pkg/utils/grpcutil/grpcutil.go +++ b/pkg/utils/grpcutil/grpcutil.go @@ -38,6 +38,8 @@ import ( const ( // ForwardMetadataKey is used to record the forwarded host of PD. ForwardMetadataKey = "pd-forwarded-host" + // FollowerHandleMetadataKey is used to mark the permit of follower handle. + FollowerHandleMetadataKey = "pd-allow-follower-handle" ) // TLSConfig is the configuration for supporting tls. @@ -173,7 +175,7 @@ func ResetForwardContext(ctx context.Context) context.Context { func GetForwardedHost(ctx context.Context) string { md, ok := metadata.FromIncomingContext(ctx) if !ok { - log.Debug("failed to get forwarding metadata") + log.Debug("failed to get gRPC incoming metadata when getting forwarded host") return "" } if t, ok := md[ForwardMetadataKey]; ok { @@ -182,6 +184,17 @@ func GetForwardedHost(ctx context.Context) string { return "" } +// IsFollowerHandleEnabled reports whether the incoming gRPC metadata carries FollowerHandleMetadataKey. 
+func IsFollowerHandleEnabled(ctx context.Context) bool { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + log.Debug("failed to get gRPC incoming metadata when checking follower handle is enabled") + return false + } + _, ok = md[FollowerHandleMetadataKey] + return ok +} + func establish(ctx context.Context, addr string, tlsConfig *TLSConfig, do ...grpc.DialOption) (*grpc.ClientConn, error) { tlsCfg, err := tlsConfig.ToTLSConfig() if err != nil { diff --git a/server/forward.go b/server/forward.go index e765d442539e..65750fcd4be9 100644 --- a/server/forward.go +++ b/server/forward.go @@ -384,16 +384,16 @@ func (s *GrpcServer) getForwardedHost(ctx, streamCtx context.Context, serviceNam return forwardedHost, nil } -func (s *GrpcServer) isLocalRequest(forwardedHost string) bool { +func (s *GrpcServer) isLocalRequest(host string) bool { failpoint.Inject("useForwardRequest", func() { failpoint.Return(false) }) - if forwardedHost == "" { + if host == "" { return true } memberAddrs := s.GetMember().Member().GetClientUrls() for _, addr := range memberAddrs { - if addr == forwardedHost { + if addr == host { return true } } diff --git a/server/grpc_service.go b/server/grpc_service.go index 14eea9442c70..4174570379f8 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -77,6 +77,7 @@ var ( ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded") ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server") ErrEtcdNotStarted = status.Errorf(codes.Unavailable, "server is started, but etcd not started") + ErrFollowerHandlingNotAllowed = status.Errorf(codes.Unavailable, "not leader and follower handling not allowed") ) var ( @@ -234,6 +235,11 @@ type request interface { type forwardFn func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) func (s *GrpcServer) unaryMiddleware(ctx 
context.Context, req request, fn forwardFn) (rsp interface{}, err error) { + return s.unaryFollowerMiddleware(ctx, req, fn, nil) +} + +// unaryFollowerMiddleware behaves like unaryMiddleware but additionally reports, through allowFollower, whether a follower may handle the request. +func (s *GrpcServer) unaryFollowerMiddleware(ctx context.Context, req request, fn forwardFn, allowFollower *bool) (rsp interface{}, err error) { failpoint.Inject("customTimeout", func() { time.Sleep(5 * time.Second) }) @@ -246,7 +252,7 @@ func (s *GrpcServer) unaryMiddleware(ctx context.Context, req request, fn forwar ctx = grpcutil.ResetForwardContext(ctx) return fn(ctx, client) } - if err := s.validateRequest(req.GetHeader()); err != nil { + if err := s.validateRoleInRequest(ctx, req.GetHeader(), allowFollower); err != nil { return nil, err } return nil, nil @@ -1390,22 +1396,39 @@ func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionReque fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).GetRegion(ctx, request) } - if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil { + followerHandle := new(bool) + if rsp, err := s.unaryFollowerMiddleware(ctx, request, fn, followerHandle); err != nil { return nil, err } else if rsp != nil { return rsp.(*pdpb.GetRegionResponse), nil } - - rc := s.GetRaftCluster() - if rc == nil { - return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil - } - region := rc.GetRegionByKey(request.GetRegionKey()) - if region == nil { - return &pdpb.GetRegionResponse{Header: s.header()}, nil + var rc *cluster.RaftCluster + var region *core.RegionInfo + if *followerHandle { + rc = s.cluster + if !rc.GetRegionSyncer().IsRunning() { + return &pdpb.GetRegionResponse{Header: s.regionNotFound()}, nil + } + region = rc.GetRegionByKey(request.GetRegionKey()) + if region == nil { + log.Warn("follower get region nil", zap.String("key", string(request.GetRegionKey()))) + return &pdpb.GetRegionResponse{Header: s.regionNotFound()}, 
nil + } + } else { + rc = s.GetRaftCluster() + if rc == nil { + return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil + } + region = rc.GetRegionByKey(request.GetRegionKey()) + if region == nil { + log.Warn("leader get region nil", zap.String("key", string(request.GetRegionKey()))) + return &pdpb.GetRegionResponse{Header: s.header()}, nil + } } + var buckets *metapb.Buckets - if rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() { + // FIXME: If the bucket is disabled dynamically, the bucket information is returned unexpectedly + if !*followerHandle && rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() { buckets = region.GetBuckets() } return &pdpb.GetRegionResponse{ @@ -1434,23 +1457,37 @@ func (s *GrpcServer) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionR fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).GetPrevRegion(ctx, request) } - if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil { + followerHandle := new(bool) + if rsp, err := s.unaryFollowerMiddleware(ctx, request, fn, followerHandle); err != nil { return nil, err } else if rsp != nil { return rsp.(*pdpb.GetRegionResponse), err } - rc := s.GetRaftCluster() - if rc == nil { - return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil + var rc *cluster.RaftCluster + if *followerHandle { + // no need to check running status + rc = s.cluster + if !rc.GetRegionSyncer().IsRunning() { + return &pdpb.GetRegionResponse{Header: s.regionNotFound()}, nil + } + } else { + rc = s.GetRaftCluster() + if rc == nil { + return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil + } } region := rc.GetPrevRegionByKey(request.GetRegionKey()) if region == nil { + if *followerHandle { + return &pdpb.GetRegionResponse{Header: s.regionNotFound()}, nil + } return &pdpb.GetRegionResponse{Header: s.header()}, nil } var buckets *metapb.Buckets - if 
rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() { + // FIXME: If the bucket is disabled dynamically, the bucket information is returned unexpectedly + if !*followerHandle && rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() { buckets = region.GetBuckets() } return &pdpb.GetRegionResponse{ @@ -1479,22 +1516,39 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).GetRegionByID(ctx, request) } - if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil { + followerHandle := new(bool) + if rsp, err := s.unaryFollowerMiddleware(ctx, request, fn, followerHandle); err != nil { return nil, err } else if rsp != nil { return rsp.(*pdpb.GetRegionResponse), err } - rc := s.GetRaftCluster() - if rc == nil { - return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil + var rc *cluster.RaftCluster + if *followerHandle { + rc = s.cluster + if !rc.GetRegionSyncer().IsRunning() { + return &pdpb.GetRegionResponse{Header: s.regionNotFound()}, nil + } + } else { + rc = s.GetRaftCluster() + if rc == nil { + return &pdpb.GetRegionResponse{Header: s.regionNotFound()}, nil + } } region := rc.GetRegion(request.GetRegionId()) + failpoint.Inject("followerHandleError", func() { + if *followerHandle { + region = nil + } + }) if region == nil { + if *followerHandle { + return &pdpb.GetRegionResponse{Header: s.regionNotFound()}, nil + } return &pdpb.GetRegionResponse{Header: s.header()}, nil } var buckets *metapb.Buckets - if rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() { + if !*followerHandle && rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() { buckets = region.GetBuckets() } return &pdpb.GetRegionResponse{ @@ -1523,17 +1577,29 @@ func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsR fn := func(ctx context.Context, 
client *grpc.ClientConn) (interface{}, error) { return pdpb.NewPDClient(client).ScanRegions(ctx, request) } - if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil { + followerHandle := new(bool) + if rsp, err := s.unaryFollowerMiddleware(ctx, request, fn, followerHandle); err != nil { return nil, err } else if rsp != nil { return rsp.(*pdpb.ScanRegionsResponse), nil } - rc := s.GetRaftCluster() - if rc == nil { - return &pdpb.ScanRegionsResponse{Header: s.notBootstrappedHeader()}, nil + var rc *cluster.RaftCluster + if *followerHandle { + rc = s.cluster + if !rc.GetRegionSyncer().IsRunning() { + return &pdpb.ScanRegionsResponse{Header: s.regionNotFound()}, nil + } + } else { + rc = s.GetRaftCluster() + if rc == nil { + return &pdpb.ScanRegionsResponse{Header: s.notBootstrappedHeader()}, nil + } } regions := rc.ScanRegions(request.GetStartKey(), request.GetEndKey(), int(request.GetLimit())) + if *followerHandle && len(regions) == 0 { + return &pdpb.ScanRegionsResponse{Header: s.regionNotFound()}, nil + } resp := &pdpb.ScanRegionsResponse{Header: s.header()} for _, r := range regions { leader := r.GetLeader() @@ -2183,10 +2249,25 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR } // validateRequest checks if Server is leader and clusterID is matched. -// TODO: Call it in gRPC interceptor. func (s *GrpcServer) validateRequest(header *pdpb.RequestHeader) error { - if s.IsClosed() || !s.member.IsLeader() { - return ErrNotLeader + return s.validateRoleInRequest(context.TODO(), header, nil) +} + +// validateRoleInRequest checks that the server is the leader, unless follower handling is allowed, and that the clusterID matches. +// TODO: Call it in gRPC interceptor. 
+func (s *GrpcServer) validateRoleInRequest(ctx context.Context, header *pdpb.RequestHeader, allowFollower *bool) error { + if s.IsClosed() { + return ErrNotStarted + } + if !s.member.IsLeader() { + if allowFollower == nil { + return ErrNotLeader + } + if !grpcutil.IsFollowerHandleEnabled(ctx) { + // TODO: change the error code + return ErrFollowerHandlingNotAllowed + } + *allowFollower = true } if header.GetClusterId() != s.clusterID { return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId()) @@ -2237,6 +2318,13 @@ func (s *GrpcServer) invalidValue(msg string) *pdpb.ResponseHeader { }) } +func (s *GrpcServer) regionNotFound() *pdpb.ResponseHeader { + return s.errorHeader(&pdpb.Error{ + Type: pdpb.ErrorType_REGION_NOT_FOUND, + Message: "region not found", + }) +} + func (s *GrpcServer) convertHeader(header *schedulingpb.ResponseHeader) *pdpb.ResponseHeader { switch header.GetError().GetType() { case schedulingpb.ErrorType_UNKNOWN: diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index e1e841342ad8..ed394f6fad4b 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -716,6 +716,99 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoAndRegionByFollowerFor }) } +func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() { + re := suite.Require() + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + + cluster := suite.cluster + cli := setupCli(re, ctx, suite.endpoints) + cli.UpdateOption(pd.EnableFollowerHandle, true) + re.NotEmpty(cluster.WaitLeader()) + leader := cluster.GetLeaderServer() + testutil.Eventually(re, func() bool { + ret := true + for _, s := range cluster.GetServers() { + if s.IsLeader() { + continue + } + if !s.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunning() { + ret = false + } + } + return ret + }) + // follower have no region + cnt 
:= 0 + for i := 0; i < 100; i++ { + resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle()) + if err == nil && resp != nil { + cnt++ + } + re.Equal(resp.Meta.Id, suite.regionID) + } + re.Equal(100, cnt) + + // because we can't check whether this request is processed by followers from response, + // we can disable forward and make network problem for leader. + re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", fmt.Sprintf("return(\"%s\")", leader.GetAddr()))) + time.Sleep(150 * time.Millisecond) + cnt = 0 + for i := 0; i < 100; i++ { + resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle()) + if err == nil && resp != nil { + cnt++ + } + re.Equal(resp.Meta.Id, suite.regionID) + } + re.Equal(100, cnt) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1")) + + // make network problem for follower. + follower := cluster.GetServer(cluster.GetFollower()) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", fmt.Sprintf("return(\"%s\")", follower.GetAddr()))) + time.Sleep(100 * time.Millisecond) + cnt = 0 + for i := 0; i < 100; i++ { + resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle()) + if err == nil && resp != nil { + cnt++ + } + re.Equal(resp.Meta.Id, suite.regionID) + } + re.Equal(100, cnt) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1")) + + // follower client failed will retry by leader service client. 
+ re.NoError(failpoint.Enable("github.com/tikv/pd/server/followerHandleError", "return(true)")) + cnt = 0 + for i := 0; i < 100; i++ { + resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle()) + if err == nil && resp != nil { + cnt++ + } + re.Equal(resp.Meta.Id, suite.regionID) + } + re.Equal(100, cnt) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/followerHandleError")) + + // test after being healthy + re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", fmt.Sprintf("return(\"%s\")", leader.GetAddr()))) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/fastCheckAvailable", "return(true)")) + time.Sleep(100 * time.Millisecond) + cnt = 0 + for i := 0; i < 100; i++ { + resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle()) + if err == nil && resp != nil { + cnt++ + } + re.Equal(resp.Meta.Id, suite.regionID) + } + re.Equal(100, cnt) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1")) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastCheckAvailable")) +} + func checkTS(re *require.Assertions, cli pd.Client, lastTS uint64) uint64 { for i := 0; i < tsoRequestRound; i++ { physical, logical, err := cli.GetTS(context.TODO())