From 92f0260764a8d31d28f4ecae288c19c8468c2e0e Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Mon, 17 Jan 2022 04:34:36 -0500 Subject: [PATCH] kvclient: ignore stale lease information from lagging replicas This commit makes it such that the `DistSender`'s range descriptor cache doesn't trigger a cache eviction based on incompatible lease information in a `NotLeaseHolderError` when it is coming from a replica that has a stale view of the range's descriptor (characterized by an older `DescriptorGeneration` on the replica) Not doing so before was hazardous because, if we received an NLHE that pointed to a replica that did not belong in the cached descriptor, we'd trigger a cache evicion. This assumed that the replica returning the error had a fresher view of the range than what we had in the cache, which is not always true. This meant that we'd keep doing range lookups and subsequent evictions until this lagging replica caught up to the current state of the range. Release note (bug fix): A bug that caused high SQL tail latencies during background rebalancing in the cluster has been fixed. --- pkg/kv/kvclient/kvcoord/dist_sender.go | 4 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 106 ++++++++++++++++++ pkg/kv/kvclient/rangecache/range_cache.go | 42 +++++-- .../kvclient/rangecache/range_cache_test.go | 50 +++++---- pkg/kv/kvserver/replica_range_lease.go | 5 +- pkg/roachpb/errors.proto | 6 + 6 files changed, 176 insertions(+), 37 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index c51e77fbe2c0..22205f7e205d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2102,10 +2102,10 @@ func (ds *DistSender) sendToReplicas( var updatedLeaseholder bool if tErr.Lease != nil { - updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease) + updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease, tErr.DescriptorGeneration) } else if tErr.LeaseHolder != nil { // tErr.LeaseHolder might be set when tErr.Lease isn't. - routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder) + routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder, tErr.DescriptorGeneration) updatedLeaseholder = true } // Move the new leaseholder to the head of the queue for the next diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 6d4f4e0170ad..58b7fa1edcb0 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -971,6 +971,112 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) { require.LessOrEqual(t, callsToNode2, 11) } +// TestDistSenderIgnodesNLHEBasedOnOldRangeGeneration tests that a +// NotLeaseHolderError received from a replica that has a stale range descriptor +// version is ignored, and the next replica is attempted. +func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tracer := tracing.NewTracer() + ctx, finishAndGetRecording := tracing.ContextWithRecordingSpan( + context.Background(), tracer, "test", + ) + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + g := makeGossip(t, stopper, rpcContext) + for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() { + require.NoError(t, g.AddInfoProto( + gossip.MakeNodeIDKey(n.NodeID), + newNodeDesc(n.NodeID), + gossip.NodeDescriptorTTL, + )) + } + + oldGeneration := roachpb.RangeGeneration(1) + newGeneration := roachpb.RangeGeneration(2) + desc := roachpb.RangeDescriptor{ + RangeID: 1, + Generation: newGeneration, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, + } + // Ambiguous lease refers to a replica that is incompatible with the cached + // range descriptor. + ambiguousLease := roachpb.Lease{ + Replica: roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4, ReplicaID: 4}, + } + cachedLease := roachpb.Lease{ + Replica: desc.InternalReplicas[1], + } + + // The cache starts with a lease on node 2, so the first request will be + // routed there. That replica will reply with an NLHE with an old descriptor + // generation value, which should make the DistSender try the next replica. + var calls []roachpb.NodeID + sendFn := func(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) { + calls = append(calls, ba.Replica.NodeID) + if ba.Replica.NodeID == 2 { + reply := &roachpb.BatchResponse{} + err := &roachpb.NotLeaseHolderError{Lease: &ambiguousLease, DescriptorGeneration: oldGeneration} + reply.Error = roachpb.NewError(err) + return reply, nil + } + require.Equal(t, ba.Replica.NodeID, roachpb.NodeID(1)) + return ba.CreateReply(), nil + } + + cfg := DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracer}, + Clock: clock, + NodeDescs: g, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(sendFn), + }, + RangeDescriptorDB: threeReplicaMockRangeDescriptorDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), + } + ds := NewDistSender(cfg) + + ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + get := roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */) + _, pErr := kv.SendWrapped(ctx, ds, get) + require.Nil(t, pErr) + + require.Equal(t, int64(0), ds.Metrics().RangeLookups.Count()) + // We expect to backoff and retry the same replica 11 times when we get an + // NLHE with stale info. See `sameReplicaRetryLimit`. + require.Equal(t, int64(11), ds.Metrics().NextReplicaErrCount.Count()) + require.Equal(t, int64(11), ds.Metrics().NotLeaseHolderErrCount.Count()) + + // Ensure that we called Node 2 11 times and then finally called Node 1. + var expectedCalls []roachpb.NodeID + for i := 0; i < 11; i++ { + expectedCalls = append(expectedCalls, roachpb.NodeID(2)) + } + expectedCalls = append(expectedCalls, roachpb.NodeID(1)) + require.Equal(t, expectedCalls, calls) + + require.Regexp( + t, + "backing off due to .* stale info", + finishAndGetRecording().String(), + ) +} + func TestDistSenderRetryOnTransportErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 29e068bcda1f..086c585f51f2 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -379,7 +379,9 @@ func (et *EvictionToken) syncRLocked( // It's legal to pass in a lease with a zero Sequence; it will be treated as a // speculative lease and considered newer than any existing lease (and then in // turn will be overridden by any subsequent update). -func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool { +func (et *EvictionToken) UpdateLease( + ctx context.Context, l *roachpb.Lease, descGeneration roachpb.RangeGeneration, +) bool { rdc := et.rdc rdc.rangeCache.Lock() defer rdc.rangeCache.Unlock() @@ -388,7 +390,7 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool if !stillValid { return false } - ok, newEntry := cachedEntry.updateLease(l) + ok, newEntry := cachedEntry.updateLease(l, descGeneration) if !ok { return false } @@ -407,11 +409,13 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool // a full lease. This is called when a likely leaseholder is known, but not a // full lease. The lease we'll insert into the cache will be considered // "speculative". -func (et *EvictionToken) UpdateLeaseholder(ctx context.Context, lh roachpb.ReplicaDescriptor) { +func (et *EvictionToken) UpdateLeaseholder( + ctx context.Context, lh roachpb.ReplicaDescriptor, descGeneration roachpb.RangeGeneration, +) { // Notice that we don't initialize Lease.Sequence, which will make // entry.LeaseSpeculative() return true. l := &roachpb.Lease{Replica: lh} - et.UpdateLease(ctx, l) + et.UpdateLease(ctx, l, descGeneration) } // EvictLease evicts information about the current lease from the cache, if the @@ -1303,11 +1307,15 @@ func compareEntryLeases(a, b *CacheEntry) int { // This means that the passed-in lease is older than the lease already in the // entry. // -// If the new leaseholder is not a replica in the descriptor, we assume the -// lease information to be more recent than the entry's descriptor, and we -// return true, nil. The caller should evict the receiver from the cache, but -// it'll have to do extra work to figure out what to insert instead. -func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *CacheEntry) { +// If the new leaseholder is not a replica in the descriptor, and the error is +// coming from a replica with range descriptor generation at least as high as +// the cache's, we deduce the lease information to be more recent than the +// entry's descriptor, and we return true, nil. The caller should evict the +// receiver from the cache, but it'll have to do extra work to figure out what +// to insert instead. +func (e *CacheEntry) updateLease( + l *roachpb.Lease, descGeneration roachpb.RangeGeneration, +) (updated bool, newEntry *CacheEntry) { // If l is older than what the entry has (or the same), return early. // // This method handles speculative leases: a new lease with a sequence of 0 is @@ -1326,11 +1334,21 @@ func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *Cach return false, e } - // Check whether the lease we were given is compatible with the replicas in - // the descriptor. If it's not, the descriptor must be really stale, and the - // RangeCacheEntry needs to be evicted. + // If the lease is incompatible with the cached descriptor and the error is + // coming from a replica that has a non-stale descriptor, the cached + // descriptor must be stale and the RangeCacheEntry needs to be evicted. _, ok := e.desc.GetReplicaDescriptorByID(l.Replica.ReplicaID) if !ok { + // If the error is coming from a replica that has a stale range descriptor, + // we cannot trigger a cache eviction since this means we've rebalanced the + // old leaseholder away. If we were to evict here, we'd keep evicting until + // this replica applied the new lease. Not updating the cache here means + // that we'll end up trying all voting replicas until we either hit the new + // leaseholder or hit a replica that has accurate knowledge of the + // leaseholder. + if descGeneration != 0 && descGeneration < e.desc.Generation { + return false, nil + } return true, nil } diff --git a/pkg/kv/kvclient/rangecache/range_cache_test.go b/pkg/kv/kvclient/rangecache/range_cache_test.go index a88c7c27404a..0fe5fc1bd9cb 100644 --- a/pkg/kv/kvclient/rangecache/range_cache_test.go +++ b/pkg/kv/kvclient/rangecache/range_cache_test.go @@ -1570,13 +1570,16 @@ func TestRangeCacheUpdateLease(t *testing.T) { StoreID: 4, ReplicaID: 4, } + + staleRangeGeneration := roachpb.RangeGeneration(2) + nonStaleRangeGeneration := roachpb.RangeGeneration(3) desc1 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, EndKey: roachpb.RKeyMax, InternalReplicas: []roachpb.ReplicaDescriptor{ rep1, rep2, }, - Generation: 0, + Generation: nonStaleRangeGeneration, } desc2 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, @@ -1584,7 +1587,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { InternalReplicas: []roachpb.ReplicaDescriptor{ rep2, rep3, }, - Generation: 1, + Generation: nonStaleRangeGeneration + 1, } desc3 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, @@ -1592,7 +1595,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { InternalReplicas: []roachpb.ReplicaDescriptor{ rep1, rep2, }, - Generation: 2, + Generation: nonStaleRangeGeneration + 2, } startKey := desc1.StartKey @@ -1621,7 +1624,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { Sequence: 1, } oldTok := tok - ok := tok.UpdateLease(ctx, l) + ok := tok.UpdateLease(ctx, l, 0 /* descGeneration */) require.True(t, ok) require.Equal(t, oldTok.Desc(), tok.Desc()) require.Equal(t, &l.Replica, tok.Leaseholder()) @@ -1643,13 +1646,21 @@ func TestRangeCacheUpdateLease(t *testing.T) { require.True(t, ri.lease.Empty()) require.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, ri.ClosedTimestampPolicy()) - // Check that trying to update the lease to a non-member replica results - // in the entry's eviction and the token's invalidation. + // Check that trying to update the lease to a non-member replica results in + // the entry's eviction and the token's invalidation if the descriptor + // generation in the error is not older than the cached descriptor generation. l = &roachpb.Lease{ Replica: repNonMember, Sequence: 2, } - ok = tok.UpdateLease(ctx, l) + // Check that there's no eviction if the range desc generation in the error is + // stale. + ok = tok.UpdateLease(ctx, l, staleRangeGeneration) + require.False(t, ok) + require.True(t, tok.Valid()) + + // However, expect an eviction when the error's desc generation is non-stale. + ok = tok.UpdateLease(ctx, l, nonStaleRangeGeneration) require.False(t, ok) require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) @@ -1672,10 +1683,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { Desc: desc2, Lease: roachpb.Lease{}, }) - ok = tok.UpdateLease(ctx, - // Specify a lease compatible with desc2. - &roachpb.Lease{Replica: rep2, Sequence: 3}, - ) + ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep2, Sequence: 3}, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, tok) require.Equal(t, &desc2, tok.Desc()) @@ -1690,7 +1698,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { }) // This time try to specify a lease that's not compatible with the desc. The // entry should end up evicted from the cache. - ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}) + ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}, 0 /* descGeneration */) require.False(t, ok) require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) @@ -1734,7 +1742,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 1, } - ok, e := e.updateLease(l) + ok, e := e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1743,7 +1751,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1755,7 +1763,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1765,7 +1773,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1775,7 +1783,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 2, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Equal(*e.Lease())) @@ -1785,7 +1793,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 1, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.False(t, ok) require.False(t, l.Equal(*e.Lease())) @@ -1794,7 +1802,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 2, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1804,7 +1812,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Sequence: 2, } require.True(t, l.Equal(e.Lease())) - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.False(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1814,7 +1822,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: repNonMember, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.Nil(t, e) } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 7eadd4a8084c..ca9172ea364e 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -913,8 +913,9 @@ func newNotLeaseHolderError( l roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor, msg string, ) *roachpb.NotLeaseHolderError { err := &roachpb.NotLeaseHolderError{ - RangeID: rangeDesc.RangeID, - CustomMsg: msg, + RangeID: rangeDesc.RangeID, + DescriptorGeneration: rangeDesc.Generation, + CustomMsg: msg, } if proposerStoreID != 0 { err.Replica, _ = rangeDesc.GetReplicaDescriptor(proposerStoreID) diff --git a/pkg/roachpb/errors.proto b/pkg/roachpb/errors.proto index e77b56e58290..1aff2829da0d 100644 --- a/pkg/roachpb/errors.proto +++ b/pkg/roachpb/errors.proto @@ -56,6 +56,12 @@ message NotLeaseHolderError { optional roachpb.Lease lease = 4; optional int64 range_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "RangeID", (gogoproto.casttype) = "RangeID"]; + // The range descriptor generation of the replica the error originated from. + // Used by the DistSender's RangeCache to determine whether the error was + // returned because the replica had a stale understanding of who the + // leaseholder is. + optional int64 descriptor_generation = 6 [(gogoproto.nullable) = false, + (gogoproto.customname) = "DescriptorGeneration", (gogoproto.casttype) = "RangeGeneration"]; // If set, the Error() method will return this instead of composing its // regular spiel. Useful because we reuse this error when rejecting a command // because the lease under which its application was attempted is different