diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index 701e82d716e2..04ee703c3349 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1129,7 +1129,11 @@ func (ds *DistSender) incrementBatchCounters(ba *kvpb.BatchRequest) {
 }
 
 type response struct {
-	reply     *kvpb.BatchResponse
+	reply *kvpb.BatchResponse
+	// positions describes how the given batch request corresponds to the
+	// original, un-truncated one, and allows us to combine the response later
+	// via BatchResponse.Combine. (nil positions should be used when the
+	// original batch request is fully contained within a single range.)
 	positions []int
 	pErr      *kvpb.Error
 }
@@ -1449,8 +1453,10 @@ func (ds *DistSender) divideAndSendBatchToRanges(
 	// Take the fast path if this batch fits within a single range.
 	if !ri.NeedAnother(rs) {
 		resp := ds.sendPartialBatch(
-			ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), nil, /* positions */
+			ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(),
 		)
+		// resp.positions remains nil since the original batch is fully
+		// contained within a single range.
 		return resp.reply, resp.pErr
 	}
 
@@ -1534,6 +1540,11 @@ func (ds *DistSender) divideAndSendBatchToRanges(
 	for _, responseCh := range responseChs {
 		resp := <-responseCh
 		if resp.pErr != nil {
+			// Re-map the error index within this partial batch back to its
+			// position in the encompassing batch.
+			if resp.pErr.Index != nil && resp.pErr.Index.Index != -1 && resp.positions != nil {
+				resp.pErr.Index.Index = int32(resp.positions[resp.pErr.Index.Index])
+			}
 			if pErr == nil {
 				pErr = resp.pErr
 				// Update the error's transaction with any new information from
@@ -1647,8 +1658,9 @@ func (ds *DistSender) divideAndSendBatchToRanges(
 			// Sent the batch asynchronously.
 		} else {
 			resp := ds.sendPartialBatch(
-				ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), positions,
+				ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(),
 			)
+			resp.positions = positions
			responseCh <- resp
 			if resp.pErr != nil {
 				return
@@ -1749,9 +1761,9 @@ func (ds *DistSender) sendPartialBatchAsync(
 		},
 		func(ctx context.Context) {
 			ds.metrics.AsyncSentCount.Inc(1)
-			responseCh <- ds.sendPartialBatch(
-				ctx, ba, rs, isReverse, withCommit, batchIdx, routing, positions,
-			)
+			resp := ds.sendPartialBatch(ctx, ba, rs, isReverse, withCommit, batchIdx, routing)
+			resp.positions = positions
+			responseCh <- resp
 		},
 	); err != nil {
 		ds.metrics.AsyncThrottledCount.Inc(1)
@@ -1789,11 +1801,7 @@ func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, at
 // request are limited to the range's key span. The rs argument corresponds to
 // the span encompassing the key ranges of all requests in the truncated batch.
 // It should be entirely contained within the range descriptor of the supplied
-// routing token. The positions argument describes how the given batch request
-// corresponds to the original, un-truncated one, and allows us to combine the
-// response later via BatchResponse.Combine. (nil positions argument should be
-// used when the original batch request is fully contained within a single
-// range.)
+// routing token.
 //
 // The send occurs in a retry loop to handle send failures. On failure to send
 // to any replicas, we backoff and retry by refetching the range descriptor. If
@@ -1809,7 +1817,6 @@ func (ds *DistSender) sendPartialBatch(
 	withCommit bool,
 	batchIdx int,
 	routingTok rangecache.EvictionToken,
-	positions []int,
 ) response {
 	if batchIdx == 1 {
 		ds.metrics.PartialBatchCount.Inc(2) // account for first batch
@@ -1872,7 +1879,7 @@ func (ds *DistSender) sendPartialBatch(
 		if !intersection.Equal(rs) {
 			log.Eventf(ctx, "range shrunk; sub-dividing the request")
 			reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
-			return response{reply: reply, positions: positions, pErr: pErr}
+			return response{reply: reply, pErr: pErr}
 		}
 	}
 
@@ -1930,19 +1937,13 @@ func (ds *DistSender) sendPartialBatch(
 
 		// If sending succeeded, return immediately.
 		if reply.Error == nil {
-			return response{reply: reply, positions: positions}
+			return response{reply: reply}
 		}
 
 		// Untangle the error from the received response.
 		pErr = reply.Error
 		reply.Error = nil // scrub the response error
 
-		// Re-map the error index within this partial batch back
-		// to its position in the encompassing batch.
-		if pErr.Index != nil && pErr.Index.Index != -1 && positions != nil {
-			pErr.Index.Index = int32(positions[pErr.Index.Index])
-		}
-
 		log.VErrEventf(ctx, 2, "reply error %s: %s", ba, pErr)
 
 		// Error handling: If the error indicates that our range
@@ -1979,7 +1980,7 @@ func (ds *DistSender) sendPartialBatch(
 			// with unknown mapping to our truncated reply).
 			log.VEventf(ctx, 1, "likely split; will resend. Got new descriptors: %s", tErr.Ranges)
 			reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
-			return response{reply: reply, positions: positions, pErr: pErr}
+			return response{reply: reply, pErr: pErr}
 		}
 		break
 	}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
index 5f8ad1fb258d..75832d300a73 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -1663,9 +1663,9 @@ func TestEvictCacheOnError(t *testing.T) {
 		},
 	}
 
-	rangeMismachErr := kvpb.NewRangeKeyMismatchError(
+	rangeMismatchErr := kvpb.NewRangeKeyMismatchError(
 		context.Background(), nil, nil, &lhs, nil /* lease */)
-	rangeMismachErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: rhs, Lease: roachpb.Lease{}})
+	rangeMismatchErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: rhs, Lease: roachpb.Lease{}})
 
 	testCases := []struct {
 		canceledCtx            bool
@@ -1674,7 +1674,7 @@
 		shouldClearReplica     bool
 	}{
 		{false, errors.New(errString), false, false},      // non-retryable replica error
-		{false, rangeMismachErr, false, false},            // RangeKeyMismatch replica error
+		{false, rangeMismatchErr, false, false},           // RangeKeyMismatch replica error
 		{false, &kvpb.RangeNotFoundError{}, false, false}, // RangeNotFound replica error
 		{false, nil, false, false},                        // RPC error
 		{true, nil, false, false},                         // canceled context
@@ -4552,6 +4552,169 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
 	}
 }
 
+// TestErrorIndexOnRangeSplit verifies that when divideAndSendBatchToRanges
+// is called recursively due to a stale range descriptor and an error is
+// encountered during that recursive call, the error index is set correctly to
+// point at the request's position in the original batch. It is a regression
+// test for #111481.
+func TestErrorIndexOnRangeSplit(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	keyA := roachpb.Key("a")
+	keyB := roachpb.Key("b")
+	keyC := roachpb.Key("c")
+	splitKey := keys.MustAddr(keyC)
+	newSplitKey := keys.MustAddr(keyB)
+
+	ctx := context.Background()
+	tr := tracing.NewTracer()
+	stopper := stop.NewStopper(stop.WithTracer(tr))
+	defer stopper.Stop(ctx)
+
+	clock := hlc.NewClockForTesting(nil)
+	rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
+	g := makeGossip(t, stopper, rpcContext)
+
+	// We start out with two ranges split at splitKey. This is needed so that
+	// we don't hit the fast-path in divideAndSendBatchToRanges where only a
+	// single range is touched.
+	repls := []roachpb.ReplicaDescriptor{
+		{
+			NodeID:  1,
+			StoreID: 1,
+		},
+		{
+			NodeID:  1,
+			StoreID: 1,
+		},
+	}
+	initDescs := []roachpb.RangeDescriptor{
+		{
+			RangeID:          roachpb.RangeID(1),
+			Generation:       1,
+			StartKey:         roachpb.RKeyMin,
+			EndKey:           splitKey,
+			InternalReplicas: repls,
+		},
+		{
+			RangeID:          roachpb.RangeID(2),
+			Generation:       1,
+			StartKey:         splitKey,
+			EndKey:           roachpb.RKeyMax,
+			InternalReplicas: repls,
+		},
+	}
+	// We'll send a BatchRequest touching both of the original ranges so that
+	// it is split into two parallel BatchRequests; on the one touching the
+	// first range we'll inject a range key mismatch error to simulate a range
+	// split. Thus, after the injected split we'll have three ranges with
+	// split points at newSplitKey and splitKey.
+	splitDescs := []roachpb.RangeDescriptor{
+		{
+			RangeID:          roachpb.RangeID(1),
+			Generation:       2,
+			StartKey:         roachpb.RKeyMin,
+			EndKey:           newSplitKey,
+			InternalReplicas: repls,
+		},
+		{
+			RangeID:          roachpb.RangeID(3),
+			Generation:       2,
+			StartKey:         newSplitKey,
+			EndKey:           splitKey,
+			InternalReplicas: repls,
+		},
+		{
+			RangeID:          roachpb.RangeID(2),
+			Generation:       2,
+			StartKey:         splitKey,
+			EndKey:           roachpb.RKeyMax,
+			InternalReplicas: repls,
+		},
+	}
+
+	initialRDB := mockRangeDescriptorDBForDescs(initDescs...)
+	splitRDB := mockRangeDescriptorDBForDescs(splitDescs...)
+
+	var rc *rangecache.RangeCache
+	var wg sync.WaitGroup
+	var once sync.Once
+	wg.Add(2)
+	waitThenSwitchToSplitDesc := func() {
+		// Wait for both partial requests to be sent.
+		wg.Done()
+		wg.Wait()
+		// Switch out the RangeDescriptorDB.
+		once.Do(func() { rc.TestingSetDB(splitRDB) })
+	}
+
+	transportFn := func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
+		rep := ba.CreateReply()
+		switch ba.ClientRangeInfo.DescriptorGeneration {
+		case 1:
+			waitThenSwitchToSplitDesc()
+			// We have two parallel batches: one with 2 requests and another
+			// with 1 request. Do nothing for the latter.
+			if len(ba.Requests) != 2 {
+				return rep, nil
+			}
+			// For the batch with 2 requests we inject an error to simulate a
+			// stale range cache.
+			rangeMismatchErr := kvpb.NewRangeKeyMismatchError(ctx, nil, nil, &splitDescs[0], nil /* lease */)
+			rangeMismatchErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: splitDescs[1], Lease: roachpb.Lease{}})
+			rep.Error = kvpb.NewError(rangeMismatchErr)
+			return rep, nil
+		case 2:
+			// We'll receive a few batches, all with 1 Get. Find the one
+			// targeting keyB and simulate it encountering an error (one that
+			// could occur for InitPuts).
+			if len(ba.Requests) != 1 {
+				require.Fail(t, "unexpected number of requests in a batch")
+			}
+			if ba.Requests[0].GetGet().Key.Equal(keyB) {
+				rep.Error = kvpb.NewError(&kvpb.ConditionFailedError{})
+				// This is the zeroth request in a batch with 1 request.
+				rep.Error.SetErrorIndex(0)
+			}
+			return rep, nil
+		default:
+			require.Fail(t, "unexpected desc generation")
+			return nil, nil
+		}
+	}
+
+	cfg := DistSenderConfig{
+		AmbientCtx:        log.AmbientContext{Tracer: tr},
+		Clock:             clock,
+		NodeDescs:         g,
+		RPCContext:        rpcContext,
+		RangeDescriptorDB: initialRDB,
+		TestingKnobs: ClientTestingKnobs{
+			TransportFactory: adaptSimpleTransport(transportFn),
+		},
+		Settings: cluster.MakeTestingClusterSettings(),
+	}
+
+	ds := NewDistSender(cfg)
+	rc = ds.rangeCache
+
+	ba := &kvpb.BatchRequest{}
+	// Issue a BatchRequest such that it touches the original two ranges, and
+	// then the last request gets an error injected.
+	ba.Add(kvpb.NewGet(keyA), kvpb.NewGet(keyC), kvpb.NewGet(keyB))
+	// Inconsistent read because otherwise the batch will ask to be re-sent in
+	// a txn when split.
+	ba.ReadConsistency = kvpb.INCONSISTENT
+
+	_, pErr := ds.Send(ctx, ba)
+	require.NotNil(t, pErr)
+	require.NotNil(t, pErr.Index)
+	// This is the crux of the test: the error should have the position of the
+	// Get(keyB) request from the original batch.
+	require.Equal(t, int32(2), pErr.Index.Index)
+}
+
 // Test the following scenario: the DistSender sends a request that results in a
 // sendError, meaning that the descriptor is probably stale. The descriptor is
 // then refreshed, and it turns out that the range had split in the meantime.
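
For illustration, here is a minimal standalone sketch of the remapping that this patch centralizes. The types below (response, errorIndex) and the helper remapErrorIndex are simplified, hypothetical stand-ins, not the actual kvcoord/kvpb definitions. The patch has callers attach positions to the response after sendPartialBatch returns, and translates any error index exactly once, in the loop of divideAndSendBatchToRanges that collects partial responses; previously the translation happened inside sendPartialBatch, so an error coming out of a recursive divideAndSendBatchToRanges call (after a split made the cached descriptor stale) could bypass it and surface an index relative to the truncated batch rather than the original one, which is the scenario TestErrorIndexOnRangeSplit exercises.

package main

import "fmt"

// errorIndex loosely mirrors kvpb's error position: the index of the request
// within a batch that an error applies to; -1 means no index is set.
type errorIndex struct{ Index int32 }

// response mimics kvcoord's response struct: the result of sending one
// truncated partial batch, plus the positions of its requests within the
// original, un-truncated batch (nil when the batch was never truncated).
type response struct {
	positions []int
	errIdx    *errorIndex
}

// remapErrorIndex translates an error index from partial-batch coordinates
// into original-batch coordinates. It mirrors the guard the patch adds to
// divideAndSendBatchToRanges: remap only when an index is present, valid,
// and the batch was actually truncated.
func remapErrorIndex(resp *response) {
	if resp.errIdx != nil && resp.errIdx.Index != -1 && resp.positions != nil {
		resp.errIdx.Index = int32(resp.positions[resp.errIdx.Index])
	}
}

func main() {
	// Original batch: [Get(a), Get(c), Get(b)]. After truncation to the first
	// range, the partial batch is [Get(a), Get(b)], so its requests sit at
	// positions 0 and 2 of the original batch.
	resp := &response{
		positions: []int{0, 2},
		errIdx:    &errorIndex{Index: 1}, // error on Get(b) within the partial batch
	}
	remapErrorIndex(resp)
	// Prints 2: Get(b)'s position in the original batch, matching the
	// require.Equal(t, int32(2), pErr.Index.Index) assertion in the test.
	fmt.Println(resp.errIdx.Index)
}

Keeping the remap at the single site where partial responses are combined also means each level of recursive re-division returns indexes in its own batch's coordinates, so no index is ever translated twice.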