diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index d277dea1b07b..a07e71866234 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -15,6 +15,7 @@ import ( "math" "sort" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -40,6 +41,21 @@ var PipelinedWritesEnabled = settings.RegisterBoolSetting( true, settings.WithName("kv.transaction.write_pipelining.enabled"), ) + +var pipelinedRangedWritesEnabled = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "kv.transaction.write_pipelining.ranged_writes.enabled", + "if enabled, transactional ranged writes are pipelined through Raft consensus", + true, +) + +var pipelinedLockingReadsEnabled = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "kv.transaction.write_pipelining.locking_reads.enabled", + "if enabled, transactional locking reads are pipelined through Raft consensus", + true, +) + var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting( settings.ApplicationLevel, "kv.transaction.write_pipelining_max_batch_size", @@ -416,7 +432,7 @@ func (tp *txnPipeliner) attachLocksToEndTxn( // and forgo a parallel commit, but let's not break that abstraction // boundary here. if kvpb.IsIntentWrite(req) && !kvpb.IsRange(req) { - w := roachpb.SequencedWrite{Key: h.Key, Sequence: h.Sequence} + w := roachpb.SequencedWrite{Key: h.Key, Sequence: h.Sequence, Strength: lock.Intent} et.InFlightWrites = append(et.InFlightWrites, w) } else { et.LockSpans = append(et.LockSpans, h.Span()) @@ -433,7 +449,7 @@ func (tp *txnPipeliner) attachLocksToEndTxn( log.Infof(ctx, "intent: [%s,%s)", intent.Key, intent.EndKey) } for _, write := range et.InFlightWrites { - log.Infof(ctx, "in-flight: %d:%s", write.Sequence, write.Key) + log.Infof(ctx, "in-flight: %d:%s (%s)", write.Sequence, write.Key, write.Strength) } } return ba, nil @@ -500,6 +516,21 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba *kvpb.Batch return false } + if kvpb.IsRange(req) { + if !pipelinedRangedWritesEnabled.Get(&tp.st.SV) { + return false + } + } + + if !kvpb.IsIntentWrite(req) { + if !pipelinedLockingReadsEnabled.Get(&tp.st.SV) { + return false + } + if !tp.st.Version.IsActive(ctx, clusterversion.V24_1_ReplicatedLockPipelining) { + return false + } + } + // Inhibit async consensus if the batch would push us over the maximum // tracking memory budget. If we allowed async consensus on this batch, its // writes would need to be tracked precisely. By inhibiting async consensus, @@ -574,7 +605,10 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch RequestHeader: kvpb.RequestHeader{ Key: w.Key, }, - Txn: meta, + Txn: meta, + Strength: w.Strength, + // TODO: test, maybe extend TestTxnPipelinerSavepoints + IgnoredSeqNums: ba.Txn.IgnoredSeqNums, ErrorIfMissing: true, }) @@ -758,13 +792,17 @@ func (tp *txnPipeliner) updateLockTrackingInner( // in the future. qiResp := resp.(*kvpb.QueryIntentResponse) if qiResp.FoundIntent || qiResp.FoundUnpushedIntent { - tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence) + tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence, qiReq.Strength) // Move to lock footprint. tp.lockFootprint.insert(roachpb.Span{Key: qiReq.Key}) } } else if kvpb.IsLocking(req) { // If the request intended to acquire locks, track its lock spans. seq := req.Header().Sequence + str := lock.Intent + if readOnlyReq, ok := req.(kvpb.LockingReadRequest); ok { + str, _ = readOnlyReq.KeyLocking() + } trackLocks := func(span roachpb.Span, _ lock.Durability) { if ba.AsyncConsensus { // Record any writes that were performed asynchronously. We'll @@ -772,7 +810,7 @@ func (tp *txnPipeliner) updateLockTrackingInner( if span.EndKey != nil { log.Fatalf(ctx, "unexpected multi-key intent pipelined") } - tp.ifWrites.insert(span.Key, seq) + tp.ifWrites.insert(span.Key, seq, str) } else { // If the lock acquisitions weren't performed asynchronously // then add them directly to our lock footprint. @@ -853,7 +891,7 @@ func (tp *txnPipeliner) populateLeafInputState(tis *roachpb.LeafTxnInputState) { func (tp *txnPipeliner) initializeLeaf(tis *roachpb.LeafTxnInputState) { // Copy all in-flight writes into the inFlightWrite tree. for _, w := range tis.InFlightWrites { - tp.ifWrites.insert(w.Key, w.Sequence) + tp.ifWrites.insert(w.Key, w.Sequence, w.Strength) } } @@ -904,7 +942,7 @@ func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoi // been verified in the meantime) by removing all the extra ones. if needCollecting { for _, ifw := range writesToDelete { - tp.ifWrites.remove(ifw.Key, ifw.Sequence) + tp.ifWrites.remove(ifw.Key, ifw.Sequence, ifw.Strength) } } else { tp.ifWrites.clear(true /* reuse */) @@ -926,7 +964,7 @@ func (tp *txnPipeliner) hasAcquiredLocks() bool { // inFlightWrites represent a commitment to proving (via QueryIntent) that // a point write succeeded in replicating an intent with a specific sequence -// number. +// number and strength. type inFlightWrite struct { roachpb.SequencedWrite // chainIndex is used to avoid chaining on to the same in-flight write @@ -936,15 +974,17 @@ type inFlightWrite struct { } // makeInFlightWrite constructs an inFlightWrite. -func makeInFlightWrite(key roachpb.Key, seq enginepb.TxnSeq) inFlightWrite { - return inFlightWrite{SequencedWrite: roachpb.SequencedWrite{Key: key, Sequence: seq}} +func makeInFlightWrite(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) inFlightWrite { + return inFlightWrite{SequencedWrite: roachpb.SequencedWrite{ + Key: key, Sequence: seq, Strength: str, + }} } // Less implements the btree.Item interface. // -// inFlightWrites are ordered by Key and then by Sequence. Two inFlightWrites -// with the same Key but different Sequences are not considered equal and are -// maintained separately in the inFlightWritesSet. +// inFlightWrites are ordered by Key, then by Sequence, then by Strength. Two +// inFlightWrites with the same Key but different Sequences and/or Strengths are +// not considered equal and are maintained separately in the inFlightWritesSet. func (a *inFlightWrite) Less(bItem btree.Item) bool { b := bItem.(*inFlightWrite) kCmp := a.Key.Compare(b.Key) @@ -956,6 +996,10 @@ func (a *inFlightWrite) Less(bItem btree.Item) bool { // Different Sequence. return a.Sequence < b.Sequence } + if a.Strength != b.Strength { + // Different Strength. + return a.Strength < b.Strength + } // Equal. return false } @@ -975,17 +1019,17 @@ type inFlightWriteSet struct { // insert attempts to insert an in-flight write that has not been proven to have // succeeded into the in-flight write set. -func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) { +func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) { if s.t == nil { // Lazily initialize btree. s.t = btree.New(txnPipelinerBtreeDegree) } - w := s.alloc.alloc(key, seq) + w := s.alloc.alloc(key, seq, str) delItem := s.t.ReplaceOrInsert(w) if delItem != nil { // An in-flight write with the same key and sequence already existed in the - // set. This is unexpected, but we handle it. + // set. We replaced it with an identical in-flight write. *delItem.(*inFlightWrite) = inFlightWrite{} // for GC } else { s.bytes += keySize(key) @@ -994,14 +1038,14 @@ func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) { // remove attempts to remove an in-flight write from the in-flight write set. // The method will be a no-op if the write was already proved. -func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq) { +func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) { if s.len() == 0 { // Set is empty. return } // Delete the write from the in-flight writes set. - s.tmp1 = makeInFlightWrite(key, seq) + s.tmp1 = makeInFlightWrite(key, seq, str) delItem := s.t.Delete(&s.tmp1) if delItem == nil { // The write was already proven or the txn epoch was incremented. @@ -1037,13 +1081,13 @@ func (s *inFlightWriteSet) ascendRange(start, end roachpb.Key, f func(w *inFligh // Set is empty. return } - s.tmp1 = makeInFlightWrite(start, 0) + s.tmp1 = makeInFlightWrite(start, 0, 0) if end == nil { // Point lookup. - s.tmp2 = makeInFlightWrite(start, math.MaxInt32) + s.tmp2 = makeInFlightWrite(start, math.MaxInt32, 0) } else { // Range lookup. - s.tmp2 = makeInFlightWrite(end, 0) + s.tmp2 = makeInFlightWrite(end, 0, 0) } s.t.AscendRange(&s.tmp1, &s.tmp2, func(i btree.Item) bool { f(i.(*inFlightWrite)) @@ -1093,9 +1137,11 @@ func (s *inFlightWriteSet) asSlice() []roachpb.SequencedWrite { // amortizing the overhead of each allocation. type inFlightWriteAlloc []inFlightWrite -// alloc allocates a new inFlightWrite with the specified key and sequence -// number. -func (a *inFlightWriteAlloc) alloc(key roachpb.Key, seq enginepb.TxnSeq) *inFlightWrite { +// alloc allocates a new inFlightWrite with the specified key, sequence number, +// and strength. +func (a *inFlightWriteAlloc) alloc( + key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength, +) *inFlightWrite { // If the current alloc slice has no extra capacity, reallocate a new chunk. if cap(*a)-len(*a) == 0 { const chunkAllocMinSize = 4 @@ -1112,7 +1158,7 @@ func (a *inFlightWriteAlloc) alloc(key roachpb.Key, seq enginepb.TxnSeq) *inFlig *a = (*a)[:len(*a)+1] w := &(*a)[len(*a)-1] - *w = makeInFlightWrite(key, seq) + *w = makeInFlightWrite(key, seq, str) return w } diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index de76168b3e7f..7e314043c18c 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -14,10 +14,12 @@ import ( "context" "fmt" "math" + "sort" "strings" "testing" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" @@ -26,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -122,8 +125,9 @@ func TestTxnPipeliner1PCTransaction(t *testing.T) { ba := &kvpb.BatchRequest{} ba.Header = kvpb.Header{Txn: &txn} scanArgs := kvpb.ScanRequest{ - RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyB}, - KeyLockingStrength: lock.Exclusive, + RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyB}, + KeyLockingStrength: lock.Exclusive, + KeyLockingDurability: lock.Replicated, } ba.Add(&scanArgs) putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}} @@ -151,7 +155,7 @@ func TestTxnPipeliner1PCTransaction(t *testing.T) { } require.Equal(t, expLocks, etReq.LockSpans) expInFlight := []roachpb.SequencedWrite{ - {Key: keyA, Sequence: 1}, + {Key: keyA, Sequence: 1, Strength: lock.Intent}, } require.Equal(t, expInFlight, etReq.InFlightWrites) @@ -205,7 +209,7 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { require.Equal(t, putArgs.Key, w.Key) require.Equal(t, putArgs.Sequence, w.Sequence) - // More writes, one that replaces the other's sequence number. + // More writes, one to the same key as the previous in-flight write. keyB, keyC := roachpb.Key("b"), roachpb.Key("c") ba.Requests = nil cputArgs := kvpb.ConditionalPutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}} @@ -237,6 +241,7 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { require.Equal(t, txn.ID, qiReq.Txn.ID) require.Equal(t, txn.WriteTimestamp, qiReq.Txn.WriteTimestamp) require.Equal(t, enginepb.TxnSeq(1), qiReq.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq.Strength) require.True(t, qiReq.ErrorIfMissing) // No in-flight writes have been proved yet. @@ -298,15 +303,19 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) { require.Equal(t, enginepb.TxnSeq(3), qiReq2.Txn.Sequence) require.Equal(t, enginepb.TxnSeq(4), qiReq3.Txn.Sequence) require.Equal(t, enginepb.TxnSeq(5), qiReq4.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq1.Strength) + require.Equal(t, lock.Intent, qiReq2.Strength) + require.Equal(t, lock.Intent, qiReq3.Strength) + require.Equal(t, lock.Intent, qiReq4.Strength) etReq := ba.Requests[5].GetEndTxn() require.Equal(t, []roachpb.Span{{Key: keyA}}, etReq.LockSpans) expInFlight := []roachpb.SequencedWrite{ - {Key: keyA, Sequence: 2}, - {Key: keyB, Sequence: 3}, - {Key: keyC, Sequence: 4}, - {Key: keyC, Sequence: 5}, - {Key: keyD, Sequence: 6}, + {Key: keyA, Sequence: 2, Strength: lock.Intent}, + {Key: keyB, Sequence: 3, Strength: lock.Intent}, + {Key: keyC, Sequence: 4, Strength: lock.Intent}, + {Key: keyC, Sequence: 5, Strength: lock.Intent}, + {Key: keyD, Sequence: 6, Strength: lock.Intent}, } require.Equal(t, expInFlight, etReq.InFlightWrites) @@ -394,9 +403,11 @@ func TestTxnPipelinerTrackInFlightWritesPaginatedResponse(t *testing.T) { qiReq1 := ba.Requests[0].GetQueryIntent() require.Equal(t, keyA, qiReq1.Key) require.Equal(t, enginepb.TxnSeq(1), qiReq1.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq1.Strength) qiReq2 := ba.Requests[1].GetQueryIntent() require.Equal(t, keyB, qiReq2.Key) require.Equal(t, enginepb.TxnSeq(2), qiReq2.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq2.Strength) // Assume a range split at key "b". DistSender will split this batch into // two partial batches: @@ -426,21 +437,40 @@ func TestTxnPipelinerTrackInFlightWritesPaginatedResponse(t *testing.T) { } // TestTxnPipelinerReads tests that txnPipeliner will never instruct batches -// with reads in them to use async consensus. It also tests that these reading -// batches will still chain on to in-flight writes, if necessary. +// with non-locking reads in them to use async consensus. It also tests that +// these reading batches will still chain on to in-flight writes, if necessary. +// +// It also performs the same test for unreplicated locking reads, which are +// handled the same by the txnPipeliner. func TestTxnPipelinerReads(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + testutils.RunTrueAndFalse(t, "unreplicated-locking", testTxnPipelinerReads) +} + +func testTxnPipelinerReads(t *testing.T, unreplLocking bool) { ctx := context.Background() tp, mockSender := makeMockTxnPipeliner(nil /* iter */) txn := makeTxnProto() keyA, keyC := roachpb.Key("a"), roachpb.Key("c") + var str lock.Strength + var dur lock.Durability + if unreplLocking { + str = lock.Exclusive + dur = lock.Unreplicated + } + getArgs := kvpb.GetRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyA}, + KeyLockingStrength: str, + KeyLockingDurability: dur, + } + // Read-only. ba := &kvpb.BatchRequest{} ba.Header = kvpb.Header{Txn: &txn} - ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}) + ba.Add(&getArgs) mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { require.Len(t, ba.Requests, 1) @@ -458,7 +488,7 @@ func TestTxnPipelinerReads(t *testing.T) { // Read before write. ba.Requests = nil - ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}) + ba.Add(&getArgs) ba.Add(&kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyC}}) mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { @@ -479,7 +509,7 @@ func TestTxnPipelinerReads(t *testing.T) { // Read after write. ba.Requests = nil ba.Add(&kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyC}}) - ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}) + ba.Add(&getArgs) mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { require.Len(t, ba.Requests, 2) @@ -497,12 +527,12 @@ func TestTxnPipelinerReads(t *testing.T) { require.NotNil(t, br) // Add a key into the in-flight writes set. - tp.ifWrites.insert(keyA, 10) + tp.ifWrites.insert(keyA, 10, lock.Intent) require.Equal(t, 1, tp.ifWrites.len()) // Read-only with conflicting in-flight write. ba.Requests = nil - ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}) + ba.Add(&getArgs) mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { require.Len(t, ba.Requests, 2) @@ -513,6 +543,7 @@ func TestTxnPipelinerReads(t *testing.T) { qiReq := ba.Requests[0].GetQueryIntent() require.Equal(t, keyA, qiReq.Key) require.Equal(t, enginepb.TxnSeq(10), qiReq.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq.Strength) // No in-flight writes have been proved yet. require.Equal(t, 1, tp.ifWrites.len()) @@ -529,6 +560,314 @@ func TestTxnPipelinerReads(t *testing.T) { require.Equal(t, 0, tp.ifWrites.len()) } +// TestTxnPipelinerReplicatedLockingReads tests that txnPipeliner permits +// batches with replicated locking reads in them to use async consensus. It also +// tests that these locking reads are proved as requests are chained onto them. +// Finally, it tests that EndTxn requests chain on to all existing requests. +func TestTxnPipelinerReplicatedLockingReads(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) + + txn := makeTxnProto() + keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c") + keyD, keyE, keyF := roachpb.Key("d"), roachpb.Key("e"), roachpb.Key("f") + + // Unreplicated locking read-only. + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + ba.Add(&kvpb.GetRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyA}, + KeyLockingStrength: lock.Exclusive, + KeyLockingDurability: lock.Unreplicated, + }) + + // No keys are locked. + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 1) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner()) + + br := ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + + br, pErr := tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, 0, tp.ifWrites.len()) + require.Equal(t, 0, len(tp.lockFootprint.asSlice())) + + // Replicated locking read-only: Get, Scan, and ReverseScan. + ba.Requests = nil + ba.Add(&kvpb.GetRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyA}, + KeyLockingStrength: lock.Exclusive, + KeyLockingDurability: lock.Replicated, + }) + ba.Add(&kvpb.ScanRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyB, EndKey: keyD}, + KeyLockingStrength: lock.Shared, + KeyLockingDurability: lock.Replicated, + }) + ba.Add(&kvpb.ReverseScanRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyF, EndKey: keyB}, + KeyLockingStrength: lock.Exclusive, + KeyLockingDurability: lock.Replicated, + }) + + // The first time it is sent, no keys are locked. + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 3) + require.True(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[1].GetInner()) + require.IsType(t, &kvpb.ReverseScanRequest{}, ba.Requests[2].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn + // No keys locked, no keys returned. + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, 0, tp.ifWrites.len()) + require.Equal(t, 0, len(tp.lockFootprint.asSlice())) + + // The second time it is sent, 6 keys are locked. + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 3) + require.True(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[1].GetInner()) + require.IsType(t, &kvpb.ReverseScanRequest{}, ba.Requests[2].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn + // 6 keys locked, 6 keys returned. + br.Responses[0].GetGet().Value = &roachpb.Value{} + br.Responses[1].GetScan().Rows = []roachpb.KeyValue{{Key: keyB}, {Key: keyC}} + br.Responses[2].GetReverseScan().Rows = []roachpb.KeyValue{{Key: keyF}, {Key: keyE}, {Key: keyC}} + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, 6, tp.ifWrites.len()) + require.Equal(t, 0, len(tp.lockFootprint.asSlice())) + + // Check that the tracked inflight writes were updated correctly. + var ifWrites []inFlightWrite + tp.ifWrites.ascend(func(w *inFlightWrite) { + ifWrites = append(ifWrites, *w) + }) + expIfWrites := []inFlightWrite{ + {SequencedWrite: roachpb.SequencedWrite{Key: keyA, Sequence: 0, Strength: lock.Exclusive}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyB, Sequence: 0, Strength: lock.Shared}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyC, Sequence: 0, Strength: lock.Shared}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyC, Sequence: 0, Strength: lock.Exclusive}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyE, Sequence: 0, Strength: lock.Exclusive}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyF, Sequence: 0, Strength: lock.Exclusive}}, + } + require.Equal(t, expIfWrites, ifWrites) + + // Replicated locking read before write. Some existing in-flight replicated + // lock writes are queried. The batch is permitted to use async consensus. + ba.Requests = nil + ba.Add(&kvpb.GetRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyA}, + KeyLockingStrength: lock.Shared, + KeyLockingDurability: lock.Replicated, + }) + ba.Add(&kvpb.PutRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyC, Sequence: 1}, + }) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 5) + require.True(t, ba.AsyncConsensus) + require.Len(t, ba.Split(false /* canSplitET */), 1) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner()) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[2].GetInner()) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[3].GetInner()) + require.IsType(t, &kvpb.PutRequest{}, ba.Requests[4].GetInner()) + + qiReq1 := ba.Requests[0].GetQueryIntent() + qiReq2 := ba.Requests[2].GetQueryIntent() + qiReq3 := ba.Requests[3].GetQueryIntent() + require.Equal(t, keyA, qiReq1.Key) + require.Equal(t, keyC, qiReq2.Key) + require.Equal(t, keyC, qiReq3.Key) + require.Equal(t, enginepb.TxnSeq(0), qiReq1.Txn.Sequence) + require.Equal(t, enginepb.TxnSeq(0), qiReq2.Txn.Sequence) + require.Equal(t, enginepb.TxnSeq(0), qiReq3.Txn.Sequence) + require.Equal(t, lock.Exclusive, qiReq1.Strength) + require.Equal(t, lock.Shared, qiReq2.Strength) + require.Equal(t, lock.Exclusive, qiReq3.Strength) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetQueryIntent().FoundIntent = true + br.Responses[1].GetGet().Value = &roachpb.Value{} + br.Responses[2].GetQueryIntent().FoundIntent = true + br.Responses[3].GetQueryIntent().FoundIntent = true + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Len(t, br.Responses, 2) // QueryIntent responses stripped + require.Equal(t, 5, tp.ifWrites.len()) + require.Equal(t, 3, len(tp.lockFootprint.asSlice())) + + // Check that the tracked inflight writes were updated correctly. + ifWrites = nil + tp.ifWrites.ascend(func(w *inFlightWrite) { + ifWrites = append(ifWrites, *w) + }) + expIfWrites = []inFlightWrite{ + {SequencedWrite: roachpb.SequencedWrite{Key: keyA, Sequence: 0, Strength: lock.Shared}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyB, Sequence: 0, Strength: lock.Shared}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyC, Sequence: 1, Strength: lock.Intent}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyE, Sequence: 0, Strength: lock.Exclusive}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyF, Sequence: 0, Strength: lock.Exclusive}}, + } + require.Equal(t, expIfWrites, ifWrites) + + // Replicated locking read after write. Some existing in-flight replicated + // lock writes are queried. The batch is permitted to use async consensus. + ba.Requests = nil + ba.Add(&kvpb.PutRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyD, Sequence: 2}, + }) + ba.Add(&kvpb.GetRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyC, Sequence: 2}, + KeyLockingStrength: lock.Exclusive, + KeyLockingDurability: lock.Replicated, + }) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 3) + require.True(t, ba.AsyncConsensus) + require.Len(t, ba.Split(false /* canSplitET */), 1) + require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[1].GetInner()) + require.IsType(t, &kvpb.GetRequest{}, ba.Requests[2].GetInner()) + + qiReq1 := ba.Requests[1].GetQueryIntent() + require.Equal(t, keyC, qiReq1.Key) + require.Equal(t, enginepb.TxnSeq(1), qiReq1.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq1.Strength) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Responses[1].GetQueryIntent().FoundIntent = true + br.Responses[2].GetGet().Value = &roachpb.Value{} + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Len(t, br.Responses, 2) // QueryIntent response stripped + require.Equal(t, 6, tp.ifWrites.len()) + require.Equal(t, 4, len(tp.lockFootprint.asSlice())) + + // Check that the tracked inflight writes were updated correctly. + ifWrites = nil + tp.ifWrites.ascend(func(w *inFlightWrite) { + ifWrites = append(ifWrites, *w) + }) + expIfWrites = []inFlightWrite{ + {SequencedWrite: roachpb.SequencedWrite{Key: keyA, Sequence: 0, Strength: lock.Shared}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyB, Sequence: 0, Strength: lock.Shared}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyC, Sequence: 2, Strength: lock.Exclusive}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyD, Sequence: 2, Strength: lock.Intent}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyE, Sequence: 0, Strength: lock.Exclusive}}, + {SequencedWrite: roachpb.SequencedWrite{Key: keyF, Sequence: 0, Strength: lock.Exclusive}}, + } + require.Equal(t, expIfWrites, ifWrites) + + // Send a final replicated locking read, along with an EndTxn request. Should + // attempt to prove all in-flight writes. Should NOT use async consensus. + ba.Requests = nil + ba.Add(&kvpb.ScanRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyB, EndKey: keyD, Sequence: 2}, + KeyLockingStrength: lock.Shared, + KeyLockingDurability: lock.Replicated, + }) + ba.Add(&kvpb.EndTxnRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: 3}, + Commit: true, + }) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 8) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[1].GetInner()) + require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[2].GetInner()) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[3].GetInner()) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[4].GetInner()) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[5].GetInner()) + require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[6].GetInner()) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[7].GetInner()) + + qiReqs := [6]*kvpb.QueryIntentRequest{ + // NOTE: re-ordered because Scan overlaps 2 in-flight writes which need to + // be proven first. + ba.Requests[3].GetQueryIntent(), + ba.Requests[0].GetQueryIntent(), + ba.Requests[1].GetQueryIntent(), + ba.Requests[4].GetQueryIntent(), + ba.Requests[5].GetQueryIntent(), + ba.Requests[6].GetQueryIntent(), + } + require.Equal(t, len(expIfWrites), len(qiReqs)) + for i, qiReq := range qiReqs { + expIfWrite := expIfWrites[i] + require.Equal(t, expIfWrite.Key, qiReq.Key) + require.Equal(t, expIfWrite.Sequence, qiReq.Txn.Sequence) + require.Equal(t, expIfWrite.Strength, qiReq.Strength) + } + + etReq := ba.Requests[7].GetEndTxn() + require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB, EndKey: keyD}}, etReq.LockSpans) + var expETIfWrites []roachpb.SequencedWrite + for _, ifWrite := range expIfWrites { + expETIfWrites = append(expETIfWrites, ifWrite.SequencedWrite) + } + sort.Sort(roachpb.SequencedWriteBySeq(expETIfWrites)) + require.Equal(t, expETIfWrites, etReq.InFlightWrites) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Txn.Status = roachpb.COMMITTED + br.Responses[0].GetQueryIntent().FoundIntent = true + br.Responses[1].GetQueryIntent().FoundIntent = true + br.Responses[2].GetScan().Rows = []roachpb.KeyValue{{Key: keyB}, {Key: keyC}} + br.Responses[3].GetQueryIntent().FoundIntent = true + br.Responses[4].GetQueryIntent().FoundIntent = true + br.Responses[5].GetQueryIntent().FoundIntent = true + br.Responses[6].GetQueryIntent().FoundIntent = true + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Len(t, br.Responses, 2) // QueryIntent responses stripped + require.Equal(t, 0, tp.ifWrites.len()) + require.Equal(t, 4, len(tp.lockFootprint.asSlice())) +} + // TestTxnPipelinerRangedWrites tests that the txnPipeliner can perform some // ranged write operations using async consensus. In particular, ranged requests // which have the canPipeline flag set. It also verifies that ranged writes will @@ -574,11 +913,11 @@ func TestTxnPipelinerRangedWrites(t *testing.T) { // request. Send the batch again and assert that the Put chains onto the // first in-flight write and the DeleteRange chains onto the second and // third in-flight write. - tp.ifWrites.insert(roachpb.Key("a"), 10) - tp.ifWrites.insert(roachpb.Key("b"), 11) - tp.ifWrites.insert(roachpb.Key("c"), 12) - tp.ifWrites.insert(roachpb.Key("d"), 13) - tp.ifWrites.insert(roachpb.Key("e"), 13) + tp.ifWrites.insert(roachpb.Key("a"), 10, lock.Intent) + tp.ifWrites.insert(roachpb.Key("b"), 11, lock.Intent) + tp.ifWrites.insert(roachpb.Key("c"), 12, lock.Intent) + tp.ifWrites.insert(roachpb.Key("d"), 13, lock.Intent) + tp.ifWrites.insert(roachpb.Key("e"), 13, lock.Intent) require.Equal(t, 5, tp.ifWrites.len()) mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { @@ -602,6 +941,9 @@ func TestTxnPipelinerRangedWrites(t *testing.T) { require.Equal(t, enginepb.TxnSeq(10), qiReq1.Txn.Sequence) require.Equal(t, enginepb.TxnSeq(11), qiReq2.Txn.Sequence) require.Equal(t, enginepb.TxnSeq(12), qiReq3.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq1.Strength) + require.Equal(t, lock.Intent, qiReq2.Strength) + require.Equal(t, lock.Intent, qiReq3.Strength) // No in-flight writes have been proved yet. require.Equal(t, 5, tp.ifWrites.len()) @@ -777,6 +1119,7 @@ func TestTxnPipelinerManyWrites(t *testing.T) { require.Equal(t, key, qiReq.Key) require.Equal(t, txn.ID, qiReq.Txn.ID) require.Equal(t, makeSeq(i), qiReq.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq.Strength) getReq := ba.Requests[i+1].GetGet() require.Equal(t, key, getReq.Key) @@ -858,7 +1201,10 @@ func TestTxnPipelinerTransactionAbort(t *testing.T) { etReq := ba.Requests[0].GetEndTxn() require.Len(t, etReq.LockSpans, 0) - require.Equal(t, []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}}, etReq.InFlightWrites) + expInFlight := []roachpb.SequencedWrite{ + {Key: keyA, Sequence: 1, Strength: lock.Intent}, + } + require.Equal(t, expInFlight, etReq.InFlightWrites) br = ba.CreateReply() br.Txn = ba.Txn @@ -886,7 +1232,10 @@ func TestTxnPipelinerTransactionAbort(t *testing.T) { etReq := ba.Requests[0].GetEndTxn() require.Len(t, etReq.LockSpans, 0) - require.Equal(t, []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}}, etReq.InFlightWrites) + expInFlight := []roachpb.SequencedWrite{ + {Key: keyA, Sequence: 1, Strength: lock.Intent}, + } + require.Equal(t, expInFlight, etReq.InFlightWrites) br = ba.CreateReply() br.Txn = ba.Txn @@ -908,8 +1257,8 @@ func TestTxnPipelinerEpochIncrement(t *testing.T) { defer log.Scope(t).Close(t) tp, _ := makeMockTxnPipeliner(nil /* iter */) - tp.ifWrites.insert(roachpb.Key("b"), 10) - tp.ifWrites.insert(roachpb.Key("d"), 11) + tp.ifWrites.insert(roachpb.Key("b"), 10, lock.Intent) + tp.ifWrites.insert(roachpb.Key("d"), 11, lock.Intent) require.Equal(t, 2, tp.ifWrites.len()) require.Equal(t, 0, len(tp.lockFootprint.asSlice())) @@ -939,10 +1288,10 @@ func TestTxnPipelinerIntentMissingError(t *testing.T) { // Insert in-flight writes into the in-flight write set so that each request // will need to chain on with a QueryIntent. - tp.ifWrites.insert(keyA, 1) - tp.ifWrites.insert(keyB, 2) - tp.ifWrites.insert(keyC, 3) - tp.ifWrites.insert(keyD, 4) + tp.ifWrites.insert(keyA, 1, lock.Intent) + tp.ifWrites.insert(keyB, 2, lock.Intent) + tp.ifWrites.insert(keyC, 3, lock.Intent) + tp.ifWrites.insert(keyD, 4, lock.Intent) for errIdx, resErrIdx := range map[int32]int32{ 0: 0, // intent on key "a" missing @@ -1041,13 +1390,16 @@ func TestTxnPipelinerDeleteRangeRequests(t *testing.T) { require.Equal(t, enginepb.TxnSeq(7), qiReq1.Txn.Sequence) require.Equal(t, enginepb.TxnSeq(7), qiReq2.Txn.Sequence) require.Equal(t, enginepb.TxnSeq(7), qiReq3.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq1.Strength) + require.Equal(t, lock.Intent, qiReq2.Strength) + require.Equal(t, lock.Intent, qiReq3.Strength) etReq := ba.Requests[4].GetEndTxn() require.Equal(t, []roachpb.Span{{Key: keyC, EndKey: keyE}}, etReq.LockSpans) expInFlight := []roachpb.SequencedWrite{ - {Key: keyA, Sequence: 7}, - {Key: keyB, Sequence: 7}, - {Key: keyD, Sequence: 7}, + {Key: keyA, Sequence: 7, Strength: lock.Intent}, + {Key: keyB, Sequence: 7, Strength: lock.Intent}, + {Key: keyD, Sequence: 7, Strength: lock.Intent}, } require.Equal(t, expInFlight, etReq.InFlightWrites) @@ -1150,6 +1502,7 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) { qiReq := ba.Requests[0].GetQueryIntent() require.Equal(t, keyA, qiReq.Key) require.Equal(t, enginepb.TxnSeq(2), qiReq.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq.Strength) br = ba.CreateReply() br.Txn = ba.Txn @@ -1178,10 +1531,14 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) { qiReq := ba.Requests[0].GetQueryIntent() require.Equal(t, keyC, qiReq.Key) require.Equal(t, enginepb.TxnSeq(3), qiReq.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq.Strength) etReq := ba.Requests[1].GetEndTxn() require.Equal(t, []roachpb.Span{{Key: keyA}}, etReq.LockSpans) - require.Equal(t, []roachpb.SequencedWrite{{Key: keyC, Sequence: 3}}, etReq.InFlightWrites) + expInFlight := []roachpb.SequencedWrite{ + {Key: keyC, Sequence: 3, Strength: lock.Intent}, + } + require.Equal(t, expInFlight, etReq.InFlightWrites) br = ba.CreateReply() br.Txn = ba.Txn @@ -1196,6 +1553,183 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) { require.Equal(t, 0, tp.ifWrites.len()) } +// TestTxnPipelinerDisableRangedWrites tests that the txnPipeliner behaves +// correctly if pipelining for ranged writes is disabled. +func TestTxnPipelinerDisableRangedWrites(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) + + // Start with pipelining disabled. Should NOT use async consensus. + pipelinedRangedWritesEnabled.Override(ctx, &tp.st.SV, false) + + txn := makeTxnProto() + keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c") + + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + ba.Add(&kvpb.DeleteRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: 1}}) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 1) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[0].GetInner()) + + br := ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetDeleteRange().Keys = []roachpb.Key{keyA, keyB} + return br, nil + }) + + br, pErr := tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, 0, tp.ifWrites.len()) + + // Enable pipelining. Should use async consensus. + pipelinedRangedWritesEnabled.Override(ctx, &tp.st.SV, true) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 1) + require.True(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[0].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetDeleteRange().Keys = []roachpb.Key{keyA, keyB} + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, 2, tp.ifWrites.len()) +} + +// TestTxnPipelinerDisableLockingReads tests that the txnPipeliner behaves +// correctly if pipelining for locking reads is disabled. +func TestTxnPipelinerDisableLockingReads(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) + + // Start with pipelining disabled. Should NOT use async consensus. + pipelinedLockingReadsEnabled.Override(ctx, &tp.st.SV, false) + + txn := makeTxnProto() + keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c") + + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + ba.Add(&kvpb.ScanRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC}, + KeyLockingStrength: lock.Exclusive, + KeyLockingDurability: lock.Replicated, + }) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 1) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[0].GetInner()) + + br := ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetScan().Rows = []roachpb.KeyValue{{Key: keyA}, {Key: keyB}} + return br, nil + }) + + br, pErr := tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, 0, tp.ifWrites.len()) + + // Enable pipelining. Should use async consensus. + pipelinedLockingReadsEnabled.Override(ctx, &tp.st.SV, true) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 1) + require.True(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[0].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetScan().Rows = []roachpb.KeyValue{{Key: keyA}, {Key: keyB}} + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, 2, tp.ifWrites.len()) +} + +// TestTxnPipelinerMixedVersionLockingReads tests that the txnPipeliner behaves +// correctly if in a mixed-version cluster when locking reads are not supported. +func TestTxnPipelinerMixedVersionLockingReads(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + tp, mockSender := makeMockTxnPipeliner(nil /* iter */) + + // Start in a mixed-version cluster. Should NOT use async consensus. + tp.st = cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.Latest.Version(), + clusterversion.MinSupported.Version(), + false, // initializeVersion + ) + require.NoError(t, clusterversion.Initialize( + ctx, clusterversion.MinSupported.Version(), &tp.st.SV)) + + txn := makeTxnProto() + keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c") + + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + ba.Add(&kvpb.ScanRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC}, + KeyLockingStrength: lock.Exclusive, + KeyLockingDurability: lock.Replicated, + }) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 1) + require.False(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[0].GetInner()) + + br := ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetScan().Rows = []roachpb.KeyValue{{Key: keyA}, {Key: keyB}} + return br, nil + }) + + br, pErr := tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, 0, tp.ifWrites.len()) + + // Upgrade to the latest version. Should use async consensus. + require.NoError(t, clusterversion.Initialize( + ctx, clusterversion.Latest.Version(), &tp.st.SV)) + + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 1) + require.True(t, ba.AsyncConsensus) + require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[0].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Responses[0].GetScan().Rows = []roachpb.KeyValue{{Key: keyA}, {Key: keyB}} + return br, nil + }) + + br, pErr = tp.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, 2, tp.ifWrites.len()) +} + // TestTxnPipelinerMaxInFlightSize tests that batches are not pipelined if doing // so would push the memory used to track locks and in-flight writes over the // limit allowed by the kv.transaction.max_intents_bytes setting. @@ -1720,17 +2254,17 @@ func TestTxnPipelinerSavepoints(t *testing.T) { initialSavepoint := savepoint{} tp.createSavepointLocked(ctx, &initialSavepoint) - tp.ifWrites.insert(roachpb.Key("a"), 10) - tp.ifWrites.insert(roachpb.Key("b"), 11) - tp.ifWrites.insert(roachpb.Key("c"), 12) + tp.ifWrites.insert(roachpb.Key("a"), 10, lock.Intent) + tp.ifWrites.insert(roachpb.Key("b"), 11, lock.Intent) + tp.ifWrites.insert(roachpb.Key("c"), 12, lock.Intent) require.Equal(t, 3, tp.ifWrites.len()) s := savepoint{seqNum: enginepb.TxnSeq(12), active: true} tp.createSavepointLocked(ctx, &s) // Some more writes after the savepoint. - tp.ifWrites.insert(roachpb.Key("c"), 13) - tp.ifWrites.insert(roachpb.Key("d"), 14) + tp.ifWrites.insert(roachpb.Key("c"), 13, lock.Intent) + tp.ifWrites.insert(roachpb.Key("d"), 14, lock.Intent) require.Equal(t, 5, tp.ifWrites.len()) require.Empty(t, tp.lockFootprint.asSlice()) @@ -1749,6 +2283,7 @@ func TestTxnPipelinerSavepoints(t *testing.T) { qiReq := ba.Requests[0].GetQueryIntent() require.Equal(t, roachpb.Key("a"), qiReq.Key) require.Equal(t, enginepb.TxnSeq(10), qiReq.Txn.Sequence) + require.Equal(t, lock.Intent, qiReq.Strength) br := ba.CreateReply() br.Txn = ba.Txn @@ -1773,8 +2308,8 @@ func TestTxnPipelinerSavepoints(t *testing.T) { }) require.Equal(t, []inFlightWrite{ - {SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11}}, - {SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("c"), Sequence: 12}}, + {SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11, Strength: lock.Intent}}, + {SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("c"), Sequence: 12, Strength: lock.Intent}}, }, ifWrites) @@ -1878,7 +2413,7 @@ func TestTxnPipelinerCondenseLockSpans2(t *testing.T) { } for _, k := range tc.ifWrites { - tp.ifWrites.insert(roachpb.Key(k), 1) + tp.ifWrites.insert(roachpb.Key(k), 1, lock.Intent) } txn := makeTxnProto() diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 93f293f44372..c2755c104319 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -84,8 +84,8 @@ var flagDependencies = map[flag][]flag{ isAdmin: {isAlone}, isLocking: {isTxn}, isIntentWrite: {isWrite, isLocking}, - canPipeline: {isIntentWrite}, - canParallelCommit: {canPipeline}, + canPipeline: {isLocking}, + canParallelCommit: {canPipeline, isIntentWrite}, appliesTSCache: {isWrite}, skipsLeaseCheck: {isAlone}, } @@ -1871,7 +1871,7 @@ func flagForLockStrength(l lock.Strength) flag { func flagForLockDurability(d lock.Durability) flag { if d == lock.Replicated { - return isWrite + return isWrite | canPipeline } return 0 } diff --git a/pkg/kv/kvpb/api_test.go b/pkg/kv/kvpb/api_test.go index 57ba6d72fda3..0c3898ecc72c 100644 --- a/pkg/kv/kvpb/api_test.go +++ b/pkg/kv/kvpb/api_test.go @@ -381,9 +381,12 @@ func TestFlagCombinations(t *testing.T) { &AddSSTableRequest{SSTTimestampToRequestTimestamp: hlc.Timestamp{Logical: 1}}, &DeleteRangeRequest{Inline: true}, &DeleteRangeRequest{UseRangeTombstone: true}, - &GetRequest{KeyLockingStrength: lock.Exclusive}, - &ReverseScanRequest{KeyLockingStrength: lock.Exclusive}, - &ScanRequest{KeyLockingStrength: lock.Exclusive}, + &GetRequest{KeyLockingStrength: lock.Shared, KeyLockingDurability: lock.Unreplicated}, + &GetRequest{KeyLockingStrength: lock.Exclusive, KeyLockingDurability: lock.Replicated}, + &ScanRequest{KeyLockingStrength: lock.Shared, KeyLockingDurability: lock.Unreplicated}, + &ScanRequest{KeyLockingStrength: lock.Exclusive, KeyLockingDurability: lock.Replicated}, + &ReverseScanRequest{KeyLockingStrength: lock.Shared, KeyLockingDurability: lock.Unreplicated}, + &ReverseScanRequest{KeyLockingStrength: lock.Exclusive, KeyLockingDurability: lock.Replicated}, } reqTypes := []Request{} diff --git a/pkg/kv/kvpb/batch_test.go b/pkg/kv/kvpb/batch_test.go index a3652769bba9..299193ed5852 100644 --- a/pkg/kv/kvpb/batch_test.go +++ b/pkg/kv/kvpb/batch_test.go @@ -78,6 +78,8 @@ func TestBatchIsCompleteTransaction(t *testing.T) { func TestBatchSplit(t *testing.T) { get := &GetRequest{} + ulget := &GetRequest{KeyLockingStrength: lock.Exclusive, KeyLockingDurability: lock.Unreplicated} + rlget := &GetRequest{KeyLockingStrength: lock.Exclusive, KeyLockingDurability: lock.Replicated} scan := &ScanRequest{} put := &PutRequest{} spl := &AdminSplitRequest{} @@ -91,6 +93,8 @@ func TestBatchSplit(t *testing.T) { canSplitET bool }{ {[]Request{get, put}, []int{1, 1}, true}, + {[]Request{ulget, put}, []int{1, 1}, true}, + {[]Request{rlget, put}, []int{2}, true}, {[]Request{put, et}, []int{1, 1}, true}, {[]Request{get, get, get, put, put, get, get}, []int{3, 2, 2}, true}, {[]Request{spl, get, scan, spl, get}, []int{1, 2, 1, 1}, true}, @@ -112,6 +116,9 @@ func TestBatchSplit(t *testing.T) { // request that follows. {[]Request{get, qi, put}, []int{1, 2}, true}, {[]Request{get, qi, qi, qi, qi, put}, []int{1, 5}, true}, + {[]Request{qi, get, qi, put}, []int{2, 2}, true}, + {[]Request{qi, ulget, qi, put}, []int{2, 2}, true}, + {[]Request{qi, rlget, qi, put}, []int{4}, true}, {[]Request{qi, get, qi, get, qi, get, qi, put, qi, put, qi, get, qi, get}, []int{6, 4, 4}, true}, {[]Request{qi, spl, qi, get, scan, qi, qi, spl, qi, get}, []int{1, 1, 5, 1, 2}, true}, {[]Request{scan, qi, qi, qi, et}, []int{4, 1}, true}, diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 70568cac05a6..c693b1747631 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -211,9 +211,15 @@ func (r *Replica) evalAndPropose( maybeFinishSpan = proposal.sp.Finish } - // Signal the proposal's response channel immediately. + // Signal the proposal's response channel immediately. Return a shallow-ish + // copy of the response to avoid aliasing issues if the client mutates the + // batch response header or individual response headers before replication + // completes. reply := *proposal.Local.Reply reply.Responses = append([]kvpb.ResponseUnion(nil), reply.Responses...) + for i, ru := range reply.Responses { + reply.Responses[i].MustSetInner(ru.GetInner().ShallowCopy()) + } pr := makeProposalResult(&reply, nil /* pErr */, proposal.Local.DetachEncounteredIntents(), nil /* eti */) proposal.signalProposalResult(pr) diff --git a/pkg/kv/kvserver/txnrecovery/BUILD.bazel b/pkg/kv/kvserver/txnrecovery/BUILD.bazel index 91f27ec67d87..18e61a00a8d1 100644 --- a/pkg/kv/kvserver/txnrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/txnrecovery/BUILD.bazel @@ -11,7 +11,6 @@ go_library( deps = [ "//pkg/kv", "//pkg/kv/kvpb", - "//pkg/kv/kvserver/concurrency/lock", "//pkg/roachpb", "//pkg/util/hlc", "//pkg/util/log", diff --git a/pkg/kv/kvserver/txnrecovery/manager.go b/pkg/kv/kvserver/txnrecovery/manager.go index e3a527fab019..ec0c021b580a 100644 --- a/pkg/kv/kvserver/txnrecovery/manager.go +++ b/pkg/kv/kvserver/txnrecovery/manager.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -206,9 +205,8 @@ func (m *manager) resolveIndeterminateCommitForTxnProbe( RequestHeader: kvpb.RequestHeader{ Key: w.Key, }, - Txn: meta, - // TODO(nvanbenschoten): pass in the correct lock strength here. - Strength: lock.Intent, + Txn: meta, + Strength: w.Strength, IgnoredSeqNums: txn.IgnoredSeqNums, }) } diff --git a/pkg/roachpb/data.proto b/pkg/roachpb/data.proto index 067b86a108aa..de880c1299c4 100644 --- a/pkg/roachpb/data.proto +++ b/pkg/roachpb/data.proto @@ -641,6 +641,8 @@ message SequencedWrite { // The sequence number of the request that created the write. int32 sequence = 2 [ (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/storage/enginepb.TxnSeq"]; + // The strength with which the lock was acquired at. + kv.kvserver.concurrency.lock.Strength strength = 3; } // LeaseAcquisitionType indicates the type of lease acquisition event that diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index eaae9ba8e46f..ad5c9d485354 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -632,9 +632,12 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error { // after making sure to nil out the requests in order to lose references to // the underlying Get and Scan requests which could keep large byte slices // alive. - f.reqsScratch = ba.Requests - for i := range f.reqsScratch { - f.reqsScratch[i] = kvpb.RequestUnion{} + // TODO(nvanbenschoten): explain why this was needed. + if f.lockDurability != lock.Replicated { + f.reqsScratch = ba.Requests + for i := range f.reqsScratch { + f.reqsScratch[i] = kvpb.RequestUnion{} + } } if monitoring { reqsScratchMemUsage := requestUnionOverhead * int64(cap(f.reqsScratch))