Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-24.1: kv: give savepoints distinct start and end sequence numbers #121503

Merged
merged 1 commit into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions pkg/kv/kvclient/kvcoord/testdata/savepoints
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ get k

savepoint x
----
1 <noignore>
2 <noignore>

put k b
----
Expand All @@ -28,7 +28,7 @@ true

release x
----
2 <noignore>
3 <noignore>

get k
----
Expand Down Expand Up @@ -66,7 +66,7 @@ get k

savepoint x
----
1 <noignore>
2 <noignore>

put k b
----
Expand All @@ -81,7 +81,7 @@ true

rollback x
----
2 [2-2]
4 [2-3]

get k
----
Expand Down Expand Up @@ -114,14 +114,14 @@ put k ar

savepoint x
----
1 <noignore>
2 <noignore>

put k br
----

savepoint y
----
2 <noignore>
4 <noignore>

put k cr
----
Expand All @@ -132,7 +132,7 @@ true

release y
----
3 <noignore>
5 <noignore>

put k dr
----
Expand All @@ -143,7 +143,7 @@ true

rollback x
----
4 [2-4]
7 [2-6]

get k
----
Expand Down Expand Up @@ -179,7 +179,7 @@ put b d1

savepoint x
----
2 <noignore>
3 <noignore>

put a d2
----
Expand All @@ -190,14 +190,14 @@ true

rollback x
----
3 [3-3]
5 [3-4]

put c d1
----

savepoint x
----
4 [3-3]
7 [3-4]

put b 2
----
Expand All @@ -208,7 +208,7 @@ true

rollback x
----
5 [3-3][5-5]
9 [3-4][7-8]

put d 1
----
Expand Down Expand Up @@ -270,23 +270,23 @@ put k nop

savepoint x
----
1 <noignore>
2 <noignore>

can-use x
----
true

rollback x
----
1 <noignore>
3 [2-2]

can-use x
----
true

release x
----
1 <noignore>
3 [2-2]

commit
----
Expand Down Expand Up @@ -322,11 +322,11 @@ true

rollback x
----
1 [1-1]
2 [0-1]

rollback x
----
1 [1-1]
3 [0-2]

get k
----
Expand All @@ -341,7 +341,7 @@ true

rollback x
----
2 [1-2]
5 [0-4]

commit
----
Expand Down Expand Up @@ -419,7 +419,7 @@ true

rollback x
----
1 [1-1]
1 <noignore>

subtest end

Expand Down Expand Up @@ -468,7 +468,7 @@ true

rollback x
----
1 [1-1]
1 <noignore>

subtest end

Expand Down Expand Up @@ -517,7 +517,7 @@ true

rollback x
----
1 [1-1]
1 <noignore>

subtest end

Expand Down Expand Up @@ -600,7 +600,7 @@ put k a

savepoint x
----
1 <noignore>
2 <noignore>

retry
----
Expand Down
33 changes: 22 additions & 11 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,7 @@ type savepoint struct {

// seqNum represents the write seq num at the time the savepoint was created.
// On rollback, it configures the txn to ignore all seqnums from this value
// until the most recent seqnum.
// TODO(nvanbenschoten): this field is currently defined to be an exclusive
// lower bound, with the assumption that any writes performed after the
// savepoint is established will use a higher sequence number. This probably
// isn't working correctly with shared and exclusive lock acquisition, which
// don't increment the writeSeq. We should increment the writeSeq when a
// savepoint is established and then consider this an inclusive lower bound.
// (inclusive) until the most recent seqnum (inclusive).
seqNum enginepb.TxnSeq

// txnSpanRefresher fields.
Expand Down Expand Up @@ -90,6 +84,16 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (kv.SavepointToke
return &initialSavepoint, nil
}

// If the transaction has acquired any locks, increment the write sequence on
// savepoint creation and assign this sequence to the savepoint. This allows
// us to distinguish between all operations (writes and locking reads) that
// happened before the savepoint and those that happened after.
// TODO(nvanbenschoten): once #113765 is resolved, we should make this
// unconditional and push it into txnSeqNumAllocator.createSavepointLocked.
if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked()
}

s := &savepoint{
active: true, // we've handled the not-active case above
txnID: tc.mu.txn.ID,
Expand Down Expand Up @@ -142,13 +146,20 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin
reqInt.rollbackToSavepointLocked(ctx, *sp)
}

