Skip to content

Commit

Permalink
kv: evict leaseholder on RPC error
Browse files Browse the repository at this point in the history
This addresses a situation in which we would not evict a stale
leaseholder for a long time. Consider a range with replicas [s1,s2,s3]
where s1 is down but is the cached leaseholder, while s2 is the actual
leaseholder. The RPC layer will try s1, get an RPC error, try s2 and succeed.
Since there is no NotLeaseHolderError involved, the cache would not get
updated, and so every request pays the overhead of trying s1 first.

Fixes cockroachdb#23601.

Release note: None
  • Loading branch information
tbg committed Aug 16, 2018
1 parent d556fa9 commit bc2bffd
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 11 deletions.
22 changes: 20 additions & 2 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,9 @@ func (ds *DistSender) sendToReplicas(
return nil, roachpb.NewSendError(
fmt.Sprintf("sending to all %d replicas failed", len(replicas)))
}
log.VEventf(ctx, 2, "r%d: sending batch %s to %s", rangeID, args.Summary(), transport.NextReplica())

curReplica := transport.NextReplica()
log.VEventf(ctx, 2, "r%d: sending batch %s to %s", rangeID, args.Summary(), curReplica)
br, err := transport.SendNext(ctx)

// This loop will retry operations that fail with errors that reflect
Expand All @@ -1302,6 +1304,21 @@ func (ds *DistSender) sendToReplicas(
ambiguousError = err
}
log.VErrEventf(ctx, 2, "RPC error: %s", err)
if storeID, ok := ds.leaseHolderCache.Lookup(ctx, rangeID); ok && curReplica.StoreID == storeID {
// If the down replica is cached as the lease holder, evict
// it. The only other eviction happens below on
// NotLeaseHolderError, but if the next replica is the
// actual lease holder, we're never going to receive one of
// those and will thus pay the price of trying the down node
// first forever.
//
// NB: we could consider instead adding a successful reply
// from the next replica into the cache, but without a
// leaseholder (and taking into account that the local
// node can't be down) it won't take long until we talk
// to a replica that tells us who the leaseholder is.
ds.leaseHolderCache.Update(ctx, rangeID, 0 /* evict */)
}
} else {
propagateError := false
switch tErr := br.Error.GetDetail().(type) {
Expand Down Expand Up @@ -1375,7 +1392,8 @@ func (ds *DistSender) sendToReplicas(
}

ds.metrics.NextReplicaErrCount.Inc(1)
log.VEventf(ctx, 2, "error: %v %v; trying next peer %s", br, err, transport.NextReplica())
curReplica = transport.NextReplica()
log.VEventf(ctx, 2, "error: %v %v; trying next peer %s", br, err, curReplica)
br, err = transport.SendNext(ctx)
}
}
112 changes: 103 additions & 9 deletions pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,33 +131,40 @@ func adaptSimpleTransport(fn simpleSendFn) TransportFactory {
replicas ReplicaSlice,
args roachpb.BatchRequest,
) (Transport, error) {
return &simpleTransportAdapter{fn, opts, replicas, args, false}, nil
return &simpleTransportAdapter{
fn: fn,
opts: opts,
replicas: replicas,
args: args}, nil
}
}

// simpleTransportAdapter is the Transport handed out by adaptSimpleTransport.
// It forwards every SendNext call to fn, passing along the options, replica
// slice and batch it was constructed with.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// field list appears to contain both the pre-change fields (ending in called)
// and the post-change fields (ending in nextReplica) — confirm against the
// actual file.
type simpleTransportAdapter struct {
fn simpleSendFn
opts SendOptions
replicas ReplicaSlice
args roachpb.BatchRequest

// called is set by SendNext and reported by IsExhausted.
called bool
fn simpleSendFn
opts SendOptions
replicas ReplicaSlice
args roachpb.BatchRequest
// nextReplica is the index into replicas that the next SendNext call
// will target; SendNext increments it on every call.
nextReplica int
}

// IsExhausted reports whether the adapter has nothing left to try.
//
// NOTE(review): two return statements appear because this listing is a
// rendered diff without +/- markers. Presumably `return l.called` is the
// removed line and the nextReplica check (true once the index has advanced
// past the end of replicas) is its replacement — confirm against the actual
// file.
func (l *simpleTransportAdapter) IsExhausted() bool {
return l.called
return l.nextReplica >= len(l.replicas)
}

