diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 84b9b40b6..18de47f70 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -549,6 +549,18 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { return c } +// only used fot test. +func newTestRegionCache() *RegionCache { + c := &RegionCache{} + c.storeMu.stores = make(map[uint64]*Store) + c.tiflashComputeStoreMu.needReload = true + c.tiflashComputeStoreMu.stores = make([]*Store, 0) + c.notifyCheckCh = make(chan struct{}, 1) + c.ctx, c.cancelFunc = context.WithCancel(context.Background()) + c.mu = *newRegionIndexMu(nil) + return c +} + // clear clears all cached data in the RegionCache. It's only used in tests. func (c *RegionCache) clear() { c.mu = *newRegionIndexMu(nil) @@ -558,8 +570,8 @@ func (c *RegionCache) clear() { } // thread unsafe, should use with lock -func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) { - c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount) +func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool { + return c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount) } // Close releases region cache's resource. @@ -1491,9 +1503,32 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin // It should be protected by c.mu.l.Lock(). // if `invalidateOldRegion` is false, the old region cache should be still valid, // and it may still be used by some kv requests. -func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) { - oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion) - if oldRegion != nil { +// Moreover, it will return false if the region is stale. +func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool { + newVer := cachedRegion.VerID() + oldVer, ok := mu.latestVersions[newVer.id] + // There are two or more situations in which the region we got is stale. + // The first case is that the process of getting a region is concurrent. + // The stale region may be returned later due to network reasons. + // The second case is that the region may be obtained from the PD follower, + // and there is the synchronization time between the pd follower and the leader. + // So we should check the epoch. + if ok && (oldVer.GetVer() > newVer.GetVer() || oldVer.GetConfVer() > newVer.GetConfVer()) { + logutil.BgLogger().Debug("get stale region", + zap.Uint64("region", newVer.GetID()), zap.Uint64("new-ver", newVer.GetVer()), zap.Uint64("new-conf", newVer.GetConfVer()), + zap.Uint64("old-ver", oldVer.GetVer()), zap.Uint64("old-conf", oldVer.GetConfVer())) + return false + } + // Also check and remove the intersecting regions including the old region. + intersectedRegions, stale := mu.sorted.removeIntersecting(cachedRegion, newVer) + if stale { + return false + } + // Insert the region (won't replace because of above deletion). + mu.sorted.ReplaceOrInsert(cachedRegion) + // Inherit the workTiKVIdx, workTiFlashIdx and buckets from the first intersected region. + if len(intersectedRegions) > 0 { + oldRegion := intersectedRegions[0].cachedRegion store := cachedRegion.getStore() oldRegionStore := oldRegion.getStore() // TODO(youjiali1995): remove this because the new retry logic can handle this issue. @@ -1506,15 +1541,6 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader { store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly)) } - // If the old region is still valid, do not invalidate it to avoid unnecessary backoff. - if invalidateOldRegion { - // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. - if shouldCount { - oldRegion.invalidate(Other) - } else { - oldRegion.invalidateWithoutMetrics(Other) - } - } // Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which // is under transferring regions. store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load()) @@ -1523,21 +1549,27 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld if store.buckets == nil || (oldRegionStore.buckets != nil && store.buckets.GetVersion() < oldRegionStore.buckets.GetVersion()) { store.buckets = oldRegionStore.buckets } - mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id) - } - mu.regions[cachedRegion.VerID()] = cachedRegion - newVer := cachedRegion.VerID() - latest, ok := mu.latestVersions[cachedRegion.VerID().id] - if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() { - mu.latestVersions[cachedRegion.VerID().id] = newVer } // The intersecting regions in the cache are probably stale, clear them. - deleted := mu.sorted.removeIntersecting(cachedRegion) - for _, region := range deleted { + for _, region := range intersectedRegions { mu.removeVersionFromCache(region.cachedRegion.VerID(), region.cachedRegion.GetID()) + // If the old region is still valid, do not invalidate it to avoid unnecessary backoff. + if invalidateOldRegion { + // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. + if shouldCount { + region.cachedRegion.invalidate(Other) + } else { + region.cachedRegion.invalidateWithoutMetrics(Other) + } + } } + // update related vars. + mu.regions[newVer] = cachedRegion + mu.latestVersions[newVer.id] = newVer + return true +} -} // searchCachedRegion finds a region from cache by key. Like `getCachedRegion`, +// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`, // it should be called with c.mu.RLock(), and the returned Region should not be // used after c.mu is RUnlock(). // If the given key is the end key of the region that you want, you may set the second argument to true. This is useful diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 91a7603d1..7c06b8b59 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -55,6 +55,7 @@ import ( "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/kv" pd "github.com/tikv/pd/client" + uatomic "go.uber.org/atomic" ) func TestRegionCache(t *testing.T) { @@ -1567,6 +1568,8 @@ func (s *testRegionCacheSuite) TestBuckets() { // update buckets if it's nil. cachedRegion.getStore().buckets = nil + // we should replace the version of `cacheRegion` because of stale. + s.cluster.PutRegion(r.GetId(), newMeta.RegionEpoch.ConfVer, newMeta.RegionEpoch.Version, []uint64{s.store1, s.store2}, []uint64{s.peer1, s.peer2}, s.peer1) s.cluster.SplitRegionBuckets(cachedRegion.GetID(), defaultBuckets.Keys, defaultBuckets.Version) s.cache.UpdateBucketsIfNeeded(cachedRegion.VerID(), defaultBuckets.GetVersion()) waitUpdateBuckets(defaultBuckets, []byte("a")) @@ -1833,3 +1836,192 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCacheConcurrency() { cancel() } + +func TestRegionCacheWithDelay(t *testing.T) { + suite.Run(t, new(testRegionCacheWithDelaySuite)) +} + +type testRegionCacheWithDelaySuite struct { + suite.Suite + mvccStore mocktikv.MVCCStore + cluster *mocktikv.Cluster + store uint64 // store1 is leader + region1 uint64 + bo *retry.Backoffer + + delay uatomic.Bool + delayCache *RegionCache + cache *RegionCache +} + +func (s *testRegionCacheWithDelaySuite) SetupTest() { + s.mvccStore = mocktikv.MustNewMVCCStore() + s.cluster = mocktikv.NewCluster(s.mvccStore) + storeIDs, _, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 1) + s.region1 = regionID + s.store = storeIDs[0] + pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)} + s.cache = NewRegionCache(pdCli) + pdCli2 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&s.delay)), apicodec.NewCodecV1(apicodec.ModeTxn)} + s.delayCache = NewRegionCache(pdCli2) + s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil) +} + +func (s *testRegionCacheWithDelaySuite) TearDownTest() { + s.cache.Close() + s.delayCache.Close() + s.mvccStore.Close() +} + +func (s *testRegionCacheWithDelaySuite) TestInsertStaleRegion() { + r, err := s.cache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + fakeRegion := &Region{ + meta: r.meta, + syncFlag: r.syncFlag, + lastAccess: r.lastAccess, + invalidReason: r.invalidReason, + } + fakeRegion.setStore(r.getStore().clone()) + + newPeersIDs := s.cluster.AllocIDs(1) + s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("c"), newPeersIDs, newPeersIDs[0]) + newPeersIDs = s.cluster.AllocIDs(1) + s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0]) + + r.invalidate(Other) + r2, err := s.cache.findRegionByKey(s.bo, []byte("c"), false) + s.NoError(err) + s.Equal([]byte("c"), r2.StartKey()) + r2, err = s.cache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + s.Equal([]byte("b"), r2.StartKey()) + + stale := !s.cache.insertRegionToCache(fakeRegion, true, true) + s.True(stale) + + rs, err := s.cache.scanRegionsFromCache(s.bo, []byte(""), []byte(""), 100) + s.NoError(err) + s.Greater(len(rs), 1) + s.NotEqual(rs[0].EndKey(), "") + + r3, err := s.cache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + s.Equal([]byte("b"), r3.EndKey()) +} + +func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() { + r1, err := s.cache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + r2, err := s.delayCache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + s.Equal(r1.meta, r2.meta) + + // simulates network delay + s.delay.Store(true) + var wg sync.WaitGroup + wg.Add(1) + go func() { + r2.invalidate(Other) + _, err := s.delayCache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + wg.Done() + }() + time.Sleep(30 * time.Millisecond) + newPeersIDs := s.cluster.AllocIDs(1) + s.cluster.Split(r1.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0]) + r1.invalidate(Other) + r, err := s.cache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + r, err = s.cache.findRegionByKey(s.bo, []byte("c"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + + s.delay.Store(false) + r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + wg.Wait() + // the delay response is received, but insert failed. + r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.StartKey) + r, err = s.delayCache.findRegionByKey(s.bo, []byte("a"), false) + s.NoError(err) + s.Equal([]byte("b"), r.meta.EndKey) +} + +func generateKeyForSimulator(id int, keyLen int) []byte { + k := make([]byte, keyLen) + copy(k, fmt.Sprintf("%010d", id)) + return k +} + +func BenchmarkInsertRegionToCache(b *testing.B) { + b.StopTimer() + cache := newTestRegionCache() + r := &Region{ + meta: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{}, + }, + } + rs := ®ionStore{ + workTiKVIdx: 0, + proxyTiKVIdx: -1, + stores: make([]*Store, 0, len(r.meta.Peers)), + pendingTiFlashPeerStores: map[uint64]uint64{}, + storeEpochs: make([]uint32, 0, len(r.meta.Peers)), + } + r.setStore(rs) + b.StartTimer() + for i := 0; i < b.N; i++ { + newMeta := proto.Clone(r.meta).(*metapb.Region) + newMeta.Id = uint64(i + 1) + newMeta.RegionEpoch.ConfVer = uint64(i+1) - uint64(rand.Intn(i+1)) + newMeta.RegionEpoch.Version = uint64(i+1) - uint64(rand.Intn(i+1)) + if i%2 == 0 { + newMeta.StartKey = generateKeyForSimulator(rand.Intn(i+1), 56) + newMeta.EndKey = []byte("") + } else { + newMeta.EndKey = generateKeyForSimulator(rand.Intn(i+1), 56) + newMeta.StartKey = []byte("") + } + region := &Region{ + meta: newMeta, + } + region.setStore(r.getStore()) + cache.insertRegionToCache(region, true, true) + } +} + +func BenchmarkInsertRegionToCache2(b *testing.B) { + b.StopTimer() + cache := newTestRegionCache() + r := &Region{ + meta: &metapb.Region{ + Id: 1, + RegionEpoch: &metapb.RegionEpoch{}, + }, + } + rs := ®ionStore{ + workTiKVIdx: 0, + proxyTiKVIdx: -1, + stores: make([]*Store, 0, len(r.meta.Peers)), + pendingTiFlashPeerStores: map[uint64]uint64{}, + storeEpochs: make([]uint32, 0, len(r.meta.Peers)), + } + r.setStore(rs) + b.StartTimer() + for i := 0; i < b.N; i++ { + newMeta := proto.Clone(r.meta).(*metapb.Region) + newMeta.RegionEpoch.ConfVer = uint64(i + 1) + newMeta.RegionEpoch.Version = uint64(i + 1) + region := &Region{ + meta: newMeta, + } + region.setStore(r.getStore()) + cache.insertRegionToCache(region, true, true) + } +} diff --git a/internal/locate/sorted_btree.go b/internal/locate/sorted_btree.go index 6165f7614..f718aba3d 100644 --- a/internal/locate/sorted_btree.go +++ b/internal/locate/sorted_btree.go @@ -38,6 +38,8 @@ import ( "bytes" "github.com/google/btree" + "github.com/tikv/client-go/v2/internal/logutil" + "go.uber.org/zap" ) // SortedRegions is a sorted btree. @@ -93,12 +95,16 @@ func (s *SortedRegions) AscendGreaterOrEqual(startKey, endKey []byte, limit int) // removeIntersecting removes all items that have intersection with the key range of given region. // If the region itself is in the cache, it's not removed. -func (s *SortedRegions) removeIntersecting(r *Region) []*btreeItem { +func (s *SortedRegions) removeIntersecting(r *Region, verID RegionVerID) ([]*btreeItem, bool) { var deleted []*btreeItem + var stale bool s.b.AscendGreaterOrEqual(newBtreeSearchItem(r.StartKey()), func(item *btreeItem) bool { - // Skip the item that is equal to the given region. - if item.cachedRegion.VerID() == r.VerID() { - return true + if item.cachedRegion.meta.GetRegionEpoch().GetVersion() > verID.ver { + logutil.BgLogger().Debug("get stale region", + zap.Uint64("region", verID.GetID()), zap.Uint64("ver", verID.GetVer()), zap.Uint64("conf", verID.GetConfVer()), + zap.Uint64("intersecting-ver", item.cachedRegion.meta.GetRegionEpoch().GetVersion())) + stale = true + return false } if len(r.EndKey()) > 0 && bytes.Compare(item.cachedRegion.StartKey(), r.EndKey()) >= 0 { return false @@ -106,10 +112,13 @@ func (s *SortedRegions) removeIntersecting(r *Region) []*btreeItem { deleted = append(deleted, item) return true }) + if stale { + return nil, true + } for _, item := range deleted { s.b.Delete(item) } - return deleted + return deleted, false } // Clear removes all items from the btree. diff --git a/internal/mockstore/mocktikv/cluster.go b/internal/mockstore/mocktikv/cluster.go index e46b02d3d..7b5f1cc3f 100644 --- a/internal/mockstore/mocktikv/cluster.go +++ b/internal/mockstore/mocktikv/cluster.go @@ -413,6 +413,14 @@ func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderP c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID) } +// PutRegion adds or replaces a region. +func (c *Cluster) PutRegion(regionID, confVer, ver uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) { + c.Lock() + defer c.Unlock() + + c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID, confVer, ver) +} + // AddPeer adds a new Peer for the Region on the Store. func (c *Cluster) AddPeer(regionID, storeID, peerID uint64) { c.Lock() @@ -634,7 +642,7 @@ func newPeerMeta(peerID, storeID uint64) *metapb.Peer { } } -func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) *Region { +func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64, epoch ...uint64) *Region { if len(storeIDs) != len(peerIDs) { panic("len(storeIDs) != len(peerIds)") } @@ -647,6 +655,10 @@ func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) Peers: peers, RegionEpoch: &metapb.RegionEpoch{}, } + if len(epoch) == 2 { + meta.RegionEpoch.ConfVer = epoch[0] + meta.RegionEpoch.Version = epoch[1] + } return &Region{ Meta: meta, leader: leaderPeerID, diff --git a/internal/mockstore/mocktikv/pd.go b/internal/mockstore/mocktikv/pd.go index 833b914c0..b2b01aa41 100644 --- a/internal/mockstore/mocktikv/pd.go +++ b/internal/mockstore/mocktikv/pd.go @@ -61,6 +61,16 @@ var tsMu = struct { const defaultResourceGroupName = "default" +var _ pd.Client = (*pdClient)(nil) + +type MockPDOption func(*pdClient) + +func WithDelay(delay *atomic.Bool) MockPDOption { + return func(pc *pdClient) { + pc.delay = delay + } +} + type pdClient struct { cluster *Cluster // SafePoint set by `UpdateGCSafePoint`. Not to be confused with SafePointKV. @@ -73,11 +83,13 @@ type pdClient struct { externalTimestamp atomic.Uint64 groups map[string]*rmpb.ResourceGroup + + delay *atomic.Bool } // NewPDClient creates a mock pd.Client that uses local timestamp and meta data // from a Cluster. -func NewPDClient(cluster *Cluster) pd.Client { +func NewPDClient(cluster *Cluster, ops ...MockPDOption) *pdClient { mockCli := &pdClient{ cluster: cluster, serviceSafePoints: make(map[string]uint64), @@ -97,6 +109,9 @@ func NewPDClient(cluster *Cluster) pd.Client { }, Priority: 8, } + for _, op := range ops { + op(mockCli) + } return mockCli } @@ -206,6 +221,12 @@ func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegi if len(opts) == 0 { buckets = nil } + if c.delay != nil && c.delay.Load() { + select { + case <-ctx.Done(): + case <-time.After(200 * time.Millisecond): + } + } return &pd.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil }