Skip to content

Commit

Permalink
kvclient: ignore stale lease information from lagging replicas
Browse files Browse the repository at this point in the history
This commit makes it such that the `DistSender`'s range descriptor cache
doesn't trigger a cache eviction based on incompatible lease information in a
`NotLeaseHolderError` when it is coming from a replica that has a stale view of
the range's descriptor (characterized by an older `DescriptorGeneration` on the
replica)

Not doing so before was hazardous because, if we received an NLHE that pointed
to a replica that did not belong in the cached descriptor, we'd trigger a cache
evicion. This assumed that the replica returning the error had a fresher view
of the range than what we had in the cache, which is not always true. This
meant that we'd keep doing range lookups and subsequent evictions until this
lagging replica caught up to the current state of the range.

Release note (bug fix): A bug that caused high SQL tail latencies during
background rebalancing in the cluster has been fixed.
  • Loading branch information
aayushshah15 committed Sep 26, 2022
1 parent 53b90e9 commit 533d511
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 37 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2145,10 +2145,10 @@ func (ds *DistSender) sendToReplicas(

var updatedLeaseholder bool
if tErr.Lease != nil {
updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease)
updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease, tErr.DescriptorGeneration)
} else if tErr.LeaseHolder != nil {
// tErr.LeaseHolder might be set when tErr.Lease isn't.
routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder)
routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder, tErr.DescriptorGeneration)
updatedLeaseholder = true
}
// Move the new leaseholder to the head of the queue for the next
Expand Down
106 changes: 106 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,112 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) {
require.LessOrEqual(t, callsToNode2, 11)
}

// TestDistSenderIgnodesNLHEBasedOnOldRangeGeneration tests that a
// NotLeaseHolderError received from a replica that has a stale range descriptor
// version is ignored, and the next replica is attempted.
func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
tracer := tracing.NewTracer()
ctx, finishAndGetRecording := tracing.ContextWithRecordingSpan(
context.Background(), tracer, "test",
)
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)
for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(n.NodeID),
newNodeDesc(n.NodeID),
gossip.NodeDescriptorTTL,
))
}

oldGeneration := roachpb.RangeGeneration(1)
newGeneration := roachpb.RangeGeneration(2)
desc := roachpb.RangeDescriptor{
RangeID: 1,
Generation: newGeneration,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
},
}
// Ambiguous lease refers to a replica that is incompatible with the cached
// range descriptor.
ambiguousLease := roachpb.Lease{
Replica: roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4, ReplicaID: 4},
}
cachedLease := roachpb.Lease{
Replica: desc.InternalReplicas[1],
}

// The cache starts with a lease on node 2, so the first request will be
// routed there. That replica will reply with an NLHE with an old descriptor
// generation value, which should make the DistSender try the next replica.
var calls []roachpb.NodeID
sendFn := func(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
calls = append(calls, ba.Replica.NodeID)
if ba.Replica.NodeID == 2 {
reply := &roachpb.BatchResponse{}
err := &roachpb.NotLeaseHolderError{Lease: &ambiguousLease, DescriptorGeneration: oldGeneration}
reply.Error = roachpb.NewError(err)
return reply, nil
}
require.Equal(t, ba.Replica.NodeID, roachpb.NodeID(1))
return ba.CreateReply(), nil
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracer},
Clock: clock,
NodeDescs: g,
RPCContext: rpcContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(sendFn),
},
RangeDescriptorDB: threeReplicaMockRangeDescriptorDB,
NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)),
Settings: cluster.MakeTestingClusterSettings(),
}
ds := NewDistSender(cfg)

ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: cachedLease,
})

get := roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */)
_, pErr := kv.SendWrapped(ctx, ds, get)
require.Nil(t, pErr)

require.Equal(t, int64(0), ds.Metrics().RangeLookups.Count())
// We expect to backoff and retry the same replica 11 times when we get an
// NLHE with stale info. See `sameReplicaRetryLimit`.
require.Equal(t, int64(11), ds.Metrics().NextReplicaErrCount.Count())
require.Equal(t, int64(11), ds.Metrics().NotLeaseHolderErrCount.Count())

// Ensure that we called Node 2 11 times and then finally called Node 1.
var expectedCalls []roachpb.NodeID
for i := 0; i < 11; i++ {
expectedCalls = append(expectedCalls, roachpb.NodeID(2))
}
expectedCalls = append(expectedCalls, roachpb.NodeID(1))
require.Equal(t, expectedCalls, calls)

require.Regexp(
t,
"backing off due to .* stale info",
finishAndGetRecording().String(),
)
}

func TestDistSenderRetryOnTransportErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
42 changes: 30 additions & 12 deletions pkg/kv/kvclient/rangecache/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ func (et *EvictionToken) syncRLocked(
// It's legal to pass in a lease with a zero Sequence; it will be treated as a
// speculative lease and considered newer than any existing lease (and then in
// turn will be overridden by any subsequent update).
func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool {
func (et *EvictionToken) UpdateLease(
ctx context.Context, l *roachpb.Lease, descGeneration roachpb.RangeGeneration,
) bool {
rdc := et.rdc
rdc.rangeCache.Lock()
defer rdc.rangeCache.Unlock()
Expand All @@ -388,7 +390,7 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool
if !stillValid {
return false
}
ok, newEntry := cachedEntry.updateLease(l)
ok, newEntry := cachedEntry.updateLease(l, descGeneration)
if !ok {
return false
}
Expand All @@ -407,11 +409,13 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool
// a full lease. This is called when a likely leaseholder is known, but not a
// full lease. The lease we'll insert into the cache will be considered
// "speculative".
func (et *EvictionToken) UpdateLeaseholder(ctx context.Context, lh roachpb.ReplicaDescriptor) {
func (et *EvictionToken) UpdateLeaseholder(
ctx context.Context, lh roachpb.ReplicaDescriptor, descGeneration roachpb.RangeGeneration,
) {
// Notice that we don't initialize Lease.Sequence, which will make
// entry.LeaseSpeculative() return true.
l := &roachpb.Lease{Replica: lh}
et.UpdateLease(ctx, l)
et.UpdateLease(ctx, l, descGeneration)
}

// EvictLease evicts information about the current lease from the cache, if the
Expand Down Expand Up @@ -1294,11 +1298,15 @@ func compareEntryLeases(a, b *CacheEntry) int {
// This means that the passed-in lease is older than the lease already in the
// entry.
//
// If the new leaseholder is not a replica in the descriptor, we assume the
// lease information to be more recent than the entry's descriptor, and we
// return true, nil. The caller should evict the receiver from the cache, but
// it'll have to do extra work to figure out what to insert instead.
func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *CacheEntry) {
// If the new leaseholder is not a replica in the descriptor, and the error is
// coming from a replica with range descriptor generation at least as high as
// the cache's, we deduce the lease information to be more recent than the
// entry's descriptor, and we return true, nil. The caller should evict the
// receiver from the cache, but it'll have to do extra work to figure out what
// to insert instead.
func (e *CacheEntry) updateLease(
l *roachpb.Lease, descGeneration roachpb.RangeGeneration,
) (updated bool, newEntry *CacheEntry) {
// If l is older than what the entry has (or the same), return early.
//
// This method handles speculative leases: a new lease with a sequence of 0 is
Expand All @@ -1317,11 +1325,21 @@ func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *Cach
return false, e
}

// Check whether the lease we were given is compatible with the replicas in
// the descriptor. If it's not, the descriptor must be really stale, and the
// RangeCacheEntry needs to be evicted.
// If the lease is incompatible with the cached descriptor and the error is
// coming from a replica that has a non-stale descriptor, the cached
// descriptor must be stale and the RangeCacheEntry needs to be evicted.
_, ok := e.desc.GetReplicaDescriptorByID(l.Replica.ReplicaID)
if !ok {
// If the error is coming from a replica that has a stale range descriptor,
// we cannot trigger a cache eviction since this means we've rebalanced the
// old leaseholder away. If we were to evict here, we'd keep evicting until
// this replica applied the new lease. Not updating the cache here means
// that we'll end up trying all voting replicas until we either hit the new
// leaseholder or hit a replica that has accurate knowledge of the
// leaseholder.
if descGeneration != 0 && descGeneration < e.desc.Generation {
return false, nil
}
return true, nil
}

Expand Down
50 changes: 29 additions & 21 deletions pkg/kv/kvclient/rangecache/range_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,29 +1563,32 @@ func TestRangeCacheUpdateLease(t *testing.T) {
StoreID: 4,
ReplicaID: 4,
}

staleRangeGeneration := roachpb.RangeGeneration(2)
nonStaleRangeGeneration := roachpb.RangeGeneration(3)
desc1 := roachpb.RangeDescriptor{
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
rep1, rep2,
},
Generation: 0,
Generation: nonStaleRangeGeneration,
}
desc2 := roachpb.RangeDescriptor{
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
rep2, rep3,
},
Generation: 1,
Generation: nonStaleRangeGeneration + 1,
}
desc3 := roachpb.RangeDescriptor{
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
rep1, rep2,
},
Generation: 2,
Generation: nonStaleRangeGeneration + 2,
}
startKey := desc1.StartKey

Expand Down Expand Up @@ -1613,7 +1616,7 @@ func TestRangeCacheUpdateLease(t *testing.T) {
Sequence: 1,
}
oldTok := tok
ok := tok.UpdateLease(ctx, l)
ok := tok.UpdateLease(ctx, l, 0 /* descGeneration */)
require.True(t, ok)
require.Equal(t, oldTok.Desc(), tok.Desc())
require.Equal(t, &l.Replica, tok.Leaseholder())
Expand All @@ -1635,13 +1638,21 @@ func TestRangeCacheUpdateLease(t *testing.T) {
require.True(t, ri.lease.Empty())
require.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, ri.ClosedTimestampPolicy())

// Check that trying to update the lease to a non-member replica results
// in the entry's eviction and the token's invalidation.
// Check that trying to update the lease to a non-member replica results in
// the entry's eviction and the token's invalidation if the descriptor
// generation in the error is not older than the cached descriptor generation.
l = &roachpb.Lease{
Replica: repNonMember,
Sequence: 2,
}
ok = tok.UpdateLease(ctx, l)
// Check that there's no eviction if the range desc generation in the error is
// stale.
ok = tok.UpdateLease(ctx, l, staleRangeGeneration)
require.False(t, ok)
require.True(t, tok.Valid())

// However, expect an eviction when the error's desc generation is non-stale.
ok = tok.UpdateLease(ctx, l, nonStaleRangeGeneration)
require.False(t, ok)
require.False(t, tok.Valid())
ri = cache.GetCached(ctx, startKey, false /* inverted */)
Expand All @@ -1664,10 +1675,7 @@ func TestRangeCacheUpdateLease(t *testing.T) {
Desc: desc2,
Lease: roachpb.Lease{},
})
ok = tok.UpdateLease(ctx,
// Specify a lease compatible with desc2.
&roachpb.Lease{Replica: rep2, Sequence: 3},
)
ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep2, Sequence: 3}, 0 /* descGeneration */)
require.True(t, ok)
require.NotNil(t, tok)
require.Equal(t, &desc2, tok.Desc())
Expand All @@ -1682,7 +1690,7 @@ func TestRangeCacheUpdateLease(t *testing.T) {
})
// This time try to specify a lease that's not compatible with the desc. The
// entry should end up evicted from the cache.
ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4})
ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}, 0 /* descGeneration */)
require.False(t, ok)
require.False(t, tok.Valid())
ri = cache.GetCached(ctx, startKey, false /* inverted */)
Expand Down Expand Up @@ -1726,7 +1734,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep1,
Sequence: 1,
}
ok, e := e.updateLease(l)
ok, e := e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.True(t, l.Equal(e.Lease()))

