Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: intentionally handle paginated responses in txnPipeliner #108639

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,11 +701,18 @@ func (tp *txnPipeliner) updateLockTrackingInner(
resp := br.Responses[i].GetInner()

if qiReq, ok := req.(*kvpb.QueryIntentRequest); ok {
// Remove any in-flight writes that were proven to exist.
// It shouldn't be possible for a QueryIntentRequest with
// the ErrorIfMissing option set to return without error
// and with FoundIntent=false, but we handle that case here
// because it happens a lot in tests.
// Remove any in-flight writes that were proven to exist. It should not be
// possible for a QueryIntentRequest with the ErrorIfMissing option set to
// return without error and with FoundIntent=false if the request was
// evaluated on the server.
//
// However, it is possible that the batch was split on a range boundary
// and hit a batch-wide key or byte limit before a portion was even sent
// by the DistSender. In such cases, an empty response will be returned
// for the requests that were not evaluated (see fillSkippedResponses).
// For these requests, we neither proved nor disproved the existence of
// their intent, so we ignore the response.
//
// TODO(nvanbenschoten): we only need to check FoundIntent, but this field
// was not set before v23.2, so for now, we check both fields. Remove this
// in the future.
Expand All @@ -714,9 +721,6 @@ func (tp *txnPipeliner) updateLockTrackingInner(
tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence)
// Move to lock footprint.
tp.lockFootprint.insert(roachpb.Span{Key: qiReq.Key})
} else {
log.Warningf(ctx,
"QueryIntent(ErrorIfMissing=true) found no intent, but did not error; resp=%+v", qiResp)
}
} else if kvpb.IsLocking(req) {
// If the request intended to acquire locks, track its lock spans.
Expand Down
90 changes: 90 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,96 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
require.Equal(t, 0, tp.ifWrites.len())
}

// TestTxnPipelinerTrackInFlightWritesPaginatedResponse tests that txnPipeliner
// handles cases where a batch is paginated and not all in-flight writes that
// were queried are proven to exist.
func TestTxnPipelinerTrackInFlightWritesPaginatedResponse(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner(nil /* iter */)

txn := makeTxnProto()
keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")

ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: &txn}
putArgs1 := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}
putArgs1.Sequence = 1
ba.Add(&putArgs1)
putArgs2 := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyB}}
putArgs2.Sequence = 2
ba.Add(&putArgs2)

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 2)
require.True(t, ba.AsyncConsensus)
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.PutRequest{}, ba.Requests[1].GetInner())

br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr := tp.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, 2, tp.ifWrites.len())

w := tp.ifWrites.t.Min().(*inFlightWrite)
require.Equal(t, putArgs1.Key, w.Key)
require.Equal(t, putArgs1.Sequence, w.Sequence)
w = tp.ifWrites.t.Max().(*inFlightWrite)
require.Equal(t, putArgs2.Key, w.Key)
require.Equal(t, putArgs2.Sequence, w.Sequence)

// Scan both keys with a key limit.
ba.Requests = nil
ba.MaxSpanRequestKeys = 1
ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC}})

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 3)
require.False(t, ba.AsyncConsensus)
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[2].GetInner())

qiReq1 := ba.Requests[0].GetQueryIntent()
require.Equal(t, keyA, qiReq1.Key)
require.Equal(t, enginepb.TxnSeq(1), qiReq1.Txn.Sequence)
qiReq2 := ba.Requests[1].GetQueryIntent()
require.Equal(t, keyB, qiReq2.Key)
require.Equal(t, enginepb.TxnSeq(2), qiReq2.Txn.Sequence)

// Assume a range split at key "b". DistSender will split this batch into
// two partial batches:
// part1: {QueryIntent(key: "a"), Scan(key: "a", endKey: "b")}
// part2: {QueryIntent(key: "b"), Scan(key: "b", endKey: "c")}
//
// If part1 hits the batch-wide key limit. DistSender will construct an
// empty response for QueryIntent(key: "b") in fillSkippedResponses.
br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[0].GetQueryIntent().FoundIntent = true
// br.Responses[1].GetQueryIntent() intentionally left empty.
br.Responses[2].GetScan().ResumeReason = kvpb.RESUME_KEY_LIMIT
br.Responses[2].GetScan().ResumeSpan = &roachpb.Span{Key: keyB, EndKey: keyC}
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Len(t, br.Responses, 1) // QueryIntent responses stripped
require.Nil(t, pErr)
require.Equal(t, 1, tp.ifWrites.len())

w = tp.ifWrites.t.Min().(*inFlightWrite)
require.Equal(t, putArgs2.Key, w.Key)
require.Equal(t, putArgs2.Sequence, w.Sequence)
}

// TestTxnPipelinerReads tests that txnPipeliner will never instruct batches
// with reads in them to use async consensus. It also tests that these reading
// batches will still chain on to in-flight writes, if necessary.
Expand Down