kv: intentionally handle paginated responses in txnPipeliner
See cockroachdb#108539 (comment).

This commit adds intentional handling to the txnPipeliner for the case
where a response is paginated and not all QueryIntent requests were
evaluated. Previously, we handled this case, but we logged a warning and
had a comment that said it was unexpected.

The commit also adds a test for the case.

Epic: None
Release note: None
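
For readers unfamiliar with batch pagination, here is a minimal, self-contained sketch of the situation the commit message describes. The types are hypothetical stand-ins, not the real kvpb API: when a batch-wide key limit is exhausted partway through a batch, the remaining requests are never evaluated and come back with zero-valued responses.

package main

import "fmt"

// Hypothetical stand-ins; the real code uses kvpb.QueryIntentRequest and
// kvpb.QueryIntentResponse with a FoundIntent field.
type queryIntentReq struct{ key string }
type queryIntentResp struct{ foundIntent bool }

// evaluate mimics a paginated batch: once the batch-wide key limit is
// exhausted, the remaining requests are skipped and receive zero-valued
// (empty) responses, analogous to DistSender's fillSkippedResponses.
func evaluate(reqs []queryIntentReq, keyLimit int) []queryIntentResp {
	resps := make([]queryIntentResp, len(reqs))
	for i := range reqs {
		if keyLimit == 0 {
			continue // skipped: response stays empty, FoundIntent=false
		}
		resps[i] = queryIntentResp{foundIntent: true} // evaluated; intent found
		keyLimit--
	}
	return resps
}

func main() {
	reqs := []queryIntentReq{{key: "a"}, {key: "b"}}
	for i, resp := range evaluate(reqs, 1 /* batch-wide key limit */) {
		fmt.Printf("QueryIntent(%q): FoundIntent=%t\n", reqs[i].key, resp.foundIntent)
	}
	// Prints:
	//   QueryIntent("a"): FoundIntent=true
	//   QueryIntent("b"): FoundIntent=false  <- not evaluated, proves nothing
}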
nvanbenschoten committed Aug 11, 2023
1 parent a629092 commit 8b71fdd
Showing 2 changed files with 102 additions and 8 deletions.
20 changes: 12 additions & 8 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -701,11 +701,18 @@ func (tp *txnPipeliner) updateLockTrackingInner(
resp := br.Responses[i].GetInner()

if qiReq, ok := req.(*kvpb.QueryIntentRequest); ok {
// Remove any in-flight writes that were proven to exist.
// It shouldn't be possible for a QueryIntentRequest with
// the ErrorIfMissing option set to return without error
// and with FoundIntent=false, but we handle that case here
// because it happens a lot in tests.
// Remove any in-flight writes that were proven to exist. It should not be
// possible for a QueryIntentRequest with the ErrorIfMissing option set to
// return without error and with FoundIntent=false if the request was
// evaluated on the server.
//
// However, it is possible that the batch was split on a range boundary
// and hit a batch-wide key or byte limit before a portion was even sent
// by the DistSender. In such cases, an empty response will be returned
// for the requests that were not evaluated (see fillSkippedResponses).
// For these requests, we neither proved nor disproved the existence of
// their intent, so we ignore the response.
//
// TODO(nvanbenschoten): we only need to check FoundIntent, but this field
// was not set before v23.2, so for now, we check both fields. Remove this
// in the future.
@@ -714,9 +721,6 @@ func (tp *txnPipeliner) updateLockTrackingInner(
tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence)
// Move to lock footprint.
tp.lockFootprint.insert(roachpb.Span{Key: qiReq.Key})
} else {
log.Warningf(ctx,
"QueryIntent(ErrorIfMissing=true) found no intent, but did not error; resp=%+v", qiResp)
}
} else if kvpb.IsLocking(req) {
// If the request intended to acquire locks, track its lock spans.
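
Condensed into a runnable sketch, the rule the new comment describes is: a FoundIntent=true response moves the write from the in-flight set into the lock footprint, while an empty response proves nothing and is ignored. The types below are hypothetical stand-ins; the real txnPipeliner tracks in-flight writes in a btree and lock spans in a condensable span set.

package main

import "fmt"

// Hypothetical stand-in for the pipeliner's bookkeeping.
type pipeliner struct {
	inFlight      map[string]bool // writes not yet proven to have replicated
	lockFootprint []string        // proven writes, tracked only as lock spans
}

// onQueryIntentResponse applies the rule from this commit: only
// FoundIntent=true proves the write. An empty response, which pagination can
// produce for requests that were never evaluated, is ignored, so the write
// stays in-flight and will be queried again by a later batch.
func (p *pipeliner) onQueryIntentResponse(key string, foundIntent bool) {
	if !foundIntent {
		return // neither proven nor disproven; ignore
	}
	delete(p.inFlight, key)
	p.lockFootprint = append(p.lockFootprint, key)
}

func main() {
	p := &pipeliner{inFlight: map[string]bool{"a": true, "b": true}}
	p.onQueryIntentResponse("a", true)  // evaluated: proven, moved to footprint
	p.onQueryIntentResponse("b", false) // skipped by pagination: ignored
	fmt.Println(len(p.inFlight), p.lockFootprint) // 1 [a]
}

The choice to ignore rather than error is conservative: a write left in the in-flight set will be chained on and re-queried by a later batch, so correctness does not depend on every QueryIntent being evaluated.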
90 changes: 90 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
@@ -328,6 +328,96 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
require.Equal(t, 0, tp.ifWrites.len())
}

// TestTxnPipelinerTrackInFlightWritesPaginatedResponse tests that txnPipeliner
// handles cases where a batch is paginated and not all in-flight writes that
// were queried are proven to exist.
func TestTxnPipelinerTrackInFlightWritesPaginatedResponse(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner(nil /* iter */)

txn := makeTxnProto()
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")

ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: &txn}
putArgs1 := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}
putArgs1.Sequence = 1
ba.Add(&putArgs1)
putArgs2 := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyB}}
putArgs2.Sequence = 2
ba.Add(&putArgs2)

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 2)
require.True(t, ba.AsyncConsensus)
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[1].GetInner())

br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr := tp.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, 2, tp.ifWrites.len())

w := tp.ifWrites.t.Min().(*inFlightWrite)
require.Equal(t, putArgs1.Key, w.Key)
require.Equal(t, putArgs1.Sequence, w.Sequence)
w = tp.ifWrites.t.Max().(*inFlightWrite)
require.Equal(t, putArgs2.Key, w.Key)
require.Equal(t, putArgs2.Sequence, w.Sequence)

// Scan both keys with a key limit.
ba.Requests = nil
ba.MaxSpanRequestKeys = 1
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC}})

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 3)
require.False(t, ba.AsyncConsensus)
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[2].GetInner())

qiReq1 := ba.Requests[0].GetQueryIntent()
require.Equal(t, keyA, qiReq1.Key)
require.Equal(t, enginepb.TxnSeq(1), qiReq1.Txn.Sequence)
qiReq2 := ba.Requests[1].GetQueryIntent()
require.Equal(t, keyB, qiReq2.Key)
require.Equal(t, enginepb.TxnSeq(2), qiReq2.Txn.Sequence)

// Assume a range split at key "b". DistSender will split this batch into
// two partial batches:
// part1: {QueryIntent(key: "a"), Scan(key: "a", endKey: "b")}
// part2: {QueryIntent(key: "b"), Scan(key: "b", endKey: "c")}
//
// If part1 hits the batch-wide key limit, DistSender will construct an
// empty response for QueryIntent(key: "b") in fillSkippedResponses.
br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[0].GetQueryIntent().FoundIntent = true
// br.Responses[1].GetQueryIntent() intentionally left empty.
br.Responses[2].GetScan().ResumeReason = kvpb.RESUME_KEY_LIMIT
br.Responses[2].GetScan().ResumeSpan = &roachpb.Span{Key: keyB, EndKey: keyC}
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Len(t, br.Responses, 1) // QueryIntent responses stripped
require.Nil(t, pErr)
require.Equal(t, 1, tp.ifWrites.len())

w = tp.ifWrites.t.Min().(*inFlightWrite)
require.Equal(t, putArgs2.Key, w.Key)
require.Equal(t, putArgs2.Sequence, w.Sequence)
}

// TestTxnPipelinerReads tests that txnPipeliner will never instruct batches
// with reads in them to use async consensus. It also tests that these reading
// batches will still chain on to in-flight writes, if necessary.
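
The scenario the test constructs by hand can be pictured with a toy model, again with hypothetical helpers rather than the real DistSender: the batch splits at the range boundary "b", part1 consumes the entire batch-wide key limit, and part2 is therefore never sent, leaving its responses empty.

package main

import "fmt"

type request struct{ key string }
type response struct{ evaluated bool }

// splitAtBoundary mimics a range split: requests below the boundary go to
// part1, the rest to part2.
func splitAtBoundary(reqs []request, boundary string) (part1, part2 []request) {
	for _, r := range reqs {
		if r.key < boundary {
			part1 = append(part1, r)
		} else {
			part2 = append(part2, r)
		}
	}
	return part1, part2
}

func main() {
	part1, part2 := splitAtBoundary([]request{{"a"}, {"b"}}, "b")
	resps := make(map[string]response)
	keyLimit := 1
	for _, r := range part1 { // part1 is sent first and exhausts the limit
		resps[r.key] = response{evaluated: true}
		keyLimit--
	}
	if keyLimit == 0 { // part2 is skipped; its responses are filled in empty
		for _, r := range part2 {
			resps[r.key] = response{evaluated: false}
		}
	}
	fmt.Println(resps) // map[a:{true} b:{false}]
}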
