kv: perform recovery on rollback of staging transaction record
Fixes cockroachdb#48301.

During a transaction rollback (i.e. an `EndTxn(abort)`), if the request
arrives to find that the transaction record is STAGING, it can only move
it to ABORTED if it is *not* already implicitly committed. On the commit
path, the transaction coordinator is deliberate to only ever issue an
`EndTxn(commit)` once the transaction has reached an implicit commit
state. However, on the rollback path, the transaction coordinator does
not make the opposite guarantee that it will never issue an
`EndTxn(abort)` once the transaction has reached (or if it still could
reach) an implicit commit state.

As a result, on the rollback path, we don't trust the transaction's
coordinator to be an authoritative source of truth about whether the
transaction is implicitly committed. In other words, we don't consider
this `EndTxn(abort)` to be a claim that the transaction is not
implicitly committed. The transaction's coordinator may have just given
up on the transaction before it heard the outcome of a commit attempt.
So in this case, we now return an `IndeterminateCommitError` during
evaluation to trigger the transaction recovery protocol and transition
the transaction record to a finalized state (COMMITTED or ABORTED).
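The rollback decision described above can be sketched as follows. This is a simplified model, not the code in `cmd_end_transaction.go`: `TxnStatus`, `ErrIndeterminateCommit`, and `rollbackStatus` are hypothetical stand-ins, and only PENDING and STAGING records are modeled. It also folds in the epoch check from this change: a rollback at a newer epoch than the STAGING record is trusted, because the transaction must have restarted after staging.

```go
package main

import (
	"errors"
	"fmt"
)

// TxnStatus is a simplified, hypothetical stand-in for the record status.
type TxnStatus int

const (
	Pending TxnStatus = iota
	Staging
	Aborted
)

// ErrIndeterminateCommit is a hypothetical stand-in for the
// IndeterminateCommitError that triggers transaction recovery.
var ErrIndeterminateCommit = errors.New("found txn in indeterminate STAGING state")

// rollbackStatus models how an EndTxn(abort) treats an existing PENDING or
// STAGING record. It returns the resulting record status, or an error when
// the outcome cannot be decided without running recovery.
func rollbackStatus(recordStatus TxnStatus, recordEpoch, reqEpoch int32) (TxnStatus, error) {
	// A newer request epoch proves the transaction restarted after staging,
	// so the record is no longer treated as a parallel commit in progress.
	if recordStatus == Staging && reqEpoch > recordEpoch {
		recordStatus = Pending
	}
	// Same epoch: the abort is not proof that the transaction is not
	// implicitly committed, so leave the record alone and ask for recovery.
	if recordStatus == Staging {
		return Staging, ErrIndeterminateCommit
	}
	return Aborted, nil
}

func main() {
	// Rollback at the same epoch as the STAGING record.
	st, err := rollbackStatus(Staging, 1, 1)
	fmt.Println(st, err)
	// Rollback after a restart (newer epoch).
	st, err = rollbackStatus(Staging, 1, 2)
	fmt.Println(st, err)
}
```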

This was one of the two alternatives described in cockroachdb#48301. The other
option was to lock down the txn client to attempt to make the guarantee
that rollbacks will only be performed if the client is certain that the
transaction is not currently in and can no longer ever enter the
implicit commit state. I was previously drawn to this other approach
because it would avoid the need for transaction recovery during the
rollback of a staging txn. However, I don't think it's possible to make
such a guarantee in all cases due to the possibility of ambiguity. To
eliminate this ambiguity, there are cases where a transaction's
coordinator would need to query the result of writes, which turns out to
be analogous to the transaction recovery protocol.
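To make "query the result of writes" concrete, here is a minimal sketch of the check that recovery has to perform on a STAGING record. It assumes the implicit commit condition is "every in-flight write listed in the record succeeded"; `SequencedWrite`, `recoverStaging`, and the `found` callback are hypothetical, and the sketch ignores timestamps and any step needed to keep a missing write from succeeding later.

```go
package main

import "fmt"

// SequencedWrite is a simplified, hypothetical stand-in for an in-flight
// write recorded in a STAGING transaction record.
type SequencedWrite struct {
	Key      string
	Sequence int32
}

// recoverStaging decides the final status of a STAGING record. The found
// callback reports whether a given in-flight write actually succeeded.
func recoverStaging(inFlight []SequencedWrite, found func(SequencedWrite) bool) string {
	for _, w := range inFlight {
		if !found(w) {
			// One missing write means the implicit commit condition
			// does not hold, so the transaction can be aborted.
			return "ABORTED"
		}
	}
	// Every in-flight write succeeded: the transaction is implicitly
	// committed and must be finalized as COMMITTED.
	return "COMMITTED"
}

func main() {
	writes := []SequencedWrite{{Key: "a", Sequence: 1}, {Key: "b", Sequence: 2}}
	// Pretend the write to "b" never landed.
	status := recoverStaging(writes, func(w SequencedWrite) bool { return w.Key != "b" })
	fmt.Println(status)
}
```

A coordinator that wanted to roll back safely on its own would have to run essentially this loop itself, which is why the check is left to the existing recovery path.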

So instead of trying to make the guarantee in all cases, I'd rather make
rollbacks safe in all cases and then later explore selectively opting in
to skipping txn recovery in specific cases where the client can
definitively guarantee that it is rolling back a staging transaction
that is not implicitly committed. I don't expect this to be needed
immediately.

Release note (bug fix): fixed a rare race condition that could lead to
client-visible errors that looked like "found ABORTED record for
implicitly committed transaction". These errors were harmless in that
they did not indicate data corruption, but they could be disruptive to
clients.
nvanbenschoten committed Jan 27, 2022
1 parent 659fe19 commit ac7094e
Showing 4 changed files with 162 additions and 26 deletions.
29 changes: 29 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -279,6 +279,12 @@ func EndTxn(
return result.Result{}, errors.AssertionFailedf(
"programming error: epoch regression: %d", h.Txn.Epoch)
}
if h.Txn.Epoch > reply.Txn.Epoch {
// If the EndTxn carries a newer epoch than a STAGING txn record, we do
// not consider the record to be performing a parallel commit because we
// know that the transaction restarted since entering the STAGING state.
reply.Txn.Status = roachpb.PENDING
}

default:
return result.Result{}, errors.AssertionFailedf("bad txn status: %s", reply.Txn)
@@ -318,6 +324,29 @@ func EndTxn(
// Else, the transaction can be explicitly committed.
reply.Txn.Status = roachpb.COMMITTED
} else {
// If the transaction is STAGING, we can only move it to ABORTED if it is
// *not* already implicitly committed. On the commit path, the transaction
// coordinator is deliberate to only ever issue an EndTxn(commit) once the
// transaction has reached an implicit commit state. However, on the
// rollback path, the transaction coordinator does not make the opposite
// guarantee that it will never issue an EndTxn(abort) once the transaction
// has reached (or if it still could reach) an implicit commit state.
//
// As a result, on the rollback path, we don't trust the transaction's
// coordinator to be an authoritative source of truth about whether the
// transaction is implicitly committed. In other words, we don't consider
// this EndTxn(abort) to be a claim that the transaction is not implicitly
// committed. The transaction's coordinator may have just given up on the
// transaction before it heard the outcome of a commit attempt. So in this
// case, we return an IndeterminateCommitError to trigger the transaction
// recovery protocol and transition the transaction record to a finalized
// state (COMMITTED or ABORTED).
if reply.Txn.Status == roachpb.STAGING {
err := roachpb.NewIndeterminateCommitError(*reply.Txn)
log.VEventf(ctx, 1, "%v", err)
return result.Result{}, err
}

reply.Txn.Status = roachpb.ABORTED
}

7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
@@ -611,15 +611,16 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
},
{
// Standard case where a transaction is rolled back. The record
// already exists because of a failed parallel commit attempt.
name: "record staging, try rollback",
// already exists because of a failed parallel commit attempt in
// the same epoch.
name: "record staging, try rollback at same epoch",
// Replica state.
existingTxn: stagingRecord,
// Request state.
headerTxn: headerTxn,
commit: false,
// Expected result.
expTxn: abortedRecord,
expError: "found txn in indeterminate STAGING state",
},
{
// Standard case where a transaction record is created during a
43 changes: 39 additions & 4 deletions pkg/kv/kvserver/replica_test.go
@@ -11341,6 +11341,12 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
},
{
// Case of a rollback after an unsuccessful implicit commit.
// Because the rollback request uses the same epoch as the staging record,
// it is not considered an authoritative indication that the transaction
// is not implicitly committed (i.e. the txn coordinator may have given up
// on the txn before it heard the result of a commit one way or another),
// so an IndeterminateCommitError is returned to force transaction
// recovery to be performed.
name: "end transaction (abort) after end transaction (stage)",
setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
@@ -11351,6 +11357,27 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
et, etH := endTxnArgs(txn, false /* commit */)
return sendWrappedWithErr(etH, &et)
},
expError: "found txn in indeterminate STAGING state",
expTxn: txnWithStagingStatusAndInFlightWrites,
},
{
// Case of a rollback after an unsuccessful implicit commit and txn
// restart. Because the rollback request uses a newer epoch than the
// staging record, it is considered an authoritative indication that the
// transaction was never implicitly committed and was later restarted, so
// the rollback succeeds and the record is immediately aborted.
name: "end transaction (abort) with epoch bump after end transaction (stage)",
setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.InFlightWrites = inFlightWrites
return sendWrappedWithErr(etH, &et)
},
run: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
clone := txn.Clone()
clone.Restart(-1, 0, now)
et, etH := endTxnArgs(clone, false /* commit */)
return sendWrappedWithErr(etH, &et)
},
// The transaction record will be eagerly GC-ed.
expTxn: noTxnRecord,
},
@@ -11370,17 +11397,25 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
expTxn: noTxnRecord,
},
{
name: "end transaction (abort) without eager gc after end transaction (stage)",
// Case of a rollback after an unsuccessful implicit commit.
name: "end transaction (abort) with epoch bump, without eager gc after end transaction (stage)",
setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.InFlightWrites = inFlightWrites
return sendWrappedWithErr(etH, &et)
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, false /* commit */)
run: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
clone := txn.Clone()
clone.Restart(-1, 0, now)
et, etH := endTxnArgs(clone, false /* commit */)
return sendWrappedWithErr(etH, &et)
},
expTxn: txnWithStatus(roachpb.ABORTED),
expTxn: func(txn *roachpb.Transaction, now hlc.Timestamp) roachpb.TransactionRecord {
record := txnWithStatus(roachpb.ABORTED)(txn, now)
record.Epoch = txn.Epoch + 1
record.WriteTimestamp.Forward(now)
return record
},
disableTxnAutoGC: true,
},
{
109 changes: 90 additions & 19 deletions pkg/kv/txn_external_test.go
@@ -52,6 +52,10 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) {
// If txnStatus == COMMITTED, setting txnRecordCleanedUp will make us
// cleanup the transaction. The txn record will be deleted.
txnRecordCleanedUp bool
// If txnStatus == STAGING, setting txnImplicitlyCommitted will make
// the transaction qualify for the implicit commit condition. Otherwise,
// the transaction will not qualify.
txnImplictlyCommitted bool
// The error that we expect from txn.Rollback().
expRollbackErr string
// Is the transaction expected to be committed or not after the
@@ -76,42 +80,109 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) {
expRollbackErr: "already committed",
},
{
name: "STAGING",
txnStatus: roachpb.STAGING,
// The rollback succeeds. This behavior is undersired. See #48301.
name: "STAGING, implicitly committed",
txnStatus: roachpb.STAGING,
txnImplictlyCommitted: true,
// The rollback fails after performing txn recovery and moving the
// transaction to committed.
expCommitted: true,
expRollbackErr: "already committed",
},
{
name: "STAGING, not implicitly committed",
txnStatus: roachpb.STAGING,
txnImplictlyCommitted: false,
// The rollback succeeds after performing txn recovery and moving the
// transaction to aborted.
expCommitted: false,
expRollbackErr: "",
},
{
name: "PENDING",
txnStatus: roachpb.PENDING,
// The rollback succeeds.
expCommitted: false,
expRollbackErr: "",
},
}
for _, testCase := range testCases {
// Sanity-check test cases.
if testCase.txnRecordCleanedUp {
require.Equal(t, roachpb.COMMITTED, testCase.txnStatus)
}
if testCase.txnImplictlyCommitted {
require.Equal(t, roachpb.STAGING, testCase.txnStatus)
}

t.Run(testCase.name, func(t *testing.T) {
var filterSet int64
var key roachpb.Key
commitBlocked := make(chan struct{})
onCommitReqFilter := func(
ba roachpb.BatchRequest, fn func(et *roachpb.EndTxnRequest) *roachpb.Error,
) *roachpb.Error {
if atomic.LoadInt64(&filterSet) == 0 {
return nil
}
req, ok := ba.GetArg(roachpb.EndTxn)
if !ok {
return nil
}
et := req.(*roachpb.EndTxnRequest)
if et.Key.Equal(key) && et.Commit {
return fn(et)
}
return nil
}
blockCommitReqFilter := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
return onCommitReqFilter(ba, func(et *roachpb.EndTxnRequest) *roachpb.Error {
// Inform the test that the commit is blocked.
commitBlocked <- struct{}{}
// Block until the client interrupts the commit. The client will
// cancel its context, at which point gRPC will return an error
// to the client and marshall the cancelation to the server.
<-ctx.Done()
return roachpb.NewError(ctx.Err())
})
}
addInFlightWriteToCommitReqFilter := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
return onCommitReqFilter(ba, func(et *roachpb.EndTxnRequest) *roachpb.Error {
// Add a fake in-flight write.
et.InFlightWrites = append(et.InFlightWrites, roachpb.SequencedWrite{
Key: key.Next(), Sequence: et.Sequence,
})
// Be sure to update the EndTxn and Txn's sequence accordingly.
et.Sequence++
ba.Txn.Sequence++
return nil
})
}
args := base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// We're going to block the commit of the test's txn, letting the
// test cancel the request's ctx while the request is blocked.
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
if atomic.LoadInt64(&filterSet) == 0 {
return nil
// test cancel the request's ctx while the request is blocked. We
// do this either before or after the request completes, depending
// on the status that the test wants the txn record to be in when
// the rollback is performed.
TestingRequestFilter: func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if testCase.txnStatus == roachpb.PENDING {
// Block and reject before the request writes the txn record.
return blockCommitReqFilter(ctx, ba)
}
req, ok := ba.GetArg(roachpb.EndTxn)
if !ok {
return nil
if testCase.txnStatus == roachpb.STAGING && !testCase.txnImplictlyCommitted {
// If the test wants the upcoming rollback to find a STAGING
// record for a transaction that is not implicitly committed,
// add an in-flight write for a (key, sequence) that does not
// exist to the EndTxn request.
_ = addInFlightWriteToCommitReqFilter(ctx, ba)
}
commit := req.(*roachpb.EndTxnRequest)
if commit.Key.Equal(key) && commit.Commit {
// Inform the test that the commit is blocked.
commitBlocked <- struct{}{}
// Block until the client interrupts the commit. The client will
// cancel its context, at which point gRPC will return an error
// to the client and marshall the cancelation to the server.
<-ctx.Done()
return nil
},
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
if testCase.txnStatus != roachpb.PENDING {
// Block and reject after the request writes the txn record.
return blockCommitReqFilter(ctx, ba)
}
return nil
},
@@ -186,7 +257,7 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) {
// If the test wants the upcoming rollback to find a COMMITTED record,
// we'll perform transaction recovery. This will leave the transaction in
// the COMMITTED state, without cleaning it up.
if !testCase.txnRecordCleanedUp && testCase.txnStatus == roachpb.COMMITTED {
if testCase.txnStatus == roachpb.COMMITTED && !testCase.txnRecordCleanedUp {
// Sanity check - verify that the txn is STAGING.
txnProto := txn.TestingCloneTxn()
queryTxn := roachpb.QueryTxnRequest{