Merge #32877
32877: kv: detect lease transfer and back off in DistSender r=ajwerner a=ajwerner

This PR addresses a problem that could lead to very long stalls in range
throughput when a lease transfer occurs under load. As soon as the current
lease holder begins a lease transfer, it rejects all future requests to the
range with a NotLeaseHolderError that contains the new lease information. As
soon as this happens, the new lease holder immediately begins receiving
requests but is not able to service them until it processes the raft command
that makes it the lease holder. Until it applies that command, it returns
NotLeaseHolderErrors with the previous lease information. Prior to this change,
the DistSender would immediately retry the request at the node indicated
in the most recent NotLeaseHolderError it had received. This leads to a tight
loop of requests bouncing between the current lease holder and the new lease
holder, which is still unaware of the pending transfer (as observed in #22837). The
amount of load generated by this traffic can grind raft progress to a complete
halt, with the author observing multi-minute durations for the new node to
process a raft Ready and hundreds of milliseconds to process a single command.
Fortunately, the DistSender can detect when this situation is occurring and can
back off accordingly.

This change detects that a replica is in the midst of a lease transfer by
noticing that it continues to receive NotLeaseHolderErrors without observing a
new lease sequence number. In this case, the DistSender backs off exponentially
until it succeeds, fails, or observes a new lease sequence.
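
In rough outline, the idea looks like the following minimal, standalone Go
sketch. This is not the DistSender code itself: leaseSequence,
notLeaseHolderErr, and the doubling backoff below are simplified stand-ins for
roachpb.LeaseSequence, roachpb.NotLeaseHolderError, and the retry helper. The
client tracks the highest lease sequence seen in NotLeaseHolderErrors and backs
off whenever an error fails to advance it:

    package main

    import (
        "fmt"
        "time"
    )

    // leaseSequence and notLeaseHolderErr are simplified stand-ins for the
    // roachpb types referenced above.
    type leaseSequence int64

    type notLeaseHolderErr struct {
        sequence leaseSequence
    }

    func main() {
        // Simulated stream of errors: the sequence stays at 1 while the
        // transfer is in flight, then advances to 2 once the new lease holder
        // applies the lease.
        errs := []notLeaseHolderErr{{1}, {1}, {1}, {2}}

        maxSeen := leaseSequence(-1)
        backoff := time.Microsecond
        for _, e := range errs {
            if e.sequence > maxSeen {
                // A newer lease was observed: remember it, reset the backoff,
                // and retry immediately.
                maxSeen = e.sequence
                backoff = time.Microsecond
                fmt.Printf("new lease sequence %d, retrying immediately\n", e.sequence)
                continue
            }
            // The sequence did not advance: the range is likely mid-transfer,
            // so back off exponentially before retrying.
            fmt.Printf("sequence %d <= %d, backing off %v\n", e.sequence, maxSeen, backoff)
            time.Sleep(backoff)
            backoff *= 2
        }
    }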

Fixes #22837, Fixes #32367

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
craig[bot] and ajwerner committed Dec 7, 2018
2 parents 8d047b6 + 89d349a commit 376e036
Showing 2 changed files with 131 additions and 16 deletions.
62 changes: 46 additions & 16 deletions pkg/kv/dist_sender.go
@@ -94,6 +94,12 @@ var (
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
metaDistSenderInLeaseTransferBackoffsCount = metric.Metadata{
Name: "distsender.errors.inleasetransferbackoffs",
Help: "Number of times backed off due to NotLeaseHolderErrors during lease transfer.",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
)

var rangeDescriptorCacheSize = settings.RegisterIntSetting(
@@ -104,26 +110,28 @@ var rangeDescriptorCacheSize = settings.RegisterIntSetting(

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
AsyncSentCount *metric.Counter
AsyncThrottledCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
AsyncSentCount *metric.Counter
AsyncThrottledCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
InLeaseTransferBackoffs *metric.Counter
}

func makeDistSenderMetrics() DistSenderMetrics {
return DistSenderMetrics{
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount),
AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount),
AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
}
}

@@ -1324,6 +1332,15 @@ func (ds *DistSender) sendToReplicas(
}
br, err := transport.SendNext(ctx, ba)

// maxSeenLeaseSequence tracks the maximum LeaseSequence seen in a
// NotLeaseHolderError. If we encounter a sequence number less than or equal
// to maxSeenLeaseSequence in a subsequent NotLeaseHolderError, then the range
// must be experiencing a lease transfer and the client should back off using
// inTransferRetry.
maxSeenLeaseSequence := roachpb.LeaseSequence(-1)
inTransferRetry := retry.StartWithCtx(ctx, ds.rpcRetryOptions)
inTransferRetry.Next() // The first call to Next does not block.

// This loop will retry operations that fail with errors that reflect
// per-replica state and may succeed on other replicas.
for {
@@ -1409,6 +1426,19 @@
transport.MoveToFront(*lh)
}
}
if l := tErr.Lease; !propagateError && l != nil {
// Check whether we've seen this lease or a prior lease before; back off
// if so, or update maxSeenLeaseSequence if not.
if l.Sequence > maxSeenLeaseSequence {
maxSeenLeaseSequence = l.Sequence
inTransferRetry.Reset() // The following Next call will not block.
} else {
ds.metrics.InLeaseTransferBackoffs.Inc(1)
log.VErrEventf(ctx, 2, "backing off due to NotLeaseHolderErr at "+
"LeaseSequence %d <= %d", l.Sequence, maxSeenLeaseSequence)
}
inTransferRetry.Next()
}
default:
propagateError = true
}
85 changes: 85 additions & 0 deletions pkg/kv/dist_sender_test.go
@@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -579,6 +580,90 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
}
}

// TestBackoffOnNotLeaseHolderErrorDuringTransfer verifies that the DistSender
// backs off upon receiving multiple NotLeaseHolderErrors without observing an
// increase in LeaseSequence.
func TestBackoffOnNotLeaseHolderErrorDuringTransfer(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

g, clock := makeGossip(t, stopper)
leaseHolders := testUserRangeDescriptor3Replicas.Replicas
for _, n := range leaseHolders {
if err := g.AddInfoProto(
gossip.MakeNodeIDKey(n.NodeID),
&roachpb.NodeDescriptor{
NodeID: n.NodeID,
Address: util.MakeUnresolvedAddr("tcp", fmt.Sprintf("neverused:%d", n.NodeID)),
},
gossip.NodeDescriptorTTL,
); err != nil {
t.Fatal(err)
}
}
var sequences []roachpb.LeaseSequence
var testFn simpleSendFn = func(
_ context.Context,
_ SendOptions,
_ ReplicaSlice,
args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
reply := &roachpb.BatchResponse{}
if len(sequences) > 0 {
seq := sequences[0]
sequences = sequences[1:]
lease := roachpb.Lease{
Sequence: seq,
Replica: leaseHolders[int(seq)%2],
}
reply.Error = roachpb.NewError(
&roachpb.NotLeaseHolderError{
Replica: leaseHolders[int(seq)%2],
LeaseHolder: &leaseHolders[(int(seq)+1)%2],
Lease: &lease,
})
return reply, nil
}
// Return an error to bail out of retries.
reply.Error = roachpb.NewErrorf("boom")
return reply, nil
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(testFn),
},
RangeDescriptorDB: threeReplicaMockRangeDescriptorDB,
NodeDialer: nodedialer.New(nil, gossip.AddressResolver(g)),
RPCRetryOptions: &retry.Options{
InitialBackoff: time.Microsecond,
MaxBackoff: time.Microsecond,
},
}
for i, c := range []struct {
leaseSequences []roachpb.LeaseSequence
expected int64
}{
{[]roachpb.LeaseSequence{1, 0, 1, 2}, 2},
{[]roachpb.LeaseSequence{0}, 0},
{[]roachpb.LeaseSequence{1, 0, 1, 2, 1}, 3},
} {
sequences = c.leaseSequences
ds := NewDistSender(cfg, g)
v := roachpb.MakeValueFromString("value")
put := roachpb.NewPut(roachpb.Key("a"), v)
if _, pErr := client.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("%d: unexpected error: %v", i, pErr)
}
if got := ds.Metrics().InLeaseTransferBackoffs.Count(); got != c.expected {
t.Fatalf("%d: expected %d backoffs, got %d", i, c.expected, got)
}
}
}

// This test verifies that when we have a cached leaseholder that is down
// it is ejected from the cache.
func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {
