region_cache: check epoch before insert (#1079)
Signed-off-by: Cabinfever_B <[email protected]>
Co-authored-by: disksing <[email protected]>
CabinfeverB and disksing authored Dec 20, 2023
1 parent 4ce1e45 commit 85ca0a4
Showing 5 changed files with 297 additions and 31 deletions.
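The headline change is that a region is inserted into the cache only if its epoch is not older than the one already recorded for the same region ID. The snippet below is a minimal, self-contained sketch of that comparison; the `RegionEpoch` and `isStale` names and the simplified types are illustrative, not the client-go structs. The real check appears in `regionIndexMu.insertRegionToCache` further down in this diff.

```go
package main

import "fmt"

// RegionEpoch mirrors the idea of metapb.RegionEpoch: Version changes on
// splits/merges, ConfVer changes on peer-membership changes. Simplified type
// for illustration only.
type RegionEpoch struct {
	Ver     uint64
	ConfVer uint64
}

// isStale reports whether a freshly fetched epoch is older than the epoch the
// cache has already recorded for the same region ID.
func isStale(latest, fetched RegionEpoch) bool {
	return latest.Ver > fetched.Ver || latest.ConfVer > fetched.ConfVer
}

func main() {
	latest := RegionEpoch{Ver: 5, ConfVer: 3}
	delayed := RegionEpoch{Ver: 4, ConfVer: 3} // e.g. a late response routed through a PD follower
	fmt.Println(isStale(latest, delayed))      // true: the delayed region must not overwrite the cache
}
```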
80 changes: 56 additions & 24 deletions internal/locate/region_cache.go
@@ -549,6 +549,18 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
return c
}

// only used for tests.
func newTestRegionCache() *RegionCache {
c := &RegionCache{}
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
c.mu = *newRegionIndexMu(nil)
return c
}

// clear clears all cached data in the RegionCache. It's only used in tests.
func (c *RegionCache) clear() {
c.mu = *newRegionIndexMu(nil)
@@ -558,8 +570,8 @@ func (c *RegionCache) clear() {
}

// thread unsafe, should use with lock
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) {
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount)
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool {
return c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount)
}

// Close releases region cache's resource.
@@ -1491,9 +1503,32 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin
// It should be protected by c.mu.l.Lock().
// if `invalidateOldRegion` is false, the old region cache should be still valid,
// and it may still be used by some kv requests.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) {
oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
if oldRegion != nil {
// Moreover, it will return false if the region is stale.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool {
newVer := cachedRegion.VerID()
oldVer, ok := mu.latestVersions[newVer.id]
// There are at least two situations in which the region we got is stale.
// The first is that region fetches may run concurrently, so a stale region
// can be returned later because of network delays.
// The second is that the region may be obtained from a PD follower, which
// can lag behind the PD leader by the synchronization delay.
// So we should check the epoch.
if ok && (oldVer.GetVer() > newVer.GetVer() || oldVer.GetConfVer() > newVer.GetConfVer()) {
logutil.BgLogger().Debug("get stale region",
zap.Uint64("region", newVer.GetID()), zap.Uint64("new-ver", newVer.GetVer()), zap.Uint64("new-conf", newVer.GetConfVer()),
zap.Uint64("old-ver", oldVer.GetVer()), zap.Uint64("old-conf", oldVer.GetConfVer()))
return false
}
// Also check and remove the intersecting regions including the old region.
intersectedRegions, stale := mu.sorted.removeIntersecting(cachedRegion, newVer)
if stale {
return false
}
// Insert the region (won't replace because of above deletion).
mu.sorted.ReplaceOrInsert(cachedRegion)
// Inherit the workTiKVIdx, workTiFlashIdx and buckets from the first intersected region.
if len(intersectedRegions) > 0 {
oldRegion := intersectedRegions[0].cachedRegion
store := cachedRegion.getStore()
oldRegionStore := oldRegion.getStore()
// TODO(youjiali1995): remove this because the new retry logic can handle this issue.
@@ -1506,15 +1541,6 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
}
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
if shouldCount {
oldRegion.invalidate(Other)
} else {
oldRegion.invalidateWithoutMetrics(Other)
}
}
// Don't refresh TiFlash work idx for region. Otherwise, it will always go to an invalid store that
// is in the middle of transferring regions.
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
@@ -1523,21 +1549,27 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
if store.buckets == nil || (oldRegionStore.buckets != nil && store.buckets.GetVersion() < oldRegionStore.buckets.GetVersion()) {
store.buckets = oldRegionStore.buckets
}
mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
}
mu.regions[cachedRegion.VerID()] = cachedRegion
newVer := cachedRegion.VerID()
latest, ok := mu.latestVersions[cachedRegion.VerID().id]
if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() {
mu.latestVersions[cachedRegion.VerID().id] = newVer
}
// The intersecting regions in the cache are probably stale, clear them.
deleted := mu.sorted.removeIntersecting(cachedRegion)
for _, region := range deleted {
for _, region := range intersectedRegions {
mu.removeVersionFromCache(region.cachedRegion.VerID(), region.cachedRegion.GetID())
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
if shouldCount {
region.cachedRegion.invalidate(Other)
} else {
region.cachedRegion.invalidateWithoutMetrics(Other)
}
}
}
// update related vars.
mu.regions[newVer] = cachedRegion
mu.latestVersions[newVer.id] = newVer
return true
}
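Because insertRegionToCache now reports whether the region was actually cached, a caller can fall back to the newer data already in the cache when the insert is rejected. The helper below is a hypothetical sketch, not part of this commit: it assumes it lives inside internal/locate (it touches unexported fields), the name insertOrReuse is made up, and locking follows the `c.mu.l.Lock()` convention stated in the comment above.

```go
// Hypothetical helper (illustration only, assumed to live in package locate):
// try to cache a freshly loaded region; if it is rejected as stale, read back
// the newer region that already covers the key instead of using the load.
func (c *RegionCache) insertOrReuse(bo *retry.Backoffer, loaded *Region) (*Region, error) {
	c.mu.l.Lock()
	inserted := c.insertRegionToCache(loaded, true, true)
	c.mu.l.Unlock()
	if inserted {
		return loaded, nil
	}
	// A newer epoch already covers this key range; use the cached data.
	return c.findRegionByKey(bo, loaded.StartKey(), false)
}
```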

// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
// it should be called with c.mu.RLock(), and the returned Region should not be
// used after c.mu is RUnlock().
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
192 changes: 192 additions & 0 deletions internal/locate/region_cache_test.go
@@ -55,6 +55,7 @@ import (
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
pd "github.com/tikv/pd/client"
uatomic "go.uber.org/atomic"
)

func TestRegionCache(t *testing.T) {
@@ -1567,6 +1568,8 @@ func (s *testRegionCacheSuite) TestBuckets() {

// update buckets if it's nil.
cachedRegion.getStore().buckets = nil
// Update the region's epoch in the mock cluster as well; otherwise the reloaded region would be stale.
s.cluster.PutRegion(r.GetId(), newMeta.RegionEpoch.ConfVer, newMeta.RegionEpoch.Version, []uint64{s.store1, s.store2}, []uint64{s.peer1, s.peer2}, s.peer1)
s.cluster.SplitRegionBuckets(cachedRegion.GetID(), defaultBuckets.Keys, defaultBuckets.Version)
s.cache.UpdateBucketsIfNeeded(cachedRegion.VerID(), defaultBuckets.GetVersion())
waitUpdateBuckets(defaultBuckets, []byte("a"))
@@ -1833,3 +1836,192 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCacheConcurrency() {

cancel()
}

func TestRegionCacheWithDelay(t *testing.T) {
suite.Run(t, new(testRegionCacheWithDelaySuite))
}

type testRegionCacheWithDelaySuite struct {
suite.Suite
mvccStore mocktikv.MVCCStore
cluster *mocktikv.Cluster
store uint64 // store1 is leader
region1 uint64
bo *retry.Backoffer

delay uatomic.Bool
delayCache *RegionCache
cache *RegionCache
}

func (s *testRegionCacheWithDelaySuite) SetupTest() {
s.mvccStore = mocktikv.MustNewMVCCStore()
s.cluster = mocktikv.NewCluster(s.mvccStore)
storeIDs, _, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 1)
s.region1 = regionID
s.store = storeIDs[0]
pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.cache = NewRegionCache(pdCli)
pdCli2 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&s.delay)), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.delayCache = NewRegionCache(pdCli2)
s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
}

func (s *testRegionCacheWithDelaySuite) TearDownTest() {
s.cache.Close()
s.delayCache.Close()
s.mvccStore.Close()
}

func (s *testRegionCacheWithDelaySuite) TestInsertStaleRegion() {
r, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
fakeRegion := &Region{
meta: r.meta,
syncFlag: r.syncFlag,
lastAccess: r.lastAccess,
invalidReason: r.invalidReason,
}
fakeRegion.setStore(r.getStore().clone())

newPeersIDs := s.cluster.AllocIDs(1)
s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("c"), newPeersIDs, newPeersIDs[0])
newPeersIDs = s.cluster.AllocIDs(1)
s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0])

r.invalidate(Other)
r2, err := s.cache.findRegionByKey(s.bo, []byte("c"), false)
s.NoError(err)
s.Equal([]byte("c"), r2.StartKey())
r2, err = s.cache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r2.StartKey())

stale := !s.cache.insertRegionToCache(fakeRegion, true, true)
s.True(stale)

rs, err := s.cache.scanRegionsFromCache(s.bo, []byte(""), []byte(""), 100)
s.NoError(err)
s.Greater(len(rs), 1)
s.NotEqual(rs[0].EndKey(), "")

r3, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal([]byte("b"), r3.EndKey())
}

func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() {
r1, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
r2, err := s.delayCache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal(r1.meta, r2.meta)

// simulates network delay
s.delay.Store(true)
var wg sync.WaitGroup
wg.Add(1)
go func() {
r2.invalidate(Other)
_, err := s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
wg.Done()
}()
time.Sleep(30 * time.Millisecond)
newPeersIDs := s.cluster.AllocIDs(1)
s.cluster.Split(r1.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0])
r1.invalidate(Other)
r, err := s.cache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
r, err = s.cache.findRegionByKey(s.bo, []byte("c"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)

s.delay.Store(false)
r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
wg.Wait()
// the delayed response is received, but its insert into the cache fails because it is stale.
r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
r, err = s.delayCache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.EndKey)
}

func generateKeyForSimulator(id int, keyLen int) []byte {
k := make([]byte, keyLen)
copy(k, fmt.Sprintf("%010d", id))
return k
}

func BenchmarkInsertRegionToCache(b *testing.B) {
b.StopTimer()
cache := newTestRegionCache()
r := &Region{
meta: &metapb.Region{
Id: 1,
RegionEpoch: &metapb.RegionEpoch{},
},
}
rs := &regionStore{
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
pendingTiFlashPeerStores: map[uint64]uint64{},
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}
r.setStore(rs)
b.StartTimer()
for i := 0; i < b.N; i++ {
newMeta := proto.Clone(r.meta).(*metapb.Region)
newMeta.Id = uint64(i + 1)
newMeta.RegionEpoch.ConfVer = uint64(i+1) - uint64(rand.Intn(i+1))
newMeta.RegionEpoch.Version = uint64(i+1) - uint64(rand.Intn(i+1))
if i%2 == 0 {
newMeta.StartKey = generateKeyForSimulator(rand.Intn(i+1), 56)
newMeta.EndKey = []byte("")
} else {
newMeta.EndKey = generateKeyForSimulator(rand.Intn(i+1), 56)
newMeta.StartKey = []byte("")
}
region := &Region{
meta: newMeta,
}
region.setStore(r.getStore())
cache.insertRegionToCache(region, true, true)
}
}

func BenchmarkInsertRegionToCache2(b *testing.B) {
b.StopTimer()
cache := newTestRegionCache()
r := &Region{
meta: &metapb.Region{
Id: 1,
RegionEpoch: &metapb.RegionEpoch{},
},
}
rs := &regionStore{
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
pendingTiFlashPeerStores: map[uint64]uint64{},
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}
r.setStore(rs)
b.StartTimer()
for i := 0; i < b.N; i++ {
newMeta := proto.Clone(r.meta).(*metapb.Region)
newMeta.RegionEpoch.ConfVer = uint64(i + 1)
newMeta.RegionEpoch.Version = uint64(i + 1)
region := &Region{
meta: newMeta,
}
region.setStore(r.getStore())
cache.insertRegionToCache(region, true, true)
}
}
19 changes: 14 additions & 5 deletions internal/locate/sorted_btree.go
@@ -38,6 +38,8 @@ import (
"bytes"

"github.com/google/btree"
"github.com/tikv/client-go/v2/internal/logutil"
"go.uber.org/zap"
)

// SortedRegions is a sorted btree.
@@ -93,23 +95,30 @@ func (s *SortedRegions) AscendGreaterOrEqual(startKey, endKey []byte, limit int)

// removeIntersecting removes all items that have intersection with the key range of given region.
// If the region itself is in the cache, it's not removed.
func (s *SortedRegions) removeIntersecting(r *Region) []*btreeItem {
func (s *SortedRegions) removeIntersecting(r *Region, verID RegionVerID) ([]*btreeItem, bool) {
var deleted []*btreeItem
var stale bool
s.b.AscendGreaterOrEqual(newBtreeSearchItem(r.StartKey()), func(item *btreeItem) bool {
// Skip the item that is equal to the given region.
if item.cachedRegion.VerID() == r.VerID() {
return true
if item.cachedRegion.meta.GetRegionEpoch().GetVersion() > verID.ver {
logutil.BgLogger().Debug("get stale region",
zap.Uint64("region", verID.GetID()), zap.Uint64("ver", verID.GetVer()), zap.Uint64("conf", verID.GetConfVer()),
zap.Uint64("intersecting-ver", item.cachedRegion.meta.GetRegionEpoch().GetVersion()))
stale = true
return false
}
if len(r.EndKey()) > 0 && bytes.Compare(item.cachedRegion.StartKey(), r.EndKey()) >= 0 {
return false
}
deleted = append(deleted, item)
return true
})
if stale {
return nil, true
}
for _, item := range deleted {
s.b.Delete(item)
}
return deleted
return deleted, false
}
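The intersection handling can be read independently of the B-tree: collect every cached range that overlaps the candidate, but if any overlapping range carries a higher version, report the candidate as stale and remove nothing. Below is a small self-contained model of that rule over a plain slice; the `span` type and function names are illustrative, not the client-go implementation.

```go
package main

import (
	"bytes"
	"fmt"
)

// span is an illustrative stand-in for a cached region: a key range plus its version.
type span struct {
	startKey, endKey []byte // empty endKey means "unbounded to the right"
	ver              uint64
}

func overlaps(a, b span) bool {
	aEndsBeforeB := len(a.endKey) > 0 && bytes.Compare(a.endKey, b.startKey) <= 0
	bEndsBeforeA := len(b.endKey) > 0 && bytes.Compare(b.endKey, a.startKey) <= 0
	return !aEndsBeforeB && !bEndsBeforeA
}

// removeIntersecting models the rule used by SortedRegions.removeIntersecting:
// drop every cached span that overlaps the candidate, unless some overlapping
// span has a higher version, in which case the candidate is stale and the
// cache is left untouched.
func removeIntersecting(cached []span, cand span) (remaining, removed []span, stale bool) {
	for _, s := range cached {
		if !overlaps(s, cand) {
			remaining = append(remaining, s)
			continue
		}
		if s.ver > cand.ver {
			return cached, nil, true
		}
		removed = append(removed, s)
	}
	return remaining, removed, false
}

func main() {
	// The cache already holds the two halves of a split at "m" (version 3).
	cache := []span{
		{startKey: []byte("a"), endKey: []byte("m"), ver: 3},
		{startKey: []byte("m"), endKey: nil, ver: 3},
	}
	// A delayed pre-split region (version 2) arrives; it must not evict the halves.
	_, _, stale := removeIntersecting(cache, span{startKey: []byte("a"), endKey: nil, ver: 2})
	fmt.Println(stale) // true
}
```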

// Clear removes all items from the btree.