From c24ce09e39c75137e384ed0ded5e8ea1ec221517 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 26 Jan 2022 23:37:21 -0500 Subject: [PATCH] kv: perform recovery on rollback of staging transaction record Fixes #48301. During a transaction rollback (i.e. an `EndTxn(abort)`), if the request arrives to find that the transaction record is STAGING, it can only move it to ABORTED if it is *not* already implicitly committed. On the commit path, the transaction coordinator is deliberate to only ever issue an `EndTxn(commit)` once the transaction has reached an implicit commit state. However, on the rollback path, the transaction coordinator does not make the opposite guarantee that it will never issue an `EndTxn(abort)` once the transaction has reached (or if it still could reach) an implicit commit state. As a result, on the rollback path, we don't trust the transaction's coordinator to be an authoritative source of truth about whether the transaction is implicitly committed. In other words, we don't consider this `EndTxn(abort)` to be a claim that the transaction is not implicitly committed. The transaction's coordinator may have just given up on the transaction before it heard the outcome of a commit attempt. So in this case, we now return an `IndeterminateCommitError` during evaluation to trigger the transaction recovery protocol and transition the transaction record to a finalized state (COMMITTED or ABORTED). Prior to this change, we were blindly trusting the `EndTxn(abort)` to be such an authoritative source of truth, so we were at risk of hazards where a transaction was implicitly committed but its coordinator did not know and issued a rollback. This was observed to cause workloads to hit "found ABORTED record for implicitly committed transaction" errors under sufficient network fault injection. This was one of the two alternatives described in #48301. The other option was to lock down the txn client to attempt to make the guarantee that rollbacks will only be performed if the client is certain that the transaction is not currently in and can no longer ever enter the implicit commit state. I was previously drawn to this other approach because it would avoid the need for transaction recovery during the rollback of a staging txn. However, I don't think it's possible to make such a guarantee in all cases due to the possibility of ambiguity. To eliminate this ambiguity, there are cases where a transaction's coordinator would need to query the result of writes, which turns out to be analogous to the transaction recovery protocol. So instead of trying to make the guarantee in all cases, I'd rather make rollbacks safe in all cases and then later explore selectively opting in to skipping txn recovery in specific cases where the client can definitively guarantee that the it is rolling back a staging transaction that is not implicitly committed. I don't expect this to be needed immediately. Release note (bug fix): fixed a rare race condition that could lead to client-visible errors that looked like "found ABORTED record for implicitly committed transaction". These errors were harmless in that they did not indicate data corruption, but they could be disruptive to clients. --- .../kvserver/batcheval/cmd_end_transaction.go | 49 +++++++- .../batcheval/cmd_end_transaction_test.go | 11 +- pkg/kv/kvserver/replica_send.go | 6 +- pkg/kv/kvserver/replica_test.go | 42 ++++++- pkg/kv/txn_external_test.go | 110 +++++++++++++++--- 5 files changed, 189 insertions(+), 29 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index f287f81d5867..4cd79acd8aca 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -274,12 +274,25 @@ func EndTxn( return result.FromEndTxn(reply.Txn, true /* alwaysReturn */, args.Poison), roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_ABORTED_RECORD_FOUND) - case roachpb.PENDING, roachpb.STAGING: + case roachpb.PENDING: if h.Txn.Epoch < reply.Txn.Epoch { return result.Result{}, errors.AssertionFailedf( "programming error: epoch regression: %d", h.Txn.Epoch) } + case roachpb.STAGING: + if h.Txn.Epoch < reply.Txn.Epoch { + return result.Result{}, errors.AssertionFailedf( + "programming error: epoch regression: %d", h.Txn.Epoch) + } + if h.Txn.Epoch > reply.Txn.Epoch { + // If the EndTxn carries a newer epoch than a STAGING txn record, we do + // not consider the transaction to be performing a parallel commit and + // potentially already implicitly committed because we know that the + // transaction restarted since entering the STAGING state. + reply.Txn.Status = roachpb.PENDING + } + default: return result.Result{}, errors.AssertionFailedf("bad txn status: %s", reply.Txn) } @@ -318,6 +331,40 @@ func EndTxn( // Else, the transaction can be explicitly committed. reply.Txn.Status = roachpb.COMMITTED } else { + // If the transaction is STAGING, we can only move it to ABORTED if it is + // *not* already implicitly committed. On the commit path, the transaction + // coordinator is deliberate to only ever issue an EndTxn(commit) once the + // transaction has reached an implicit commit state. However, on the + // rollback path, the transaction coordinator does not make the opposite + // guarantee that it will never issue an EndTxn(abort) once the transaction + // has reached (or if it still could reach) an implicit commit state. + // + // As a result, on the rollback path, we don't trust the transaction's + // coordinator to be an authoritative source of truth about whether the + // transaction is implicitly committed. In other words, we don't consider + // this EndTxn(abort) to be a claim that the transaction is not implicitly + // committed. The transaction's coordinator may have just given up on the + // transaction before it heard the outcome of a commit attempt. So in this + // case, we return an IndeterminateCommitError to trigger the transaction + // recovery protocol and transition the transaction record to a finalized + // state (COMMITTED or ABORTED). + // + // Interestingly, because intents are not currently resolved until after an + // implicitly committed transaction has been moved to an explicit commit + // state (i.e. its record has moved from STAGING to COMMITTED), no other + // transaction could see the effect of an implicitly committed transaction + // that was erroneously rolled back. This means that such a mistake does not + // actually compromise atomicity. Regardless, such a transition is confusing + // and can cause errors in transaction recovery code. We would also like to + // begin resolving intents earlier, while a transaction is still implicitly + // committed. Doing so is only possible if we can guarantee that under no + // circumstances can an implicitly committed transaction be rolled back. + if reply.Txn.Status == roachpb.STAGING { + err := roachpb.NewIndeterminateCommitError(*reply.Txn) + log.VEventf(ctx, 1, "%v", err) + return result.Result{}, err + } + reply.Txn.Status = roachpb.ABORTED } diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go index 04b095bdff12..19cb34232cda 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go @@ -611,15 +611,20 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) { }, { // Standard case where a transaction is rolled back. The record - // already exists because of a failed parallel commit attempt. - name: "record staging, try rollback", + // already exists because of a failed parallel commit attempt in + // the same epoch. + // + // The rollback is not considered an authoritative indication that the + // transaction is not implicitly committed, so an indeterminate commit + // error is returned to force transaction recovery to be performed. + name: "record staging, try rollback at same epoch", // Replica state. existingTxn: stagingRecord, // Request state. headerTxn: headerTxn, commit: false, // Expected result. - expTxn: abortedRecord, + expError: "found txn in indeterminate STAGING state", }, { // Standard case where a transaction record is created during a diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index c09c251d2b5e..6066941e6603 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -489,7 +489,9 @@ func (r *Replica) executeBatchWithConcurrencyRetries( latchSpans, lockSpans = g.TakeSpanSets() r.concMgr.FinishReq(g) g = nil - // Then launch a task to handle the indeterminate commit error. + // Then launch a task to handle the indeterminate commit error. No error + // is returned if the transaction is recovered successfully to either a + // COMMITTED or ABORTED state. if pErr = r.handleIndeterminateCommitError(ctx, ba, pErr, t); pErr != nil { return nil, pErr } @@ -693,7 +695,7 @@ func (r *Replica) handleIndeterminateCommitError( newPErr.Index = pErr.Index return newPErr } - // We've recovered the transaction that blocked the push; retry command. + // We've recovered the transaction that blocked the request; retry. return nil } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 887d3f3fca52..00a01415ccfa 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -11432,6 +11432,11 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { }, { // Case of a rollback after an unsuccessful implicit commit. + // The rollback request is not considered an authoritative indication that + // the transaction is not implicitly committed (i.e. the txn coordinator + // may have given up on the txn before it heard the result of a commit one + // way or another), so an IndeterminateCommitError is returned to force + // transaction recovery to be performed. name: "end transaction (abort) after end transaction (stage)", setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { et, etH := endTxnArgs(txn, true /* commit */) @@ -11442,6 +11447,27 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { et, etH := endTxnArgs(txn, false /* commit */) return sendWrappedWithErr(etH, &et) }, + expError: "found txn in indeterminate STAGING state", + expTxn: txnWithStagingStatusAndInFlightWrites, + }, + { + // Case of a rollback after an unsuccessful implicit commit and txn + // restart. Because the rollback request uses a newer epoch than the + // staging record, it is considered an authoritative indication that the + // transaction was never implicitly committed and was later restarted, so + // the rollback succeeds and the record is immediately aborted. + name: "end transaction (abort) with epoch bump after end transaction (stage)", + setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { + et, etH := endTxnArgs(txn, true /* commit */) + et.InFlightWrites = inFlightWrites + return sendWrappedWithErr(etH, &et) + }, + run: func(txn *roachpb.Transaction, now hlc.Timestamp) error { + clone := txn.Clone() + clone.Restart(-1, 0, now) + et, etH := endTxnArgs(clone, false /* commit */) + return sendWrappedWithErr(etH, &et) + }, // The transaction record will be eagerly GC-ed. expTxn: noTxnRecord, }, @@ -11461,17 +11487,25 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) { expTxn: noTxnRecord, }, { - name: "end transaction (abort) without eager gc after end transaction (stage)", + // Case of a rollback after an unsuccessful implicit commit. + name: "end transaction (abort) with epoch bump, without eager gc after end transaction (stage)", setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { et, etH := endTxnArgs(txn, true /* commit */) et.InFlightWrites = inFlightWrites return sendWrappedWithErr(etH, &et) }, - run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error { - et, etH := endTxnArgs(txn, false /* commit */) + run: func(txn *roachpb.Transaction, now hlc.Timestamp) error { + clone := txn.Clone() + clone.Restart(-1, 0, now) + et, etH := endTxnArgs(clone, false /* commit */) return sendWrappedWithErr(etH, &et) }, - expTxn: txnWithStatus(roachpb.ABORTED), + expTxn: func(txn *roachpb.Transaction, now hlc.Timestamp) roachpb.TransactionRecord { + record := txnWithStatus(roachpb.ABORTED)(txn, now) + record.Epoch = txn.Epoch + 1 + record.WriteTimestamp.Forward(now) + return record + }, disableTxnAutoGC: true, }, { diff --git a/pkg/kv/txn_external_test.go b/pkg/kv/txn_external_test.go index c85c8933f336..da8e14af2678 100644 --- a/pkg/kv/txn_external_test.go +++ b/pkg/kv/txn_external_test.go @@ -52,6 +52,11 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) { // If txnStatus == COMMITTED, setting txnRecordCleanedUp will make us // cleanup the transaction. The txn record will be deleted. txnRecordCleanedUp bool + // If txnStatus == STAGING, setting txnImplicitlyCommitted will make + // the transaction qualify for the implicit commit condition. Otherwise, + // the transaction will not qualify because the test will attach a fake + // in-flight write to the transaction's STAGING record. + txnImplictlyCommitted bool // The error that we expect from txn.Rollback(). expRollbackErr string // Is the transaction expected to be committed or not after the @@ -76,42 +81,109 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) { expRollbackErr: "already committed", }, { - name: "STAGING", - txnStatus: roachpb.STAGING, - // The rollback succeeds. This behavior is undersired. See #48301. + name: "STAGING, implicitly committed", + txnStatus: roachpb.STAGING, + txnImplictlyCommitted: true, + // The rollback fails after performing txn recovery and moving the + // transaction to committed. + expCommitted: true, + expRollbackErr: "already committed", + }, + { + name: "STAGING, not implicitly committed", + txnStatus: roachpb.STAGING, + txnImplictlyCommitted: false, + // The rollback succeeds after performing txn recovery and moving the + // transaction to aborted. + expCommitted: false, + expRollbackErr: "", + }, + { + name: "PENDING", + txnStatus: roachpb.PENDING, + // The rollback succeeds. expCommitted: false, expRollbackErr: "", }, } for _, testCase := range testCases { + // Sanity-check test cases. if testCase.txnRecordCleanedUp { require.Equal(t, roachpb.COMMITTED, testCase.txnStatus) } + if testCase.txnImplictlyCommitted { + require.Equal(t, roachpb.STAGING, testCase.txnStatus) + } + t.Run(testCase.name, func(t *testing.T) { var filterSet int64 var key roachpb.Key commitBlocked := make(chan struct{}) + onCommitReqFilter := func( + ba roachpb.BatchRequest, fn func(et *roachpb.EndTxnRequest) *roachpb.Error, + ) *roachpb.Error { + if atomic.LoadInt64(&filterSet) == 0 { + return nil + } + req, ok := ba.GetArg(roachpb.EndTxn) + if !ok { + return nil + } + et := req.(*roachpb.EndTxnRequest) + if et.Key.Equal(key) && et.Commit { + return fn(et) + } + return nil + } + blockCommitReqFilter := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error { + return onCommitReqFilter(ba, func(et *roachpb.EndTxnRequest) *roachpb.Error { + // Inform the test that the commit is blocked. + commitBlocked <- struct{}{} + // Block until the client interrupts the commit. The client will + // cancel its context, at which point gRPC will return an error + // to the client and marshall the cancelation to the server. + <-ctx.Done() + return roachpb.NewError(ctx.Err()) + }) + } + addInFlightWriteToCommitReqFilter := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error { + return onCommitReqFilter(ba, func(et *roachpb.EndTxnRequest) *roachpb.Error { + // Add a fake in-flight write. + et.InFlightWrites = append(et.InFlightWrites, roachpb.SequencedWrite{ + Key: key.Next(), Sequence: et.Sequence, + }) + // Be sure to update the EndTxn and Txn's sequence accordingly. + et.Sequence++ + ba.Txn.Sequence++ + return nil + }) + } args := base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ // We're going to block the commit of the test's txn, letting the - // test cancel the request's ctx while the request is blocked. - TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error { - if atomic.LoadInt64(&filterSet) == 0 { - return nil + // test cancel the request's ctx while the request is blocked. We + // do this either before or after the request completes, depending + // on the status that the test wants the txn record to be in when + // the rollback is performed. + TestingRequestFilter: func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error { + if testCase.txnStatus == roachpb.PENDING { + // Block and reject before the request writes the txn record. + return blockCommitReqFilter(ctx, ba) } - req, ok := ba.GetArg(roachpb.EndTxn) - if !ok { - return nil + if testCase.txnStatus == roachpb.STAGING && !testCase.txnImplictlyCommitted { + // If the test wants the upcoming rollback to find a STAGING + // record for a transaction that is not implicitly committed, + // add an in-flight write for a (key, sequence) that does not + // exist to the EndTxn request. + _ = addInFlightWriteToCommitReqFilter(ctx, ba) } - commit := req.(*roachpb.EndTxnRequest) - if commit.Key.Equal(key) && commit.Commit { - // Inform the test that the commit is blocked. - commitBlocked <- struct{}{} - // Block until the client interrupts the commit. The client will - // cancel its context, at which point gRPC will return an error - // to the client and marshall the cancelation to the server. - <-ctx.Done() + return nil + }, + TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error { + if testCase.txnStatus != roachpb.PENDING { + // Block and reject after the request writes the txn record. + return blockCommitReqFilter(ctx, ba) } return nil }, @@ -186,7 +258,7 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) { // If the test wants the upcoming rollback to find a COMMITTED record, // we'll perform transaction recovery. This will leave the transaction in // the COMMITTED state, without cleaning it up. - if !testCase.txnRecordCleanedUp && testCase.txnStatus == roachpb.COMMITTED { + if testCase.txnStatus == roachpb.COMMITTED && !testCase.txnRecordCleanedUp { // Sanity check - verify that the txn is STAGING. txnProto := txn.TestingCloneTxn() queryTxn := roachpb.QueryTxnRequest{