// GetPending always returns nil; this adapter keeps no record of pending
// replicas.
func (l *simpleTransportAdapter) GetPending() []roachpb.ReplicaDescriptor {
return nil
}

// SendNext stamps the batch with the descriptor of the replica at index
// nextReplica, advances that index, and then invokes fn with the batch.
// The replica slice is indexed without a bounds check, so SendNext must not be
// called once IsExhausted reports true.
//
// NOTE(review): `l.called = true` is presumably the pre-change line left over
// from the rendered diff (no +/- markers) — confirm against the actual file.
func (l *simpleTransportAdapter) SendNext(ctx context.Context) (*roachpb.BatchResponse, error) {
l.called = true
// Record the target replica on the request so fn can tell which replica
// this attempt is addressed to.
l.args.Replica = l.replicas[l.nextReplica].ReplicaDescriptor
l.nextReplica++
return l.fn(ctx, l.opts, l.replicas, l.args)
}

// NextReplica returns the descriptor of the replica that the next SendNext
// call will target. Once the adapter is exhausted it returns the zero
// ReplicaDescriptor instead.
func (l *simpleTransportAdapter) NextReplica() roachpb.ReplicaDescriptor {
	if l.IsExhausted() {
		// Nothing left to send to.
		return roachpb.ReplicaDescriptor{}
	}
	return l.replicas[l.nextReplica].ReplicaDescriptor
}

Expand Down Expand Up @@ -568,6 +575,93 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
}
}

// TestDistSenderDownNodeEvictLeaseholder verifies that when the cached
// leaseholder for a range is down (sending to it yields an RPC error), the
// DistSender evicts it from the leaseholder cache even though the request
// ultimately succeeds on another replica.
func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

g, clock := makeGossip(t, stopper)
// Register a descriptor for n2 in gossip.
// NOTE(review): presumably required so the DistSender can resolve the remote
// node; the address is a placeholder since the transport below is mocked —
// confirm.
if err := g.AddInfoProto(
gossip.MakeNodeIDKey(roachpb.NodeID(2)),
&roachpb.NodeDescriptor{
NodeID: 2,
Address: util.MakeUnresolvedAddr("tcp", "neverused:12345"),
},
gossip.NodeDescriptorTTL,
); err != nil {
t.Fatal(err)
}

// Track which stores the mock transport was asked to contact.
var contacted1, contacted2 bool

// Mock send function: s1 plays the down node and fails with an RPC-level
// error, s2 answers successfully with an empty reply. Any other store is a
// test bug.
transport := func(
ctx context.Context,
opts SendOptions,
replicas ReplicaSlice,
ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
switch ba.Replica.StoreID {
case 1:
contacted1 = true
return nil, errors.New("mock RPC error")
case 2:
contacted2 = true
return ba.CreateReply(), nil
default:
panic("unexpected replica: " + ba.Replica.String())
}
}

// A single range r1 covering the whole keyspace, with replicas on
// (n1,s1) and (n2,s2), served through the mock transport.
cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(transport),
},
RangeDescriptorDB: mockRangeDescriptorDBForDescs(
roachpb.RangeDescriptor{
RangeID: 1,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
Replicas: []roachpb.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
},
{
NodeID: 2,
StoreID: 2,
},
},
}),
}

ds := NewDistSender(cfg, g)
// Seed the cache with the down store s1 as the leaseholder of r1.
ds.LeaseHolderCache().Update(ctx, roachpb.RangeID(1), roachpb.StoreID(1))

var ba roachpb.BatchRequest
ba.RangeID = 1
get := &roachpb.GetRequest{}
get.Key = roachpb.Key("a")
ba.Add(get)

// The request as a whole must succeed despite the RPC error from s1.
if _, pErr := ds.Send(ctx, ba); pErr != nil {
t.Fatal(pErr)
}

// Both stores must have been tried: the failing s1 and the working s2.
if !contacted1 || !contacted2 {
t.Errorf("contacted n1: %t, contacted n2: %t", contacted1, contacted2)
}

// The stale entry must be gone: no leaseholder should be cached for r1.
if storeID, ok := ds.LeaseHolderCache().Lookup(ctx, roachpb.RangeID(1)); ok {
t.Fatalf("expected no lease holder for r1, but got s%d", storeID)
}
}

// TestRetryOnDescriptorLookupError verifies that the DistSender retries a descriptor
// lookup on any error.
func TestRetryOnDescriptorLookupError(t *testing.T) {
Expand Down

0 comments on commit bc2bffd

Please sign in to comment.