diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index f287f81d5867..4cd79acd8aca 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -274,12 +274,25 @@ func EndTxn( return result.FromEndTxn(reply.Txn, true /* alwaysReturn */, args.Poison), roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_ABORTED_RECORD_FOUND) - case roachpb.PENDING, roachpb.STAGING: + case roachpb.PENDING: if h.Txn.Epoch < reply.Txn.Epoch { return result.Result{}, errors.AssertionFailedf( "programming error: epoch regression: %d", h.Txn.Epoch) } + case roachpb.STAGING: + if h.Txn.Epoch < reply.Txn.Epoch { + return result.Result{}, errors.AssertionFailedf( + "programming error: epoch regression: %d", h.Txn.Epoch) + } + if h.Txn.Epoch > reply.Txn.Epoch { + // If the EndTxn carries a newer epoch than a STAGING txn record, we do + // not consider the transaction to be performing a parallel commit and + // potentially already implicitly committed because we know that the + // transaction restarted since entering the STAGING state. + reply.Txn.Status = roachpb.PENDING + } + default: return result.Result{}, errors.AssertionFailedf("bad txn status: %s", reply.Txn) } @@ -318,6 +331,40 @@ func EndTxn( // Else, the transaction can be explicitly committed. reply.Txn.Status = roachpb.COMMITTED } else { + // If the transaction is STAGING, we can only move it to ABORTED if it is + // *not* already implicitly committed. On the commit path, the transaction + // coordinator is deliberate to only ever issue an EndTxn(commit) once the + // transaction has reached an implicit commit state. However, on the + // rollback path, the transaction coordinator does not make the opposite + // guarantee that it will never issue an EndTxn(abort) once the transaction + // has reached (or if it still could reach) an implicit commit state. + // + // As a result, on the rollback path, we don't trust the transaction's + // coordinator to be an authoritative source of truth about whether the + // transaction is implicitly committed. In other words, we don't consider + // this EndTxn(abort) to be a claim that the transaction is not implicitly + // committed. The transaction's coordinator may have just given up on the + // transaction before it heard the outcome of a commit attempt. So in this + // case, we return an IndeterminateCommitError to trigger the transaction + // recovery protocol and transition the transaction record to a finalized + // state (COMMITTED or ABORTED). + // + // Interestingly, because intents are not currently resolved until after an + // implicitly committed transaction has been moved to an explicit commit + // state (i.e. its record has moved from STAGING to COMMITTED), no other + // transaction could see the effect of an implicitly committed transaction + // that was erroneously rolled back. This means that such a mistake does not + // actually compromise atomicity. Regardless, such a transition is confusing + // and can cause errors in transaction recovery code. We would also like to + // begin resolving intents earlier, while a transaction is still implicitly + // committed. Doing so is only possible if we can guarantee that under no + // circumstances can an implicitly committed transaction be rolled back. + if reply.Txn.Status == roachpb.STAGING { + err := roachpb.NewIndeterminateCommitError(*reply.Txn) + log.VEventf(ctx, 1, "%v", err) + return result.Result{}, err + } + reply.Txn.Status = roachpb.ABORTED } diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go index 04b095bdff12..19cb34232cda 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go @@ -611,15 +611,20 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) { }, { // Standard case where a transaction is rolled back. The record - // already exists because of a failed parallel commit attempt. - name: "record staging, try rollback", + // already exists because of a failed parallel commit attempt in + // the same epoch. + // + // The rollback is not considered an authoritative indication that the + // transaction is not implicitly committed, so an indeterminate commit + // error is returned to force transaction recovery to be performed. + name: "record staging, try rollback at same epoch", // Replica state. existingTxn: stagingRecord, // Request state. headerTxn: headerTxn, commit: false, // Expected result. - expTxn: abortedRecord, + expError: "found txn in indeterminate STAGING state", }, { // Standard case where a transaction record is created during a diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index c09c251d2b5e..6066941e6603 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -489,7 +489,9 @@ func (r *Replica) executeBatchWithConcurrencyRetries( latchSpans, lockSpans = g.TakeSpanSets() r.concMgr.FinishReq(g) g = nil - // Then launch a task to handle the indeterminate commit error. + // Then launch a task to handle the indeterminate commit error. No error + // is returned if the transaction is recovered successfully to either a + // COMMITTED or ABORTED state. if pErr = r.handleIndeterminateCommitError(ctx, ba, pErr, t); pErr != nil { return nil, pErr } @@ -693,7 +695,7 @@ func (r *Replica) handleIndeterminateCommitError( newPErr.Index = pErr.Index return newPErr } - // We've recovered the transaction that blocked the push; retry command. + // We've recovered the transaction that blocked the request; retry. return nil } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 887d3f3fca52..00a01415ccfa 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -11432,6 +11432,11 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { }, { // Case of a rollback after an unsuccessful implicit commit. + // The rollback request is not considered an authoritative indication that + // the transaction is not implicitly committed (i.e. the txn coordinator + // may have given up on the txn before it heard the result of a commit one + // way or another), so an IndeterminateCommitError is returned to force + // transaction recovery to be performed. name: "end transaction (abort) after end transaction (stage)", setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { et, etH := endTxnArgs(txn, true /* commit */) @@ -11442,6 +11447,27 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { et, etH := endTxnArgs(txn, false /* commit */) return sendWrappedWithErr(etH, &et) }, + expError: "found txn in indeterminate STAGING state", + expTxn: txnWithStagingStatusAndInFlightWrites, + }, + { + // Case of a rollback after an unsuccessful implicit commit and txn + // restart. Because the rollback request uses a newer epoch than the + // staging record, it is considered an authoritative indication that the + // transaction was never implicitly committed and was later restarted, so + // the rollback succeeds and the record is immediately aborted. + name: "end transaction (abort) with epoch bump after end transaction (stage)", + setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { + et, etH := endTxnArgs(txn, true /* commit */) + et.InFlightWrites = inFlightWrites + return sendWrappedWithErr(etH, &et) + }, + run: func(txn *roachpb.Transaction, now hlc.Timestamp) error { + clone := txn.Clone() + clone.Restart(-1, 0, now) + et, etH := endTxnArgs(clone, false /* commit */) + return sendWrappedWithErr(etH, &et) + }, // The transaction record will be eagerly GC-ed. expTxn: noTxnRecord, }, @@ -11461,17 +11487,25 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { expTxn: noTxnRecord, }, { - name: "end transaction (abort) without eager gc after end transaction (stage)", + // Case of a rollback after an unsuccessful implicit commit. + name: "end transaction (abort) with epoch bump, without eager gc after end transaction (stage)", setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { et, etH := endTxnArgs(txn, true /* commit */) et.InFlightWrites = inFlightWrites return sendWrappedWithErr(etH, &et) }, - run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { - et, etH := endTxnArgs(txn, false /* commit */) + run: func(txn *roachpb.Transaction, now hlc.Timestamp) error { + clone := txn.Clone() + clone.Restart(-1, 0, now) + et, etH := endTxnArgs(clone, false /* commit */) return sendWrappedWithErr(etH, &et) }, - expTxn: txnWithStatus(roachpb.ABORTED), + expTxn: func(txn *roachpb.Transaction, now hlc.Timestamp) roachpb.TransactionRecord { + record := txnWithStatus(roachpb.ABORTED)(txn, now) + record.Epoch = txn.Epoch + 1 + record.WriteTimestamp.Forward(now) + return record + }, disableTxnAutoGC: true, }, { diff --git a/pkg/kv/txn_external_test.go b/pkg/kv/txn_external_test.go index c85c8933f336..da8e14af2678 100644 --- a/pkg/kv/txn_external_test.go +++ b/pkg/kv/txn_external_test.go @@ -52,6 +52,11 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) { // If txnStatus == COMMITTED, setting txnRecordCleanedUp will make us // cleanup the transaction. The txn record will be deleted. txnRecordCleanedUp bool + // If txnStatus == STAGING, setting txnImplicitlyCommitted will make + // the transaction qualify for the implicit commit condition. Otherwise, + // the transaction will not qualify because the test will attach a fake + // in-flight write to the transaction's STAGING record. + txnImplictlyCommitted bool // The error that we expect from txn.Rollback(). expRollbackErr string // Is the transaction expected to be committed or not after the @@ -76,42 +81,109 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) { expRollbackErr: "already committed", }, { - name: "STAGING", - txnStatus: roachpb.STAGING, - // The rollback succeeds. This behavior is undersired. See #48301. + name: "STAGING, implicitly committed", + txnStatus: roachpb.STAGING, + txnImplictlyCommitted: true, + // The rollback fails after performing txn recovery and moving the + // transaction to committed. + expCommitted: true, + expRollbackErr: "already committed", + }, + { + name: "STAGING, not implicitly committed", + txnStatus: roachpb.STAGING, + txnImplictlyCommitted: false, + // The rollback succeeds after performing txn recovery and moving the + // transaction to aborted. + expCommitted: false, + expRollbackErr: "", + }, + { + name: "PENDING", + txnStatus: roachpb.PENDING, + // The rollback succeeds. expCommitted: false, expRollbackErr: "", }, } for _, testCase := range testCases { + // Sanity-check test cases. if testCase.txnRecordCleanedUp { require.Equal(t, roachpb.COMMITTED, testCase.txnStatus) } + if testCase.txnImplictlyCommitted { + require.Equal(t, roachpb.STAGING, testCase.txnStatus) + } + t.Run(testCase.name, func(t *testing.T) { var filterSet int64 var key roachpb.Key commitBlocked := make(chan struct{}) + onCommitReqFilter := func( + ba roachpb.BatchRequest, fn func(et *roachpb.EndTxnRequest) *roachpb.Error, + ) *roachpb.Error { + if atomic.LoadInt64(&filterSet) == 0 { + return nil + } + req, ok := ba.GetArg(roachpb.EndTxn) + if !ok { + return nil + } + et := req.(*roachpb.EndTxnRequest) + if et.Key.Equal(key) && et.Commit { + return fn(et) + } + return nil + } + blockCommitReqFilter := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error { + return onCommitReqFilter(ba, func(et *roachpb.EndTxnRequest) *roachpb.Error { + // Inform the test that the commit is blocked. + commitBlocked <- struct{}{} + // Block until the client interrupts the commit. The client will + // cancel its context, at which point gRPC will return an error + // to the client and marshall the cancelation to the server. + <-ctx.Done() + return roachpb.NewError(ctx.Err()) + }) + } + addInFlightWriteToCommitReqFilter := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error { + return onCommitReqFilter(ba, func(et *roachpb.EndTxnRequest) *roachpb.Error { + // Add a fake in-flight write. + et.InFlightWrites = append(et.InFlightWrites, roachpb.SequencedWrite{ + Key: key.Next(), Sequence: et.Sequence, + }) + // Be sure to update the EndTxn and Txn's sequence accordingly. + et.Sequence++ + ba.Txn.Sequence++ + return nil + }) + } args := base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ // We're going to block the commit of the test's txn, letting the - // test cancel the request's ctx while the request is blocked. - TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error { - if atomic.LoadInt64(&filterSet) == 0 { - return nil + // test cancel the request's ctx while the request is blocked. We + // do this either before or after the request completes, depending + // on the status that the test wants the txn record to be in when + // the rollback is performed. + TestingRequestFilter: func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error { + if testCase.txnStatus == roachpb.PENDING { + // Block and reject before the request writes the txn record. + return blockCommitReqFilter(ctx, ba) } - req, ok := ba.GetArg(roachpb.EndTxn) - if !ok { - return nil + if testCase.txnStatus == roachpb.STAGING && !testCase.txnImplictlyCommitted { + // If the test wants the upcoming rollback to find a STAGING + // record for a transaction that is not implicitly committed, + // add an in-flight write for a (key, sequence) that does not + // exist to the EndTxn request. + _ = addInFlightWriteToCommitReqFilter(ctx, ba) } - commit := req.(*roachpb.EndTxnRequest) - if commit.Key.Equal(key) && commit.Commit { - // Inform the test that the commit is blocked. - commitBlocked <- struct{}{} - // Block until the client interrupts the commit. The client will - // cancel its context, at which point gRPC will return an error - // to the client and marshall the cancelation to the server. - <-ctx.Done() + return nil + }, + TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error { + if testCase.txnStatus != roachpb.PENDING { + // Block and reject after the request writes the txn record. + return blockCommitReqFilter(ctx, ba) } return nil }, @@ -186,7 +258,7 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) { // If the test wants the upcoming rollback to find a COMMITTED record, // we'll perform transaction recovery. This will leave the transaction in // the COMMITTED state, without cleaning it up. - if !testCase.txnRecordCleanedUp && testCase.txnStatus == roachpb.COMMITTED { + if testCase.txnStatus == roachpb.COMMITTED && !testCase.txnRecordCleanedUp { // Sanity check - verify that the txn is STAGING. txnProto := txn.TestingCloneTxn() queryTxn := roachpb.QueryTxnRequest{