From 89d349ac58d5b1986c494851d8a31971410e3a1e Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Wed, 5 Dec 2018 12:48:32 -0500
Subject: [PATCH] kv: detect lease transfer and back off in DistSender

This PR addresses a problem which could lead to very long stalls in range
throughput when a lease transfer occurs under load. As soon as the current
lease holder begins a lease transfer, it rejects all future requests to the
range with a NotLeaseHolderError which contains the new lease information.
As soon as this happens, the new lease holder immediately begins receiving
requests but is not able to service those requests until it processes the
raft command that makes it the lease holder. Until it applies that command,
it returns a NotLeaseHolderError with the previous lease information. Prior
to this change, the DistSender would immediately retry the request at the
node indicated in the most recent NotLeaseHolderError it had received. This
leads to a tight loop of requests bouncing between the current lease holder
and the new lease holder, which is unaware of the pending transfer (as
observed in #22837). The amount of load generated by this traffic can grind
raft progress to a complete halt, with the author observing multi-minute
durations for the new node to process a raft Ready and hundreds of
milliseconds to process a single command.

Fortunately, the DistSender can detect when this situation is occurring and
can back off accordingly. This change detects that a replica is in the midst
of a lease transfer by noticing that it continues to receive
NotLeaseHolderErrors without observing a new lease sequence number. In this
case, the DistSender backs off exponentially until it succeeds, fails, or
observes a new lease sequence.

Fixes #22837, Fixes #32367

Release note: None
---
 pkg/kv/dist_sender.go      | 62 ++++++++++++++++++++-------
 pkg/kv/dist_sender_test.go | 85 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 131 insertions(+), 16 deletions(-)

diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go
index 1ccf87b11ad4..ec8e0f26374c 100644
--- a/pkg/kv/dist_sender.go
+++ b/pkg/kv/dist_sender.go
@@ -94,6 +94,12 @@ var (
         Measurement: "Errors",
         Unit:        metric.Unit_COUNT,
     }
+    metaDistSenderInLeaseTransferBackoffsCount = metric.Metadata{
+        Name:        "distsender.errors.inleasetransferbackoffs",
+        Help:        "Number of times backed off due to NotLeaseHolderErrors during lease transfer.",
+        Measurement: "Errors",
+        Unit:        metric.Unit_COUNT,
+    }
 )

 var rangeDescriptorCacheSize = settings.RegisterIntSetting(
@@ -104,26 +110,28 @@ var rangeDescriptorCacheSize = settings.RegisterIntSetting(

 // DistSenderMetrics is the set of metrics for a given distributed sender.
 type DistSenderMetrics struct {
-    BatchCount             *metric.Counter
-    PartialBatchCount      *metric.Counter
-    AsyncSentCount         *metric.Counter
-    AsyncThrottledCount    *metric.Counter
-    SentCount              *metric.Counter
-    LocalSentCount         *metric.Counter
-    NextReplicaErrCount    *metric.Counter
-    NotLeaseHolderErrCount *metric.Counter
+    BatchCount              *metric.Counter
+    PartialBatchCount       *metric.Counter
+    AsyncSentCount          *metric.Counter
+    AsyncThrottledCount     *metric.Counter
+    SentCount               *metric.Counter
+    LocalSentCount          *metric.Counter
+    NextReplicaErrCount     *metric.Counter
+    NotLeaseHolderErrCount  *metric.Counter
+    InLeaseTransferBackoffs *metric.Counter
 }

 func makeDistSenderMetrics() DistSenderMetrics {
     return DistSenderMetrics{
-        BatchCount:             metric.NewCounter(metaDistSenderBatchCount),
-        PartialBatchCount:      metric.NewCounter(metaDistSenderPartialBatchCount),
-        AsyncSentCount:         metric.NewCounter(metaDistSenderAsyncSentCount),
-        AsyncThrottledCount:    metric.NewCounter(metaDistSenderAsyncThrottledCount),
-        SentCount:              metric.NewCounter(metaTransportSentCount),
-        LocalSentCount:         metric.NewCounter(metaTransportLocalSentCount),
-        NextReplicaErrCount:    metric.NewCounter(metaTransportSenderNextReplicaErrCount),
-        NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
+        BatchCount:              metric.NewCounter(metaDistSenderBatchCount),
+        PartialBatchCount:       metric.NewCounter(metaDistSenderPartialBatchCount),
+        AsyncSentCount:          metric.NewCounter(metaDistSenderAsyncSentCount),
+        AsyncThrottledCount:     metric.NewCounter(metaDistSenderAsyncThrottledCount),
+        SentCount:               metric.NewCounter(metaTransportSentCount),
+        LocalSentCount:          metric.NewCounter(metaTransportLocalSentCount),
+        NextReplicaErrCount:     metric.NewCounter(metaTransportSenderNextReplicaErrCount),
+        NotLeaseHolderErrCount:  metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
+        InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
     }
 }

@@ -1324,6 +1332,15 @@ func (ds *DistSender) sendToReplicas(
     }

     br, err := transport.SendNext(ctx, ba)
+    // maxSeenLeaseSequence tracks the maximum LeaseSequence seen in a
+    // NotLeaseHolderError. If we encounter a sequence number less than or
+    // equal to maxSeenLeaseSequence in a subsequent NotLeaseHolderError, then
+    // the range must be experiencing a lease transfer and the client should
+    // back off using inTransferRetry.
+    maxSeenLeaseSequence := roachpb.LeaseSequence(-1)
+    inTransferRetry := retry.StartWithCtx(ctx, ds.rpcRetryOptions)
+    inTransferRetry.Next() // The first call to Next does not block.
+
     // This loop will retry operations that fail with errors that reflect
     // per-replica state and may succeed on other replicas.
     for {
@@ -1409,6 +1426,19 @@ func (ds *DistSender) sendToReplicas(
                 transport.MoveToFront(*lh)
             }
         }
+        if l := tErr.Lease; !propagateError && l != nil {
+            // Check whether we've seen this lease or a prior lease before, and
+            // back off if so, or update maxSeenLeaseSequence if not.
+            if l.Sequence > maxSeenLeaseSequence {
+                maxSeenLeaseSequence = l.Sequence
+                inTransferRetry.Reset() // The following Next call will not block.
+            } else {
+                ds.metrics.InLeaseTransferBackoffs.Inc(1)
+                log.VErrEventf(ctx, 2, "backing off due to NotLeaseHolderErr at "+
+                    "LeaseSequence %d <= %d", l.Sequence, maxSeenLeaseSequence)
+            }
+            inTransferRetry.Next()
+        }
     default:
         propagateError = true
     }
diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go
index 890c9ba35356..721e45c730c6 100644
--- a/pkg/kv/dist_sender_test.go
+++ b/pkg/kv/dist_sender_test.go
@@ -43,6 +43,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/metric"
+    "github.com/cockroachdb/cockroach/pkg/util/retry"
     "github.com/cockroachdb/cockroach/pkg/util/stop"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -581,6 +582,90 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
     }
 }

+// TestBackoffOnNotLeaseHolderErrorDuringTransfer verifies that the DistSender
+// backs off upon receiving multiple NotLeaseHolderErrors without observing an
+// increase in LeaseSequence.
+func TestBackoffOnNotLeaseHolderErrorDuringTransfer(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    stopper := stop.NewStopper()
+    defer stopper.Stop(context.TODO())
+
+    g, clock := makeGossip(t, stopper)
+    leaseHolders := testUserRangeDescriptor3Replicas.Replicas
+    for _, n := range leaseHolders {
+        if err := g.AddInfoProto(
+            gossip.MakeNodeIDKey(n.NodeID),
+            &roachpb.NodeDescriptor{
+                NodeID:  n.NodeID,
+                Address: util.MakeUnresolvedAddr("tcp", fmt.Sprintf("neverused:%d", n.NodeID)),
+            },
+            gossip.NodeDescriptorTTL,
+        ); err != nil {
+            t.Fatal(err)
+        }
+    }
+    var sequences []roachpb.LeaseSequence
+    var testFn simpleSendFn = func(
+        _ context.Context,
+        _ SendOptions,
+        _ ReplicaSlice,
+        args roachpb.BatchRequest,
+    ) (*roachpb.BatchResponse, error) {
+        reply := &roachpb.BatchResponse{}
+        if len(sequences) > 0 {
+            seq := sequences[0]
+            sequences = sequences[1:]
+            lease := roachpb.Lease{
+                Sequence: seq,
+                Replica:  leaseHolders[int(seq)%2],
+            }
+            reply.Error = roachpb.NewError(
+                &roachpb.NotLeaseHolderError{
+                    Replica:     leaseHolders[int(seq)%2],
+                    LeaseHolder: &leaseHolders[(int(seq)+1)%2],
+                    Lease:       &lease,
+                })
+            return reply, nil
+        }
+        // Return an error to bail out of retries.
+        reply.Error = roachpb.NewErrorf("boom")
+        return reply, nil
+    }
+
+    cfg := DistSenderConfig{
+        AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
+        Clock:      clock,
+        TestingKnobs: ClientTestingKnobs{
+            TransportFactory: adaptSimpleTransport(testFn),
+        },
+        RangeDescriptorDB: threeReplicaMockRangeDescriptorDB,
+        NodeDialer:        nodedialer.New(nil, gossip.AddressResolver(g)),
+        RPCRetryOptions: &retry.Options{
+            InitialBackoff: time.Microsecond,
+            MaxBackoff:     time.Microsecond,
+        },
+    }
+    for i, c := range []struct {
+        leaseSequences []roachpb.LeaseSequence
+        expected       int64
+    }{
+        {[]roachpb.LeaseSequence{1, 0, 1, 2}, 2},
+        {[]roachpb.LeaseSequence{0}, 0},
+        {[]roachpb.LeaseSequence{1, 0, 1, 2, 1}, 3},
+    } {
+        sequences = c.leaseSequences
+        ds := NewDistSender(cfg, g)
+        v := roachpb.MakeValueFromString("value")
+        put := roachpb.NewPut(roachpb.Key("a"), v)
+        if _, pErr := client.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") {
+            t.Fatalf("%d: unexpected error: %v", i, pErr)
+        }
+        if got := ds.Metrics().InLeaseTransferBackoffs.Count(); got != c.expected {
+            t.Fatalf("%d: expected %d backoffs, got %d", i, c.expected, got)
+        }
+    }
+}
+
 // This test verifies that when we have a cached leaseholder that is down
 // it is ejected from the cache.
 func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
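
For readers following the mechanism described in the commit message, the sketch below restates the detection logic outside the patch: remember the highest lease sequence reported by NotLeaseHolderErrors and back off only when a subsequent error fails to advance it. This is a minimal, standalone illustration, not the DistSender implementation; leaseSequence, notLeaseHolderErr, and backoffTracker are hypothetical stand-ins for roachpb.LeaseSequence, roachpb.NotLeaseHolderError, and the retry.StartWithCtx loop the actual change uses, and the doubling-with-cap policy here merely approximates the configured retry.Options.

package main

import (
    "fmt"
    "time"
)

// leaseSequence is a hypothetical stand-in for roachpb.LeaseSequence.
type leaseSequence int64

// notLeaseHolderErr is a hypothetical stand-in for roachpb.NotLeaseHolderError;
// it carries the lease the rejecting replica believes is current.
type notLeaseHolderErr struct {
    sequence leaseSequence
}

// backoffTracker mirrors the maxSeenLeaseSequence bookkeeping from the patch:
// back off only when a NotLeaseHolderError does not advance the lease
// sequence, which suggests the range is in the middle of a lease transfer.
type backoffTracker struct {
    maxSeen leaseSequence
    backoff time.Duration
}

func newBackoffTracker() *backoffTracker {
    return &backoffTracker{maxSeen: -1, backoff: time.Millisecond}
}

// observe returns how long to wait before retrying. A new (higher) sequence
// resets the backoff and returns zero; a stale sequence doubles the backoff,
// capped at one second.
func (t *backoffTracker) observe(err notLeaseHolderErr) time.Duration {
    if err.sequence > t.maxSeen {
        t.maxSeen = err.sequence
        t.backoff = time.Millisecond // the lease moved forward; no need to back off
        return 0
    }
    wait := t.backoff
    t.backoff *= 2
    if t.backoff > time.Second {
        t.backoff = time.Second
    }
    return wait
}

func main() {
    tr := newBackoffTracker()
    // Sequence 1, then 1 twice more (transfer still pending), then 2 (applied).
    for _, seq := range []leaseSequence{1, 1, 1, 2} {
        fmt.Println(tr.observe(notLeaseHolderErr{sequence: seq}))
    }
    // Prints: 0s, 1ms, 2ms, 0s
}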