diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 2c8f98d0c029..054d0408aba5 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2145,10 +2145,10 @@ func (ds *DistSender) sendToReplicas( var updatedLeaseholder bool if tErr.Lease != nil { - updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease) + updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease, tErr.DescriptorGeneration) } else if tErr.LeaseHolder != nil { // tErr.LeaseHolder might be set when tErr.Lease isn't. - routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder) + routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder, tErr.DescriptorGeneration) updatedLeaseholder = true } // Move the new leaseholder to the head of the queue for the next diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index ca9420600049..360c192c638e 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -1033,6 +1033,112 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) { require.LessOrEqual(t, callsToNode2, 11) } +// TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration tests that a +// NotLeaseHolderError received from a replica that has a stale range descriptor +// version is ignored, and the next replica is attempted. 
+func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tracer := tracing.NewTracer() + ctx, finishAndGetRecording := tracing.ContextWithRecordingSpan( + context.Background(), tracer, "test", + ) + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + g := makeGossip(t, stopper, rpcContext) + for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() { + require.NoError(t, g.AddInfoProto( + gossip.MakeNodeIDKey(n.NodeID), + newNodeDesc(n.NodeID), + gossip.NodeDescriptorTTL, + )) + } + + oldGeneration := roachpb.RangeGeneration(1) + newGeneration := roachpb.RangeGeneration(2) + desc := roachpb.RangeDescriptor{ + RangeID: 1, + Generation: newGeneration, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, + } + // Ambiguous lease refers to a replica that is incompatible with the cached + // range descriptor. + ambiguousLease := roachpb.Lease{ + Replica: roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4, ReplicaID: 4}, + } + cachedLease := roachpb.Lease{ + Replica: desc.InternalReplicas[1], + } + + // The cache starts with a lease on node 2, so the first request will be + // routed there. That replica will reply with an NLHE with an old descriptor + // generation value, which should make the DistSender try the next replica. 
+ var calls []roachpb.NodeID + sendFn := func(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) { + calls = append(calls, ba.Replica.NodeID) + if ba.Replica.NodeID == 2 { + reply := &roachpb.BatchResponse{} + err := &roachpb.NotLeaseHolderError{Lease: &ambiguousLease, DescriptorGeneration: oldGeneration} + reply.Error = roachpb.NewError(err) + return reply, nil + } + require.Equal(t, ba.Replica.NodeID, roachpb.NodeID(1)) + return ba.CreateReply(), nil + } + + cfg := DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracer}, + Clock: clock, + NodeDescs: g, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(sendFn), + }, + RangeDescriptorDB: threeReplicaMockRangeDescriptorDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), + } + ds := NewDistSender(cfg) + + ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + get := roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */) + _, pErr := kv.SendWrapped(ctx, ds, get) + require.Nil(t, pErr) + + require.Equal(t, int64(0), ds.Metrics().RangeLookups.Count()) + // We expect to back off and retry the same replica 11 times when we get an + // NLHE with stale info. See `sameReplicaRetryLimit`. + require.Equal(t, int64(11), ds.Metrics().NextReplicaErrCount.Count()) + require.Equal(t, int64(11), ds.Metrics().NotLeaseHolderErrCount.Count()) + + // Ensure that we called Node 2 11 times and then finally called Node 1. 
+ var expectedCalls []roachpb.NodeID + for i := 0; i < 11; i++ { + expectedCalls = append(expectedCalls, roachpb.NodeID(2)) + } + expectedCalls = append(expectedCalls, roachpb.NodeID(1)) + require.Equal(t, expectedCalls, calls) + + require.Regexp( + t, + "backing off due to .* stale info", + finishAndGetRecording().String(), + ) +} + func TestDistSenderRetryOnTransportErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 01433913c990..fa013e3d7189 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -379,7 +379,9 @@ func (et *EvictionToken) syncRLocked( // It's legal to pass in a lease with a zero Sequence; it will be treated as a // speculative lease and considered newer than any existing lease (and then in // turn will be overridden by any subsequent update). -func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool { +func (et *EvictionToken) UpdateLease( + ctx context.Context, l *roachpb.Lease, descGeneration roachpb.RangeGeneration, +) bool { rdc := et.rdc rdc.rangeCache.Lock() defer rdc.rangeCache.Unlock() @@ -388,7 +390,7 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool if !stillValid { return false } - ok, newEntry := cachedEntry.updateLease(l) + ok, newEntry := cachedEntry.updateLease(l, descGeneration) if !ok { return false } @@ -407,11 +409,13 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool // a full lease. This is called when a likely leaseholder is known, but not a // full lease. The lease we'll insert into the cache will be considered // "speculative". 
-func (et *EvictionToken) UpdateLeaseholder(ctx context.Context, lh roachpb.ReplicaDescriptor) { +func (et *EvictionToken) UpdateLeaseholder( + ctx context.Context, lh roachpb.ReplicaDescriptor, descGeneration roachpb.RangeGeneration, +) { // Notice that we don't initialize Lease.Sequence, which will make // entry.LeaseSpeculative() return true. l := &roachpb.Lease{Replica: lh} - et.UpdateLease(ctx, l) + et.UpdateLease(ctx, l, descGeneration) } // EvictLease evicts information about the current lease from the cache, if the @@ -1294,11 +1298,15 @@ func compareEntryLeases(a, b *CacheEntry) int { // This means that the passed-in lease is older than the lease already in the // entry. // -// If the new leaseholder is not a replica in the descriptor, we assume the -// lease information to be more recent than the entry's descriptor, and we -// return true, nil. The caller should evict the receiver from the cache, but -// it'll have to do extra work to figure out what to insert instead. -func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *CacheEntry) { +// If the new leaseholder is not a replica in the descriptor, and the error is +// coming from a replica with range descriptor generation at least as high as +// the cache's, we deduce the lease information to be more recent than the +// entry's descriptor, and we return true, nil. The caller should evict the +// receiver from the cache, but it'll have to do extra work to figure out what +// to insert instead. +func (e *CacheEntry) updateLease( + l *roachpb.Lease, descGeneration roachpb.RangeGeneration, +) (updated bool, newEntry *CacheEntry) { // If l is older than what the entry has (or the same), return early. 
// // This method handles speculative leases: a new lease with a sequence of 0 is @@ -1317,11 +1325,21 @@ func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *Cach return false, e } - // Check whether the lease we were given is compatible with the replicas in - // the descriptor. If it's not, the descriptor must be really stale, and the - // RangeCacheEntry needs to be evicted. + // If the lease is incompatible with the cached descriptor and the error is + // coming from a replica that has a non-stale descriptor, the cached + // descriptor must be stale and the RangeCacheEntry needs to be evicted. _, ok := e.desc.GetReplicaDescriptorByID(l.Replica.ReplicaID) if !ok { + // If the error is coming from a replica that has a stale range descriptor, + // we cannot trigger a cache eviction since this means we've rebalanced the + // old leaseholder away. If we were to evict here, we'd keep evicting until + // this replica applied the new lease. Not updating the cache here means + // that we'll end up trying all voting replicas until we either hit the new + // leaseholder or hit a replica that has accurate knowledge of the + // leaseholder. 
+ if descGeneration != 0 && descGeneration < e.desc.Generation { + return false, nil + } return true, nil } diff --git a/pkg/kv/kvclient/rangecache/range_cache_test.go b/pkg/kv/kvclient/rangecache/range_cache_test.go index acc054381a6e..9c5680a5741f 100644 --- a/pkg/kv/kvclient/rangecache/range_cache_test.go +++ b/pkg/kv/kvclient/rangecache/range_cache_test.go @@ -1563,13 +1563,16 @@ func TestRangeCacheUpdateLease(t *testing.T) { StoreID: 4, ReplicaID: 4, } + + staleRangeGeneration := roachpb.RangeGeneration(2) + nonStaleRangeGeneration := roachpb.RangeGeneration(3) desc1 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, EndKey: roachpb.RKeyMax, InternalReplicas: []roachpb.ReplicaDescriptor{ rep1, rep2, }, - Generation: 0, + Generation: nonStaleRangeGeneration, } desc2 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, @@ -1577,7 +1580,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { InternalReplicas: []roachpb.ReplicaDescriptor{ rep2, rep3, }, - Generation: 1, + Generation: nonStaleRangeGeneration + 1, } desc3 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, @@ -1585,7 +1588,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { InternalReplicas: []roachpb.ReplicaDescriptor{ rep1, rep2, }, - Generation: 2, + Generation: nonStaleRangeGeneration + 2, } startKey := desc1.StartKey @@ -1613,7 +1616,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { Sequence: 1, } oldTok := tok - ok := tok.UpdateLease(ctx, l) + ok := tok.UpdateLease(ctx, l, 0 /* descGeneration */) require.True(t, ok) require.Equal(t, oldTok.Desc(), tok.Desc()) require.Equal(t, &l.Replica, tok.Leaseholder()) @@ -1635,13 +1638,21 @@ func TestRangeCacheUpdateLease(t *testing.T) { require.True(t, ri.lease.Empty()) require.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, ri.ClosedTimestampPolicy()) - // Check that trying to update the lease to a non-member replica results - // in the entry's eviction and the token's invalidation. 
+ // Check that trying to update the lease to a non-member replica results in + // the entry's eviction and the token's invalidation if the descriptor + // generation in the error is not older than the cached descriptor generation. l = &roachpb.Lease{ Replica: repNonMember, Sequence: 2, } - ok = tok.UpdateLease(ctx, l) + // Check that there's no eviction if the range desc generation in the error is + // stale. + ok = tok.UpdateLease(ctx, l, staleRangeGeneration) + require.False(t, ok) + require.True(t, tok.Valid()) + + // However, expect an eviction when the error's desc generation is non-stale. + ok = tok.UpdateLease(ctx, l, nonStaleRangeGeneration) require.False(t, ok) require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) @@ -1664,10 +1675,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { Desc: desc2, Lease: roachpb.Lease{}, }) - ok = tok.UpdateLease(ctx, - // Specify a lease compatible with desc2. - &roachpb.Lease{Replica: rep2, Sequence: 3}, - ) + ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep2, Sequence: 3}, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, tok) require.Equal(t, &desc2, tok.Desc()) @@ -1682,7 +1690,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { }) // This time try to specify a lease that's not compatible with the desc. The // entry should end up evicted from the cache. 
- ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}) + ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}, 0 /* descGeneration */) require.False(t, ok) require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) @@ -1726,7 +1734,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 1, } - ok, e := e.updateLease(l) + ok, e := e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1735,7 +1743,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1747,7 +1755,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1757,7 +1765,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1767,7 +1775,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 2, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Equal(*e.Lease())) @@ -1777,7 +1785,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 1, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.False(t, ok) require.False(t, l.Equal(*e.Lease())) @@ -1786,7 +1794,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 2, } 
- ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1796,7 +1804,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Sequence: 2, } require.True(t, l.Equal(e.Lease())) - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.False(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1806,7 +1814,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: repNonMember, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.Nil(t, e) } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 02b993a755a8..130c6e2e40af 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -914,8 +914,9 @@ func newNotLeaseHolderError( l roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor, msg string, ) *roachpb.NotLeaseHolderError { err := &roachpb.NotLeaseHolderError{ - RangeID: rangeDesc.RangeID, - CustomMsg: msg, + RangeID: rangeDesc.RangeID, + DescriptorGeneration: rangeDesc.Generation, + CustomMsg: msg, } if proposerStoreID != 0 { err.Replica, _ = rangeDesc.GetReplicaDescriptor(proposerStoreID) diff --git a/pkg/roachpb/errors.proto b/pkg/roachpb/errors.proto index 45a9066cae53..58f02087c8cf 100644 --- a/pkg/roachpb/errors.proto +++ b/pkg/roachpb/errors.proto @@ -56,6 +56,12 @@ message NotLeaseHolderError { optional roachpb.Lease lease = 4; optional int64 range_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "RangeID", (gogoproto.casttype) = "RangeID"]; + // The range descriptor generation of the replica the error originated from. + // Used by the DistSender's RangeCache to determine whether the error was + // returned because the replica had a stale understanding of who the + // leaseholder is. 
+ optional int64 descriptor_generation = 6 [(gogoproto.nullable) = false, + (gogoproto.customname) = "DescriptorGeneration", (gogoproto.casttype) = "RangeGeneration"]; // If set, the Error() method will return this instead of composing its // regular spiel. Useful because we reuse this error when rejecting a command // because the lease under which its application was attempted is different