Expand All @@ -1735,7 +1743,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep1,
Sequence: 0,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.NotNil(t, e.Leaseholder())
require.True(t, l.Replica.Equal(*e.Leaseholder()))
Expand All @@ -1747,7 +1755,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep2,
Sequence: 0,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.NotNil(t, e.Leaseholder())
require.True(t, l.Replica.Equal(*e.Leaseholder()))
Expand All @@ -1757,7 +1765,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep1,
Sequence: 0,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.NotNil(t, e.Leaseholder())
require.True(t, l.Replica.Equal(*e.Leaseholder()))
Expand All @@ -1767,7 +1775,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep1,
Sequence: 2,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.NotNil(t, e.Leaseholder())
require.True(t, l.Equal(*e.Lease()))
Expand All @@ -1777,7 +1785,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep2,
Sequence: 1,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.False(t, ok)
require.False(t, l.Equal(*e.Lease()))

Expand All @@ -1786,7 +1794,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep2,
Sequence: 2,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.True(t, l.Equal(e.Lease()))

Expand All @@ -1796,7 +1804,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Sequence: 2,
}
require.True(t, l.Equal(e.Lease()))
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.False(t, ok)
require.True(t, l.Equal(e.Lease()))

Expand All @@ -1806,7 +1814,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: repNonMember,
Sequence: 0,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.Nil(t, e)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,9 @@ func newNotLeaseHolderError(
l roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor, msg string,
) *roachpb.NotLeaseHolderError {
err := &roachpb.NotLeaseHolderError{
RangeID: rangeDesc.RangeID,
CustomMsg: msg,
RangeID: rangeDesc.RangeID,
DescriptorGeneration: rangeDesc.Generation,
CustomMsg: msg,
}
if proposerStoreID != 0 {
err.Replica, _ = rangeDesc.GetReplicaDescriptor(proposerStoreID)
Expand Down
Loading

0 comments on commit 533d511

Please sign in to comment.