region_cache: check epoch before insert #1079

Merged 10 commits on Dec 20, 2023
80 changes: 56 additions & 24 deletions internal/locate/region_cache.go
@@ -549,6 +549,18 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
return c
}

// only used for tests.
func newTestRegionCache() *RegionCache {
c := &RegionCache{}
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
c.mu = *newRegionIndexMu(nil)
return c
}

// clear clears all cached data in the RegionCache. It's only used in tests.
func (c *RegionCache) clear() {
c.mu = *newRegionIndexMu(nil)
@@ -558,8 +570,8 @@ func (c *RegionCache) clear() {
}

// thread unsafe, should use with lock
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) {
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount)
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool {
return c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount)
}

// Close releases region cache's resource.
@@ -1491,9 +1503,32 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin
// It should be protected by c.mu.l.Lock().
// if `invalidateOldRegion` is false, the old region cache should be still valid,
// and it may still be used by some kv requests.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) {
oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
if oldRegion != nil {
// Moreover, it will return false if the region is stale.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool {
newVer := cachedRegion.VerID()
oldVer, ok := mu.latestVersions[newVer.id]
// There are at least two situations in which the region we get may be stale.
// The first is that region fetches run concurrently, and a stale response
// may be returned later due to network delays.
// The second is that the region may be obtained from a PD follower, which
// lags behind the PD leader by the synchronization delay.
// So we should check the epoch.
if ok && (oldVer.GetVer() > newVer.GetVer() || oldVer.GetConfVer() > newVer.GetConfVer()) {
logutil.BgLogger().Debug("get stale region",
zap.Uint64("region", newVer.GetID()), zap.Uint64("new-ver", newVer.GetVer()), zap.Uint64("new-conf", newVer.GetConfVer()),
zap.Uint64("old-ver", oldVer.GetVer()), zap.Uint64("old-conf", oldVer.GetConfVer()))
return false
}
// Also check and remove the intersecting regions including the old region.
intersectedRegions, stale := mu.sorted.removeIntersecting(cachedRegion, newVer)
if stale {
return false
}
// Insert the region (won't replace because of above deletion).
mu.sorted.ReplaceOrInsert(cachedRegion)
// Inherit the workTiKVIdx, workTiFlashIdx and buckets from the first intersected region.
if len(intersectedRegions) > 0 {
oldRegion := intersectedRegions[0].cachedRegion
store := cachedRegion.getStore()
oldRegionStore := oldRegion.getStore()
// TODO(youjiali1995): remove this because the new retry logic can handle this issue.
@@ -1506,15 +1541,6 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
}
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
if shouldCount {
oldRegion.invalidate(Other)
} else {
oldRegion.invalidateWithoutMetrics(Other)
}
}
// Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
// is under transferring regions.
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
@@ -1523,21 +1549,27 @@
if store.buckets == nil || (oldRegionStore.buckets != nil && store.buckets.GetVersion() < oldRegionStore.buckets.GetVersion()) {
store.buckets = oldRegionStore.buckets
}
mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
}
mu.regions[cachedRegion.VerID()] = cachedRegion
newVer := cachedRegion.VerID()
latest, ok := mu.latestVersions[cachedRegion.VerID().id]
if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() {
mu.latestVersions[cachedRegion.VerID().id] = newVer
}
// The intersecting regions in the cache are probably stale, clear them.
deleted := mu.sorted.removeIntersecting(cachedRegion)
for _, region := range deleted {
for _, region := range intersectedRegions {
mu.removeVersionFromCache(region.cachedRegion.VerID(), region.cachedRegion.GetID())
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
if shouldCount {
region.cachedRegion.invalidate(Other)
} else {
region.cachedRegion.invalidateWithoutMetrics(Other)
}
}
}
// update related vars.
mu.regions[newVer] = cachedRegion
mu.latestVersions[newVer.id] = newVer
return true
}

}

// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
// it should be called with c.mu.RLock(), and the returned Region should not be
// used after c.mu is RUnlock().
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
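A minimal, self-contained sketch of the staleness rule the epoch check above enforces (assumed names `epoch` and `isStale`, not client-go APIs): an incoming region is rejected whenever either counter of its epoch is older than what the cache already holds for the same region ID.

```go
package main

import "fmt"

// epoch mirrors the two counters of a region epoch: ver changes on
// split/merge, confVer changes on peer/membership changes.
type epoch struct {
	ver     uint64
	confVer uint64
}

// isStale reports whether an incoming region must not overwrite the cached
// one, i.e. either counter of the incoming epoch went backwards.
func isStale(cached, incoming epoch) bool {
	return cached.ver > incoming.ver || cached.confVer > incoming.confVer
}

func main() {
	cached := epoch{ver: 5, confVer: 3}
	delayed := epoch{ver: 4, confVer: 3} // e.g. a late response or a lagging PD follower
	fresh := epoch{ver: 6, confVer: 3}

	fmt.Println(isStale(cached, delayed)) // true: reject, keep the cached region
	fmt.Println(isStale(cached, fresh))   // false: safe to insert
}
```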
192 changes: 192 additions & 0 deletions internal/locate/region_cache_test.go
@@ -55,6 +55,7 @@ import (
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
pd "github.com/tikv/pd/client"
uatomic "go.uber.org/atomic"
Member: uatomic?

Member (Author): Because sync/atomic is also imported.

)
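Regarding the exchange above, a minimal sketch of the aliasing it describes (a standalone illustrative file, not part of this PR): both packages would otherwise occupy the name `atomic` in the file's scope, so the uber-go one is imported as `uatomic`.

```go
package sketch

import (
	"sync/atomic"

	uatomic "go.uber.org/atomic"
)

var (
	counter int64        // updated with the stdlib sync/atomic helpers
	ready   uatomic.Bool // go.uber.org/atomic's typed Bool, usable at its zero value
)

// bump demonstrates both packages coexisting in one file thanks to the alias.
func bump() {
	atomic.AddInt64(&counter, 1)
	ready.Store(true)
}
```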

func TestRegionCache(t *testing.T) {
@@ -1567,6 +1568,8 @@ func (s *testRegionCacheSuite) TestBuckets() {

// update buckets if it's nil.
cachedRegion.getStore().buckets = nil
// We should replace the version of `cachedRegion`, otherwise it would be treated as stale.
s.cluster.PutRegion(r.GetId(), newMeta.RegionEpoch.ConfVer, newMeta.RegionEpoch.Version, []uint64{s.store1, s.store2}, []uint64{s.peer1, s.peer2}, s.peer1)
s.cluster.SplitRegionBuckets(cachedRegion.GetID(), defaultBuckets.Keys, defaultBuckets.Version)
s.cache.UpdateBucketsIfNeeded(cachedRegion.VerID(), defaultBuckets.GetVersion())
waitUpdateBuckets(defaultBuckets, []byte("a"))
@@ -1833,3 +1836,192 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCacheConcurrency() {

cancel()
}

func TestRegionCacheWithDelay(t *testing.T) {
suite.Run(t, new(testRegionCacheWithDelaySuite))
}

type testRegionCacheWithDelaySuite struct {
suite.Suite
mvccStore mocktikv.MVCCStore
cluster *mocktikv.Cluster
store uint64 // store1 is leader
region1 uint64
bo *retry.Backoffer

delay uatomic.Bool
delayCache *RegionCache
cache *RegionCache
}

func (s *testRegionCacheWithDelaySuite) SetupTest() {
s.mvccStore = mocktikv.MustNewMVCCStore()
s.cluster = mocktikv.NewCluster(s.mvccStore)
storeIDs, _, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 1)
s.region1 = regionID
s.store = storeIDs[0]
pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.cache = NewRegionCache(pdCli)
pdCli2 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&s.delay)), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.delayCache = NewRegionCache(pdCli2)
s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
}

func (s *testRegionCacheWithDelaySuite) TearDownTest() {
s.cache.Close()
s.delayCache.Close()
s.mvccStore.Close()
}

func (s *testRegionCacheWithDelaySuite) TestInsertStaleRegion() {
r, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
fakeRegion := &Region{
meta: r.meta,
syncFlag: r.syncFlag,
lastAccess: r.lastAccess,
invalidReason: r.invalidReason,
}
fakeRegion.setStore(r.getStore().clone())

newPeersIDs := s.cluster.AllocIDs(1)
s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("c"), newPeersIDs, newPeersIDs[0])
newPeersIDs = s.cluster.AllocIDs(1)
s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0])

r.invalidate(Other)
r2, err := s.cache.findRegionByKey(s.bo, []byte("c"), false)
s.NoError(err)
s.Equal([]byte("c"), r2.StartKey())
r2, err = s.cache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r2.StartKey())

stale := !s.cache.insertRegionToCache(fakeRegion, true, true)
s.True(stale)

rs, err := s.cache.scanRegionsFromCache(s.bo, []byte(""), []byte(""), 100)
s.NoError(err)
s.Greater(len(rs), 1)
s.NotEqual(rs[0].EndKey(), "")

r3, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal([]byte("b"), r3.EndKey())
}

func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() {
r1, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
r2, err := s.delayCache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal(r1.meta, r2.meta)

// simulates network delay
s.delay.Store(true)
var wg sync.WaitGroup
wg.Add(1)
go func() {
r2.invalidate(Other)
_, err := s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
wg.Done()
}()
time.Sleep(30 * time.Millisecond)
newPeersIDs := s.cluster.AllocIDs(1)
s.cluster.Split(r1.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0])
r1.invalidate(Other)
r, err := s.cache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
r, err = s.cache.findRegionByKey(s.bo, []byte("c"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)

s.delay.Store(false)
r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
wg.Wait()
// the delayed response is received, but the insert failed.
r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
r, err = s.delayCache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.EndKey)
}

func generateKeyForSimulator(id int, keyLen int) []byte {
k := make([]byte, keyLen)
copy(k, fmt.Sprintf("%010d", id))
return k
}

func BenchmarkInsertRegionToCache(b *testing.B) {
b.StopTimer()
cache := newTestRegionCache()
r := &Region{
meta: &metapb.Region{
Id: 1,
RegionEpoch: &metapb.RegionEpoch{},
},
}
rs := &regionStore{
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
pendingTiFlashPeerStores: map[uint64]uint64{},
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}
r.setStore(rs)
b.StartTimer()
for i := 0; i < b.N; i++ {
newMeta := proto.Clone(r.meta).(*metapb.Region)
newMeta.Id = uint64(i + 1)
newMeta.RegionEpoch.ConfVer = uint64(i+1) - uint64(rand.Intn(i+1))
newMeta.RegionEpoch.Version = uint64(i+1) - uint64(rand.Intn(i+1))
if i%2 == 0 {
newMeta.StartKey = generateKeyForSimulator(rand.Intn(i+1), 56)
newMeta.EndKey = []byte("")
} else {
newMeta.EndKey = generateKeyForSimulator(rand.Intn(i+1), 56)
newMeta.StartKey = []byte("")
}
region := &Region{
meta: newMeta,
}
region.setStore(r.getStore())
cache.insertRegionToCache(region, true, true)
}
}

func BenchmarkInsertRegionToCache2(b *testing.B) {
b.StopTimer()
cache := newTestRegionCache()
r := &Region{
meta: &metapb.Region{
Id: 1,
RegionEpoch: &metapb.RegionEpoch{},
},
}
rs := &regionStore{
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
pendingTiFlashPeerStores: map[uint64]uint64{},
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}
r.setStore(rs)
b.StartTimer()
for i := 0; i < b.N; i++ {
newMeta := proto.Clone(r.meta).(*metapb.Region)
newMeta.RegionEpoch.ConfVer = uint64(i + 1)
newMeta.RegionEpoch.Version = uint64(i + 1)
region := &Region{
meta: newMeta,
}
region.setStore(r.getStore())
cache.insertRegionToCache(region, true, true)
}
}
19 changes: 14 additions & 5 deletions internal/locate/sorted_btree.go
@@ -38,6 +38,8 @@ import (
"bytes"

"github.com/google/btree"
"github.com/tikv/client-go/v2/internal/logutil"
"go.uber.org/zap"
)

// SortedRegions is a sorted btree.
@@ -93,23 +95,30 @@ func (s *SortedRegions) AscendGreaterOrEqual(startKey, endKey []byte, limit int)

// removeIntersecting removes all items that have intersection with the key range of given region.
// If the region itself is in the cache, it's not removed.
func (s *SortedRegions) removeIntersecting(r *Region) []*btreeItem {
func (s *SortedRegions) removeIntersecting(r *Region, verID RegionVerID) ([]*btreeItem, bool) {
var deleted []*btreeItem
var stale bool
s.b.AscendGreaterOrEqual(newBtreeSearchItem(r.StartKey()), func(item *btreeItem) bool {
// Skip the item that is equal to the given region.
if item.cachedRegion.VerID() == r.VerID() {
return true
if item.cachedRegion.meta.GetRegionEpoch().GetVersion() > verID.ver {
logutil.BgLogger().Debug("get stale region",
zap.Uint64("region", verID.GetID()), zap.Uint64("ver", verID.GetVer()), zap.Uint64("conf", verID.GetConfVer()),
zap.Uint64("intersecting-ver", item.cachedRegion.meta.GetRegionEpoch().GetVersion()))
stale = true
return false
}
if len(r.EndKey()) > 0 && bytes.Compare(item.cachedRegion.StartKey(), r.EndKey()) >= 0 {
return false
}
deleted = append(deleted, item)
return true
})
if stale {
return nil, true
}
for _, item := range deleted {
s.b.Delete(item)
}
return deleted
return deleted, false
}

// Clear removes all items from the btree.
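To make the new check concrete, here is a self-contained illustration of the scenario it guards against (assumed `region` type and `overlaps` helper, not the client-go API): a delayed response describing the pre-split key range intersects the post-split regions already cached but carries an older version, so it is rejected instead of evicting them.

```go
package main

import (
	"bytes"
	"fmt"
)

// region is an illustrative stand-in for a cached region: a key range plus
// the epoch version that changes on split/merge.
type region struct {
	startKey, endKey []byte // endKey == "" means +inf
	version          uint64
}

// overlaps reports whether the two key ranges intersect.
func overlaps(a, b region) bool {
	return (len(b.endKey) == 0 || bytes.Compare(a.startKey, b.endKey) < 0) &&
		(len(a.endKey) == 0 || bytes.Compare(b.startKey, a.endKey) < 0)
}

func main() {
	// Post-split regions already in the cache, both at version 6.
	cached := []region{
		{startKey: []byte(""), endKey: []byte("b"), version: 6},
		{startKey: []byte("b"), endKey: []byte(""), version: 6},
	}
	// A delayed response still describes the pre-split range at version 5.
	delayed := region{startKey: []byte(""), endKey: []byte(""), version: 5}

	for _, c := range cached {
		// Mirrors the new check: an intersecting cached region with a newer
		// version means the incoming region is stale and must be dropped.
		stale := overlaps(c, delayed) && c.version > delayed.version
		fmt.Println(stale) // true, true
	}
}
```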