From bc2bffdc5805fc44909fae05acf59b5c65075744 Mon Sep 17 00:00:00 2001
From: Tobias Schottdorf
Date: Thu, 16 Aug 2018 13:09:39 +0200
Subject: [PATCH] kv: evict leaseholder on RPC error

This addresses a situation in which we would not evict a stale
leaseholder for a long time. Consider the replicas [s1, s2, s3], where
s1 is down but is the cached leaseholder, while s2 is the actual lease
holder. The RPC layer will try s1, get an RPC error, try s2, and
succeed. Since there is no NotLeaseHolderError involved, the cache
would not get updated, and so every request pays the overhead of
trying s1 first.

Fixes #23601.

Release note: None
---
 pkg/kv/dist_sender.go      |  22 +++++++-
 pkg/kv/dist_sender_test.go | 112 ++++++++++++++++++++++++++++++++++---
 2 files changed, 123 insertions(+), 11 deletions(-)

diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go
index 9a1bfbab326b..65df0755b9bd 100644
--- a/pkg/kv/dist_sender.go
+++ b/pkg/kv/dist_sender.go
@@ -1277,7 +1277,9 @@ func (ds *DistSender) sendToReplicas(
 		return nil, roachpb.NewSendError(
 			fmt.Sprintf("sending to all %d replicas failed", len(replicas)))
 	}
-	log.VEventf(ctx, 2, "r%d: sending batch %s to %s", rangeID, args.Summary(), transport.NextReplica())
+
+	curReplica := transport.NextReplica()
+	log.VEventf(ctx, 2, "r%d: sending batch %s to %s", rangeID, args.Summary(), curReplica)
 	br, err := transport.SendNext(ctx)

 	// This loop will retry operations that fail with errors that reflect
@@ -1302,6 +1304,21 @@ func (ds *DistSender) sendToReplicas(
 				ambiguousError = err
 			}
 			log.VErrEventf(ctx, 2, "RPC error: %s", err)
+			if storeID, ok := ds.leaseHolderCache.Lookup(ctx, rangeID); ok && curReplica.StoreID == storeID {
+				// If the down replica is cached as the lease holder, evict
+				// it. The only other eviction happens below on
+				// NotLeaseHolderError, but if the next replica is the
+				// actual lease holder, we're never going to receive one of
+				// those and will thus pay the price of trying the down node
+				// first forever.
+				//
+				// NB: we could consider instead adding a successful reply
+				// from the next replica into the cache, but without a
+				// leaseholder (and taking into account that the local
+				// node can't be down) it won't take long until we talk
+				// to a replica that tells us who the leaseholder is.
+				ds.leaseHolderCache.Update(ctx, rangeID, 0 /* evict */)
+			}
 		} else {
 			propagateError := false
 			switch tErr := br.Error.GetDetail().(type) {
@@ -1375,7 +1392,8 @@ func (ds *DistSender) sendToReplicas(
 			}

 			ds.metrics.NextReplicaErrCount.Inc(1)
-			log.VEventf(ctx, 2, "error: %v %v; trying next peer %s", br, err, transport.NextReplica())
+			curReplica = transport.NextReplica()
+			log.VEventf(ctx, 2, "error: %v %v; trying next peer %s", br, err, curReplica)
 			br, err = transport.SendNext(ctx)
 		}
 	}
diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go
index 9a17ca9d6943..0225dc261a96 100644
--- a/pkg/kv/dist_sender_test.go
+++ b/pkg/kv/dist_sender_test.go
@@ -131,21 +131,24 @@ func adaptSimpleTransport(fn simpleSendFn) TransportFactory {
 		replicas ReplicaSlice,
 		args roachpb.BatchRequest,
 	) (Transport, error) {
-		return &simpleTransportAdapter{fn, opts, replicas, args, false}, nil
+		return &simpleTransportAdapter{
+			fn:       fn,
+			opts:     opts,
+			replicas: replicas,
+			args:     args}, nil
 	}
 }

 type simpleTransportAdapter struct {
-	fn       simpleSendFn
-	opts     SendOptions
-	replicas ReplicaSlice
-	args     roachpb.BatchRequest
-
-	called bool
+	fn          simpleSendFn
+	opts        SendOptions
+	replicas    ReplicaSlice
+	args        roachpb.BatchRequest
+	nextReplica int
 }

 func (l *simpleTransportAdapter) IsExhausted() bool {
-	return l.called
+	return l.nextReplica >= len(l.replicas)
 }

 func (l *simpleTransportAdapter) GetPending() []roachpb.ReplicaDescriptor {
@@ -153,11 +156,15 @@ func (l *simpleTransportAdapter) GetPending() []roachpb.ReplicaDescriptor {
 }

 func (l *simpleTransportAdapter) SendNext(ctx context.Context) (*roachpb.BatchResponse, error) {
-	l.called = true
+	l.args.Replica = l.replicas[l.nextReplica].ReplicaDescriptor
+	l.nextReplica++
 	return l.fn(ctx, l.opts, l.replicas, l.args)
 }

 func (l *simpleTransportAdapter) NextReplica() roachpb.ReplicaDescriptor {
+	if !l.IsExhausted() {
+		return l.replicas[l.nextReplica].ReplicaDescriptor
+	}
 	return roachpb.ReplicaDescriptor{}
 }

@@ -568,6 +575,93 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
 	}
 }

+// This test verifies that when we have a cached leaseholder that is down,
+// it is ejected from the cache.
+func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	g, clock := makeGossip(t, stopper)
+	if err := g.AddInfoProto(
+		gossip.MakeNodeIDKey(roachpb.NodeID(2)),
+		&roachpb.NodeDescriptor{
+			NodeID:  2,
+			Address: util.MakeUnresolvedAddr("tcp", "neverused:12345"),
+		},
+		gossip.NodeDescriptorTTL,
+	); err != nil {
+		t.Fatal(err)
+	}
+
+	var contacted1, contacted2 bool
+
+	transport := func(
+		ctx context.Context,
+		opts SendOptions,
+		replicas ReplicaSlice,
+		ba roachpb.BatchRequest,
+	) (*roachpb.BatchResponse, error) {
+		switch ba.Replica.StoreID {
+		case 1:
+			contacted1 = true
+			return nil, errors.New("mock RPC error")
+		case 2:
+			contacted2 = true
+			return ba.CreateReply(), nil
+		default:
+			panic("unexpected replica: " + ba.Replica.String())
+		}
+	}
+
+	cfg := DistSenderConfig{
+		AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
+		Clock:      clock,
+		TestingKnobs: ClientTestingKnobs{
+			TransportFactory: adaptSimpleTransport(transport),
+		},
+		RangeDescriptorDB: mockRangeDescriptorDBForDescs(
+			roachpb.RangeDescriptor{
+				RangeID:  1,
+				StartKey: roachpb.RKeyMin,
+				EndKey:   roachpb.RKeyMax,
+				Replicas: []roachpb.ReplicaDescriptor{
+					{
+						NodeID:  1,
+						StoreID: 1,
+					},
+					{
+						NodeID:  2,
+						StoreID: 2,
+					},
+				},
+			}),
+	}
+
+	ds := NewDistSender(cfg, g)
+	ds.LeaseHolderCache().Update(ctx, roachpb.RangeID(1), roachpb.StoreID(1))
+
+	var ba roachpb.BatchRequest
+	ba.RangeID = 1
+	get := &roachpb.GetRequest{}
+	get.Key = roachpb.Key("a")
+	ba.Add(get)
+
+	if _, pErr := ds.Send(ctx, ba); pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	if !contacted1 || !contacted2 {
+		t.Errorf("contacted n1: %t, contacted n2: %t", contacted1, contacted2)
+	}
+
+	if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); ok {
+		t.Fatalf("expected no lease holder for r1, but got s%d", storeID)
+	}
+}
+
 // TestRetryOnDescriptorLookupError verifies that the DistSender retries a descriptor
 // lookup on any error.
 func TestRetryOnDescriptorLookupError(t *testing.T) {
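
For readers who want the mechanism at a glance, below is a minimal, self-contained Go sketch of the eviction rule this patch adds: if the RPC to a replica fails and that replica's store is the cached leaseholder for the range, drop the cache entry so later requests stop trying the down node first. Every name in the sketch (replica, leaseholderCache, sendFunc, sendToReplicas) is invented for illustration and is not the CockroachDB API; only the overall pattern comes from the diff above.

// Standalone illustration, not CockroachDB code: all types and helpers
// below are hypothetical stand-ins for DistSender machinery.
package main

import (
	"errors"
	"fmt"
)

// replica identifies a store that may hold a copy of the range.
type replica struct {
	storeID int
}

// leaseholderCache maps a range ID to the store believed to hold the lease.
type leaseholderCache map[int]int

// sendFunc stands in for a single RPC attempt against one replica.
type sendFunc func(r replica) (string, error)

// sendToReplicas tries each replica in order. When an attempt fails with an
// RPC error and the failed replica is the cached leaseholder for the range,
// the cache entry is evicted so future requests stop trying the down node
// first. This mirrors the behavior the patch adds to DistSender.
func sendToReplicas(rangeID int, replicas []replica, cache leaseholderCache, send sendFunc) (string, error) {
	for _, r := range replicas {
		resp, err := send(r)
		if err == nil {
			return resp, nil
		}
		// RPC-level error: if this replica was the cached leaseholder, evict
		// it. A NotLeaseHolderError may never arrive if the next replica we
		// try actually holds the lease, so this is the only chance to fix
		// the cache.
		if storeID, ok := cache[rangeID]; ok && storeID == r.storeID {
			delete(cache, rangeID)
		}
	}
	return "", errors.New("sending to all replicas failed")
}

func main() {
	cache := leaseholderCache{1: 1} // store 1 is (stale-ly) cached as leaseholder for range 1
	replicas := []replica{{storeID: 1}, {storeID: 2}}

	send := func(r replica) (string, error) {
		if r.storeID == 1 {
			return "", errors.New("mock RPC error") // store 1 is down
		}
		return "ok", nil // store 2 answers successfully
	}

	resp, err := sendToReplicas(1, replicas, cache, send)
	fmt.Println(resp, err)

	_, cached := cache[1]
	fmt.Println("store 1 still cached as leaseholder:", cached)
}

Running the sketch prints "ok <nil>" followed by "store 1 still cached as leaseholder: false", mirroring what TestDistSenderDownNodeEvictLeaseholder asserts against the real DistSender.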