Skip to content

Commit

Permalink
kv: give savepoints distinct start and end sequence numbers
Browse files Browse the repository at this point in the history
This commit increments a transaction's write sequence number on
savepoint creation and rollback. This ensures that savepoints have
distinct start and end sequence numbers, which is necessary distinguish
between all operations (writes and locking reads) that happened before
the savepoint creation, those that happened within the savepoint, and
those that happened after the savepoint rollback.

This avoids the problem described in the removed TODO. That hypothesized
problem is real. Without this change, #121088 runs into trouble with the
following sequence of operations:
```sql
create table kv (k int primary key, v int);
insert into kv values (1, 2);

begin isolation level read committed;
insert into kv values (2, 2);
savepoint s1;
insert into kv values (3, 2);
rollback to s1;
select * from kv where k = 1 for update;
commit;

ERROR: internal error: QueryIntent request for lock at sequence number 2 but sequence number is ignored [{2 2}]
```

Epic: None
Release note: None
  • Loading branch information
nvanbenschoten committed Apr 1, 2024
1 parent 2a5e231 commit d6f902e
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 42 deletions.
44 changes: 22 additions & 22 deletions pkg/kv/kvclient/kvcoord/testdata/savepoints
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ get k

savepoint x
----
1 <noignore>
2 <noignore>

put k b
----
Expand All @@ -28,7 +28,7 @@ true

release x
----
2 <noignore>
3 <noignore>

get k
----
Expand Down Expand Up @@ -66,7 +66,7 @@ get k

savepoint x
----
1 <noignore>
2 <noignore>

put k b
----
Expand All @@ -81,7 +81,7 @@ true

rollback x
----
2 [2-2]
4 [2-3]

get k
----
Expand Down Expand Up @@ -114,14 +114,14 @@ put k ar

savepoint x
----
1 <noignore>
2 <noignore>

put k br
----

savepoint y
----
2 <noignore>
4 <noignore>

put k cr
----
Expand All @@ -132,7 +132,7 @@ true

release y
----
3 <noignore>
5 <noignore>

put k dr
----
Expand All @@ -143,7 +143,7 @@ true

rollback x
----
4 [2-4]
7 [2-6]

get k
----
Expand Down Expand Up @@ -179,7 +179,7 @@ put b d1

savepoint x
----
2 <noignore>
3 <noignore>

put a d2
----
Expand All @@ -190,14 +190,14 @@ true

rollback x
----
3 [3-3]
5 [3-4]

put c d1
----

savepoint x
----
4 [3-3]
7 [3-4]

put b 2
----
Expand All @@ -208,7 +208,7 @@ true

rollback x
----
5 [3-3][5-5]
9 [3-4][7-8]

put d 1
----
Expand Down Expand Up @@ -270,23 +270,23 @@ put k nop

savepoint x
----
1 <noignore>
2 <noignore>

can-use x
----
true

rollback x
----
1 <noignore>
3 [2-2]

can-use x
----
true

release x
----
1 <noignore>
3 [2-2]

commit
----
Expand Down Expand Up @@ -322,11 +322,11 @@ true

rollback x
----
1 [1-1]
2 [0-1]

rollback x
----
1 [1-1]
3 [0-2]

get k
----
Expand All @@ -341,7 +341,7 @@ true

rollback x
----
2 [1-2]
5 [0-4]

commit
----
Expand Down Expand Up @@ -419,7 +419,7 @@ true

rollback x
----
1 [1-1]
1 <noignore>

subtest end

Expand Down Expand Up @@ -468,7 +468,7 @@ true

rollback x
----
1 [1-1]
1 <noignore>

subtest end

Expand Down Expand Up @@ -517,7 +517,7 @@ true

rollback x
----
1 [1-1]
1 <noignore>

subtest end

Expand Down Expand Up @@ -600,7 +600,7 @@ put k a

savepoint x
----
1 <noignore>
2 <noignore>

retry
----
Expand Down
33 changes: 22 additions & 11 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,7 @@ type savepoint struct {

// seqNum represents the write seq num at the time the savepoint was created.
// On rollback, it configures the txn to ignore all seqnums from this value
// until the most recent seqnum.
// TODO(nvanbenschoten): this field is currently defined to be an exclusive
// lower bound, with the assumption that any writes performed after the
// savepoint is established will use a higher sequence number. This probably
// isn't working correctly with shared and exclusive lock acquisition, which
// don't increment the writeSeq. We should increment the writeSeq when a
// savepoint is established and then consider this an inclusive lower bound.
// (inclusive) until the most recent seqnum (inclusive).
seqNum enginepb.TxnSeq

// txnSpanRefresher fields.
Expand Down Expand Up @@ -90,6 +84,16 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (kv.SavepointToke
return &initialSavepoint, nil
}

// If the transaction has acquired any locks, increment the write sequence on
// savepoint creation and assign this sequence to the savepoint. This allows
// us to distinguish between all operations (writes and locking reads) that
// happened before the savepoint and those that happened after.
// TODO(nvanbenschoten): once #113765 is resolved, we should make this
// unconditional and push it into txnSeqNumAllocator.createSavepointLocked.
if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked()
}

