region_cache: check epoch before insert #1079

Merged
merged 10 commits, Dec 20, 2023
Changes from 4 commits
53 changes: 44 additions & 9 deletions internal/locate/region_cache.go
@@ -543,6 +543,18 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
return c
}

// only used for tests.
func newTestRegionCache() *RegionCache {
c := &RegionCache{}
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
c.mu = *newRegionIndexMu(nil)
return c
}

// clear clears all cached data in the RegionCache. It's only used in tests.
func (c *RegionCache) clear() {
c.mu = *newRegionIndexMu(nil)
@@ -552,8 +564,8 @@ func (c *RegionCache) clear() {
}

// Not thread safe; the caller must hold the lock.
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion)
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) bool {
return c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion)
}

// Close releases region cache's resource.
@@ -1485,7 +1497,27 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin
// It should be protected by c.mu.l.Lock().
// if `invalidateOldRegion` is false, the old region cache should be still valid,
// and it may still be used by some kv requests.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
// It returns true if the region to insert is stale.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) bool {
newVer := cachedRegion.VerID()
latest, ok := mu.latestVersions[cachedRegion.VerID().id]
// There are two main cases in which the region we got may be stale.
// First, region fetches can run concurrently, and a stale response may
// arrive later than a newer one due to network delay.
// Second, the region may be obtained from a PD follower, which can lag
// behind the PD leader. So we check the epoch before inserting.
if ok && (latest.GetVer() > newVer.GetVer() || latest.GetConfVer() > newVer.GetConfVer()) {
Member: The naming is confusing; which one is newer?

logutil.BgLogger().Debug("get stale region",
zap.Uint64("region", newVer.GetID()), zap.Uint64("ver", newVer.GetVer()), zap.Uint64("conf", newVer.GetConfVer()),
zap.Uint64("lastest-ver", latest.GetVer()), zap.Uint64("lastest-conf", latest.GetConfVer()))
return true
}
// Also check the intersecting regions.
intersectedRegions, stale := mu.sorted.removeIntersecting(cachedRegion, &newVer)
Member: If the region is not stale, will the intersecting regions be removed first?

Member Author: Yes, but the lock is acquired first, so it seems to have no effect.

if stale {
return true
}
oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
if oldRegion != nil {
store := cachedRegion.getStore()
@@ -1513,21 +1545,24 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
if store.buckets == nil || (oldRegionStore.buckets != nil && store.buckets.GetVersion() < oldRegionStore.buckets.GetVersion()) {
store.buckets = oldRegionStore.buckets
}
mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
// Only delete the old version from the cache when the region IDs differ; otherwise it is overwritten right below.
if cachedRegion.VerID().id != oldRegion.VerID().id {
mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
}
}
// update related vars.
mu.regions[cachedRegion.VerID()] = cachedRegion
newVer := cachedRegion.VerID()
latest, ok := mu.latestVersions[cachedRegion.VerID().id]
if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() {
mu.latestVersions[cachedRegion.VerID().id] = newVer
}
// The intersecting regions in the cache are probably stale, clear them.
deleted := mu.sorted.removeIntersecting(cachedRegion)
for _, region := range deleted {
for _, region := range intersectedRegions {
mu.removeVersionFromCache(region.cachedRegion.VerID(), region.cachedRegion.GetID())
}
return false
}

// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
// it should be called with c.mu.RLock(), and the returned Region should not be
// used after c.mu is RUnlock().
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
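The core of the change above is the epoch comparison. As a minimal, self-contained Go sketch of that check (the types below are local stand-ins, not the client-go ones):

package main

import "fmt"

// regionVer is a stand-in for client-go's RegionVerID (illustration only).
type regionVer struct {
	id      uint64
	ver     uint64 // bumped on split/merge
	confVer uint64 // bumped on peer/configuration change
}

// isStale mirrors the new check in insertRegionToCache: an incoming region is
// stale if the latest cached version for the same region ID has a larger ver
// or confVer.
func isStale(latest, incoming regionVer) bool {
	return latest.ver > incoming.ver || latest.confVer > incoming.confVer
}

func main() {
	cached := regionVer{id: 1, ver: 5, confVer: 3}
	delayed := regionVer{id: 1, ver: 4, confVer: 3} // e.g. a late response or a lagging PD follower
	fmt.Println(isStale(cached, delayed)) // true: skip the insert
	fresh := regionVer{id: 1, ver: 6, confVer: 3}
	fmt.Println(isStale(cached, fresh)) // false: safe to insert
}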
192 changes: 192 additions & 0 deletions internal/locate/region_cache_test.go
@@ -55,6 +55,7 @@ import (
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
pd "github.com/tikv/pd/client"
uatomic "go.uber.org/atomic"
Member: uatomic?

Member Author: Because sync/atomic is also imported.

)

func TestRegionCache(t *testing.T) {
@@ -1567,6 +1568,8 @@ func (s *testRegionCacheSuite) TestBuckets() {

// update buckets if it's nil.
cachedRegion.getStore().buckets = nil
// Replace the region version in the cluster, because the old version of `cachedRegion` is stale.
s.cluster.PutRegion(r.GetId(), newMeta.RegionEpoch.ConfVer, newMeta.RegionEpoch.Version, []uint64{s.store1, s.store2}, []uint64{s.peer1, s.peer2}, s.peer1)
s.cluster.SplitRegionBuckets(cachedRegion.GetID(), defaultBuckets.Keys, defaultBuckets.Version)
s.cache.UpdateBucketsIfNeeded(cachedRegion.VerID(), defaultBuckets.GetVersion())
waitUpdateBuckets(defaultBuckets, []byte("a"))
Expand Down Expand Up @@ -1833,3 +1836,192 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCacheConcurrency() {

cancel()
}

func TestRegionCacheWithDelay(t *testing.T) {
suite.Run(t, new(testRegionCacheWithDelaySuite))
}

type testRegionCacheWithDelaySuite struct {
suite.Suite
mvccStore mocktikv.MVCCStore
cluster *mocktikv.Cluster
store uint64 // the leader store
region1 uint64
bo *retry.Backoffer

delay uatomic.Bool
delayCache *RegionCache
cache *RegionCache
}

func (s *testRegionCacheWithDelaySuite) SetupTest() {
s.mvccStore = mocktikv.MustNewMVCCStore()
s.cluster = mocktikv.NewCluster(s.mvccStore)
storeIDs, _, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 1)
s.region1 = regionID
s.store = storeIDs[0]
pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.cache = NewRegionCache(pdCli)
pdCli2 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&s.delay)), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.delayCache = NewRegionCache(pdCli2)
s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
}

func (s *testRegionCacheWithDelaySuite) TearDownTest() {
s.cache.Close()
s.delayCache.Close()
s.mvccStore.Close()
}

func (s *testRegionCacheWithDelaySuite) TestInsertStaleRegion() {
r, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
fakeRegion := &Region{
meta: r.meta,
syncFlag: r.syncFlag,
lastAccess: r.lastAccess,
invalidReason: r.invalidReason,
}
fakeRegion.setStore(r.getStore().clone())

newPeersIDs := s.cluster.AllocIDs(1)
s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("c"), newPeersIDs, newPeersIDs[0])
newPeersIDs = s.cluster.AllocIDs(1)
s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0])

r.invalidate(Other)
r2, err := s.cache.findRegionByKey(s.bo, []byte("c"), false)
s.NoError(err)
s.Equal([]byte("c"), r2.StartKey())
r2, err = s.cache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r2.StartKey())

stale := s.cache.insertRegionToCache(fakeRegion, true)
s.True(stale)

rs, err := s.cache.scanRegionsFromCache(s.bo, []byte(""), []byte(""), 100)
s.NoError(err)
s.Greater(len(rs), 1)
s.NotEqual(rs[0].EndKey(), "")

r3, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal([]byte("b"), r3.EndKey())
}

func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() {
r1, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
r2, err := s.delayCache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal(r1.meta, r2.meta)

// simulates network delay
s.delay.Store(true)
var wg sync.WaitGroup
wg.Add(1)
go func() {
r2.invalidate(Other)
_, err := s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
wg.Done()
}()
time.Sleep(30 * time.Millisecond)
newPeersIDs := s.cluster.AllocIDs(1)
s.cluster.Split(r1.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0])
r1.invalidate(Other)
r, err := s.cache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
r, err = s.cache.findRegionByKey(s.bo, []byte("c"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)

s.delay.Store(false)
r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
wg.Wait()
// The delayed response is received, but the insert fails because the region is stale.
r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.StartKey)
r, err = s.delayCache.findRegionByKey(s.bo, []byte("a"), false)
s.NoError(err)
s.Equal([]byte("b"), r.meta.EndKey)
}

func generateKeyForSimulator(id int, keyLen int) []byte {
k := make([]byte, keyLen)
copy(k, fmt.Sprintf("%010d", id))
return k
}

func BenchmarkInsertRegionToCache(b *testing.B) {
b.StopTimer()
cache := newTestRegionCache()
r := &Region{
meta: &metapb.Region{
Id: 1,
RegionEpoch: &metapb.RegionEpoch{},
},
}
rs := &regionStore{
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
pendingTiFlashPeerStores: map[uint64]uint64{},
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}
r.setStore(rs)
b.StartTimer()
for i := 0; i < b.N; i++ {
newMeta := proto.Clone(r.meta).(*metapb.Region)
newMeta.Id = uint64(i + 1)
newMeta.RegionEpoch.ConfVer = uint64(i+1) - uint64(rand.Intn(i+1))
newMeta.RegionEpoch.Version = uint64(i+1) - uint64(rand.Intn(i+1))
if i%2 == 0 {
newMeta.StartKey = generateKeyForSimulator(rand.Intn(i+1), 56)
newMeta.EndKey = []byte("")
} else {
newMeta.EndKey = generateKeyForSimulator(rand.Intn(i+1), 56)
newMeta.StartKey = []byte("")
}
region := &Region{
meta: newMeta,
}
region.setStore(r.getStore())
cache.insertRegionToCache(region, true)
}
}

func BenchmarkInsertRegionToCache2(b *testing.B) {
b.StopTimer()
cache := newTestRegionCache()
r := &Region{
meta: &metapb.Region{
Id: 1,
RegionEpoch: &metapb.RegionEpoch{},
},
}
rs := &regionStore{
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
pendingTiFlashPeerStores: map[uint64]uint64{},
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}
r.setStore(rs)
b.StartTimer()
for i := 0; i < b.N; i++ {
newMeta := proto.Clone(r.meta).(*metapb.Region)
newMeta.RegionEpoch.ConfVer = uint64(i + 1)
newMeta.RegionEpoch.Version = uint64(i + 1)
region := &Region{
meta: newMeta,
}
region.setStore(r.getStore())
cache.insertRegionToCache(region, true)
}
}
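The new benchmarks above can be run with the standard Go tooling, e.g. `go test -run '^$' -bench BenchmarkInsertRegionToCache ./internal/locate/` (the package path assumes the repository layout shown in this diff).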
17 changes: 15 additions & 2 deletions internal/locate/sorted_btree.go
@@ -38,6 +38,8 @@ import (
"bytes"

"github.com/google/btree"
"github.com/tikv/client-go/v2/internal/logutil"
"go.uber.org/zap"
)

// SortedRegions is a sorted btree.
@@ -93,23 +95,34 @@ func (s *SortedRegions) AscendGreaterOrEqual(startKey, endKey []byte, limit int)

// removeIntersecting removes all items that intersect the key range of the given region.
// If the region itself is in the cache, it is not removed.
// It also returns true, removing nothing, when an intersecting cached region has a newer
// epoch version than verID, which means the given region is stale.
func (s *SortedRegions) removeIntersecting(r *Region) []*btreeItem {
func (s *SortedRegions) removeIntersecting(r *Region, verID *RegionVerID) ([]*btreeItem, bool) {
var deleted []*btreeItem
var stale bool
s.b.AscendGreaterOrEqual(newBtreeSearchItem(r.StartKey()), func(item *btreeItem) bool {
// Skip the item that is equal to the given region.
if item.cachedRegion.VerID() == r.VerID() {
return true
}
if item.cachedRegion.meta.GetRegionEpoch().GetVersion() > verID.ver {
logutil.BgLogger().Debug("get stale region",
zap.Uint64("region", verID.GetID()), zap.Uint64("ver", verID.GetVer()), zap.Uint64("conf", verID.GetConfVer()),
zap.Uint64("intersecting-ver", item.cachedRegion.meta.GetRegionEpoch().GetVersion()))
stale = true
return false
}
if len(r.EndKey()) > 0 && bytes.Compare(item.cachedRegion.StartKey(), r.EndKey()) >= 0 {
return false
}
deleted = append(deleted, item)
return true
})
if stale {
return nil, true
}
for _, item := range deleted {
s.b.Delete(item)
}
return deleted
return deleted, false
}

// Clear removes all items from the btree.
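To show the new removeIntersecting behavior in isolation, here is a rough, self-contained sketch that uses a slice sorted by start key instead of the btree (all names below are stand-ins, not the real SortedRegions API):

package main

import "fmt"

// item is a stand-in for the btree items keyed by region start key.
type item struct {
	startKey string
	version  uint64 // region epoch version
}

// staleAgainstIntersecting reports whether a region covering [startKey, endKey)
// with epoch version ver is older than some cached region that intersects it,
// mirroring the check added to removeIntersecting (simplified: a linear scan
// over a slice sorted by startKey instead of a btree seek).
func staleAgainstIntersecting(sorted []item, startKey, endKey string, ver uint64) bool {
	for _, it := range sorted {
		if it.startKey < startKey {
			continue // starts before the range; the real code seeks past these
		}
		if endKey != "" && it.startKey >= endKey {
			break // past the end of the range, no more intersections
		}
		if it.version > ver {
			return true // an intersecting cached region is newer, so the incoming one is stale
		}
	}
	return false
}

func main() {
	cache := []item{{startKey: "a", version: 6}, {startKey: "c", version: 7}}
	fmt.Println(staleAgainstIntersecting(cache, "a", "", 5)) // true: leave the cache untouched
	fmt.Println(staleAgainstIntersecting(cache, "a", "", 8)) // false: intersecting items may be removed
}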
14 changes: 13 additions & 1 deletion internal/mockstore/mocktikv/cluster.go
@@ -413,6 +413,14 @@ func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderP
c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID)
}

// PutRegion adds or replaces a region with the given epoch (confVer, ver).
func (c *Cluster) PutRegion(regionID, confVer, ver uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) {
c.Lock()
defer c.Unlock()

c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID, confVer, ver)
}

// AddPeer adds a new Peer for the Region on the Store.
func (c *Cluster) AddPeer(regionID, storeID, peerID uint64) {
c.Lock()
@@ -634,7 +642,7 @@ func newPeerMeta(peerID, storeID uint64) *metapb.Peer {
}
}

func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) *Region {
func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64, epoch ...uint64) *Region {
if len(storeIDs) != len(peerIDs) {
panic("len(storeIDs) != len(peerIds)")
}
@@ -647,6 +655,10 @@ func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64)
Peers: peers,
RegionEpoch: &metapb.RegionEpoch{},
}
if len(epoch) == 2 {
meta.RegionEpoch.ConfVer = epoch[0]
meta.RegionEpoch.Version = epoch[1]
}
return &Region{
Meta: meta,
leader: leaderPeerID,
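The variadic epoch argument keeps existing newRegion callers source compatible; a minimal sketch of that pattern (stand-in types, not the mocktikv ones):

package main

import "fmt"

type regionEpoch struct{ confVer, version uint64 }

// withOptionalEpoch mirrors how newRegion treats its variadic epoch argument:
// the epoch is applied only when exactly two values are supplied.
func withOptionalEpoch(epoch ...uint64) regionEpoch {
	e := regionEpoch{}
	if len(epoch) == 2 {
		e.confVer, e.version = epoch[0], epoch[1]
	}
	return e
}

func main() {
	fmt.Println(withOptionalEpoch())     // {0 0}: old call sites keep the default epoch
	fmt.Println(withOptionalEpoch(3, 5)) // {3 5}: the values PutRegion passes through
}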