Skip to content

Commit

Permalink
kvcoord: fix extremely rare OOM hazard in the DistSender
Browse files Browse the repository at this point in the history
The outer loop in the DistSender subdivides a BatchRequest into partial
batches corresponding to a single range and sends them out. Previously,
the resolved span passed into the function responsible for sending out
this partial batch (`sendPartialBatch`) corresponded to the span of the
entire batch request (as opposed to the partial batch). This
resolved span is used to check if the request needs to be subdivided
further between retries in the outer loop of the DistSender. Given the
wrong parameter value, we'd always end up determining that the batch
needed to be subdivided.

This wasn't really an issue in practice as we don't expect too many
retries here. However, if we did (eg. timeouts causing the transport
to be exhausted on every try), the infinite recursion here could lead
to an OOM.

The issue here was introduced in #7fb06a22d6b5ac19f764306bdb43133946da9664
when we stopped truncating the supplied `rs` to the supplied range
descriptor.

References #87167

Release note: None
  • Loading branch information
arulajmani committed Sep 26, 2022
1 parent fc308c1 commit d46a1f5
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 17 deletions.
33 changes: 20 additions & 13 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.dontReorderReplicas = cfg.TestingKnobs.DontReorderReplicas
ds.dontConsiderConnHealth = cfg.TestingKnobs.DontConsiderConnHealth
ds.rpcRetryOptions = base.DefaultRetryOptions()
// TODO(arul): The rpcRetryOptions passed in here from server/tenant don't
// set a max retries limit. Should they?
if cfg.RPCRetryOptions != nil {
ds.rpcRetryOptions = *cfg.RPCRetryOptions
}
Expand Down Expand Up @@ -1392,11 +1394,11 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// If we can reserve one of the limited goroutines available for parallel
// batch RPCs, send asynchronously.
if canParallelize && !lastRange && !ds.disableParallelBatches &&
ds.sendPartialBatchAsync(ctx, curRangeBatch, rs, isReverse, withCommit, batchIdx, ri.Token(), responseCh, positions) {
ds.sendPartialBatchAsync(ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), responseCh, positions) {
// Sent the batch asynchronously.
} else {
resp := ds.sendPartialBatch(
ctx, curRangeBatch, rs, isReverse, withCommit, batchIdx, ri.Token(), positions,
ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), positions,
)
responseCh <- resp
if resp.pErr != nil {
Expand Down Expand Up @@ -1530,21 +1532,26 @@ func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, at
s.Printf("slow RPC finished after %.2fs (%d attempts)", dur.Seconds(), attempts)
}

// sendPartialBatch sends the supplied batch to the range specified by desc.
// sendPartialBatch sends the supplied batch to the range specified by the
// routing token.
//
// The batch request is supposed to be truncated already so that it contains
// only requests which intersect the range descriptor and keys for each request
// are limited to the range's key span. positions argument describes how the
// given BatchRequest corresponds to the original, un-truncated one, and allows
// us to combine the response later via BatchResponse.Combine. (nil positions
// argument should be used when the original batch request is fully contained
// within a single range.)
// The batch request is supposed to be truncated already so that it only
// contains requests which intersect the range descriptor and keys for each
// request are limited to the range's key span. The rs argument corresponds to
// the span encompassing the key ranges of all requests in the truncated batch.
// It should be entirely contained within the range descriptor of the supplied
// routing token. The positions argument describes how the given batch request
// corresponds to the original, un-truncated one, and allows us to combine the
// response later via BatchResponse.Combine. (nil positions argument should be
// used when the original batch request is fully contained within a single
// range.)
//
// The send occurs in a retry loop to handle send failures. On failure to send
// to any replicas, we backoff and retry by refetching the range descriptor. If
// the underlying range seems to have split, we recursively invoke
// divideAndSendBatchToRanges to re-enumerate the ranges in the span and resend
// to each.
// the underlying range seems to have split (determined by checking if the
// supplied rs is no longer entirely contained within the refreshed range
// descriptor) we recursively invoke divideAndSendBatchToRanges to re-enumerate
// the ranges in the span and resend to each.
func (ds *DistSender) sendPartialBatch(
ctx context.Context,
ba roachpb.BatchRequest,
Expand Down
84 changes: 84 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4553,6 +4553,90 @@ func TestRequestSubdivisionAfterDescriptorChange(t *testing.T) {
}
}