s := &savepoint{
active: true, // we've handled the not-active case above
txnID: tc.mu.txn.ID,
Expand Down Expand Up @@ -142,13 +146,20 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin
reqInt.rollbackToSavepointLocked(ctx, *sp)
}

// If there's been any more writes since the savepoint was created, they'll
// need to be ignored.
if sp.seqNum < tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
// If the transaction has acquired any locks (before or after the savepoint),
// ignore all seqnums from the beginning of the savepoint (inclusive) until
// the most recent seqnum (inclusive). Then increment the write sequence to
// differentiate all future operations from this ignored sequence number
// range.
// TODO(nvanbenschoten): once #113765 is resolved, we should make this
// unconditional and push the write sequence increment into
// txnSeqNumAllocator.rollbackToSavepointLocked.
if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
tc.mu.txn.AddIgnoredSeqNumRange(
enginepb.IgnoredSeqNumRange{
Start: sp.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
Start: sp.seqNum, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
})
tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked()
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoi
var writesToDelete []*inFlightWrite
needCollecting := !s.Initial()
tp.ifWrites.ascend(func(w *inFlightWrite) {
if w.Sequence > s.seqNum {
if w.Sequence >= s.seqNum {
tp.lockFootprint.insert(roachpb.Span{Key: w.Key})
if needCollecting {
writesToDelete = append(writesToDelete, w)
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1725,12 +1725,12 @@ func TestTxnPipelinerSavepoints(t *testing.T) {
tp.ifWrites.insert(roachpb.Key("c"), 12)
require.Equal(t, 3, tp.ifWrites.len())

s := savepoint{seqNum: enginepb.TxnSeq(12), active: true}
s := savepoint{seqNum: enginepb.TxnSeq(13), active: true}
tp.createSavepointLocked(ctx, &s)

// Some more writes after the savepoint.
tp.ifWrites.insert(roachpb.Key("c"), 13)
tp.ifWrites.insert(roachpb.Key("d"), 14)
tp.ifWrites.insert(roachpb.Key("c"), 14)
tp.ifWrites.insert(roachpb.Key("d"), 15)
require.Equal(t, 5, tp.ifWrites.len())
require.Empty(t, tp.lockFootprint.asSlice())

Expand Down
15 changes: 11 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type txnSeqNumAllocator struct {

// writeSeq is the current write seqnum, i.e. the value last assigned
// to a write operation in a batch. It remains at 0 until the first
// write operation is encountered.
// write or savepoint operation is encountered.
writeSeq enginepb.TxnSeq

// readSeq is the sequence number at which to perform read-only operations.
Expand Down Expand Up @@ -99,7 +99,7 @@ func (s *txnSeqNumAllocator) SendLocked(
// Notably, this includes Get/Scan/ReverseScan requests that acquire
// replicated locks, even though they go through raft.
if kvpb.IsIntentWrite(req) || req.Method() == kvpb.EndTxn {
s.writeSeq++
s.stepWriteSeqLocked()
if err := s.maybeAutoStepReadSeqLocked(ctx); err != nil {
return nil, kvpb.NewError(err)
}
Expand Down Expand Up @@ -171,6 +171,11 @@ func (s *txnSeqNumAllocator) stepReadSeqLocked(ctx context.Context) error {
return nil
}

// stepWriteSeqLocked increments the write seqnum.
func (s *txnSeqNumAllocator) stepWriteSeqLocked() {
s.writeSeq++
}

// configureSteppingLocked configures the stepping mode.
//
// When enabling stepping from the non-enabled state, the read seqnum
Expand Down Expand Up @@ -205,9 +210,11 @@ func (s *txnSeqNumAllocator) createSavepointLocked(ctx context.Context, sp *save
}

// rollbackToSavepointLocked is part of the txnInterceptor interface.
func (*txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) {
func (s *txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) {
// Nothing to restore. The seq nums keep increasing. The TxnCoordSender has
// added a range of sequence numbers to the ignored list.
// added a range of sequence numbers to the ignored list. It may have also
// manually stepped the write seqnum to distinguish the ignored range from
// any future operations.
}

// closeLocked is part of the txnInterceptor interface.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,5 +526,5 @@ func TestSequenceNumberAllocationSavepoint(t *testing.T) {

sp := &savepoint{}
s.createSavepointLocked(ctx, sp)
require.Equal(t, enginepb.TxnSeq(2), sp.seqNum)
require.Equal(t, enginepb.TxnSeq(2), s.writeSeq)
}

0 comments on commit d6f902e

Please sign in to comment.