Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient: ignore stale lease information from lagging replicas #73697

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2102,10 +2102,10 @@ func (ds *DistSender) sendToReplicas(

var updatedLeaseholder bool
if tErr.Lease != nil {
updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease)
updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease, tErr.DescriptorGeneration)
} else if tErr.LeaseHolder != nil {
// tErr.LeaseHolder might be set when tErr.Lease isn't.
routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder)
routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder, tErr.DescriptorGeneration)
updatedLeaseholder = true
}
// Move the new leaseholder to the head of the queue for the next
Expand Down
106 changes: 106 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,112 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) {
require.LessOrEqual(t, callsToNode2, 11)
}

// TestDistSenderIgnodesNLHEBasedOnOldRangeGeneration tests that a
// NotLeaseHolderError received from a replica that has a stale range descriptor
// version is ignored, and the next replica is attempted.
func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
tracer := tracing.NewTracer()
ctx, finishAndGetRecording := tracing.ContextWithRecordingSpan(
context.Background(), tracer, "test",
)
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)
for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(n.NodeID),
newNodeDesc(n.NodeID),
gossip.NodeDescriptorTTL,
))
}

oldGeneration := roachpb.RangeGeneration(1)
newGeneration := roachpb.RangeGeneration(2)
desc := roachpb.RangeDescriptor{
RangeID: 1,
Generation: newGeneration,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
},
}
// Ambiguous lease refers to a replica that is incompatible with the cached
// range descriptor.
ambiguousLease := roachpb.Lease{
Replica: roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4, ReplicaID: 4},
}
cachedLease := roachpb.Lease{
Replica: desc.InternalReplicas[1],
}

// The cache starts with a lease on node 2, so the first request will be
// routed there. That replica will reply with an NLHE with an old descriptor
// generation value, which should make the DistSender try the next replica.
var calls []roachpb.NodeID
sendFn := func(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
calls = append(calls, ba.Replica.NodeID)
if ba.Replica.NodeID == 2 {
reply := &roachpb.BatchResponse{}
err := &roachpb.NotLeaseHolderError{Lease: &ambiguousLease, DescriptorGeneration: oldGeneration}
reply.Error = roachpb.NewError(err)
return reply, nil
}
require.Equal(t, ba.Replica.NodeID, roachpb.NodeID(1))
return ba.CreateReply(), nil
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracer},
Clock: clock,
NodeDescs: g,
RPCContext: rpcContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(sendFn),
},
RangeDescriptorDB: threeReplicaMockRangeDescriptorDB,
NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)),
Settings: cluster.MakeTestingClusterSettings(),
}
ds := NewDistSender(cfg)

ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: cachedLease,
})

get := roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */)
_, pErr := kv.SendWrapped(ctx, ds, get)
require.Nil(t, pErr)

require.Equal(t, int64(0), ds.Metrics().RangeLookups.Count())
// We expect to backoff and retry the same replica 11 times when we get an
// NLHE with stale info. See `sameReplicaRetryLimit`.
require.Equal(t, int64(11), ds.Metrics().NextReplicaErrCount.Count())
require.Equal(t, int64(11), ds.Metrics().NotLeaseHolderErrCount.Count())

// Ensure that we called Node 2 11 times and then finally called Node 1.
var expectedCalls []roachpb.NodeID
for i := 0; i < 11; i++ {
expectedCalls = append(expectedCalls, roachpb.NodeID(2))
}
expectedCalls = append(expectedCalls, roachpb.NodeID(1))
require.Equal(t, expectedCalls, calls)

require.Regexp(
t,
"backing off due to .* stale info",
finishAndGetRecording().String(),
)
}

func TestDistSenderRetryOnTransportErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
42 changes: 30 additions & 12 deletions pkg/kv/kvclient/rangecache/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ func (et *EvictionToken) syncRLocked(
// It's legal to pass in a lease with a zero Sequence; it will be treated as a
// speculative lease and considered newer than any existing lease (and then in
// turn will be overridden by any subsequent update).
func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool {
func (et *EvictionToken) UpdateLease(
ctx context.Context, l *roachpb.Lease, descGeneration roachpb.RangeGeneration,
) bool {
rdc := et.rdc
rdc.rangeCache.Lock()
defer rdc.rangeCache.Unlock()
Expand All @@ -388,7 +390,7 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool
if !stillValid {
return false
}
ok, newEntry := cachedEntry.updateLease(l)
ok, newEntry := cachedEntry.updateLease(l, descGeneration)
if !ok {
return false
}
Expand All @@ -407,11 +409,13 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool
// a full lease. This is called when a likely leaseholder is known, but not a
// full lease. The lease we'll insert into the cache will be considered
// "speculative".
func (et *EvictionToken) UpdateLeaseholder(ctx context.Context, lh roachpb.ReplicaDescriptor) {
func (et *EvictionToken) UpdateLeaseholder(
ctx context.Context, lh roachpb.ReplicaDescriptor, descGeneration roachpb.RangeGeneration,
) {
// Notice that we don't initialize Lease.Sequence, which will make
// entry.LeaseSpeculative() return true.
l := &roachpb.Lease{Replica: lh}
et.UpdateLease(ctx, l)
et.UpdateLease(ctx, l, descGeneration)
}

// EvictLease evicts information about the current lease from the cache, if the
Expand Down Expand Up @@ -1303,11 +1307,15 @@ func compareEntryLeases(a, b *CacheEntry) int {
// This means that the passed-in lease is older than the lease already in the
// entry.
//
// If the new leaseholder is not a replica in the descriptor, we assume the
// lease information to be more recent than the entry's descriptor, and we
// return true, nil. The caller should evict the receiver from the cache, but
// it'll have to do extra work to figure out what to insert instead.
func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *CacheEntry) {
// If the new leaseholder is not a replica in the descriptor, and the error is
// coming from a replica with range descriptor generation at least as high as
// the cache's, we deduce the lease information to be more recent than the
// entry's descriptor, and we return true, nil. The caller should evict the
// receiver from the cache, but it'll have to do extra work to figure out what
// to insert instead.
func (e *CacheEntry) updateLease(
l *roachpb.Lease, descGeneration roachpb.RangeGeneration,
) (updated bool, newEntry *CacheEntry) {
// If l is older than what the entry has (or the same), return early.
//
// This method handles speculative leases: a new lease with a sequence of 0 is
Expand All @@ -1326,11 +1334,21 @@ func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *Cach
return false, e
}

// Check whether the lease we were given is compatible with the replicas in
// the descriptor. If it's not, the descriptor must be really stale, and the
// RangeCacheEntry needs to be evicted.
// If the lease is incompatible with the cached descriptor and the error is
// coming from a replica that has a non-stale descriptor, the cached
// descriptor must be stale and the RangeCacheEntry needs to be evicted.
_, ok := e.desc.GetReplicaDescriptorByID(l.Replica.ReplicaID)
if !ok {
// If the error is coming from a replica that has a stale range descriptor,
// we cannot trigger a cache eviction since this means we've rebalanced the
// old leaseholder away. If we were to evict here, we'd keep evicting until
// this replica applied the new lease. Not updating the cache here means
// that we'll end up trying all voting replicas until we either hit the new
// leaseholder or hit a replica that has accurate knowledge of the
// leaseholder.
if descGeneration != 0 && descGeneration < e.desc.Generation {
return false, nil
}
return true, nil
}

Expand Down
50 changes: 29 additions & 21 deletions pkg/kv/kvclient/rangecache/range_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1570,29 +1570,32 @@ func TestRangeCacheUpdateLease(t *testing.T) {
StoreID: 4,
ReplicaID: 4,
}

staleRangeGeneration := roachpb.RangeGeneration(2)
nonStaleRangeGeneration := roachpb.RangeGeneration(3)
desc1 := roachpb.RangeDescriptor{
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
rep1, rep2,
},
Generation: 0,
Generation: nonStaleRangeGeneration,
}
desc2 := roachpb.RangeDescriptor{
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
rep2, rep3,
},
Generation: 1,
Generation: nonStaleRangeGeneration + 1,
}
desc3 := roachpb.RangeDescriptor{
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
rep1, rep2,
},
Generation: 2,
Generation: nonStaleRangeGeneration + 2,
}
startKey := desc1.StartKey

Expand Down Expand Up @@ -1621,7 +1624,7 @@ func TestRangeCacheUpdateLease(t *testing.T) {
Sequence: 1,
}
oldTok := tok
ok := tok.UpdateLease(ctx, l)
ok := tok.UpdateLease(ctx, l, 0 /* descGeneration */)
require.True(t, ok)
require.Equal(t, oldTok.Desc(), tok.Desc())
require.Equal(t, &l.Replica, tok.Leaseholder())
Expand All @@ -1643,13 +1646,21 @@ func TestRangeCacheUpdateLease(t *testing.T) {
require.True(t, ri.lease.Empty())
require.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, ri.ClosedTimestampPolicy())

// Check that trying to update the lease to a non-member replica results
// in the entry's eviction and the token's invalidation.
// Check that trying to update the lease to a non-member replica results in
// the entry's eviction and the token's invalidation if the descriptor
// generation in the error is not older than the cached descriptor generation.
l = &roachpb.Lease{
Replica: repNonMember,
Sequence: 2,
}
ok = tok.UpdateLease(ctx, l)
// Check that there's no eviction if the range desc generation in the error is
// stale.
ok = tok.UpdateLease(ctx, l, staleRangeGeneration)
require.False(t, ok)
require.True(t, tok.Valid())

// However, expect an eviction when the error's desc generation is non-stale.
ok = tok.UpdateLease(ctx, l, nonStaleRangeGeneration)
require.False(t, ok)
require.False(t, tok.Valid())
ri = cache.GetCached(ctx, startKey, false /* inverted */)
Expand All @@ -1672,10 +1683,7 @@ func TestRangeCacheUpdateLease(t *testing.T) {
Desc: desc2,
Lease: roachpb.Lease{},
})
ok = tok.UpdateLease(ctx,
// Specify a lease compatible with desc2.
&roachpb.Lease{Replica: rep2, Sequence: 3},
)
ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep2, Sequence: 3}, 0 /* descGeneration */)
require.True(t, ok)
require.NotNil(t, tok)
require.Equal(t, &desc2, tok.Desc())
Expand All @@ -1690,7 +1698,7 @@ func TestRangeCacheUpdateLease(t *testing.T) {
})
// This time try to specify a lease that's not compatible with the desc. The
// entry should end up evicted from the cache.
ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4})
ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}, 0 /* descGeneration */)
require.False(t, ok)
require.False(t, tok.Valid())
ri = cache.GetCached(ctx, startKey, false /* inverted */)
Expand Down Expand Up @@ -1734,7 +1742,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep1,
Sequence: 1,
}
ok, e := e.updateLease(l)
ok, e := e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.True(t, l.Equal(e.Lease()))

