Skip to content

Commit

Permalink
Merge pull request #121503 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.1-121458

release-24.1: kv: give savepoints distinct start and end sequence numbers
  • Loading branch information
nvanbenschoten authored Apr 3, 2024
2 parents 4a623b2 + 83af90f commit 5215fc4
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 42 deletions.
44 changes: 22 additions & 22 deletions pkg/kv/kvclient/kvcoord/testdata/savepoints
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ get k

savepoint x
----
1 <noignore>
2 <noignore>

put k b
----
Expand All @@ -28,7 +28,7 @@ true

release x
----
2 <noignore>
3 <noignore>

get k
----
Expand Down Expand Up @@ -66,7 +66,7 @@ get k

savepoint x
----
1 <noignore>
2 <noignore>

put k b
----
Expand All @@ -81,7 +81,7 @@ true

rollback x
----
2 [2-2]
4 [2-3]

get k
----
Expand Down Expand Up @@ -114,14 +114,14 @@ put k ar

savepoint x
----
1 <noignore>
2 <noignore>

put k br
----

savepoint y
----
2 <noignore>
4 <noignore>

put k cr
----
Expand All @@ -132,7 +132,7 @@ true

release y
----
3 <noignore>
5 <noignore>

put k dr
----
Expand All @@ -143,7 +143,7 @@ true

rollback x
----
4 [2-4]
7 [2-6]

get k
----
Expand Down Expand Up @@ -179,7 +179,7 @@ put b d1

savepoint x
----
2 <noignore>
3 <noignore>

put a d2
----
Expand All @@ -190,14 +190,14 @@ true

rollback x
----
3 [3-3]
5 [3-4]

put c d1
----

savepoint x
----
4 [3-3]
7 [3-4]

put b 2
----
Expand All @@ -208,7 +208,7 @@ true

rollback x
----
5 [3-3][5-5]
9 [3-4][7-8]

put d 1
----
Expand Down Expand Up @@ -270,23 +270,23 @@ put k nop

savepoint x
----
1 <noignore>
2 <noignore>

can-use x
----
true

rollback x
----
1 <noignore>
3 [2-2]

can-use x
----
true

release x
----
1 <noignore>
3 [2-2]

commit
----
Expand Down Expand Up @@ -322,11 +322,11 @@ true

rollback x
----
1 [1-1]
2 [0-1]

rollback x
----
1 [1-1]
3 [0-2]

get k
----
Expand All @@ -341,7 +341,7 @@ true

rollback x
----
2 [1-2]
5 [0-4]

commit
----
Expand Down Expand Up @@ -419,7 +419,7 @@ true

rollback x
----
1 [1-1]
1 <noignore>

subtest end

Expand Down Expand Up @@ -468,7 +468,7 @@ true

rollback x
----
1 [1-1]
1 <noignore>

subtest end

Expand Down Expand Up @@ -517,7 +517,7 @@ true

rollback x
----
1 [1-1]
1 <noignore>

subtest end

Expand Down Expand Up @@ -600,7 +600,7 @@ put k a

savepoint x
----
1 <noignore>
2 <noignore>

retry
----
Expand Down
33 changes: 22 additions & 11 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,7 @@ type savepoint struct {

// seqNum represents the write seq num at the time the savepoint was created.
// On rollback, it configures the txn to ignore all seqnums from this value
// until the most recent seqnum.
// TODO(nvanbenschoten): this field is currently defined to be an exclusive
// lower bound, with the assumption that any writes performed after the
// savepoint is established will use a higher sequence number. This probably
// isn't working correctly with shared and exclusive lock acquisition, which
// don't increment the writeSeq. We should increment the writeSeq when a
// savepoint is established and then consider this an inclusive lower bound.
// (inclusive) until the most recent seqnum (inclusive).
seqNum enginepb.TxnSeq

// txnSpanRefresher fields.
Expand Down Expand Up @@ -90,6 +84,16 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (kv.SavepointToke
return &initialSavepoint, nil
}

// If the transaction has acquired any locks, increment the write sequence on
// savepoint creation and assign this sequence to the savepoint. This allows
// us to distinguish between all operations (writes and locking reads) that
// happened before the savepoint and those that happened after.
// TODO(nvanbenschoten): once #113765 is resolved, we should make this
// unconditional and push it into txnSeqNumAllocator.createSavepointLocked.
if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked()
}