// TestRequestSubdivisionAfterDescriptorChangeWithUnavailableReplicasTerminates
// acts as a regression test for #87167. It essentially guards against infinite
// recursion which could happen in a very rare cases. Specifically, where a
// batch request spanned multiple ranges, but one or more of these ranges did
// not return a result and the DistSender exhausted the entire transport on
// each attempt. We simulate this by returning a sendError.
func TestRequestSubdivisionAfterDescriptorChangeWithUnavailableReplicasTerminates(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

keyA := roachpb.Key("a")
keyB := roachpb.Key("b")
keyC := roachpb.Key("c")
splitKey := keys.MustAddr(keyB)

get := func(k roachpb.Key) roachpb.Request {
return roachpb.NewGet(k, false /* forUpdate */)
}

ctx := context.Background()
tr := tracing.NewTracer()
stopper := stop.NewStopper(stop.WithTracer(tr))
defer stopper.Stop(ctx)

clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)

repls := []roachpb.ReplicaDescriptor{{
NodeID: 1,
StoreID: 1,
}}
splitDescs := []roachpb.RangeDescriptor{{
RangeID: roachpb.RangeID(1),
Generation: 2,
StartKey: roachpb.RKeyMin,
EndKey: splitKey,
InternalReplicas: repls,
}, {
RangeID: roachpb.RangeID(2),
Generation: 2,
StartKey: splitKey,
EndKey: roachpb.RKeyMax,
InternalReplicas: repls,
}}

splitRDB := mockRangeDescriptorDBForDescs(splitDescs...)

var numAttempts int32
transportFn := func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
atomic.AddInt32(&numAttempts, 1)
require.Equal(t, 1, len(ba.Requests))
return nil, newSendError("boom")
}
rpcRetryOptions := &retry.Options{
MaxRetries: 5, // maxAttempts = 6
}
cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tr},
Clock: clock,
NodeDescs: g,
RPCRetryOptions: rpcRetryOptions,
RPCContext: rpcContext,
RangeDescriptorDB: splitRDB,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(transportFn),
},
Settings: cluster.MakeTestingClusterSettings(),
}

ds := NewDistSender(cfg)

var ba roachpb.BatchRequest
ba.Add(get(keyA), get(keyC))
// Inconsistent read because otherwise the batch will ask to be re-sent in a
// txn when split.
ba.ReadConsistency = roachpb.INCONSISTENT
_, pErr := ds.Send(ctx, ba)
require.NotNil(t, pErr)
require.True(t, testutils.IsError(pErr.GoError(), "boom"))
// 6 attempts each for the two partial batches.
require.Equal(t, int32(12), atomic.LoadInt32(&numAttempts))
}

// TestDescriptorChangeAfterRequestSubdivision is similar to
// TestRequestSubdivisionAfterDescriptorChange, but it exercises a scenario
// where the request is subdivided before observing a descriptor change. After
Expand Down
9 changes: 5 additions & 4 deletions pkg/util/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (

// Options provides reusable configuration of Retry objects.
type Options struct {
InitialBackoff time.Duration // Default retry backoff interval
MaxBackoff time.Duration // Maximum retry backoff interval
Multiplier float64 // Default backoff constant
MaxRetries int // Maximum number of attempts (0 for infinite)
InitialBackoff time.Duration // Default retry backoff interval
MaxBackoff time.Duration // Maximum retry backoff interval
Multiplier float64 // Default backoff constant
// Maximum number of retries; attempts = MaxRetries + 1. (0 for infinite)
MaxRetries int
RandomizationFactor float64 // Randomize the backoff interval by constant
Closer <-chan struct{} // Optionally end retry loop channel close
}
Expand Down

0 comments on commit d46a1f5

Please sign in to comment.