Skip to content

Commit

Permalink
kv: auto-step read sequence on savepoint rollback
Browse files Browse the repository at this point in the history
Fixes #121752.
Fixes #121748.

This commit fixes the txnSeqNumAllocator to conditionally (based on the
stepping mode) auto-step the transaction's read sequence number to the
write sequence number on savepoint rollbacks.

The commit also adds in an assertion into the txnSeqNumAllocator that
the current read sequence is never part of a batch's ignored sequence
number list. This helps us detect the cases that were leading to
assertion errors in #121752 much faster.

Release note: None
  • Loading branch information
nvanbenschoten committed Apr 4, 2024
1 parent b8b598c commit 4847089
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
8 changes: 6 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ func (tc *TxnCoordSender) CreateSavepoint(ctx context.Context) (kv.SavepointToke
// TODO(nvanbenschoten): once #113765 is resolved, we should make this
// unconditional and push it into txnSeqNumAllocator.createSavepointLocked.
if tc.interceptorAlloc.txnPipeliner.hasAcquiredLocks() {
tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked()
if err := tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked(ctx); err != nil {
return nil, err
}
}

s := &savepoint{
Expand Down Expand Up @@ -159,7 +161,9 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin
enginepb.IgnoredSeqNumRange{
Start: sp.seqNum, End: tc.interceptorAlloc.txnSeqNumAllocator.writeSeq,
})
tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked()
if err := tc.interceptorAlloc.txnSeqNumAllocator.stepWriteSeqLocked(ctx); err != nil {
return err
}
}

return nil
Expand Down
21 changes: 18 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ type txnSeqNumAllocator struct {
func (s *txnSeqNumAllocator) SendLocked(
ctx context.Context, ba *kvpb.BatchRequest,
) (*kvpb.BatchResponse, *kvpb.Error) {
if err := s.checkReadSeqNotIgnoredLocked(ba); err != nil {
return nil, kvpb.NewError(err)
}

for _, ru := range ba.Requests {
req := ru.GetInner()
oldHeader := req.Header()
Expand All @@ -99,8 +103,7 @@ func (s *txnSeqNumAllocator) SendLocked(
// Notably, this includes Get/Scan/ReverseScan requests that acquire
// replicated locks, even though they go through raft.
if kvpb.IsIntentWrite(req) || req.Method() == kvpb.EndTxn {
s.stepWriteSeqLocked()
if err := s.maybeAutoStepReadSeqLocked(ctx); err != nil {
if err := s.stepWriteSeqLocked(ctx); err != nil {
return nil, kvpb.NewError(err)
}
oldHeader.Sequence = s.writeSeq
Expand Down Expand Up @@ -172,8 +175,20 @@ func (s *txnSeqNumAllocator) stepReadSeqLocked(ctx context.Context) error {
}

// stepWriteSeqLocked increments the write seqnum.
func (s *txnSeqNumAllocator) stepWriteSeqLocked() {
func (s *txnSeqNumAllocator) stepWriteSeqLocked(ctx context.Context) error {
s.writeSeq++
return s.maybeAutoStepReadSeqLocked(ctx)
}

// checkReadSeqNotIgnoredLocked verifies that the read seqnum is not in the
// ignored seqnum list of the provided batch request.
func (s *txnSeqNumAllocator) checkReadSeqNotIgnoredLocked(ba *kvpb.BatchRequest) error {
if enginepb.TxnSeqIsIgnored(s.readSeq, ba.Txn.IgnoredSeqNums) {
return errors.AssertionFailedf(
"read sequence number %d but sequence number is ignored %v after savepoint rollback",
s.readSeq, ba.Txn.IgnoredSeqNums)
}
return nil
}

// configureSteppingLocked configures the stepping mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func TestSequenceNumberAllocationWithStep(t *testing.T) {
currentStepSeqNum := s.writeSeq

ba := &kvpb.BatchRequest{}
ba.Requests = nil
ba.Header = kvpb.Header{Txn: &txn}
ba.Add(&kvpb.ConditionalPutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
ba.Add(&kvpb.InitPutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}})
Expand Down

0 comments on commit 4847089

Please sign in to comment.