From 83af90f5ff025c6f7e9f9437eac996fb37750739 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 1 Apr 2024 14:42:23 +0000 Subject: [PATCH] kv: give savepoints distinct start and end sequence numbers This commit increments a transaction's write sequence number on savepoint creation and rollback. This ensures that savepoints have distinct start and end sequence numbers, which is necessary distinguish between all operations (writes and locking reads) that happened before the savepoint creation, those that happened within the savepoint, and those that happened after the savepoint rollback. This avoids the problem described in the removed TODO. That hypothesized problem is real. Without this change, #121088 runs into trouble with the following sequence of operations: ```sql create table kv (k int primary key, v int); insert into kv values (1, 2); begin isolation level read committed; insert into kv values (2, 2); savepoint s1; insert into kv values (3, 2); rollback to s1; select * from kv where k = 1 for update; commit; ERROR: internal error: QueryIntent request for lock at sequence number 2 but sequence number is ignored [{2 2}] ``` Epic: None Release note: None --- pkg/kv/kvclient/kvcoord/testdata/savepoints | 44 +++++++++---------- .../kvcoord/txn_coord_sender_savepoints.go | 33 +++++++++----- .../kvcoord/txn_interceptor_pipeliner.go | 2 +- .../kvcoord/txn_interceptor_pipeliner_test.go | 6 +-- .../txn_interceptor_seq_num_allocator.go | 15 +++++-- .../txn_interceptor_seq_num_allocator_test.go | 2 +- 6 files changed, 60 insertions(+), 42 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/testdata/savepoints b/pkg/kv/kvclient/kvcoord/testdata/savepoints index b356c20c62b8..9b2d6c7ec561 100644 --- a/pkg/kv/kvclient/kvcoord/testdata/savepoints +++ b/pkg/kv/kvclient/kvcoord/testdata/savepoints @@ -13,7 +13,7 @@ get k savepoint x ---- -1 +2 put k b ---- @@ -28,7 +28,7 @@ true release x ---- -2 +3 get k ---- @@ -66,7 +66,7 @@ get k savepoint x ---- -1 +2 put k b ---- @@ -81,7 +81,7 @@ true rollback x ---- -2 [2-2] +4 [2-3] get k ---- @@ -114,14 +114,14 @@ put k ar savepoint x ---- -1 +2 put k br ---- savepoint y ---- -2 +4 put k cr ---- @@ -132,7 +132,7 @@ true release y ---- -3 +5 put k dr ---- @@ -143,7 +143,7 @@ true rollback x ---- -4 [2-4] +7 [2-6] get k ---- @@ -179,7 +179,7 @@ put b d1 savepoint x ---- -2 +3 put a d2 ---- @@ -190,14 +190,14 @@ true rollback x ---- -3 [3-3] +5 [3-4] put c d1 ---- savepoint x ---- -4 [3-3] +7 [3-4] put b 2 ---- @@ -208,7 +208,7 @@ true rollback x ---- -5 [3-3][5-5] +9 [3-4][7-8] put d 1 ---- @@ -270,7 +270,7 @@ put k nop savepoint x ---- -1 +2 can-use x ---- @@ -278,7 +278,7 @@ true rollback x ---- -1 +3 [2-2] can-use x ---- @@ -286,7 +286,7 @@ true release x ---- -1 +3 [2-2] commit ---- @@ -322,11 +322,11 @@ true rollback x ---- -1 [1-1] +2 [0-1] rollback x ---- -1 [1-1] +3 [0-2] get k ---- @@ -341,7 +341,7 @@ true rollback x ---- -2 [1-2] +5 [0-4] commit ---- @@ -419,7 +419,7 @@ true rollback x ---- -1 [1-1] +1 subtest end @@ -468,7 +468,7 @@ true rollback x ---- -1 [1-1] +1 subtest end @@ -517,7 +517,7 @@ true rollback x ---- -1 [1-1] +1 subtest end @@ -600,7 +600,7 @@ put k a savepoint x ---- -1 +2 retry ---- diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index cc87c678a323..2224d8fc90a2 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -39,13 +39,7 @@ type savepoint struct { // seqNum represents the write seq num at the time the savepoint was created. // On rollback, it configures the txn to ignore all seqnums from this value - // until the most recent seqnum. - // TODO(nvanbenschoten): this field is currently defined to be an exclusive - // lower bound, with the assumption that any writes performed after the - // savepoint is established will use a higher sequence number. This probably - // isn't working correctly with shared and exclusive lock acquisition, which - // don't increment the writeSeq. We should increment the writeSeq when a - // savepoint is established and then consider this an inclusive lower bound. + // (inclusive) until the most recent seqnum (inclusive). seqNum enginepb.TxnSeq // txnSpanRefresher fields. @@ -90,6 +84,16 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (kv.SavepointToke return &initialSavepoint, nil } + // If the transaction has acquired any locks, increment the write sequence on + // savepoint creation and assign this sequence to the savepoint. This allows + // us to distinguish between all operations (writes and locking reads) that + // happened before the savepoint and those that happened after. + // TODO(nvanbenschoten): once #113765 is resolved, we should make this + // unconditional and push it into txnSeqNumAllocator.createSavepointLocked. + if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() { + tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked() + } + s := &savepoint{ active: true, // we've handled the not-active case above txnID: tc.mu.txn.ID, @@ -142,13 +146,20 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin reqInt.rollbackToSavepointLocked(ctx, *sp) } - // If there's been any more writes since the savepoint was created, they'll - // need to be ignored. - if sp.seqNum < tc.interceptorAlloc.txnSeqNumAllocator.writeSeq { + // If the transaction has acquired any locks (before or after the savepoint), + // ignore all seqnums from the beginning of the savepoint (inclusive) until + // the most recent seqnum (inclusive). Then increment the write sequence to + // differentiate all future operations from this ignored sequence number + // range. + // TODO(nvanbenschoten): once #113765 is resolved, we should make this + // unconditional and push the write sequence increment into + // txnSeqNumAllocator.rollbackToSavepointLocked. + if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() { tc.mu.txn.AddIgnoredSeqNumRange( enginepb.IgnoredSeqNumRange{ - Start: sp.seqNum + 1, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, + Start: sp.seqNum, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq, }) + tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked() } return nil diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index d277dea1b07b..20482c1172d4 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -891,7 +891,7 @@ func (tp *txnPipeliner) rollbackToSavepointLocked(ctx context.Context, s savepoi var writesToDelete []*inFlightWrite needCollecting := !s.Initial() tp.ifWrites.ascend(func(w *inFlightWrite) { - if w.Sequence > s.seqNum { + if w.Sequence >= s.seqNum { tp.lockFootprint.insert(roachpb.Span{Key: w.Key}) if needCollecting { writesToDelete = append(writesToDelete, w) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index de76168b3e7f..c35a89904287 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -1725,12 +1725,12 @@ func TestTxnPipelinerSavepoints(t *testing.T) { tp.ifWrites.insert(roachpb.Key("c"), 12) require.Equal(t, 3, tp.ifWrites.len()) - s := savepoint{seqNum: enginepb.TxnSeq(12), active: true} + s := savepoint{seqNum: enginepb.TxnSeq(13), active: true} tp.createSavepointLocked(ctx, &s) // Some more writes after the savepoint. - tp.ifWrites.insert(roachpb.Key("c"), 13) - tp.ifWrites.insert(roachpb.Key("d"), 14) + tp.ifWrites.insert(roachpb.Key("c"), 14) + tp.ifWrites.insert(roachpb.Key("d"), 15) require.Equal(t, 5, tp.ifWrites.len()) require.Empty(t, tp.lockFootprint.asSlice()) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go index 1583f4ac6071..c5620e2c969a 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go @@ -62,7 +62,7 @@ type txnSeqNumAllocator struct { // writeSeq is the current write seqnum, i.e. the value last assigned // to a write operation in a batch. It remains at 0 until the first - // write operation is encountered. + // write or savepoint operation is encountered. writeSeq enginepb.TxnSeq // readSeq is the sequence number at which to perform read-only operations. @@ -99,7 +99,7 @@ func (s *txnSeqNumAllocator) SendLocked( // Notably, this includes Get/Scan/ReverseScan requests that acquire // replicated locks, even though they go through raft. if kvpb.IsIntentWrite(req) || req.Method() == kvpb.EndTxn { - s.writeSeq++ + s.stepWriteSeqLocked() if err := s.maybeAutoStepReadSeqLocked(ctx); err != nil { return nil, kvpb.NewError(err) } @@ -171,6 +171,11 @@ func (s *txnSeqNumAllocator) stepReadSeqLocked(ctx context.Context) error { return nil } +// stepWriteSeqLocked increments the write seqnum. +func (s *txnSeqNumAllocator) stepWriteSeqLocked() { + s.writeSeq++ +} + // configureSteppingLocked configures the stepping mode. // // When enabling stepping from the non-enabled state, the read seqnum @@ -205,9 +210,11 @@ func (s *txnSeqNumAllocator) createSavepointLocked(ctx context.Context, sp *save } // rollbackToSavepointLocked is part of the txnInterceptor interface. -func (*txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) { +func (s *txnSeqNumAllocator) rollbackToSavepointLocked(context.Context, savepoint) { // Nothing to restore. The seq nums keep increasing. The TxnCoordSender has - // added a range of sequence numbers to the ignored list. + // added a range of sequence numbers to the ignored list. It may have also + // manually stepped the write seqnum to distinguish the ignored range from + // any future operations. } // closeLocked is part of the txnInterceptor interface. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go index f9fcd025e09d..9955b0dc7677 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator_test.go @@ -526,5 +526,5 @@ func TestSequenceNumberAllocationSavepoint(t *testing.T) { sp := &savepoint{} s.createSavepointLocked(ctx, sp) - require.Equal(t, enginepb.TxnSeq(2), sp.seqNum) + require.Equal(t, enginepb.TxnSeq(2), s.writeSeq) }