From 11e78d200b22b1b07026a656e33874cfddf3c537 Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Wed, 5 Dec 2018 12:48:32 -0500
Subject: [PATCH] kv: detect lease transfer and back off in DistSender

This PR addresses a problem which could lead to very long stalls in range
throughput when a lease transfer occurs under load. As soon as the current
lease holder begins a lease transfer, it rejects all future requests to the
range with a NotLeaseHolderError which contains the new lease information.
Once this happens, the new lease holder immediately begins receiving requests
but is not able to service them until it applies the raft command that makes
it the lease holder. Until it applies that command, it returns
NotLeaseHolderErrors with the previous lease information.

Prior to this change, the DistSender would immediately retry the request at
the node indicated in the most recent NotLeaseHolderError it had received.
This led to a tight loop of requests bouncing between the current lease
holder and the new lease holder, which is not yet aware of the pending
transfer (as observed in #22837). The amount of load generated by this
traffic can grind raft progress to a complete halt, with the author observing
multi-minute durations for the new node to process a raft Ready and hundreds
of milliseconds to process a single command.

Fortunately, the DistSender can detect when this situation is occurring and
back off accordingly. This change detects that a replica is in the midst of a
lease transfer by noticing that it continues to receive NotLeaseHolderErrors
without observing a new lease sequence number. In this case, the DistSender
backs off exponentially until it succeeds, fails, or observes a new lease
sequence.

Fixes #22837, Fixes #32367

Release note: None
---
 pkg/kv/dist_sender.go      | 30 ++++++++++++++
 pkg/kv/dist_sender_test.go | 85 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 115 insertions(+)

diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go
index 1ccf87b11ad4..5a7c41708008 100644
--- a/pkg/kv/dist_sender.go
+++ b/pkg/kv/dist_sender.go
@@ -94,6 +94,12 @@ var (
 		Measurement: "Errors",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaDistSenderInTransferBackoffsErrCount = metric.Metadata{
+		Name:        "distsender.errors.intransferbackoff",
+		Help:        "Number of times backed off due to NotLeaseHolderErrors during lease transfer.",
+		Measurement: "Errors",
+		Unit:        metric.Unit_COUNT,
+	}
 )
 
 var rangeDescriptorCacheSize = settings.RegisterIntSetting(
@@ -112,6 +118,7 @@ type DistSenderMetrics struct {
 	LocalSentCount         *metric.Counter
 	NextReplicaErrCount    *metric.Counter
 	NotLeaseHolderErrCount *metric.Counter
+	InTransferBackoffs     *metric.Counter
 }
 
 func makeDistSenderMetrics() DistSenderMetrics {
@@ -124,6 +131,7 @@ func makeDistSenderMetrics() DistSenderMetrics {
 		LocalSentCount:         metric.NewCounter(metaTransportLocalSentCount),
 		NextReplicaErrCount:    metric.NewCounter(metaTransportSenderNextReplicaErrCount),
 		NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
+		InTransferBackoffs:     metric.NewCounter(metaDistSenderInTransferBackoffsErrCount),
 	}
 }
 
@@ -1324,6 +1332,15 @@ func (ds *DistSender) sendToReplicas(
 	}
 	br, err := transport.SendNext(ctx, ba)
+	// maxSeenLeaseSequence tracks the maximum LeaseSequence seen in a
+	// NotLeaseHolderError.
+	// If we encounter a sequence number less than or equal to it in a
+	// subsequent NotLeaseHolderError, then the range must be experiencing a
+	// lease transfer and the client should back off using inTransferRetry.
+	maxSeenLeaseSequence := roachpb.LeaseSequence(-1)
+	inTransferRetry := retry.StartWithCtx(ctx, ds.rpcRetryOptions)
+	inTransferRetry.Next() // The first call to Next does not block.
+
 	// This loop will retry operations that fail with errors that reflect
 	// per-replica state and may succeed on other replicas.
 	for {
@@ -1409,6 +1426,19 @@
 						transport.MoveToFront(*lh)
 					}
 				}
+				if l := tErr.Lease; !propagateError && l != nil {
+					// If we have seen this lease or a prior one before, back off;
+					// otherwise update maxSeenLeaseSequence.
+					if l.Sequence > maxSeenLeaseSequence {
+						maxSeenLeaseSequence = l.Sequence
+						inTransferRetry.Reset() // The following Next call will not block.
+					} else {
+						ds.metrics.InTransferBackoffs.Inc(1)
+						log.VErrEventf(ctx, 2, "backing off due to NotLeaseHolderErr at "+
+							"LeaseSequence %d <= %d", l.Sequence, maxSeenLeaseSequence)
+					}
+					inTransferRetry.Next()
+				}
 			default:
 				propagateError = true
 			}
diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go
index 890c9ba35356..55571e8c8181 100644
--- a/pkg/kv/dist_sender_test.go
+++ b/pkg/kv/dist_sender_test.go
@@ -43,6 +43,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -581,6 +582,90 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
 	}
 }
 
+// TestBackoffOnNotLeaseHolderErrorDuringTransfer verifies that the DistSender
+// backs off upon receiving multiple NotLeaseHolderErrors without observing an
+// increase in LeaseSequence.
+func TestBackoffOnNotLeaseHolderErrorDuringTransfer(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.TODO())
+
+	g, clock := makeGossip(t, stopper)
+	leaseHolders := testUserRangeDescriptor3Replicas.Replicas
+	for _, n := range leaseHolders {
+		if err := g.AddInfoProto(
+			gossip.MakeNodeIDKey(n.NodeID),
+			&roachpb.NodeDescriptor{
+				NodeID:  n.NodeID,
+				Address: util.MakeUnresolvedAddr("tcp", fmt.Sprintf("neverused:%d", n.NodeID)),
+			},
+			gossip.NodeDescriptorTTL,
+		); err != nil {
+			t.Fatal(err)
+		}
+	}
+	var sequences []roachpb.LeaseSequence
+	var testFn simpleSendFn = func(
+		_ context.Context,
+		_ SendOptions,
+		_ ReplicaSlice,
+		args roachpb.BatchRequest,
+	) (*roachpb.BatchResponse, error) {
+		reply := &roachpb.BatchResponse{}
+		if len(sequences) > 0 {
+			seq := sequences[0]
+			sequences = sequences[1:]
+			lease := roachpb.Lease{
+				Sequence: seq,
+				Replica:  leaseHolders[int(seq)%2],
+			}
+			reply.Error = roachpb.NewError(
+				&roachpb.NotLeaseHolderError{
+					Replica:     leaseHolders[int(seq)%2],
+					LeaseHolder: &leaseHolders[(int(seq)+1)%2],
+					Lease:       &lease,
+				})
+			return reply, nil
+		}
+		// Return an error to bail out of retries.
+		reply.Error = roachpb.NewErrorf("boom")
+		return reply, nil
+	}
+
+	cfg := DistSenderConfig{
+		AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
+		Clock:      clock,
+		TestingKnobs: ClientTestingKnobs{
+			TransportFactory: adaptSimpleTransport(testFn),
+		},
+		RangeDescriptorDB: threeReplicaMockRangeDescriptorDB,
+		NodeDialer:        nodedialer.New(nil, gossip.AddressResolver(g)),
+		RPCRetryOptions: &retry.Options{
+			InitialBackoff: time.Microsecond,
+			MaxBackoff:     time.Microsecond,
+		},
+	}
+	for i, c := range []struct {
+		leaseSequences []roachpb.LeaseSequence
+		expected       int64
+	}{
+		{[]roachpb.LeaseSequence{1, 0, 1, 2}, 2},
+		{[]roachpb.LeaseSequence{0}, 0},
+		{[]roachpb.LeaseSequence{1, 0, 1, 2, 1}, 3},
+	} {
+		sequences = c.leaseSequences
+		ds := NewDistSender(cfg, g)
+		v := roachpb.MakeValueFromString("value")
+		put := roachpb.NewPut(roachpb.Key("a"), v)
+		if _, pErr := client.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") {
+			t.Fatalf("%d: unexpected error: %v", i, pErr)
+		}
+		if got := ds.Metrics().InTransferBackoffs.Count(); got != c.expected {
+			t.Fatalf("%d: expected %d backoffs, got %d", i, c.expected, got)
+		}
+	}
+}
+
 // This test verifies that when we have a cached leaseholder that is down
 // it is ejected from the cache.
 func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
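
For readers who want to see the detection rule in isolation: the sketch below is
a standalone, illustrative Go program (not the CockroachDB code; it hand-rolls
the exponential backoff instead of using retry.StartWithCtx with
ds.rpcRetryOptions, and names like maxBackoff are invented for the example). It
walks the first test case's lease-sequence stream and reproduces its expected
backoff count of 2: back off only while successive NotLeaseHolderErrors fail to
advance the highest lease sequence seen so far.

package main

import (
	"fmt"
	"time"
)

// LeaseSequence is a stand-in for roachpb.LeaseSequence.
type LeaseSequence int64

func main() {
	// Lease sequences carried by successive NotLeaseHolderErrors, mirroring
	// the first test case above: {1, 0, 1, 2} should yield 2 backoffs.
	seen := []LeaseSequence{1, 0, 1, 2}

	maxSeenLeaseSequence := LeaseSequence(-1)
	backoff, maxBackoff := time.Microsecond, 8*time.Microsecond
	backoffs := 0

	for _, seq := range seen {
		if seq > maxSeenLeaseSequence {
			// A newer lease was observed: any transfer has completed, so
			// record the sequence and retry immediately with a fresh backoff.
			maxSeenLeaseSequence = seq
			backoff = time.Microsecond
			continue
		}
		// A stale or repeated sequence means the range is still mid-transfer:
		// back off exponentially instead of hammering both replicas.
		backoffs++
		time.Sleep(backoff)
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	fmt.Printf("backed off %d times\n", backoffs) // prints: backed off 2 times
}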