kv: perform recovery on rollback of staging transaction record
Fixes #48301.

During a transaction rollback (i.e. an `EndTxn(abort)`), if the request
arrives to find that the transaction record is STAGING, it can only move
it to ABORTED if it is *not* already implicitly committed. On the commit
path, the transaction coordinator is deliberate to only ever issue an
`EndTxn(commit)` once the transaction has reached an implicit commit
state. However, on the rollback path, the transaction coordinator does
not make the opposite guarantee that it will never issue an
`EndTxn(abort)` once the transaction has reached (or if it still could
reach) an implicit commit state.

As a result, on the rollback path, we don't trust the transaction's
coordinator to be an authoritative source of truth about whether the
transaction is implicitly committed. In other words, we don't consider
this `EndTxn(abort)` to be a claim that the transaction is not
implicitly committed. The transaction's coordinator may have just given
up on the transaction before it heard the outcome of a commit attempt.
So in this case, we now return an `IndeterminateCommitError` during
evaluation to trigger the transaction recovery protocol and transition
the transaction record to a finalized state (COMMITTED or ABORTED).

Prior to this change, we were blindly trusting the `EndTxn(abort)` to
be such an authoritative source of truth, so we were at risk of hazards
where a transaction was implicitly committed but its coordinator did not
know and issued a rollback. This was observed to cause workloads to hit
"found ABORTED record for implicitly committed transaction" errors under
sufficient network fault injection.

This was one of the two alternatives described in #48301. The other
option was to lock down the txn client to attempt to make the guarantee
that rollbacks will only be performed if the client is certain that the
transaction is not currently in and can no longer ever enter the
implicit commit state. I was previously drawn to this other approach
because it would avoid the need for transaction recovery during the
rollback of a staging txn. However, I don't think it's possible to make
such a guarantee in all cases due to the possibility of ambiguity. To
eliminate this ambiguity, there are cases where a transaction's
coordinator would need to query the result of writes, which turns out to
be analogous to the transaction recovery protocol.
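
To make the "query the result of writes" idea concrete, here is a toy sketch of the core decision that such a query would feed: a staging transaction is implicitly committed only if every in-flight write listed in its record is found. The names `Write` and `recoverStaging` are hypothetical, and the sketch deliberately ignores timestamps and the step that must prevent a missing write from succeeding later, so it should not be read as the actual recovery implementation.

```go
package main

import "fmt"

// Write identifies one in-flight write by key and sequence number.
// It is a simplified, hypothetical stand-in for a sequenced write.
type Write struct {
	Key string
	Seq int
}

// recoverStaging decides the finalized status of a STAGING txn given the
// in-flight writes listed in its record and the set of writes that a
// (hypothetical) query found to have succeeded.
func recoverStaging(inFlight []Write, found map[Write]bool) string {
	for _, w := range inFlight {
		if !found[w] {
			// At least one in-flight write is missing, so the implicit
			// commit condition does not hold.
			return "ABORTED"
		}
	}
	// Every in-flight write succeeded: the txn was implicitly committed.
	return "COMMITTED"
}

func main() {
	inFlight := []Write{{Key: "a", Seq: 1}, {Key: "b", Seq: 2}}

	allFound := map[Write]bool{
		Write{Key: "a", Seq: 1}: true,
		Write{Key: "b", Seq: 2}: true,
	}
	oneMissing := map[Write]bool{
		Write{Key: "a", Seq: 1}: true,
	}

	fmt.Println(recoverStaging(inFlight, allFound))
	fmt.Println(recoverStaging(inFlight, oneMissing))
}
```

A coordinator that wanted to rule out an implicit commit before rolling back would have to perform essentially this check itself, which is why the two approaches converge.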

So instead of trying to make the guarantee in all cases, I'd rather make
rollbacks safe in all cases and then later explore selectively opting in
to skipping txn recovery in specific cases where the client can
definitively guarantee that it is rolling back a staging transaction
that is not implicitly committed. I don't expect this to be needed
immediately.

Release note (bug fix): fixed a rare race condition that could lead to
client-visible errors that looked like "found ABORTED record for
implicitly committed transaction". These errors were harmless in that
they did not indicate data corruption, but they could be disruptive to
clients.
nvanbenschoten committed Feb 3, 2022
1 parent 7499c27 commit a91d7aa
Showing 5 changed files with 189 additions and 29 deletions.
49 changes: 48 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -274,12 +274,25 @@ func EndTxn(
return result.FromEndTxn(reply.Txn, true /* alwaysReturn */, args.Poison),
roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_ABORTED_RECORD_FOUND)

case roachpb.PENDING, roachpb.STAGING:
case roachpb.PENDING:
if h.Txn.Epoch < reply.Txn.Epoch {
return result.Result{}, errors.AssertionFailedf(
"programming error: epoch regression: %d", h.Txn.Epoch)
}

case roachpb.STAGING:
if h.Txn.Epoch < reply.Txn.Epoch {
return result.Result{}, errors.AssertionFailedf(
"programming error: epoch regression: %d", h.Txn.Epoch)
}
if h.Txn.Epoch > reply.Txn.Epoch {
// If the EndTxn carries a newer epoch than a STAGING txn record, we do
// not consider the transaction to be performing a parallel commit and
// potentially already implicitly committed because we know that the
// transaction restarted since entering the STAGING state.
reply.Txn.Status = roachpb.PENDING
}

default:
return result.Result{}, errors.AssertionFailedf("bad txn status: %s", reply.Txn)
}
@@ -318,6 +331,40 @@ func EndTxn(
// Else, the transaction can be explicitly committed.
reply.Txn.Status = roachpb.COMMITTED
} else {
// If the transaction is STAGING, we can only move it to ABORTED if it is
// *not* already implicitly committed. On the commit path, the transaction
// coordinator is deliberate to only ever issue an EndTxn(commit) once the
// transaction has reached an implicit commit state. However, on the
// rollback path, the transaction coordinator does not make the opposite
// guarantee that it will never issue an EndTxn(abort) once the transaction
// has reached (or if it still could reach) an implicit commit state.
//
// As a result, on the rollback path, we don't trust the transaction's
// coordinator to be an authoritative source of truth about whether the
// transaction is implicitly committed. In other words, we don't consider
// this EndTxn(abort) to be a claim that the transaction is not implicitly
// committed. The transaction's coordinator may have just given up on the
// transaction before it heard the outcome of a commit attempt. So in this
// case, we return an IndeterminateCommitError to trigger the transaction
// recovery protocol and transition the transaction record to a finalized
// state (COMMITTED or ABORTED).
//
// Interestingly, because intents are not currently resolved until after an
// implicitly committed transaction has been moved to an explicit commit
// state (i.e. its record has moved from STAGING to COMMITTED), no other
// transaction could see the effect of an implicitly committed transaction
// that was erroneously rolled back. This means that such a mistake does not
// actually compromise atomicity. Regardless, such a transition is confusing
// and can cause errors in transaction recovery code. We would also like to
// begin resolving intents earlier, while a transaction is still implicitly
// committed. Doing so is only possible if we can guarantee that under no
// circumstances can an implicitly committed transaction be rolled back.
if reply.Txn.Status == roachpb.STAGING {
err := roachpb.NewIndeterminateCommitError(*reply.Txn)
log.VEventf(ctx, 1, "%v", err)
return result.Result{}, err
}

reply.Txn.Status = roachpb.ABORTED
}

11 changes: 8 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
@@ -611,15 +611,20 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {
},
{
// Standard case where a transaction is rolled back. The record
// already exists because of a failed parallel commit attempt.
name: "record staging, try rollback",
// already exists because of a failed parallel commit attempt in
// the same epoch.
//
// The rollback is not considered an authoritative indication that the
// transaction is not implicitly committed, so an indeterminate commit
// error is returned to force transaction recovery to be performed.
name: "record staging, try rollback at same epoch",
// Replica state.
existingTxn: stagingRecord,
// Request state.
headerTxn: headerTxn,
commit: false,
// Expected result.
expTxn: abortedRecord,
expError: "found txn in indeterminate STAGING state",
},
{
// Standard case where a transaction record is created during a
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/replica_send.go
@@ -554,7 +554,9 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
latchSpans, lockSpans = g.TakeSpanSets()
r.concMgr.FinishReq(g)
g = nil
// Then launch a task to handle the indeterminate commit error.
// Then launch a task to handle the indeterminate commit error. No error
// is returned if the transaction is recovered successfully to either a
// COMMITTED or ABORTED state.
if pErr = r.handleIndeterminateCommitError(ctx, ba, pErr, t); pErr != nil {
return nil, pErr
}
@@ -758,7 +760,7 @@ func (r *Replica) handleIndeterminateCommitError(
newPErr.Index = pErr.Index
return newPErr
}
// We've recovered the transaction that blocked the push; retry command.
// We've recovered the transaction that blocked the request; retry.
return nil
}

42 changes: 38 additions & 4 deletions pkg/kv/kvserver/replica_test.go
@@ -11432,6 +11432,11 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
},
{
// Case of a rollback after an unsuccessful implicit commit.
// The rollback request is not considered an authoritative indication that
// the transaction is not implicitly committed (i.e. the txn coordinator
// may have given up on the txn before it heard the result of a commit one
// way or another), so an IndeterminateCommitError is returned to force
// transaction recovery to be performed.
name: "end transaction (abort) after end transaction (stage)",
setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
@@ -11442,6 +11447,27 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
et, etH := endTxnArgs(txn, false /* commit */)
return sendWrappedWithErr(etH, &et)
},
expError: "found txn in indeterminate STAGING state",
expTxn: txnWithStagingStatusAndInFlightWrites,
},
{
// Case of a rollback after an unsuccessful implicit commit and txn
// restart. Because the rollback request uses a newer epoch than the
// staging record, it is considered an authoritative indication that the
// transaction was never implicitly committed and was later restarted, so
// the rollback succeeds and the record is immediately aborted.
name: "end transaction (abort) with epoch bump after end transaction (stage)",
setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.InFlightWrites = inFlightWrites
return sendWrappedWithErr(etH, &et)
},
run: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
clone := txn.Clone()
clone.Restart(-1, 0, now)
et, etH := endTxnArgs(clone, false /* commit */)
return sendWrappedWithErr(etH, &et)
},
// The transaction record will be eagerly GC-ed.
expTxn: noTxnRecord,
},
@@ -11461,17 +11487,25 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
expTxn: noTxnRecord,
},
{
name: "end transaction (abort) without eager gc after end transaction (stage)",
// Case of a rollback after an unsuccessful implicit commit.
name: "end transaction (abort) with epoch bump, without eager gc after end transaction (stage)",
setup: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, true /* commit */)
et.InFlightWrites = inFlightWrites
return sendWrappedWithErr(etH, &et)
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
et, etH := endTxnArgs(txn, false /* commit */)
run: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
clone := txn.Clone()
clone.Restart(-1, 0, now)
et, etH := endTxnArgs(clone, false /* commit */)
return sendWrappedWithErr(etH, &et)
},
expTxn: txnWithStatus(roachpb.ABORTED),
expTxn: func(txn *roachpb.Transaction, now hlc.Timestamp) roachpb.TransactionRecord {
record := txnWithStatus(roachpb.ABORTED)(txn, now)
record.Epoch = txn.Epoch + 1
record.WriteTimestamp.Forward(now)
return record
},
disableTxnAutoGC: true,
},
{
110 changes: 91 additions & 19 deletions pkg/kv/txn_external_test.go
@@ -52,6 +52,11 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) {
// If txnStatus == COMMITTED, setting txnRecordCleanedUp will make us
// cleanup the transaction. The txn record will be deleted.
txnRecordCleanedUp bool
// If txnStatus == STAGING, setting txnImplicitlyCommitted will make
// the transaction qualify for the implicit commit condition. Otherwise,
// the transaction will not qualify because the test will attach a fake
// in-flight write to the transaction's STAGING record.
txnImplictlyCommitted bool
// The error that we expect from txn.Rollback().
expRollbackErr string
// Is the transaction expected to be committed or not after the
@@ -76,42 +81,109 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) {
expRollbackErr: "already committed",
},
{
name: "STAGING",
txnStatus: roachpb.STAGING,
// The rollback succeeds. This behavior is undersired. See #48301.
name: "STAGING, implicitly committed",
txnStatus: roachpb.STAGING,
txnImplictlyCommitted: true,
// The rollback fails after performing txn recovery and moving the
// transaction to committed.
expCommitted: true,
expRollbackErr: "already committed",
},
{
name: "STAGING, not implicitly committed",
txnStatus: roachpb.STAGING,
txnImplictlyCommitted: false,
// The rollback succeeds after performing txn recovery and moving the
// transaction to aborted.
expCommitted: false,
expRollbackErr: "",
},
{
name: "PENDING",
txnStatus: roachpb.PENDING,
// The rollback succeeds.
expCommitted: false,
expRollbackErr: "",
},
}
for _, testCase := range testCases {
// Sanity-check test cases.
if testCase.txnRecordCleanedUp {
require.Equal(t, roachpb.COMMITTED, testCase.txnStatus)
}
if testCase.txnImplictlyCommitted {
require.Equal(t, roachpb.STAGING, testCase.txnStatus)
}

t.Run(testCase.name, func(t *testing.T) {
var filterSet int64
var key roachpb.Key
commitBlocked := make(chan struct{})
onCommitReqFilter := func(
ba roachpb.BatchRequest, fn func(et *roachpb.EndTxnRequest) *roachpb.Error,
) *roachpb.Error {
if atomic.LoadInt64(&filterSet) == 0 {
return nil
}
req, ok := ba.GetArg(roachpb.EndTxn)
if !ok {
return nil
}
et := req.(*roachpb.EndTxnRequest)
if et.Key.Equal(key) && et.Commit {
return fn(et)
}
return nil
}
blockCommitReqFilter := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
return onCommitReqFilter(ba, func(et *roachpb.EndTxnRequest) *roachpb.Error {
// Inform the test that the commit is blocked.
commitBlocked <- struct{}{}
// Block until the client interrupts the commit. The client will
// cancel its context, at which point gRPC will return an error
// to the client and marshall the cancelation to the server.
<-ctx.Done()
return roachpb.NewError(ctx.Err())
})
}
addInFlightWriteToCommitReqFilter := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
return onCommitReqFilter(ba, func(et *roachpb.EndTxnRequest) *roachpb.Error {
// Add a fake in-flight write.
et.InFlightWrites = append(et.InFlightWrites, roachpb.SequencedWrite{
Key: key.Next(), Sequence: et.Sequence,
})
// Be sure to update the EndTxn and Txn's sequence accordingly.
et.Sequence++
ba.Txn.Sequence++
return nil
})
}
args := base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// We're going to block the commit of the test's txn, letting the
// test cancel the request's ctx while the request is blocked.
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
if atomic.LoadInt64(&filterSet) == 0 {
return nil
// test cancel the request's ctx while the request is blocked. We
// do this either before or after the request completes, depending
// on the status that the test wants the txn record to be in when
// the rollback is performed.
TestingRequestFilter: func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if testCase.txnStatus == roachpb.PENDING {
// Block and reject before the request writes the txn record.
return blockCommitReqFilter(ctx, ba)
}
req, ok := ba.GetArg(roachpb.EndTxn)
if !ok {
return nil
if testCase.txnStatus == roachpb.STAGING && !testCase.txnImplictlyCommitted {
// If the test wants the upcoming rollback to find a STAGING
// record for a transaction that is not implicitly committed,
// add an in-flight write for a (key, sequence) that does not
// exist to the EndTxn request.
_ = addInFlightWriteToCommitReqFilter(ctx, ba)
}
commit := req.(*roachpb.EndTxnRequest)
if commit.Key.Equal(key) && commit.Commit {
// Inform the test that the commit is blocked.
commitBlocked <- struct{}{}
// Block until the client interrupts the commit. The client will
// cancel its context, at which point gRPC will return an error
// to the client and marshall the cancelation to the server.
<-ctx.Done()
return nil
},
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
if testCase.txnStatus != roachpb.PENDING {
// Block and reject after the request writes the txn record.
return blockCommitReqFilter(ctx, ba)
}
return nil
},
@@ -186,7 +258,7 @@ func TestRollbackAfterAmbiguousCommit(t *testing.T) {
// If the test wants the upcoming rollback to find a COMMITTED record,
// we'll perform transaction recovery. This will leave the transaction in
// the COMMITTED state, without cleaning it up.
if !testCase.txnRecordCleanedUp && testCase.txnStatus == roachpb.COMMITTED {
if testCase.txnStatus == roachpb.COMMITTED && !testCase.txnRecordCleanedUp {
// Sanity check - verify that the txn is STAGING.
txnProto := txn.TestingCloneTxn()
queryTxn := roachpb.QueryTxnRequest{