release-23.2: kvcoord: fix error index in case of range split #113638

Merged 1 commit on Nov 5, 2023
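The fix: sendPartialBatch no longer receives the positions slice (which records, for each request in a truncated, per-range batch, its index in the original batch) as an argument. Instead, callers attach positions to the returned response, and the error-index re-mapping happens exactly once, centrally, in the response loop of divideAndSendBatchToRanges. Previously the re-mapping lived inside sendPartialBatch and was never applied on the paths that recursively call divideAndSendBatchToRanges (range shrunk, or likely split), so errors surfacing from those paths carried an index relative to the partial batch rather than to the original one.

The re-mapping itself is simple. As a minimal standalone sketch (plain Go, not the kvcoord code):

func remap(errIdx int32, positions []int) int32 {
    // positions[i] is the original-batch index of the i-th request in the
    // partial batch; nil positions means the batch was never divided.
    if errIdx == -1 || positions == nil {
        return errIdx
    }
    return int32(positions[errIdx])
}

For a batch {Get(a), Get(c), Get(b)} divided at key "c" into {Get(a), Get(b)} (positions {0, 2}) and {Get(c)} (positions {1}), an error on Get(b) at index 1 of its partial batch remaps to index 2 in the original batch; the new regression test below asserts exactly this.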
43 changes: 22 additions & 21 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1129,7 +1129,11 @@ func (ds *DistSender) incrementBatchCounters(ba *kvpb.BatchRequest) {
}

type response struct {
reply *kvpb.BatchResponse
+// positions argument describes how the given batch request corresponds to
+// the original, un-truncated one, and allows us to combine the response
+// later via BatchResponse.Combine. (nil positions should be used when the
+// original batch request is fully contained within a single range.)
positions []int
pErr *kvpb.Error
}
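The positions field is also what lets partial responses be merged back together via BatchResponse.Combine. Conceptually (a simplified sketch with a placeholder Response type, not the actual kvpb API):

// combine is a conceptual stand-in for BatchResponse.Combine: each reply in
// a partial response lands in the slot its request occupied in the original
// batch.
type Response struct{ /* placeholder for a single reply */ }

func combine(full, partial []Response, positions []int) {
    for i, r := range partial {
        full[positions[i]] = r
    }
}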
@@ -1449,8 +1453,10 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// Take the fast path if this batch fits within a single range.
if !ri.NeedAnother(rs) {
resp := ds.sendPartialBatch(
-ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), nil, /* positions */
+ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(),
)
+// resp.positions remains nil since the original batch is fully
+// contained within a single range.
return resp.reply, resp.pErr
}

@@ -1534,6 +1540,11 @@ func (ds *DistSender) divideAndSendBatchToRanges(
for _, responseCh := range responseChs {
resp := <-responseCh
if resp.pErr != nil {
+// Re-map the error index within this partial batch back to its
+// position in the encompassing batch.
+if resp.pErr.Index != nil && resp.pErr.Index.Index != -1 && resp.positions != nil {
+resp.pErr.Index.Index = int32(resp.positions[resp.pErr.Index.Index])
+}
if pErr == nil {
pErr = resp.pErr
// Update the error's transaction with any new information from
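Because the re-mapping now happens here, in the loop that owns each positions slice, the mappings compose correctly when a partial batch is itself re-divided after a split. Traced in plain Go with the scenario from the regression test below (illustrative values only):

func exampleComposition() int32 {
    // Original batch {Get(a), Get(c), Get(b)}; ranges split at "c"; a stale
    // descriptor then re-divides the first partial batch at "b".
    outer := []int{0, 2} // positions of {Get(a), Get(b)} in the original batch
    inner := []int{1}    // positions of {Get(b)} within {Get(a), Get(b)}
    errIdx := int32(0)   // Get(b) is request 0 of its single-request sub-batch
    errIdx = int32(inner[errIdx]) // 1: applied by the recursive call's loop
    errIdx = int32(outer[errIdx]) // 2: applied by this loop
    return errIdx // the test asserts pErr.Index.Index == 2
}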
@@ -1647,8 +1658,9 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// Sent the batch asynchronously.
} else {
resp := ds.sendPartialBatch(
-ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), positions,
+ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(),
)
+resp.positions = positions
responseCh <- resp
if resp.pErr != nil {
return
@@ -1749,9 +1761,9 @@ func (ds *DistSender) sendPartialBatchAsync(
},
func(ctx context.Context) {
ds.metrics.AsyncSentCount.Inc(1)
-responseCh <- ds.sendPartialBatch(
-ctx, ba, rs, isReverse, withCommit, batchIdx, routing, positions,
-)
+resp := ds.sendPartialBatch(ctx, ba, rs, isReverse, withCommit, batchIdx, routing)
+resp.positions = positions
+responseCh <- resp
},
); err != nil {
ds.metrics.AsyncThrottledCount.Inc(1)
@@ -1789,11 +1801,7 @@ func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, at
// request are limited to the range's key span. The rs argument corresponds to
// the span encompassing the key ranges of all requests in the truncated batch.
// It should be entirely contained within the range descriptor of the supplied
-// routing token. The positions argument describes how the given batch request
-// corresponds to the original, un-truncated one, and allows us to combine the
-// response later via BatchResponse.Combine. (nil positions argument should be
-// used when the original batch request is fully contained within a single
-// range.)
+// routing token.
//
// The send occurs in a retry loop to handle send failures. On failure to send
// to any replicas, we backoff and retry by refetching the range descriptor. If
@@ -1809,7 +1817,6 @@ func (ds *DistSender) sendPartialBatch(
withCommit bool,
batchIdx int,
routingTok rangecache.EvictionToken,
-positions []int,
) response {
if batchIdx == 1 {
ds.metrics.PartialBatchCount.Inc(2) // account for first batch
@@ -1872,7 +1879,7 @@
if !intersection.Equal(rs) {
log.Eventf(ctx, "range shrunk; sub-dividing the request")
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
-return response{reply: reply, positions: positions, pErr: pErr}
+return response{reply: reply, pErr: pErr}
}
}

@@ -1930,19 +1937,13 @@

// If sending succeeded, return immediately.
if reply.Error == nil {
-return response{reply: reply, positions: positions}
+return response{reply: reply}
}

// Untangle the error from the received response.
pErr = reply.Error
reply.Error = nil // scrub the response error

-// Re-map the error index within this partial batch back
-// to its position in the encompassing batch.
-if pErr.Index != nil && pErr.Index.Index != -1 && positions != nil {
-pErr.Index.Index = int32(positions[pErr.Index.Index])
-}

log.VErrEventf(ctx, 2, "reply error %s: %s", ba, pErr)

// Error handling: If the error indicates that our range
@@ -1979,7 +1980,7 @@
// with unknown mapping to our truncated reply).
log.VEventf(ctx, 1, "likely split; will resend. Got new descriptors: %s", tErr.Ranges)
reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
-return response{reply: reply, positions: positions, pErr: pErr}
+return response{reply: reply, pErr: pErr}
}
break
}
169 changes: 166 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -1663,9 +1663,9 @@ func TestEvictCacheOnError(t *testing.T) {
},
}

-rangeMismachErr := kvpb.NewRangeKeyMismatchError(
+rangeMismatchErr := kvpb.NewRangeKeyMismatchError(
context.Background(), nil, nil, &lhs, nil /* lease */)
-rangeMismachErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: rhs, Lease: roachpb.Lease{}})
+rangeMismatchErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: rhs, Lease: roachpb.Lease{}})

testCases := []struct {
canceledCtx bool
@@ -1674,7 +1674,7 @@
shouldClearReplica bool
}{
{false, errors.New(errString), false, false}, // non-retryable replica error
-{false, rangeMismachErr, false, false}, // RangeKeyMismatch replica error
+{false, rangeMismatchErr, false, false}, // RangeKeyMismatch replica error
{false, &kvpb.RangeNotFoundError{}, false, false}, // RangeNotFound replica error
{false, nil, false, false}, // RPC error
{true, nil, false, false}, // canceled context
@@ -4552,6 +4552,169 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
}
}

// TestErrorIndexOnRangeSplit verifies that when divideAndSendBatchToRanges
// is called recursively due to a stale range descriptor and an error is
// encountered during that recursive call, the error index is set correctly to
// point to the request's position in the original batch. It is a regression
// test for #111481.
func TestErrorIndexOnRangeSplit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

keyA := roachpb.Key("a")
keyB := roachpb.Key("b")
keyC := roachpb.Key("c")
splitKey := keys.MustAddr(keyC)
newSplitKey := keys.MustAddr(keyB)

ctx := context.Background()
tr := tracing.NewTracer()
stopper := stop.NewStopper(stop.WithTracer(tr))
defer stopper.Stop(ctx)

clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)

// We start out with two ranges split at splitKey. This is needed so that
// we don't hit the fast-path in divideAndSendBatchToRanges where only a
// single range is touched.
repls := []roachpb.ReplicaDescriptor{
{
NodeID: 1,
StoreID: 1,
},
{
NodeID: 1,
StoreID: 1,
},
}
initDescs := []roachpb.RangeDescriptor{
{
RangeID: roachpb.RangeID(1),
Generation: 1,
StartKey: roachpb.RKeyMin,
EndKey: splitKey,
InternalReplicas: repls,
},
{
RangeID: roachpb.RangeID(2),
Generation: 1,
StartKey: splitKey,
EndKey: roachpb.RKeyMax,
InternalReplicas: repls,
},
}
// We'll send a BatchRequest touching both original ranges so that it is
// split into two parallel BatchRequests; on the one touching the first
// range we'll inject a range key mismatch error to simulate a range split.
// Thus, after the injected split we'll have three ranges with split points
// at newSplitKey and splitKey.
splitDescs := []roachpb.RangeDescriptor{
{
RangeID: roachpb.RangeID(1),
Generation: 2,
StartKey: roachpb.RKeyMin,
EndKey: newSplitKey,
InternalReplicas: repls,
},
{
RangeID: roachpb.RangeID(3),
Generation: 2,
StartKey: newSplitKey,
EndKey: splitKey,
InternalReplicas: repls,
},
{
RangeID: roachpb.RangeID(2),
Generation: 2,
StartKey: splitKey,
EndKey: roachpb.RKeyMax,
InternalReplicas: repls,
},
}

initialRDB := mockRangeDescriptorDBForDescs(initDescs...)
splitRDB := mockRangeDescriptorDBForDescs(splitDescs...)

var rc *rangecache.RangeCache
var wg sync.WaitGroup
var once sync.Once
wg.Add(2)
waitThenSwitchToSplitDesc := func() {
// Wait for both partial requests to be sent.
wg.Done()
wg.Wait()
// Switch out the RangeDescriptorDB.
once.Do(func() { rc.TestingSetDB(splitRDB) })
}

transportFn := func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
rep := ba.CreateReply()
switch ba.ClientRangeInfo.DescriptorGeneration {
case 1:
waitThenSwitchToSplitDesc()
// We have two parallel batches: one with 2 requests and another
// with 1 request. Do nothing for the latter.
if len(ba.Requests) != 2 {
return rep, nil
}
// For the batch with 2 requests we inject an error to simulate a
// stale range cache.
rangeMismatchErr := kvpb.NewRangeKeyMismatchError(ctx, nil, nil, &splitDescs[0], nil /* lease */)
rangeMismatchErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: splitDescs[1], Lease: roachpb.Lease{}})
rep.Error = kvpb.NewError(rangeMismatchErr)
return rep, nil
case 2:
// We'll receive a few batches, all with 1 Get. Find the one
// targeting keyB and simulate it encountering an error (that could
// occur for InitPuts).
if len(ba.Requests) != 1 {
require.Fail(t, "unexpected number of requests in a batch")
}
if ba.Requests[0].GetGet().Key.Equal(keyB) {
rep.Error = kvpb.NewError(&kvpb.ConditionFailedError{})
// This is the zeroth request in a batch with 1 request.
rep.Error.SetErrorIndex(0)
}
return rep, nil
default:
require.Fail(t, "unexpected desc generation")
return nil, nil
}
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tr},
Clock: clock,
NodeDescs: g,
RPCContext: rpcContext,
RangeDescriptorDB: initialRDB,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(transportFn),
},
Settings: cluster.MakeTestingClusterSettings(),
}

ds := NewDistSender(cfg)
rc = ds.rangeCache

ba := &kvpb.BatchRequest{}
// Issue a BatchRequest such that it touches the original two ranges, and
// then the last request gets an error injected.
ba.Add(kvpb.NewGet(keyA), kvpb.NewGet(keyC), kvpb.NewGet(keyB))
// Inconsistent read because otherwise the batch will ask to be re-sent in a
// txn when split.
ba.ReadConsistency = kvpb.INCONSISTENT

_, pErr := ds.Send(ctx, ba)
require.NotNil(t, pErr)
require.NotNil(t, pErr.Index)
// This is the crux of the test: the error index should be the position of
// the Get(keyB) request in the original batch.
require.Equal(t, int32(2), pErr.Index.Index)
}

// Test the following scenario: the DistSender sends a request that results in a
// sendError, meaning that the descriptor is probably stale. The descriptor is
// then refreshed, and it turns out that the range had split in the meantime.
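To run just the new regression test from a CockroachDB checkout, something like the following should work (assuming the repository's dev wrapper; the exact invocation may vary by branch):

./dev test pkg/kv/kvclient/kvcoord --filter TestErrorIndexOnRangeSplit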