// If there's been any more writes since the savepoint was created, they'll
// need to be ignored.
if sp.seqNum < tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
// If the transaction has acquired any locks (before or after the savepoint),
// ignore all seqnums from the beginning of the savepoint (inclusive) until
// the most recent seqnum (inclusive). Then increment the write sequence to
// differentiate all future operations from this ignored sequence number
// range.
// TODO(nvanbenschoten): once #113765 is resolved, we should make this
// unconditional and push the write sequence increment into
// txnSeqNumAllocator.rollbackToSavepointLocked.
if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
tc.mu.txn.AddIgnoredSeqNumRange(
enginepb.IgnoredSeqNumRange{
Start: sp.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
Start: sp.seqNum, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
})
tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked()
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoi
var writesToDelete []*inFlightWrite
needCollecting := !s.Initial()
tp.ifWrites.ascend(func(w *inFlightWrite) {
if w.Sequence > s.seqNum {
if w.Sequence >= s.seqNum {
tp.lockFootprint.insert(roachpb.Span{Key: w.Key})
if needCollecting {
writesToDelete = append(writesToDelete, w)
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1725,12 +1725,12 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
tp.ifWrites.insert(roachpb.Key("c"), 12)
require.Equal(t, 3, tp.ifWrites.len())

s := savepoint{seqNum: enginepb.TxnSeq(12), active: true}
s := savepoint{seqNum: enginepb.TxnSeq(13), active: true}
tp.createSavepointLocked(ctx, &s)

// Some more writes after the savepoint.
tp.ifWrites.insert(roachpb.Key("c"), 13)
tp.ifWrites.insert(roachpb.Key("d"), 14)
tp.ifWrites.insert(roachpb.Key("c"), 14)
tp.ifWrites.insert(roachpb.Key("d"), 15)
require.Equal(t, 5, tp.ifWrites.len())
require.Empty(t, tp.lockFootprint.asSlice())

Expand Down
15 changes: 11 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type txnSeqNumAllocator struct {

// writeSeq is the current write seqnum, i.e. the value last assigned
// to a write operation in a batch. It remains at 0 until the first
// write operation is encountered.
// write or savepoint operation is encountered.
writeSeq enginepb.TxnSeq

// readSeq is the sequence number at which to perform read-only operations.
Expand Down Expand Up @@ -99,7 +99,7 @@ func (s *txnSeqNumAllocator) SendLocked(
// Notably, this includes Get/Scan/ReverseScan requests that acquire
// replicated locks, even though they go through raft.
if kvpb.IsIntentWrite(req) || req.Method() == kvpb.EndTxn {
s.writeSeq++
s.stepWriteSeqLocked()
if err := s.maybeAutoStepReadSeqLocked(ctx); err != nil {
return nil, kvpb.NewError(err)
}
Expand Down Expand Up @@ -171,6 +171,11 @@ func (s *txnSeqNumAllocator) stepReadSeqLocked(ctx context.Context) error {
return nil
}

// stepWriteSeqLocked increments the write seqnum.
func (s *txnSeqNumAllocator) stepWriteSeqLocked() {
s.writeSeq++
}

// configureSteppingLocked configures the stepping mode.
//
// When enabling stepping from the non-enabled state, the read seqnum
Expand Down Expand Up @@ -205,9 +210,11 @@ func (s *txnSeqNumAllocator) createSavepointLocked(ctx context.Context, sp *save
}

// rollbackToSavepointLocked is part of the txnInterceptor interface.
func (*txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) {
func (s *txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) {
// Nothing to restore. The seq nums keep increasing. The TxnCoordSender has
// added a range of sequence numbers to the ignored list.
// added a range of sequence numbers to the ignored list. It may have also
// manually stepped the write seqnum to distinguish the ignored range from
// any future operations.
}

// closeLocked is part of the txnInterceptor interface.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,5 +526,5 @@ func TestSequenceNumberAllocationSavepoint(t *testing.T) {

sp := &savepoint{}
s.createSavepointLocked(ctx, sp)
require.Equal(t, enginepb.TxnSeq(2), sp.seqNum)
require.Equal(t, enginepb.TxnSeq(2), s.writeSeq)
}
Loading