From 8b71fdd740be753f88d398e0a57122b254146853 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 11 Aug 2023 16:45:46 -0400 Subject: [PATCH] kv: intentionally handle paginated responses in txnPipeliner See https://github.com/cockroachdb/cockroach/issues/108539#issuecomment-1675312955. This commit adds intentional handling to the txnPipeliner for the case where a response is paginated and not all QueryIntent requests were evaluated. Previously, we handled this case, but we logged a warning and had a comment that said it was unexpected. The commit also adds a test for the case. Epic: None Release note: None --- .../kvcoord/txn_interceptor_pipeliner.go | 20 +++-- .../kvcoord/txn_interceptor_pipeliner_test.go | 90 +++++++++++++++++++ 2 files changed, 102 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index fcb35b0a7706..9c3aa5f71de1 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -701,11 +701,18 @@ func (tp *txnPipeliner) updateLockTrackingInner( resp := br.Responses[i].GetInner() if qiReq, ok := req.(*kvpb.QueryIntentRequest); ok { - // Remove any in-flight writes that were proven to exist. - // It shouldn't be possible for a QueryIntentRequest with - // the ErrorIfMissing option set to return without error - // and with FoundIntent=false, but we handle that case here - // because it happens a lot in tests. + // Remove any in-flight writes that were proven to exist. It should not be + // possible for a QueryIntentRequest with the ErrorIfMissing option set to + // return without error and with FoundIntent=false if the request was + // evaluated on the server. + // + // However, it is possible that the batch was split on a range boundary + // and hit a batch-wide key or byte limit before a portion was even sent + // by the DistSender. 
In such cases, an empty response will be returned + // for the requests that were not evaluated (see fillSkippedResponses). + // For these requests, we neither proved nor disproved the existence of + // their intent, so we ignore the response. + // // TODO(nvanbenschoten): we only need to check FoundIntent, but this field // was not set before v23.2, so for now, we check both fields. Remove this // in the future. @@ -714,9 +721,6 @@ func (tp *txnPipeliner) updateLockTrackingInner( tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence) // Move to lock footprint. tp.lockFootprint.insert(roachpb.Span{Key: qiReq.Key}) - } else { - log.Warningf(ctx, - "QueryIntent(ErrorIfMissing=true) found no intent, but did not error; resp=%+v", qiResp) } } else if kvpb.IsLocking(req) { // If the request intended to acquire locks, track its lock spans. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index 2241658d3e6f..55cad957943e 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -328,6 +328,96 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { require.Equal(t, 0, tp.ifWrites.len()) } +// TestTxnPipelinerTrackInFlightWritesPaginatedResponse tests that txnPipeliner +// handles cases where a batch is paginated and not all in-flight writes that +// were queried are proven to exist. 
+func TestTxnPipelinerTrackInFlightWritesPaginatedResponse(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) + + txn := makeTxnProto() + keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c") + + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + putArgs1 := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}} + putArgs1.Sequence = 1 + ba.Add(&putArgs1) + putArgs2 := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyB}} + putArgs2.Sequence = 2 + ba.Add(&putArgs2) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 2) + require.True(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.PutRequest{}, ba.Requests[1].GetInner()) + + br := ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + + br, pErr := tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, 2, tp.ifWrites.len()) + + w := tp.ifWrites.t.Min().(*inFlightWrite) + require.Equal(t, putArgs1.Key, w.Key) + require.Equal(t, putArgs1.Sequence, w.Sequence) + w = tp.ifWrites.t.Max().(*inFlightWrite) + require.Equal(t, putArgs2.Key, w.Key) + require.Equal(t, putArgs2.Sequence, w.Sequence) + + // Scan both keys with a key limit. 
+ ba.Requests = nil + ba.MaxSpanRequestKeys = 1 + ba.Add(&kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC}}) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 3) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[1].GetInner()) + require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[2].GetInner()) + + qiReq1 := ba.Requests[0].GetQueryIntent() + require.Equal(t, keyA, qiReq1.Key) + require.Equal(t, enginepb.TxnSeq(1), qiReq1.Txn.Sequence) + qiReq2 := ba.Requests[1].GetQueryIntent() + require.Equal(t, keyB, qiReq2.Key) + require.Equal(t, enginepb.TxnSeq(2), qiReq2.Txn.Sequence) + + // Assume a range split at key "b". DistSender will split this batch into + // two partial batches: + // part1: {QueryIntent(key: "a"), Scan(key: "a", endKey: "b")} + // part2: {QueryIntent(key: "b"), Scan(key: "b", endKey: "c")} + // + // If part1 hits the batch-wide key limit, DistSender will construct an + // empty response for QueryIntent(key: "b") in fillSkippedResponses. + br = ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetQueryIntent().FoundIntent = true + // br.Responses[1].GetQueryIntent() intentionally left empty. + br.Responses[2].GetScan().ResumeReason = kvpb.RESUME_KEY_LIMIT + br.Responses[2].GetScan().ResumeSpan = &roachpb.Span{Key: keyB, EndKey: keyC} + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.NotNil(t, br) + require.Len(t, br.Responses, 1) // QueryIntent responses stripped + require.Nil(t, pErr) + require.Equal(t, 1, tp.ifWrites.len()) + + w = tp.ifWrites.t.Min().(*inFlightWrite) + require.Equal(t, putArgs2.Key, w.Key) + require.Equal(t, putArgs2.Sequence, w.Sequence) +} + // TestTxnPipelinerReads tests that txnPipeliner will never instruct batches // with reads in them to use async consensus.
It also tests that these reading // batches will still chain on to in-flight writes, if necessary.