diff --git a/go.mod b/go.mod index b2bd664da13f3..c4b668a81dcf1 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e + github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 github.com/pingcap/parser v0.0.0-20210513020953-ae2c4497c07b github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 diff --git a/go.sum b/go.sum index c42d129ae5b8b..212917e414404 100644 --- a/go.sum +++ b/go.sum @@ -436,8 +436,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e h1:oUMZ6X/Kpaoxfejh9/jQ+4UZ5xk9MRYcouWJ0oXRKNE= -github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c h1:cy87vgUJT0U4JuxC7R14PuwBrabI9fDawYhyKTbjOBQ= +github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 4bec370a9a4d5..bf8d1b7c893e0 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -406,6 +406,20 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *backoffer, response *copro return errors.Trace(err) } + if len(response.RetryRegions) > 0 { + logutil.BgLogger().Info("multiple regions are stale and need to be refreshed", zap.Int("region size", len(response.RetryRegions))) + for idx, retry := range response.RetryRegions { + id := tikv.NewRegionVerID(retry.Id, retry.RegionEpoch.ConfVer, retry.RegionEpoch.Version) + logutil.BgLogger().Info("invalid region because tiflash detected stale region", zap.String("region id", id.String())) + b.store.GetRegionCache().InvalidateCachedRegionWithReason(id, tikv.EpochNotMatch) + if idx >= 10 { + logutil.BgLogger().Info("stale regions are too many, so we omit the rest ones") + break + } + } + return + } + resp := batchCopResponse{ pbResp: response, detail: new(CopRuntimeStats), diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go index f571ff4fe963f..adf3049330897 100644 --- a/store/mockstore/unistore/tikv/server.go +++ b/store/mockstore/unistore/tikv/server.go @@ -845,6 +845,11 @@ func (svr *Server) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpc return &kvrpcpb.ReadIndexResponse{}, nil } +// GetLockWaitInfo implements implements the tikvpb.TikvServer interface. +func (svr *Server) GetLockWaitInfo(ctx context.Context, _ *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) { + return &kvrpcpb.GetLockWaitInfoResponse{}, nil +} + // transaction debugger commands. // MvccGetByKey implements implements the tikvpb.TikvServer interface. diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index 9c5172e52f372..8d531ee209a78 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -462,6 +462,10 @@ func (s *mockTikvGrpcServer) SplitRegion(context.Context, *kvrpcpb.SplitRegionRe return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) GetLockWaitInfo(context.Context, *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) { + return nil, errors.New("unreachable") +} + func (s *mockTikvGrpcServer) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error { return errors.New("unreachable") }