s := &savepoint{
active: true, // we've handled the not-active case above
txnID: tc.mu.txn.ID,
Expand Down Expand Up @@ -142,13 +146,20 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin
reqInt.rollbackToSavepointLocked(ctx, *sp)
}

// If there's been any more writes since the savepoint was created, they'll
// need to be ignored.
if sp.seqNum < tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
// If the transaction has acquired any locks (before or after the savepoint),
// ignore all seqnums from the beginning of the savepoint (inclusive) until
// the most recent seqnum (inclusive). Then increment the write sequence to
// differentiate all future operations from this ignored sequence number
// range.
// TODO(nvanbenschoten): once #113765 is resolved, we should make this
// unconditional and push the write sequence increment into
// txnSeqNumAllocator.rollbackToSavepointLocked.
if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
tc.mu.txn.AddIgnoredSeqNumRange(
enginepb.IgnoredSeqNumRange{
Start: sp.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
Start: sp.seqNum, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
})
tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked()
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoi
var writesToDelete []*inFlightWrite
needCollecting := !s.Initial()
tp.ifWrites.ascend(func(w *inFlightWrite) {
if w.Sequence > s.seqNum {
if w.Sequence >= s.seqNum {
tp.lockFootprint.insert(roachpb.Span{Key: w.Key})
if needCollecting {
writesToDelete = append(writesToDelete, w)
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1725,12 +1725,12 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
tp.ifWrites.insert(roachpb.Key("c"), 12)
require.Equal(t, 3, tp.ifWrites.len())

s := savepoint{seqNum: enginepb.TxnSeq(12), active: true}
s := savepoint{seqNum: enginepb.TxnSeq(13), active: true}
tp.createSavepointLocked(ctx, &s)

// Some more writes after the savepoint.
tp.ifWrites.insert(roachpb.Key("c"), 13)
tp.ifWrites.insert(roachpb.Key("d"), 14)
tp.ifWrites.insert(roachpb.Key("c"), 14)
tp.ifWrites.insert(roachpb.Key("d"), 15)
require.Equal(t, 5, tp.ifWrites.len())
require.Empty(t, tp.lockFootprint.asSlice())

Expand Down
15 changes: 11 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type txnSeqNumAllocator struct {

// writeSeq is the current write seqnum, i.e. the value last assigned
// to a write operation in a batch. It remains at 0 until the first
// write operation is encountered.
// write or savepoint operation is encountered.
writeSeq enginepb.TxnSeq

// readSeq is the sequence number at which to perform read-only operations.
Expand Down Expand Up @@ -99,7 +99,7 @@ func (s *txnSeqNumAllocator) SendLocked(
// Notably, this includes Get/Scan/ReverseScan requests that acquire
// replicated locks, even though they go through raft.
if kvpb.IsIntentWrite(req) || req.Method() == kvpb.EndTxn {
s.writeSeq++
s.stepWriteSeqLocked()
if err := s.maybeAutoStepReadSeqLocked(ctx); err != nil {
return nil, kvpb.NewError(err)
}
Expand Down Expand Up @@ -171,6 +171,11 @@ func (s *txnSeqNumAllocator) stepReadSeqLocked(ctx context.Context) error {
return nil
}

// stepWriteSeqLocked increments the write seqnum.
func (s *txnSeqNumAllocator) stepWriteSeqLocked() {
s.writeSeq++
}

// configureSteppingLocked configures the stepping mode.
//
// When enabling stepping from the non-enabled state, the read seqnum
Expand Down Expand Up @@ -205,9 +210,11 @@ func (s *txnSeqNumAllocator) createSavepointLocked(ctx context.Context, sp *save
}

// rollbackToSavepointLocked is part of the txnInterceptor interface.
func (*txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) {
func (s *txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) {
// Nothing to restore. The seq nums keep increasing. The TxnCoordSender has
// added a range of sequence numbers to the ignored list.
// added a range of sequence numbers to the ignored list. It may have also
// manually stepped the write seqnum to distinguish the ignored range from
// any future operations.
}

// closeLocked is part of the txnInterceptor interface.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,5 +526,5 @@ func TestSequenceNumberAllocationSavepoint(t *testing.T) {

sp := &savepoint{}
s.createSavepointLocked(ctx, sp)
require.Equal(t, enginepb.TxnSeq(2), sp.seqNum)
require.Equal(t, enginepb.TxnSeq(2), s.writeSeq)
}

0 comments on commit 5215fc4

Please sign in to comment.