Expand All @@ -1743,7 +1751,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep1,
Sequence: 0,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.NotNil(t, e.Leaseholder())
require.True(t, l.Replica.Equal(*e.Leaseholder()))
Expand All @@ -1755,7 +1763,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep2,
Sequence: 0,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.NotNil(t, e.Leaseholder())
require.True(t, l.Replica.Equal(*e.Leaseholder()))
Expand All @@ -1765,7 +1773,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep1,
Sequence: 0,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.NotNil(t, e.Leaseholder())
require.True(t, l.Replica.Equal(*e.Leaseholder()))
Expand All @@ -1775,7 +1783,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep1,
Sequence: 2,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.NotNil(t, e.Leaseholder())
require.True(t, l.Equal(*e.Lease()))
Expand All @@ -1785,7 +1793,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep2,
Sequence: 1,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.False(t, ok)
require.False(t, l.Equal(*e.Lease()))

Expand All @@ -1794,7 +1802,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: rep2,
Sequence: 2,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.True(t, l.Equal(e.Lease()))

Expand All @@ -1804,7 +1812,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Sequence: 2,
}
require.True(t, l.Equal(e.Lease()))
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.False(t, ok)
require.True(t, l.Equal(e.Lease()))

Expand All @@ -1814,7 +1822,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) {
Replica: repNonMember,
Sequence: 0,
}
ok, e = e.updateLease(l)
ok, e = e.updateLease(l, 0 /* descGeneration */)
require.True(t, ok)
require.Nil(t, e)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,8 +913,9 @@ func newNotLeaseHolderError(
l roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor, msg string,
) *roachpb.NotLeaseHolderError {
err := &roachpb.NotLeaseHolderError{
RangeID: rangeDesc.RangeID,
CustomMsg: msg,
RangeID: rangeDesc.RangeID,
DescriptorGeneration: rangeDesc.Generation,
CustomMsg: msg,
}
if proposerStoreID != 0 {
err.Replica, _ = rangeDesc.GetReplicaDescriptor(proposerStoreID)
Expand Down
Loading