diff --git a/pkg/kv/kvclient/kvcoord/testdata/savepoints b/pkg/kv/kvclient/kvcoord/testdata/savepoints index b356c20c62b8..9b2d6c7ec561 100644 --- a/pkg/kv/kvclient/kvcoord/testdata/savepoints +++ b/pkg/kv/kvclient/kvcoord/testdata/savepoints @@ -13,7 +13,7 @@ get k savepoint x ---- -1 +2 put k b ---- @@ -28,7 +28,7 @@ true release x ---- -2 +3 get k ---- @@ -66,7 +66,7 @@ get k savepoint x ---- -1 +2 put k b ---- @@ -81,7 +81,7 @@ true rollback x ---- -2 [2-2] +4 [2-3] get k ---- @@ -114,14 +114,14 @@ put k ar savepoint x ---- -1 +2 put k br ---- savepoint y ---- -2 +4 put k cr ---- @@ -132,7 +132,7 @@ true release y ---- -3 +5 put k dr ---- @@ -143,7 +143,7 @@ true rollback x ---- -4 [2-4] +7 [2-6] get k ---- @@ -179,7 +179,7 @@ put b d1 savepoint x ---- -2 +3 put a d2 ---- @@ -190,14 +190,14 @@ true rollback x ---- -3 [3-3] +5 [3-4] put c d1 ---- savepoint x ---- -4 [3-3] +7 [3-4] put b 2 ---- @@ -208,7 +208,7 @@ true rollback x ---- -5 [3-3][5-5] +9 [3-4][7-8] put d 1 ---- @@ -270,7 +270,7 @@ put k nop savepoint x ---- -1 +2 can-use x ---- @@ -278,7 +278,7 @@ true rollback x ---- -1 +3 [2-2] can-use x ---- @@ -286,7 +286,7 @@ true release x ---- -1 +3 [2-2] commit ---- @@ -322,11 +322,11 @@ true rollback x ---- -1 [1-1] +2 [0-1] rollback x ---- -1 [1-1] +3 [0-2] get k ---- @@ -341,7 +341,7 @@ true rollback x ---- -2 [1-2] +5 [0-4] commit ---- @@ -419,7 +419,7 @@ true rollback x ---- -1 [1-1] +1 subtest end @@ -468,7 +468,7 @@ true rollback x ---- -1 [1-1] +1 subtest end @@ -517,7 +517,7 @@ true rollback x ---- -1 [1-1] +1 subtest end @@ -600,7 +600,7 @@ put k a savepoint x ---- -1 +2 retry ---- diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index cc87c678a323..58280149d302 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -39,13 +39,7 @@ type savepoint struct { // seqNum represents the write seq num at the time the savepoint was created. // On rollback, it configures the txn to ignore all seqnums from this value - // until the most recent seqnum. - // TODO(nvanbenschoten): this field is currently defined to be an exclusive - // lower bound, with the assumption that any writes performed after the - // savepoint is established will use a higher sequence number. This probably - // isn't working correctly with shared and exclusive lock acquisition, which - // don't increment the writeSeq. We should increment the writeSeq when a - // savepoint is established and then consider this an inclusive lower bound. + // (inclusive) until the most recent seqnum (inclusive). seqNum enginepb.TxnSeq // txnSpanRefresher fields. @@ -90,6 +84,14 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (kv.SavepointToke return &initialSavepoint, nil } + // If the transaction has acquired any locks, increment the write sequence on + // savepoint creation and assign this sequence to the savepoint. This allows + // us to distinguish between all operations (writes and locking reads) that + // happened before the savepoint and those that happened after. + if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() { + tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked() + } + s := &savepoint{ active: true, // we've handled the not-active case above txnID: tc.mu.txn.ID, @@ -142,13 +144,17 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin reqInt.rollbackToSavepointLocked(ctx, *sp) } - // If there's been any more writes since the savepoint was created, they'll - // need to be ignored. - if sp.seqNum < tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { + // If the transaction has acquired any locks (before or after the savepoint), + // ignore all seqnums from the beginning of the savepoint (inclusive) until + // the most recent seqnum (inclusive). Then increment the write sequence to + // differentiate all future operations from this ignored sequence number + // range. + if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() { tc.mu.txn.AddIgnoredSeqNumRange( enginepb.IgnoredSeqNumRange{ - Start: sp.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, + Start: sp.seqNum, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, }) + tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked() } return nil diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index d277dea1b07b..20482c1172d4 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -891,7 +891,7 @@ func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoi var writesToDelete []*inFlightWrite needCollecting := !s.Initial() tp.ifWrites.ascend(func(w *inFlightWrite) { - if w.Sequence > s.seqNum { + if w.Sequence >= s.seqNum { tp.lockFootprint.insert(roachpb.Span{Key: w.Key}) if needCollecting { writesToDelete = append(writesToDelete, w) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index de76168b3e7f..c35a89904287 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -1725,12 +1725,12 @@ func TestTxnPipelinerSavepoints(t *testing.T) { tp.ifWrites.insert(roachpb.Key("c"), 12) require.Equal(t, 3, tp.ifWrites.len()) - s := savepoint{seqNum: enginepb.TxnSeq(12), active: true} + s := savepoint{seqNum: enginepb.TxnSeq(13), active: true} tp.createSavepointLocked(ctx, &s) // Some more writes after the savepoint. - tp.ifWrites.insert(roachpb.Key("c"), 13) - tp.ifWrites.insert(roachpb.Key("d"), 14) + tp.ifWrites.insert(roachpb.Key("c"), 14) + tp.ifWrites.insert(roachpb.Key("d"), 15) require.Equal(t, 5, tp.ifWrites.len()) require.Empty(t, tp.lockFootprint.asSlice()) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go index 1583f4ac6071..c5620e2c969a 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go @@ -62,7 +62,7 @@ type txnSeqNumAllocator struct { // writeSeq is the current write seqnum, i.e. the value last assigned // to a write operation in a batch. It remains at 0 until the first - // write operation is encountered. + // write or savepoint operation is encountered. writeSeq enginepb.TxnSeq // readSeq is the sequence number at which to perform read-only operations. @@ -99,7 +99,7 @@ func (s *txnSeqNumAllocator) SendLocked( // Notably, this includes Get/Scan/ReverseScan requests that acquire // replicated locks, even though they go through raft. if kvpb.IsIntentWrite(req) || req.Method() == kvpb.EndTxn { - s.writeSeq++ + s.stepWriteSeqLocked() if err := s.maybeAutoStepReadSeqLocked(ctx); err != nil { return nil, kvpb.NewError(err) } @@ -171,6 +171,11 @@ func (s *txnSeqNumAllocator) stepReadSeqLocked(ctx context.Context) error { return nil } +// stepWriteSeqLocked increments the write seqnum. +func (s *txnSeqNumAllocator) stepWriteSeqLocked() { + s.writeSeq++ +} + // configureSteppingLocked configures the stepping mode. // // When enabling stepping from the non-enabled state, the read seqnum @@ -205,9 +210,11 @@ func (s *txnSeqNumAllocator) createSavepointLocked(ctx context.Context, sp *save } // rollbackToSavepointLocked is part of the txnInterceptor interface. -func (*txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) { +func (s *txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) { // Nothing to restore. The seq nums keep increasing. The TxnCoordSender has - // added a range of sequence numbers to the ignored list. + // added a range of sequence numbers to the ignored list. It may have also + // manually stepped the write seqnum to distinguish the ignored range from + // any future operations. } // closeLocked is part of the txnInterceptor interface. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go index f9fcd025e09d..9955b0dc7677 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go @@ -526,5 +526,5 @@ func TestSequenceNumberAllocationSavepoint(t *testing.T) { sp := &savepoint{} s.createSavepointLocked(ctx, sp) - require.Equal(t, enginepb.TxnSeq(2), sp.seqNum) + require.Equal(t, enginepb.TxnSeq(2), s.writeSeq) }