From f2da0cdd836782f8ed3bd9b25fac19e94e5b047c Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 9 Aug 2023 11:32:19 +0800 Subject: [PATCH] handle mismatch peer id Signed-off-by: you06 --- internal/locate/region_request.go | 14 +++++++ internal/locate/region_request3_test.go | 49 +++++++++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index eca28b872..47fac7b07 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1882,6 +1882,8 @@ func regionErrorToLabel(e *errorpb.Error) string { return "peer_is_witness" } else if isDeadlineExceeded(e) { return "deadline_exceeded" + } else if e.GetMismatchPeerId() != nil { + return "mismatch_peer_id" } return "unknown" } @@ -2173,6 +2175,18 @@ func (s *RegionRequestSender) onRegionError( s.replicaSelector.onDeadlineExceeded() } + if mismatch := regionErr.GetMismatchPeerId(); mismatch != nil { + logutil.Logger(bo.GetCtx()).Warn( + "tikv reports `MismatchPeerId`, invalidate region cache", + zap.Uint64("req peer id", mismatch.GetRequestPeerId()), + zap.Uint64("store peer id", mismatch.GetStorePeerId()), + ) + if s.replicaSelector != nil { + s.replicaSelector.invalidateRegion() + } + return false, nil + } + logutil.Logger(bo.GetCtx()).Debug( "tikv reports region failed", zap.Stringer("regionErr", regionErr), diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index bae33b7cf..28bc43e87 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1376,3 +1376,52 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() { } } } + +func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderRegionError() { + regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + s.NotNil(regionLoc) + + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + select { + case <-ctx.Done(): + return nil, errors.New("timeout") + default: + } + // Return `mismatch peer id` when accesses the leader. + if addr == s.cluster.GetStore(s.storeIDs[0]).Address { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + MismatchPeerId: &errorpb.MismatchPeerId{ + RequestPeerId: 1, + StorePeerId: 2, + }, + }}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + DataIsNotReady: &errorpb.DataIsNotReady{}, + }}}, nil + }} + + region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID()) + s.True(region.isValid()) + + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleRead() + req.ReplicaReadType = kv.ReplicaReadFollower + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + bo := retry.NewBackoffer(ctx, -1) + s.Nil(err) + resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV) + s.Nil(err) + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.Equal(regionErrorToLabel(regionErr), "mismatch_peer_id") + // return non-epoch-not-match region error and the upper layer can auto retry. + s.Nil(regionErr.GetEpochNotMatch()) + // after region error returned, the region should be invalidated. + s.False(region.isValid()) +}