diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 8b7d703f52fd..d3b06de4cc10 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -66,6 +66,10 @@ kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don'
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions application
kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions application
kv.transaction.reject_over_max_intents_budget.enabled boolean false if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed application
+kv.transaction.write_pipelining.locking_reads.enabled boolean true if enabled, transactional locking reads are pipelined through Raft consensus application
+kv.transaction.write_pipelining.ranged_writes.enabled boolean true if enabled, transactional ranged writes are pipelined through Raft consensus application
+kv.transaction.write_pipelining.enabled boolean true if enabled, transactional writes are pipelined through Raft consensus application
+kv.transaction.write_pipelining.max_batch_size integer 128 if non-zero, defines that maximum size batch that will be pipelined through Raft consensus application
schedules.backup.gc_protection.enabled boolean true enable chaining of GC protection across backups run as part of a schedule application
security.ocsp.mode enumeration off use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2] application
security.ocsp.timeout duration 3s timeout before considering the OCSP server unreachable application
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index e5628f9fbb49..e00a6767b824 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -92,6 +92,10 @@
kv.transaction.max_intents_bytes
| integer | 4194304 | maximum number of bytes used to track locks in transactions | Serverless/Dedicated/Self-Hosted |
kv.transaction.max_refresh_spans_bytes
| integer | 4194304 | maximum number of bytes used to track refresh spans in serializable transactions | Serverless/Dedicated/Self-Hosted |
kv.transaction.reject_over_max_intents_budget.enabled
| boolean | false | if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed | Serverless/Dedicated/Self-Hosted |
+kv.transaction.write_pipelining.locking_reads.enabled
| boolean | true | if enabled, transactional locking reads are pipelined through Raft consensus | Serverless/Dedicated/Self-Hosted |
+kv.transaction.write_pipelining.ranged_writes.enabled
| boolean | true | if enabled, transactional ranged writes are pipelined through Raft consensus | Serverless/Dedicated/Self-Hosted |
+kv.transaction.write_pipelining.enabled
| boolean | true | if enabled, transactional writes are pipelined through Raft consensus | Serverless/Dedicated/Self-Hosted |
+kv.transaction.write_pipelining.max_batch_size
| integer | 128 | if non-zero, defines that maximum size batch that will be pipelined through Raft consensus | Serverless/Dedicated/Self-Hosted |
kvadmission.store.provisioned_bandwidth
| byte size | 0 B | if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be overridden on a per-store basis using the --store flag. Note that setting the provisioned bandwidth to a positive value may enable disk bandwidth based admission control, since admission.disk_bandwidth_tokens.elastic.enabled defaults to true | Dedicated/Self-Hosted |
schedules.backup.gc_protection.enabled
| boolean | true | enable chaining of GC protection across backups run as part of a schedule | Serverless/Dedicated/Self-Hosted |
security.ocsp.mode
| enumeration | off | use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2] | Serverless/Dedicated/Self-Hosted |
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
index 2224d8fc90a2..4150a9c8eff1 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
@@ -91,7 +91,9 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (kv.SavepointToke
// TODO(nvanbenschoten): once #113765 is resolved, we should make this
// unconditional and push it into txnSeqNumAllocator.createSavepointLocked.
if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
- tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked()
+ if err := tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked(ctx); err != nil {
+ return nil, err
+ }
}
s := &savepoint{
@@ -159,7 +161,9 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin
enginepb.IgnoredSeqNumRange{
Start: sp.seqNum, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
})
- tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked()
+ if err := tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked(ctx); err != nil {
+ return err
+ }
}
return nil
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
index 20482c1172d4..b39aedaeee55 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -15,6 +15,7 @@ import (
"math"
"sort"
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -39,7 +40,25 @@ var PipelinedWritesEnabled = settings.RegisterBoolSetting(
"if enabled, transactional writes are pipelined through Raft consensus",
true,
settings.WithName("kv.transaction.write_pipelining.enabled"),
+ settings.WithPublic,
)
+
+var pipelinedRangedWritesEnabled = settings.RegisterBoolSetting(
+ settings.ApplicationLevel,
+ "kv.transaction.write_pipelining.ranged_writes.enabled",
+ "if enabled, transactional ranged writes are pipelined through Raft consensus",
+ true,
+ settings.WithPublic,
+)
+
+var pipelinedLockingReadsEnabled = settings.RegisterBoolSetting(
+ settings.ApplicationLevel,
+ "kv.transaction.write_pipelining.locking_reads.enabled",
+ "if enabled, transactional locking reads are pipelined through Raft consensus",
+ true,
+ settings.WithPublic,
+)
+
var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting(
settings.ApplicationLevel,
"kv.transaction.write_pipelining_max_batch_size",
@@ -55,6 +74,7 @@ var pipelinedWritesMaxBatchSize = settings.RegisterIntSetting(
128,
settings.NonNegativeInt,
settings.WithName("kv.transaction.write_pipelining.max_batch_size"),
+ settings.WithPublic,
)
// TrackedWritesMaxSize is a byte threshold for the tracking of writes performed
@@ -416,7 +436,7 @@ func (tp *txnPipeliner) attachLocksToEndTxn(
// and forgo a parallel commit, but let's not break that abstraction
// boundary here.
if kvpb.IsIntentWrite(req) && !kvpb.IsRange(req) {
- w := roachpb.SequencedWrite{Key: h.Key, Sequence: h.Sequence}
+ w := roachpb.SequencedWrite{Key: h.Key, Sequence: h.Sequence, Strength: lock.Intent}
et.InFlightWrites = append(et.InFlightWrites, w)
} else {
et.LockSpans = append(et.LockSpans, h.Span())
@@ -433,7 +453,7 @@ func (tp *txnPipeliner) attachLocksToEndTxn(
log.Infof(ctx, "intent: [%s,%s)", intent.Key, intent.EndKey)
}
for _, write := range et.InFlightWrites {
- log.Infof(ctx, "in-flight: %d:%s", write.Sequence, write.Key)
+ log.Infof(ctx, "in-flight: %d:%s (%s)", write.Sequence, write.Key, write.Strength)
}
}
return ba, nil
@@ -500,6 +520,21 @@ func (tp *txnPipeliner) canUseAsyncConsensus(ctx context.Context, ba *kvpb.Batch
return false
}
+ if kvpb.IsRange(req) {
+ if !pipelinedRangedWritesEnabled.Get(&tp.st.SV) {
+ return false
+ }
+ }
+
+ if !kvpb.IsIntentWrite(req) {
+ if !pipelinedLockingReadsEnabled.Get(&tp.st.SV) {
+ return false
+ }
+ if !tp.st.Version.IsActive(ctx, clusterversion.V24_1_ReplicatedLockPipelining) {
+ return false
+ }
+ }
+
// Inhibit async consensus if the batch would push us over the maximum
// tracking memory budget. If we allowed async consensus on this batch, its
// writes would need to be tracked precisely. By inhibiting async consensus,
@@ -575,6 +610,8 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch
Key: w.Key,
},
Txn: meta,
+ Strength: w.Strength,
+ IgnoredSeqNums: ba.Txn.IgnoredSeqNums,
ErrorIfMissing: true,
})
@@ -758,13 +795,17 @@ func (tp *txnPipeliner) updateLockTrackingInner(
// in the future.
qiResp := resp.(*kvpb.QueryIntentResponse)
if qiResp.FoundIntent || qiResp.FoundUnpushedIntent {
- tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence)
+ tp.ifWrites.remove(qiReq.Key, qiReq.Txn.Sequence, qiReq.Strength)
// Move to lock footprint.
tp.lockFootprint.insert(roachpb.Span{Key: qiReq.Key})
}
} else if kvpb.IsLocking(req) {
// If the request intended to acquire locks, track its lock spans.
seq := req.Header().Sequence
+ str := lock.Intent
+ if readOnlyReq, ok := req.(kvpb.LockingReadRequest); ok {
+ str, _ = readOnlyReq.KeyLocking()
+ }
trackLocks := func(span roachpb.Span, _ lock.Durability) {
if ba.AsyncConsensus {
// Record any writes that were performed asynchronously. We'll
@@ -772,7 +813,7 @@ func (tp *txnPipeliner) updateLockTrackingInner(
if span.EndKey != nil {
log.Fatalf(ctx, "unexpected multi-key intent pipelined")
}
- tp.ifWrites.insert(span.Key, seq)
+ tp.ifWrites.insert(span.Key, seq, str)
} else {
// If the lock acquisitions weren't performed asynchronously
// then add them directly to our lock footprint.
@@ -853,7 +894,7 @@ func (tp *txnPipeliner) populateLeafInputState(tis *roachpb.LeafTxnInputState) {
func (tp *txnPipeliner) initializeLeaf(tis *roachpb.LeafTxnInputState) {
// Copy all in-flight writes into the inFlightWrite tree.
for _, w := range tis.InFlightWrites {
- tp.ifWrites.insert(w.Key, w.Sequence)
+ tp.ifWrites.insert(w.Key, w.Sequence, w.Strength)
}
}
@@ -904,7 +945,7 @@ func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoi
// been verified in the meantime) by removing all the extra ones.
if needCollecting {
for _, ifw := range writesToDelete {
- tp.ifWrites.remove(ifw.Key, ifw.Sequence)
+ tp.ifWrites.remove(ifw.Key, ifw.Sequence, ifw.Strength)
}
} else {
tp.ifWrites.clear(true /* reuse */)
@@ -926,7 +967,7 @@ func (tp *txnPipeliner) hasAcquiredLocks() bool {
// inFlightWrites represent a commitment to proving (via QueryIntent) that
// a point write succeeded in replicating an intent with a specific sequence
-// number.
+// number and strength.
type inFlightWrite struct {
roachpb.SequencedWrite
// chainIndex is used to avoid chaining on to the same in-flight write
@@ -936,15 +977,17 @@ type inFlightWrite struct {
}
// makeInFlightWrite constructs an inFlightWrite.
-func makeInFlightWrite(key roachpb.Key, seq enginepb.TxnSeq) inFlightWrite {
- return inFlightWrite{SequencedWrite: roachpb.SequencedWrite{Key: key, Sequence: seq}}
+func makeInFlightWrite(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) inFlightWrite {
+ return inFlightWrite{SequencedWrite: roachpb.SequencedWrite{
+ Key: key, Sequence: seq, Strength: str,
+ }}
}
// Less implements the btree.Item interface.
//
-// inFlightWrites are ordered by Key and then by Sequence. Two inFlightWrites
-// with the same Key but different Sequences are not considered equal and are
-// maintained separately in the inFlightWritesSet.
+// inFlightWrites are ordered by Key, then by Sequence, then by Strength. Two
+// inFlightWrites with the same Key but different Sequences and/or Strengths are
+// not considered equal and are maintained separately in the inFlightWritesSet.
func (a *inFlightWrite) Less(bItem btree.Item) bool {
b := bItem.(*inFlightWrite)
kCmp := a.Key.Compare(b.Key)
@@ -956,6 +999,10 @@ func (a *inFlightWrite) Less(bItem btree.Item) bool {
// Different Sequence.
return a.Sequence < b.Sequence
}
+ if a.Strength != b.Strength {
+ // Different Strength.
+ return a.Strength < b.Strength
+ }
// Equal.
return false
}
@@ -975,17 +1022,17 @@ type inFlightWriteSet struct {
// insert attempts to insert an in-flight write that has not been proven to have
// succeeded into the in-flight write set.
-func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) {
+func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) {
if s.t == nil {
// Lazily initialize btree.
s.t = btree.New(txnPipelinerBtreeDegree)
}
- w := s.alloc.alloc(key, seq)
+ w := s.alloc.alloc(key, seq, str)
delItem := s.t.ReplaceOrInsert(w)
if delItem != nil {
// An in-flight write with the same key and sequence already existed in the
- // set. This is unexpected, but we handle it.
+ // set. We replaced it with an identical in-flight write.
*delItem.(*inFlightWrite) = inFlightWrite{} // for GC
} else {
s.bytes += keySize(key)
@@ -994,14 +1041,14 @@ func (s *inFlightWriteSet) insert(key roachpb.Key, seq enginepb.TxnSeq) {
// remove attempts to remove an in-flight write from the in-flight write set.
// The method will be a no-op if the write was already proved.
-func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq) {
+func (s *inFlightWriteSet) remove(key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength) {
if s.len() == 0 {
// Set is empty.
return
}
// Delete the write from the in-flight writes set.
- s.tmp1 = makeInFlightWrite(key, seq)
+ s.tmp1 = makeInFlightWrite(key, seq, str)
delItem := s.t.Delete(&s.tmp1)
if delItem == nil {
// The write was already proven or the txn epoch was incremented.
@@ -1037,13 +1084,13 @@ func (s *inFlightWriteSet) ascendRange(start, end roachpb.Key, f func(w *inFligh
// Set is empty.
return
}
- s.tmp1 = makeInFlightWrite(start, 0)
+ s.tmp1 = makeInFlightWrite(start, 0, 0)
if end == nil {
// Point lookup.
- s.tmp2 = makeInFlightWrite(start, math.MaxInt32)
+ s.tmp2 = makeInFlightWrite(start, math.MaxInt32, 0)
} else {
// Range lookup.
- s.tmp2 = makeInFlightWrite(end, 0)
+ s.tmp2 = makeInFlightWrite(end, 0, 0)
}
s.t.AscendRange(&s.tmp1, &s.tmp2, func(i btree.Item) bool {
f(i.(*inFlightWrite))
@@ -1093,9 +1140,11 @@ func (s *inFlightWriteSet) asSlice() []roachpb.SequencedWrite {
// amortizing the overhead of each allocation.
type inFlightWriteAlloc []inFlightWrite
-// alloc allocates a new inFlightWrite with the specified key and sequence
-// number.
-func (a *inFlightWriteAlloc) alloc(key roachpb.Key, seq enginepb.TxnSeq) *inFlightWrite {
+// alloc allocates a new inFlightWrite with the specified key, sequence number,
+// and strength.
+func (a *inFlightWriteAlloc) alloc(
+ key roachpb.Key, seq enginepb.TxnSeq, str lock.Strength,
+) *inFlightWrite {
// If the current alloc slice has no extra capacity, reallocate a new chunk.
if cap(*a)-len(*a) == 0 {
const chunkAllocMinSize = 4
@@ -1112,7 +1161,7 @@ func (a *inFlightWriteAlloc) alloc(key roachpb.Key, seq enginepb.TxnSeq) *inFlig
*a = (*a)[:len(*a)+1]
w := &(*a)[len(*a)-1]
- *w = makeInFlightWrite(key, seq)
+ *w = makeInFlightWrite(key, seq, str)
return w
}
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
index c35a89904287..4066a7211719 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
@@ -14,10 +14,12 @@ import (
"context"
"fmt"
"math"
+ "sort"
"strings"
"testing"
"time"
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
@@ -26,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+ "github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -122,8 +125,9 @@ func TestTxnPipeliner1PCTransaction(t *testing.T) {
ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: &txn}
scanArgs := kvpb.ScanRequest{
- RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyB},
- KeyLockingStrength: lock.Exclusive,
+ RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyB},
+ KeyLockingStrength: lock.Exclusive,
+ KeyLockingDurability: lock.Replicated,
}
ba.Add(&scanArgs)
putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}
@@ -151,7 +155,7 @@ func TestTxnPipeliner1PCTransaction(t *testing.T) {
}
require.Equal(t, expLocks, etReq.LockSpans)
expInFlight := []roachpb.SequencedWrite{
- {Key: keyA, Sequence: 1},
+ {Key: keyA, Sequence: 1, Strength: lock.Intent},
}
require.Equal(t, expInFlight, etReq.InFlightWrites)
@@ -205,7 +209,7 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
require.Equal(t, putArgs.Key, w.Key)
require.Equal(t, putArgs.Sequence, w.Sequence)
- // More writes, one that replaces the other's sequence number.
+ // More writes, one to the same key as the previous in-flight write.
keyB, keyC := roachpb.Key("b"), roachpb.Key("c")
ba.Requests = nil
cputArgs := kvpb.ConditionalPutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}
@@ -237,6 +241,7 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
require.Equal(t, txn.ID, qiReq.Txn.ID)
require.Equal(t, txn.WriteTimestamp, qiReq.Txn.WriteTimestamp)
require.Equal(t, enginepb.TxnSeq(1), qiReq.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq.Strength)
require.True(t, qiReq.ErrorIfMissing)
// No in-flight writes have been proved yet.
@@ -298,15 +303,19 @@ func TestTxnPipelinerTrackInFlightWrites(t *testing.T) {
require.Equal(t, enginepb.TxnSeq(3), qiReq2.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(4), qiReq3.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(5), qiReq4.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq1.Strength)
+ require.Equal(t, lock.Intent, qiReq2.Strength)
+ require.Equal(t, lock.Intent, qiReq3.Strength)
+ require.Equal(t, lock.Intent, qiReq4.Strength)
etReq := ba.Requests[5].GetEndTxn()
require.Equal(t, []roachpb.Span{{Key: keyA}}, etReq.LockSpans)
expInFlight := []roachpb.SequencedWrite{
- {Key: keyA, Sequence: 2},
- {Key: keyB, Sequence: 3},
- {Key: keyC, Sequence: 4},
- {Key: keyC, Sequence: 5},
- {Key: keyD, Sequence: 6},
+ {Key: keyA, Sequence: 2, Strength: lock.Intent},
+ {Key: keyB, Sequence: 3, Strength: lock.Intent},
+ {Key: keyC, Sequence: 4, Strength: lock.Intent},
+ {Key: keyC, Sequence: 5, Strength: lock.Intent},
+ {Key: keyD, Sequence: 6, Strength: lock.Intent},
}
require.Equal(t, expInFlight, etReq.InFlightWrites)
@@ -394,9 +403,11 @@ func TestTxnPipelinerTrackInFlightWritesPaginatedResponse(t *testing.T) {
qiReq1 := ba.Requests[0].GetQueryIntent()
require.Equal(t, keyA, qiReq1.Key)
require.Equal(t, enginepb.TxnSeq(1), qiReq1.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq1.Strength)
qiReq2 := ba.Requests[1].GetQueryIntent()
require.Equal(t, keyB, qiReq2.Key)
require.Equal(t, enginepb.TxnSeq(2), qiReq2.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq2.Strength)
// Assume a range split at key "b". DistSender will split this batch into
// two partial batches:
@@ -426,21 +437,40 @@ func TestTxnPipelinerTrackInFlightWritesPaginatedResponse(t *testing.T) {
}
// TestTxnPipelinerReads tests that txnPipeliner will never instruct batches
-// with reads in them to use async consensus. It also tests that these reading
-// batches will still chain on to in-flight writes, if necessary.
+// with non-locking reads in them to use async consensus. It also tests that
+// these reading batches will still chain on to in-flight writes, if necessary.
+//
+// It also performs the same test for unreplicated locking reads, which are
+// handled the same by the txnPipeliner.
func TestTxnPipelinerReads(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
+ testutils.RunTrueAndFalse(t, "unreplicated-locking", testTxnPipelinerReads)
+}
+
+func testTxnPipelinerReads(t *testing.T, unreplLocking bool) {
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
txn := makeTxnProto()
keyA, keyC := roachpb.Key("a"), roachpb.Key("c")
+ var str lock.Strength
+ var dur lock.Durability
+ if unreplLocking {
+ str = lock.Exclusive
+ dur = lock.Unreplicated
+ }
+ getArgs := kvpb.GetRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyA},
+ KeyLockingStrength: str,
+ KeyLockingDurability: dur,
+ }
+
// Read-only.
ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: &txn}
- ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
+ ba.Add(&getArgs)
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 1)
@@ -458,7 +488,7 @@ func TestTxnPipelinerReads(t *testing.T) {
// Read before write.
ba.Requests = nil
- ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
+ ba.Add(&getArgs)
ba.Add(&kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyC}})
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
@@ -479,7 +509,7 @@ func TestTxnPipelinerReads(t *testing.T) {
// Read after write.
ba.Requests = nil
ba.Add(&kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyC}})
- ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
+ ba.Add(&getArgs)
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 2)
@@ -497,12 +527,12 @@ func TestTxnPipelinerReads(t *testing.T) {
require.NotNil(t, br)
// Add a key into the in-flight writes set.
- tp.ifWrites.insert(keyA, 10)
+ tp.ifWrites.insert(keyA, 10, lock.Intent)
require.Equal(t, 1, tp.ifWrites.len())
// Read-only with conflicting in-flight write.
ba.Requests = nil
- ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
+ ba.Add(&getArgs)
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 2)
@@ -513,6 +543,7 @@ func TestTxnPipelinerReads(t *testing.T) {
qiReq := ba.Requests[0].GetQueryIntent()
require.Equal(t, keyA, qiReq.Key)
require.Equal(t, enginepb.TxnSeq(10), qiReq.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq.Strength)
// No in-flight writes have been proved yet.
require.Equal(t, 1, tp.ifWrites.len())
@@ -529,6 +560,314 @@ func TestTxnPipelinerReads(t *testing.T) {
require.Equal(t, 0, tp.ifWrites.len())
}
+// TestTxnPipelinerReplicatedLockingReads tests that txnPipeliner permits
+// batches with replicated locking reads in them to use async consensus. It also
+// tests that these locking reads are proved as requests are chained onto them.
+// Finally, it tests that EndTxn requests chain on to all existing requests.
+func TestTxnPipelinerReplicatedLockingReads(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+ tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
+
+ txn := makeTxnProto()
+ keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
+ keyD, keyE, keyF := roachpb.Key("d"), roachpb.Key("e"), roachpb.Key("f")
+
+ // Unreplicated locking read-only.
+ ba := &kvpb.BatchRequest{}
+ ba.Header = kvpb.Header{Txn: &txn}
+ ba.Add(&kvpb.GetRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyA},
+ KeyLockingStrength: lock.Exclusive,
+ KeyLockingDurability: lock.Unreplicated,
+ })
+
+ // No keys are locked.
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 1)
+ require.False(t, ba.AsyncConsensus)
+ require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
+
+ br := ba.CreateReply()
+ br.Txn = ba.Txn
+ return br, nil
+ })
+
+ br, pErr := tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Equal(t, 0, tp.ifWrites.len())
+ require.Equal(t, 0, len(tp.lockFootprint.asSlice()))
+
+ // Replicated locking read-only: Get, Scan, and ReverseScan.
+ ba.Requests = nil
+ ba.Add(&kvpb.GetRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyA},
+ KeyLockingStrength: lock.Exclusive,
+ KeyLockingDurability: lock.Replicated,
+ })
+ ba.Add(&kvpb.ScanRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyB, EndKey: keyD},
+ KeyLockingStrength: lock.Shared,
+ KeyLockingDurability: lock.Replicated,
+ })
+ ba.Add(&kvpb.ReverseScanRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyF, EndKey: keyB},
+ KeyLockingStrength: lock.Exclusive,
+ KeyLockingDurability: lock.Replicated,
+ })
+
+ // The first time it is sent, no keys are locked.
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 3)
+ require.True(t, ba.AsyncConsensus)
+ require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
+ require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[1].GetInner())
+ require.IsType(t, &kvpb.ReverseScanRequest{}, ba.Requests[2].GetInner())
+
+ br = ba.CreateReply()
+ br.Txn = ba.Txn
+ // No keys locked, no keys returned.
+ return br, nil
+ })
+
+ br, pErr = tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Equal(t, 0, tp.ifWrites.len())
+ require.Equal(t, 0, len(tp.lockFootprint.asSlice()))
+
+ // The second time it is sent, 6 keys are locked.
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 3)
+ require.True(t, ba.AsyncConsensus)
+ require.IsType(t, &kvpb.GetRequest{}, ba.Requests[0].GetInner())
+ require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[1].GetInner())
+ require.IsType(t, &kvpb.ReverseScanRequest{}, ba.Requests[2].GetInner())
+
+ br = ba.CreateReply()
+ br.Txn = ba.Txn
+ // 6 keys locked, 6 keys returned.
+ br.Responses[0].GetGet().Value = &roachpb.Value{}
+ br.Responses[1].GetScan().Rows = []roachpb.KeyValue{{Key: keyB}, {Key: keyC}}
+ br.Responses[2].GetReverseScan().Rows = []roachpb.KeyValue{{Key: keyF}, {Key: keyE}, {Key: keyC}}
+ return br, nil
+ })
+
+ br, pErr = tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Equal(t, 6, tp.ifWrites.len())
+ require.Equal(t, 0, len(tp.lockFootprint.asSlice()))
+
+ // Check that the tracked inflight writes were updated correctly.
+ var ifWrites []inFlightWrite
+ tp.ifWrites.ascend(func(w *inFlightWrite) {
+ ifWrites = append(ifWrites, *w)
+ })
+ expIfWrites := []inFlightWrite{
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyA, Sequence: 0, Strength: lock.Exclusive}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyB, Sequence: 0, Strength: lock.Shared}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyC, Sequence: 0, Strength: lock.Shared}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyC, Sequence: 0, Strength: lock.Exclusive}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyE, Sequence: 0, Strength: lock.Exclusive}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyF, Sequence: 0, Strength: lock.Exclusive}},
+ }
+ require.Equal(t, expIfWrites, ifWrites)
+
+ // Replicated locking read before write. Some existing in-flight replicated
+ // lock writes are queried. The batch is permitted to use async consensus.
+ ba.Requests = nil
+ ba.Add(&kvpb.GetRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyA},
+ KeyLockingStrength: lock.Shared,
+ KeyLockingDurability: lock.Replicated,
+ })
+ ba.Add(&kvpb.PutRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyC, Sequence: 1},
+ })
+
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 5)
+ require.True(t, ba.AsyncConsensus)
+ require.Len(t, ba.Split(false /* canSplitET */), 1)
+ require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
+ require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
+ require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[2].GetInner())
+ require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[3].GetInner())
+ require.IsType(t, &kvpb.PutRequest{}, ba.Requests[4].GetInner())
+
+ qiReq1 := ba.Requests[0].GetQueryIntent()
+ qiReq2 := ba.Requests[2].GetQueryIntent()
+ qiReq3 := ba.Requests[3].GetQueryIntent()
+ require.Equal(t, keyA, qiReq1.Key)
+ require.Equal(t, keyC, qiReq2.Key)
+ require.Equal(t, keyC, qiReq3.Key)
+ require.Equal(t, enginepb.TxnSeq(0), qiReq1.Txn.Sequence)
+ require.Equal(t, enginepb.TxnSeq(0), qiReq2.Txn.Sequence)
+ require.Equal(t, enginepb.TxnSeq(0), qiReq3.Txn.Sequence)
+ require.Equal(t, lock.Exclusive, qiReq1.Strength)
+ require.Equal(t, lock.Shared, qiReq2.Strength)
+ require.Equal(t, lock.Exclusive, qiReq3.Strength)
+
+ br = ba.CreateReply()
+ br.Txn = ba.Txn
+ br.Responses[0].GetQueryIntent().FoundIntent = true
+ br.Responses[1].GetGet().Value = &roachpb.Value{}
+ br.Responses[2].GetQueryIntent().FoundIntent = true
+ br.Responses[3].GetQueryIntent().FoundIntent = true
+ return br, nil
+ })
+
+ br, pErr = tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Len(t, br.Responses, 2) // QueryIntent responses stripped
+ require.Equal(t, 5, tp.ifWrites.len())
+ require.Equal(t, 3, len(tp.lockFootprint.asSlice()))
+
+ // Check that the tracked inflight writes were updated correctly.
+ ifWrites = nil
+ tp.ifWrites.ascend(func(w *inFlightWrite) {
+ ifWrites = append(ifWrites, *w)
+ })
+ expIfWrites = []inFlightWrite{
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyA, Sequence: 0, Strength: lock.Shared}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyB, Sequence: 0, Strength: lock.Shared}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyC, Sequence: 1, Strength: lock.Intent}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyE, Sequence: 0, Strength: lock.Exclusive}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyF, Sequence: 0, Strength: lock.Exclusive}},
+ }
+ require.Equal(t, expIfWrites, ifWrites)
+
+ // Replicated locking read after write. Some existing in-flight replicated
+ // lock writes are queried. The batch is permitted to use async consensus.
+ ba.Requests = nil
+ ba.Add(&kvpb.PutRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyD, Sequence: 2},
+ })
+ ba.Add(&kvpb.GetRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyC, Sequence: 2},
+ KeyLockingStrength: lock.Exclusive,
+ KeyLockingDurability: lock.Replicated,
+ })
+
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 3)
+ require.True(t, ba.AsyncConsensus)
+ require.Len(t, ba.Split(false /* canSplitET */), 1)
+ require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
+ require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
+ require.IsType(t, &kvpb.GetRequest{}, ba.Requests[2].GetInner())
+
+ qiReq1 := ba.Requests[1].GetQueryIntent()
+ require.Equal(t, keyC, qiReq1.Key)
+ require.Equal(t, enginepb.TxnSeq(1), qiReq1.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq1.Strength)
+
+ br = ba.CreateReply()
+ br.Txn = ba.Txn
+ br.Responses[1].GetQueryIntent().FoundIntent = true
+ br.Responses[2].GetGet().Value = &roachpb.Value{}
+ return br, nil
+ })
+
+ br, pErr = tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Len(t, br.Responses, 2) // QueryIntent response stripped
+ require.Equal(t, 6, tp.ifWrites.len())
+ require.Equal(t, 4, len(tp.lockFootprint.asSlice()))
+
+ // Check that the tracked inflight writes were updated correctly.
+ ifWrites = nil
+ tp.ifWrites.ascend(func(w *inFlightWrite) {
+ ifWrites = append(ifWrites, *w)
+ })
+ expIfWrites = []inFlightWrite{
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyA, Sequence: 0, Strength: lock.Shared}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyB, Sequence: 0, Strength: lock.Shared}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyC, Sequence: 2, Strength: lock.Exclusive}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyD, Sequence: 2, Strength: lock.Intent}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyE, Sequence: 0, Strength: lock.Exclusive}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: keyF, Sequence: 0, Strength: lock.Exclusive}},
+ }
+ require.Equal(t, expIfWrites, ifWrites)
+
+ // Send a final replicated locking read, along with an EndTxn request. Should
+ // attempt to prove all in-flight writes. Should NOT use async consensus.
+ ba.Requests = nil
+ ba.Add(&kvpb.ScanRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyB, EndKey: keyD, Sequence: 2},
+ KeyLockingStrength: lock.Shared,
+ KeyLockingDurability: lock.Replicated,
+ })
+ ba.Add(&kvpb.EndTxnRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: 3},
+ Commit: true,
+ })
+
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 8)
+ require.False(t, ba.AsyncConsensus)
+ require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
+ require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[1].GetInner())
+ require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[2].GetInner())
+ require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[3].GetInner())
+ require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[4].GetInner())
+ require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[5].GetInner())
+ require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[6].GetInner())
+ require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[7].GetInner())
+
+ qiReqs := [6]*kvpb.QueryIntentRequest{
+ // NOTE: re-ordered because Scan overlaps 2 in-flight writes which need to
+ // be proven first.
+ ba.Requests[3].GetQueryIntent(),
+ ba.Requests[0].GetQueryIntent(),
+ ba.Requests[1].GetQueryIntent(),
+ ba.Requests[4].GetQueryIntent(),
+ ba.Requests[5].GetQueryIntent(),
+ ba.Requests[6].GetQueryIntent(),
+ }
+ require.Equal(t, len(expIfWrites), len(qiReqs))
+ for i, qiReq := range qiReqs {
+ expIfWrite := expIfWrites[i]
+ require.Equal(t, expIfWrite.Key, qiReq.Key)
+ require.Equal(t, expIfWrite.Sequence, qiReq.Txn.Sequence)
+ require.Equal(t, expIfWrite.Strength, qiReq.Strength)
+ }
+
+ etReq := ba.Requests[7].GetEndTxn()
+ require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB, EndKey: keyD}}, etReq.LockSpans)
+ var expETIfWrites []roachpb.SequencedWrite
+ for _, ifWrite := range expIfWrites {
+ expETIfWrites = append(expETIfWrites, ifWrite.SequencedWrite)
+ }
+ sort.Sort(roachpb.SequencedWriteBySeq(expETIfWrites))
+ require.Equal(t, expETIfWrites, etReq.InFlightWrites)
+
+ br = ba.CreateReply()
+ br.Txn = ba.Txn
+ br.Txn.Status = roachpb.COMMITTED
+ br.Responses[0].GetQueryIntent().FoundIntent = true
+ br.Responses[1].GetQueryIntent().FoundIntent = true
+ br.Responses[2].GetScan().Rows = []roachpb.KeyValue{{Key: keyB}, {Key: keyC}}
+ br.Responses[3].GetQueryIntent().FoundIntent = true
+ br.Responses[4].GetQueryIntent().FoundIntent = true
+ br.Responses[5].GetQueryIntent().FoundIntent = true
+ br.Responses[6].GetQueryIntent().FoundIntent = true
+ return br, nil
+ })
+
+ br, pErr = tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Len(t, br.Responses, 2) // QueryIntent responses stripped
+ require.Equal(t, 0, tp.ifWrites.len())
+ require.Equal(t, 4, len(tp.lockFootprint.asSlice()))
+}
+
// TestTxnPipelinerRangedWrites tests that the txnPipeliner can perform some
// ranged write operations using async consensus. In particular, ranged requests
// which have the canPipeline flag set. It also verifies that ranged writes will
@@ -574,11 +913,11 @@ func TestTxnPipelinerRangedWrites(t *testing.T) {
// request. Send the batch again and assert that the Put chains onto the
// first in-flight write and the DeleteRange chains onto the second and
// third in-flight write.
- tp.ifWrites.insert(roachpb.Key("a"), 10)
- tp.ifWrites.insert(roachpb.Key("b"), 11)
- tp.ifWrites.insert(roachpb.Key("c"), 12)
- tp.ifWrites.insert(roachpb.Key("d"), 13)
- tp.ifWrites.insert(roachpb.Key("e"), 13)
+ tp.ifWrites.insert(roachpb.Key("a"), 10, lock.Intent)
+ tp.ifWrites.insert(roachpb.Key("b"), 11, lock.Intent)
+ tp.ifWrites.insert(roachpb.Key("c"), 12, lock.Intent)
+ tp.ifWrites.insert(roachpb.Key("d"), 13, lock.Intent)
+ tp.ifWrites.insert(roachpb.Key("e"), 13, lock.Intent)
require.Equal(t, 5, tp.ifWrites.len())
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
@@ -602,6 +941,9 @@ func TestTxnPipelinerRangedWrites(t *testing.T) {
require.Equal(t, enginepb.TxnSeq(10), qiReq1.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(11), qiReq2.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(12), qiReq3.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq1.Strength)
+ require.Equal(t, lock.Intent, qiReq2.Strength)
+ require.Equal(t, lock.Intent, qiReq3.Strength)
// No in-flight writes have been proved yet.
require.Equal(t, 5, tp.ifWrites.len())
@@ -777,6 +1119,7 @@ func TestTxnPipelinerManyWrites(t *testing.T) {
require.Equal(t, key, qiReq.Key)
require.Equal(t, txn.ID, qiReq.Txn.ID)
require.Equal(t, makeSeq(i), qiReq.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq.Strength)
getReq := ba.Requests[i+1].GetGet()
require.Equal(t, key, getReq.Key)
@@ -858,7 +1201,10 @@ func TestTxnPipelinerTransactionAbort(t *testing.T) {
etReq := ba.Requests[0].GetEndTxn()
require.Len(t, etReq.LockSpans, 0)
- require.Equal(t, []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}}, etReq.InFlightWrites)
+ expInFlight := []roachpb.SequencedWrite{
+ {Key: keyA, Sequence: 1, Strength: lock.Intent},
+ }
+ require.Equal(t, expInFlight, etReq.InFlightWrites)
br = ba.CreateReply()
br.Txn = ba.Txn
@@ -886,7 +1232,10 @@ func TestTxnPipelinerTransactionAbort(t *testing.T) {
etReq := ba.Requests[0].GetEndTxn()
require.Len(t, etReq.LockSpans, 0)
- require.Equal(t, []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}}, etReq.InFlightWrites)
+ expInFlight := []roachpb.SequencedWrite{
+ {Key: keyA, Sequence: 1, Strength: lock.Intent},
+ }
+ require.Equal(t, expInFlight, etReq.InFlightWrites)
br = ba.CreateReply()
br.Txn = ba.Txn
@@ -908,8 +1257,8 @@ func TestTxnPipelinerEpochIncrement(t *testing.T) {
defer log.Scope(t).Close(t)
tp, _ := makeMockTxnPipeliner(nil /* iter */)
- tp.ifWrites.insert(roachpb.Key("b"), 10)
- tp.ifWrites.insert(roachpb.Key("d"), 11)
+ tp.ifWrites.insert(roachpb.Key("b"), 10, lock.Intent)
+ tp.ifWrites.insert(roachpb.Key("d"), 11, lock.Intent)
require.Equal(t, 2, tp.ifWrites.len())
require.Equal(t, 0, len(tp.lockFootprint.asSlice()))
@@ -939,10 +1288,10 @@ func TestTxnPipelinerIntentMissingError(t *testing.T) {
// Insert in-flight writes into the in-flight write set so that each request
// will need to chain on with a QueryIntent.
- tp.ifWrites.insert(keyA, 1)
- tp.ifWrites.insert(keyB, 2)
- tp.ifWrites.insert(keyC, 3)
- tp.ifWrites.insert(keyD, 4)
+ tp.ifWrites.insert(keyA, 1, lock.Intent)
+ tp.ifWrites.insert(keyB, 2, lock.Intent)
+ tp.ifWrites.insert(keyC, 3, lock.Intent)
+ tp.ifWrites.insert(keyD, 4, lock.Intent)
for errIdx, resErrIdx := range map[int32]int32{
0: 0, // intent on key "a" missing
@@ -1041,13 +1390,16 @@ func TestTxnPipelinerDeleteRangeRequests(t *testing.T) {
require.Equal(t, enginepb.TxnSeq(7), qiReq1.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(7), qiReq2.Txn.Sequence)
require.Equal(t, enginepb.TxnSeq(7), qiReq3.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq1.Strength)
+ require.Equal(t, lock.Intent, qiReq2.Strength)
+ require.Equal(t, lock.Intent, qiReq3.Strength)
etReq := ba.Requests[4].GetEndTxn()
require.Equal(t, []roachpb.Span{{Key: keyC, EndKey: keyE}}, etReq.LockSpans)
expInFlight := []roachpb.SequencedWrite{
- {Key: keyA, Sequence: 7},
- {Key: keyB, Sequence: 7},
- {Key: keyD, Sequence: 7},
+ {Key: keyA, Sequence: 7, Strength: lock.Intent},
+ {Key: keyB, Sequence: 7, Strength: lock.Intent},
+ {Key: keyD, Sequence: 7, Strength: lock.Intent},
}
require.Equal(t, expInFlight, etReq.InFlightWrites)
@@ -1150,6 +1502,7 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) {
qiReq := ba.Requests[0].GetQueryIntent()
require.Equal(t, keyA, qiReq.Key)
require.Equal(t, enginepb.TxnSeq(2), qiReq.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq.Strength)
br = ba.CreateReply()
br.Txn = ba.Txn
@@ -1178,10 +1531,14 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) {
qiReq := ba.Requests[0].GetQueryIntent()
require.Equal(t, keyC, qiReq.Key)
require.Equal(t, enginepb.TxnSeq(3), qiReq.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq.Strength)
etReq := ba.Requests[1].GetEndTxn()
require.Equal(t, []roachpb.Span{{Key: keyA}}, etReq.LockSpans)
- require.Equal(t, []roachpb.SequencedWrite{{Key: keyC, Sequence: 3}}, etReq.InFlightWrites)
+ expInFlight := []roachpb.SequencedWrite{
+ {Key: keyC, Sequence: 3, Strength: lock.Intent},
+ }
+ require.Equal(t, expInFlight, etReq.InFlightWrites)
br = ba.CreateReply()
br.Txn = ba.Txn
@@ -1196,6 +1553,183 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) {
require.Equal(t, 0, tp.ifWrites.len())
}
+// TestTxnPipelinerDisableRangedWrites tests that the txnPipeliner behaves
+// correctly if pipelining for ranged writes is disabled.
+func TestTxnPipelinerDisableRangedWrites(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+ tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
+
+ // Start with pipelining disabled. Should NOT use async consensus.
+ pipelinedRangedWritesEnabled.Override(ctx, &tp.st.SV, false)
+
+ txn := makeTxnProto()
+ keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
+
+ ba := &kvpb.BatchRequest{}
+ ba.Header = kvpb.Header{Txn: &txn}
+ ba.Add(&kvpb.DeleteRangeRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: 1}})
+
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 1)
+ require.False(t, ba.AsyncConsensus)
+ require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[0].GetInner())
+
+ br := ba.CreateReply()
+ br.Txn = ba.Txn
+ br.Responses[0].GetDeleteRange().Keys = []roachpb.Key{keyA, keyB}
+ return br, nil
+ })
+
+ br, pErr := tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Equal(t, 0, tp.ifWrites.len())
+
+ // Enable pipelining. Should use async consensus.
+ pipelinedRangedWritesEnabled.Override(ctx, &tp.st.SV, true)
+
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 1)
+ require.True(t, ba.AsyncConsensus)
+ require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[0].GetInner())
+
+ br = ba.CreateReply()
+ br.Txn = ba.Txn
+ br.Responses[0].GetDeleteRange().Keys = []roachpb.Key{keyA, keyB}
+ return br, nil
+ })
+
+ br, pErr = tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Equal(t, 2, tp.ifWrites.len())
+}
+
+// TestTxnPipelinerDisableLockingReads tests that the txnPipeliner behaves
+// correctly if pipelining for locking reads is disabled.
+func TestTxnPipelinerDisableLockingReads(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+ tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
+
+ // Start with pipelining disabled. Should NOT use async consensus.
+ pipelinedLockingReadsEnabled.Override(ctx, &tp.st.SV, false)
+
+ txn := makeTxnProto()
+ keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
+
+ ba := &kvpb.BatchRequest{}
+ ba.Header = kvpb.Header{Txn: &txn}
+ ba.Add(&kvpb.ScanRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC},
+ KeyLockingStrength: lock.Exclusive,
+ KeyLockingDurability: lock.Replicated,
+ })
+
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 1)
+ require.False(t, ba.AsyncConsensus)
+ require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[0].GetInner())
+
+ br := ba.CreateReply()
+ br.Txn = ba.Txn
+ br.Responses[0].GetScan().Rows = []roachpb.KeyValue{{Key: keyA}, {Key: keyB}}
+ return br, nil
+ })
+
+ br, pErr := tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Equal(t, 0, tp.ifWrites.len())
+
+ // Enable pipelining. Should use async consensus.
+ pipelinedLockingReadsEnabled.Override(ctx, &tp.st.SV, true)
+
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 1)
+ require.True(t, ba.AsyncConsensus)
+ require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[0].GetInner())
+
+ br = ba.CreateReply()
+ br.Txn = ba.Txn
+ br.Responses[0].GetScan().Rows = []roachpb.KeyValue{{Key: keyA}, {Key: keyB}}
+ return br, nil
+ })
+
+ br, pErr = tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Equal(t, 2, tp.ifWrites.len())
+}
+
+// TestTxnPipelinerMixedVersionLockingReads tests that the txnPipeliner behaves
+// correctly if in a mixed-version cluster when locking reads are not supported.
+func TestTxnPipelinerMixedVersionLockingReads(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+ tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
+
+ // Start in a mixed-version cluster. Should NOT use async consensus.
+ tp.st = cluster.MakeTestingClusterSettingsWithVersions(
+ clusterversion.Latest.Version(),
+ clusterversion.MinSupported.Version(),
+ false, // initializeVersion
+ )
+ require.NoError(t, clusterversion.Initialize(
+ ctx, clusterversion.MinSupported.Version(), &tp.st.SV))
+
+ txn := makeTxnProto()
+ keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
+
+ ba := &kvpb.BatchRequest{}
+ ba.Header = kvpb.Header{Txn: &txn}
+ ba.Add(&kvpb.ScanRequest{
+ RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC},
+ KeyLockingStrength: lock.Exclusive,
+ KeyLockingDurability: lock.Replicated,
+ })
+
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 1)
+ require.False(t, ba.AsyncConsensus)
+ require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[0].GetInner())
+
+ br := ba.CreateReply()
+ br.Txn = ba.Txn
+ br.Responses[0].GetScan().Rows = []roachpb.KeyValue{{Key: keyA}, {Key: keyB}}
+ return br, nil
+ })
+
+ br, pErr := tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Equal(t, 0, tp.ifWrites.len())
+
+ // Upgrade to the latest version. Should use async consensus.
+ require.NoError(t, clusterversion.Initialize(
+ ctx, clusterversion.Latest.Version(), &tp.st.SV))
+
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 1)
+ require.True(t, ba.AsyncConsensus)
+ require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[0].GetInner())
+
+ br = ba.CreateReply()
+ br.Txn = ba.Txn
+ br.Responses[0].GetScan().Rows = []roachpb.KeyValue{{Key: keyA}, {Key: keyB}}
+ return br, nil
+ })
+
+ br, pErr = tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Equal(t, 2, tp.ifWrites.len())
+}
+
// TestTxnPipelinerMaxInFlightSize tests that batches are not pipelined if doing
// so would push the memory used to track locks and in-flight writes over the
// limit allowed by the kv.transaction.max_intents_bytes setting.
@@ -1710,7 +2244,9 @@ func TestTxnPipelinerIgnoresLocksOnUnambiguousFailure(t *testing.T) {
require.Equal(t, expLocks, tp.lockFootprint.asSlice())
}
-// Test that the pipeliners knows how to save and restore its state.
+// TestTxnPipelinerSavepoints tests that the txnPipeliner knows how to save and
+// restore its state on savepoints creation and rollback, respectively. It also
+// attaches this savepoint-related state to QueryIntent requests that it sends.
func TestTxnPipelinerSavepoints(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -1720,17 +2256,17 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
initialSavepoint := savepoint{}
tp.createSavepointLocked(ctx, &initialSavepoint)
- tp.ifWrites.insert(roachpb.Key("a"), 10)
- tp.ifWrites.insert(roachpb.Key("b"), 11)
- tp.ifWrites.insert(roachpb.Key("c"), 12)
+ tp.ifWrites.insert(roachpb.Key("a"), 10, lock.Intent)
+ tp.ifWrites.insert(roachpb.Key("b"), 11, lock.Intent)
+ tp.ifWrites.insert(roachpb.Key("c"), 12, lock.Intent)
require.Equal(t, 3, tp.ifWrites.len())
s := savepoint{seqNum: enginepb.TxnSeq(13), active: true}
tp.createSavepointLocked(ctx, &s)
// Some more writes after the savepoint.
- tp.ifWrites.insert(roachpb.Key("c"), 14)
- tp.ifWrites.insert(roachpb.Key("d"), 15)
+ tp.ifWrites.insert(roachpb.Key("c"), 14, lock.Intent)
+ tp.ifWrites.insert(roachpb.Key("d"), 15, lock.Intent)
require.Equal(t, 5, tp.ifWrites.len())
require.Empty(t, tp.lockFootprint.asSlice())
@@ -1740,6 +2276,7 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: &txn}
ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: roachpb.Key("a")}})
+
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 2)
require.False(t, ba.AsyncConsensus)
@@ -1749,12 +2286,15 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
qiReq := ba.Requests[0].GetQueryIntent()
require.Equal(t, roachpb.Key("a"), qiReq.Key)
require.Equal(t, enginepb.TxnSeq(10), qiReq.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq.Strength)
+ require.Nil(t, qiReq.IgnoredSeqNums)
br := ba.CreateReply()
br.Txn = ba.Txn
br.Responses[0].GetQueryIntent().FoundIntent = true
return br, nil
})
+
br, pErr := tp.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
@@ -1764,6 +2304,7 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
// Now restore the savepoint and check that the in-flight write state has been restored
// and all rolled-back writes were moved to the lock footprint.
tp.rollbackToSavepointLocked(ctx, s)
+ txn.AddIgnoredSeqNumRange(enginepb.IgnoredSeqNumRange{Start: 13, End: 15})
// Check that the tracked inflight writes were updated correctly. The key that
// had been verified ("a") should have been taken out of the savepoint.
@@ -1773,8 +2314,8 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
})
require.Equal(t,
[]inFlightWrite{
- {SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11}},
- {SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("c"), Sequence: 12}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11, Strength: lock.Intent}},
+ {SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("c"), Sequence: 12, Strength: lock.Intent}},
},
ifWrites)
@@ -1789,9 +2330,40 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
},
tp.lockFootprint.asSlice())
- // Now rollback to the initial savepoint and check that all in-flight writes are gone.
+ // Verify another one of the writes. Verify that the new set of IgnoredSeqNums
+ // are attached.
+ ba.Requests = nil
+ ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: roachpb.Key("c")}})
+
+ mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+ require.Len(t, ba.Requests, 2)
+ require.False(t, ba.AsyncConsensus)
+ require.IsType(t, &kvpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
+ require.IsType(t, &kvpb.GetRequest{}, ba.Requests[1].GetInner())
+
+ qiReq := ba.Requests[0].GetQueryIntent()
+ require.Equal(t, roachpb.Key("c"), qiReq.Key)
+ require.Equal(t, enginepb.TxnSeq(12), qiReq.Txn.Sequence)
+ require.Equal(t, lock.Intent, qiReq.Strength)
+ require.Equal(t, []enginepb.IgnoredSeqNumRange{{Start: 13, End: 15}}, qiReq.IgnoredSeqNums)
+
+ br = ba.CreateReply()
+ br.Txn = ba.Txn
+ br.Responses[0].GetQueryIntent().FoundIntent = true
+ return br, nil
+ })
+
+ br, pErr = tp.SendLocked(ctx, ba)
+ require.Nil(t, pErr)
+ require.NotNil(t, br)
+ require.Equal(t, 1, tp.ifWrites.len())
+ require.Equal(t, 4, len(tp.lockFootprint.asSlice()))
+
+ // Now rollback to the initial savepoint and check that all in-flight writes
+ // are gone.
tp.rollbackToSavepointLocked(ctx, initialSavepoint)
require.Empty(t, tp.ifWrites.len())
+ require.Equal(t, 4, len(tp.lockFootprint.asSlice()))
}
// TestTxnPipelinerCondenseLockSpans2 verifies that lock spans are condensed
@@ -1878,7 +2450,7 @@ func TestTxnPipelinerCondenseLockSpans2(t *testing.T) {
}
for _, k := range tc.ifWrites {
- tp.ifWrites.insert(roachpb.Key(k), 1)
+ tp.ifWrites.insert(roachpb.Key(k), 1, lock.Intent)
}
txn := makeTxnProto()
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go
index c5620e2c969a..3164740e0c9f 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go
@@ -83,6 +83,10 @@ type txnSeqNumAllocator struct {
func (s *txnSeqNumAllocator) SendLocked(
ctx context.Context, ba *kvpb.BatchRequest,
) (*kvpb.BatchResponse, *kvpb.Error) {
+ if err := s.checkReadSeqNotIgnoredLocked(ba); err != nil {
+ return nil, kvpb.NewError(err)
+ }
+
for _, ru := range ba.Requests {
req := ru.GetInner()
oldHeader := req.Header()
@@ -99,8 +103,7 @@ func (s *txnSeqNumAllocator) SendLocked(
// Notably, this includes Get/Scan/ReverseScan requests that acquire
// replicated locks, even though they go through raft.
if kvpb.IsIntentWrite(req) || req.Method() == kvpb.EndTxn {
- s.stepWriteSeqLocked()
- if err := s.maybeAutoStepReadSeqLocked(ctx); err != nil {
+ if err := s.stepWriteSeqLocked(ctx); err != nil {
return nil, kvpb.NewError(err)
}
oldHeader.Sequence = s.writeSeq
@@ -172,8 +175,20 @@ func (s *txnSeqNumAllocator) stepReadSeqLocked(ctx context.Context) error {
}
// stepWriteSeqLocked increments the write seqnum.
-func (s *txnSeqNumAllocator) stepWriteSeqLocked() {
+func (s *txnSeqNumAllocator) stepWriteSeqLocked(ctx context.Context) error {
s.writeSeq++
+ return s.maybeAutoStepReadSeqLocked(ctx)
+}
+
+// checkReadSeqNotIgnoredLocked verifies that the read seqnum is not in the
+// ignored seqnum list of the provided batch request.
+func (s *txnSeqNumAllocator) checkReadSeqNotIgnoredLocked(ba *kvpb.BatchRequest) error {
+ if enginepb.TxnSeqIsIgnored(s.readSeq, ba.Txn.IgnoredSeqNums) {
+ return errors.AssertionFailedf(
+ "read sequence number %d but sequence number is ignored %v after savepoint rollback",
+ s.readSeq, ba.Txn.IgnoredSeqNums)
+ }
+ return nil
}
// configureSteppingLocked configures the stepping mode.
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go
index 9955b0dc7677..acfbd837ae7e 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go
@@ -206,7 +206,7 @@ func TestSequenceNumberAllocationWithStep(t *testing.T) {
currentStepSeqNum := s.writeSeq
ba := &kvpb.BatchRequest{}
- ba.Requests = nil
+ ba.Header = kvpb.Header{Txn: &txn}
ba.Add(&kvpb.ConditionalPutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
ba.Add(&kvpb.InitPutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go
index 922033f010cd..3a5cad3be2ae 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer.go
@@ -387,6 +387,14 @@ func NewStreamer(
if txn.Type() != kv.LeafTxn {
panic(errors.AssertionFailedf("RootTxn is given to the Streamer"))
}
+ if lockDurability == lock.Replicated {
+ // Replicated lock durability is not supported by the Streamer. If we want
+ // to support it in the future, we'll need to make sure we're not re-using
+ // request memory after the request has been sent. This is because
+ // replicated lock pipelining retains references to requests even after
+ // their response has been returned.
+ panic(errors.AssertionFailedf("Replicated lock durability is given to the Streamer"))
+ }
// sd can be nil in tests.
headOfLineOnlyFraction := 0.8
if sd != nil {
diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go
index c4c6e7f315ec..c2755c104319 100644
--- a/pkg/kv/kvpb/api.go
+++ b/pkg/kv/kvpb/api.go
@@ -84,8 +84,8 @@ var flagDependencies = map[flag][]flag{
isAdmin: {isAlone},
isLocking: {isTxn},
isIntentWrite: {isWrite, isLocking},
- canPipeline: {isIntentWrite},
- canParallelCommit: {canPipeline},
+ canPipeline: {isLocking},
+ canParallelCommit: {canPipeline, isIntentWrite},
appliesTSCache: {isWrite},
skipsLeaseCheck: {isAlone},
}
@@ -373,6 +373,8 @@ type Response interface {
Header() ResponseHeader
// SetHeader sets the response header.
SetHeader(ResponseHeader)
+ // ShallowCopy returns a shallow copy of the receiver.
+ ShallowCopy() Response
// Verify verifies response integrity, as applicable.
Verify(req Request) error
}
@@ -1299,6 +1301,294 @@ func (r *IsSpanEmptyRequest) ShallowCopy() Request {
return &shallowCopy
}
+// ShallowCopy implements the Response interface.
+func (gr *GetResponse) ShallowCopy() Response {
+ shallowCopy := *gr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (pr *PutResponse) ShallowCopy() Response {
+ shallowCopy := *pr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (cpr *ConditionalPutResponse) ShallowCopy() Response {
+ shallowCopy := *cpr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (pr *InitPutResponse) ShallowCopy() Response {
+ shallowCopy := *pr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (ir *IncrementResponse) ShallowCopy() Response {
+ shallowCopy := *ir
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (dr *DeleteResponse) ShallowCopy() Response {
+ shallowCopy := *dr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (drr *DeleteRangeResponse) ShallowCopy() Response {
+ shallowCopy := *drr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (crr *ClearRangeResponse) ShallowCopy() Response {
+ shallowCopy := *crr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (crr *RevertRangeResponse) ShallowCopy() Response {
+ shallowCopy := *crr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (sr *ScanResponse) ShallowCopy() Response {
+ shallowCopy := *sr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (rsr *ReverseScanResponse) ShallowCopy() Response {
+ shallowCopy := *rsr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (ccr *CheckConsistencyResponse) ShallowCopy() Response {
+ shallowCopy := *ccr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (etr *EndTxnResponse) ShallowCopy() Response {
+ shallowCopy := *etr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (asr *AdminSplitResponse) ShallowCopy() Response {
+ shallowCopy := *asr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (aur *AdminUnsplitResponse) ShallowCopy() Response {
+ shallowCopy := *aur
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (amr *AdminMergeResponse) ShallowCopy() Response {
+ shallowCopy := *amr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (atlr *AdminTransferLeaseResponse) ShallowCopy() Response {
+ shallowCopy := *atlr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (acrr *AdminChangeReplicasResponse) ShallowCopy() Response {
+ shallowCopy := *acrr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (acrr *AdminRelocateRangeResponse) ShallowCopy() Response {
+ shallowCopy := *acrr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (htr *HeartbeatTxnResponse) ShallowCopy() Response {
+ shallowCopy := *htr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (gcr *GCResponse) ShallowCopy() Response {
+ shallowCopy := *gcr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (ptr *PushTxnResponse) ShallowCopy() Response {
+ shallowCopy := *ptr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (rtr *RecoverTxnResponse) ShallowCopy() Response {
+ shallowCopy := *rtr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (qtr *QueryTxnResponse) ShallowCopy() Response {
+ shallowCopy := *qtr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (pir *QueryIntentResponse) ShallowCopy() Response {
+ shallowCopy := *pir
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (pir *QueryLocksResponse) ShallowCopy() Response {
+ shallowCopy := *pir
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (rir *ResolveIntentResponse) ShallowCopy() Response {
+ shallowCopy := *rir
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (rirr *ResolveIntentRangeResponse) ShallowCopy() Response {
+ shallowCopy := *rirr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (mr *MergeResponse) ShallowCopy() Response {
+ shallowCopy := *mr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (tlr *TruncateLogResponse) ShallowCopy() Response {
+ shallowCopy := *tlr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (rlr *RequestLeaseResponse) ShallowCopy() Response {
+ shallowCopy := *rlr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *ProbeResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (lt *LeaseInfoResponse) ShallowCopy() Response {
+ shallowCopy := *lt
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (ccr *ComputeChecksumResponse) ShallowCopy() Response {
+ shallowCopy := *ccr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (ekr *ExportResponse) ShallowCopy() Response {
+ shallowCopy := *ekr
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *AdminScatterResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *AddSSTableResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *LinkExternalSSTableResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *MigrateResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *RecomputeStatsResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *RefreshResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *RefreshRangeResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *SubsumeResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *RangeStatsResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *AdminVerifyProtectedTimestampResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *QueryResolvedTimestampResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *BarrierResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
+// ShallowCopy implements the Response interface.
+func (r *IsSpanEmptyResponse) ShallowCopy() Response {
+ shallowCopy := *r
+ return &shallowCopy
+}
+
// NewLockingGet returns a Request initialized to get the value at key. A lock
// corresponding to the supplied lock strength and durability is acquired on the
// key, if it exists.
@@ -1581,7 +1871,7 @@ func flagForLockStrength(l lock.Strength) flag {
func flagForLockDurability(d lock.Durability) flag {
if d == lock.Replicated {
- return isWrite
+ return isWrite | canPipeline
}
return 0
}
diff --git a/pkg/kv/kvpb/api_test.go b/pkg/kv/kvpb/api_test.go
index 57ba6d72fda3..0c3898ecc72c 100644
--- a/pkg/kv/kvpb/api_test.go
+++ b/pkg/kv/kvpb/api_test.go
@@ -381,9 +381,12 @@ func TestFlagCombinations(t *testing.T) {
&AddSSTableRequest{SSTTimestampToRequestTimestamp: hlc.Timestamp{Logical: 1}},
&DeleteRangeRequest{Inline: true},
&DeleteRangeRequest{UseRangeTombstone: true},
- &GetRequest{KeyLockingStrength: lock.Exclusive},
- &ReverseScanRequest{KeyLockingStrength: lock.Exclusive},
- &ScanRequest{KeyLockingStrength: lock.Exclusive},
+ &GetRequest{KeyLockingStrength: lock.Shared, KeyLockingDurability: lock.Unreplicated},
+ &GetRequest{KeyLockingStrength: lock.Exclusive, KeyLockingDurability: lock.Replicated},
+ &ScanRequest{KeyLockingStrength: lock.Shared, KeyLockingDurability: lock.Unreplicated},
+ &ScanRequest{KeyLockingStrength: lock.Exclusive, KeyLockingDurability: lock.Replicated},
+ &ReverseScanRequest{KeyLockingStrength: lock.Shared, KeyLockingDurability: lock.Unreplicated},
+ &ReverseScanRequest{KeyLockingStrength: lock.Exclusive, KeyLockingDurability: lock.Replicated},
}
reqTypes := []Request{}
diff --git a/pkg/kv/kvpb/batch_test.go b/pkg/kv/kvpb/batch_test.go
index a3652769bba9..299193ed5852 100644
--- a/pkg/kv/kvpb/batch_test.go
+++ b/pkg/kv/kvpb/batch_test.go
@@ -78,6 +78,8 @@ func TestBatchIsCompleteTransaction(t *testing.T) {
func TestBatchSplit(t *testing.T) {
get := &GetRequest{}
+ ulget := &GetRequest{KeyLockingStrength: lock.Exclusive, KeyLockingDurability: lock.Unreplicated}
+ rlget := &GetRequest{KeyLockingStrength: lock.Exclusive, KeyLockingDurability: lock.Replicated}
scan := &ScanRequest{}
put := &PutRequest{}
spl := &AdminSplitRequest{}
@@ -91,6 +93,8 @@ func TestBatchSplit(t *testing.T) {
canSplitET bool
}{
{[]Request{get, put}, []int{1, 1}, true},
+ {[]Request{ulget, put}, []int{1, 1}, true},
+ {[]Request{rlget, put}, []int{2}, true},
{[]Request{put, et}, []int{1, 1}, true},
{[]Request{get, get, get, put, put, get, get}, []int{3, 2, 2}, true},
{[]Request{spl, get, scan, spl, get}, []int{1, 2, 1, 1}, true},
@@ -112,6 +116,9 @@ func TestBatchSplit(t *testing.T) {
// request that follows.
{[]Request{get, qi, put}, []int{1, 2}, true},
{[]Request{get, qi, qi, qi, qi, put}, []int{1, 5}, true},
+ {[]Request{qi, get, qi, put}, []int{2, 2}, true},
+ {[]Request{qi, ulget, qi, put}, []int{2, 2}, true},
+ {[]Request{qi, rlget, qi, put}, []int{4}, true},
{[]Request{qi, get, qi, get, qi, get, qi, put, qi, put, qi, get, qi, get}, []int{6, 4, 4}, true},
{[]Request{qi, spl, qi, get, scan, qi, qi, spl, qi, get}, []int{1, 1, 5, 1, 2}, true},
{[]Request{scan, qi, qi, qi, et}, []int{4, 1}, true},
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 70568cac05a6..c693b1747631 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -211,9 +211,15 @@ func (r *Replica) evalAndPropose(
maybeFinishSpan = proposal.sp.Finish
}
- // Signal the proposal's response channel immediately.
+ // Signal the proposal's response channel immediately. Return a shallow-ish
+ // copy of the response to avoid aliasing issues if the client mutates the
+ // batch response header or individual response headers before replication
+ // completes.
reply := *proposal.Local.Reply
reply.Responses = append([]kvpb.ResponseUnion(nil), reply.Responses...)
+ for i, ru := range reply.Responses {
+ reply.Responses[i].MustSetInner(ru.GetInner().ShallowCopy())
+ }
pr := makeProposalResult(&reply, nil /* pErr */, proposal.Local.DetachEncounteredIntents(), nil /* eti */)
proposal.signalProposalResult(pr)
diff --git a/pkg/kv/kvserver/txnrecovery/BUILD.bazel b/pkg/kv/kvserver/txnrecovery/BUILD.bazel
index 91f27ec67d87..18e61a00a8d1 100644
--- a/pkg/kv/kvserver/txnrecovery/BUILD.bazel
+++ b/pkg/kv/kvserver/txnrecovery/BUILD.bazel
@@ -11,7 +11,6 @@ go_library(
deps = [
"//pkg/kv",
"//pkg/kv/kvpb",
- "//pkg/kv/kvserver/concurrency/lock",
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/log",
diff --git a/pkg/kv/kvserver/txnrecovery/manager.go b/pkg/kv/kvserver/txnrecovery/manager.go
index e3a527fab019..ec0c021b580a 100644
--- a/pkg/kv/kvserver/txnrecovery/manager.go
+++ b/pkg/kv/kvserver/txnrecovery/manager.go
@@ -16,7 +16,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
- "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -206,9 +205,8 @@ func (m *manager) resolveIndeterminateCommitForTxnProbe(
RequestHeader: kvpb.RequestHeader{
Key: w.Key,
},
- Txn: meta,
- // TODO(nvanbenschoten): pass in the correct lock strength here.
- Strength: lock.Intent,
+ Txn: meta,
+ Strength: w.Strength,
IgnoredSeqNums: txn.IgnoredSeqNums,
})
}
diff --git a/pkg/roachpb/data.proto b/pkg/roachpb/data.proto
index 067b86a108aa..de880c1299c4 100644
--- a/pkg/roachpb/data.proto
+++ b/pkg/roachpb/data.proto
@@ -641,6 +641,8 @@ message SequencedWrite {
// The sequence number of the request that created the write.
int32 sequence = 2 [
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/storage/enginepb.TxnSeq"];
+ // The strength with which the lock was acquired at.
+ kv.kvserver.concurrency.lock.Strength strength = 3;
}
// LeaseAcquisitionType indicates the type of lease acquisition event that
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index e8c018c89214..1a53f435797e 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -1755,6 +1755,9 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
if err := ex.state.mu.txn.PrepareForPartialRetry(ctx); err != nil {
return err
}
+ if err := ex.state.mu.txn.Step(ctx, false /* allowReadTimestampStep */); err != nil {
+ return err
+ }
ex.state.mu.autoRetryCounter++
ex.state.mu.autoRetryReason = txnRetryErr
}
diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go
index eaae9ba8e46f..8eeceb36aad3 100644
--- a/pkg/sql/row/kv_batch_fetcher.go
+++ b/pkg/sql/row/kv_batch_fetcher.go
@@ -632,9 +632,16 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
// after making sure to nil out the requests in order to lose references to
// the underlying Get and Scan requests which could keep large byte slices
// alive.
- f.reqsScratch = ba.Requests
- for i := range f.reqsScratch {
- f.reqsScratch[i] = kvpb.RequestUnion{}
+ //
+ // However, we do not re-use the requests slice if we're using the replicated
+ // lock durability, since the requests may be pipelined through raft, which
+ // causes references to the requests to be retained even after their response
+ // has been returned.
+ if f.lockDurability != lock.Replicated {
+ f.reqsScratch = ba.Requests
+ for i := range f.reqsScratch {
+ f.reqsScratch[i] = kvpb.RequestUnion{}
+ }
}
if monitoring {
reqsScratchMemUsage := requestUnionOverhead * int64(cap(f.reqsScratch))