From 85591e15c0437cfc0fbaa84bc8291e2a1e4a5223 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Tue, 31 Oct 2023 01:36:43 +0000
Subject: [PATCH] kvcoord: fix error index in case of range split

This commit fixes the DistSender to set the correct error index when an
error is encountered after `divideAndSendBatchToRanges` was called
recursively due to a stale range cache. Previously, in this scenario the
error index was set as if the batch passed to the recursive call were the
original one, which is incorrect when that batch had itself been split
off (e.g. because the original batch touched multiple ranges and each
sub-batch was executed in parallel). We already had the error index
mapping code in place for the main code path, but the re-mapping was
missing after the two recursive calls, which is now fixed.

This commit additionally moves the logic that sets `response.positions`
out of `sendPartialBatch`, since there are fewer places where it needs to
happen one level up.

This bug has been present for a very long time, but it seems relatively
minor, so I decided not to include a release note.

Release note: None
---
 pkg/kv/kvclient/kvcoord/dist_sender.go      |  43 ++---
 pkg/kv/kvclient/kvcoord/dist_sender_test.go | 169 +++++++++++++++++++-
 2 files changed, 188 insertions(+), 24 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index 701e82d716e2..04ee703c3349 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1129,7 +1129,11 @@ func (ds *DistSender) incrementBatchCounters(ba *kvpb.BatchRequest) {
 }
 
 type response struct {
-	reply     *kvpb.BatchResponse
+	reply *kvpb.BatchResponse
+	// positions describes how the given batch request corresponds to the
+	// original, un-truncated one, and allows us to combine the response
+	// later via BatchResponse.Combine. (nil positions should be used when
+	// the original batch request is fully contained within a single range.)
 	positions []int
 	pErr      *kvpb.Error
 }
@@ -1449,8 +1453,10 @@ func (ds *DistSender) divideAndSendBatchToRanges(
 	// Take the fast path if this batch fits within a single range.
 	if !ri.NeedAnother(rs) {
 		resp := ds.sendPartialBatch(
-			ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), nil, /* positions */
+			ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(),
 		)
+		// resp.positions remains nil since the original batch is fully
+		// contained within a single range.
 		return resp.reply, resp.pErr
 	}
 
@@ -1534,6 +1540,11 @@ func (ds *DistSender) divideAndSendBatchToRanges(
 	for _, responseCh := range responseChs {
 		resp := <-responseCh
 		if resp.pErr != nil {
+			// Re-map the error index within this partial batch back to its
+			// position in the encompassing batch.
+			if resp.pErr.Index != nil && resp.pErr.Index.Index != -1 && resp.positions != nil {
+				resp.pErr.Index.Index = int32(resp.positions[resp.pErr.Index.Index])
+			}
 			if pErr == nil {
 				pErr = resp.pErr
 				// Update the error's transaction with any new information from
@@ -1647,8 +1658,9 @@ func (ds *DistSender) divideAndSendBatchToRanges(
 			// Sent the batch asynchronously.
 		} else {
 			resp := ds.sendPartialBatch(
-				ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(), positions,
+				ctx, curRangeBatch, curRangeRS, isReverse, withCommit, batchIdx, ri.Token(),
 			)
+			resp.positions = positions
 			responseCh <- resp
 			if resp.pErr != nil {
 				return
@@ -1749,9 +1761,9 @@ func (ds *DistSender) sendPartialBatchAsync(
 		},
 		func(ctx context.Context) {
 			ds.metrics.AsyncSentCount.Inc(1)
-			responseCh <- ds.sendPartialBatch(
-				ctx, ba, rs, isReverse, withCommit, batchIdx, routing, positions,
-			)
+			resp := ds.sendPartialBatch(ctx, ba, rs, isReverse, withCommit, batchIdx, routing)
+			resp.positions = positions
+			responseCh <- resp
 		},
 	); err != nil {
 		ds.metrics.AsyncThrottledCount.Inc(1)
@@ -1789,11 +1801,7 @@ func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, at
 // request are limited to the range's key span. The rs argument corresponds to
 // the span encompassing the key ranges of all requests in the truncated batch.
 // It should be entirely contained within the range descriptor of the supplied
-// routing token. The positions argument describes how the given batch request
-// corresponds to the original, un-truncated one, and allows us to combine the
-// response later via BatchResponse.Combine. (nil positions argument should be
-// used when the original batch request is fully contained within a single
-// range.)
+// routing token.
 //
 // The send occurs in a retry loop to handle send failures. On failure to send
 // to any replicas, we backoff and retry by refetching the range descriptor. If
@@ -1809,7 +1817,6 @@ func (ds *DistSender) sendPartialBatch(
 	withCommit bool,
 	batchIdx int,
 	routingTok rangecache.EvictionToken,
-	positions []int,
 ) response {
 	if batchIdx == 1 {
 		ds.metrics.PartialBatchCount.Inc(2) // account for first batch
@@ -1872,7 +1879,7 @@ func (ds *DistSender) sendPartialBatch(
 		if !intersection.Equal(rs) {
 			log.Eventf(ctx, "range shrunk; sub-dividing the request")
 			reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
-			return response{reply: reply, positions: positions, pErr: pErr}
+			return response{reply: reply, pErr: pErr}
 		}
 	}
 
@@ -1930,19 +1937,13 @@ func (ds *DistSender) sendPartialBatch(
 
 		// If sending succeeded, return immediately.
 		if reply.Error == nil {
-			return response{reply: reply, positions: positions}
+			return response{reply: reply}
 		}
 
 		// Untangle the error from the received response.
 		pErr = reply.Error
 		reply.Error = nil // scrub the response error
 
-		// Re-map the error index within this partial batch back
-		// to its position in the encompassing batch.
-		if pErr.Index != nil && pErr.Index.Index != -1 && positions != nil {
-			pErr.Index.Index = int32(positions[pErr.Index.Index])
-		}
-
 		log.VErrEventf(ctx, 2, "reply error %s: %s", ba, pErr)
 
 		// Error handling: If the error indicates that our range
@@ -1979,7 +1980,7 @@ func (ds *DistSender) sendPartialBatch(
 			// with unknown mapping to our truncated reply).
 			log.VEventf(ctx, 1, "likely split; will resend. Got new descriptors: %s", tErr.Ranges)
 			reply, pErr = ds.divideAndSendBatchToRanges(ctx, ba, rs, isReverse, withCommit, batchIdx)
-			return response{reply: reply, positions: positions, pErr: pErr}
+			return response{reply: reply, pErr: pErr}
 		}
 		break
 	}
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
index 5f8ad1fb258d..75832d300a73 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -1663,9 +1663,9 @@ func TestEvictCacheOnError(t *testing.T) {
 		},
 	}
 
-	rangeMismachErr := kvpb.NewRangeKeyMismatchError(
+	rangeMismatchErr := kvpb.NewRangeKeyMismatchError(
 		context.Background(), nil, nil, &lhs, nil /* lease */)
-	rangeMismachErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: rhs, Lease: roachpb.Lease{}})
+	rangeMismatchErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: rhs, Lease: roachpb.Lease{}})
 
 	testCases := []struct {
 		canceledCtx            bool
@@ -1674,7 +1674,7 @@ func TestEvictCacheOnError(t *testing.T) {
 		shouldClearReplica     bool
 	}{
 		{false, errors.New(errString), false, false}, // non-retryable replica error
-		{false, rangeMismachErr, false, false},       // RangeKeyMismatch replica error
+		{false, rangeMismatchErr, false, false},      // RangeKeyMismatch replica error
 		{false, &kvpb.RangeNotFoundError{}, false, false}, // RangeNotFound replica error
 		{false, nil, false, false}, // RPC error
 		{true, nil, false, false},  // canceled context
@@ -4552,6 +4552,169 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
 	}
 }
 
+// TestErrorIndexOnRangeSplit verifies that when divideAndSendBatchToRanges
+// is called recursively due to a stale range descriptor and an error is
+// encountered during that recursive call, the error index is set correctly to
+// point to the request in the original batch. It is a regression test for
+// #111481.
+func TestErrorIndexOnRangeSplit(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	keyA := roachpb.Key("a")
+	keyB := roachpb.Key("b")
+	keyC := roachpb.Key("c")
+	splitKey := keys.MustAddr(keyC)
+	newSplitKey := keys.MustAddr(keyB)
+
+	ctx := context.Background()
+	tr := tracing.NewTracer()
+	stopper := stop.NewStopper(stop.WithTracer(tr))
+	defer stopper.Stop(ctx)
+
+	clock := hlc.NewClockForTesting(nil)
+	rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
+	g := makeGossip(t, stopper, rpcContext)
+
+	// We start out with two ranges split at splitKey. This is needed so that
+	// we don't hit the fast-path in divideAndSendBatchToRanges where only a
+	// single range is touched.
+	repls := []roachpb.ReplicaDescriptor{
+		{
+			NodeID:  1,
+			StoreID: 1,
+		},
+		{
+			NodeID:  1,
+			StoreID: 1,
+		},
+	}
+	initDescs := []roachpb.RangeDescriptor{
+		{
+			RangeID:          roachpb.RangeID(1),
+			Generation:       1,
+			StartKey:         roachpb.RKeyMin,
+			EndKey:           splitKey,
+			InternalReplicas: repls,
+		},
+		{
+			RangeID:          roachpb.RangeID(2),
+			Generation:       1,
+			StartKey:         splitKey,
+			EndKey:           roachpb.RKeyMax,
+			InternalReplicas: repls,
+		},
+	}
+	// We'll send a BatchRequest touching the original two ranges so that it
+	// is split into two parallel BatchRequests; then, on the one touching the
+	// first range, we'll inject a range key mismatch error to simulate a
+	// range split. Thus, after the injected split we'll have three ranges
+	// with split points at newSplitKey and splitKey.
+
+	splitDescs := []roachpb.RangeDescriptor{
+		{
+			RangeID:          roachpb.RangeID(1),
+			Generation:       2,
+			StartKey:         roachpb.RKeyMin,
+			EndKey:           newSplitKey,
+			InternalReplicas: repls,
+		},
+		{
+			RangeID:          roachpb.RangeID(3),
+			Generation:       2,
+			StartKey:         newSplitKey,
+			EndKey:           splitKey,
+			InternalReplicas: repls,
+		},
+		{
+			RangeID:          roachpb.RangeID(2),
+			Generation:       2,
+			StartKey:         splitKey,
+			EndKey:           roachpb.RKeyMax,
+			InternalReplicas: repls,
+		},
+	}
+
+	initialRDB := mockRangeDescriptorDBForDescs(initDescs...)
+	splitRDB := mockRangeDescriptorDBForDescs(splitDescs...)
+
+	var rc *rangecache.RangeCache
+	var wg sync.WaitGroup
+	var once sync.Once
+	wg.Add(2)
+	waitThenSwitchToSplitDesc := func() {
+		// Wait for both partial requests to be sent.
+		wg.Done()
+		wg.Wait()
+		// Switch out the RangeDescriptorDB.
+		once.Do(func() { rc.TestingSetDB(splitRDB) })
+	}
+
+	transportFn := func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
+		rep := ba.CreateReply()
+		switch ba.ClientRangeInfo.DescriptorGeneration {
+		case 1:
+			waitThenSwitchToSplitDesc()
+			// We have two parallel batches: one with 2 requests and another
+			// with 1 request. Do nothing for the latter.
+			if len(ba.Requests) != 2 {
+				return rep, nil
+			}
+			// For the batch with 2 requests we inject an error to simulate a
+			// stale range cache.
+			rangeMismatchErr := kvpb.NewRangeKeyMismatchError(ctx, nil, nil, &splitDescs[0], nil /* lease */)
+			rangeMismatchErr.AppendRangeInfo(context.Background(), roachpb.RangeInfo{Desc: splitDescs[1], Lease: roachpb.Lease{}})
+			rep.Error = kvpb.NewError(rangeMismatchErr)
+			return rep, nil
+		case 2:
+			// We'll receive a few batches, all with 1 Get. Find the one
+			// targeting keyB and simulate it encountering an error (that
+			// could occur for InitPuts).
+			if len(ba.Requests) != 1 {
+				require.Fail(t, "unexpected number of requests in a batch")
+			}
+			if ba.Requests[0].GetGet().Key.Equal(keyB) {
+				rep.Error = kvpb.NewError(&kvpb.ConditionFailedError{})
+				// This is the zeroth request in a batch with 1 request.
+				rep.Error.SetErrorIndex(0)
+			}
+			return rep, nil
+		default:
+			require.Fail(t, "unexpected desc generation")
+			return nil, nil
+		}
+	}
+
+	cfg := DistSenderConfig{
+		AmbientCtx:        log.AmbientContext{Tracer: tr},
+		Clock:             clock,
+		NodeDescs:         g,
+		RPCContext:        rpcContext,
+		RangeDescriptorDB: initialRDB,
+		TestingKnobs: ClientTestingKnobs{
+			TransportFactory: adaptSimpleTransport(transportFn),
+		},
+		Settings: cluster.MakeTestingClusterSettings(),
+	}
+
+	ds := NewDistSender(cfg)
+	rc = ds.rangeCache
+
+	ba := &kvpb.BatchRequest{}
+	// Issue a BatchRequest such that it touches the original two ranges, and
+	// then the last request gets an error injected.
+	ba.Add(kvpb.NewGet(keyA), kvpb.NewGet(keyC), kvpb.NewGet(keyB))
+	// Inconsistent read because otherwise the batch will ask to be re-sent in a
+	// txn when split.
+	ba.ReadConsistency = kvpb.INCONSISTENT
+
+	_, pErr := ds.Send(ctx, ba)
+	require.NotNil(t, pErr)
+	require.NotNil(t, pErr.Index)
+	// This is the crux of the test - the error should have the position of
+	// the Get(keyB) request from the original batch.
+	require.Equal(t, int32(2), pErr.Index.Index)
+}
+
 // Test the following scenario: the DistSender sends a request that results in a
 // sendError, meaning that the descriptor is probably stale. The descriptor is
 // then refreshed, and it turns out that the range had split in the meantime.
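
For readers who want the re-mapping idea in isolation, below is a minimal, standalone Go sketch (not part of the patch; the names subBatchResult and remapErrIdx are hypothetical) of the positions-based translation that the commit centralizes in divideAndSendBatchToRanges: an error index reported against a truncated sub-batch is mapped back to the position of the corresponding request in the original, un-truncated batch, and a nil positions slice means the batch was never truncated.

// Standalone sketch (not CockroachDB code) of the error-index re-mapping.
package main

import "fmt"

// subBatchResult mirrors the idea of the response struct in the patch:
// positions[i] gives the index in the original batch of the i-th request in
// the sub-batch, and errIdx is the error index relative to the sub-batch
// (-1 if unset).
type subBatchResult struct {
	positions []int
	errIdx    int
}

// remapErrIdx returns the error index relative to the original batch.
func remapErrIdx(r subBatchResult) int {
	if r.errIdx == -1 || r.positions == nil {
		// Either no index was set, or the batch was never truncated (nil
		// positions), so the index already refers to the original batch.
		return r.errIdx
	}
	return r.positions[r.errIdx]
}

func main() {
	// Original batch: [Get(a), Get(c), Get(b)]. After the simulated split,
	// one sub-batch holds only Get(b), i.e. position 2 of the original batch.
	r := subBatchResult{positions: []int{2}, errIdx: 0}
	fmt.Println(remapErrIdx(r)) // prints 2
}

Running the sketch prints 2, mirroring the expectation require.Equal(t, int32(2), pErr.Index.Index) in the new regression test.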