diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 765e7b434be8..ebbedac2ea21 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1166,7 +1166,7 @@ func (ds *DistSender) detectIntentMissingDueToIntentResolution( // We weren't able to determine whether the intent missing error is // due to intent resolution or not, so it is still ambiguous whether // the commit succeeded. - return false, kvpb.NewAmbiguousResultErrorf("error=%s [intent missing]", pErr) + return false, kvpb.NewAmbiguousResultErrorf("error=%v [intent missing]", pErr) } resp := br.Responses[0].GetQueryTxn() respTxn := &resp.QueriedTxn @@ -1980,7 +1980,8 @@ func maybeSetResumeSpan( // the error that the last attempt to execute the request returned. func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error { if ambiguousErr != nil { - return kvpb.NewAmbiguousResultErrorf("error=%s [exhausted]", ambiguousErr) + return kvpb.NewAmbiguousResultErrorf("error=%v [exhausted] (last error: %v)", + ambiguousErr, lastAttemptErr) } // TODO(bdarnell): The error from the last attempt is not necessarily the best @@ -2158,6 +2159,28 @@ func (ds *DistSender) sendToReplicas( ba = ba.ShallowCopy() ba.Replica = curReplica ba.RangeID = desc.RangeID + + // When a sub-batch from a batch containing a commit experiences an + // ambiguous error, it is critical to ensure subsequent replay attempts + // do not permit changing the write timestamp, as the transaction may + // already have been considered implicitly committed. + ba.AmbiguousReplayProtection = ambiguousError != nil + + // In the case that the batch has already seen an ambiguous error, in + // addition to enabling ambiguous replay protection, we also need to + // disable the ability for the server to forward the read timestamp, as + // the transaction may have been implicitly committed. If the intents for + // the implicitly committed transaction were already resolved, on a replay + // attempt encountering committed values above the read timestamp the + // server will attempt to handle what seems to be a write-write conflict by + // throwing a WriteTooOld, which could be refreshed away on the server if + // the read timestamp can be moved. Disabling this ability protects against + // refreshing away the error when retrying the ambiguous operation, instead + // returning to the DistSender so the ambiguous error can be propagated. + if ambiguousError != nil && ba.CanForwardReadTimestamp { + ba.CanForwardReadTimestamp = false + } + // Communicate to the server the information our cache has about the // range. If it's stale, the serve will return an update. ba.ClientRangeInfo = roachpb.ClientRangeInfo{ @@ -2183,10 +2206,13 @@ func (ds *DistSender) sendToReplicas( ds.maybeIncrementErrCounters(br, err) if err != nil { + log.VErrEventf(ctx, 2, "RPC error: %s", err) + if grpcutil.IsAuthError(err) { // Authentication or authorization error. Propagate. if ambiguousError != nil { - return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError) + return nil, kvpb.NewAmbiguousResultErrorf("error=%v [propagate] (last error: %v)", + ambiguousError, err) } return nil, err } @@ -2208,10 +2234,6 @@ func (ds *DistSender) sendToReplicas( // ambiguity. // 2) SQL recognizes AmbiguousResultErrors and gives them a special code // (StatementCompletionUnknown). 
- // TODO(andrei): The use of this code is inconsistent because a) the - // DistSender tries to only return the code for commits, but it'll happily - // forward along AmbiguousResultErrors coming from the replica and b) we - // probably should be returning that code for non-commit statements too. // // We retry requests in order to avoid returning errors (in particular, // AmbiguousResultError). Retrying the batch will either: @@ -2226,6 +2248,12 @@ func (ds *DistSender) sendToReplicas( // can't claim success (and even if we could claim success, we still // wouldn't have the complete result of the successful evaluation). // + // Note that in case c), a request is not idempotent if the retry finds + // the request succeeded the first time around, but requires a change to + // the transaction's write timestamp. This is guarded against by setting + // the AmbiguousReplayProtection flag, so that the replay is aware the + // batch has seen an ambiguous error. + // // Case a) is great - the retry made the request succeed. Case b) is also // good; due to idempotency we managed to swallow a communication error. // Case c) is not great - we'll end up returning an error even though the @@ -2238,10 +2266,12 @@ func (ds *DistSender) sendToReplicas( // evaluating twice, overwriting another unrelated write that fell // in-between. // + // NB: If this partial batch does not contain the EndTxn request but the + // batch contains a commit, the ambiguous error should be caught on + // retrying the writes, should it need to be propagated. if withCommit && !grpcutil.RequestDidNotStart(err) { ambiguousError = err } - log.VErrEventf(ctx, 2, "RPC error: %s", err) // If the error wasn't just a context cancellation and the down replica // is cached as the lease holder, evict it. The only other eviction @@ -2397,7 +2427,8 @@ func (ds *DistSender) sendToReplicas( } default: if ambiguousError != nil { - return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError) + return nil, kvpb.NewAmbiguousResultErrorf("error=%v [propagate] (last error: %v)", + ambiguousError, br.Error.GoError()) } // The error received is likely not specific to this diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index b460f9e09f23..6846b768340b 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -238,18 +238,33 @@ func (tMu *interceptorHelperMutex) configureSubTest(t *testing.T) (restore func( return restore } +// validateTxnCommitAmbiguousError checks that an error on txn commit is +// ambiguous, rather than an assertion failure or a retryable error. 
+func validateTxnCommitAmbiguousError(t *testing.T, err error, reason string) { + aErr := (*kvpb.AmbiguousResultError)(nil) + rErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil) + tErr := (*kvpb.TransactionStatusError)(nil) + require.Errorf(t, err, "expected an AmbiguousResultError") + require.ErrorAsf(t, err, &aErr, + "expected AmbiguousResultError due to %s", reason) + require.Falsef(t, errors.As(err, &tErr), + "did not expect TransactionStatusError due to being already committed") + require.Falsef(t, errors.As(err, &rErr), + "did not expect incorrect TransactionRetryWithProtoRefreshError due to failed refresh") + require.Falsef(t, errors.HasAssertionFailure(err), + "expected no AssertionFailedError due to sanity check on transaction already committed") + require.ErrorContainsf(t, aErr, reason, + "expected AmbiguousResultError to include message \"%s\"", reason) +} + // TestTransactionUnexpectedlyCommitted validates the handling of the case where // a parallel commit transaction with an ambiguous error on a write races with // a contending transaction's recovery attempt. In the case that the recovery // succeeds prior to the original transaction's retries, an ambiguous error // should be raised. // -// NB: This case encounters a known issue described in #103817 and seen in -// #67765, where it currently is surfaced as an assertion failure that will -// result in a node crash. -// -// TODO(sarkesian): Validate the ambiguous result error once the initial fix as -// outlined in #103817 has been resolved. +// NB: These tests deal with a known issue described in #103817 and seen in +// #67765. func TestTransactionUnexpectedlyCommitted(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -591,12 +606,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // -- NB: If ambiguous errors were propagated, txn1 would end here. - // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start // timestamp, and attempts to evaluate it as an idempotent replay, but at // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + // 10. txn1->n1: Refresh(a) // 11. txn1->n1: Refresh(b) // 12. txn1->n1: EndTxn(commit) -- Before sending, pause the request so @@ -643,21 +659,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an assertion failure from the transaction - // coordinator once fixed. 
- tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) } // The set of test cases that use the same request scheduling. @@ -803,15 +811,16 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // -- NB: If ambiguous errors were propagated, txn1 would end here. - // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start // timestamp, and attempts to evaluate it as an idempotent replay, but at // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + // 10. txn1->n1: Refresh(a) // 11. txn1->n1: Refresh(b,c) -- This fails due to txn2's intent on c. - // Causes the transaction coordinator to return a retriable error, + // Causes the transaction coordinator to return a retryable error, // although the transaction has been actually committed during recovery; // a highly problematic bug. @@ -829,20 +838,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an transaction retry error from the transaction - // coordinator once fixed. - tErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected incorrect TransactionRetryWithProtoRefreshError due to failed refresh") - require.False(t, tErr.PrevTxnAborted()) - require.Falsef(t, errors.HasAssertionFailure(err), - "expected no AssertionFailedError due to sanity check") + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) }) // The txn coordinator shouldn't respond with an incorrect retryable failure @@ -946,15 +948,16 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // -- NB: If ambiguous errors were propagated, txn1 would end here. - // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // // 10. 
txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start // timestamp, and attempts to evaluate it as an idempotent replay, but at // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + // 11. txn1->n1: Refresh(a) // 12. txn1->n1: Refresh(b,c) -- This fails due to txn3's write on c. - // Causes the transaction coordinator to return a retriable error, + // Causes the transaction coordinator to return a retryable error, // although the transaction could be actually committed during recovery; // a highly problematic bug. // -- because txn1 @@ -962,7 +965,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // so the recovery will succeed in marking it explicitly committed. if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { // The RecoverTxn operation should be evaluated after txn1 completes, - // in this case with a problematic retriable error. + // in this case with a problematic retryable error. req.pauseUntil(t, txn1Done, cp) } if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { @@ -987,20 +990,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an transaction retry error from the transaction - // coordinator once fixed. - tErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected incorrect TransactionRetryWithProtoRefreshError due to failed refresh") - require.False(t, tErr.PrevTxnAborted()) - require.Falsef(t, errors.HasAssertionFailure(err), - "expected no AssertionFailedError due to sanity check") + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) }) // This test is primarily included for completeness, in order to ensure the @@ -1067,12 +1063,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // -- NB: If ambiguous errors were propagated, txn1 would end here. - // -- NB: When ambiguous errors are not propagated, txn1 continues with: - // // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start // timestamp, and attempts to evaluate it as an idempotent replay, but at // a higher timestamp, which breaks idempotency due to being on commit. + + // -- NB: With ambiguous replay protection, txn1 should end here. + // -- NB: Without ambiguous replay protection, txn1 would continue with: + // 10. txn1->n1: Refresh(a) // 11. txn1->n1: Refresh(b) // 12. txn1->n1: EndTxn(commit) -- Recovery has already completed, so this @@ -1091,21 +1088,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. 
close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an assertion failure from the transaction - // coordinator once fixed. - tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") + // NB: While ideally we would hope to see a successful commit without + // error, without querying txn record/intents from the txn coordinator we + // expect an AmbiguousResultError in this case (as outlined in #103817). + validateTxnCommitAmbiguousError(t, err, "replay protection" /* reason */) }) // When a retried write happens after another txn's intent already exists on @@ -1178,6 +1167,9 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // error. Since the transaction had an earlier ambiguous failure on a // batch with a commit, it should propagate the ambiguous error. + // -- NB: Ambiguous replay protection is not required, and should not + // come into play here. + // if req.txnName == "txn2" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { req.pauseUntil(t, txn1Done, cp) @@ -1190,7 +1182,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() // NB: Prior to #102808 and #102809 which were introduced in 23.2, @@ -1301,6 +1293,9 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // serverside refresh as reads were already returned. Fails, and should // propagate the ambiguous error. + // -- NB: Ambiguous replay protection is not required, and should not + // come into play here. + // if req.txnName == "txn2" && hasPut && cp == BeforeSending { <-txn1Done @@ -1313,21 +1308,12 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // NB: Prior to #102808 and #102809 which were introduced in 23.2, - // WriteTooOlds were handled via a complicated deferral mechanism, and - // writes that encountered higher-ts values would succeed, with a flag - // on the response for the client to handle. This has the side effect of - // masking AmbiguousResultErrors, and results in "unexpectedly committed". 
- tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") + // NB: It is likely not possible to eliminate ambiguity in this case, as + // the original intents were already cleaned up. + validateTxnCommitAmbiguousError(t, err, "WriteTooOld" /* reason */) }) // This test is included for completeness, but tests expected behavior; @@ -1417,7 +1403,9 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { } // 8. txn1->n1: Put(b) -- Retry gets evaluated as idempotent replay and - // correctly succeeds. + // correctly succeeds. Ambiguous replay protection, though enabled, + // should not cause the retried write to error. + // -- NB: In this case, txn1 returns without error here, as there is no // need to refresh and the final EndTxn can be split off and run // asynchronously. @@ -1458,7 +1446,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() require.NoErrorf(t, err, "expected txn1 to succeed") @@ -1564,9 +1552,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) } - // 7. txn1->n1: Put(b) -- Retry gets WriteTooOld, performs a - // serverside refresh, and succeeds. - // 8. txn1->n1: EndTxn(commit) -- Results in "transaction unexpectedly + // 7. txn1->n1: Put(b) -- Retry gets WriteTooOld; with ambiguous replay + // protection enabled, it should not be able to perform a serverside + // refresh. Fails, and should propagate the ambiguous error. + + // -- NB: Without ambiguous replay protection, txn1 would continue with: + + // 8. txn1->n1: EndTxn(commit) -- Would get "transaction unexpectedly // committed" due to the recovery completing first. // @@ -1581,20 +1573,11 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Start test, await concurrent operations and validate results. close(txn1Ready) err := <-txn1ResultCh - t.Logf("txn1 completed with err: %+v", err) + t.Logf("txn1 completed with err: %v", err) wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the - // immediate changes outlined in #103817 are implemented, right now this is - // essentially validating the existence of the bug. This needs to be fixed, - // and we should not see an assertion failure from the transaction - // coordinator once fixed. - tErr := (*kvpb.TransactionStatusError)(nil) - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") + // NB: It is likely not possible to eliminate ambiguity in this case, as + // the original intents were already cleaned up. 
+ validateTxnCommitAmbiguousError(t, err, "WriteTooOld" /* reason */) }) } diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index c661d714c815..16fb5c8cba44 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -2699,9 +2699,18 @@ message Header { // sender. repeated string profile_labels = 31 [(gogoproto.customname) = "ProfileLabels"]; + // AmbiguousReplayProtection, if set, prevents a retried write operation + // from being considered an idempotent replay of a successful prior attempt + // (of the same operation) if the request's write timestamp is different from + // the prior attempt's. This protection is required when there has been an + // ambiguous write (i.e. RPC error) on a batch that contained a commit, + // as the transaction may have already been considered implicitly committed, + // and/or been explicitly committed by a RecoverTxn request. See #103817. + bool ambiguous_replay_protection = 32; + reserved 7, 10, 12, 14, 20; - // Next ID: 32 + // Next ID: 33 } // BoundedStalenessHeader contains configuration values pertaining to bounded diff --git a/pkg/kv/kvpb/batch.go b/pkg/kv/kvpb/batch.go index 331164716f3b..c3df7a5eb68f 100644 --- a/pkg/kv/kvpb/batch.go +++ b/pkg/kv/kvpb/batch.go @@ -855,6 +855,9 @@ func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) { if ba.WaitPolicy != lock.WaitPolicy_Block { s.Printf(", [wait-policy: %s]", ba.WaitPolicy) } + if ba.AmbiguousReplayProtection { + s.Printf(", [protect-ambiguous-replay]") + } if ba.CanForwardReadTimestamp { s.Printf(", [can-forward-ts]") } diff --git a/pkg/kv/kvpb/string_test.go b/pkg/kv/kvpb/string_test.go index f51b45cdaa80..a8cfb0b8a9df 100644 --- a/pkg/kv/kvpb/string_test.go +++ b/pkg/kv/kvpb/string_test.go @@ -42,6 +42,7 @@ func TestBatchRequestString(t *testing.T) { txn.ID = uuid.NamespaceDNS ba.Txn = &txn ba.WaitPolicy = lock.WaitPolicy_Error + ba.AmbiguousReplayProtection = true ba.CanForwardReadTimestamp = true ba.BoundedStaleness = &kvpb.BoundedStalenessHeader{ MinTimestampBound: hlc.Timestamp{WallTime: 1}, @@ -58,7 +59,7 @@ func TestBatchRequestString(t *testing.T) { ba.Requests = append(ba.Requests, ru) { - exp := `Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min),... 76 skipped ..., Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]` + exp := `Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min),... 
76 skipped ..., Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]` act := ba.String() require.Equal(t, exp, act) } diff --git a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go index 20982a2fb57a..528556b8c1bb 100644 --- a/pkg/kv/kvserver/batcheval/cmd_conditional_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_conditional_put.go @@ -57,9 +57,10 @@ func ConditionalPut( handleMissing := storage.CPutMissingBehavior(args.AllowIfDoesNotExist) opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/kv/kvserver/batcheval/cmd_delete.go b/pkg/kv/kvserver/batcheval/cmd_delete.go index e30e9e8a48cb..f0ba650fd2ce 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete.go @@ -32,9 +32,10 @@ func Delete( reply := resp.(*kvpb.DeleteResponse) opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range.go b/pkg/kv/kvserver/batcheval/cmd_delete_range.go index 4532b4533561..e105f108805e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range.go @@ -218,6 +218,14 @@ func DeleteRange( if !args.Inline { timestamp = h.Timestamp } + + opts := storage.MVCCWriteOptions{ + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, + } + // NB: Even if args.ReturnKeys is false, we want to know which intents were // written if we're evaluating the DeleteRange for a transaction so that we // can update the Result's AcquiredLocks field. 
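Aside: each batcheval write command in this change (ConditionalPut, Delete, DeleteRange, Increment, InitPut, Put) threads the new header flag into the storage layer identically. Condensed, the shared pattern looks like the sketch below; the helper name writeOptsFromHeader is invented for illustration, as the change simply inlines this struct literal at every call site.

    // Hypothetical helper, assumed to live in package batcheval: builds the
    // write options for a command, carrying the DistSender's ambiguous-replay
    // signal from the batch header down to the MVCC write path.
    func writeOptsFromHeader(h kvpb.Header, cArgs CommandArgs) storage.MVCCWriteOptions {
    	return storage.MVCCWriteOptions{
    		Txn:                            h.Txn,
    		LocalTimestamp:                 cArgs.Now,
    		Stats:                          cArgs.Stats,
    		// Set when a prior attempt of this batch failed ambiguously.
    		ReplayWriteTimestampProtection: h.AmbiguousReplayProtection,
    	}
    }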
@@ -225,8 +233,7 @@ func DeleteRange( deleted, resumeSpan, num, err := storage.MVCCDeleteRange( ctx, readWriter, args.Key, args.EndKey, h.MaxSpanRequestKeys, timestamp, - storage.MVCCWriteOptions{Txn: h.Txn, LocalTimestamp: cArgs.Now, Stats: cArgs.Stats}, - returnKeys) + opts, returnKeys) if err == nil && args.ReturnKeys { reply.Keys = deleted } diff --git a/pkg/kv/kvserver/batcheval/cmd_increment.go b/pkg/kv/kvserver/batcheval/cmd_increment.go index a78bba7fa61a..e15d68493a73 100644 --- a/pkg/kv/kvserver/batcheval/cmd_increment.go +++ b/pkg/kv/kvserver/batcheval/cmd_increment.go @@ -33,9 +33,10 @@ func Increment( reply := resp.(*kvpb.IncrementResponse) opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, } newVal, err := storage.MVCCIncrement( diff --git a/pkg/kv/kvserver/batcheval/cmd_init_put.go b/pkg/kv/kvserver/batcheval/cmd_init_put.go index 0c93b15ecaa6..4df8c97d222d 100644 --- a/pkg/kv/kvserver/batcheval/cmd_init_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_init_put.go @@ -37,9 +37,10 @@ func InitPut( } opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/kv/kvserver/batcheval/cmd_put.go b/pkg/kv/kvserver/batcheval/cmd_put.go index 56c4cb74724f..7ec7f93220c3 100644 --- a/pkg/kv/kvserver/batcheval/cmd_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_put.go @@ -53,9 +53,10 @@ func Put( } opts := storage.MVCCWriteOptions{ - Txn: h.Txn, - LocalTimestamp: cArgs.Now, - Stats: cArgs.Stats, + Txn: h.Txn, + LocalTimestamp: cArgs.Now, + Stats: cArgs.Stats, + ReplayWriteTimestampProtection: h.AmbiguousReplayProtection, } var err error diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 7723d885baa8..907348884395 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -1610,6 +1610,7 @@ func replayTransactionalWrite( value roachpb.Value, txn *roachpb.Transaction, valueFn func(optionalValue) (roachpb.Value, error), + replayWriteTimestampProtection bool, ) error { var writtenValue optionalValue var err error @@ -1695,6 +1696,14 @@ func replayTransactionalWrite( txn.ID, txn.Sequence, value.RawBytes, writtenValue.RawBytes) } + // If ambiguous replay protection is enabled, a replay that changes the + // timestamp should fail, as this would break idempotency (see #103817). + if replayWriteTimestampProtection && !txn.WriteTimestamp.Equal(meta.Txn.WriteTimestamp) { + return errors.Errorf("transaction %s with sequence %d prevented from changing "+ + "write timestamp from %s to %s due to ambiguous replay protection", + txn.ID, txn.Sequence, meta.Txn.WriteTimestamp, txn.WriteTimestamp) + } + return nil } @@ -1765,6 +1774,9 @@ func mvccPutInternal( if !value.Timestamp.IsEmpty() { return false, errors.Errorf("cannot have timestamp set in value") } + if err := opts.validate(); err != nil { + return false, err + } metaKey := MakeMVCCMetadataKey(key) ok, origMetaKeySize, origMetaValSize, origRealKeyChanged, err := @@ -1860,7 +1872,7 @@ func mvccPutInternal( // The transaction has executed at this sequence before. This is merely a // replay of the transactional write. Assert that all is in order and return // early. 
- return false, replayTransactionalWrite(ctx, iter, meta, key, readTimestamp, value, opts.Txn, valueFn) + return false, replayTransactionalWrite(ctx, iter, meta, key, readTimestamp, value, opts.Txn, valueFn, opts.ReplayWriteTimestampProtection) } // We're overwriting the intent that was present at this key, before we do @@ -3851,9 +3863,17 @@ func buildScanIntents(data []byte) ([]roachpb.Intent, error) { // MVCCWriteOptions bundles options for the MVCCPut and MVCCDelete families of functions. type MVCCWriteOptions struct { // See the comment on mvccPutInternal for details on these parameters. - Txn *roachpb.Transaction - LocalTimestamp hlc.ClockTimestamp - Stats *enginepb.MVCCStats + Txn *roachpb.Transaction + LocalTimestamp hlc.ClockTimestamp + Stats *enginepb.MVCCStats + ReplayWriteTimestampProtection bool +} + +func (opts *MVCCWriteOptions) validate() error { + if opts.ReplayWriteTimestampProtection && opts.Txn == nil { + return errors.Errorf("cannot enable replay protection without a transaction") + } + return nil } // MVCCScanOptions bundles options for the MVCCScan family of functions. diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index 5cc63e62abbb..d45014e793ae 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -87,15 +87,14 @@ var ( // check_intent k= [none] // add_lock t= k= // -// cput [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= v= [raw] [cond=] -// del [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= -// del_range [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= [end=] [max=] [returnKeys] +// cput [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] [ambiguousReplay] k= v= [raw] [cond=] +// del [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] [ambiguousReplay] k= +// del_range [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] [ambiguousReplay] k= [end=] [max=] [returnKeys] // del_range_ts [ts=[,]] [localTs=[,]] k= end= [idempotent] [noCoveredStats] // del_range_pred [ts=[,]] [localTs=[,]] k= end= [startTime=,max=,maxBytes=,rangeThreshold=] -// increment [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= [inc=] -// initput [t=] [ts=[,]] [resolve [status=]] k= v= [raw] [failOnTombstones] -// merge [t=] [ts=[,]] [resolve [status=]] k= v= [raw] -// put [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= v= [raw] +// increment [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] [ambiguousReplay] k= [inc=] +// initput [t=] [ts=[,]] [resolve [status=]] [ambiguousReplay] k= v= [raw] [failOnTombstones] +// put [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] [ambiguousReplay] k= v= [raw] // put_rangekey ts=[,] [localTs=[,]] k= end= // get [t=] [ts=[,]] [resolve [status=]] k= [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] [maxKeys=] [targetBytes=] [allowEmpty] // scan [t=] [ts=[,]] [resolve [status=]] k= [end=] [inconsistent] [skipLocked] [tombstones] [reverse] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] [max=] [targetbytes=] [wholeRows[=]] [allowEmpty] @@ -1088,9 +1087,10 @@ func cmdCPut(e *evalCtx) error { return e.withWriter("cput", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } if err := storage.MVCCConditionalPut(e.ctx, rw, key, ts, val, expVal, behavior, opts); err != nil { return err @@ -1114,9 +1114,10 @@ func cmdInitPut(e 
*evalCtx) error { return e.withWriter("initput", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } if err := storage.MVCCInitPut(e.ctx, rw, key, ts, val, failOnTombstones, opts); err != nil { return err @@ -1136,9 +1137,10 @@ func cmdDelete(e *evalCtx) error { resolve, resolveStatus := e.getResolve() return e.withWriter("del", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } foundKey, err := storage.MVCCDelete(e.ctx, rw, key, ts, opts) if err == nil || errors.HasType(err, &kvpb.WriteTooOldError{}) { @@ -1170,9 +1172,10 @@ func cmdDeleteRange(e *evalCtx) error { resolve, resolveStatus := e.getResolve() return e.withWriter("del_range", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } deleted, resumeSpan, num, err := storage.MVCCDeleteRange( e.ctx, rw, key, endKey, int64(max), ts, opts, returnKeys) @@ -1329,9 +1332,10 @@ func cmdIncrement(e *evalCtx) error { return e.withWriter("increment", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } curVal, err := storage.MVCCIncrement(e.ctx, rw, key, ts, opts, inc) if err != nil { @@ -1370,9 +1374,10 @@ func cmdPut(e *evalCtx) error { return e.withWriter("put", func(rw storage.ReadWriter) error { opts := storage.MVCCWriteOptions{ - Txn: txn, - LocalTimestamp: localTs, - Stats: e.ms, + Txn: txn, + LocalTimestamp: localTs, + Stats: e.ms, + ReplayWriteTimestampProtection: e.getAmbiguousReplay(), } if err := storage.MVCCPut(e.ctx, rw, key, ts, val, opts); err != nil { return err @@ -2208,6 +2213,10 @@ func (e *evalCtx) getResolve() (bool, roachpb.TransactionStatus) { return true, e.getTxnStatus() } +func (e *evalCtx) getAmbiguousReplay() bool { + return e.hasArg("ambiguousReplay") +} + func (e *evalCtx) getTs(txn *roachpb.Transaction) hlc.Timestamp { return e.getTsWithTxnAndName(txn, "ts") } diff --git a/pkg/storage/testdata/mvcc_histories/ambiguous_writes b/pkg/storage/testdata/mvcc_histories/ambiguous_writes new file mode 100644 index 000000000000..8a6023e69c17 --- /dev/null +++ b/pkg/storage/testdata/mvcc_histories/ambiguous_writes @@ -0,0 +1,394 @@ +# Ambiguous replay protection is only valid on transactional writes. + +run error +del k=a ambiguousReplay +---- +>> at end: + +error: (*withstack.withStack:) cannot enable replay protection without a transaction + +# Idempotent replays should normally be allowed, even if they move the timestamp. + +run stats ok +with t=A k=a + txn_begin ts=11 + # Lay down an intent. 
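+  # (seq 0 at ts 11; replayed below at ts 12 without the protection flag)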
+ put v=first +---- +>> put v=first t=A k=a +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+61 live_count=+1 live_bytes=+75 intent_count=+1 intent_bytes=+22 separated_intent_count=+1 intent_age=+89 +>> at end: +txn: "A" meta={id=00000001 key="a" iso=Serializable pri=0.00000000 epo=0 ts=11.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=11.000000000,0 wto=false gul=0,0 +meta: "a"/0,0 -> txn={id=00000001 key="a" iso=Serializable pri=0.00000000 epo=0 ts=11.000000000,0 min=0,0 seq=0} ts=11.000000000,0 del=false klen=12 vlen=10 mergeTs= txnDidNotUpdateMeta=true +data: "a"/11.000000000,0 -> /BYTES/first +stats: key_count=1 key_bytes=14 val_count=1 val_bytes=61 live_count=1 live_bytes=75 intent_count=1 intent_bytes=22 separated_intent_count=1 intent_age=89 + +run stats ok +with t=A k=a + # Perform an idempotent replay, but at a higher ts. + txn_advance ts=12 + put v=first +---- +>> put v=first t=A k=a +stats: no change +>> at end: +txn: "A" meta={id=00000001 key="a" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=11.000000000,0 wto=false gul=0,0 +meta: "a"/0,0 -> txn={id=00000001 key="a" iso=Serializable pri=0.00000000 epo=0 ts=11.000000000,0 min=0,0 seq=0} ts=11.000000000,0 del=false klen=12 vlen=10 mergeTs= txnDidNotUpdateMeta=true +data: "a"/11.000000000,0 -> /BYTES/first +stats: key_count=1 key_bytes=14 val_count=1 val_bytes=61 live_count=1 live_bytes=75 intent_count=1 intent_bytes=22 separated_intent_count=1 intent_age=89 + +run ok +with t=A k=a + resolve_intent + txn_remove +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first + +# Ambiguous replay protection should allow initial writes. + +run stats ok +with t=B k=k + txn_begin ts=0,1 + initput k=k ts=0,1 v=k1 ambiguousReplay +---- +>> initput k=k ts=0,1 v=k1 ambiguousReplay t=B k=k +stats: key_count=+1 key_bytes=+14 val_count=+1 val_bytes=+50 live_count=+1 live_bytes=+64 intent_count=+1 intent_bytes=+19 separated_intent_count=+1 intent_age=+100 +>> at end: +txn: "B" meta={id=00000002 key="k" iso=Serializable pri=0.00000000 epo=0 ts=0,1 min=0,0 seq=0} lock=true stat=PENDING rts=0,1 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000002 key="k" iso=Serializable pri=0.00000000 epo=0 ts=0,1 min=0,0 seq=0} ts=0,1 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=28 val_count=2 val_bytes=73 live_count=2 live_bytes=101 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=100 + +run ok +with t=B k=k + resolve_intent + txn_remove +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/0,1 -> /BYTES/k1 + +# Ambiguous replay protection should not affect identically evaluating cputs. 
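+# A replay that evaluates identically (same sequence, value, and timestamp)
+# is fully idempotent, so the protection flag has nothing to reject.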
+ +run stats ok +with t=C k=k + txn_begin ts=0,2 + cput v=k2 cond=k1 +---- +>> cput v=k2 cond=k1 t=C k=k +stats: key_bytes=+12 val_count=+1 val_bytes=+50 live_bytes=+43 gc_bytes_age=+1900 intent_count=+1 intent_bytes=+19 separated_intent_count=+1 intent_age=+100 +>> at end: +txn: "C" meta={id=00000003 key="k" iso=Serializable pri=0.00000000 epo=0 ts=0,2 min=0,0 seq=0} lock=true stat=PENDING rts=0,2 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000003 key="k" iso=Serializable pri=0.00000000 epo=0 ts=0,2 min=0,0 seq=0} ts=0,2 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=40 val_count=3 val_bytes=80 live_count=2 live_bytes=101 gc_bytes_age=1900 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=100 + +run stats ok +with t=C k=k + cput v=k2 cond=k1 ambiguousReplay +---- +>> cput v=k2 cond=k1 ambiguousReplay t=C k=k +stats: no change +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000003 key="k" iso=Serializable pri=0.00000000 epo=0 ts=0,2 min=0,0 seq=0} ts=0,2 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=40 val_count=3 val_bytes=80 live_count=2 live_bytes=101 gc_bytes_age=1900 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=100 + +run ok +with t=C k=k + resolve_intent + txn_remove +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 + +# Ambiguous replay protection should prevent a replay from evaluating at a higher timestamp. 
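+# This is the core scenario of #103817: if the original write may already be
+# implicitly committed, re-evaluating it at a pushed timestamp would silently
+# break idempotency, so the replay must fail instead.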
+ +run stats ok +with t=D k=k + txn_begin ts=3,0 + put v=k3 +---- +>> put v=k3 t=D k=k +stats: key_bytes=+12 val_count=+1 val_bytes=+58 live_bytes=+51 gc_bytes_age=+1843 intent_count=+1 intent_bytes=+19 separated_intent_count=+1 intent_age=+97 +>> at end: +txn: "D" meta={id=00000004 key="k" iso=Serializable pri=0.00000000 epo=0 ts=3.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=3.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000004 key="k" iso=Serializable pri=0.00000000 epo=0 ts=3.000000000,0 min=0,0 seq=0} ts=3.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=52 val_count=4 val_bytes=95 live_count=2 live_bytes=109 gc_bytes_age=3743 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=97 + +run stats error +with t=D k=k + txn_advance ts=3,1 + put v=k3 ambiguousReplay +---- +>> put v=k3 ambiguousReplay t=D k=k +stats: no change +>> at end: +txn: "D" meta={id=00000004 key="k" iso=Serializable pri=0.00000000 epo=0 ts=3.000000000,1 min=0,0 seq=0} lock=true stat=PENDING rts=3.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000004 key="k" iso=Serializable pri=0.00000000 epo=0 ts=3.000000000,0 min=0,0 seq=0} ts=3.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=52 val_count=4 val_bytes=95 live_count=2 live_bytes=109 gc_bytes_age=3743 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=97 +error: (*withstack.withStack:) transaction 00000004-0000-0000-0000-000000000000 with sequence 0 prevented from changing write timestamp from 3.000000000,0 to 3.000000000,1 due to ambiguous replay protection + +run ok +with t=D k=k + # Commit at the original timestamp (i.e. if committed by a recovery operation). + txn_advance ts=3,0 + resolve_intent + txn_remove +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 + +# Ambiguous replay protection still results in WriteTooOld errors after intent cleanup. 
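+# Once its intent has been resolved, a replay no longer finds the txn's own
+# write; it evaluates as a new write under a committed value and fails with
+# WriteTooOld rather than the replay-protection error.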
+ +run stats ok +with t=E k=k + txn_begin ts=4,0 + del resolve +---- +>> del resolve t=E k=k +del: "k": found key true +stats: key_bytes=+12 val_count=+1 live_count=-1 live_bytes=-21 gc_bytes_age=+3168 +>> at end: +txn: "E" meta={id=00000005 key="k" iso=Serializable pri=0.00000000 epo=0 ts=4.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=4.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=64 val_count=5 val_bytes=44 live_count=1 live_bytes=37 gc_bytes_age=6911 + +run stats error +with t=E k=k + del ambiguousReplay +---- +>> del ambiguousReplay t=E k=k +del: "k": found key false +stats: no change +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=64 val_count=5 val_bytes=44 live_count=1 live_bytes=37 gc_bytes_age=6911 +error: (*kvpb.WriteTooOldError:) WriteTooOldError: write for key "k" at timestamp 4.000000000,0 too old; must write at or above 4.000000000,1 + +run ok +txn_remove t=E +---- +>> at end: + +# Ambiguous replay protects against timestamp change on point delete. + +run stats ok +with t=F k=k + txn_begin ts=5,0 + # Write an initial value at the first sequence number. + put v=k5 +---- +>> put v=k5 t=F k=k +stats: key_bytes=+12 val_count=+1 val_bytes=+58 live_count=+1 live_bytes=+72 gc_bytes_age=-192 intent_count=+1 intent_bytes=+19 separated_intent_count=+1 intent_age=+95 +>> at end: +txn: "F" meta={id=00000006 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=5.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000006 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=0} ts=5.000000000,0 del=false klen=12 vlen=7 mergeTs= txnDidNotUpdateMeta=true +data: "k"/5.000000000,0 -> /BYTES/k5 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=76 val_count=6 val_bytes=102 live_count=2 live_bytes=109 gc_bytes_age=6719 intent_count=1 intent_bytes=19 separated_intent_count=1 intent_age=95 + +run stats ok +with t=F k=k + txn_step + # Let's assume we had an RPC error prior to the first successful operation. 
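+  # This del is a first write at a new sequence number, not a replay of the
+  # put above, so the protection flag has no effect on it.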
+ del ambiguousReplay +---- +>> del ambiguousReplay t=F k=k +del: "k": found key true +stats: val_bytes=+6 live_count=-1 live_bytes=-72 gc_bytes_age=+7410 intent_bytes=-7 +>> at end: +txn: "F" meta={id=00000006 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=1} lock=true stat=PENDING rts=5.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000006 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=1} ts=5.000000000,0 del=true klen=12 vlen=0 ih={{0 /BYTES/k5}} mergeTs= txnDidNotUpdateMeta=false +data: "k"/5.000000000,0 -> / +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=76 val_count=6 val_bytes=108 live_count=1 live_bytes=37 gc_bytes_age=14129 intent_count=1 intent_bytes=12 separated_intent_count=1 intent_age=95 + +run stats ok +with t=F k=k + txn_step n=-1 + # A replay of a lower sequence number with the same timestamp should be allowed. + put v=k5 ambiguousReplay +---- +>> put v=k5 ambiguousReplay t=F k=k +stats: no change +>> at end: +txn: "F" meta={id=00000006 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=0} lock=true stat=PENDING rts=5.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000006 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=1} ts=5.000000000,0 del=true klen=12 vlen=0 ih={{0 /BYTES/k5}} mergeTs= txnDidNotUpdateMeta=false +data: "k"/5.000000000,0 -> / +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=76 val_count=6 val_bytes=108 live_count=1 live_bytes=37 gc_bytes_age=14129 intent_count=1 intent_bytes=12 separated_intent_count=1 intent_age=95 + +run stats error +with t=F k=k + txn_step + txn_advance ts=5,1 + del ambiguousReplay +---- +>> del ambiguousReplay t=F k=k +stats: no change +>> at end: +txn: "F" meta={id=00000006 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,1 min=0,0 seq=1} lock=true stat=PENDING rts=5.000000000,0 wto=false gul=0,0 +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000006 key="k" iso=Serializable pri=0.00000000 epo=0 ts=5.000000000,0 min=0,0 seq=1} ts=5.000000000,0 del=true klen=12 vlen=0 ih={{0 /BYTES/k5}} mergeTs= txnDidNotUpdateMeta=false +data: "k"/5.000000000,0 -> / +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=76 val_count=6 val_bytes=108 live_count=1 live_bytes=37 gc_bytes_age=14129 intent_count=1 intent_bytes=12 separated_intent_count=1 intent_age=95 +error: (*withstack.withStack:) transaction 00000006-0000-0000-0000-000000000000 with sequence 1 prevented from changing write timestamp from 5.000000000,0 to 5.000000000,1 due to ambiguous replay protection + +run ok +with t=F k=k + resolve_intent status=ABORTED + txn_remove +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 + +# Ambiguous replay protection prevents timestamp change on transactional DeleteRange. 
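+# The guard in replayTransactionalWrite applies per key, so ranged writes are
+# protected as well.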
+ +run ok +put k=k v=k6 ts=6,0 +---- +>> at end: +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/6.000000000,0 -> /BYTES/k6 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 + +run stats ok +with t=G k=k + txn_begin ts=12,1 + del_range k=a end=z returnKeys +---- +>> del_range k=a end=z returnKeys t=G k=k +del_range: "a"-"z" -> deleted 2 key(s) +del_range: returned "a" +del_range: returned "k" +stats: key_bytes=+24 val_count=+2 val_bytes=+106 live_count=-2 live_bytes=-58 gc_bytes_age=+16544 intent_count=+2 intent_bytes=+24 separated_intent_count=+2 intent_age=+176 +>> at end: +txn: "G" meta={id=00000007 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} lock=true stat=PENDING rts=12.000000000,1 wto=false gul=0,0 +meta: "a"/0,0 -> txn={id=00000007 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "a"/12.000000000,1 -> / +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000007 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "k"/12.000000000,1 -> / +data: "k"/6.000000000,0 -> /BYTES/k6 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=100 val_count=8 val_bytes=157 gc_bytes_age=23263 intent_count=2 intent_bytes=24 separated_intent_count=2 intent_age=176 + +run stats ok +with t=G k=k + # First attempt a standard idempotent replay at a higher timestamp. + txn_advance ts=12,2 + del_range k=a end=z returnKeys +---- +>> del_range k=a end=z returnKeys t=G k=k +del_range: "a"-"z" -> deleted 2 key(s) +del_range: returned "a" +del_range: returned "k" +stats: no change +>> at end: +txn: "G" meta={id=00000007 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,2 min=0,0 seq=0} lock=true stat=PENDING rts=12.000000000,1 wto=false gul=0,0 +meta: "a"/0,0 -> txn={id=00000007 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "a"/12.000000000,1 -> / +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000007 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "k"/12.000000000,1 -> / +data: "k"/6.000000000,0 -> /BYTES/k6 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=100 val_count=8 val_bytes=157 gc_bytes_age=23263 intent_count=2 intent_bytes=24 separated_intent_count=2 intent_age=176 + +run stats error +with t=G k=k + txn_advance ts=12,3 + # However with ambiguous replay protection, a timestamp change should error. 
+ del_range k=a end=z ambiguousReplay returnKeys +---- +>> del_range k=a end=z ambiguousReplay returnKeys t=G k=k +stats: no change +>> at end: +txn: "G" meta={id=00000007 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,3 min=0,0 seq=0} lock=true stat=PENDING rts=12.000000000,1 wto=false gul=0,0 +meta: "a"/0,0 -> txn={id=00000007 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "a"/12.000000000,1 -> / +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +meta: "k"/0,0 -> txn={id=00000007 key="k" iso=Serializable pri=0.00000000 epo=0 ts=12.000000000,1 min=0,0 seq=0} ts=12.000000000,1 del=true klen=12 vlen=0 mergeTs= txnDidNotUpdateMeta=true +data: "k"/12.000000000,1 -> / +data: "k"/6.000000000,0 -> /BYTES/k6 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1 +stats: key_count=2 key_bytes=100 val_count=8 val_bytes=157 gc_bytes_age=23263 intent_count=2 intent_bytes=24 separated_intent_count=2 intent_age=176 +error: (*withstack.withStack:) transaction 00000007-0000-0000-0000-000000000000 with sequence 0 prevented from changing write timestamp from 12.000000000,1 to 12.000000000,3 due to ambiguous replay protection + +run ok +with t=G + # Commit at the last "correct" timestamp. + txn_advance ts=12,2 + resolve_intent k=a + resolve_intent k=k + txn_remove +---- +>> at end: +data: "a"/12.000000000,2 -> {localTs=12.000000000,1}/ +data: "a"/12.000000000,0 -> {localTs=11.000000000,0}/BYTES/first +data: "k"/12.000000000,2 -> {localTs=12.000000000,1}/ +data: "k"/6.000000000,0 -> /BYTES/k6 +data: "k"/4.000000000,0 -> / +data: "k"/3.000000000,0 -> /BYTES/k3 +data: "k"/0,2 -> /BYTES/k2 +data: "k"/0,1 -> /BYTES/k1
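Taken together, the client-side changes above reduce to the control flow sketched below. This is an illustrative condensation of the sendToReplicas retry loop, not the actual function; sendAttempt and lastErr stand in for the transport send and the error bookkeeping elided here.

    // Sketch of how sendToReplicas arms ambiguous-replay protection,
    // assuming a hypothetical sendAttempt helper.
    var ambiguousError error
    for _, replica := range replicas {
    	ba = ba.ShallowCopy()
    	// After any ambiguous failure on a batch with a commit, later attempts
    	// must not treat a replay that moves the write timestamp as idempotent,
    	// since the transaction may already be implicitly committed...
    	ba.AmbiguousReplayProtection = ambiguousError != nil
    	// ...and must not refresh a WriteTooOld away server-side, so that the
    	// error returns here and the ambiguity can be surfaced to the client.
    	if ambiguousError != nil {
    		ba.CanForwardReadTimestamp = false
    	}
    	br, err := sendAttempt(ctx, ba, replica)
    	if err == nil {
    		return br, nil
    	}
    	if withCommit && !grpcutil.RequestDidNotStart(err) {
    		ambiguousError = err // the write may have been applied
    	}
    }
    // Replicas exhausted; an earlier ambiguous error wins over the last error.
    if ambiguousError != nil {
    	return nil, kvpb.NewAmbiguousResultErrorf(
    		"error=%v [exhausted] (last error: %v)", ambiguousError, lastErr)
    }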