Merge #113111
113111: kvcoord: fix error index in case of range split r=yuzefovich a=yuzefovich

This commit fixes the DistSender to set the correct error index when an error is encountered after `divideAndSendBatchToRanges` is called recursively due to a stale range cache. Previously, in this scenario the error index was set as if the batch right before the recursive call were the original one, which is incorrect if that batch was itself the result of a split (e.g. because the original batch touched multiple ranges and each sub-batch was executed in parallel). We already had the error index re-mapping in place for the main code path, but it was missing after two of the recursive calls, which is now fixed.
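
A minimal sketch of the re-mapping idea, assuming hypothetical names (`remapErrorIndex` is not a real DistSender helper): `positions[i]` records where the i-th request of a truncated sub-batch sits in the original, un-truncated batch, so an error index reported against the sub-batch has to be translated through `positions` before it is returned to the client.

```go
package main

import "fmt"

// remapErrorIndex translates errIdx, which points at a request inside a
// truncated sub-batch, into the index of the corresponding request in the
// original batch. positions[i] holds the original position of the sub-batch's
// i-th request; a nil slice means the sub-batch equals the original batch.
func remapErrorIndex(errIdx int, positions []int) int {
	if positions == nil {
		return errIdx
	}
	return positions[errIdx]
}

func main() {
	// Original batch: [Get(a), Get(c), Get(b)]. One sub-batch holds
	// [Get(a), Get(b)] with positions {0, 2}; an error at sub-batch index 1
	// must surface as index 2 of the original batch.
	fmt.Println(remapErrorIndex(1, []int{0, 2})) // prints 2
}
```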

This commit additionally moves the logic that sets `response.positions` out of `sendPartialBatch`, since there are fewer places where it needs to be done one level up.

This bug has been present for a very long time, but it seems relatively minor, so I decided not to include a release note.

Fixes: #111481.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Nov 1, 2023
2 parents 8c7f1a9 + 5f8e863 commit f515f70
Showing 2 changed files with 188 additions and 24 deletions.
43 changes: 22 additions & 21 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1129,7 +1129,11 @@ func (ds *DistSender) incrementBatchCounters(ba *kvpb.BatchRequest) {
}

type response struct {
reply *kvpb.BatchResponse
reply *kvpb.BatchResponse
// positions argument describes how the given batch request corresponds to
// the original, un-truncated one, and allows us to combine the response
// later via BatchResponse.Combine. (nil positions should be used when the
// original batch request is fully contained within a single range.)
positions []int
pErr *kvpb.Error
}
@@ -1449,8 +1453,10 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// Take the fast path if this batch fits within a single range.
if !ri.NeedAnother(rs) {
resp := ds.sendPartialBatch(
ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), nil, /* positions */
ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(),
)
// resp.positions remains nil since the original batch is fully
// contained within a single range.
return resp.reply, resp.pErr
}

@@ -1534,6 +1540,11 @@ func (ds *DistSender) divideAndSendBatchToRanges(
for _, responseCh := range responseChs {
resp := <-responseCh
if resp.pErr != nil {
// Re-map the error index within this partial batch back to its
// position in the encompassing batch.
if resp.pErr.Index != nil && resp.pErr.Index.Index != -1 && resp.positions != nil {
resp.pErr.Index.Index = int32(resp.positions[resp.pErr.Index.Index])
}
if pErr == nil {
pErr = resp.pErr
// Update the error's transaction with any new information from
@@ -1647,8 +1658,9 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// Sent the batch asynchronously.
} else {
resp := ds.sendPartialBatch(
ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), positions,
ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(),
)
resp.positions = positions
responseCh <- resp
if resp.pErr != nil {
return
@@ -1749,9 +1761,9 @@ func (ds *DistSender) sendPartialBatchAsync(
},
func(ctx context.Context) {
ds.metrics.AsyncSentCount.Inc(1)
responseCh <- ds.sendPartialBatch(
ctx, ba, rs, isReverse, withCommit, batchIdx, routing, positions,
)
resp := ds.sendPartialBatch(ctx, ba, rs, isReverse, withCommit, batchIdx, routing)
resp.positions = positions
responseCh <- resp
},
); err != nil {
ds.metrics.AsyncThrottledCount.Inc(1)
@@ -1789,11 +1801,7 @@ func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, at
// request are limited to the range's key span. The rs argument corresponds to
// the span encompassing the key ranges of all requests in the truncated batch.
// It should be entirely contained within the range descriptor of the supplied
// routing token. The positions argument describes how the given batch request
// corresponds to the original, un-truncated one, and allows us to combine the
// response later via BatchResponse.Combine. (nil positions argument should be
// used when the original batch request is fully contained within a single
// range.)
// routing token.
//
// The send occurs in a retry loop to handle send failures. On failure to send
// to any replicas, we backoff and retry by refetching the range descriptor. If
@@ -1809,7 +1817,6 @@ func (ds *DistSender) sendPartialBatch(
withCommit bool,
batchIdx int,
routingTok rangecache.EvictionToken,
positions []int,
) response {
if batchIdx == 1 {
ds.metrics.PartialBatchCount.Inc(2) // account for first batch
@@ -1872,7 +1879,7 @@ if !intersection.Equal(rs) {
if !intersection.Equal(rs) {
log.Eventf(ctx, "range shrunk; sub-dividing the request")
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
return response{reply: reply, positions: positions, pErr: pErr}
return response{reply: reply, pErr: pErr}
}
}

@@ -1930,19 +1937,13 @@

// If sending succeeded, return immediately.
if reply.Error == nil {
return response{reply: reply, positions: positions}
return response{reply: reply}
}

// Untangle the error from the received response.
pErr = reply.Error
reply.Error = nil // scrub the response error

// Re-map the error index within this partial batch back
// to its position in the encompassing batch.
if pErr.Index != nil && pErr.Index.Index != -1 && positions != nil {
pErr.Index.Index = int32(positions[pErr.Index.Index])
}

log.VErrEventf(ctx, 2, "reply error %s: %s", ba, pErr)

// Error handling: If the error indicates that our range
@@ -1979,7 +1980,7 @@
// with unknown mapping to our truncated reply).
log.VEventf(ctx, 1, "likely split; will resend. Got new descriptors: %s", tErr.Ranges)
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
return response{reply: reply, positions: positions, pErr: pErr}
return response{reply: reply, pErr: pErr}
}
break
}
169 changes: 166 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -1663,9 +1663,9 @@ func TestEvictCacheOnError(t *testing.T) {
},
}

rangeMismachErr := kvpb.NewRangeKeyMismatchError(
rangeMismatchErr := kvpb.NewRangeKeyMismatchError(
context.Background(), nil, nil, &lhs, nil /* lease */)
rangeMismachErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: rhs, Lease: roachpb.Lease{}})
rangeMismatchErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: rhs, Lease: roachpb.Lease{}})

testCases := []struct {
canceledCtx bool
@@ -1674,7 +1674,7 @@
shouldClearReplica bool
}{
{false, errors.New(errString), false, false}, // non-retryable replica error
{false, rangeMismachErr, false, false}, // RangeKeyMismatch replica error
{false, rangeMismatchErr, false, false}, // RangeKeyMismatch replica error
{false, &kvpb.RangeNotFoundError{}, false, false}, // RangeNotFound replica error
{false, nil, false, false}, // RPC error
{true, nil, false, false}, // canceled context
@@ -4552,6 +4552,169 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
}
}

// TestErrorIndexOnRangeSplit verifies that in case divideAndSendBatchToRanges
// is called recursively due to a stale range descriptor and the error is
// encountered during that recursive call, the error index is set correctly to
// point to the request in the original batch. It is a regression test for
// #111481.
func TestErrorIndexOnRangeSplit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

keyA := roachpb.Key("a")
keyB := roachpb.Key("b")
keyC := roachpb.Key("c")
splitKey := keys.MustAddr(keyC)
newSplitKey := keys.MustAddr(keyB)

ctx := context.Background()
tr := tracing.NewTracer()
stopper := stop.NewStopper(stop.WithTracer(tr))
defer stopper.Stop(ctx)

clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)

// We start out with two ranges split at splitKey. This is needed so that
// we don't hit the fast-path in divideAndSendBatchToRanges where only a
// single range is touched.
repls := []roachpb.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
},
{
NodeID: 1,
StoreID: 1,
},
}
initDescs := []roachpb.RangeDescriptor{
{
RangeID: roachpb.RangeID(1),
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: splitKey,
InternalReplicas: repls,
},
{
RangeID: roachpb.RangeID(2),
Generation: 1,
StartKey: splitKey,
EndKey: roachpb.RKeyMax,
InternalReplicas: repls,
},
}
// We'll send a BatchRequest touching the original two ranges, which is so
// that it is split into two parallel BatchRequests, then on the one
// touching the first range we'll inject a range key mismatch error to
// simulate a range split. Thus, after the injected split we'll have three
// ranges with split points at newSplitKey and splitKey.
splitDescs := []roachpb.RangeDescriptor{
{
RangeID: roachpb.RangeID(1),
Generation: 2,
StartKey: roachpb.RKeyMin,
EndKey: newSplitKey,
InternalReplicas: repls,
},
{
RangeID: roachpb.RangeID(3),
Generation: 2,
StartKey: newSplitKey,
EndKey: splitKey,
InternalReplicas: repls,
},
{
RangeID: roachpb.RangeID(2),
Generation: 2,
StartKey: splitKey,
EndKey: roachpb.RKeyMax,
InternalReplicas: repls,
},
}

initialRDB := mockRangeDescriptorDBForDescs(initDescs...)
splitRDB := mockRangeDescriptorDBForDescs(splitDescs...)

var rc *rangecache.RangeCache
var wg sync.WaitGroup
var once sync.Once
wg.Add(2)
waitThenSwitchToSplitDesc := func() {
// Wait for both partial requests to be sent.
wg.Done()
wg.Wait()
// Switch out the RangeDescriptorDB.
once.Do(func() { rc.TestingSetDB(splitRDB) })
}

transportFn := func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
rep := ba.CreateReply()
switch ba.ClientRangeInfo.DescriptorGeneration {
case 1:
waitThenSwitchToSplitDesc()
// We have two parallel batches: one with 2 requests and another
// with 1 request. Do nothing for the latter.
if len(ba.Requests) != 2 {
return rep, nil
}
// For the batch with 2 requests we inject an error to simulate a
// stale range cache.
rangeMismatchErr := kvpb.NewRangeKeyMismatchError(ctx, nil, nil, &splitDescs[0], nil /* lease */)
rangeMismatchErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: splitDescs[1], Lease: roachpb.Lease{}})
rep.Error = kvpb.NewError(rangeMismatchErr)
return rep, nil
case 2:
// We'll receive a few batches, all with 1 Get. Find the one
// targeting keyB and simulate it encountering an error (that could
// occur for InitPuts).
if len(ba.Requests) != 1 {
require.Fail(t, "unexpected number of requests in a batch")
}
if ba.Requests[0].GetGet().Key.Equal(keyB) {
rep.Error = kvpb.NewError(&kvpb.ConditionFailedError{})
// This is the zeroth request in a batch with 1 request.
rep.Error.SetErrorIndex(0)
}
return rep, nil
default:
require.Fail(t, "unexpected desc generation")
return nil, nil
}
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tr},
Clock: clock,
NodeDescs: g,
RPCContext: rpcContext,
RangeDescriptorDB: initialRDB,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(transportFn),
},
Settings: cluster.MakeTestingClusterSettings(),
}

ds := NewDistSender(cfg)
rc = ds.rangeCache

ba := &kvpb.BatchRequest{}
// Issue a BatchRequest such that it touches the original two ranges, and
// then the last request gets an error injected.
ba.Add(kvpb.NewGet(keyA), kvpb.NewGet(keyC), kvpb.NewGet(keyB))
// Inconsistent read because otherwise the batch will ask to be re-sent in a
// txn when split.
ba.ReadConsistency = kvpb.INCONSISTENT

_, pErr := ds.Send(ctx, ba)
require.NotNil(t, pErr)
require.NotNil(t, pErr.Index)
// This is the crux of the test - the error should have the position of
// Get(keyB) request from the original batch.
require.Equal(t, int32(2), pErr.Index.Index)
}

// Test the following scenario: the DistSender sends a request that results in a
// sendError, meaning that the descriptor is probably stale. The descriptor is
// then refreshed, and it turns out that the range had split in the meantime.
