From 546f346cf6870dc6305632bafc5075a51ce51dc9 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Wed, 26 Jul 2023 02:29:52 -0400 Subject: [PATCH] kv: enable replay protection for ambiguous writes on commits While previously, RPC failures were assumed to be retriable, as write operations (with the notable exception of `EndTxn`) were assumed to be idempotent, it has been seen in #67765 and documented in #103817 that, for RPC failures on write operations that occur in parallel with a commit (i.e. a partial batch where `withCommit==true`), it is not always possible to assume idempotency and retry the "ambiguous" writes. This is due to the fact that the retried write RPC could result in the transaction's `WriteTimestamp` being bumped, changing the commit timestamp of the transaction that may in fact already be implicitly committed if the initial "ambiguous" write actually succeeded. This change modifies the protocol of the DistSender to flag in subsequent retries that a batch with a commit has previously experienced ambiguity, as well as the handling of the retried write in the MVCC layer to detect this previous ambiguity and reject retries that change the write timestamp as a non-idempotent replay. The flag allows subsequent retries to "remember" the earlier ambiguous write and evaluate accordingly. The flag allows us to properly handle RPC failures (i.e. 
ambiguous writes) that occur on commit, as a transaction that is implicitly committed is eligible to be marked as explicitly committed by contending transactions via the `RecoverTxn` operation, resulting in a race between retries by the transaction coordinator and recovery by contending transactions that could result either in incorrectly reporting a transaction as having failed with a `RETRY_SERIALIZABLE` error (despite the possibility that it already was or could be recovered and successfully committed), or in attempting to explicitly commit an already-recovered and committed transaction, which surfaces as an assertion failure due to `transaction unexpectedly committed`. The replay protection introduced here allows us to avoid both of these situations by detecting a replay that should be considered non-idempotent and returning an error, causing the original RPC error remembered by the DistSender to be propagated as an `AmbiguousResultError`. As such, this can be handled by application code by validating the success/failure of a transaction when receiving this error. Depends on #107680, #107323, #108154, #108001 Fixes: #103817 Release note (bug fix): Properly handles RPC failures on writes using the parallel commit protocol that execute in parallel to the commit operation, avoiding incorrect retriable failures and `transaction unexpectedly committed` assertions by detecting when writes cannot be retried idempotently, instead returning an `AmbiguousResultError`. 
--- pkg/kv/kvclient/kvcoord/dist_sender.go | 29 +- .../kvcoord/dist_sender_ambiguous_test.go | 248 ++++++++---------- pkg/kv/kvpb/api.proto | 10 +- pkg/kv/kvserver/batcheval/cmd_delete.go | 7 +- pkg/kv/kvserver/batcheval/cmd_delete_range.go | 11 +- pkg/kv/kvserver/batcheval/cmd_increment.go | 7 +- pkg/kv/kvserver/batcheval/cmd_init_put.go | 7 +- pkg/kv/kvserver/batcheval/cmd_put.go | 7 +- pkg/storage/mvcc.go | 29 +- 9 files changed, 186 insertions(+), 169 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 4f6aaff71f4c..25fd3f54d6ec 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2084,7 +2084,8 @@ func maybeSetResumeSpan( // the error that the last attempt to execute the request returned. func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error { if ambiguousErr != nil { - return kvpb.NewAmbiguousResultErrorf("error=%s [exhausted]", ambiguousErr) + return kvpb.NewAmbiguousResultErrorf("error=%s [exhausted] (last error: %v)", + ambiguousErr, lastAttemptErr) } // TODO(bdarnell): The error from the last attempt is not necessarily the best @@ -2262,6 +2263,10 @@ func (ds *DistSender) sendToReplicas( ba = ba.ShallowCopy() ba.Replica = curReplica ba.RangeID = desc.RangeID + ba.AmbiguousReplayProtection = ambiguousError != nil + if ba.AmbiguousReplayProtection && ba.CanForwardReadTimestamp { + ba.CanForwardReadTimestamp = false + } // Communicate to the server the information our cache has about the // range. If it's stale, the server will return an update. ba.ClientRangeInfo = roachpb.ClientRangeInfo{ @@ -2296,10 +2301,13 @@ func (ds *DistSender) sendToReplicas( ds.maybeIncrementErrCounters(br, err) if err != nil { + log.VErrEventf(ctx, 2, "RPC error: %s", err) + if grpcutil.IsAuthError(err) { // Authentication or authorization error. Propagate. 
if ambiguousError != nil { - return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError) + return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate] (last error: %v)", + ambiguousError, err) } return nil, err } @@ -2321,10 +2329,6 @@ func (ds *DistSender) sendToReplicas( // ambiguity. // 2) SQL recognizes AmbiguousResultErrors and gives them a special code // (StatementCompletionUnknown). - // TODO(andrei): The use of this code is inconsistent because a) the - // DistSender tries to only return the code for commits, but it'll happily - // forward along AmbiguousResultErrors coming from the replica and b) we - // probably should be returning that code for non-commit statements too. // // We retry requests in order to avoid returning errors (in particular, // AmbiguousResultError). Retrying the batch will either: @@ -2339,6 +2343,12 @@ func (ds *DistSender) sendToReplicas( // can't claim success (and even if we could claim success, we still // wouldn't have the complete result of the successful evaluation). // + // Note that in case c), a request is not idempotent if the retry finds + // the request succeeded the first time around, but requires a change to + // the transaction's write timestamp. This is guarded against by setting + // the AmbiguousReplayProtection flag, so that the replay is aware the + // batch has seen an ambiguous error. + // // Case a) is great - the retry made the request succeed. Case b) is also // good; due to idempotency we managed to swallow a communication error. // Case c) is not great - we'll end up returning an error even though the @@ -2351,10 +2361,12 @@ func (ds *DistSender) sendToReplicas( // evaluating twice, overwriting another unrelated write that fell // in-between. // + // NB: If this partial batch does not contain the EndTxn request but the + // batch contains a commit, the ambiguous error should be caught on + // retrying the writes, should it need to be propagated. 
if withCommit && !grpcutil.RequestDidNotStart(err) { ambiguousError = err } - log.VErrEventf(ctx, 2, "RPC error: %s", err) // If the error wasn't just a context cancellation and the down replica // is cached as the lease holder, evict it. The only other eviction @@ -2510,7 +2522,8 @@ func (ds *DistSender) sendToReplicas( } default: if ambiguousError != nil { - return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError) + return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate] (last error: %v)", + ambiguousError, br.Error.GoError()) } // The error received is likely not specific to this diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index 5fba569a9850..84d53fee0996 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -208,18 +208,33 @@ type interceptorHelperMutex struct { maybeWait func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) } +// validateTxnCommitAmbiguousError checks that an error on txn commit is +// ambiguous, rather than an assertion failure or a retryable error. 
+func validateTxnCommitAmbiguousError(t *testing.T, err error, reason string) { + aErr := (*kvpb.AmbiguousResultError)(nil) + rErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil) + tErr := (*kvpb.TransactionStatusError)(nil) + require.Errorf(t, err, "expected an AmbiguousResultError") + require.ErrorAsf(t, err, &aErr, + "expected AmbiguousResultError due to %s", reason) + require.Falsef(t, errors.As(err, &tErr), + "did not expect TransactionStatusError due to being already committed") + require.Falsef(t, errors.As(err, &rErr), + "did not expect incorrect TransactionRetryWithProtoRefreshError due to failed refresh") + require.Falsef(t, errors.HasAssertionFailure(err), + "expected no AssertionFailedError due to sanity check on transaction already committed") + require.ErrorContainsf(t, aErr, reason, + "expected AmbiguousResultError to include message \"%s\"", reason) +} + // TestTransactionUnexpectedlyCommitted validates the handling of the case where // a parallel commit transaction with an ambiguous error on a write races with // a contending transaction's recovery attempt. In the case that the recovery // succeeds prior to the original transaction's retries, an ambiguous error // should be raised. // -// NB: This case encounters a known issue described in #103817 and seen in -// #67765, where it currently is surfaced as an assertion failure that will -// result in a node crash. -// -// TODO(sarkesian): Validate the ambiguous result error once the initial fix as -// outlined in #103817 has been resolved. +// NB: These tests deal with a known issue described in #103817 and seen in +// #67765. func TestTransactionUnexpectedlyCommitted(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -586,12 +601,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // -- NB: If ambiguous errors were propagated, txn1 would end here. 
- // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // // 8. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start // timestamp, and attempts to evaluate it as an idempotent replay, but at // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + // 9. txn1->n1: Refresh(a) // 10. txn1->n1: Refresh(b) // 11. txn1->n1: EndTxn(commit) -- Before sending, pause the request so @@ -628,21 +644,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an assertion failure from the transaction - // coordinator once fixed. - tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). 
+ validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) }) t.Run("writer writer conflict", func(t *testing.T) { initTest(t) @@ -725,12 +733,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // -- NB: If ambiguous errors were propagated, txn1 would end here. - // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start // timestamp, and attempts to evaluate it as an idempotent replay, but at // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + // 10. txn1->n1: Refresh(a) // 11. txn1->n1: Refresh(b) // 12. txn1->n1: EndTxn(commit) -- Before sending, pause the request so @@ -767,21 +776,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an assertion failure from the transaction - // coordinator once fixed. 
- tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) }) t.Run("workload conflict", func(t *testing.T) { initTest(t) @@ -849,12 +850,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // -- NB: If ambiguous errors were propagated, txn1 would end here. - // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start // timestamp, and attempts to evaluate it as an idempotent replay, but at // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + // 10. txn1->n1: Refresh(a) // 11. txn1->n1: Refresh(b) // 12. txn1->n1: EndTxn(commit) -- Before sending, pause the request so @@ -901,21 +903,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. 
close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an assertion failure from the transaction - // coordinator once fixed. - tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) }) t.Run("recovery before refresh fails", func(t *testing.T) { initTest(t) @@ -1013,15 +1007,16 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // -- NB: If ambiguous errors were propagated, txn1 would end here. - // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start // timestamp, and attempts to evaluate it as an idempotent replay, but at // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + // 10. 
txn1->n1: Refresh(a) // 11. txn1->n1: Refresh(b,c) -- This fails due to txn2's intent on c. - // Causes the transaction coordinator to return a retriable error, + // Causes the transaction coordinator to return a retryable error, // although the transaction has been actually committed during recovery; // a highly problematic bug. @@ -1039,20 +1034,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an transaction retry error from the transaction - // coordinator once fixed. - tErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected incorrect TransactionRetryWithProtoRefreshError due to failed refresh") - require.False(t, tErr.PrevTxnAborted()) - require.Falsef(t, errors.HasAssertionFailure(err), - "expected no AssertionFailedError due to sanity check") + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) }) t.Run("recovery after refresh fails", func(t *testing.T) { initTest(t) @@ -1160,15 +1148,16 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // -- NB: If ambiguous errors were propagated, txn1 would end here. 
- // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // // 10. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start // timestamp, and attempts to evaluate it as an idempotent replay, but at // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + // 11. txn1->n1: Refresh(a) // 12. txn1->n1: Refresh(b,c) -- This fails due to txn3's write on c. - // Causes the transaction coordinator to return a retriable error, + // Causes the transaction coordinator to return a retryable error, // although the transaction could be actually committed during recovery; // a highly problematic bug. // -- because txn1 @@ -1176,7 +1165,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // so the recovery will succeed in marking it explicitly committed. if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { // The RecoverTxn operation should be evaluated after txn1 completes, - // in this case with a problematic retriable error. + // in this case with a problematic retryable error. req.pauseUntil(t, txn1Done, cp) } if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { @@ -1201,20 +1190,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an transaction retry error from the transaction - // coordinator once fixed. 
- tErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected incorrect TransactionRetryWithProtoRefreshError due to failed refresh") - require.False(t, tErr.PrevTxnAborted()) - require.Falsef(t, errors.HasAssertionFailure(err), - "expected no AssertionFailedError due to sanity check") + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) }) t.Run("recovery after transfer lease", func(t *testing.T) { initTest(t) @@ -1278,12 +1260,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // -- NB: If ambiguous errors were propagated, txn1 would end here. - // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start // timestamp, and attempts to evaluate it as an idempotent replay, but at // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + // 10. txn1->n1: Refresh(a) // 11. txn1->n1: Refresh(b) // 12. txn1->n1: EndTxn(commit) -- Recovery has already completed, so this @@ -1302,21 +1285,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. 
close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an assertion failure from the transaction - // coordinator once fixed. - tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) }) t.Run("retry sees other intent", func(t *testing.T) { initTest(t) @@ -1385,6 +1360,9 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // error. Since the transaction had an earlier ambiguous failure on a // batch with a commit, it should propagate the ambiguous error. + // -- NB: Ambiguous replay protection is not required, and should not + // come into play here. + // if req.txnName == "txn2" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { req.pauseUntil(t, txn1Done, cp) @@ -1397,17 +1375,12 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. 
close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - aErr := (*kvpb.AmbiguousResultError)(nil) - tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &aErr, - "expected AmbiguousResultError due to encountering an intent on retry") - require.Falsef(t, errors.As(err, &tErr), - "did not expect TransactionStatusError due to being already committed") - require.Falsef(t, errors.HasAssertionFailure(err), - "expected no AssertionFailedError due to sanity check on transaction already committed") + // NB: It is likely not possible to eliminate ambiguity in this case, as + // the original intents were already cleaned up. + validateTxnCommitAmbiguousError(t, err, "WriteTooOld" /* reason */) }) t.Run("recovery before retry at same timestamp", func(t *testing.T) { initTest(t) @@ -1497,6 +1470,9 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // serverside refresh as reads were already returned. Fails, and should // propagate the ambiguous error. + // -- NB: Ambiguous replay protection is not required, and should not + // come into play here. + // if req.txnName == "txn2" && hasPut && cp == BeforeSending { <-txn1Done @@ -1509,19 +1485,12 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): Once we incorporate secondary errors into the - // AmbiguousResultError, check that we see the WriteTooOldError. 
- aErr := (*kvpb.AmbiguousResultError)(nil) - tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &aErr, - "expected AmbiguousResultError due to encountering an intent on retry") - require.Falsef(t, errors.As(err, &tErr), - "did not expect TransactionStatusError due to being already committed") - require.Falsef(t, errors.HasAssertionFailure(err), - "expected no AssertionFailedError due to sanity check on transaction already committed") + // NB: It is likely not possible to eliminate ambiguity in this case, as + // the original intents were already cleaned up. + validateTxnCommitAmbiguousError(t, err, "WriteTooOld" /* reason */) }) t.Run("recovery after retry at same timestamp", func(t *testing.T) { initTest(t) @@ -1604,7 +1573,9 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { } // 8. txn1->n1: Put(b) -- Retry gets evaluated as idempotent replay and - // correctly succeeds. + // correctly succeeds. Ambiguous replay protection, though enabled, + // should not cause the retried write to error. + // -- NB: In this case, txn1 returns without error here, as there is no // need to refresh and the final EndTxn can be split off and run // asynchronously. @@ -1645,7 +1616,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() require.NoErrorf(t, err, "expected txn1 to succeed") @@ -1749,9 +1720,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // 7. txn1->n1: Put(b) -- Retry gets WriteTooOld, performs a - // serverside refresh, and succeeds. - // 8. txn1->n1: EndTxn(commit) -- Results in "transaction unexpectedly + // 7. 
txn1->n1: Put(b) -- Retry gets WriteTooOld; with ambiguous replay + // protection enabled, it should not be able to perform a serverside + // refresh. Fails, and should propagate the ambiguous error. + + // -- NB: Without ambiguous replay protection, txn1 would continue with: + + // 8. txn1->n1: EndTxn(commit) -- Would get "transaction unexpectedly // committed" due to the recovery completing first. // @@ -1766,20 +1741,11 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an assertion failure from the transaction - // coordinator once fixed. - tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") + // NB: It is likely not possible to eliminate ambiguity in this case, as + // the original intents were already cleaned up. + validateTxnCommitAmbiguousError(t, err, "WriteTooOld" /* reason */) }) } diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 46993c4a2f28..82f60d15c44d 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -2766,9 +2766,17 @@ message Header { // sender. 
repeated string profile_labels = 31 [(gogoproto.customname) = "ProfileLabels"]; + // AmbiguousReplayProtection, if set, prevents replay of a write operation + // from being considered idempotent if its write timestamp is different from + // the intent's. This protection is required when there has been an + // ambiguous write (i.e. RPC error) on a batch that contained a commit, + // as the transaction may have already been explicitly committed by a racing + // RecoverTxn request. See #103817. + bool ambiguous_replay_protection = 32; + reserved 7, 10, 12, 14, 20; - // Next ID: 32 + // Next ID: 33 } // BoundedStalenessHeader contains configuration values pertaining to bounded diff --git a/pkg/kv/kvserver/batcheval/cmd_delete.go b/pkg/kv/kvserver/batcheval/cmd_delete.go index e5321313c27b..3c5233471374 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete.go @@ -32,9 +32,10 @@ func Delete( reply := resp.(*kvpb.DeleteResponse) opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + EnableReplayProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range.go b/pkg/kv/kvserver/batcheval/cmd_delete_range.go index 7820b0d811e8..3b8796e2dea9 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range.go @@ -223,6 +223,14 @@ func DeleteRange( if !args.Inline { timestamp = h.Timestamp } + + opts := storage.MVCCWriteOptions{ + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + EnableReplayProtection: h.AmbiguousReplayProtection, + } + // NB: Even if args.ReturnKeys is false, we want to know which intents were // written if we're evaluating the DeleteRange for a transaction so that we // can update the Result's AcquiredLocks field. 
@@ -230,8 +238,7 @@ func DeleteRange( deleted, resumeSpan, num, err := storage.MVCCDeleteRange( ctx, readWriter, args.Key, args.EndKey, h.MaxSpanRequestKeys, timestamp, - storage.MVCCWriteOptions{Txn: h.Txn, LocalTimestamp: cArgs.Now, Stats: cArgs.Stats}, - returnKeys) + opts, returnKeys) if err != nil { return result.Result{}, err } diff --git a/pkg/kv/kvserver/batcheval/cmd_increment.go b/pkg/kv/kvserver/batcheval/cmd_increment.go index be576e3d1c48..60afb11f6de6 100644 --- a/pkg/kv/kvserver/batcheval/cmd_increment.go +++ b/pkg/kv/kvserver/batcheval/cmd_increment.go @@ -33,9 +33,10 @@ func Increment( reply := resp.(*kvpb.IncrementResponse) opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + EnableReplayProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/kv/kvserver/batcheval/cmd_init_put.go b/pkg/kv/kvserver/batcheval/cmd_init_put.go index 7c6301fc7142..7ec1e7ebe22d 100644 --- a/pkg/kv/kvserver/batcheval/cmd_init_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_init_put.go @@ -37,9 +37,10 @@ func InitPut( } opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + EnableReplayProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/kv/kvserver/batcheval/cmd_put.go b/pkg/kv/kvserver/batcheval/cmd_put.go index 5db6b772a688..107170117479 100644 --- a/pkg/kv/kvserver/batcheval/cmd_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_put.go @@ -55,9 +55,10 @@ func Put( } opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + EnableReplayProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index bc5387c31fe6..e00b000e6519 100644 --- a/pkg/storage/mvcc.go +++ 
b/pkg/storage/mvcc.go @@ -1665,6 +1665,7 @@ func replayTransactionalWrite( value roachpb.Value, txn *roachpb.Transaction, valueFn func(optionalValue) (roachpb.Value, error), + ambiguousReplayProtection bool, ) error { var writtenValue optionalValue var err error @@ -1750,6 +1751,14 @@ func replayTransactionalWrite( txn.ID, txn.Sequence, value.RawBytes, writtenValue.RawBytes) } + // If ambiguous replay protection is enabled, a replay that changes the + // timestamp should fail, as this would break idempotency (see #103817). + if ambiguousReplayProtection && !txn.WriteTimestamp.Equal(meta.Txn.WriteTimestamp) { + return errors.Errorf("transaction %s with sequence %d prevented from changing "+ + "write timestamp from %s to %s due to replay protection", + txn.ID, txn.Sequence, meta.Txn.WriteTimestamp, txn.WriteTimestamp) + } + return nil } @@ -1819,6 +1828,9 @@ func mvccPutInternal( if !value.Timestamp.IsEmpty() { return false, errors.Errorf("cannot have timestamp set in value") } + if err := opts.validate(); err != nil { + return false, err + } metaKey := MakeMVCCMetadataKey(key) ok, origMetaKeySize, origMetaValSize, origRealKeyChanged, err := @@ -1918,7 +1930,7 @@ func mvccPutInternal( // The transaction has executed at this sequence before. This is merely a // replay of the transactional write. Assert that all is in order and return // early. 
- return false, replayTransactionalWrite(ctx, iter, meta, key, readTimestamp, value, opts.Txn, valueFn) + return false, replayTransactionalWrite(ctx, iter, meta, key, readTimestamp, value, opts.Txn, valueFn, opts.EnableReplayProtection) } // We're overwriting the intent that was present at this key, before we do @@ -2306,7 +2318,6 @@ func MVCCIncrement( newValue.InitChecksum(key) return newValue, nil } - err := mvccPutUsingIter(ctx, rw, iter, key, timestamp, noValue, valueFn, opts) return newInt64Val, err @@ -3897,9 +3908,17 @@ func buildScanIntents(data []byte) ([]roachpb.Intent, error) { // MVCCWriteOptions bundles options for the MVCCPut and MVCCDelete families of functions. type MVCCWriteOptions struct { // See the comment on mvccPutInternal for details on these parameters. - Txn *roachpb.Transaction - LocalTimestamp hlc.ClockTimestamp - Stats *enginepb.MVCCStats + Txn *roachpb.Transaction + LocalTimestamp hlc.ClockTimestamp + Stats *enginepb.MVCCStats + EnableReplayProtection bool +} + +func (opts *MVCCWriteOptions) validate() error { + if opts.EnableReplayProtection && opts.Txn == nil { + return errors.Errorf("cannot enable replay protection without a transaction") + } + return nil } // MVCCScanOptions bundles options for the MVCCScan family of functions.