Skip to content

Commit

Permalink
Merge pull request cockroachdb#109850 from nvanbenschoten/backport22.…
Browse files Browse the repository at this point in the history
…2-109510

release-22.2: kv: prioritize errors in UpdateStateOnRemoteRetryableErr
  • Loading branch information
nvanbenschoten authored Sep 7, 2023
2 parents cfd97ed + c03ab81 commit a9d7853
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 3 deletions.
16 changes: 13 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,16 @@ func (tc *TxnCoordSender) UpdateStateOnRemoteRetryableErr(
func (tc *TxnCoordSender) handleRetryableErrLocked(
ctx context.Context, pErr *roachpb.Error,
) *roachpb.TransactionRetryWithProtoRefreshError {
// If the transaction is already in a retryable state and the provided error
// does not have a higher priority than the existing error, return the
// existing error instead of attempting to handle the retryable error. This
// prevents the TxnCoordSender from losing information about a higher
// priority error.
if tc.mu.txnState == txnRetryableError &&
roachpb.ErrPriority(pErr.GoError()) <= roachpb.ErrPriority(tc.mu.storedRetryableErr) {
return tc.mu.storedRetryableErr
}

// If the error is a transaction retry error, update metrics to
// reflect the reason for the restart. More details about the
// different error types are documented above on the metaRestart
Expand Down Expand Up @@ -829,8 +839,8 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
if errTxnID != newTxn.ID {
// Remember that this txn is aborted to reject future requests.
tc.mu.txn.Status = roachpb.ABORTED
// Abort the old txn. The client is not supposed to use use this
// TxnCoordSender any more.
// Abort the old txn. The client is not supposed to use this
// TxnCoordSender anymore.
tc.interceptorAlloc.txnHeartbeater.abortTxnAsyncLocked(ctx)
tc.cleanupTxnLocked(ctx)
return retErr
Expand Down Expand Up @@ -1167,7 +1177,7 @@ func (tc *TxnCoordSender) GetLeafTxnInputState(
}

// Also mark the TxnCoordSender as "active". This prevents changing
// the priority after a leaf has been created. It als conservatively
// the priority after a leaf has been created. It also conservatively
// ensures that Active() returns true if there's maybe a command
// being executed concurrently by a leaf.
tc.mu.active = true
Expand Down
98 changes: 98 additions & 0 deletions pkg/kv/txn_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,3 +709,101 @@ func TestUpdateStateOnRemoteRetryableErr(t *testing.T) {
require.Equal(t, txn.Sender().TestingCloneTxn().ID, txnIDBefore)
}
}

// TestUpdateStateOnRemoteRetryableErrErrorRanking tests that when multiple
// errors are provided to UpdateStateOnRemoteRetryableError, the error with the
// highest priority is used to update the transaction state and errors with
// lower priority are unable to change the transaction state.
func TestUpdateStateOnRemoteRetryableErrErrorRanking(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

orderedErrs := []struct {
err *roachpb.Error
expNewError bool // if we expect the txn's retryable error to change
expNewEpoch bool // if we expect the epoch to be bumped
expAborted bool // if we expect the txn to be aborted
}{
// Step 1. The txn starts out with no retryable error and restarts when
// it hits an uncertainty error.
{
err: roachpb.NewError(&roachpb.ReadWithinUncertaintyIntervalError{}),
expNewError: true,
expNewEpoch: true,
expAborted: false,
},
// Step 2. The txn ignores any further retryable errors from the same
// epoch.
{
err: roachpb.NewError(&roachpb.WriteTooOldError{}),
expNewError: false,
expNewEpoch: false,
expAborted: false,
},
// Step 3. The txn moves to an aborted state when it hits a txn aborted
// error.
{
err: roachpb.NewError(&roachpb.TransactionAbortedError{}),
expNewError: true,
expNewEpoch: false,
expAborted: true,
},
// Step 4. The txn ignores any further retryable errors.
{
err: roachpb.NewError(&roachpb.ReadWithinUncertaintyIntervalError{}),
expNewError: false,
expNewEpoch: false,
expAborted: true,
},
// Step 5. The txn ignores any further txn aborted errors.
{
err: roachpb.NewError(&roachpb.TransactionAbortedError{}),
expNewError: false,
expNewEpoch: false,
expAborted: true,
},
}

// Unlike TestUpdateStateOnRemoteRetryableErr, use the same txn for all
// errors and observe how it changes as each error is consumed.
txn := db.NewTxn(ctx, "test")
tcs := txn.Sender()
txnProto := tcs.TestingCloneTxn()
for _, tc := range orderedErrs {
retryErrBefore := tcs.GetTxnRetryableErr(ctx)
epochBefore := tcs.Epoch()

pErr := tc.err
pErr.SetTxn(txnProto)
err := txn.UpdateStateOnRemoteRetryableErr(ctx, pErr)
// Ensure what we got back is a TransactionRetryWithProtoRefreshError.
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err)
// Ensure the same thing is stored on the TxnCoordSender as well.
retErr := tcs.GetTxnRetryableErr(ctx)
require.Equal(t, retErr, err)
require.Equal(t, retErr.TxnID, txnProto.ID)

// Assert that the transaction's state has changed as expected.
if tc.expNewError {
require.NotEqual(t, retErr, retryErrBefore)
} else {
require.Equal(t, retErr, retryErrBefore)
}
if tc.expNewEpoch {
require.Equal(t, epochBefore+1, tcs.Epoch())
} else {
require.Equal(t, epochBefore, tcs.Epoch())
}
if tc.expAborted {
require.Equal(t, roachpb.ABORTED, tcs.TxnStatus())
require.True(t, retErr.PrevTxnAborted())
} else {
require.Equal(t, roachpb.PENDING, tcs.TxnStatus())
require.False(t, retErr.PrevTxnAborted())
}
}
}

0 comments on commit a9d7853

Please sign in to comment.