From dea6e8df4be78cf9f900e0f3edf9f6968e62edda Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Thu, 25 Jan 2024 09:13:44 -0500
Subject: [PATCH 1/9] kvclient: remove EvictByKey from the interface

Remove EvictByKey from the interface. Evicting by key replaces everything
the cache knows about the key; it is better to update entries through an
EvictionToken.

Epic: none
Release note: None
---
 pkg/kv/kvclient/kvcoord/dist_sender.go         | 8 +++++++-
 pkg/kv/kvclient/rangecache/range_cache.go      | 6 ++++--
 pkg/kv/kvclient/rangecache/range_cache_test.go | 2 +-
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index 28abc6a76d59..e6ba512af22c 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -708,7 +708,13 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
 			return
 		}
 		log.VEventf(ctx, 1, "gossiped first range descriptor: %+v", desc.Replicas())
-		ds.rangeCache.EvictByKey(ctx, roachpb.RKeyMin)
+		et, err := ds.rangeCache.LookupWithEvictionToken(ctx, roachpb.RKeyMin, rangecache.EvictionToken{}, false)
+		if err != nil {
+			et.EvictAndReplace(ctx, roachpb.RangeInfo{
+				Desc:                  *desc,
+				ClosedTimestampPolicy: rangecache.UnknownClosedTimestampPolicy,
+			})
+		}
 	})
 }
 
diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go
index 1fad2354b5a7..2d2c10ce3e97 100644
--- a/pkg/kv/kvclient/rangecache/range_cache.go
+++ b/pkg/kv/kvclient/rangecache/range_cache.go
@@ -978,10 +978,12 @@ func (rc *RangeCache) Clear() {
 	rc.rangeCache.cache.Clear()
 }
 
-// EvictByKey evicts the descriptor containing the given key, if any.
+// evictByKey evicts the descriptor containing the given key, if any. This
+// evicts everything about the key and is only used in tests. Updates should be
+// done through an EvictionToken.
 //
 // Returns true if a descriptor was evicted.
-func (rc *RangeCache) EvictByKey(ctx context.Context, descKey roachpb.RKey) bool {
+func (rc *RangeCache) evictByKey(ctx context.Context, descKey roachpb.RKey) bool {
 	rc.rangeCache.Lock()
 	defer rc.rangeCache.Unlock()
 
diff --git a/pkg/kv/kvclient/rangecache/range_cache_test.go b/pkg/kv/kvclient/rangecache/range_cache_test.go
index 17010b64d615..27a105311cb9 100644
--- a/pkg/kv/kvclient/rangecache/range_cache_test.go
+++ b/pkg/kv/kvclient/rangecache/range_cache_test.go
@@ -457,7 +457,7 @@ func TestRangeCache(t *testing.T) {
 	// Evicts [d,e).
 	require.True(t, evict(ctx, db.cache, deTok.Desc()))
 	// Evicts [meta(min),meta(g)).
-	require.True(t, db.cache.EvictByKey(ctx, keys.RangeMetaKey(roachpb.RKey("da"))))
+	require.True(t, db.cache.evictByKey(ctx, keys.RangeMetaKey(roachpb.RKey("da"))))
 	doLookup(ctx, db.cache, "fa")
 	db.assertLookupCountEq(t, 0, "fa")
 	// Totally uncached range.

From 29b26aceb15ad1049226501e0d30574e019ef9c4 Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Thu, 25 Jan 2024 13:40:09 -0500
Subject: [PATCH 2/9] kvclient: change GetCached to return RangeInfo

The internal CacheEntry inside the RangeCache should not be publicly
exposed. This commit changes GetCached to return the public RangeInfo
instead. GetCached is a test-only method.
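For illustration, a before/after sketch of the caller-side change,
abbreviated from the test updates in this patch (not a compilable unit
on its own):

    // Before: GetCached returned a *CacheEntry and callers went through getters.
    entry := cache.GetCached(ctx, tablePrefix, false /* inverted */)
    require.False(t, entry.Lease().Empty())
    require.Equal(t, roachpb.LAG_BY_CLUSTER_SETTING, entry.ClosedTimestampPolicy())

    // After: GetCached returns a roachpb.RangeInfo value with plain fields.
    ri := cache.GetCached(ctx, tablePrefix, false /* inverted */)
    require.False(t, ri.Lease.Empty())
    require.Equal(t, roachpb.LAG_BY_CLUSTER_SETTING, ri.ClosedTimestampPolicy)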
Epic: none Release note: None --- .../kvfollowerreadsccl/followerreads_test.go | 24 ++++---- pkg/ccl/multiregionccl/datadriven_test.go | 4 +- pkg/ccl/multiregionccl/roundtrips_test.go | 6 +- pkg/kv/bulk/sst_batcher.go | 2 +- pkg/kv/bulk/sst_batcher_test.go | 2 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 55 ++++++++----------- pkg/kv/kvclient/rangecache/range_cache.go | 37 +++++++++++-- .../kvclient/rangecache/range_cache_test.go | 36 ++++++------ pkg/kv/kvserver/intentresolver/BUILD.bazel | 1 - .../intentresolver/intent_resolver.go | 15 +++-- pkg/sql/distsql_physical_planner_test.go | 4 +- 11 files changed, 101 insertions(+), 85 deletions(-) diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index 55afd8bcb21f..e11f66cdc9e2 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -817,12 +817,12 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { n4Cache := tc.Server(3).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache() entry := n4Cache.GetCached(ctx, tablePrefix, false /* inverted */) require.NotNil(t, entry) - require.False(t, entry.Lease().Empty()) - require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID) + require.False(t, entry.Lease.Empty()) + require.Equal(t, roachpb.StoreID(1), entry.Lease.Replica.StoreID) require.Equal(t, []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 1, ReplicaID: 1}, {NodeID: 2, StoreID: 2, ReplicaID: 2}, - }, entry.Desc().Replicas().Descriptors()) + }, entry.Desc.Replicas().Descriptors()) // Remove the follower and add a new non-voter to n3. n2 will no longer have a // replica. @@ -840,12 +840,12 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { // Check that the cache was properly updated. entry = n4Cache.GetCached(ctx, tablePrefix, false /* inverted */) require.NotNil(t, entry) - require.False(t, entry.Lease().Empty()) - require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID) + require.False(t, entry.Lease.Empty()) + require.Equal(t, roachpb.StoreID(1), entry.Lease.Replica.StoreID) require.Equal(t, []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 1, ReplicaID: 1}, {NodeID: 3, StoreID: 3, ReplicaID: 3, Type: roachpb.NON_VOTER}, - }, entry.Desc().Replicas().Descriptors()) + }, entry.Desc.Replicas().Descriptors()) // Make a note of the follower reads metric on n3. We'll check that it was // incremented. @@ -883,12 +883,12 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { n3Cache := tc.Server(2).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache() entry = n3Cache.GetCached(ctx, tablePrefix, false /* inverted */) require.NotNil(t, entry) - require.False(t, entry.Lease().Empty()) - require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID) + require.False(t, entry.Lease.Empty()) + require.Equal(t, roachpb.StoreID(1), entry.Lease.Replica.StoreID) require.Equal(t, []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 1, ReplicaID: 1}, {NodeID: 3, StoreID: 3, ReplicaID: 3, Type: roachpb.NON_VOTER}, - }, entry.Desc().Replicas().Descriptors()) + }, entry.Desc.Replicas().Descriptors()) // Enable DistSQL so that we have a distributed plan with a single flow on // n3 (local plans ignore the misplanned ranges). 
@@ -1136,13 +1136,13 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) { cache := tenants[gatewayNode].DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache() entry := cache.GetCached(ctx, tablePrefix, false /* inverted */) require.NotNil(t, entry) - require.False(t, entry.Lease().Empty()) - require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID) + require.False(t, entry.Lease.Empty()) + require.Equal(t, roachpb.StoreID(1), entry.Lease.Replica.StoreID) require.Equal(t, []roachpb.ReplicaDescriptor{ {NodeID: 1, StoreID: 1, ReplicaID: 1}, {NodeID: 2, StoreID: 2, ReplicaID: 2}, {NodeID: 3, StoreID: 3, ReplicaID: 3}, - }, entry.Desc().Replicas().Descriptors()) + }, entry.Desc.Replicas().Descriptors()) tenantSQL.Exec(t, historicalQuery) rec := <-recCh diff --git a/pkg/ccl/multiregionccl/datadriven_test.go b/pkg/ccl/multiregionccl/datadriven_test.go index 0adf7650d93e..08b7a37365dd 100644 --- a/pkg/ccl/multiregionccl/datadriven_test.go +++ b/pkg/ccl/multiregionccl/datadriven_test.go @@ -320,10 +320,10 @@ SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '5m' cache := ds.tc.Server(idx).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache() tablePrefix := keys.MustAddr(keys.SystemSQLCodec.TablePrefix(tableID)) entry := cache.GetCached(ctx, tablePrefix, false /* inverted */) - if entry == nil { + if !entry.Desc.IsInitialized() { return errors.Newf("no entry found for %s in cache", tbName).Error() } - return entry.ClosedTimestampPolicy().String() + return entry.ClosedTimestampPolicy.String() case "wait-for-zone-config-changes": lookupKey, err := getRangeKeyForInput(t, d, ds.tc) diff --git a/pkg/ccl/multiregionccl/roundtrips_test.go b/pkg/ccl/multiregionccl/roundtrips_test.go index a6e755377490..a2bdb95fd1bc 100644 --- a/pkg/ccl/multiregionccl/roundtrips_test.go +++ b/pkg/ccl/multiregionccl/roundtrips_test.go @@ -124,14 +124,14 @@ func TestEnsureLocalReadsOnGlobalTables(t *testing.T) { // Check that the cache was indeed populated. 
cache := tc.Server(i).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache() entry := cache.GetCached(context.Background(), tablePrefix, false /* inverted */) - require.NotNil(t, entry.Lease().Empty()) + require.NotNil(t, entry.Lease.Empty()) require.NotNil(t, entry) - if expected, got := roachpb.LEAD_FOR_GLOBAL_READS, entry.ClosedTimestampPolicy(); got != expected { + if expected, got := roachpb.LEAD_FOR_GLOBAL_READS, entry.ClosedTimestampPolicy; got != expected { return errors.Newf("expected closedts policy %s, got %s", expected, got) } - isLeaseHolder = entry.Lease().Replica.NodeID == tc.Server(i).NodeID() + isLeaseHolder = entry.Lease.Replica.NodeID == tc.Server(i).NodeID() return nil }) diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index 2cc0c6ee6b66..b3837410f246 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -449,7 +449,7 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err if r, err := b.rc.Lookup(ctx, k); err != nil { log.Warningf(ctx, "failed to lookup range cache entry for key %v: %v", k, err) } else { - k := r.Desc().EndKey.AsRawKey() + k := r.Desc.EndKey.AsRawKey() b.flushKey = k log.VEventf(ctx, 3, "%s building sstable that will flush before %v", b.name, k) } diff --git a/pkg/kv/bulk/sst_batcher_test.go b/pkg/kv/bulk/sst_batcher_test.go index e7969c42c26e..4912a4bbe3e3 100644 --- a/pkg/kv/bulk/sst_batcher_test.go +++ b/pkg/kv/bulk/sst_batcher_test.go @@ -294,7 +294,7 @@ func runTestImport(t *testing.T, batchSizeValue int64) { for _, k := range []int{0, split1} { ent, err := ds.RangeDescriptorCache().Lookup(ctx, keys.MustAddr(key(k))) require.NoError(t, err) - mockCache.Insert(ctx, roachpb.RangeInfo{Desc: *ent.Desc()}) + mockCache.Insert(ctx, ent) } t.Logf("splitting at %s", key(split2)) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 75673b2bb827..2336a22274b8 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -741,11 +741,11 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { require.NotNil(t, rng) if tc.expLeaseholder != nil { - lh := rng.Leaseholder() + lh := rng.Lease.Replica require.NotNil(t, lh) require.Equal(t, tc.expLeaseholder, lh) if tc.expLease { - l := rng.Lease() + l := rng.Lease require.NotNil(t, l) require.Equal(t, *tc.expLeaseholder, l.Replica) // The transport retry will use the replica descriptor from the @@ -755,7 +755,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) { expRetryReplica.Type = 0 require.Equal(t, expRetryReplica, retryReplica) } else { - require.Nil(t, rng.Lease()) + require.Nil(t, rng.Lease) } } }) @@ -1418,9 +1418,9 @@ func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) { } rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */) - require.Equal(t, desc, *rng.Desc()) - require.Equal(t, roachpb.StoreID(2), rng.Lease().Replica.StoreID) - require.Equal(t, roachpb.LAG_BY_CLUSTER_SETTING, rng.ClosedTimestampPolicy()) + require.Equal(t, desc, rng.Desc) + require.Equal(t, roachpb.StoreID(2), rng.Lease.Replica.StoreID) + require.Equal(t, roachpb.LAG_BY_CLUSTER_SETTING, rng.ClosedTimestampPolicy) } // TestRetryOnDescriptorLookupError verifies that the DistSender retries a descriptor @@ -1685,7 +1685,7 @@ func TestEvictCacheOnError(t *testing.T) { if tc.shouldClearReplica { require.Nil(t, rng) } else if tc.shouldClearLeaseHolder { - require.True(t, rng.Lease().Empty()) + require.True(t, 
rng.Lease.Empty()) } }) } @@ -2190,15 +2190,7 @@ func TestDistSenderDescriptorUpdatesOnSuccessfulRPCs(t *testing.T) { for _, ri := range tc { rk := ri.Desc.StartKey entry := ds.rangeCache.GetCached(ctx, rk, false /* inverted */) - require.NotNil(t, entry) - require.Equal(t, &ri.Desc, entry.Desc()) - if ri.Lease.Empty() { - require.Nil(t, entry.Leaseholder()) - require.Nil(t, entry.Lease()) - } else { - require.Equal(t, &ri.Lease, entry.Lease()) - } - require.Equal(t, ri.ClosedTimestampPolicy, entry.ClosedTimestampPolicy()) + require.Equal(t, ri, entry) } }) } @@ -2264,7 +2256,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { // that the response evicts it. rng := ds.rangeCache.GetCached(ctx, descriptor.StartKey, false /* inverse */) ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ - Desc: *rng.Desc(), + Desc: rng.Desc, Lease: roachpb.Lease{Replica: ba.Replica}, }) } @@ -2299,7 +2291,7 @@ func TestSendRPCRangeNotFoundError(t *testing.T) { rng := ds.rangeCache.GetCached(ctx, descriptor.StartKey, false /* inverted */) require.NotNil(t, rng) - require.Equal(t, leaseholderStoreID, rng.Lease().Replica.StoreID) + require.Equal(t, leaseholderStoreID, rng.Lease.Replica.StoreID) } func TestMultiRangeGapReverse(t *testing.T) { @@ -3974,8 +3966,8 @@ func TestCanSendToFollower(t *testing.T) { // leaseholder. rng := ds.rangeCache.GetCached(ctx, testUserRangeDescriptor.StartKey, false /* inverted */) require.NotNil(t, rng) - require.NotNil(t, rng.Lease()) - require.Equal(t, roachpb.StoreID(2), rng.Lease().Replica.StoreID) + require.NotNil(t, rng.Lease) + require.Equal(t, roachpb.StoreID(2), rng.Lease.Replica.StoreID) }) } } @@ -4139,9 +4131,9 @@ func TestEvictMetaRange(t *testing.T) { // Verify that there is one meta2 cached range. cachedRange := ds.rangeCache.GetCached(ctx, keys.RangeMetaKey(roachpb.RKey("a")), false) - if !cachedRange.Desc().StartKey.Equal(keys.Meta2Prefix) || !cachedRange.Desc().EndKey.Equal(testMetaEndKey) { + if !cachedRange.Desc.StartKey.Equal(keys.Meta2Prefix) || !cachedRange.Desc.EndKey.Equal(testMetaEndKey) { t.Fatalf("expected cached meta2 range to be [%s, %s), actual [%s, %s)", - keys.Meta2Prefix, testMetaEndKey, cachedRange.Desc().StartKey, cachedRange.Desc().EndKey) + keys.Meta2Prefix, testMetaEndKey, cachedRange.Desc.StartKey, cachedRange.Desc.EndKey) } // Simulate a split on the meta2 range and mark it as stale. @@ -4154,14 +4146,14 @@ func TestEvictMetaRange(t *testing.T) { // Verify that there are two meta2 cached ranges. 
cachedRange = ds.rangeCache.GetCached(ctx, keys.RangeMetaKey(roachpb.RKey("a")), false) - if !cachedRange.Desc().StartKey.Equal(keys.Meta2Prefix) || !cachedRange.Desc().EndKey.Equal(splitKey) { + if !cachedRange.Desc.StartKey.Equal(keys.Meta2Prefix) || !cachedRange.Desc.EndKey.Equal(splitKey) { t.Fatalf("expected cached meta2 range to be [%s, %s), actual [%s, %s)", - keys.Meta2Prefix, splitKey, cachedRange.Desc().StartKey, cachedRange.Desc().EndKey) + keys.Meta2Prefix, splitKey, cachedRange.Desc.StartKey, cachedRange.Desc.EndKey) } cachedRange = ds.rangeCache.GetCached(ctx, keys.RangeMetaKey(roachpb.RKey("b")), false) - if !cachedRange.Desc().StartKey.Equal(splitKey) || !cachedRange.Desc().EndKey.Equal(testMetaEndKey) { + if !cachedRange.Desc.StartKey.Equal(splitKey) || !cachedRange.Desc.EndKey.Equal(testMetaEndKey) { t.Fatalf("expected cached meta2 range to be [%s, %s), actual [%s, %s)", - splitKey, testMetaEndKey, cachedRange.Desc().StartKey, cachedRange.Desc().EndKey) + splitKey, testMetaEndKey, cachedRange.Desc.StartKey, cachedRange.Desc.EndKey) } }) } @@ -5151,9 +5143,8 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) { }, }, }) - ent, err := rc.Lookup(ctx, roachpb.RKeyMin) + tok, err := rc.LookupWithEvictionToken(ctx, roachpb.RKeyMin, rangecache.EvictionToken{}, false) require.NoError(t, err) - tok := rc.MakeEvictionToken(&ent) numCalled := 0 transportFn := func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { @@ -5211,14 +5202,14 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) { require.Regexp(t, "NotLeaseHolderError", err) cached := rc.GetCached(ctx, tc.initialDesc.StartKey, false /* inverted */) require.NotNil(t, cached) - require.Equal(t, tc.updatedDesc, *cached.Desc()) + require.Equal(t, tc.updatedDesc, cached.Desc) require.Equal(t, tc.expReplicasTried, numCalled) if tc.expLeaseholder == 0 { // Check that the leaseholder is cleared out. - require.Nil(t, cached.Leaseholder()) + require.Nil(t, cached.Lease.Replica) } else { - require.NotNil(t, cached.Leaseholder()) - require.Equal(t, tc.expLeaseholder, cached.Leaseholder().ReplicaID) + require.NotNil(t, cached.Lease) + require.Equal(t, tc.expLeaseholder, cached.Lease.Replica.ReplicaID) } }) }) diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 2d2c10ce3e97..73661dcfa421 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -597,16 +597,27 @@ func (rc *RangeCache) LookupWithEvictionToken( return tok, nil } +func (rc *RangeCache) LookupRangeID( + ctx context.Context, key roachpb.RKey, +) (roachpb.RangeID, error) { + tok, err := rc.lookupInternal( + ctx, key, EvictionToken{}, false /* useReverseScan */) + if err != nil { + return 0, err + } + return tok.entry.Desc().RangeID, nil +} + // Lookup presents a simpler interface for looking up a RangeDescriptor for a // key without the eviction tokens or scan direction control of // LookupWithEvictionToken. 
-func (rc *RangeCache) Lookup(ctx context.Context, key roachpb.RKey) (CacheEntry, error) { +func (rc *RangeCache) Lookup(ctx context.Context, key roachpb.RKey) (roachpb.RangeInfo, error) { tok, err := rc.lookupInternal( ctx, key, EvictionToken{}, false /* useReverseScan */) if err != nil { - return CacheEntry{}, err + return roachpb.RangeInfo{}, err } - return *tok.entry, nil + return tok.entry.toRangeInfo(), nil } // GetCachedOverlapping returns all the cached entries which overlap a given @@ -1023,11 +1034,15 @@ func (rc *RangeCache) evictDescLocked(ctx context.Context, desc *roachpb.RangeDe // `inverted` determines the behavior at the range boundary: If set to true // and `key` is the EndKey and StartKey of two adjacent ranges, the first range // is returned instead of the second (which technically contains the given key). -func (rc *RangeCache) GetCached(ctx context.Context, key roachpb.RKey, inverted bool) *CacheEntry { +// This is only used by tests. +// TODO(baptist) return an err instead of an empty RangeInfo +func (rc *RangeCache) GetCached( + ctx context.Context, key roachpb.RKey, inverted bool, +) roachpb.RangeInfo { rc.rangeCache.RLock() defer rc.rangeCache.RUnlock() entry, _ := rc.getCachedRLocked(ctx, key, inverted) - return entry + return entry.toRangeInfo() } // getCachedRLocked is like GetCached, but it assumes that the caller holds a @@ -1344,6 +1359,18 @@ func (e *CacheEntry) LeaseSpeculative() bool { return e.lease.Speculative() } +func (e *CacheEntry) toRangeInfo() roachpb.RangeInfo { + if e == nil { + return roachpb.RangeInfo{} + } + return roachpb.RangeInfo{ + Desc: e.desc, + Lease: e.lease, + ClosedTimestampPolicy: e.ClosedTimestampPolicy(), + } + +} + // overrides returns true if o should replace e in the cache. It is assumed that // e's and o'd descriptors overlap (and so they can't co-exist in the cache). A // newer entry overrides an older entry. 
What entry is newer is decided based diff --git a/pkg/kv/kvclient/rangecache/range_cache_test.go b/pkg/kv/kvclient/rangecache/range_cache_test.go index 27a105311cb9..f8142fe3587f 100644 --- a/pkg/kv/kvclient/rangecache/range_cache_test.go +++ b/pkg/kv/kvclient/rangecache/range_cache_test.go @@ -507,13 +507,13 @@ func TestLookupByKeyMin(t *testing.T) { cache.Insert(ctx, roachpb.RangeInfo{Desc: startToMeta2Desc}) entMin := cache.GetCached(ctx, roachpb.RKeyMin, false /* inverted */) require.NotNil(t, entMin) - require.NotNil(t, entMin.Desc()) - require.Equal(t, startToMeta2Desc, *entMin.Desc()) + require.NotNil(t, entMin.Desc) + require.Equal(t, startToMeta2Desc, entMin.Desc) entNext := cache.GetCached(ctx, roachpb.RKeyMin.Next(), false /* inverted */) - require.True(t, entMin == entNext) + require.Equal(t, entMin, entNext) entNext = cache.GetCached(ctx, roachpb.RKeyMin.Next().Next(), false /* inverted */) - require.True(t, entMin == entNext) + require.Equal(t, entMin, entNext) } // TestRangeCacheCoalescedRequests verifies that concurrent lookups for @@ -1046,13 +1046,13 @@ func TestRangeCacheClearOverlapping(t *testing.T) { curGeneration := roachpb.RangeGeneration(1) require.True(t, clearOlderOverlapping(ctx, cache, minToBDesc)) cache.addEntryLocked(&CacheEntry{desc: *minToBDesc}) - if desc := cache.GetCached(ctx, roachpb.RKey("b"), false); desc != nil { - t.Errorf("descriptor unexpectedly non-nil: %s", desc) + if ri := cache.GetCached(ctx, roachpb.RKey("b"), false); ri.Desc.IsInitialized() { + t.Errorf("descriptor unexpectedly non-nil: %s", ri.Desc) } require.True(t, clearOlderOverlapping(ctx, cache, bToMaxDesc)) cache.addEntryLocked(&CacheEntry{desc: *bToMaxDesc}) - ri := cache.GetCached(ctx, roachpb.RKey("b"), false) + ri, _ := cache.getCachedRLocked(ctx, roachpb.RKey("b"), false) require.Equal(t, bToMaxDesc, ri.Desc()) // Add default descriptor back which should remove two split descriptors. 
@@ -1062,7 +1062,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) { require.True(t, clearOlderOverlapping(ctx, cache, &defDescCpy)) cache.addEntryLocked(&CacheEntry{desc: defDescCpy}) for _, key := range []roachpb.RKey{roachpb.RKey("a"), roachpb.RKey("b")} { - ri = cache.GetCached(ctx, key, false) + ri, _ = cache.getCachedRLocked(ctx, key, false) require.Equal(t, &defDescCpy, ri.Desc()) } @@ -1075,7 +1075,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) { } require.True(t, clearOlderOverlapping(ctx, cache, bToCDesc)) cache.addEntryLocked(&CacheEntry{desc: *bToCDesc}) - ri = cache.GetCached(ctx, roachpb.RKey("c"), true) + ri, _ = cache.getCachedRLocked(ctx, roachpb.RKey("c"), true) require.Equal(t, bToCDesc, ri.Desc()) curGeneration++ @@ -1086,7 +1086,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) { } require.True(t, clearOlderOverlapping(ctx, cache, aToBDesc)) cache.addEntryLocked(ri) - ri = cache.GetCached(ctx, roachpb.RKey("c"), true) + ri, _ = cache.getCachedRLocked(ctx, roachpb.RKey("c"), true) require.Equal(t, bToCDesc, ri.Desc()) } @@ -1441,10 +1441,10 @@ func TestRangeCacheGeneration(t *testing.T) { ri := cache.GetCached(ctx, queryKey, false) exp := tc.expectedDesc[index] if exp == nil { - require.Nil(t, ri) + require.Equal(t, roachpb.RangeInfo{}, ri) } else { require.NotNil(t, ri) - require.NotNil(t, *exp, ri.Desc()) + require.NotNil(t, *exp, ri.Desc) } } }) @@ -1671,9 +1671,9 @@ func TestRangeCacheSyncTokenAndMaybeUpdateCache(t *testing.T) { require.Equal(t, oldTok.ClosedTimestampPolicy(lag), tok.ClosedTimestampPolicy(lag)) ri := cache.GetCached(ctx, startKey, false /* inverted */) require.NotNil(t, ri) - require.Equal(t, desc1, *ri.Desc()) - require.Equal(t, rep1, ri.Lease().Replica) - require.Equal(t, lead, ri.ClosedTimestampPolicy()) + require.Equal(t, desc1, ri.Desc) + require.Equal(t, rep1, ri.Lease.Replica) + require.Equal(t, lead, ri.ClosedTimestampPolicy) // Ensure evicting the lease doesn't remove the closed timestamp // policy/desc. 
@@ -1684,9 +1684,9 @@ func TestRangeCacheSyncTokenAndMaybeUpdateCache(t *testing.T) { require.Equal(t, oldTok.ClosedTimestampPolicy(lag), tok.ClosedTimestampPolicy(lag)) ri = cache.GetCached(ctx, startKey, false /* inverted */) require.NotNil(t, ri) - require.Equal(t, desc1, *ri.Desc()) - require.True(t, ri.lease.Empty()) - require.Equal(t, lead, ri.ClosedTimestampPolicy()) + require.Equal(t, desc1, ri.Desc) + require.True(t, ri.Lease.Empty()) + require.Equal(t, lead, ri.ClosedTimestampPolicy) }, }, { diff --git a/pkg/kv/kvserver/intentresolver/BUILD.bazel b/pkg/kv/kvserver/intentresolver/BUILD.bazel index c59a533a9dc2..5e8f83b5e4c4 100644 --- a/pkg/kv/kvserver/intentresolver/BUILD.bazel +++ b/pkg/kv/kvserver/intentresolver/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "//pkg/internal/client/requestbatcher", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvclient/rangecache", "//pkg/kv/kvpb", "//pkg/kv/kvserver/batcheval/result", "//pkg/kv/kvserver/kvserverbase", diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index 7afcdf10b91f..693aab30172c 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/internal/client/requestbatcher" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" @@ -140,8 +139,8 @@ type Config struct { // RangeCache is a simplified interface to the rngcache.RangeCache. type RangeCache interface { - // Lookup looks up range information for the range containing key. - Lookup(ctx context.Context, key roachpb.RKey) (rangecache.CacheEntry, error) + // LookupRangeID looks up range ID for the range containing key. + LookupRangeID(ctx context.Context, key roachpb.RKey) (roachpb.RangeID, error) } // IntentResolver manages the process of pushing transactions and @@ -203,10 +202,10 @@ func setConfigDefaults(c *Config) { type nopRangeDescriptorCache struct{} -func (nrdc nopRangeDescriptorCache) Lookup( +func (nrdc nopRangeDescriptorCache) LookupRangeID( ctx context.Context, key roachpb.RKey, -) (rangecache.CacheEntry, error) { - return rangecache.CacheEntry{}, nil +) (roachpb.RangeID, error) { + return 0, nil } // New creates an new IntentResolver. @@ -913,14 +912,14 @@ func (ir *IntentResolver) lookupRangeID(ctx context.Context, key roachpb.Key) ro } return 0 } - rInfo, err := ir.rdc.Lookup(ctx, rKey) + rangeID, err := ir.rdc.LookupRangeID(ctx, rKey) if err != nil { if ir.every.ShouldLog() { log.Warningf(ctx, "failed to look up range descriptor for key %q: %+v", key, err) } return 0 } - return rInfo.Desc().RangeID + return rangeID } // lockUpdates allows for eager or lazy translation of lock spans to lock updates. 
diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go index 4529e6e82177..e5a68f349cbd 100644 --- a/pkg/sql/distsql_physical_planner_test.go +++ b/pkg/sql/distsql_physical_planner_test.go @@ -258,8 +258,8 @@ func TestDistSQLReceiverUpdatesCaches(t *testing.T) { for i := range descs { ri := rangeCache.GetCached(ctx, descs[i].StartKey, false /* inclusive */) require.NotNilf(t, ri, "failed to find range for key: %s", descs[i].StartKey) - require.Equal(t, &descs[i], ri.Desc()) - require.NotNil(t, ri.Lease()) + require.Equal(t, descs[i], ri.Desc) + require.NotNil(t, ri.Lease) } } From 56e78205241015c17e487d44caef25c234e75ffd Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Thu, 25 Jan 2024 16:31:43 -0500 Subject: [PATCH 3/9] kvclient: make cacheEntry private CacheEntry is not intended to be exposed externally. This commit makes it a private type in the RangeCache. Epic: none Release note: None --- pkg/kv/kvclient/rangecache/range_cache.go | 96 +++++++++---------- .../kvclient/rangecache/range_cache_test.go | 68 ++++++------- 2 files changed, 82 insertions(+), 82 deletions(-) diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 73661dcfa421..8e8157b6ec8d 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -94,7 +94,7 @@ const ( ReadFromLeaseholder = kvpb.READ_UNCOMMITTED ) -// UnknownClosedTimestampPolicy is used to mark on a CacheEntry that the closed +// UnknownClosedTimestampPolicy is used to mark on a cacheEntry that the closed // timestamp policy is not known. This value is never serialized into // RangeInfo or any other message which uses the type. const UnknownClosedTimestampPolicy roachpb.RangeClosedTimestampPolicy = -1 @@ -275,10 +275,10 @@ type EvictionToken struct { // compatible descriptor, with the same range id and key bounds. If the // descriptor changes in a non-compatible way, this EvictionToken must be // discarded and a new one retrieved from the RangeCache. - entry *CacheEntry + entry *cacheEntry } -func (rc *RangeCache) makeEvictionToken(entry *CacheEntry) EvictionToken { +func (rc *RangeCache) makeEvictionToken(entry *cacheEntry) EvictionToken { return EvictionToken{ rdc: rc, entry: entry, @@ -286,7 +286,7 @@ func (rc *RangeCache) makeEvictionToken(entry *CacheEntry) EvictionToken { } // MakeEvictionToken is the exported ctor. For tests only. -func (rc *RangeCache) MakeEvictionToken(entry *CacheEntry) EvictionToken { +func (rc *RangeCache) MakeEvictionToken(entry *cacheEntry) EvictionToken { return rc.makeEvictionToken(entry) } @@ -375,7 +375,7 @@ func (et EvictionToken) ClosedTimestampPolicy( // entry for the start key any more. func (et *EvictionToken) syncRLocked( ctx context.Context, -) (stillValid bool, cachedEntry *CacheEntry, rawEntry *cache.Entry) { +) (stillValid bool, cachedEntry *cacheEntry, rawEntry *cache.Entry) { cachedEntry, rawEntry = et.rdc.getCachedRLocked(ctx, et.entry.desc.StartKey, false /* inverted */) if cachedEntry == nil || !descsCompatible(cachedEntry.Desc(), et.Desc()) { et.clear() @@ -622,11 +622,11 @@ func (rc *RangeCache) Lookup(ctx context.Context, key roachpb.RKey) (roachpb.Ran // GetCachedOverlapping returns all the cached entries which overlap a given // span [Key, EndKey). The results are sorted ascendingly. 
-func (rc *RangeCache) GetCachedOverlapping(ctx context.Context, span roachpb.RSpan) []*CacheEntry { +func (rc *RangeCache) GetCachedOverlapping(ctx context.Context, span roachpb.RSpan) []*cacheEntry { rc.rangeCache.RLock() defer rc.rangeCache.RUnlock() rawEntries := rc.getCachedOverlappingRLocked(ctx, span) - entries := make([]*CacheEntry, len(rawEntries)) + entries := make([]*cacheEntry, len(rawEntries)) for i, e := range rawEntries { entries[i] = rc.getValue(e) } @@ -906,10 +906,10 @@ func tryLookupImpl( rc.rangeCache.Lock() defer rc.rangeCache.Unlock() - // We want to insert a new CacheEntry, possibly with a speculativeDesc. + // We want to insert a new cacheEntry, possibly with a speculativeDesc. // Create the entry based on the lookup and try and insert it into the // cache. - newEntry := CacheEntry{ + newEntry := cacheEntry{ desc: rs[0], // We don't have any lease information. lease: roachpb.Lease{}, @@ -930,10 +930,10 @@ func tryLookupImpl( // rs[0]'s eviction token. Note that ranges for which the cache has more // up-to-date information will not be clobbered - for example ranges for // which the cache has the prefetched descriptor already plus a lease. - newEntries := make([]*CacheEntry, len(preRs)+1) + newEntries := make([]*cacheEntry, len(preRs)+1) newEntries[0] = &newEntry for i, preR := range preRs { - newEntries[i+1] = &CacheEntry{desc: preR, closedts: UnknownClosedTimestampPolicy} + newEntries[i+1] = &cacheEntry{desc: preR, closedts: UnknownClosedTimestampPolicy} } insertedEntries := rc.insertLockedInner(ctx, newEntries) // entry corresponds to rs[0], which is the descriptor covering the key @@ -1052,7 +1052,7 @@ func (rc *RangeCache) GetCached( // used for descriptor eviction. func (rc *RangeCache) getCachedRLocked( ctx context.Context, key roachpb.RKey, inverted bool, -) (*CacheEntry, *cache.Entry) { +) (*cacheEntry, *cache.Entry) { // rawEntry will be the range containing key, or the first cached entry around // key, in the direction indicated by inverted. var rawEntry *cache.Entry @@ -1118,10 +1118,10 @@ func (rc *RangeCache) Insert(ctx context.Context, rs ...roachpb.RangeInfo) { // for putting in eviction tokens. Any element in the returned array can be nil // if inserting the respective RangeInfo failed because it was found to be // stale. -func (rc *RangeCache) insertLocked(ctx context.Context, rs ...roachpb.RangeInfo) []*CacheEntry { - entries := make([]*CacheEntry, len(rs)) +func (rc *RangeCache) insertLocked(ctx context.Context, rs ...roachpb.RangeInfo) []*cacheEntry { + entries := make([]*cacheEntry, len(rs)) for i, r := range rs { - entries[i] = &CacheEntry{ + entries[i] = &cacheEntry{ desc: r.Desc, lease: r.Lease, closedts: r.ClosedTimestampPolicy, @@ -1130,10 +1130,10 @@ func (rc *RangeCache) insertLocked(ctx context.Context, rs ...roachpb.RangeInfo) return rc.insertLockedInner(ctx, entries) } -func (rc *RangeCache) insertLockedInner(ctx context.Context, rs []*CacheEntry) []*CacheEntry { +func (rc *RangeCache) insertLockedInner(ctx context.Context, rs []*cacheEntry) []*cacheEntry { // entries will have the same element as rs, except the ones that couldn't be // inserted for which the slots will remain nil. 
- entries := make([]*CacheEntry, len(rs)) + entries := make([]*cacheEntry, len(rs)) for i, ent := range rs { if !ent.desc.IsInitialized() { log.Fatalf(ctx, "inserting uninitialized desc: %s", ent) @@ -1171,13 +1171,13 @@ func (rc *RangeCache) insertLockedInner(ctx context.Context, rs []*CacheEntry) [ return entries } -func (rc *RangeCache) getValue(entry *cache.Entry) *CacheEntry { - return entry.Value.(*CacheEntry) +func (rc *RangeCache) getValue(entry *cache.Entry) *cacheEntry { + return entry.Value.(*cacheEntry) } func (rc *RangeCache) clearOlderOverlapping( - ctx context.Context, newEntry *CacheEntry, -) (ok bool, newerEntry *CacheEntry) { + ctx context.Context, newEntry *cacheEntry, +) (ok bool, newerEntry *cacheEntry) { rc.rangeCache.Lock() defer rc.rangeCache.Unlock() return rc.clearOlderOverlappingLocked(ctx, newEntry) @@ -1194,11 +1194,11 @@ func (rc *RangeCache) clearOlderOverlapping( // Note that even if false is returned, older descriptors are still cleared from // the cache. func (rc *RangeCache) clearOlderOverlappingLocked( - ctx context.Context, newEntry *CacheEntry, -) (ok bool, newerEntry *CacheEntry) { + ctx context.Context, newEntry *cacheEntry, +) (ok bool, newerEntry *cacheEntry) { log.VEventf(ctx, 2, "clearing entries overlapping %s", newEntry.Desc()) newest := true - var newerFound *CacheEntry + var newerFound *cacheEntry overlapping := rc.getCachedOverlappingRLocked(ctx, newEntry.Desc().RSpan()) for _, e := range overlapping { entry := rc.getValue(e) @@ -1227,7 +1227,7 @@ func (rc *RangeCache) clearOlderOverlappingLocked( // swapEntryLocked swaps oldEntry for newEntry. If newEntry is nil, oldEntry is // simply removed. func (rc *RangeCache) swapEntryLocked( - ctx context.Context, oldEntry *cache.Entry, newEntry *CacheEntry, + ctx context.Context, oldEntry *cache.Entry, newEntry *cacheEntry, ) { if newEntry != nil { old := rc.getValue(oldEntry) @@ -1244,7 +1244,7 @@ func (rc *RangeCache) swapEntryLocked( } } -func (rc *RangeCache) addEntryLocked(entry *CacheEntry) { +func (rc *RangeCache) addEntryLocked(entry *cacheEntry) { key := newRangeCacheKey(entry.Desc().StartKey) rc.rangeCache.cache.Add(key, entry) } @@ -1270,12 +1270,12 @@ func (rc *RangeCache) NumInFlight(name string) int { return rc.lookupRequests.NumCalls(name) } -// CacheEntry represents one cache entry. +// cacheEntry represents one cache entry. // -// The cache stores *CacheEntry. Entries are immutable: cache lookups -// returns the same *CacheEntry to multiple queriers for efficiency, but +// The cache stores *cacheEntry. Entries are immutable: cache lookups +// returns the same *cacheEntry to multiple queriers for efficiency, but // nobody should modify the lookup result. -type CacheEntry struct { +type cacheEntry struct { // desc is always populated. desc roachpb.RangeDescriptor // speculativeDesc, if not nil, is the descriptor that should replace desc if @@ -1301,20 +1301,20 @@ type CacheEntry struct { closedts roachpb.RangeClosedTimestampPolicy } -func (e CacheEntry) String() string { +func (e cacheEntry) String() string { return fmt.Sprintf("desc:%s, lease:%s", e.Desc(), e.lease) } // Desc returns the cached descriptor. Note that, besides being possibly stale, // this descriptor also might not represent a descriptor that was ever // committed. See DescSpeculative(). -func (e *CacheEntry) Desc() *roachpb.RangeDescriptor { +func (e *cacheEntry) Desc() *roachpb.RangeDescriptor { return &e.desc } // Leaseholder returns the cached leaseholder replica, if known. 
Returns nil if // the leaseholder is not known. -func (e *CacheEntry) Leaseholder() *roachpb.ReplicaDescriptor { +func (e *cacheEntry) Leaseholder() *roachpb.ReplicaDescriptor { if e.lease.Empty() { return nil } @@ -1324,7 +1324,7 @@ func (e *CacheEntry) Leaseholder() *roachpb.ReplicaDescriptor { // Lease returns the cached lease, if known. Returns nil if no lease is known. // It's possible for a leaseholder to be known, but not a full lease, in which // case Leaseholder() returns non-nil but Lease() returns nil. -func (e *CacheEntry) Lease() *roachpb.Lease { +func (e *cacheEntry) Lease() *roachpb.Lease { if e.lease.Empty() { return nil } @@ -1336,7 +1336,7 @@ func (e *CacheEntry) Lease() *roachpb.Lease { // ClosedTimestampPolicy returns the cached understanding of the range's closed // timestamp policy. If no policy is known, LAG_BY_CLUSTER_SETTING is returned. -func (e *CacheEntry) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy { +func (e *cacheEntry) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy { return e.closedts } @@ -1345,21 +1345,21 @@ func (e *CacheEntry) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy // inserted in the cache with Generation=0. // // Speculative descriptors come from (not-yet-committed) intents. -func (e *CacheEntry) DescSpeculative() bool { +func (e *cacheEntry) DescSpeculative() bool { return e.desc.Generation == 0 } // LeaseSpeculative returns true if the lease in the entry is "speculative" // - i.e. it doesn't correspond to a committed lease. Such leases have been // inserted in the cache with Sequence=0. -func (e *CacheEntry) LeaseSpeculative() bool { +func (e *cacheEntry) LeaseSpeculative() bool { if e.lease.Empty() { panic(fmt.Sprintf("LeaseSpeculative called on entry with empty lease: %s", e)) } return e.lease.Speculative() } -func (e *CacheEntry) toRangeInfo() roachpb.RangeInfo { +func (e *cacheEntry) toRangeInfo() roachpb.RangeInfo { if e == nil { return roachpb.RangeInfo{} } @@ -1384,7 +1384,7 @@ func (e *CacheEntry) toRangeInfo() roachpb.RangeInfo { // can't be determined what information is newer is when at least one of the // descriptors is "speculative" (generation=0), or when the lease information is // "speculative" (sequence=0). -func (e *CacheEntry) overrides(o *CacheEntry) bool { +func (e *cacheEntry) overrides(o *cacheEntry) bool { if util.RaceEnabled { if _, err := e.Desc().RSpan().Intersect(o.Desc().RSpan()); err != nil { panic(fmt.Sprintf("descriptors don't intersect: %s vs %s", e.Desc(), o.Desc())) @@ -1424,7 +1424,7 @@ func (e *CacheEntry) overrides(o *CacheEntry) bool { // // In case at least one of the descriptors is "speculative", a is considered // older; this matches the semantics of b.overrides(a). -func compareEntryDescs(a, b *CacheEntry) int { +func compareEntryDescs(a, b *cacheEntry) int { if util.RaceEnabled { if _, err := a.Desc().RSpan().Intersect(b.Desc().RSpan()); err != nil { panic(fmt.Sprintf("descriptors don't intersect: %s vs %s", a.Desc(), b.Desc())) @@ -1455,7 +1455,7 @@ func compareEntryDescs(a, b *CacheEntry) int { // An empty lease is considered older than any other. In case at least one of // the leases is "speculative", a is considered older; this matches the // semantics of b.overrides(a). 
-func compareEntryLeases(a, b *CacheEntry) int { +func compareEntryLeases(a, b *cacheEntry) int { if aEmpty, bEmpty := a.lease.Empty(), b.lease.Empty(); aEmpty || bEmpty { if aEmpty && !bEmpty { return -1 @@ -1481,7 +1481,7 @@ func compareEntryLeases(a, b *CacheEntry) int { return 0 } -// maybeUpdate returns a new CacheEntry which contains the freshest lease/range +// maybeUpdate returns a new cacheEntry which contains the freshest lease/range // descriptor by comparing the receiver's fields to the passed-in parameters. // // The updated retval indicates if either the passed-in lease or the range @@ -1504,15 +1504,15 @@ func compareEntryLeases(a, b *CacheEntry) int { // // It's expected that the supplied rangeDesc is compatible with the descriptor // on the cache entry. -func (e *CacheEntry) maybeUpdate( +func (e *cacheEntry) maybeUpdate( ctx context.Context, l *roachpb.Lease, rangeDesc *roachpb.RangeDescriptor, -) (updated, updatedLease bool, newEntry *CacheEntry) { +) (updated, updatedLease bool, newEntry *cacheEntry) { if !descsCompatible(e.Desc(), rangeDesc) { log.Fatalf(ctx, "attempting to update by comparing non-compatible descs: %s vs %s", e.Desc(), rangeDesc) } - newEntry = &CacheEntry{ + newEntry = &cacheEntry{ lease: e.lease, desc: e.desc, closedts: e.closedts, @@ -1576,13 +1576,13 @@ func (e *CacheEntry) maybeUpdate( return updatedLease || updatedDesc, updatedLease, newEntry } -func (e *CacheEntry) evictLeaseholder( +func (e *cacheEntry) evictLeaseholder( lh roachpb.ReplicaDescriptor, -) (updated bool, newEntry *CacheEntry) { +) (updated bool, newEntry *cacheEntry) { if e.lease.Replica != lh { return false, e } - return true, &CacheEntry{ + return true, &cacheEntry{ desc: e.desc, closedts: e.closedts, } diff --git a/pkg/kv/kvclient/rangecache/range_cache_test.go b/pkg/kv/kvclient/rangecache/range_cache_test.go index f8142fe3587f..d7d5aac61a90 100644 --- a/pkg/kv/kvclient/rangecache/range_cache_test.go +++ b/pkg/kv/kvclient/rangecache/range_cache_test.go @@ -298,7 +298,7 @@ func evict(ctx context.Context, rc *RangeCache, desc *roachpb.RangeDescriptor) b func clearOlderOverlapping( ctx context.Context, rc *RangeCache, desc *roachpb.RangeDescriptor, ) bool { - ent := &CacheEntry{desc: *desc} + ent := &cacheEntry{desc: *desc} ok, _ /* newerEntry */ := rc.clearOlderOverlapping(ctx, ent) return ok } @@ -1030,7 +1030,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) cache := NewRangeCache(st, nil, staticSize(2<<10), stopper) - cache.addEntryLocked(&CacheEntry{desc: *defDesc}) + cache.addEntryLocked(&cacheEntry{desc: *defDesc}) // Now, add a new, overlapping set of descriptors. 
minToBDesc := &roachpb.RangeDescriptor{ @@ -1045,13 +1045,13 @@ func TestRangeCacheClearOverlapping(t *testing.T) { } curGeneration := roachpb.RangeGeneration(1) require.True(t, clearOlderOverlapping(ctx, cache, minToBDesc)) - cache.addEntryLocked(&CacheEntry{desc: *minToBDesc}) + cache.addEntryLocked(&cacheEntry{desc: *minToBDesc}) if ri := cache.GetCached(ctx, roachpb.RKey("b"), false); ri.Desc.IsInitialized() { t.Errorf("descriptor unexpectedly non-nil: %s", ri.Desc) } require.True(t, clearOlderOverlapping(ctx, cache, bToMaxDesc)) - cache.addEntryLocked(&CacheEntry{desc: *bToMaxDesc}) + cache.addEntryLocked(&cacheEntry{desc: *bToMaxDesc}) ri, _ := cache.getCachedRLocked(ctx, roachpb.RKey("b"), false) require.Equal(t, bToMaxDesc, ri.Desc()) @@ -1060,7 +1060,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) { curGeneration++ defDescCpy.Generation = curGeneration require.True(t, clearOlderOverlapping(ctx, cache, &defDescCpy)) - cache.addEntryLocked(&CacheEntry{desc: defDescCpy}) + cache.addEntryLocked(&cacheEntry{desc: defDescCpy}) for _, key := range []roachpb.RKey{roachpb.RKey("a"), roachpb.RKey("b")} { ri, _ = cache.getCachedRLocked(ctx, key, false) require.Equal(t, &defDescCpy, ri.Desc()) @@ -1074,7 +1074,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) { Generation: curGeneration, } require.True(t, clearOlderOverlapping(ctx, cache, bToCDesc)) - cache.addEntryLocked(&CacheEntry{desc: *bToCDesc}) + cache.addEntryLocked(&cacheEntry{desc: *bToCDesc}) ri, _ = cache.getCachedRLocked(ctx, roachpb.RKey("c"), true) require.Equal(t, bToCDesc, ri.Desc()) @@ -1209,7 +1209,7 @@ func TestRangeCacheClearOlderOverlapping(t *testing.T) { for _, d := range tc.cachedDescs { cache.Insert(ctx, roachpb.RangeInfo{Desc: d}) } - newEntry := &CacheEntry{desc: tc.clearDesc} + newEntry := &cacheEntry{desc: tc.clearDesc} newest, newer := cache.clearOlderOverlapping(ctx, newEntry) all := cache.GetCachedOverlapping(ctx, roachpb.RSpan{Key: roachpb.RKeyMin, EndKey: roachpb.RKeyMax}) var allDescs []roachpb.RangeDescriptor @@ -1273,7 +1273,7 @@ func TestRangeCacheClearOverlappingMeta(t *testing.T) { t.Fatalf("invocation of clearOlderOverlapping panicked: %v", r) } }() - cache.clearOlderOverlapping(ctx, &CacheEntry{desc: metaSplitDesc}) + cache.clearOlderOverlapping(ctx, &cacheEntry{desc: metaSplitDesc}) }() } @@ -2010,7 +2010,7 @@ func TestRangeCacheEntryMaybeUpdate(t *testing.T) { Generation: 5, } - e := &CacheEntry{ + e := &cacheEntry{ desc: desc, lease: roachpb.Lease{}, } @@ -2175,17 +2175,17 @@ func TestRangeCacheEntryOverrides(t *testing.T) { tests := []struct { name string // We'll test b.overrides(a). 
- a, b CacheEntry + a, b cacheEntry exp bool }{ { name: "b newer gen", exp: true, - a: CacheEntry{ + a: cacheEntry{ desc: desc(5), lease: roachpb.Lease{}, }, - b: CacheEntry{ + b: cacheEntry{ desc: desc(6), lease: roachpb.Lease{}, }, @@ -2193,11 +2193,11 @@ func TestRangeCacheEntryOverrides(t *testing.T) { { name: "a newer gen", exp: false, - a: CacheEntry{ + a: cacheEntry{ desc: desc(7), lease: roachpb.Lease{}, }, - b: CacheEntry{ + b: cacheEntry{ desc: desc(6), lease: roachpb.Lease{}, }, @@ -2205,11 +2205,11 @@ func TestRangeCacheEntryOverrides(t *testing.T) { { name: "b newer lease", exp: true, - a: CacheEntry{ + a: cacheEntry{ desc: desc(5), lease: roachpb.Lease{Sequence: 1}, }, - b: CacheEntry{ + b: cacheEntry{ desc: desc(5), lease: roachpb.Lease{Sequence: 2}, }, @@ -2217,11 +2217,11 @@ func TestRangeCacheEntryOverrides(t *testing.T) { { name: "a newer lease", exp: false, - a: CacheEntry{ + a: cacheEntry{ desc: desc(5), lease: roachpb.Lease{Sequence: 2}, }, - b: CacheEntry{ + b: cacheEntry{ desc: desc(5), lease: roachpb.Lease{Sequence: 1}, }, @@ -2229,12 +2229,12 @@ func TestRangeCacheEntryOverrides(t *testing.T) { { name: "different closed timestamp policy #1", exp: true, - a: CacheEntry{ + a: cacheEntry{ desc: desc(5), lease: roachpb.Lease{Sequence: 1}, closedts: roachpb.LAG_BY_CLUSTER_SETTING, }, - b: CacheEntry{ + b: cacheEntry{ desc: desc(5), lease: roachpb.Lease{Sequence: 1}, closedts: roachpb.LEAD_FOR_GLOBAL_READS, @@ -2243,12 +2243,12 @@ func TestRangeCacheEntryOverrides(t *testing.T) { { name: "different closed timestamp policy #2", exp: true, - a: CacheEntry{ + a: cacheEntry{ desc: desc(5), lease: roachpb.Lease{Sequence: 1}, closedts: roachpb.LEAD_FOR_GLOBAL_READS, }, - b: CacheEntry{ + b: cacheEntry{ desc: desc(5), lease: roachpb.Lease{Sequence: 1}, closedts: roachpb.LAG_BY_CLUSTER_SETTING, @@ -2257,11 +2257,11 @@ func TestRangeCacheEntryOverrides(t *testing.T) { { name: "equal", exp: false, - a: CacheEntry{ + a: cacheEntry{ desc: desc(5), lease: roachpb.Lease{Sequence: 1}, }, - b: CacheEntry{ + b: cacheEntry{ desc: desc(5), lease: roachpb.Lease{Sequence: 1}, }, @@ -2269,11 +2269,11 @@ func TestRangeCacheEntryOverrides(t *testing.T) { { name: "a speculative desc", exp: true, - a: CacheEntry{ + a: cacheEntry{ desc: desc(0), lease: roachpb.Lease{Sequence: 1}, }, - b: CacheEntry{ + b: cacheEntry{ desc: desc(5), lease: roachpb.Lease{Sequence: 2}, }, @@ -2281,11 +2281,11 @@ func TestRangeCacheEntryOverrides(t *testing.T) { { name: "b speculative desc", exp: true, - a: CacheEntry{ + a: cacheEntry{ desc: desc(1), lease: roachpb.Lease{Sequence: 1}, }, - b: CacheEntry{ + b: cacheEntry{ desc: desc(0), lease: roachpb.Lease{Sequence: 2}, }, @@ -2293,11 +2293,11 @@ func TestRangeCacheEntryOverrides(t *testing.T) { { name: "both speculative descs", exp: true, - a: CacheEntry{ + a: cacheEntry{ desc: desc(0), lease: roachpb.Lease{Sequence: 1}, }, - b: CacheEntry{ + b: cacheEntry{ desc: desc(0), lease: roachpb.Lease{Sequence: 2}, }, @@ -2305,11 +2305,11 @@ func TestRangeCacheEntryOverrides(t *testing.T) { { name: "a speculative lease", exp: true, - a: CacheEntry{ + a: cacheEntry{ desc: desc(1), lease: roachpb.Lease{Sequence: 0}, }, - b: CacheEntry{ + b: cacheEntry{ desc: desc(1), lease: roachpb.Lease{Sequence: 1}, }, @@ -2317,11 +2317,11 @@ func TestRangeCacheEntryOverrides(t *testing.T) { { name: "both speculative leases", exp: true, - a: CacheEntry{ + a: cacheEntry{ desc: desc(1), lease: roachpb.Lease{Replica: roachpb.ReplicaDescriptor{ReplicaID: 1}, Sequence: 0}, }, - b: CacheEntry{ + b: 
cacheEntry{ desc: desc(1), lease: roachpb.Lease{Replica: roachpb.ReplicaDescriptor{ReplicaID: 2}, Sequence: 0}, }, From 89934709f926af88d6b91ba4c18b2f1c75142f37 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Thu, 25 Jan 2024 14:17:31 -0500 Subject: [PATCH 4/9] kvclient: remove unnecessary getters Now that the RangeCache.CacheEntry is private, we no longer need all the getters for the different fields. This simplifies some test code that was converting back and froth from pointers, structs. Epic: none Release note: None --- pkg/kv/kvclient/rangecache/range_cache.go | 95 ++++------- .../kvclient/rangecache/range_cache_test.go | 161 +++++++++--------- 2 files changed, 106 insertions(+), 150 deletions(-) diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 8e8157b6ec8d..2b454ad4cd06 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -377,7 +377,7 @@ func (et *EvictionToken) syncRLocked( ctx context.Context, ) (stillValid bool, cachedEntry *cacheEntry, rawEntry *cache.Entry) { cachedEntry, rawEntry = et.rdc.getCachedRLocked(ctx, et.entry.desc.StartKey, false /* inverted */) - if cachedEntry == nil || !descsCompatible(cachedEntry.Desc(), et.Desc()) { + if cachedEntry == nil || !descsCompatible(&cachedEntry.desc, et.Desc()) { et.clear() return false, nil, nil } @@ -438,7 +438,7 @@ func (et *EvictionToken) SyncTokenAndMaybeUpdateCache( ri := roachpb.RangeInfo{ Desc: *rangeDesc, Lease: *l, - ClosedTimestampPolicy: et.entry.ClosedTimestampPolicy(), + ClosedTimestampPolicy: et.entry.closedts, } et.evictAndReplaceLocked(ctx, ri) return false @@ -605,7 +605,7 @@ func (rc *RangeCache) LookupRangeID( if err != nil { return 0, err } - return tok.entry.Desc().RangeID, nil + return tok.entry.desc.RangeID, nil } // Lookup presents a simpler interface for looking up a RangeDescriptor for a @@ -640,7 +640,7 @@ func (rc *RangeCache) getCachedOverlappingRLocked( defer from.release() var res []*cache.Entry rc.rangeCache.cache.DoRangeReverseEntry(func(e *cache.Entry) (exit bool) { - desc := rc.getValue(e).Desc() + desc := rc.getValue(e).desc if desc.StartKey.Equal(span.EndKey) { // Skip over descriptor starting at the end key, who'd supposed to be exclusive. return false @@ -1015,7 +1015,7 @@ func (rc *RangeCache) evictDescLocked(ctx context.Context, desc *roachpb.RangeDe // Cache is empty; nothing to do. return false } - cachedDesc := cachedEntry.Desc() + cachedDesc := cachedEntry.desc cachedNewer := cachedDesc.Generation > desc.Generation if cachedNewer { return false @@ -1097,7 +1097,7 @@ func (rc *RangeCache) getCachedRLocked( } // Return nil if the key does not belong to the range. - if !containsFn(entry.Desc(), key) { + if !containsFn(&entry.desc, key) { return nil, nil } return entry, rawEntry @@ -1143,7 +1143,7 @@ func (rc *RangeCache) insertLockedInner(ctx context.Context, rs []*cacheEntry) [ _, ok := ent.desc.GetReplicaDescriptorByID(replID) if !ok { log.Fatalf(ctx, "leaseholder replicaID: %d not part of descriptor: %s. 
lease: %s", - replID, ent.Desc(), ent.Lease()) + replID, ent.desc, ent.lease) } } // Note: we append the end key of each range to meta records @@ -1196,10 +1196,10 @@ func (rc *RangeCache) clearOlderOverlapping( func (rc *RangeCache) clearOlderOverlappingLocked( ctx context.Context, newEntry *cacheEntry, ) (ok bool, newerEntry *cacheEntry) { - log.VEventf(ctx, 2, "clearing entries overlapping %s", newEntry.Desc()) + log.VEventf(ctx, 2, "clearing entries overlapping %s", newEntry.desc) newest := true var newerFound *cacheEntry - overlapping := rc.getCachedOverlappingRLocked(ctx, newEntry.Desc().RSpan()) + overlapping := rc.getCachedOverlappingRLocked(ctx, newEntry.desc.RSpan()) for _, e := range overlapping { entry := rc.getValue(e) if newEntry.overrides(entry) { @@ -1209,7 +1209,7 @@ func (rc *RangeCache) clearOlderOverlappingLocked( rc.delEntryLocked(e) } else { newest = false - if descsCompatible(entry.Desc(), newEntry.Desc()) { + if descsCompatible(&entry.desc, &newEntry.desc) { newerFound = entry // We've found a similar descriptor in the cache; there can't be any // other overlapping ones so let's stop the iteration. @@ -1231,7 +1231,7 @@ func (rc *RangeCache) swapEntryLocked( ) { if newEntry != nil { old := rc.getValue(oldEntry) - if !descsCompatible(old.Desc(), newEntry.Desc()) { + if !descsCompatible(&old.desc, &newEntry.desc) { log.Fatalf(ctx, "attempting to swap non-compatible descs: %s vs %s", old, newEntry) } @@ -1245,7 +1245,7 @@ func (rc *RangeCache) swapEntryLocked( } func (rc *RangeCache) addEntryLocked(entry *cacheEntry) { - key := newRangeCacheKey(entry.Desc().StartKey) + key := newRangeCacheKey(entry.desc.StartKey) rc.rangeCache.cache.Add(key, entry) } @@ -1301,45 +1301,6 @@ type cacheEntry struct { closedts roachpb.RangeClosedTimestampPolicy } -func (e cacheEntry) String() string { - return fmt.Sprintf("desc:%s, lease:%s", e.Desc(), e.lease) -} - -// Desc returns the cached descriptor. Note that, besides being possibly stale, -// this descriptor also might not represent a descriptor that was ever -// committed. See DescSpeculative(). -func (e *cacheEntry) Desc() *roachpb.RangeDescriptor { - return &e.desc -} - -// Leaseholder returns the cached leaseholder replica, if known. Returns nil if -// the leaseholder is not known. -func (e *cacheEntry) Leaseholder() *roachpb.ReplicaDescriptor { - if e.lease.Empty() { - return nil - } - return &e.lease.Replica -} - -// Lease returns the cached lease, if known. Returns nil if no lease is known. -// It's possible for a leaseholder to be known, but not a full lease, in which -// case Leaseholder() returns non-nil but Lease() returns nil. -func (e *cacheEntry) Lease() *roachpb.Lease { - if e.lease.Empty() { - return nil - } - if e.LeaseSpeculative() { - return nil - } - return &e.lease -} - -// ClosedTimestampPolicy returns the cached understanding of the range's closed -// timestamp policy. If no policy is known, LAG_BY_CLUSTER_SETTING is returned. -func (e *cacheEntry) ClosedTimestampPolicy() roachpb.RangeClosedTimestampPolicy { - return e.closedts -} - // DescSpeculative returns true if the descriptor in the entry is "speculative" // - i.e. it doesn't correspond to a committed value. Such descriptors have been // inserted in the cache with Generation=0. 
@@ -1359,6 +1320,10 @@ func (e *cacheEntry) LeaseSpeculative() bool { return e.lease.Speculative() } +func (e cacheEntry) String() string { + return fmt.Sprintf("desc:%s, lease:%s", e.desc, e.lease) +} + func (e *cacheEntry) toRangeInfo() roachpb.RangeInfo { if e == nil { return roachpb.RangeInfo{} @@ -1366,7 +1331,7 @@ func (e *cacheEntry) toRangeInfo() roachpb.RangeInfo { return roachpb.RangeInfo{ Desc: e.desc, Lease: e.lease, - ClosedTimestampPolicy: e.ClosedTimestampPolicy(), + ClosedTimestampPolicy: e.closedts, } } @@ -1386,8 +1351,8 @@ func (e *cacheEntry) toRangeInfo() roachpb.RangeInfo { // "speculative" (sequence=0). func (e *cacheEntry) overrides(o *cacheEntry) bool { if util.RaceEnabled { - if _, err := e.Desc().RSpan().Intersect(o.Desc().RSpan()); err != nil { - panic(fmt.Sprintf("descriptors don't intersect: %s vs %s", e.Desc(), o.Desc())) + if _, err := e.desc.RSpan().Intersect(o.desc.RSpan()); err != nil { + panic(fmt.Sprintf("descriptors don't intersect: %s vs %s", e.desc, o.desc)) } } @@ -1400,9 +1365,9 @@ func (e *cacheEntry) overrides(o *cacheEntry) bool { // If two RangeDescriptors overlap and have the same Generation, they must // be referencing the same range, in which case their lease sequences are // comparable. - if e.Desc().RangeID != o.Desc().RangeID { + if e.desc.RangeID != o.desc.RangeID { panic(fmt.Sprintf("overlapping descriptors with same gen but different IDs: %s vs %s", - e.Desc(), o.Desc())) + e.desc, o.desc)) } if res := compareEntryLeases(o, e); res != 0 { @@ -1426,8 +1391,8 @@ func (e *cacheEntry) overrides(o *cacheEntry) bool { // older; this matches the semantics of b.overrides(a). func compareEntryDescs(a, b *cacheEntry) int { if util.RaceEnabled { - if _, err := a.Desc().RSpan().Intersect(b.Desc().RSpan()); err != nil { - panic(fmt.Sprintf("descriptors don't intersect: %s vs %s", a.Desc(), b.Desc())) + if _, err := a.desc.RSpan().Intersect(b.desc.RSpan()); err != nil { + panic(fmt.Sprintf("descriptors don't intersect: %s vs %s", a.desc, b.desc)) } } @@ -1439,10 +1404,10 @@ func compareEntryDescs(a, b *cacheEntry) int { return -1 } - if a.Desc().Generation < b.Desc().Generation { + if a.desc.Generation < b.desc.Generation { return -1 } - if a.Desc().Generation > b.Desc().Generation { + if a.desc.Generation > b.desc.Generation { return 1 } return 0 @@ -1472,10 +1437,10 @@ func compareEntryLeases(a, b *cacheEntry) int { return -1 } - if a.Lease().Sequence < b.Lease().Sequence { + if a.lease.Sequence < b.lease.Sequence { return -1 } - if a.Lease().Sequence > b.Lease().Sequence { + if a.lease.Sequence > b.lease.Sequence { return 1 } return 0 @@ -1507,9 +1472,9 @@ func compareEntryLeases(a, b *cacheEntry) int { func (e *cacheEntry) maybeUpdate( ctx context.Context, l *roachpb.Lease, rangeDesc *roachpb.RangeDescriptor, ) (updated, updatedLease bool, newEntry *cacheEntry) { - if !descsCompatible(e.Desc(), rangeDesc) { + if !descsCompatible(&e.desc, rangeDesc) { log.Fatalf(ctx, "attempting to update by comparing non-compatible descs: %s vs %s", - e.Desc(), rangeDesc) + e.desc, rangeDesc) } newEntry = &cacheEntry{ @@ -1567,7 +1532,7 @@ func (e *cacheEntry) maybeUpdate( 2, "incompatible leaseholder id: %d/descriptor %v pair; eliding lease update to the cache", newEntry.lease.Replica.ReplicaID, - newEntry.Desc(), + newEntry.desc, ) newEntry.lease = roachpb.Lease{} updatedLease = false diff --git a/pkg/kv/kvclient/rangecache/range_cache_test.go b/pkg/kv/kvclient/rangecache/range_cache_test.go index d7d5aac61a90..a6c1a9a9a56b 100644 --- 
a/pkg/kv/kvclient/rangecache/range_cache_test.go +++ b/pkg/kv/kvclient/rangecache/range_cache_test.go @@ -1033,27 +1033,27 @@ func TestRangeCacheClearOverlapping(t *testing.T) { cache.addEntryLocked(&cacheEntry{desc: *defDesc}) // Now, add a new, overlapping set of descriptors. - minToBDesc := &roachpb.RangeDescriptor{ + minToBDesc := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, EndKey: roachpb.RKey("b"), Generation: 1, } - bToMaxDesc := &roachpb.RangeDescriptor{ + bToMaxDesc := roachpb.RangeDescriptor{ StartKey: roachpb.RKey("b"), EndKey: roachpb.RKeyMax, Generation: 1, } curGeneration := roachpb.RangeGeneration(1) - require.True(t, clearOlderOverlapping(ctx, cache, minToBDesc)) - cache.addEntryLocked(&cacheEntry{desc: *minToBDesc}) + require.True(t, clearOlderOverlapping(ctx, cache, &minToBDesc)) + cache.addEntryLocked(&cacheEntry{desc: minToBDesc}) if ri := cache.GetCached(ctx, roachpb.RKey("b"), false); ri.Desc.IsInitialized() { t.Errorf("descriptor unexpectedly non-nil: %s", ri.Desc) } - require.True(t, clearOlderOverlapping(ctx, cache, bToMaxDesc)) - cache.addEntryLocked(&cacheEntry{desc: *bToMaxDesc}) + require.True(t, clearOlderOverlapping(ctx, cache, &bToMaxDesc)) + cache.addEntryLocked(&cacheEntry{desc: bToMaxDesc}) ri, _ := cache.getCachedRLocked(ctx, roachpb.RKey("b"), false) - require.Equal(t, bToMaxDesc, ri.Desc()) + require.Equal(t, bToMaxDesc, ri.desc) // Add default descriptor back which should remove two split descriptors. defDescCpy := *defDesc @@ -1063,20 +1063,20 @@ func TestRangeCacheClearOverlapping(t *testing.T) { cache.addEntryLocked(&cacheEntry{desc: defDescCpy}) for _, key := range []roachpb.RKey{roachpb.RKey("a"), roachpb.RKey("b")} { ri, _ = cache.getCachedRLocked(ctx, key, false) - require.Equal(t, &defDescCpy, ri.Desc()) + require.Equal(t, defDescCpy, ri.desc) } // Insert ["b", "c") and then insert ["a", b"). Verify that the former is not evicted by the latter. curGeneration++ - bToCDesc := &roachpb.RangeDescriptor{ + bToCDesc := roachpb.RangeDescriptor{ StartKey: roachpb.RKey("b"), EndKey: roachpb.RKey("c"), Generation: curGeneration, } - require.True(t, clearOlderOverlapping(ctx, cache, bToCDesc)) - cache.addEntryLocked(&cacheEntry{desc: *bToCDesc}) + require.True(t, clearOlderOverlapping(ctx, cache, &bToCDesc)) + cache.addEntryLocked(&cacheEntry{desc: bToCDesc}) ri, _ = cache.getCachedRLocked(ctx, roachpb.RKey("c"), true) - require.Equal(t, bToCDesc, ri.Desc()) + require.Equal(t, bToCDesc, ri.desc) curGeneration++ aToBDesc := &roachpb.RangeDescriptor{ @@ -1087,7 +1087,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) { require.True(t, clearOlderOverlapping(ctx, cache, aToBDesc)) cache.addEntryLocked(ri) ri, _ = cache.getCachedRLocked(ctx, roachpb.RKey("c"), true) - require.Equal(t, bToCDesc, ri.Desc()) + require.Equal(t, bToCDesc, ri.desc) } // Test The ClearOlderOverlapping. There's also the older @@ -1153,7 +1153,7 @@ func TestRangeCacheClearOlderOverlapping(t *testing.T) { expNewest bool // If expNewest is false, expNewer indicates the expected 2nd ret val of // clearOlderOverlapping(). 
- expNewer *roachpb.RangeDescriptor + expNewer roachpb.RangeDescriptor }{ { cachedDescs: nil, @@ -1172,7 +1172,7 @@ func TestRangeCacheClearOlderOverlapping(t *testing.T) { clearDesc: descAB1, expCache: []roachpb.RangeDescriptor{descAB2, descBC2, descCD2}, expNewest: false, - expNewer: &descAB2, + expNewer: descAB2, }, { cachedDescs: []roachpb.RangeDescriptor{descAB2, descBC2, descCD2}, @@ -1216,12 +1216,12 @@ func TestRangeCacheClearOlderOverlapping(t *testing.T) { if len(all) != 0 { allDescs = make([]roachpb.RangeDescriptor, len(all)) for i, e := range all { - allDescs[i] = *e.Desc() + allDescs[i] = e.desc } } - var newerDesc *roachpb.RangeDescriptor + var newerDesc roachpb.RangeDescriptor if newer != nil { - newerDesc = newer.Desc() + newerDesc = newer.desc } assert.Equal(t, tc.expCache, allDescs) @@ -1304,42 +1304,39 @@ func TestGetCachedRangeDescriptorInverted(t *testing.T) { testCases := []struct { queryKey roachpb.RKey - rng *roachpb.RangeDescriptor + rng roachpb.RangeDescriptor }{ { // Check range start key. queryKey: roachpb.RKey("l"), - rng: nil, }, { // Check some key in first range. queryKey: roachpb.RKey("0"), - rng: &roachpb.RangeDescriptor{StartKey: roachpb.RKeyMin, EndKey: roachpb.RKey("a")}, + rng: roachpb.RangeDescriptor{StartKey: roachpb.RKeyMin, EndKey: roachpb.RKey("a")}, }, { // Check end key of first range. queryKey: roachpb.RKey("a"), - rng: &roachpb.RangeDescriptor{StartKey: roachpb.RKeyMin, EndKey: roachpb.RKey("a")}, + rng: roachpb.RangeDescriptor{StartKey: roachpb.RKeyMin, EndKey: roachpb.RKey("a")}, }, { // Check range end key. queryKey: roachpb.RKey("c"), - rng: &roachpb.RangeDescriptor{StartKey: roachpb.RKey("a"), EndKey: roachpb.RKey("c")}, + rng: roachpb.RangeDescriptor{StartKey: roachpb.RKey("a"), EndKey: roachpb.RKey("c")}, }, { // Check range middle key. queryKey: roachpb.RKey("d"), - rng: &roachpb.RangeDescriptor{StartKey: roachpb.RKey("c"), EndKey: roachpb.RKey("e")}, + rng: roachpb.RangeDescriptor{StartKey: roachpb.RKey("c"), EndKey: roachpb.RKey("e")}, }, { // Check miss range key. queryKey: roachpb.RKey("f"), - rng: nil, }, { // Check range start key with previous range miss. queryKey: roachpb.RKey("l"), - rng: nil, }, } @@ -1349,11 +1346,11 @@ func TestGetCachedRangeDescriptorInverted(t *testing.T) { targetRange, _ := cache.getCachedRLocked(ctx, test.queryKey, true /* inverted */) cache.rangeCache.RUnlock() - if test.rng == nil { + if !test.rng.IsInitialized() { require.Nil(t, targetRange) } else { require.NotNil(t, targetRange) - require.Equal(t, test.rng, targetRange.Desc()) + require.Equal(t, test.rng, targetRange.desc) } }) } @@ -2016,149 +2013,143 @@ func TestRangeCacheEntryMaybeUpdate(t *testing.T) { } // Check that some lease overwrites an empty lease. - l := &roachpb.Lease{ + l := roachpb.Lease{ Replica: rep1, Sequence: 1, } - updated, updatedLease, e := e.maybeUpdate(ctx, l, &desc) + updated, updatedLease, e := e.maybeUpdate(ctx, &l, &desc) require.True(t, updated) require.True(t, updatedLease) - require.True(t, l.Equal(e.Lease())) - require.True(t, desc.Equal(e.Desc())) + require.True(t, l.Equal(e.lease)) + require.True(t, desc.Equal(e.desc)) // Check that another lease with no seq num overwrites any other lease when // the associated range descriptor isn't stale. 
- l = &roachpb.Lease{ + l = roachpb.Lease{ Replica: rep2, Sequence: 0, } - updated, updatedLease, e = e.maybeUpdate(ctx, l, &desc) + updated, updatedLease, e = e.maybeUpdate(ctx, &l, &desc) require.True(t, updated) require.True(t, updatedLease) - require.NotNil(t, e.Leaseholder()) - require.True(t, l.Replica.Equal(*e.Leaseholder())) - require.True(t, desc.Equal(e.Desc())) - // Check that Seq=0 leases are not returned by Lease(). - require.Nil(t, e.Lease()) + require.Equal(t, l.Replica, e.lease.Replica) + require.Equal(t, desc, e.desc) + // Check that the lease is Speculative. + require.True(t, e.lease.Speculative()) // Check that another lease with no sequence number overwrites a lease with no // sequence num as long as the associated range descriptor isn't stale. - l = &roachpb.Lease{ + l = roachpb.Lease{ Replica: rep1, Sequence: 0, } - updated, updatedLease, e = e.maybeUpdate(ctx, l, &desc) + updated, updatedLease, e = e.maybeUpdate(ctx, &l, &desc) require.True(t, updated) require.True(t, updatedLease) - require.NotNil(t, e.Leaseholder()) - require.True(t, l.Replica.Equal(*e.Leaseholder())) - require.True(t, desc.Equal(e.Desc())) - // Check that Seq=0 leases are not returned by Lease(). - require.Nil(t, e.Lease()) + require.Equal(t, l.Replica, e.lease.Replica) + require.Equal(t, desc, e.desc) + // Check that the lease is Speculative. + require.True(t, e.lease.Speculative()) oldL := l - l = &roachpb.Lease{ + l = roachpb.Lease{ Replica: repStaleMember, Sequence: 0, } // Ensure that a speculative lease is not overwritten when accompanied by a // stale range descriptor. - updated, updatedLease, e = e.maybeUpdate(ctx, l, &staleDesc) + updated, updatedLease, e = e.maybeUpdate(ctx, &l, &staleDesc) require.False(t, updated) require.False(t, updatedLease) - require.NotNil(t, e.Leaseholder()) - require.True(t, oldL.Replica.Equal(*e.Leaseholder())) - require.True(t, desc.Equal(e.Desc())) + require.Equal(t, oldL.Replica, e.lease.Replica) + require.Equal(t, desc, e.desc) // The old lease is still speculative; ensure it isn't returned by Lease(). - require.Nil(t, e.Lease()) + require.True(t, e.lease.Speculative()) // Ensure a speculative lease is not overwritten by a "real" lease if the // accompanying range descriptor is stale. - l = &roachpb.Lease{ + l = roachpb.Lease{ Replica: rep1, Sequence: 1, } - updated, updatedLease, e = e.maybeUpdate(ctx, l, &staleDesc) + updated, updatedLease, e = e.maybeUpdate(ctx, &l, &staleDesc) require.False(t, updated) require.False(t, updatedLease) - require.NotNil(t, e.Leaseholder()) - require.True(t, oldL.Replica.Equal(*e.Leaseholder())) - require.True(t, desc.Equal(e.Desc())) + require.Equal(t, oldL.Replica, e.lease.Replica) + require.Equal(t, desc, e.desc) // Empty out the lease and ensure that it is overwritten by a lease even if // the accompanying range descriptor is stale. e.lease = roachpb.Lease{} - updated, updatedLease, e = e.maybeUpdate(ctx, l, &staleDesc) + updated, updatedLease, e = e.maybeUpdate(ctx, &l, &staleDesc) require.True(t, updated) require.True(t, updatedLease) - require.NotNil(t, e.Leaseholder()) - require.True(t, oldL.Replica.Equal(*e.Leaseholder())) - require.True(t, e.Lease().Equal(l)) + require.Equal(t, oldL.Replica, e.lease.Replica) + require.Equal(t, l, e.lease) // The range descriptor shouldn't be updated because the one supplied was // stale. - require.True(t, desc.Equal(e.Desc())) + require.Equal(t, desc, e.desc) // Ensure that a newer lease overwrites an older lease. 
- l = &roachpb.Lease{ + l = roachpb.Lease{ Replica: rep2, Sequence: 2, } - updated, updatedLease, e = e.maybeUpdate(ctx, l, &desc) + updated, updatedLease, e = e.maybeUpdate(ctx, &l, &desc) require.True(t, updated) require.True(t, updatedLease) - require.NotNil(t, e.Leaseholder()) - require.True(t, l.Equal(*e.Lease())) - require.True(t, desc.Equal(e.Desc())) + require.Equal(t, l, e.lease) + require.Equal(t, desc, e.desc) // Check that updating to an older lease doesn't work. - l = &roachpb.Lease{ + l = roachpb.Lease{ Replica: rep1, Sequence: 1, } - updated, updatedLease, e = e.maybeUpdate(ctx, l, &desc) + updated, updatedLease, e = e.maybeUpdate(ctx, &l, &desc) require.False(t, updated) require.False(t, updatedLease) - require.False(t, l.Equal(*e.Lease())) + require.NotEqual(t, l, e.lease) // Check that updating to an older descriptor doesn't work. - updated, updatedLease, e = e.maybeUpdate(ctx, l, &staleDesc) + updated, updatedLease, e = e.maybeUpdate(ctx, &l, &staleDesc) require.False(t, updated) require.False(t, updatedLease) - require.True(t, desc.Equal(e.Desc())) + require.Equal(t, desc, e.desc) // Check that updating to the same lease returns false. - l = &roachpb.Lease{ + l = roachpb.Lease{ Replica: rep2, Sequence: 2, } - require.True(t, l.Equal(e.Lease())) - updated, updatedLease, e = e.maybeUpdate(ctx, l, &desc) + require.Equal(t, l, e.lease) + updated, updatedLease, e = e.maybeUpdate(ctx, &l, &desc) require.False(t, updated) require.False(t, updatedLease) - require.True(t, l.Equal(e.Lease())) - require.True(t, desc.Equal(e.Desc())) + require.Equal(t, l, e.lease) + require.Equal(t, desc, e.desc) // Check that updating just the descriptor to a newer descriptor returns the // correct values for updated and updatedLease. - updated, updatedLease, e = e.maybeUpdate(ctx, l, &desc2) + updated, updatedLease, e = e.maybeUpdate(ctx, &l, &desc2) require.True(t, updated) require.False(t, updatedLease) - require.True(t, l.Equal(e.Lease())) - require.True(t, desc2.Equal(e.Desc())) + require.Equal(t, l, e.lease) + require.Equal(t, desc2, e.desc) // Check that updating the cache entry to a newer descriptor such that it // makes the (freshest) lease incompatible clears out the lease on the // returned cache entry. - l = &roachpb.Lease{ + l = roachpb.Lease{ Replica: rep1, Sequence: 1, } - require.Equal(t, roachpb.LeaseSequence(2), e.Lease().Sequence) - updated, updatedLease, e = e.maybeUpdate(ctx, l, &desc3) + require.Equal(t, roachpb.LeaseSequence(2), e.lease.Sequence) + updated, updatedLease, e = e.maybeUpdate(ctx, &l, &desc3) require.True(t, updated) require.False(t, updatedLease) - require.Nil(t, e.Lease()) - require.True(t, desc3.Equal(e.Desc())) + require.True(t, e.lease.Empty()) + require.Equal(t, desc3, e.desc) } func TestRangeCacheEntryOverrides(t *testing.T) { From 0a14ee7634a445a77ba9cedb44ab38b1d9c4fb58 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Thu, 25 Jan 2024 16:14:54 -0500 Subject: [PATCH 5/9] kvclient: don't return a cache.Entry from GetCachedOverlapping The cache.Entry is not intended to be public, instead return a RangeInfo. 
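As a rough caller-side sketch of the new shape (the helper and package below are hypothetical and only for illustration; GetCachedOverlapping, roachpb.RangeInfo, and Lease.Empty are the real APIs touched by this commit, everything else is assumed):

package example // hypothetical, not part of this patch

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
)

// remoteLeaseholders collects the cached ranges overlapping span whose lease
// is known to be held by some node other than nodeID. Callers now receive
// plain roachpb.RangeInfo values, so Desc and Lease are struct fields rather
// than accessors on a cache-internal entry.
func remoteLeaseholders(
    ctx context.Context, rdc *rangecache.RangeCache, span roachpb.RSpan, nodeID roachpb.NodeID,
) []roachpb.RangeInfo {
    var out []roachpb.RangeInfo
    for _, ri := range rdc.GetCachedOverlapping(ctx, span) {
        // Ranges with an unknown lease are skipped: the local node might hold
        // the lease without the cache knowing about it.
        if ri.Lease.Empty() || ri.Lease.Replica.NodeID == nodeID {
            continue
        }
        out = append(out, ri)
    }
    return out
}

This mirrors the readerbase.go hunk in this patch, where MisplannedRanges now appends the returned RangeInfo values directly instead of rebuilding them from entry accessors.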
Epic: none Release note: None --- pkg/kv/kvclient/rangecache/range_cache.go | 6 +++--- pkg/kv/kvclient/rangecache/range_cache_test.go | 10 +++++----- pkg/sql/execinfra/readerbase.go | 14 +++++--------- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 2b454ad4cd06..6429b90cf6c0 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -622,13 +622,13 @@ func (rc *RangeCache) Lookup(ctx context.Context, key roachpb.RKey) (roachpb.Ran // GetCachedOverlapping returns all the cached entries which overlap a given // span [Key, EndKey). The results are sorted ascendingly. -func (rc *RangeCache) GetCachedOverlapping(ctx context.Context, span roachpb.RSpan) []*cacheEntry { +func (rc *RangeCache) GetCachedOverlapping(ctx context.Context, span roachpb.RSpan) []roachpb.RangeInfo { rc.rangeCache.RLock() defer rc.rangeCache.RUnlock() rawEntries := rc.getCachedOverlappingRLocked(ctx, span) - entries := make([]*cacheEntry, len(rawEntries)) + entries := make([]roachpb.RangeInfo, len(rawEntries)) for i, e := range rawEntries { - entries[i] = rc.getValue(e) + entries[i] = rc.getValue(e).toRangeInfo() } return entries } diff --git a/pkg/kv/kvclient/rangecache/range_cache_test.go b/pkg/kv/kvclient/rangecache/range_cache_test.go index a6c1a9a9a56b..ecf773caefb4 100644 --- a/pkg/kv/kvclient/rangecache/range_cache_test.go +++ b/pkg/kv/kvclient/rangecache/range_cache_test.go @@ -1216,7 +1216,7 @@ func TestRangeCacheClearOlderOverlapping(t *testing.T) { if len(all) != 0 { allDescs = make([]roachpb.RangeDescriptor, len(all)) for i, e := range all { - allDescs[i] = e.desc + allDescs[i] = e.Desc } } var newerDesc roachpb.RangeDescriptor @@ -1874,8 +1874,8 @@ func TestRangeCacheSyncTokenAndMaybeUpdateCache(t *testing.T) { ctx, roachpb.RSpan{Key: roachpb.RKeyMin, EndKey: roachpb.RKeyMax}, ) require.Equal(t, 1, len(entries)) - require.Equal(t, incompatibleDescriptor, entries[0].desc) - require.Equal(t, l, entries[0].lease) + require.Equal(t, incompatibleDescriptor, entries[0].Desc) + require.Equal(t, l, entries[0].Lease) }, }, { @@ -1908,8 +1908,8 @@ func TestRangeCacheSyncTokenAndMaybeUpdateCache(t *testing.T) { ctx, roachpb.RSpan{Key: roachpb.RKeyMin, EndKey: roachpb.RKeyMax}, ) require.Equal(t, 1, len(entries)) - require.Equal(t, desc2, entries[0].desc) - require.Equal(t, l, entries[0].lease) + require.Equal(t, desc2, entries[0].Desc) + require.Equal(t, l, entries[0].Lease) }, }, { diff --git a/pkg/sql/execinfra/readerbase.go b/pkg/sql/execinfra/readerbase.go index fda11b3ae2b0..1c71ff48a0e0 100644 --- a/pkg/sql/execinfra/readerbase.go +++ b/pkg/sql/execinfra/readerbase.go @@ -66,24 +66,20 @@ func MisplannedRanges( overlapping := rdc.GetCachedOverlapping(ctx, rSpan) for _, ri := range overlapping { - if _, ok := misplanned[ri.Desc().RangeID]; ok { + if _, ok := misplanned[ri.Desc.RangeID]; ok { // We're already returning info about this range. continue } // Ranges with unknown leases are not returned, as the current node might // actually have the lease without the local cache knowing about it. 
- l := ri.Lease() - if l != nil && l.Replica.NodeID != nodeID { - misplannedRanges = append(misplannedRanges, roachpb.RangeInfo{ - Desc: *ri.Desc(), - Lease: *l, - ClosedTimestampPolicy: ri.ClosedTimestampPolicy(), - }) + l := ri.Lease + if !l.Empty() && l.Replica.NodeID != nodeID { + misplannedRanges = append(misplannedRanges, ri) if misplanned == nil { misplanned = make(map[roachpb.RangeID]struct{}) } - misplanned[ri.Desc().RangeID] = struct{}{} + misplanned[ri.Desc.RangeID] = struct{}{} } } } From 4b4d8c8abbd7cc816f751e1192643011d1fffe09 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Thu, 25 Jan 2024 16:26:41 -0500 Subject: [PATCH 6/9] kvclient: remove MakeEvictionToken from the interface This method was only used internally and is simple to create manually now. It was confusing to have it on the public interface as it's not the right way to create a token. Epic: none Release note: None --- pkg/kv/kvclient/rangecache/range_cache.go | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 6429b90cf6c0..710fb4c52243 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -278,18 +278,6 @@ type EvictionToken struct { entry *cacheEntry } -func (rc *RangeCache) makeEvictionToken(entry *cacheEntry) EvictionToken { - return EvictionToken{ - rdc: rc, - entry: entry, - } -} - -// MakeEvictionToken is the exported ctor. For tests only. -func (rc *RangeCache) MakeEvictionToken(entry *cacheEntry) EvictionToken { - return rc.makeEvictionToken(entry) -} - func (et EvictionToken) String() string { if !et.Valid() { return "" } @@ -622,7 +610,9 @@ func (rc *RangeCache) Lookup(ctx context.Context, key roachpb.RKey) (roachpb.Ran // GetCachedOverlapping returns all the cached entries which overlap a given // span [Key, EndKey). The results are sorted ascendingly. -func (rc *RangeCache) GetCachedOverlapping(ctx context.Context, span roachpb.RSpan) []roachpb.RangeInfo { +func (rc *RangeCache) GetCachedOverlapping( + ctx context.Context, span roachpb.RSpan, +) []roachpb.RangeInfo { rc.rangeCache.RLock() defer rc.rangeCache.RUnlock() rawEntries := rc.getCachedOverlappingRLocked(ctx, span) @@ -711,7 +701,7 @@ func (rc *RangeCache) tryLookup( rc.rangeCache.RLock() if entry, _ := rc.getCachedRLocked(ctx, key, useReverseScan); entry != nil { rc.rangeCache.RUnlock() - returnToken := rc.makeEvictionToken(entry) + returnToken := EvictionToken{rdc: rc, entry: entry} return returnToken, nil } @@ -967,7 +957,7 @@ func tryLookupImpl( } entry = &newEntry } - lookupRes = rc.makeEvictionToken(entry) + lookupRes = EvictionToken{rdc: rc, entry: entry} return lookupRes, nil } From 3499116aaa19bb46899418aac9bfe40573f8f761 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Thu, 9 Nov 2023 09:24:18 -0500 Subject: [PATCH 7/9] kvclient: replace ReplicaSlice with ReplicaSet for routing This PR replaces ReplicaSlice with ReplicaSet outside of the DistSender. ReplicaSlice is an internal implementation which contains information only required for sorting the replicas. Outside of DistSender the additional sorting information is unnecessary and unused.
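For illustration, a minimal sketch of what the new signature looks like from outside the DistSender (the wrapper below is hypothetical; TransportFactory, SendOptions, Transport, roachpb.ReplicaSet and its Descriptors/First accessors are the real types this series touches):

package example // hypothetical, not part of this patch

import (
    "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
)

// wrapTransportFactory shows the post-change shape: factories now receive a
// roachpb.ReplicaSet that the DistSender has already ordered, and read
// replicas through Descriptors() and First() instead of indexing ReplicaSlice.
func wrapTransportFactory(inner kvcoord.TransportFactory) kvcoord.TransportFactory {
    return func(opts kvcoord.SendOptions, replicas roachpb.ReplicaSet) (kvcoord.Transport, error) {
        // Descriptors() exposes the replicas in the order they will be tried;
        // First() is the replica that will be tried next.
        if len(replicas.Descriptors()) > 0 {
            _ = replicas.First()
        }
        return inner(opts, replicas)
    }
}

The ClientTestingKnobs-based test factories updated in this commit follow the same shape; they only need the replica descriptors, not the sorting metadata that ReplicaSlice carries.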
Epic: none Informs: #112351 Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 2 +- .../kvcoord/dist_sender_ambiguous_test.go | 2 +- .../kvclient/kvcoord/dist_sender_rangefeed.go | 2 +- .../dist_sender_rangefeed_mock_test.go | 2 +- .../kvcoord/dist_sender_rangefeed_test.go | 2 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 10 ++++----- .../kvcoord/local_test_cluster_util.go | 2 +- pkg/kv/kvclient/kvcoord/range_iter_test.go | 2 +- .../kvclient/kvcoord/replayed_commit_test.go | 3 +-- pkg/kv/kvclient/kvcoord/replica_slice.go | 4 ++++ pkg/kv/kvclient/kvcoord/send_test.go | 8 +++---- pkg/kv/kvclient/kvcoord/transport.go | 22 +++++++++---------- pkg/kv/kvclient/kvcoord/transport_race.go | 3 ++- pkg/kv/kvclient/kvcoord/transport_regular.go | 7 ++++-- pkg/roachpb/metadata_replicas.go | 4 ++++ pkg/sql/ambiguous_commit_test.go | 2 +- 16 files changed, 44 insertions(+), 33 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index e6ba512af22c..b66c28214808 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2301,7 +2301,7 @@ func (ds *DistSender) sendToReplicas( metrics: &ds.metrics, dontConsiderConnHealth: ds.dontConsiderConnHealth, } - transport, err := ds.transportFactory(opts, replicas) + transport, err := ds.transportFactory(opts, replicas.AsReplicaSet()) if err != nil { return nil, err } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index 017aa011b695..771348c949c7 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -297,7 +297,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { } getInterceptingTransportFactory := func(nID roachpb.NodeID) func(kvcoord.TransportFactory) kvcoord.TransportFactory { return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { transport, tErr := factory(options, slice) interceptor := &interceptingTransport{ Transport: transport, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index ef1946ea4e86..06c352dfe9cc 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -756,7 +756,7 @@ func newTransportForRange( } replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality) opts := SendOptions{class: defRangefeedConnClass} - return ds.transportFactory(opts, replicas) + return ds.transportFactory(opts, replicas.AsReplicaSet()) } // makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go index 1d0688206e86..cb8be6a2e168 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -146,7 +146,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { NodeDescs: g, RPCRetryOptions: &retry.Options{MaxRetries: 10}, Stopper: stopper, - TransportFactory: func(options SendOptions, slice ReplicaSlice) (Transport, error) { + TransportFactory: func(options SendOptions, slice 
roachpb.ReplicaSet) (Transport, error) { return transport, nil }, RangeDescriptorDB: rangeDB, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index bc2eb068df5e..2075e3b0cd23 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -169,7 +169,7 @@ func makeTransportFactory( rfStreamEnabled bool, counts *internalClientCounts, wrapFn wrapRangeFeedClientFn, ) func(kvcoord.TransportFactory) kvcoord.TransportFactory { return func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { transport, err := factory(options, slice) if err != nil { return nil, err diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 2336a22274b8..96b17cd2223e 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -145,7 +145,7 @@ var stubRPCSendFn simpleSendFn = func( func adaptSimpleTransport(fn simpleSendFn) TransportFactory { return func( _ SendOptions, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { return &simpleTransportAdapter{ fn: fn, @@ -419,7 +419,7 @@ func TestSendRPCOrder(t *testing.T) { var verifyCall func(SendOptions, []roachpb.ReplicaDescriptor) error var transportFactory TransportFactory = func( - opts SendOptions, replicas ReplicaSlice, + opts SendOptions, replicas roachpb.ReplicaSet, ) (Transport, error) { reps := replicas.Descriptors() if err := verifyCall(opts, reps); err != nil { @@ -3384,7 +3384,7 @@ func TestSenderTransport(t *testing.T) { ) (r *kvpb.BatchResponse, e *kvpb.Error) { return }, - ))(SendOptions{}, ReplicaSlice{{}}) + ))(SendOptions{}, roachpb.MakeReplicaSet([]roachpb.ReplicaDescriptor{{}})) if err != nil { t.Fatal(err) } @@ -4203,7 +4203,7 @@ func TestConnectionClass(t *testing.T) { // class will capture the connection class used for the last transport // created. 
var class rpc.ConnectionClass - var transportFactory TransportFactory = func(opts SendOptions, replicas ReplicaSlice) (Transport, error) { + var transportFactory TransportFactory = func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { class = opts.class return adaptSimpleTransport( func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { @@ -5472,7 +5472,7 @@ func TestDistSenderComputeNetworkCost(t *testing.T) { tc.cfg.Stopper = stopper tc.cfg.RangeDescriptorDB = rddb tc.cfg.Settings = st - tc.cfg.TransportFactory = func(SendOptions, ReplicaSlice) (Transport, error) { + tc.cfg.TransportFactory = func(SendOptions, roachpb.ReplicaSet) (Transport, error) { assert.Fail(t, "test should not try and use the transport factory") return nil, nil } diff --git a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go index bffb408877fc..ef74928c4336 100644 --- a/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go +++ b/pkg/kv/kvclient/kvcoord/local_test_cluster_util.go @@ -90,7 +90,7 @@ func NewDistSenderForLocalTestCluster( Stopper: stopper, RPCRetryOptions: &retryOpts, FirstRangeProvider: g, - TransportFactory: func(opts SendOptions, replicas ReplicaSlice) (Transport, error) { + TransportFactory: func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { transport, err := senderTransportFactory(opts, replicas) if err != nil { return nil, err diff --git a/pkg/kv/kvclient/kvcoord/range_iter_test.go b/pkg/kv/kvclient/kvcoord/range_iter_test.go index 36f716134004..fb602a718f07 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter_test.go +++ b/pkg/kv/kvclient/kvcoord/range_iter_test.go @@ -48,7 +48,7 @@ func init() { alphaRangeDescriptorDB = mockRangeDescriptorDBForDescs( append(alphaRangeDescriptors, TestMetaRangeDescriptor)..., ) - tf = func(options SendOptions, slice ReplicaSlice) (Transport, error) { + tf = func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) { panic("transport not set up for use") } } diff --git a/pkg/kv/kvclient/kvcoord/replayed_commit_test.go b/pkg/kv/kvclient/kvcoord/replayed_commit_test.go index 58916c9d13ea..5de9cc688776 100644 --- a/pkg/kv/kvclient/kvcoord/replayed_commit_test.go +++ b/pkg/kv/kvclient/kvcoord/replayed_commit_test.go @@ -52,7 +52,7 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T } args.ServerArgs.Knobs.KVClient = &kvcoord.ClientTestingKnobs{ TransportFactory: func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { tf, err := factory(options, slice) if err != nil { return nil, err @@ -74,7 +74,6 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T }, nil } - }, } diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go index 4b985c69d03e..846ac7db90c7 100644 --- a/pkg/kv/kvclient/kvcoord/replica_slice.go +++ b/pkg/kv/kvclient/kvcoord/replica_slice.go @@ -266,6 +266,10 @@ func (rs ReplicaSlice) OptimizeReplicaOrder( }) } +func (rs ReplicaSlice) AsReplicaSet() roachpb.ReplicaSet { + return roachpb.MakeReplicaSet(rs.Descriptors()) +} + // Descriptors returns the ReplicaDescriptors inside the ReplicaSlice. 
func (rs ReplicaSlice) Descriptors() []roachpb.ReplicaDescriptor { reps := make([]roachpb.ReplicaDescriptor, len(rs)) diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index 4e7a8164fad2..db83749eeca1 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -153,13 +153,13 @@ func TestSendToOneClient(t *testing.T) { // firstNErrorTransport is a mock transport that sends an error on // requests to the first N addresses, then succeeds. type firstNErrorTransport struct { - replicas ReplicaSlice + replicas roachpb.ReplicaSet numErrors int numSent int } func (f *firstNErrorTransport) IsExhausted() bool { - return f.numSent >= len(f.replicas) + return f.numSent >= len(f.replicas.Descriptors()) } func (f *firstNErrorTransport) Release() {} @@ -182,7 +182,7 @@ func (f *firstNErrorTransport) NextInternalClient( } func (f *firstNErrorTransport) NextReplica() roachpb.ReplicaDescriptor { - return f.replicas[f.numSent].ReplicaDescriptor + return f.replicas.Descriptors()[f.numSent] } func (f *firstNErrorTransport) SkipReplica() { @@ -241,7 +241,7 @@ func TestComplexScenarios(t *testing.T) { t, func( _ SendOptions, - replicas ReplicaSlice, + replicas roachpb.ReplicaSet, ) (Transport, error) { return &firstNErrorTransport{ replicas: replicas, diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index 7e9a9206789a..130a813b159c 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -49,7 +49,7 @@ type SendOptions struct { // // The caller is responsible for ordering the replicas in the slice according to // the order in which the should be tried. -type TransportFactory func(SendOptions, ReplicaSlice) (Transport, error) +type TransportFactory func(SendOptions, roachpb.ReplicaSet) (Transport, error) // Transport objects can send RPCs to one or more replicas of a range. // All calls to Transport methods are made from a single thread, so @@ -101,24 +101,24 @@ const ( // During race builds, we wrap this to hold on to and read all obtained // requests in a tight loop, exposing data races; see transport_race.go. func grpcTransportFactoryImpl( - opts SendOptions, nodeDialer *nodedialer.Dialer, rs ReplicaSlice, + opts SendOptions, nodeDialer *nodedialer.Dialer, rs roachpb.ReplicaSet, ) (Transport, error) { transport := grpcTransportPool.Get().(*grpcTransport) // Grab the saved slice memory from grpcTransport. replicas := transport.replicas - if cap(replicas) < len(rs) { - replicas = make([]roachpb.ReplicaDescriptor, len(rs)) + descriptors := rs.Descriptors() + if cap(replicas) < len(descriptors) { + replicas = make([]roachpb.ReplicaDescriptor, len(descriptors)) } else { - replicas = replicas[:len(rs)] + replicas = replicas[:len(descriptors)] } // We'll map the index of the replica descriptor in its slice to its health. var health util.FastIntMap - for i := range rs { - r := &rs[i] - replicas[i] = r.ReplicaDescriptor - healthy := nodeDialer.ConnHealth(r.NodeID, opts.class) == nil + for i, desc := range descriptors { + replicas[i] = desc + healthy := nodeDialer.ConnHealth(desc.NodeID, opts.class) == nil if healthy { health.Set(i, healthHealthy) } else { @@ -317,9 +317,9 @@ func (h *byHealth) Less(i, j int) bool { // Transport. This is useful for tests that want to use DistSender // without a full RPC stack. 
func SenderTransportFactory(tracer *tracing.Tracer, sender kv.Sender) TransportFactory { - return func(_ SendOptions, replicas ReplicaSlice) (Transport, error) { + return func(_ SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { // Always send to the first replica. - replica := replicas[0].ReplicaDescriptor + replica := replicas.First() return &senderTransport{tracer, sender, replica, false}, nil } } diff --git a/pkg/kv/kvclient/kvcoord/transport_race.go b/pkg/kv/kvclient/kvcoord/transport_race.go index 1033e4cc39bf..ba5b804a037c 100644 --- a/pkg/kv/kvclient/kvcoord/transport_race.go +++ b/pkg/kv/kvclient/kvcoord/transport_race.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -92,7 +93,7 @@ func (tr raceTransport) SendNext( // a) the server doesn't hold on to any memory, and // b) the server doesn't mutate the request func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory { - return func(opts SendOptions, replicas ReplicaSlice) (Transport, error) { + return func(opts SendOptions, replicas roachpb.ReplicaSet) (Transport, error) { if atomic.AddInt32(&running, 1) <= 1 { if err := nodeDialer.Stopper().RunAsyncTask( context.TODO(), "transport racer", func(ctx context.Context) { diff --git a/pkg/kv/kvclient/kvcoord/transport_regular.go b/pkg/kv/kvclient/kvcoord/transport_regular.go index b20c16280232..82829cfae800 100644 --- a/pkg/kv/kvclient/kvcoord/transport_regular.go +++ b/pkg/kv/kvclient/kvcoord/transport_regular.go @@ -13,11 +13,14 @@ package kvcoord -import "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" +) // GRPCTransportFactory is the default TransportFactory, using GRPC. func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory { - return func(options SendOptions, slice ReplicaSlice) (Transport, error) { + return func(options SendOptions, slice roachpb.ReplicaSet) (Transport, error) { return grpcTransportFactoryImpl(options, nodeDialer, slice) } } diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 9c764568c9e0..aeb322d97d5d 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -47,6 +47,10 @@ func (d ReplicaSet) String() string { return redact.StringWithoutMarkers(d) } +func (d ReplicaSet) First() ReplicaDescriptor { + return d.wrapped[0] +} + // Descriptors returns every replica descriptor in the set, including both voter // replicas and learner replicas. Voter replicas are ordered first in the // returned slice. 
diff --git a/pkg/sql/ambiguous_commit_test.go b/pkg/sql/ambiguous_commit_test.go index 54249166032a..18b8ae5dbfac 100644 --- a/pkg/sql/ambiguous_commit_test.go +++ b/pkg/sql/ambiguous_commit_test.go @@ -93,7 +93,7 @@ func TestAmbiguousCommit(t *testing.T) { params.Knobs.KVClient = &kvcoord.ClientTestingKnobs{ TransportFactory: func(factory kvcoord.TransportFactory) kvcoord.TransportFactory { - return func(options kvcoord.SendOptions, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + return func(options kvcoord.SendOptions, slice roachpb.ReplicaSet) (kvcoord.Transport, error) { transport, err := factory(options, slice) return &interceptingTransport{ Transport: transport, From 92bb015b0ca0fd0df9a0a9cdebc6d1bf9612be58 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Thu, 25 Jan 2024 21:31:47 -0500 Subject: [PATCH 8/9] kvclient: pull out the method to sort replicas In preparation for caching the sorted replicas, pull out the function to generate the two sorted lists. This is meant to be an intermediate commit as it will have worse performance characteristics since it always computes the list twice. Epic: none Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 103 +++++++++++++------------ 1 file changed, 52 insertions(+), 51 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index b66c28214808..1263f54b7357 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2198,6 +2198,46 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error { // value when populating the batch header. const defaultSendClosedTimestampPolicy = roachpb.LEAD_FOR_GLOBAL_READS +func (ds *DistSender) computeSortedReplicas( + ctx context.Context, desc *roachpb.RangeDescriptor, leaseholder *roachpb.ReplicaDescriptor, +) (roachpb.ReplicaSet, roachpb.ReplicaSet, error) { + leaseholderSet, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, OnlyPotentialLeaseholders) + if err != nil { + return roachpb.ReplicaSet{}, roachpb.ReplicaSet{}, err + } + + // Rearrange the replicas so that they're ordered according to the routing + // policy. + // First order by latency, then move the leaseholder to the front of the + // list, if it is known. + if !ds.dontReorderReplicas { + leaseholderSet.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.HealthFunc(), ds.latencyFunc, ds.locality) + } + + idx := -1 + if leaseholder != nil { + idx = leaseholderSet.Find(leaseholder.ReplicaID) + } + if idx != -1 { + leaseholderSet.MoveToFront(idx) + } else { + // The leaseholder node's info must have been missing from gossip when we + // created replicas. + log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not known") + } + + // Order by latency. + followerSet, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, AllExtantReplicas) + if err != nil { + return roachpb.ReplicaSet{}, roachpb.ReplicaSet{}, err + } + log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required") + followerSet.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.HealthFunc(), ds.latencyFunc, ds.locality) + + // Convert to ReplicaSet, we no longer need any of the sorting information. + return leaseholderSet.AsReplicaSet(), followerSet.AsReplicaSet(), nil +} + // sendToReplicas sends a batch to the replicas of a range. Replicas are tried one // at a time (generally the leaseholder first). The result of this call is // either a BatchResponse or an error.
In the former case, the BatchResponse @@ -2241,67 +2281,28 @@ func (ds *DistSender) sendToReplicas( ba = ba.ShallowCopy() ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST } - // Filter the replicas to only those that are relevant to the routing policy. - // NB: When changing leaseholder policy constraint_status_report should be - // updated appropriately. - var replicaFilter ReplicaSliceFilter - switch ba.RoutingPolicy { - case kvpb.RoutingPolicy_LEASEHOLDER: - replicaFilter = OnlyPotentialLeaseholders - case kvpb.RoutingPolicy_NEAREST: - replicaFilter = AllExtantReplicas - default: - log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy) - } desc := routing.Desc() leaseholder := routing.Leaseholder() - replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter) + leaserholderSet, followerSet, err := ds.computeSortedReplicas(ctx, desc, leaseholder) if err != nil { - return nil, err - } - - // Rearrange the replicas so that they're ordered according to the routing - // policy. - var leaseholderFirst bool - switch ba.RoutingPolicy { - case kvpb.RoutingPolicy_LEASEHOLDER: - // First order by latency, then move the leaseholder to the front of the - // list, if it is known. - if !ds.dontReorderReplicas { - replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality) - } - - idx := -1 - if leaseholder != nil { - idx = replicas.Find(leaseholder.ReplicaID) - } - if idx != -1 { - replicas.MoveToFront(idx) - leaseholderFirst = true - } else { - // The leaseholder node's info must have been missing from gossip when we - // created replicas. - log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not known") - } - - case kvpb.RoutingPolicy_NEAREST: - // Order by latency. - log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required") - replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.healthFunc, ds.latencyFunc, ds.locality) - - default: - log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy) + return nil, newSendError(errors.New("Unable to compute the sorted replica list")) + } + // Generate the cached descriptors based on whether we are routing to + // NEAREST or LEASEHOLDER. + leaseholderFirst := ba.RoutingPolicy == kvpb.RoutingPolicy_LEASEHOLDER + var replicas roachpb.ReplicaSet + if leaseholderFirst { + replicas = leaserholderSet + } else { + replicas = followerSet } - // NB: upgrade the connection class to SYSTEM, for critical ranges. Set it to - // DEFAULT if the class is unknown, to handle mixed-version states gracefully. - // Other kinds of overrides are possible, see rpc.ConnectionClassForKey(). opts := SendOptions{ class: rpc.ConnectionClassForKey(desc.RSpan().Key, ba.ConnectionClass), metrics: &ds.metrics, dontConsiderConnHealth: ds.dontConsiderConnHealth, } - transport, err := ds.transportFactory(opts, replicas.AsReplicaSet()) + transport, err := ds.transportFactory(opts, replicas) if err != nil { return nil, err } From b9025b8233f261168948b6e3fa29ef98d1a23fb4 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Tue, 7 Nov 2023 17:56:39 -0500 Subject: [PATCH 9/9] kvclient: cache the sorted list of replicas Calling OptimizeReplicaOrder is expensive and can show up as a hotspot in many high QPS workloads. This PR caches the sorted list on a per-range basis and updates the cache at most once every 3s. 
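As a rough usage sketch (the helper, package, and sortFn alias below are hypothetical; EvictionToken.SortedReplicas and the compute callback signature are what this commit introduces, and the invalidation behavior is described by the rangecache changes below):

package example // hypothetical, not part of this patch

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
)

// sortFn matches the compute callback that SortedReplicas invokes when the
// cached ordering for the range is missing or has been invalidated.
type sortFn func(
    ctx context.Context, desc *roachpb.RangeDescriptor, leaseholder *roachpb.ReplicaDescriptor,
) (roachpb.ReplicaSet, roachpb.ReplicaSet, error)

// routingReplicas returns the cached ordering for the token's range: the
// latency-ordered follower set when isFollower is true, otherwise the
// leaseholder-first set. The bool is false when the token is no longer valid
// and the caller should refresh it before retrying.
func routingReplicas(
    ctx context.Context, tok rangecache.EvictionToken, isFollower bool, compute sortFn,
) (roachpb.ReplicaSet, bool) {
    replicas, ok := tok.SortedReplicas(ctx, isFollower, compute)
    if !ok {
        // The cached descriptor changed underneath the token; a fresh
        // eviction token is needed.
        return roachpb.ReplicaSet{}, false
    }
    return replicas, true
}

The cached ordering is dropped whenever the entry's lease or descriptor changes, so the next request recomputes it rather than routing on a stale order.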
Epic: none Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 21 +++---- pkg/kv/kvclient/kvcoord/transport.go | 11 +--- pkg/kv/kvclient/rangecache/range_cache.go | 73 +++++++++++++++++++++++ 3 files changed, 84 insertions(+), 21 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 1263f54b7357..05a1031eb5f3 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2281,21 +2281,16 @@ func (ds *DistSender) sendToReplicas( ba = ba.ShallowCopy() ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST } - desc := routing.Desc() - leaseholder := routing.Leaseholder() - leaserholderSet, followerSet, err := ds.computeSortedReplicas(ctx, desc, leaseholder) - if err != nil { - return nil, newSendError(errors.New("Unable to compute the sorted replica list")) - } - // Generate the cached descriptors based on whether we are routing to + // Generate or load the cached descriptors based on whether we are routing to // NEAREST or LEASEHOLDER. - leaseholderFirst := ba.RoutingPolicy == kvpb.RoutingPolicy_LEASEHOLDER - var replicas roachpb.ReplicaSet - if leaseholderFirst { - replicas = leaserholderSet - } else { - replicas = followerSet + // NB: When changing leaseholder policy constraint_status_report should be + // updated appropriately. + var leaseholderFirst = ba.RoutingPolicy == kvpb.RoutingPolicy_LEASEHOLDER + replicas, stillValid := routing.SortedReplicas(ctx, leaseholderFirst, ds.computeSortedReplicas) + if !stillValid { + return nil, newSendError(errors.New("Unable to compute the sorted replica list")) } + desc := routing.Desc() opts := SendOptions{ class: rpc.ConnectionClassForKey(desc.RSpan().Key, ba.ConnectionClass), diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index 130a813b159c..dbf40ff0edac 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -105,14 +105,10 @@ func grpcTransportFactoryImpl( ) (Transport, error) { transport := grpcTransportPool.Get().(*grpcTransport) // Grab the saved slice memory from grpcTransport. - replicas := transport.replicas - descriptors := rs.Descriptors() - if cap(replicas) < len(descriptors) { - replicas = make([]roachpb.ReplicaDescriptor, len(descriptors)) - } else { - replicas = replicas[:len(descriptors)] - } + // TODO(baptist): Remove this copy once transport no longer modifies replicas. + replicas := make([]roachpb.ReplicaDescriptor, len(descriptors)) + copy(replicas, descriptors) // We'll map the index of the replica descriptor in its slice to its health. var health util.FastIntMap @@ -125,7 +121,6 @@ func grpcTransportFactoryImpl( health.Set(i, healthUnhealthy) } } - *transport = grpcTransport{ opts: opts, nodeDialer: nodeDialer, diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 710fb4c52243..07183fe8883b 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -263,6 +263,13 @@ func (rc *RangeCache) stringLocked() string { // replaced with a new entry. Other concurrent requests will only learn about // the new entry if they re-synchronize using the token. An EvictionToken is // created by calling LookupWithEvictionToken with an empty EvictionToken. +type sortedReplicaSets struct { + valid bool + leaseholder roachpb.ReplicaSet + follower roachpb.ReplicaSet +} + +// EvictionToken holds eviction state between calls to Lookup. 
type EvictionToken struct { // rdc is the cache that produced this token - and that will be modified by // Evict, EvictAndReplace, EvictLease or SyncTokenAndMaybeUpdateCache. @@ -278,6 +285,58 @@ type EvictionToken struct { entry *cacheEntry } +// SortedReplicas returns a list of sorted replicas for this token. +func (et *EvictionToken) SortedReplicas( + ctx context.Context, + isFollower bool, + compute func(ctx context.Context, desc *roachpb.RangeDescriptor, leaseholder *roachpb.ReplicaDescriptor) (roachpb.ReplicaSet, roachpb.ReplicaSet, error), +) (roachpb.ReplicaSet, bool) { + if !et.entry.sorted.valid { + rdc := et.rdc + rdc.rangeCache.Lock() + defer rdc.rangeCache.Unlock() + + cachedEntry, rawEntry := et.rdc.getCachedRLocked(ctx, et.entry.desc.StartKey, false /* inverted */) + if cachedEntry == nil { + et.clear() + return roachpb.ReplicaSet{}, false + } + leaseholder, follower, err := compute(ctx, &cachedEntry.desc, &cachedEntry.lease.Replica) + if err != nil { + return roachpb.ReplicaSet{}, false + } + + sorted := sortedReplicaSets{ + valid: true, + leaseholder: leaseholder, + follower: follower, + } + + newEntry := cacheEntry{ + desc: cachedEntry.desc, + speculativeDesc: cachedEntry.speculativeDesc, + lease: cachedEntry.lease, + closedts: cachedEntry.closedts, + sorted: sorted, + } + rdc.swapEntryLocked(ctx, rawEntry, &newEntry) + + // If the descriptor changed, we still cache the result, but invalidate + // the eviction token. The user needs to get a fresh eviction token. + if !descsCompatible(&cachedEntry.desc, et.Desc()) { + et.clear() + return roachpb.ReplicaSet{}, false + } + et.entry = cachedEntry + } + + if isFollower { + return et.entry.sorted.follower, true + } else { + return et.entry.sorted.leaseholder, true + } +} + func (et EvictionToken) String() string { if !et.Valid() { return "" @@ -905,6 +964,7 @@ func tryLookupImpl( lease: roachpb.Lease{}, // We don't know the closed timestamp policy. closedts: UnknownClosedTimestampPolicy, + sorted: sortedReplicaSets{valid: false}, } // speculativeDesc comes from intents. Being uncommitted, it is speculative. // We reset its generation to indicate this fact and allow it to be easily @@ -1115,6 +1175,7 @@ func (rc *RangeCache) insertLocked(ctx context.Context, rs ...roachpb.RangeInfo) desc: r.Desc, lease: r.Lease, closedts: r.ClosedTimestampPolicy, + sorted: sortedReplicaSets{valid: false}, } } return rc.insertLockedInner(ctx, entries) @@ -1289,6 +1350,10 @@ type cacheEntry struct { lease roachpb.Lease // closedts indicates the range's closed timestamp policy. closedts roachpb.RangeClosedTimestampPolicy + // sorted is an optimization to store the list of sorted RepicaSets for both + // leaseholder and follower reads. The sorting of this list shows up in hot + // path analysis. + sorted sortedReplicaSets } // DescSpeculative returns true if the descriptor in the entry is "speculative" @@ -1471,6 +1536,7 @@ func (e *cacheEntry) maybeUpdate( lease: e.lease, desc: e.desc, closedts: e.closedts, + sorted: e.sorted, } updatedLease = false @@ -1528,6 +1594,12 @@ func (e *cacheEntry) maybeUpdate( updatedLease = false } + // If we updated either the lease or the descriptor, we don't trust our + // sortedReplicaSets anymore. Let them recompute on next use. 
+ if updatedLease || updatedDesc { + newEntry.sorted = sortedReplicaSets{valid: false} + } + return updatedLease || updatedDesc, updatedLease, newEntry } @@ -1540,6 +1612,7 @@ func (e *cacheEntry) evictLeaseholder( return true, &cacheEntry{ desc: e.desc, closedts: e.closedts, + sorted: sortedReplicaSets{valid: false}, } }